In [22]:
# Extra flag interpolated into the `pip3 install` commands below; "--user"
# installs packages into the per-user site-packages directory.
USER_FLAG = "--user"

In [23]:
!pip3 install {USER_FLAG} google-cloud-aiplatform>=1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp>=1.8.9 google-cloud-pipeline-components>=0.2.0

In [24]:
import os

# Restart the kernel so the packages installed above are picked up by later
# imports. Skipped when the IS_TESTING environment variable is set.
if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    # do_shutdown(True) asks the kernel to shut down with restart=True.
    app.kernel.do_shutdown(True)

In [1]:
# Sanity check after the kernel restart: print the installed versions of the
# KFP SDK and of google_cloud_pipeline_components from a fresh python3 process.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.22
google_cloud_pipeline_components version: 0.2.0


In [2]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  oa-suarez-prueba


In [6]:
# GCS bucket URI derived from the project ID; PIPELINE_ROOT is built on top of it.
BUCKET_NAME = f"gs://{PROJECT_ID}-oa-suarez-merchan"

In [4]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [7]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin


'gs://oa-suarez-prueba-oa-suarez-merchan/pipeline_root/'

In [46]:
# Minimal example component: runs on the python:3.9 image and its definition is
# also written to first-component.yaml by the decorator.
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    """Return the input text unchanged.

    Args:
        text: Any string.

    Returns:
        The same string that was passed in.
    """
    return text

In [98]:
from kfp.v2.dsl import component, Output, Dataset


@component(packages_to_install=["pandas", "gcsfs"])
def load_data_from_gcs(
    bucket_name: str,
    file_path: str,
    output_data: Output[Dataset]
):
    """Load a CSV from Google Cloud Storage, add features, and save the result.

    Reads ``gs://{bucket_name}/{file_path}``, parses the ``date`` column, sorts
    the rows chronologically, and adds 50/180/365-row rolling means of the
    ``checkin`` and ``Locales_Abiertos`` columns. Rows with missing values
    (including the warm-up rows of the rolling windows) are dropped. A
    ``Locales_Abiertos_Lag180Dias`` column is then added and the frame is
    written as CSV to ``output_data.path``.

    Args:
        bucket_name: Name of the GCS bucket (without the ``gs://`` prefix).
        file_path: Path of the CSV object inside the bucket.
        output_data: Output dataset artifact; the processed CSV is written to
            its ``path``.
    """
    # gcsfs is listed in packages_to_install so pandas can open gs:// URLs;
    # it does not need to be imported or instantiated here.
    import pandas as pd

    gcs_file_path = f'gs://{bucket_name}/{file_path}'

    raw_df = pd.read_csv(gcs_file_path)
    # Parse dates BEFORE sorting so the rolling windows follow chronological
    # order rather than the lexicographic order of the raw strings.
    raw_df['date'] = pd.to_datetime(raw_df['date'])
    fl_df = raw_df.sort_values('date')

    # Rolling means over the previous N rows, in the original column order.
    for window in (180, 50, 365):
        fl_df[f'Checkin_MA_{window}'] = fl_df['checkin'].rolling(window=window).mean()
        fl_df[f'Locales_Abiertos_MA_{window}'] = (
            fl_df['Locales_Abiertos'].rolling(window=window).mean()
        )

    # .copy() makes this an independent frame, so adding a column below does
    # not trigger pandas' SettingWithCopyWarning.
    lagged_df = fl_df.dropna().copy()

    # NOTE(review): the column name says "180 days" but the value is shifted by a
    # single row (the previous row's Locales_Abiertos). Behavior is kept as-is;
    # confirm whether a 180-row shift was intended.
    lagged_df['Locales_Abiertos_Lag180Dias'] = lagged_df['Locales_Abiertos'].shift(1)
    lagged_df = lagged_df.dropna()

    # Save the DataFrame as a CSV file at the component's output path
    lagged_df.to_csv(output_data.path, index=False)

    print(f"Datos cargados y guardados en {output_data.path}")


In [123]:
from kfp.v2.dsl import component, Input

@component(packages_to_install=["pandas", "scikit-learn"])
def train_and_evaluate_model(
    dataset: Input[Dataset],
    model_output: Output[Model]
):
    """Train a random-forest regressor and report hold-out error metrics.

    Reads the preprocessed CSV at ``dataset.path``, fits a
    ``RandomForestRegressor`` (50 trees, ``random_state=42``) on a random 80/20
    train/test split, and serializes the fitted model to ``model_output.path``
    with joblib. RMSE and MAE on the test split are printed and stored in
    ``model_output.metadata``.

    Args:
        dataset: Input dataset artifact holding the preprocessed CSV.
        model_output: Output model artifact; receives the serialized model and
            the ``rmse`` / ``mae`` metadata entries.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from math import sqrt
    import joblib

    # Load the preprocessed dataset
    df = pd.read_csv(dataset.path)

    # Feature columns, in the order the model is fitted with. Any code that
    # calls predict() on this model must supply columns in this same order.
    feature_columns = [
        'Checkin_MA_50', 'Checkin_MA_180', 'Checkin_MA_365',
        'Locales_Abiertos_MA_50', 'Locales_Abiertos_MA_180', 'Locales_Abiertos_MA_365',
        'Locales_Abiertos', 'checkin',
    ]
    X = df[feature_columns]
    y = df['Locales_Abiertos_Lag180Dias']

    # NOTE(review): this is a random (shuffled) split; if the rows form a time
    # series, a chronological split may give a more honest error estimate.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train the model and persist it
    rf = RandomForestRegressor(n_estimators=50, random_state=42)
    rf.fit(X_train, y_train)
    joblib.dump(rf, model_output.path)

    # Evaluate on the held-out split (previously predictions were computed and
    # then thrown away, so the component never actually evaluated anything).
    y_pred = rf.predict(X_test)
    rmse = sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)

    model_output.metadata["rmse"] = float(rmse)
    model_output.metadata["mae"] = float(mae)
    print(f"RMSE: {rmse:.4f} | MAE: {mae:.4f}")




In [134]:
from kfp.v2.dsl import component, Input, Output, Model, Dataset

@component(packages_to_install=["pandas", "scikit-learn", "gcsfs", "joblib"])
def generate_predictions(
    dataset: Input[Dataset],
    model: Input[Model],
    predictions_output: Output[Dataset]
):
    """Score the most recent rows of the dataset and save dated predictions.

    Loads the preprocessed CSV and the joblib-serialized model, predicts for
    the rows at positions -250 .. -2 (the final row is not scored, as before),
    adds a hand-tuned linear correction ``r + x * c`` where ``x`` is the
    negative row offset, and writes a CSV with columns ``fecha`` (row date plus
    180 days, ``YYYY-MM-DD``) and ``prediccion`` to ``predictions_output.path``.

    Args:
        dataset: Input dataset artifact holding the preprocessed CSV.
        model: Input model artifact holding the joblib-serialized regressor.
        predictions_output: Output dataset artifact for the predictions CSV.
    """
    import pandas as pd
    import joblib

    # Load the preprocessed dataset
    df = pd.read_csv(dataset.path)

    # Load the trained model
    rf = joblib.load(model.path)

    # Must match, in order, the columns the model was fitted with in
    # train_and_evaluate_model. The previous code passed the last two values as
    # (checkin, Locales_Abiertos), i.e. swapped relative to training.
    feature_columns = [
        'Checkin_MA_50', 'Checkin_MA_180', 'Checkin_MA_365',
        'Locales_Abiertos_MA_50', 'Locales_Abiertos_MA_180', 'Locales_Abiertos_MA_365',
        'Locales_Abiertos', 'checkin',
    ]

    # Hand-tuned linear correction added to every model prediction.
    # NOTE(review): the origin of these two constants is not documented.
    r = 630
    c = -0.3

    # Negative row offsets to score; capped so datasets with fewer than 250
    # rows no longer raise IndexError.
    lookback_rows = min(250, len(df))
    offsets = list(range(-lookback_rows, -1))

    predicciones = {"fecha": [], "prediccion": []}
    if offsets:
        window = df.iloc[offsets]
        # One batched predict() on a named-column frame instead of one call per row.
        raw_predictions = rf.predict(window[feature_columns])
        predicciones["prediccion"] = [
            r + (x * c) + raw for x, raw in zip(offsets, raw_predictions)
        ]
        # Convert the date to datetime and add 180 days
        shifted_dates = pd.to_datetime(window["date"]) + pd.Timedelta(days=180)
        predicciones["fecha"] = shifted_dates.dt.strftime('%Y-%m-%d').tolist()

    # Convert the predictions to a DataFrame and save it as CSV
    predictions_df = pd.DataFrame(predicciones)
    predictions_df.to_csv(predictions_output.path, index=False)

    print(f"Predicciones guardadas en {predictions_output.path}")



In [135]:
from kfp.v2.dsl import component, Input, Output, Dataset, HTML

@component(packages_to_install=["pandas", "matplotlib"])
def plot_predictions(
    predictions: Input[Dataset],
    historical_data: Input[Dataset],
    plot_output: Output[HTML]
):
    """Plot historical values against predictions and save the chart as HTML.

    Reads both CSV artifacts, keeps the last 1000 rows of each after sorting by
    date, draws ``Locales_Abiertos`` and ``prediccion`` on one figure, and
    writes an ``<img>`` tag with the base64-encoded PNG to ``plot_output.path``.

    Args:
        predictions: Dataset artifact with ``fecha`` and ``prediccion`` columns.
        historical_data: Dataset artifact with ``date`` and
            ``Locales_Abiertos`` columns.
        plot_output: HTML artifact that receives the embedded image.
    """
    import base64
    from io import BytesIO

    import matplotlib.pyplot as plt
    import pandas as pd

    # Load both inputs and make sure the date columns are real datetimes.
    forecast = pd.read_csv(predictions.path)
    history = pd.read_csv(historical_data.path)
    forecast['fecha'] = pd.to_datetime(forecast['fecha'])
    history['date'] = pd.to_datetime(history['date'])

    # Chronological order, then the most recent 1000 rows of each series.
    forecast_tail = forecast.sort_values('fecha').tail(1000)
    history_tail = history.sort_values('date').tail(1000)

    # Draw both series on a single axes.
    fig, ax = plt.subplots(figsize=(15, 7))
    ax.plot(history_tail['date'], history_tail['Locales_Abiertos'],
            label='Locales Abiertos', color='blue')
    ax.plot(forecast_tail['fecha'], forecast_tail['prediccion'],
            label='Predicciones', color='red', linestyle='--')
    ax.set_title('Locales Abiertos vs Predicciones (Últimos 1000 Datos)')
    ax.set_xlabel('Fecha')
    ax.set_ylabel('Cantidad')
    ax.legend()
    ax.grid(True)

    # Render to an in-memory PNG, then release the figure.
    png_buffer = BytesIO()
    fig.savefig(png_buffer, format='png')
    plt.close(fig)

    # Embed the PNG as a base64 data URI inside a single <img> tag.
    data_uri = base64.b64encode(png_buffer.getvalue()).decode('utf-8')
    html = '<img src="data:image/png;base64,{0}">'.format(data_uri)
    with open(plot_output.path, 'w') as f:
        f.write(html)


In [136]:
from kfp.v2.dsl import pipeline, Dataset
from google_cloud_pipeline_components import aiplatform as gcc_aip

@pipeline(
    name="data-loading-pipeline",
    description="A pipeline that loads data from GCS."
)
def data_loading_pipeline(
    bucket_name: str = "oa_suarez_merchan",
    business_file_path: str = "Business_Benja.csv",
):
    """Wire the load -> train -> predict -> plot components together.

    Args:
        bucket_name: GCS bucket that holds the input CSV.
        business_file_path: Path of the input CSV inside the bucket.
    """
    # Step 1: load and preprocess the CSV; every later step consumes this dataset.
    load_step = load_data_from_gcs(
        bucket_name=bucket_name,
        file_path=business_file_path,
    )
    prepared_dataset = load_step.outputs['output_data']

    # Step 2: fit the model on the prepared dataset.
    train_step = train_and_evaluate_model(dataset=prepared_dataset)

    # Step 3: score the dataset with the trained model. The output key must
    # match the component's output parameter name ('model_output').
    predict_step = generate_predictions(
        dataset=prepared_dataset,
        model=train_step.outputs['model_output'],
    )

    # Step 4: chart predictions against the historical series.
    plot_predictions(
        predictions=predict_step.outputs['predictions_output'],
        historical_data=prepared_dataset,
    )


In [137]:
from kfp.v2 import compiler

# Compile the pipeline function into a job spec file; the PipelineJob created
# below loads this same file through its template_path argument.
compiler.Compiler().compile(
    pipeline_func=data_loading_pipeline,
    package_path="data_loading_pipeline.json"
)

In [138]:
from datetime import datetime
import re

# Unique job id: fixed prefix plus a second-resolution timestamp, with any run
# of characters outside [-a-z0-9] collapsed to a single hyphen.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S").lower()
job_id = re.sub(r"[^-a-z0-9]+", "-", f"time-series-pipeline-{TIMESTAMP}")

# Pipeline run definition built from the compiled spec; nothing is submitted yet.
job = aiplatform.PipelineJob(
    display_name="time-series-pipeline",
    template_path="data_loading_pipeline.json",
    job_id=job_id,
    enable_caching=True,
)

In [139]:
# Submit the pipeline run; the call logs the job's resource name and a console link.
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/414169182204/locations/us-central1/pipelineJobs/time-series-pipeline-20231109235833
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/414169182204/locations/us-central1/pipelineJobs/time-series-pipeline-20231109235833')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/time-series-pipeline-20231109235833?project=414169182204
