In [None]:
## Copyright 2025 Google LLC
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
##   http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##

In [1]:
!pip install kfp

Collecting kfp
  Downloading kfp-2.12.1.tar.gz (345 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m345.4/345.4 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kfp-pipeline-spec==0.6.0 (from kfp)
  Downloading kfp_pipeline_spec-0.6.0-py3-none-any.whl.metadata (293 bytes)
Collecting kfp-server-api<2.5.0,>=2.1.0 (from kfp)
  Downloading kfp_server_api-2.4.0.tar.gz (83 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kubernetes<31,>=8.0.0 (from kfp)
  Downloading kubernetes-30.1.0-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting urllib3<2.0.0 (from kfp)
  Downloading urllib3-1.26.20-py2.py3-none-any.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.1/50.1 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
Downloading kfp_pipe

In [None]:
# --- Imports and Config ---
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Model, Dataset, Artifact, OutputPath
from typing import NamedTuple, Optional
import google.cloud.aiplatform as aip
import os
import shutil
import tempfile

# --- Configuration ---
# Google Cloud project / region and the GCS locations the pipeline writes to.
PROJECT_ID = "my-project-id"
REGION = "us-central1"
PIPELINE_ROOT = "gs://my-bucket/meridian-pipeline-root"
OUTPUT_GCS_DIR = PIPELINE_ROOT + "/outputs"  # HTML reports are uploaded here

# BigQuery source table (queried as PROJECT_ID.BQ_DATASET.BQ_TABLE_NAME).
BQ_DATASET = "meridiansampledataset"
BQ_TABLE_NAME = "meridiantable"

# Parameters of the LogNormal prior placed on ROI by the training component.
ROI_MU = 0.2
ROI_SIGMA = 0.9

# Posterior sampling settings forwarded to the training component.
N_CHAINS = 7
N_ADAPT = 500
N_BURNIN = 500
N_KEEP = 1000
RANDOM_SEED = 1

# Date window passed to the summary report component.
REPORT_START_DATE = "2021-01-25"
REPORT_END_DATE = "2024-01-15"

# Container images: CPU image for the reporting steps, GPU image for training.
STANDARD_BASE_IMAGE = "python:3.10-slim"
GPU_BASE_IMAGE = "gcr.io/deeplearning-platform-release/tf-gpu.2-15.py310"

# File name of the saved model; each component defines its own copy of this value.
MERIDIAN_MODEL_FILENAME = "model_save.pkl"

# Pipeline identity and the path of the compiled pipeline definition.
PIPELINE_NAME = "meridian-mmm-gpu-bq-pipeline-v1"
PIPELINE_JSON = PIPELINE_NAME + ".json"

# --- train_meridian_model ---
@dsl.component(
    base_image=GPU_BASE_IMAGE,
    packages_to_install=[
        "google-meridian[and-cuda]", "numpy<2","tensorflow_probability", "pandas",
        "google-cloud-storage", "arviz", "matplotlib", "dill",
        "google-cloud-bigquery","db-dtypes",
        "pyarrow" # Often needed by BQ client's to_dataframe()
    ],
)
def train_meridian_model(
    project_id: str,
    bq_dataset: str,
    bq_table_name: str,
    roi_mu: float, roi_sigma: float, n_chains: int,
    n_adapt: int, n_burnin: int, n_keep: int, seed: int,
    output_model: Output[Model],
):
    """Train a Meridian model on a BigQuery table and save it as a KFP model artifact.

    The component reads every row of ``project_id.bq_dataset.bq_table_name``,
    normalises the time column to 'YYYY-MM-DD' strings, loads the frame through
    Meridian's ``DataFrameDataLoader`` using the column mapping defined below,
    samples the prior and the posterior, and writes the fitted model with
    ``model.save_mmm`` into the ``output_model`` artifact directory.

    Args:
        project_id: Project used for the BigQuery client and the table id.
        bq_dataset: BigQuery dataset holding the input table.
        bq_table_name: BigQuery table holding the input data.
        roi_mu: First parameter of the LogNormal prior on ROI.
        roi_sigma: Second parameter of the LogNormal prior on ROI.
        n_chains: Forwarded to ``sample_posterior``.
        n_adapt: Forwarded to ``sample_posterior``.
        n_burnin: Forwarded to ``sample_posterior``.
        n_keep: Forwarded to ``sample_posterior``.
        seed: Forwarded to ``sample_posterior``.
        output_model: Output artifact; the model file is written inside its path.

    Raises:
        ValueError: If the query returns no rows or the time column is missing.
        Exception: Any client, loading or saving error is logged and re-raised.
    """
    # All imports and constants live inside the function body on purpose: the
    # component is defined here and must be self-contained.
    import datetime
    import logging
    import os
    import time

    import pandas as pd
    import tensorflow as tf
    import tensorflow_probability as tfp
    from google.cloud import bigquery
    from meridian import constants
    from meridian.data import load
    from meridian.model import model, spec, prior_distribution

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    MERIDIAN_MODEL_FILENAME = "model_save.pkl" # Define inside component too
    N_PRIOR_DRAWS = 500  # number of draws requested from sample_prior

    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        logging.info(f"GPUs available: {gpus}")
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            logging.info("Enabled memory growth for GPUs.")
        except RuntimeError as e:
            # Best effort only: training continues with the default memory policy.
            logging.error(f"Error setting memory growth: {e}")
    else:
        logging.warning("No GPU detected by TensorFlow. Running on CPU.")

    # --- Define Mappings (column names of the sample dataset) ---
    coord_to_columns = load.CoordToColumns(
        time='time', geo='geo', controls=['GQV', 'Competitor_Sales'], population='population',
        kpi='conversions', revenue_per_kpi='revenue_per_conversion',
        media=[f'Channel{i}_impression' for i in range(5)], ## HERE FOR THE SAMPLE DATASET
        media_spend=[f'Channel{i}_spend' for i in range(5)], ## HERE FOR THE SAMPLE DATASET
        organic_media=['Organic_channel0_impression'], non_media_treatments=['Promo'],
    )
    correct_media_to_channel = {f'Channel{i}_impression': f'Channel_{i}' for i in range(5)}
    correct_media_spend_to_channel = {f'Channel{i}_spend': f'Channel_{i}' for i in range(5)}

    # --- BigQuery Data Loading Start ---
    bq_table_full_id = f"{project_id}.{bq_dataset}.{bq_table_name}"
    logging.info(f"Attempting to load data from BigQuery table: {bq_table_full_id}")

    try:
        client = bigquery.Client(project=project_id)
        logging.info("BigQuery client created successfully.")
    except Exception as e:
        logging.error(f"Failed to create BigQuery client: {e}")
        raise

    # The table id comes from pipeline parameters; identifiers cannot be bound as
    # query parameters, so it is interpolated inside backticks.
    sql_query = f"SELECT * FROM `{bq_table_full_id}`"
    logging.info(f"Executing query: {sql_query}")

    try:
        df = client.query(sql_query).to_dataframe()
        logging.info(f"Successfully loaded {len(df)} rows and {len(df.columns)} columns from BigQuery.")

        # Fail early with a clear message instead of an IndexError further down.
        if df.empty:
            raise ValueError(f"BigQuery table '{bq_table_full_id}' returned no rows; cannot train a model.")

        # --- Convert the time column: to_dataframe() may return date/datetime values,
        # --- which are rewritten here as 'YYYY-MM-DD' strings.
        time_col_name = coord_to_columns.time
        if time_col_name not in df.columns:
            logging.error(f"Specified time column '{time_col_name}' not found in DataFrame!")
            raise ValueError(f"Time column '{time_col_name}' defined in coord_to_columns not found in BigQuery results.")

        logging.info(f"Converting time column '{time_col_name}' to string format 'YYYY-MM-DD'")
        time_values = df[time_col_name]
        # Inspect the first non-null value so a leading NULL does not hide date objects.
        non_null_times = time_values.dropna()
        first_time_value = non_null_times.iloc[0] if not non_null_times.empty else None
        # pd.Timestamp and datetime.datetime are both subclasses of datetime.date.
        holds_dates = (
            pd.api.types.is_datetime64_any_dtype(time_values)
            or isinstance(first_time_value, datetime.date)
        )
        if holds_dates:
            df[time_col_name] = pd.to_datetime(time_values).dt.strftime('%Y-%m-%d')
            logging.info(f"Conversion of '{time_col_name}' complete.")
        elif pd.api.types.is_string_dtype(time_values):
            logging.info(f"Column '{time_col_name}' is already string type. Checking format (first row): {time_values.iloc[0]}")
        else:
            logging.warning(f"Column '{time_col_name}' is not a recognized datetime or string type ({time_values.dtype}). Meridian might still fail.")
        # --- End Time Conversion ---

        logging.info("First 5 rows of loaded data (post-conversion):")
        logging.info(df.head().to_string()) # Use to_string for logging DataFrames

    except Exception as e:
        logging.error(f"Error loading data from BigQuery or processing DataFrame: {e}")
        raise

    # --- Use DataFrameDataLoader ---
    logging.info("Initializing Meridian DataFrameDataLoader...")
    try:
        loader = load.DataFrameDataLoader(
            df=df, # Pass the DataFrame loaded from BQ
            kpi_type='non_revenue', # Assuming this is still correct
            coord_to_columns=coord_to_columns,
            media_to_channel=correct_media_to_channel,
            media_spend_to_channel=correct_media_spend_to_channel,
        )
        data = loader.load()
        logging.info("Data successfully loaded into Meridian InputData format.")
    except Exception as e:
        logging.error(f"Error during Meridian data loading process (DataFrameDataLoader): {e}")
        raise
    # --- BigQuery Data Loading End ---

    logging.info("Configuring model...")
    prior = prior_distribution.PriorDistribution(
        roi_m=tfp.distributions.LogNormal(roi_mu, roi_sigma, name=constants.ROI_M)
    )
    model_spec_obj = spec.ModelSpec(prior=prior)
    mmm = model.Meridian(input_data=data, model_spec=model_spec_obj) # Use the 'data' object loaded from BQ

    logging.info("Sampling prior...")
    mmm.sample_prior(N_PRIOR_DRAWS)
    logging.info(f"Sampling posterior with {n_chains} chains...")
    start_time = time.time()
    mmm.sample_posterior(
        n_chains=n_chains, n_adapt=n_adapt, n_burnin=n_burnin, n_keep=n_keep, seed=seed
    )
    end_time = time.time()
    logging.info(f"Posterior sampling complete. Duration: {end_time - start_time:.2f} seconds.")

    # The artifact path is used as a directory that holds a single model file.
    save_file_path = os.path.join(output_model.path, MERIDIAN_MODEL_FILENAME)
    logging.info(f"Saving model artifact using model.save_mmm to file: {save_file_path}")
    try:
        os.makedirs(output_model.path, exist_ok=True)
        model.save_mmm(mmm, save_file_path)
        logging.info("Model saved successfully using meridian.model.model.save_mmm.")
    except Exception as e:
        logging.error(f"meridian.model.model.save_mmm failed: {e}")
        raise

    output_model.metadata["framework"] = "Meridian"
    output_model.metadata["saved_filename"] = MERIDIAN_MODEL_FILENAME
    output_model.metadata["description"] = f"Trained Meridian MMM model (BQ Input, saved via save_mmm to {MERIDIAN_MODEL_FILENAME})"
    logging.info("Training component finished.")


# --- generate_summary_report ---
@dsl.component(
    base_image=STANDARD_BASE_IMAGE,
    packages_to_install=[
        # CPU-only image, so the plain package is used (no CUDA extra), matching
        # run_budget_optimization.
        "google-meridian", "tensorflow", "tensorflow_probability",
        "pandas", "numpy", "arviz", "matplotlib", "google-cloud-storage","dill"
    ],
)
def generate_summary_report(
    model_artifact: Input[Model],
    output_gcs_dir: str,
    report_filename: str,
    start_date: str,
    end_date: str,
    summary_report_artifact: Output[Artifact],
):
    """Render Meridian's model results summary and upload it to Cloud Storage.

    Loads the model file written by the training step from ``model_artifact``,
    asks Meridian's ``Summarizer`` to write the report into a temporary
    directory, uploads that file to ``output_gcs_dir/report_filename`` and points
    ``summary_report_artifact`` at the uploaded object.

    Args:
        model_artifact: Artifact directory containing ``model_save.pkl``.
        output_gcs_dir: Destination folder; must start with ``gs://``.
        report_filename: File name of the report, locally and in the bucket.
        start_date: Forwarded to ``output_model_results_summary``.
        end_date: Forwarded to ``output_model_results_summary``.
        summary_report_artifact: Output artifact whose URI and metadata are set
            to the uploaded report.

    Raises:
        FileNotFoundError: If the model file or the generated report is missing.
        ValueError: If ``output_gcs_dir`` is not a ``gs://`` path.
        Exception: Any load, report or upload error is logged and re-raised.
    """
    import os
    import logging
    import tempfile
    from meridian.analysis import summarizer
    from meridian.model import model
    from google.cloud import storage
    from urllib.parse import urlparse

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    MERIDIAN_MODEL_FILENAME = "model_save.pkl"

    def upload_local_file_to_gcs(local_path: str, gcs_uri: str):
        """Upload ``local_path`` to the object named by ``gcs_uri`` (gs://bucket/key)."""
        storage_client = storage.Client()
        parsed_uri = urlparse(gcs_uri)
        bucket_name = parsed_uri.netloc
        destination_blob_name = parsed_uri.path.lstrip('/')
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(local_path)
        logging.info(f"File {local_path} uploaded to {gcs_uri}")

    model_dir_path = model_artifact.path
    load_file_path = os.path.join(model_dir_path, MERIDIAN_MODEL_FILENAME)
    logging.info(f"Attempting to load model from file: {load_file_path}")
    if not os.path.exists(load_file_path):
        raise FileNotFoundError(f"Expected model file {MERIDIAN_MODEL_FILENAME} not found in {model_dir_path}")
    try:
        mmm = model.load_mmm(load_file_path)
        logging.info("Model loaded successfully.")
    except Exception as e:
        logging.error(f"Model loading failed: {e}")
        raise

    if not output_gcs_dir.startswith("gs://"):
        raise ValueError("output_gcs_dir must be a GCS path (gs://...)")
    # A GCS URI is not a filesystem path: join with "/" explicitly instead of os.path.join.
    final_gcs_uri = f"{output_gcs_dir.rstrip('/')}/{report_filename}"

    with tempfile.TemporaryDirectory() as temp_dir:
        logging.info(f"Generating summary report locally in: {temp_dir}")
        local_report_source_path = os.path.join(temp_dir, report_filename)
        try:
            mmm_summarizer = summarizer.Summarizer(mmm)
            mmm_summarizer.output_model_results_summary(
                filename=report_filename,
                filepath=temp_dir,
                start_date=start_date,
                end_date=end_date
            )
            # Verify the file exists before claiming success in the log.
            if not os.path.exists(local_report_source_path):
                logging.error(f"Meridian did not create the expected local report file: {local_report_source_path}")
                raise FileNotFoundError(f"Report file not created locally by Meridian at {local_report_source_path}")
            logging.info(f"Meridian saved report locally to: {local_report_source_path}")
            logging.info(f"Manually uploading {local_report_source_path} to {final_gcs_uri}")
            upload_local_file_to_gcs(local_report_source_path, final_gcs_uri)
            # Point the artifact at the uploaded object rather than its default location.
            summary_report_artifact.uri = final_gcs_uri
            summary_report_artifact.metadata["gcs_path"] = final_gcs_uri
            summary_report_artifact.metadata["filename"] = report_filename
            logging.info(f"Set KFP artifact URI to: {summary_report_artifact.uri}")
        except Exception as e:
            logging.error(f"Failed to generate or upload summary report: {e}")
            raise
    logging.info("Summary report component finished.")


@dsl.component(
    base_image=STANDARD_BASE_IMAGE,  # CPU
    packages_to_install=[
        "google-meridian", # Removed cuda variant if not needed
        "pandas", "numpy", "google-cloud-storage", "dill"
    ],
)
def run_budget_optimization(
    model_artifact: Input[Model],
    output_gcs_dir: str,
    report_filename: str,
    optimization_report_artifact: Output[Artifact],
):
    """Run Meridian's budget optimizer and upload its summary to Cloud Storage.

    Loads the model file written by the training step from ``model_artifact``,
    calls ``BudgetOptimizer.optimize()`` with its defaults, writes the
    optimization summary into a temporary directory, uploads that file to
    ``output_gcs_dir/report_filename`` and points
    ``optimization_report_artifact`` at the uploaded object.

    Args:
        model_artifact: Artifact directory containing ``model_save.pkl``.
        output_gcs_dir: Destination folder; must start with ``gs://``.
        report_filename: File name of the report, locally and in the bucket.
        optimization_report_artifact: Output artifact whose URI and metadata are
            set to the uploaded report.

    Raises:
        FileNotFoundError: If the model file or the generated report is missing.
        ValueError: If ``output_gcs_dir`` is not a ``gs://`` path.
        Exception: Any load, optimization or upload error is logged and re-raised.
    """
    import os
    import logging
    import tempfile
    from meridian.analysis import optimizer
    from meridian.model import model
    from google.cloud import storage
    from urllib.parse import urlparse

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    MERIDIAN_MODEL_FILENAME = "model_save.pkl"

    def upload_local_file_to_gcs(local_path: str, gcs_uri: str):
        """Upload ``local_path`` to the object named by ``gcs_uri`` (gs://bucket/key)."""
        storage_client = storage.Client()
        parsed_uri = urlparse(gcs_uri)
        bucket_name = parsed_uri.netloc
        destination_blob_name = parsed_uri.path.lstrip('/')
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(local_path)
        logging.info(f"File {local_path} uploaded to {gcs_uri}")

    model_dir_path = model_artifact.path
    load_file_path = os.path.join(model_dir_path, MERIDIAN_MODEL_FILENAME)
    logging.info(f"Attempting to load model from file: {load_file_path}")
    if not os.path.exists(load_file_path):
        raise FileNotFoundError(f"Expected model file {MERIDIAN_MODEL_FILENAME} not found in {model_dir_path}")
    try:
        mmm = model.load_mmm(load_file_path)
        logging.info("Model loaded successfully.")
    except Exception as e:
        logging.error(f"Model loading failed: {e}")
        raise

    if not output_gcs_dir.startswith("gs://"):
        raise ValueError("output_gcs_dir must be a GCS path (gs://...)")
    # A GCS URI is not a filesystem path: join with "/" explicitly instead of os.path.join.
    final_gcs_uri = f"{output_gcs_dir.rstrip('/')}/{report_filename}"

    with tempfile.TemporaryDirectory() as temp_dir:
        logging.info(f"Running optimization and generating report locally in: {temp_dir}")
        local_report_source_path = os.path.join(temp_dir, report_filename)
        try:
            budget_optimizer = optimizer.BudgetOptimizer(mmm)
            optimization_results = budget_optimizer.optimize()
            logging.info("Optimization calculation complete.")
            optimization_results.output_optimization_summary(
                filename=report_filename,
                filepath=temp_dir
            )
            # Verify the file exists before claiming success in the log.
            if not os.path.exists(local_report_source_path):
                logging.error(f"Meridian did not create the expected local report file: {local_report_source_path}")
                raise FileNotFoundError(f"Optimization report file not created locally by Meridian at {local_report_source_path}")
            logging.info(f"Meridian saved optimization report locally to: {local_report_source_path}")
            logging.info(f"Manually uploading {local_report_source_path} to {final_gcs_uri}")
            upload_local_file_to_gcs(local_report_source_path, final_gcs_uri)
            # Point the artifact at the uploaded object rather than its default location.
            optimization_report_artifact.uri = final_gcs_uri
            optimization_report_artifact.metadata["gcs_path"] = final_gcs_uri
            optimization_report_artifact.metadata["filename"] = report_filename
            logging.info(f"Set KFP artifact URI to: {optimization_report_artifact.uri}")
        except Exception as e:
            logging.error(f"Failed during budget optimization or reporting/uploading: {e}")
            raise
    logging.info("Optimization component finished.")


# --- Pipeline Definition ---
@dsl.pipeline(
    name=PIPELINE_NAME,
    description="Runs Meridian MMM (GPU) reading from BigQuery",
    pipeline_root=PIPELINE_ROOT,
)
def meridian_pipeline(
    # BigQuery source table
    project_id: str = PROJECT_ID,
    bq_dataset: str = BQ_DATASET,
    bq_table_name: str = BQ_TABLE_NAME,
    # GCS folder that receives both HTML reports
    output_gcs_dir: str = OUTPUT_GCS_DIR,
    # Prior and sampler settings for the training step
    roi_mu: float = ROI_MU,
    roi_sigma: float = ROI_SIGMA,
    n_chains: int = N_CHAINS,
    n_adapt: int = N_ADAPT,
    n_burnin: int = N_BURNIN,
    n_keep: int = N_KEEP,
    seed: int = RANDOM_SEED,
    # Reporting settings
    report_start_date: str = REPORT_START_DATE,
    report_end_date: str = REPORT_END_DATE,
    summary_report_filename: str = "summary_output.html",
    optimization_report_filename: str = "optimization_output.html",
):
    # Wiring: one training task whose model artifact feeds two independent
    # reporting tasks (summary report and budget optimization).

    # Step 1: train on the BigQuery table, on a single T4 GPU.
    train_task = train_meridian_model(
        project_id=project_id,
        bq_dataset=bq_dataset,
        bq_table_name=bq_table_name,
        roi_mu=roi_mu,
        roi_sigma=roi_sigma,
        n_chains=n_chains,
        n_adapt=n_adapt,
        n_burnin=n_burnin,
        n_keep=n_keep,
        seed=seed,
    )
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64G")
    train_task.set_accelerator_limit(1)
    train_task.set_accelerator_type('NVIDIA_TESLA_T4')

    # Both downstream steps consume the same trained-model artifact.
    trained_model = train_task.outputs["output_model"]

    # Step 2: model results summary for the requested date window.
    summary_task = generate_summary_report(
        model_artifact=trained_model,
        output_gcs_dir=output_gcs_dir,
        report_filename=summary_report_filename,
        start_date=report_start_date,
        end_date=report_end_date,
    )
    summary_task.set_cpu_limit("16")  # Adjust if needed
    summary_task.set_memory_limit("64G")

    # Step 3: budget optimization report.
    optimization_task = run_budget_optimization(
        model_artifact=trained_model,
        output_gcs_dir=output_gcs_dir,
        report_filename=optimization_report_filename,
    )
    optimization_task.set_cpu_limit("16")  # Adjust if needed
    optimization_task.set_memory_limit("64G")


# --- Pipeline Compilation and Execution ---
if __name__ == "__main__":
    # Compile the pipeline to PIPELINE_JSON, then submit it to Vertex AI Pipelines.

    # Import the compiler submodule explicitly rather than relying on `import kfp`
    # having loaded it as a side effect.
    from kfp import compiler

    compiler.Compiler().compile(
        pipeline_func=meridian_pipeline, package_path=PIPELINE_JSON
    )
    print(f"Pipeline compiled to {PIPELINE_JSON}")

    aip.init(project=PROJECT_ID, location=REGION, staging_bucket=PIPELINE_ROOT)
    print(f"Initialized Vertex AI SDK for project {PROJECT_ID} in {REGION}")

    # The values below repeat the pipeline's defaults; they are passed explicitly
    # so a run can be customised by editing this dictionary.
    job = aip.PipelineJob(
        display_name=PIPELINE_NAME, # Use updated name
        template_path=PIPELINE_JSON,
        pipeline_root=PIPELINE_ROOT,
        parameter_values={
            "project_id": PROJECT_ID,
            "bq_dataset": BQ_DATASET,
            "bq_table_name": BQ_TABLE_NAME,
            "output_gcs_dir": OUTPUT_GCS_DIR,
            "roi_mu": ROI_MU,
            "roi_sigma": ROI_SIGMA,
            "n_chains": N_CHAINS,
            "n_adapt": N_ADAPT,
            "n_burnin": N_BURNIN,
            "n_keep": N_KEEP,
            "seed": RANDOM_SEED,
            "report_start_date": REPORT_START_DATE,
            "report_end_date": REPORT_END_DATE,
            "summary_report_filename": "summary_output.html",
            "optimization_report_filename": "optimization_output.html",
        },
        enable_caching=False, # Caching to edit as desired
    )

    print("Submitting pipeline job...")
    job.submit()
    # NOTE(review): _dashboard_uri() is a private SDK method and may change between
    # google-cloud-aiplatform releases.
    print(f"Pipeline job submitted. View in Cloud Console: {job._dashboard_uri()}")