In [4]:
# IMPORT THE REQUIRED LIBRARIES

from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Output,
                        Model,
                        Metrics,
                        Markdown,
                        HTML,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import aiplatform as vertex_
from google.cloud.aiplatform import pipeline_jobs

from datetime import datetime
import pandas as pd

In [5]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
REGION = 'europe-west1'

In [6]:
BUCKET_NAME="gs://"+"osn-smartcapex-data-uploaded-sbx/staging"

PIPELINE_ROOT = f"{BUCKET_NAME}/smartcapex_pipeline/"

In [7]:
# Custom base image created using docker

IMAGE_NAME = "smartcapex-pipeline"
BASE_IMAGE = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/osn-smartcapex-404-sbx/{IMAGE_NAME}"

In [8]:
vertex_.init(project=PROJECT_ID, location=REGION)

Read the Dataset

In [9]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="components/get_data.yaml"
)

def get_data(
    project_id: str, 
    dataset_src: str,
    table_id: str,
    dataset_raw: Output[Dataset],
):
    """
    Get data from BigQuery
    """
    
    from google.cloud import bigquery 
    import pandas as pd
    
    client = bigquery.Client(project=project_id)
    query = f"select * from `{project_id}.{dataset_src}.{table_id}`"
    data = client.query(query = query).to_dataframe()
    data.to_csv(dataset_raw.path, index=False)
    print(f"Data successfully read from BigQuery table : {project_id}.{dataset_src}.{table_id}")

In [10]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="components/preprocessing.yaml"
)

def preprocess_data(
    raw_sites_df: Input[Dataset],
    raw_oss_2g_df: Input[Dataset],
    raw_oss_3g_df: Input[Dataset],
    raw_oss_4g_df: Input[Dataset],
    raw_oss_rtx_df: Input[Dataset],
    dataset_sites_preprocessed: Output[Dataset],
    dataset_oss_preprocessed: Output[Dataset],
):
    
    import pandas as pd
    from src.d02_preprocessing.process_sites import sites_preprocessing
    from src.d02_preprocessing.process_oss import preprocessing_oss_counter_weekly
   
    raw_sites_df = pd.read_csv(raw_sites_df.path)
    raw_oss_2g_df = pd.read_csv(raw_oss_2g_df.path)
    raw_oss_3g_df = pd.read_csv(raw_oss_3g_df.path)
    raw_oss_4g_df = pd.read_csv(raw_oss_4g_df.path)
    raw_oss_rtx_df = pd.read_csv(raw_oss_rtx_df.path)
    
    
    sites_preprocessed = sites_preprocessing(raw_sites_df)
    oss_preprocessed = preprocessing_oss_counter_weekly(sites_preprocessed,raw_oss_2g_df,raw_oss_3g_df,raw_oss_4g_df,raw_oss_rtx_df)
    sites_preprocessed.to_csv(dataset_sites_preprocessed.path, index=False)
    oss_preprocessed.to_csv(dataset_oss_preprocessed.path, index=False)
    

In [11]:
@component(
    base_image=BASE_IMAGE,
)
def compute_sites_distances(
    dataset_sites_preprocessed: Input[Dataset],
    df_distance: Output[Dataset],

):
    
    import pandas as pd
    from src.d03_capacity.technical_modules.cluster_selection import compute_distance_between_sites
   
    df_sites = pd.read_csv(dataset_sites_preprocessed.path)
    distance = compute_distance_between_sites(df_sites)
    
    distance.to_csv(df_distance.path, index=False)


In [12]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="components/save_to_bigquery.yaml"
)

def save_to_bigquery(
    dataset_preprocessed: Input[Dataset],
    project_id: str,
    dataset_id: str,
    table_output: str
):
    import pandas as pd
    import pandas_gbq
    
    dataset_preprocessed = pd.read_csv(dataset_preprocessed.path)
    
    pandas_gbq.to_gbq(dataset_preprocessed, f'{project_id}.{dataset_id}.{table_output}', project_id=project_id, if_exists='replace')

In [13]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="components/traffic_forcasting.yaml"
)

def traffic_forcasting(
    dataset_preprocessed: Input[Dataset],
    dataset_site_preprocessed: Input[Dataset],
    df_traffic_predicted: Output[Dataset]
    

):
    import pandas as pd
    import pandas_gbq
    from src.d03_capacity.technical_modules.traffic_forecasting import prophet
    
    traffic_preprocessed = pd.read_csv(dataset_preprocessed.path)
    site_preprocessed = pd.read_csv(dataset_site_preprocessed.path)
    
    traffic_predicted = prophet(traffic_preprocessed)
    
    traffic_predicted.drop('site_id', axis=1, inplace=True)
    traffic_predicted = traffic_predicted.merge(site_preprocessed[['site_id', 'cell_name']].drop_duplicates(),
                                                      how='left',
                                                      on='cell_name')
    traffic_predicted.dropna(subset=['site_id'], inplace=True)
    
    traffic_predicted.to_csv(df_traffic_predicted.path, index=False)
    

In [14]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="components/upgrade_selection.yaml"
)

def process_bands_to_upgrade(
    dataset_distance: Input[Dataset],
    df_traffic_predicted: Input[Dataset],
    dataset_site_preprocessed: Input[Dataset],
    dataset_typology_sector: Input[Dataset],
    dataset_selected_band_per_site : Output[Dataset],
    dataset_affected_cells: Output[Dataset],
    dataset_cluster_future_upgrades: Output[Dataset]
    
    
    
):
    import pandas as pd
    import pandas_gbq
    from src.d03_capacity.technical_modules.upgrade_selection import upgrade_selection_pipeline
    from src.d03_capacity.technical_modules.cluster_selection import get_cluster_of_future_upgrades
    from src.d00_conf.conf import conf, conf_loader
    
    conf_loader("OSN")

    
    df_predicted_traffic_kpis = pd.read_csv(df_traffic_predicted.path)
    df_sites = pd.read_csv(dataset_site_preprocessed.path)
    df_typology_sector = pd.read_csv(dataset_typology_sector.path)
    df_distance = pd.read_csv(dataset_distance.path)
    print("Process bands to upgrade")
    
    selected_band_per_site = upgrade_selection_pipeline(df_predicted_traffic_kpis.copy(),df_typology_sector,df_sites)
    
    df_affected_cells = df_predicted_traffic_kpis.merge(selected_band_per_site,
                                                              on='site_id',
                                                              how='inner')
    
    df_cluster_future_upgrades = get_cluster_of_future_upgrades(selected_band_per_site,
                                                                df_distance,
                                                                max_neighbors=conf['TRAFFIC_IMPROVEMENT'][
                                                                    'MAX_NUMBER_OF_NEIGHBORS'])
    
    selected_band_per_site.to_csv(dataset_selected_band_per_site.path, index=False)
    df_affected_cells.to_csv(dataset_affected_cells.path, index=False)
    df_cluster_future_upgrades.to_csv(dataset_cluster_future_upgrades.path, index=False)

In [15]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="components/train_technical_pipeline.yaml"
)
def train_technical_pipeline (
    dataset_oss_preprocessed: Input[Dataset],
    dataset_site_preprocessed: Input[Dataset],
    dataset_distance: Input[Dataset],
    dataset_cell_affected: Output[Dataset],
    dataset_list_of_upgrades: Output[Dataset],
    data_traffic_features: Output[Dataset],
    voice_traffic_features: Output[Dataset],
):
    import pandas as pd
    from src.d03_capacity.technical_modules.activation_model import get_affected_cells_with_interactions_between_upgrades
    from src.d03_capacity.technical_modules.cluster_selection import get_cluster_of_affected_sites
    from src.d03_capacity.technical_modules.traffic_improvement import get_all_traffic_improvement_features,compute_traffic_after
    from src.d00_conf.conf import conf, conf_loader

    conf_loader("OSN")

    df_traffic_weekly_kpis = pd.read_csv(dataset_oss_preprocessed.path)
    df_sites = pd.read_csv(dataset_site_preprocessed.path)
    df_distance = pd.read_csv(dataset_distance.path)

    print("train_technical_pipeline")
    if conf['COUNTRY']=='OSN':
        df_traffic_weekly_kpis = df_traffic_weekly_kpis.replace({'cell_band': {'F2_U2100': 'U2100', 'F3_U2100': 'U2100', 'F1_U900': 'U900',
                                                                           'F1_U2100':'U2100','F4_U2100':'U2100','F2_U900':'U900'}})


    print("enter get_affected_cells_with_interactions_between_upgrades")
    df_cell_affected = get_affected_cells_with_interactions_between_upgrades(df_traffic_weekly_kpis)

    list_of_upgrades, sites_to_remove = get_cluster_of_affected_sites(df_cell_affected,
                                                                      df_distance)

    df_cell_affected.to_csv(dataset_cell_affected.path, index=False)
    list_of_upgrades.to_csv(dataset_list_of_upgrades.path, index=False)
    
    
    print("Process features to train traffic improvement and traffic trend models")
    df_data_traffic_features = get_all_traffic_improvement_features(df_traffic_weekly_kpis,
                                                                       df_cell_affected,
                                                                       list_of_upgrades,
                                                                       sites_to_remove,
                                                                       df_sites,
                                                                       type_of_traffic='data',
                                                                       group_bands=False,
                                                                       remove_sites_with_more_than_one_upgrade_same_cluster=True,
                                                                       kpi_to_compute_upgrade_effect=[
                                                                           "total_data_traffic_dl_gb"],
                                                                       capacity_kpis_features=[
                                                                           "cell_occupation_dl_percentage"],
                                                                       upgraded_to_not_consider=[])
    
    df_data_traffic_features = compute_traffic_after(df_data_traffic_features,df_traffic_weekly_kpis,'total_data_traffic_dl_gb')
    
    df_voix_traffic_features = get_all_traffic_improvement_features(df_traffic_weekly_kpis,
                                                                       df_cell_affected,
                                                                       list_of_upgrades,
                                                                       sites_to_remove,
                                                                       type_of_traffic='voice',
                                                                       group_bands=False,
                                                                       remove_sites_with_more_than_one_upgrade_same_cluster=True,
                                                                       kpi_to_compute_upgrade_effect=[
                                                                           "total_voice_traffic_kerlands"],
                                                                       capacity_kpis_features=[
                                                                           "cell_occupation_dl_percentage"],
                                                                       upgraded_to_not_consider=[])
    
    df_voix_traffic_features = compute_traffic_after(df_voix_traffic_features,df_traffic_weekly_kpis,'total_voice_traffic_kerlands')
    
    
    df_data_traffic_features.to_csv(data_traffic_features.path, index=False)
    df_voice_traffic_features.to_csv(voice_traffic_features.path, index=False)
    
    
    print("Train traffic improvement and traffic trend models")
    
#     train_traffic_improvement_model(df_data_traffic_features,
#                                                            type_of_traffic='data',
#                                                            remove_samples_with_target_variable_lower=True,
#                                                            output_route=conf['PATH']['MODELS'],
#                                                            results_route=conf['PATH']['MODELS_OUTPUT'],
#                                                            bands_to_consider=['G900', 'G1800','L2600', 'L1800', 'L800', 'U2100',
#                                                                               'U900'],
#                                                            save_model=True)
    
#     train_traffic_improvement_model(df_voice_traffic_features,
#                                                            type_of_traffic='voice',
#                                                            remove_samples_with_target_variable_lower=True,
#                                                            output_route=conf['PATH']['MODELS'],
#                                                            results_route=conf['PATH']['MODELS_OUTPUT'],
#                                                            bands_to_consider=['G900', 'G1800','L2600','L1800', 'L800', 'U2100',
#                                                                               'U900'],
#                                                            save_model=True)
    
    
    
    
    
    
    
    
    
    

    

In [16]:
# USE TIMESTAMP TO DEFINE UNIQUE PIPELINE NAMES
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-smartcapex-job{}'.format(TIMESTAMP)

In [17]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="smartcapex-pipeline"   
)

def pipeline(
     
    dataset_src: str,
    table_site_raw: str,   
    table_oss_2g_raw: str, 
    table_oss_3g_raw: str, 
    table_oss_4g_raw: str, 
    table_oss_rtx_raw: str,
    table_typology_sector_raw: str,
    dataset_preprocessing: str,
    df_sites: str,
    oss_counter_weekly: str,
    dataset_intermediate : str,
    df_traffic_predicted :str,
    df_distance :str,
    df_selected_band_per_site :str,
    df_affected_cells :str,
    df_future_upgrades :str,
    project: str = PROJECT_ID,
):

    get_data_sites_op = get_data(project,dataset_src,table_site_raw)
    get_data_oss_2g_op = get_data(project,dataset_src,table_oss_2g_raw)
    get_data_oss_3g_op = get_data(project,dataset_src,table_oss_3g_raw)
    get_data_oss_4g_op = get_data(project,dataset_src,table_oss_4g_raw)
    get_data_oss_rtx_op = get_data(project,dataset_src,table_oss_rtx_raw)
    
    data_preprocess_op = preprocess_data(get_data_sites_op.outputs["dataset_raw"],get_data_oss_2g_op.outputs["dataset_raw"],get_data_oss_3g_op.outputs["dataset_raw"],\
                                         get_data_oss_4g_op.outputs["dataset_raw"],get_data_oss_rtx_op.outputs["dataset_raw"])
    
    save_to_bigquery_sites_op = save_to_bigquery(data_preprocess_op.outputs["dataset_sites_preprocessed"],project,dataset_preprocessing,df_sites)
    save_to_bigquery_oss_op = save_to_bigquery(data_preprocess_op.outputs["dataset_oss_preprocessed"],project,dataset_preprocessing,oss_counter_weekly)
    
    traffic_forcasting_op = (traffic_forcasting(data_preprocess_op.outputs["dataset_oss_preprocessed"],data_preprocess_op.outputs["dataset_sites_preprocessed"])).set_cpu_limit('16').set_memory_limit('60G')
    save_to_bigquery_traffic_predicted_op = save_to_bigquery(traffic_forcasting_op.outputs["df_traffic_predicted"],project,dataset_intermediate,df_traffic_predicted)
    
    compute_sites_distances_op = compute_sites_distances(data_preprocess_op.outputs["dataset_sites_preprocessed"])
    save_to_bigquery_sites_distances_op = save_to_bigquery(compute_sites_distances_op.outputs["df_distance"],project,dataset_intermediate,df_distance)

    get_data_typology_op = get_data(project,dataset_src,table_typology_sector_raw)
    upgrade_selection_op = process_bands_to_upgrade(compute_sites_distances_op.outputs["df_distance"],traffic_forcasting_op.outputs["df_traffic_predicted"],data_preprocess_op.outputs["dataset_sites_preprocessed"],get_data_typology_op.outputs["dataset_raw"])
    
    save_to_bigquery_selected_band_per_site_op = save_to_bigquery(upgrade_selection_op.outputs["dataset_selected_band_per_site"],project,dataset_intermediate,df_selected_band_per_site)
    save_to_bigquery_affected_cells_op = save_to_bigquery(upgrade_selection_op.outputs["dataset_affected_cells"],project,dataset_intermediate,df_affected_cells)
    save_to_bigquery_future_upgrades_op = save_to_bigquery(upgrade_selection_op.outputs["dataset_cluster_future_upgrades"],project,dataset_intermediate,df_future_upgrades)
    
    train_technical_pipeline_op = train_technical_pipeline(data_preprocess_op.outputs["dataset_oss_preprocessed"],data_preprocess_op.outputs["dataset_sites_preprocessed"],compute_sites_distances_op.outputs["df_distance"])

    

In [18]:
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='components/smartcapex-pipeline.json')



In [19]:
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="smartcapex-pipeline",
    template_path="components/smartcapex-pipeline.json",
    parameter_values={
        "project": "osn-smartcapex-404-sbx",
        "dataset_src": "Dataset_SmartCapex",
        "table_site_raw": "Thies_referentielSite-OCI",   
        "table_oss_2g_raw": "osscounter2G-OCI", 
        "table_oss_3g_raw": "osscounter3G-OCI", 
        "table_oss_4g_raw": "osscounter4G-OCI", 
        "table_oss_rtx_raw": "trx2G-OCI",
        "table_typology_sector_raw": "typology_sector",
        "dataset_preprocessing": "preprocessing",
        "df_sites": "df_sites",
        "oss_counter_weekly": "oss_counter_weekly",
        "dataset_intermediate": "intermediate",
        "df_traffic_predicted":"df_predicted_traffic_kpis",
        "df_distance": "df_distance",
        "df_selected_band_per_site" :"df_selected_band_per_site_upgrade_selection" ,
        "df_affected_cells" :"df_affected_cells_upgrade_selection",
        "df_future_upgrades" :"df_future_upgrades_upgrade_selection",
        
    }
)

In [20]:
start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/989544951348/locations/europe-west1/pipelineJobs/smartcapex-pipeline-20230712151832
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/989544951348/locations/europe-west1/pipelineJobs/smartcapex-pipeline-20230712151832')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/smartcapex-pipeline-20230712151832?project=989544951348
PipelineJob projects/989544951348/locations/europe-west1/pipelineJobs/smartcapex-pipeline-20230712151832 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/989544951348/locations/europe-west1/pipelineJobs/smartcapex-pipeline-20230712151832 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/989544951348/locations/europe-west1/pipelineJobs/smartcapex-pipeline-20230712151832 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/989544951348/locatio

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [train-technical-pipeline].; Job (project_id = osn-smartcapex-404-sbx, job_id = 7162106593146830848) is failed due to the above error.; Failed to handle the job: {project_number = 989544951348, job_id = 7162106593146830848}"
