In [1]:
import kfp.compiler as compiler
from kfp.dsl import component, pipeline, Input, Output, Dataset


from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_from_component

from typing import Any, Dict, List

from src.ShortTermForecast.constants import *
from src.ShortTermForecast.utils.common import read_yaml

In [2]:
# Load the project configuration. CONFIG_FILE_PATH comes from the star import of
# src.ShortTermForecast.constants; the recorded run resolved it to config/config.yaml.
# CONFIG is read by attribute (CONFIG.general_setup..., CONFIG.data_ingestion...) in the
# cell that initialises aiplatform, compiles the pipeline and submits the job.
CONFIG = read_yaml(CONFIG_FILE_PATH)

[2025-04-15 20:25:12,065: INFO: common] yaml file: config/config.yaml loaded successfully


In [None]:
#data_ingestion_op
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/bi-platform/bi-aaaie/images/b2b_ai/wf_pipeline/training:1.0.1-rc"
)
def data_ingestion_op(
    project_id: str,
    project_location: str,
    bq_dataset: str,
    bq_source_table: str,
    time_column: str,
    target_column: str,
    series_identifier: str,
    attribute_columns: List[str],
    output_dataset: Output[Dataset]
):
    """Load aggregated history from BigQuery and write it to ``output_dataset`` as CSV.

    The query sums ``target_column`` per (``time_column``, attribute columns) for
    rows up to the history cut-off date, and adds a ``series_identifier`` column
    built by concatenating the attribute values with a single space.

    Args:
        project_id: GCP project used for the BigQuery client and as the source table's project.
        project_location: Location passed to the BigQuery client.
        bq_dataset: Dataset containing ``bq_source_table``.
        bq_source_table: Table or view to read from.
        time_column: Date-like column used for grouping and the cut-off filter.
        target_column: Column summed per group.
        series_identifier: Name given to the generated series-id column.
        attribute_columns: Columns that define a series; must be non-empty.
        output_dataset: KFP output artifact; the CSV is written to its ``path``.

    Raises:
        ValueError: If ``attribute_columns`` is empty (the generated SQL would be invalid).
    """
    # KFP lightweight components execute only this function's source inside the
    # container, so every import, helper and class must be defined in here.
    from dataclasses import dataclass
    from google.cloud import bigquery
    from pathlib import Path
    from typing import List, Optional
    import logging

    logging_str = "[%(asctime)s: %(levelname)s: %(module)s] %(message)s"

    logging.basicConfig(
        level=logging.INFO,
        format=logging_str
    )

    logger = logging.getLogger("Match-Analysis")

    if not attribute_columns:
        # CONCAT() with no arguments and the dangling commas left in SELECT/GROUP BY
        # by an empty attribute list would make the query invalid; fail fast instead.
        raise ValueError("attribute_columns must contain at least one column name.")

    @dataclass(frozen=True)
    class DataIngestionConfig:
        # Settings for one ingestion run. root_dir/local_file_name are only used by
        # DataIngestion.save() when no explicit path is given (they are None in KFP runs).
        root_dir: Optional[Path]
        local_file_name: Optional[str]
        project_id: str
        project_location: str
        bq_dataset: str
        bq_source_table: str
        time_column: str
        target_column: str
        series_identifier: str
        attribute_columns: List[str]
        # Inclusive upper bound applied to time_column in the ingestion query.
        history_end_date: str = "2025-03-31"

    class ConfigurationManager:
        """Builds DataIngestionConfig objects."""

        def get_data_ingestion_kfp_config(
                self,
                project_id: str,
                project_location: str,
                bq_dataset: str,
                bq_source_table: str,
                time_column: str,
                target_column: str,
                series_identifier: str,
                attribute_columns: List[str]
        ) -> DataIngestionConfig:
            """Return a DataIngestionConfig configured for Kubeflow Pipeline runs.

            root_dir and local_file_name are left as None because the KFP entry
            point always passes an explicit output path to DataIngestion.save().
            """
            return DataIngestionConfig(
                root_dir=None,
                local_file_name=None,
                project_id=project_id,
                project_location=project_location,
                bq_dataset=bq_dataset,
                bq_source_table=bq_source_table,
                time_column=time_column,
                target_column=target_column,
                series_identifier=series_identifier,
                attribute_columns=attribute_columns
            )

    class DataIngestion:
        """Runs the ingestion query and persists the resulting DataFrame as CSV."""

        def __init__(self, config: DataIngestionConfig):
            self.config = config
            self.client = None
            self.data = None
            logger.info("DataIngestion instance initialized with provided configuration.")

        def _create_series_identifier(self) -> str:
            """Return the SELECT expression that builds the series-id column.

            Each attribute is CAST to STRING first so non-STRING attribute columns
            (e.g. INT64) are valid COALESCE/CONCAT arguments; NULLs become 'None'.
            """
            coalesce_parts = [
                f"COALESCE(CAST({column} AS STRING), 'None')"
                for column in self.config.attribute_columns
            ]
            # Attribute values are separated by a single-space string literal.
            joined_parts = ", ' ', ".join(coalesce_parts)
            series_identifier = f"CONCAT({joined_parts}) AS {self.config.series_identifier}"
            logger.debug(f"Created series identifier: {series_identifier}")
            return series_identifier

        def load(self) -> None:
            """Create the BigQuery client, run the ingestion query and keep the result in self.data."""
            logger.info("Initializing BigQuery client and loading data.")
            self.client = bigquery.Client(
                project=self.config.project_id,
                location=self.config.project_location
            )

            query = self.data_ingestion_query
            try:
                self.data = self.client.query(query).to_dataframe()
                logger.info(f"Data loaded successfully. Shape: {self.data.shape}")
            except Exception as e:
                # Log the traceback and the exact SQL so a failed pipeline step can be
                # diagnosed from the job logs alone.
                logger.exception(f"Error loading data from BigQuery: {str(e)}")
                logger.error(f"Failing query:\n{query}")
                raise

        def save(self, save_path: Optional[str] = None) -> None:
            """Write self.data to CSV at save_path (defaults to root_dir/local_file_name).

            Raises:
                ValueError: If load() has not populated self.data yet.
            """
            if self.data is None:
                logger.error("No data to save. Call load() first.")
                raise ValueError("No data loaded")
            if save_path is None:
                save_path = Path(self.config.root_dir, self.config.local_file_name)

            logger.info(f"Saving data to {save_path}")
            try:
                self.data.to_csv(save_path, index=False)
                logger.info("Data saved successfully.")
            except Exception as e:
                logger.error(f"Error saving data to CSV: {str(e)}")
                raise

        @property
        def data_ingestion_query(self) -> str:
            """SQL that aggregates target_column per time/attribute group up to history_end_date."""
            query = f"""
            WITH historical_table AS (
                SELECT 
                    {self.config.time_column},
                    {self.attribute_string},
                    SUM({self.config.target_column}) AS {self.config.target_column}
                FROM 
                    `{self.config.project_id}.{self.config.bq_dataset}.{self.config.bq_source_table}`
                WHERE 
                    {self.config.time_column} <= DATE('{self.config.history_end_date}')
                GROUP BY 
                    {self.config.time_column},
                    {self.attribute_string}
            )
            SELECT 
                {self._create_series_identifier()},
                {self.config.time_column},
                {self.attribute_string},
                {self.config.target_column}
            FROM historical_table
            """
            logger.debug("Generated data ingestion query.")
            return query

        @property
        def attribute_string(self) -> str:
            """Comma-separated attribute column names for SELECT / GROUP BY lists."""
            return ','.join(self.config.attribute_columns)

    data_ingestion_config = ConfigurationManager().get_data_ingestion_kfp_config(
        project_id=project_id,
        project_location=project_location,
        bq_dataset=bq_dataset,
        bq_source_table=bq_source_table,
        time_column=time_column,
        target_column=target_column,
        series_identifier=series_identifier,
        attribute_columns=attribute_columns
    )

    data_ingestion = DataIngestion(data_ingestion_config)

    data_ingestion.load()
    data_ingestion.save(output_dataset.path)

# Wrap the component with create_custom_training_job_from_component so the step runs
# with machine_type e2-standard-4; forecast_pipeline calls this wrapper, not
# data_ingestion_op directly.
data_ingestion_job = create_custom_training_job_from_component(
    data_ingestion_op,
    machine_type='e2-standard-4',
    display_name='data-ingestion-component',
)


In [4]:
#generate_time_series_cv_op
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/bi-platform/bi-aaaie/images/b2b_ai/wf_pipeline/training:1.0.1-rc"
)
def generate_time_series_cv_op(
    input_dataset: Input[Dataset],
    time_column: str,
    forecast_horizon: int,
    output_train: Output[Dataset],
    output_test: Output[Dataset]
):
    """Build expanding-window time-series CV splits from the ingested CSV.

    Every split trains from a fixed start date; each test window covers the next
    ``forecast_horizon`` days, and the following split's training window is
    extended to the end of that test window. Train rows of all splits (and test
    rows of all splits) are stacked into one CSV each, tagged with ``split_index``.

    Args:
        input_dataset: CSV artifact from the ingestion step; must contain ``time_column``.
        time_column: Column parsed with ``pd.to_datetime`` and used to assign rows to windows.
        forecast_horizon: Length of each test window in days; must be >= 1.
        output_train: Artifact receiving the stacked training rows.
        output_test: Artifact receiving the stacked test rows.

    Raises:
        ValueError: If ``forecast_horizon`` is < 1.
    """
    # KFP lightweight components execute only this function's source inside the
    # container, so every import, helper and class must be defined in here.
    from dataclasses import dataclass
    from datetime import timedelta
    from pathlib import Path
    from typing import Optional
    import logging
    import pandas as pd

    logging_str = "[%(asctime)s: %(levelname)s: %(module)s] %(message)s"

    logging.basicConfig(
        level=logging.INFO,
        format=logging_str
    )

    logger = logging.getLogger("Match-Analysis")

    if forecast_horizon < 1:
        # A horizon < 1 makes test_end precede test_start, silently producing empty
        # test sets and shrinking later training windows.
        raise ValueError(f"forecast_horizon must be >= 1 day, got {forecast_horizon}")

    @dataclass(frozen=True)
    class CrossValSplitConfig:
        # root_dir/train_file_name/test_file_name are only used by TimeSeriesCV.save()
        # when no explicit paths are given (they are None in KFP runs).
        root_dir: Optional[Path]
        input_dataset: str
        time_column: str
        forecast_horizon: int
        train_file_name: Optional[str]
        test_file_name: Optional[str]
        # First day of every training window (the window expands; its start never moves).
        train_start: str = "2022-01-01"
        # Last day of the first split's training window.
        initial_train_end: str = "2024-03-31"
        # Number of consecutive train/test splits to generate.
        n_splits: int = 4

    class ConfigurationManager:
        """Builds CrossValSplitConfig objects."""

        def get_cross_validation_split_kfp_config(
                self,
                input_dataset: str,
                time_column: str,
                forecast_horizon: int
        ) -> CrossValSplitConfig:
            """Return a CrossValSplitConfig for Kubeflow Pipeline runs (file-name fields left as None)."""
            return CrossValSplitConfig(
                root_dir=None,
                input_dataset=input_dataset,
                time_column=time_column,
                forecast_horizon=forecast_horizon,
                train_file_name=None,
                test_file_name=None
            )

    class TimeSeriesCV:
        """Loads the input CSV, builds expanding-window splits and writes stacked train/test CSVs."""

        def __init__(self, config: CrossValSplitConfig) -> None:
            self.config = config
            self.data = None
            self.splits = None
            # Populated by process_splits(); initialised so save() can detect misuse.
            self.train = None
            self.test = None
            logger.info("TimeSeriesCV instance initialized with provided configuration.")

        def load(self) -> None:
            """Read the input CSV and parse time_column to datetimes."""
            logger.info(f"Reading input dataset from: {self.config.input_dataset}")
            try:
                self.data = pd.read_csv(self.config.input_dataset)
                self.data[self.config.time_column] = pd.to_datetime(self.data[self.config.time_column])
                logger.info(f"Data loaded successfully. Shape: {self.data.shape}")
            except Exception as e:
                logger.error(f"Error loading data: {str(e)}")
                raise

        def generate_splits(self) -> None:
            """Compute n_splits consecutive windows; each test window spans forecast_horizon days."""
            logger.info("Generating time series cross-validation splits")
            if self.config.n_splits < 1:
                raise ValueError(f"n_splits must be >= 1, got {self.config.n_splits}")

            train_start = pd.to_datetime(self.config.train_start)
            current_train_end = pd.to_datetime(self.config.initial_train_end)
            # test_end is inclusive, so an N-day horizon ends N-1 days after test_start.
            test_span = timedelta(days=self.config.forecast_horizon - 1)

            splits = []
            for split_index in range(1, self.config.n_splits + 1):
                test_start = current_train_end + timedelta(days=1)
                test_end = test_start + test_span

                splits.append({
                    "split_index": split_index,
                    "train_start": train_start,
                    "train_end": current_train_end,
                    "test_start": test_start,
                    "test_end": test_end
                })

                # Expanding window: the next split trains on everything up to this test end.
                current_train_end = test_end

            self.splits = splits

            logger.info("Generated splits:")
            for s in splits:
                logger.info(f"Split {s['split_index']}:")
                logger.info(f"  Train: {s['train_start'].strftime('%Y-%m-%d')} to {s['train_end'].strftime('%Y-%m-%d')}")
                logger.info(f"  Test:  {s['test_start'].strftime('%Y-%m-%d')} to {s['test_end'].strftime('%Y-%m-%d')}")

        def process_splits(self) -> None:
            """Slice self.data into each split's windows (bounds inclusive) and stack them."""
            if self.splits is None:
                logger.error("Splits have not been generated. Call generate_splits() first.")
                raise ValueError("Splits not generated")
            if self.data is None:
                logger.error("Data has not been loaded. Call load() first.")
                raise ValueError("Data not loaded")

            time_values = self.data[self.config.time_column]
            all_train_data = []
            all_test_data = []

            for s in self.splits:
                logger.info(f"\nProcessing split {s['split_index']}")

                train_mask = (time_values >= s["train_start"]) & (time_values <= s["train_end"])
                test_mask = (time_values >= s["test_start"]) & (time_values <= s["test_end"])

                train_df = self.data.loc[train_mask].copy()
                test_df = self.data.loc[test_mask].copy()

                train_df['split_index'] = s['split_index']
                test_df['split_index'] = s['split_index']

                if test_df.empty:
                    # Typically means the test window lies beyond the ingested history.
                    logger.warning(
                        f"Split {s['split_index']} has no rows in its test window "
                        f"({s['test_start'].strftime('%Y-%m-%d')} to {s['test_end'].strftime('%Y-%m-%d')})."
                    )

                all_train_data.append(train_df)
                all_test_data.append(test_df)

                logger.info(f"Split {s['split_index']} - Train shape: {train_df.shape}, Test shape: {test_df.shape}")

            self.train = pd.concat(all_train_data, ignore_index=True)
            self.test = pd.concat(all_test_data, ignore_index=True)

        def save(self, save_train_path: Optional[str] = None, save_test_path: Optional[str] = None) -> None:
            """Write the stacked train/test frames to CSV (defaults: root_dir/<file_name>).

            Raises:
                ValueError: If process_splits() has not been run yet.
            """
            if self.train is None or self.test is None:
                logger.error("Splits have not been processed. Call process_splits() first.")
                raise ValueError("Splits not processed")
            if save_train_path is None:
                save_train_path = Path(self.config.root_dir, self.config.train_file_name)
            if save_test_path is None:
                save_test_path = Path(self.config.root_dir, self.config.test_file_name)

            logger.info(f"Saving combined training data (shape: {self.train.shape}) to {save_train_path}")
            self.train.to_csv(save_train_path, index=False)

            logger.info(f"Saving combined test data (shape: {self.test.shape}) to {save_test_path}")
            self.test.to_csv(save_test_path, index=False)

    config = ConfigurationManager()

    time_series_cv_config = config.get_cross_validation_split_kfp_config(
        input_dataset=input_dataset.path,
        time_column=time_column,
        forecast_horizon=forecast_horizon,
    )

    time_series_cv = TimeSeriesCV(time_series_cv_config)

    time_series_cv.load()
    time_series_cv.generate_splits()
    time_series_cv.process_splits()
    time_series_cv.save(
        save_train_path=output_train.path,
        save_test_path=output_test.path
    )

# Wrap the CV-split component with create_custom_training_job_from_component so the
# step runs with machine_type e2-standard-4; forecast_pipeline calls this wrapper.
generate_time_series_cv_job = create_custom_training_job_from_component(
    generate_time_series_cv_op,
    machine_type='e2-standard-4',
    display_name='generate-timeseries-cv-job',
)

In [5]:
@pipeline(
    name="b2b-wf-short-term-prediction-experiments",
    description="A Kubeflow pipeline for training forecast models using AutoML Forecast on Vertex AI Pipelines from a BigQuery view."
)
def forecast_pipeline(
    project_id: str,
    project_location: str,
    bq_dataset: str,
    bq_source_table: str,
    time_column: str,
    target_column: str,
    series_identifier: str,
    attribute_columns: List[str],
    forecast_horizon: int
):
    # NOTE(review): the description mentions AutoML Forecast training, but this pipeline
    # currently contains only the ingestion and CV-split steps below.

    # Step 1: pull the aggregated history from BigQuery into a CSV artifact.
    ingest_task = data_ingestion_job(
        project_id=project_id,
        project_location=project_location,
        bq_dataset=bq_dataset,
        bq_source_table=bq_source_table,
        time_column=time_column,
        target_column=target_column,
        series_identifier=series_identifier,
        attribute_columns=attribute_columns,
    )

    # Step 2: split that CSV into stacked train/test sets. Consuming the upstream
    # output both wires the artifact through and makes this step run after step 1.
    ingested_csv = ingest_task.outputs['output_dataset']
    generate_time_series_cv_job(
        input_dataset=ingested_csv,
        time_column=time_column,
        forecast_horizon=forecast_horizon,
    )

In [6]:
# Point the Vertex AI SDK at the configured project/region; the staging bucket is
# gs://<project_id>_<gcs_bucket>.
aiplatform.init(
    project=CONFIG.general_setup.project_id,
    location=CONFIG.general_setup.project_location,
    staging_bucket=f"gs://{CONFIG.general_setup.project_id}_{CONFIG.general_setup.gcs_bucket}",
)

# Compile the pipeline definition into the package file that PipelineJob submits.
compiler.Compiler().compile(
    pipeline_func=forecast_pipeline,
    package_path=CONFIG.general_setup.pipeline_package_path,
)

# Runtime parameters: everything comes from general_setup except the BigQuery
# source dataset/table, which live under data_ingestion.
job = pipeline_jobs.PipelineJob(
    display_name="b2b_wf_short_term_prediction_sma_pipeline",
    template_path=CONFIG.general_setup.pipeline_package_path,
    parameter_values=dict(
        project_id=CONFIG.general_setup.project_id,
        project_location=CONFIG.general_setup.project_location,
        bq_dataset=CONFIG.data_ingestion.bq_dataset,
        bq_source_table=CONFIG.data_ingestion.bq_source_table,
        time_column=CONFIG.general_setup.time_column,
        target_column=CONFIG.general_setup.target_column,
        series_identifier=CONFIG.general_setup.series_identifier,
        attribute_columns=CONFIG.general_setup.attribute_columns,
        forecast_horizon=CONFIG.general_setup.forecast_horizon,
    ),
)

# Submit the job; run() raises if the pipeline run fails.
job.run()

Creating PipelineJob
[2025-04-15 20:25:15,246: INFO: base] Creating PipelineJob
PipelineJob created. Resource name: projects/7796273458/locations/northamerica-northeast1/pipelineJobs/b2b-wf-short-term-prediction-experiments-20250415202512
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/7796273458/locations/northamerica-northeast1/pipelineJobs/b2b-wf-short-term-prediction-experiments-20250415202512')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/northamerica-northeast1/pipelines/runs/b2b-wf-short-term-prediction-experiments-20250415202512?project=7796273458
PipelineJob projects/7796273458/locations/northamerica-northeast1/pipelineJobs/b2b-wf-short-term-prediction-experiments-20250415202512 current state:
PipelineState.PIPELINE_STATE_RUNNING


RuntimeError: Job failed with:
code: 9
message: " The DAG failed because some tasks failed. The failed tasks are: [data-ingestion-op].; Job (project_id = wb-ai-acltr-tbs-3-pr-a62583, job_id = 2752902238044160000) is failed due to the above error.; Failed to handle the job: {project_number = 7796273458, job_id = 2752902238044160000}"
