In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.13 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.4.0 which is incompatible.
google-cloud-pipeline-components 1.0.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.4.0 which is incompatible.[0m[31m
[0m

In [2]:
# Installs pip and pickle-mixin into the user site-packages.
# NOTE(review): the pipeline components import the standard-library `pickle`
# module; confirm whether the pickle-mixin package is actually required.
! pip3 install --user pip pickle-mixin



In [3]:
import os

# Restart the kernel after the installs above, unless IS_TESTING is set to a
# non-empty value in the environment.
running_under_test = os.getenv("IS_TESTING")
if not running_under_test:
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

In [4]:
# Print the installed KFP SDK and google_cloud_pipeline_components versions,
# each from a fresh `python3` process.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.13
google_cloud_pipeline_components version: 1.0.14


In [1]:
# Google Cloud project ID; used below for the bucket-name fallback, the
# `gcloud projects describe` lookup and `aip.init`.
PROJECT_ID = "vertex-experiments-356417"  # @param {type:"string"}

In [2]:
# Vertex AI location. This must be a region name (e.g. "europe-west6"), not a
# zone name with a trailing "-a"/"-b"/"-c" suffix.
REGION = "europe-west6"  # @param {type: "string"}

In [3]:
from datetime import datetime

# Second-resolution local timestamp (YYYYmmddHHMMSS), used below to build
# the fallback bucket name and the pipeline job display name.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [4]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Vertex AI Workbench, then don't execute this code
IS_COLAB = "google.colab" in sys.modules
if not os.path.exists("/opt/deeplearning/metadata/env_version") and not os.getenv(
    "DL_ANACONDA_HOME"
):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

In [5]:
# Cloud Storage bucket used for staging and as the pipeline root.
BUCKET_NAME = "pipeline-bucket-test"  # @param {type:"string"}
BUCKET_URI = "gs://" + BUCKET_NAME

In [6]:
# Fall back to a generated bucket name (project ID + "aip-" + timestamp) when
# no bucket was configured or the placeholder value was left in place.
if not BUCKET_NAME or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = f"{PROJECT_ID}aip-{TIMESTAMP}"
    BUCKET_URI = f"gs://{BUCKET_NAME}"

In [7]:
# Service account that is granted access to the bucket below. When left as
# the placeholder (or empty), the next cell looks one up via gcloud.
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [8]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 971814406677-compute@developer.gserviceaccount.com


In [9]:
# Add the storage.objectCreator role for the service account on the bucket.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

# Add the storage.objectViewer role for the service account on the bucket.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://pipeline-bucket-test/
No changes made to gs://pipeline-bucket-test/


In [10]:
import google.cloud.aiplatform as aip

In [11]:
# Cloud Storage location passed as `pipeline_root` to the pipeline and the job.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/pipeline_test"

In [12]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component)

In [13]:
# Initialise the Vertex AI SDK with the project and staging bucket.
# NOTE(review): no `location` is passed here and the recorded job output
# shows `us-central1`; confirm whether REGION was meant to be supplied.
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [14]:
# PREPROCESS COMPONENT

In [34]:
@component(
    packages_to_install=["google-cloud-bigquery","pandas","scikit-learn","db-dtypes","pickle-mixin"],
    base_image="python:3.9"
)
def preprocess(
    message: str,
    output_dataset_train: Output[Dataset],
    output_dataset_test: Output[Dataset],
    output_dataset: Output[Dataset],
    scaler_out: Output[Model]
):
    """Load the time series from BigQuery, split it and scale the training part.

    Reads the table ``ingest_bucket.ts``, orders it by the ``index`` column,
    keeps the last three rows as the (unscaled) test set and min-max scales
    the remaining rows as the training set.

    Args:
        message: Prefix written to the ``Message`` metadata of the train and
            test dataset artifacts.
        output_dataset_train: Receives the scaled training rows as
            ``<path>.csv``.
        output_dataset_test: Receives the unscaled last three rows as
            ``<path>.csv``.
        output_dataset: Receives the full, unscaled, sorted table as
            ``<path>.csv``.
        scaler_out: Receives the fitted ``MinMaxScaler`` pickled to
            ``<path>.pkl``.

    Raises:
        ValueError: If the table has too few rows to leave any training data.

    SECURITY: this function previously embedded a service-account private key
    in source. That key must be considered compromised and revoked. The
    BigQuery client now uses Application Default Credentials, i.e. the
    identity the pipeline step runs as, which therefore needs read access to
    the source table.
    """
    import pickle

    import db_dtypes  # noqa: F401  (kept from the original; presumably needed by to_dataframe)
    import pandas as pd
    from google.cloud import bigquery
    from sklearn.preprocessing import MinMaxScaler

    # Components must be self-contained, so these cannot come from notebook globals.
    bq_project = "vertex-experiments-356417"
    source_table = "ingest_bucket.ts"
    holdout_rows = 3  # number of trailing rows reserved as the test set

    output_dataset_train.metadata["Message"] = message + " train dataset"
    output_dataset_test.metadata["Message"] = message + " test dataset"

    # Application Default Credentials instead of an inline key file.
    client = bigquery.Client(project=bq_project)
    try:
        df = client.list_rows(source_table).to_dataframe(create_bqstorage_client=True)
    finally:
        # Release the client even when the read fails.
        client.close()

    df = df.sort_values(by="index").set_index("index")

    if len(df) <= holdout_rows:
        raise ValueError(
            f"Need more than {holdout_rows} rows in {source_table}, got {len(df)}"
        )

    df_train_raw = df.iloc[:-holdout_rows]
    df_test = df.iloc[-holdout_rows:]

    # Fit the scaler on the training rows only; the test rows stay unscaled.
    scaler = MinMaxScaler()
    df_train = pd.DataFrame(
        data=scaler.fit_transform(df_train_raw.values),
        columns=df_train_raw.columns,
        index=df_train_raw.index,
    )

    print("PERCORSO PREPROCESSING "+ output_dataset.path+".csv")
    df.to_csv(output_dataset.path+".csv")
    df_test.to_csv(output_dataset_test.path+".csv")
    df_train.to_csv(output_dataset_train.path+".csv")

    with open(scaler_out.path+'.pkl','wb') as f:
        pickle.dump(scaler,f)


In [35]:
# MODEL TRAIN - EVALUATE COMPONENT

In [55]:
@component(
    packages_to_install=["pandas","scikit-learn","db-dtypes","pickle-mixin"],
    base_image="python:3.9",
)
def model_train_evaluation(
    message: str,
    df_train_in: Input[Dataset],
    df_test_in: Input[Dataset],
    model: Output[Model],
    scaler_in: Input[Model]
):
    """Train an SVR on sliding windows of the scaled series and evaluate it.

    Each training sample is three consecutive ``TS`` values and its target is
    the value that follows. The fitted model then forecasts three steps ahead
    recursively, the forecast is mapped back with the pickled scaler and
    compared with the ``TS`` values of the test set.

    Args:
        message: Not used by this component.
        df_train_in: Dataset artifact; ``<path>.csv`` is read and must have
            ``index`` and ``TS`` columns.
        df_test_in: Dataset artifact; ``<path>.csv`` is read and must have
            ``index`` and ``TS`` columns.
        model: Receives the fitted SVR pickled to ``<path>.pkl`` and the
            ``mae`` / ``mse`` / ``mape`` metadata entries.
        scaler_in: Model artifact; the scaler is unpickled from ``<path>.pkl``.

    Raises:
        ValueError: If the training series is too short to build one window.
    """
    import pickle

    import pandas as pd
    from sklearn.metrics import (
        mean_absolute_error,
        mean_absolute_percentage_error,
        mean_squared_error,
    )
    from sklearn.svm import SVR

    window = 3   # number of lagged values per training sample
    horizon = 3  # number of steps forecast for the evaluation

    df_train = pd.read_csv(df_train_in.path+".csv").set_index("index")
    df_test = pd.read_csv(df_test_in.path+".csv").set_index("index")

    values = list(df_train["TS"])
    if len(values) <= window:
        raise ValueError(
            f"Need more than {window} training values, got {len(values)}"
        )

    # Sliding windows: X[i] = values[i:i+window], y[i] = values[i+window].
    X = [values[i:i + window] for i in range(len(values) - window)]
    y = values[window:]

    model_svr = SVR()
    model_svr.fit(X, y)

    # Recursive forecast: every prediction is fed back as the newest lag.
    history = values[-window:]
    for _ in range(horizon):
        history.append(model_svr.predict([history[-window:]])[0])
    forecast = history[-horizon:]

    # NOTE: pickle.load executes arbitrary code; only load artifacts produced
    # by trusted pipeline steps.
    with open(scaler_in.path+'.pkl', 'rb') as f:
        scaler = pickle.load(f)
    forecast_orig = scaler.inverse_transform([forecast])

    actual = [list(df_test["TS"])]

    # Store plain Python floats in the artifact metadata.
    model.metadata["mae"] = float(mean_absolute_error(actual, forecast_orig))
    model.metadata["mse"] = float(mean_squared_error(actual, forecast_orig))
    model.metadata["mape"] = float(mean_absolute_percentage_error(actual, forecast_orig))

    with open(model.path+'.pkl','wb') as f:
        pickle.dump(model_svr,f)

In [56]:
# MAKE PREDICTION COMPONENT

In [79]:
@component(
    packages_to_install=["google-cloud-bigquery","pandas","db-dtypes","pickle-mixin","scikit-learn"],
    base_image="python:3.9",
)
def make_prediction(
    message: str,
    df_in: Input[Dataset],
    df_test_in: Input[Dataset],
    scaler_in: Input[Model],
    model_svr_in: Input[Model]
):
    """Forecast the next three values and load history + forecast to BigQuery.

    The three test rows are scaled with the scaler that was fitted during
    preprocessing, used as the initial lags for a recursive three-step
    forecast, and the forecast is mapped back to the original scale. The
    forecast rows are appended to the full dataset and the result is loaded
    into the table ``ingest_bucket.ts_with_forecast``.

    Args:
        message: Not used by this component.
        df_in: Dataset artifact; ``<path>.csv`` holds the full series.
        df_test_in: Dataset artifact; ``<path>.csv`` holds the last rows of
            the series.
        scaler_in: Model artifact; the scaler is unpickled from ``<path>.pkl``.
        model_svr_in: Model artifact; the model is unpickled from
            ``<path>.pkl``.

    SECURITY: this function previously embedded a service-account private key
    in source. That key must be considered compromised and revoked. The
    BigQuery client now uses Application Default Credentials, i.e. the
    identity the pipeline step runs as, which therefore needs write access to
    the destination table.
    """
    import pickle

    import db_dtypes  # noqa: F401  (kept from the original)
    import pandas as pd
    import sklearn  # noqa: F401  (the unpickled scaler/model are scikit-learn objects)
    from google.cloud import bigquery

    # Components must be self-contained, so these cannot come from notebook globals.
    bq_project = "vertex-experiments-356417"
    destination_table = "ingest_bucket.ts_with_forecast"
    window = 3   # number of lagged values the model expects
    horizon = 3  # number of steps to forecast

    df = pd.read_csv(df_in.path+".csv").set_index("index")
    df_test = pd.read_csv(df_test_in.path+".csv").set_index("index")

    # NOTE: pickle.load executes arbitrary code; only load artifacts produced
    # by trusted pipeline steps.
    with open(scaler_in.path+'.pkl', 'rb') as f:
        scaler = pickle.load(f)

    with open(model_svr_in.path+'.pkl', 'rb') as f:
        model = pickle.load(f)

    # Use the already-fitted scaler. Re-fitting it on the three test rows (as
    # `fit_transform` did) would discard the training-time scale, so both the
    # model inputs and the inverse transform below would be on the wrong scale.
    scaled = scaler.transform(df_test.values)
    df_test_scaled = pd.DataFrame(data=scaled, columns=df_test.columns, index=df_test.index)

    # Recursive forecast: every prediction is fed back as the newest lag.
    history = list(df_test_scaled["TS"].iloc[-window:])
    for _ in range(horizon):
        history.append(model.predict([history[-window:]])[0])
    forecast = history[-horizon:]

    forecast_next = scaler.inverse_transform([forecast])
    # Forecast rows continue the index right after the test rows.
    df_forecast = pd.DataFrame(
        data=forecast_next[0], columns=["TS"], index=df_test.index + horizon
    )

    df_res = pd.concat([df, df_forecast], axis=0)

    # Application Default Credentials instead of an inline key file.
    client = bigquery.Client(project=bq_project)
    try:
        # The original built a LoadJobConfig with an explicit schema but then
        # passed job_config=None; the unused object was removed and the load
        # keeps using the default job configuration.
        # NOTE(review): confirm whether repeated runs should append to or
        # replace the destination table.
        job = client.load_table_from_dataframe(df_res, destination_table)
        job.result()
    finally:
        # Release the client even when the load fails.
        client.close()
    

In [80]:
# BUILD PIPELINE

In [81]:
@dsl.pipeline(
    name="test-pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(message: str):
    """Wire preprocess -> model_train_evaluation -> make_prediction.

    The ``message`` pipeline parameter is accepted but not forwarded; each
    step receives a fixed literal message instead.
    """
    preprocess_task = preprocess(message="INIZIO PREPROCESSING")
    prep_outputs = preprocess_task.outputs

    # Training consumes the scaled train split, the test split and the scaler.
    model_task = model_train_evaluation(
        message="INIZIO TRAINING",
        df_train_in=prep_outputs["output_dataset_train"],
        df_test_in=prep_outputs["output_dataset_test"],
        scaler_in=prep_outputs["scaler_out"],
    )

    # Forecasting consumes the full dataset, the test split, the scaler and
    # the trained model.
    prediction_task = make_prediction(
        message="FORECASTING",
        df_in=prep_outputs["output_dataset"],
        df_test_in=prep_outputs["output_dataset_test"],
        scaler_in=prep_outputs["scaler_out"],
        model_svr_in=model_task.outputs["model"],
    )
    

In [82]:
# COMPILE PIPELINE

In [83]:
from kfp.v2 import compiler  # noqa: F811

# Compile the pipeline function and write the result to pipeline_test.json.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path="pipeline_test.json")

In [84]:
# RUN THE PIPELINE

In [85]:
DISPLAY_NAME = "test_" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="pipeline_test.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"message": "INIZIO"},
)

job.run()

! rm pipeline_test.json

Creating PipelineJob
PipelineJob created. Resource name: projects/971814406677/locations/us-central1/pipelineJobs/test-pipeline-20220718101151
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/971814406677/locations/us-central1/pipelineJobs/test-pipeline-20220718101151')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/test-pipeline-20220718101151?project=971814406677
PipelineJob projects/971814406677/locations/us-central1/pipelineJobs/test-pipeline-20220718101151 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/971814406677/locations/us-central1/pipelineJobs/test-pipeline-20220718101151 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/971814406677/locations/us-central1/pipelineJobs/test-pipeline-20220718101151 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/971814406677/locations/us-central1/pipelineJobs/test-pipeline-