In [13]:
# GCP project ID passed as `project` when the Vertex AI PipelineJob is created.
GOOGLE_CLOUD_PROJECT="nyc-transit-426211"
# Not referenced by any cell visible in this notebook.
# NOTE(review): hardcoded personal email address; consider reading it from the
# environment or removing it before sharing the notebook.
USER="andrub818@gmail.com"

In [16]:
from datetime import datetime, timedelta
from kfp import dsl, compiler
from kfp.dsl import component
import google.cloud.aiplatform as aip

@component(
    base_image='python:3.13',
    # 'db-dtypes' is needed by BigQuery's `to_dataframe()`; without it the call
    # raises before any training happens.
    packages_to_install=['pandas', 'prophet', 'google-cloud-bigquery', 'db-dtypes', 'google-cloud-storage', 'gcsfs', 'joblib==1.3.2']
)
def preprocess_and_train(base_path: str, dt: str) -> str:
    """Fit a Prophet model on daily crash counts and pickle it to GCS.

    Queries the `crashes` table for one row per `crash_date` (as `ds`) with the
    number of records on that date (as `y`), fits a default `Prophet` model on
    the result, and writes the pickled model to
    `{base_path}/trained_models/model_{dt}.pickle`.

    Args:
        base_path: Used both as the BigQuery dataset name and as the leading
            segment of the output path for the pickled model.
        dt: Inclusive upper bound applied to `crash_date` in the query; also
            embedded in the model file name.

    Returns:
        `base_path`, unchanged, so a downstream step can consume it.
    """
    # This body runs in its own container, so every module it uses has to be
    # imported here rather than at notebook level.
    import pickle

    import gcsfs
    from google.cloud import bigquery
    from prophet import Prophet

    project_name = "nyc-transit-426211"
    dataset = base_path
    table = "crashes"
    # NOTE(review): gcsfs treats the first path segment as the bucket name, so
    # this presumably targets a bucket named after `base_path` -- confirm.
    model_path = f"{base_path}/trained_models/model_{dt}.pickle"

    client = bigquery.Client(project=project_name)
    query = f"""
        select
            crash_date as ds,
            count(*) as y
        from `{project_name}.{dataset}.{table}`
        where crash_date <= '{dt}'
        group by 1
        order by 1 asc
    """
    df = client.query(query).to_dataframe()

    # Fit the model
    model = Prophet()
    model.fit(df)
    print("Training complete!")

    # Save the model to GCS
    fs = gcsfs.GCSFileSystem()
    with fs.open(model_path, 'wb') as f:
        pickle.dump(model, f)

    return base_path

@component(
    base_image='python:3.13',
    packages_to_install=['pandas', 'scikit-learn', 'google-cloud-storage', 'gcsfs', 'joblib==1.3.2']
)
def write_predictions(base_path: str) -> None:
    """Placeholder step; it currently performs no work.

    Args:
        base_path: Value handed over by the training step. Not used yet.
    """
    # TODO: implement this step. An earlier draft loaded
    # `{base_path}/trained_models/model.joblib` together with
    # `preprocessed_data/test_features.csv` and `test_labels.csv`, ran
    # `model.predict`, and built a scikit-learn `classification_report`.
    # That draft does not match the artifact the training step writes
    # (`trained_models/model_{dt}.pickle`), so it needs rewriting rather than
    # re-enabling.
    return None

In [17]:
@dsl.pipeline(
    name='motor-vehicle-crashes-pipeline',
    description='An pipeline that trains and evaluates NYC motor vehicle crashes.'
)
def mvc_pipeline():
    """Chain the training component into the prediction component.

    The cutoff date is computed when this function body executes and is handed
    to the training component as a `YYYY-MM-DD` string.
    """
    # Cutoff is 61 days before today.
    cutoff = datetime.today() - timedelta(days=61)
    train_step = preprocess_and_train(
        base_path="motor_vehicle_crashes",
        dt=cutoff.strftime('%Y-%m-%d'),
    )
    # Consuming the training output makes this step run after training.
    write_predictions(base_path=train_step.output)

# Compile the pipeline definition into a local JSON template.
pipeline_func = mvc_pipeline
pipeline_filename = "preprocess_train_evaluate.json"

compiler.Compiler().compile(
    pipeline_func=pipeline_func,
    package_path=pipeline_filename,
)

# Create a Vertex AI pipeline job from the compiled template and submit it.
job = aip.PipelineJob(
    display_name="mvc_pipeline",
    template_path=pipeline_filename,
    project=GOOGLE_CLOUD_PROJECT,
)
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/270973361065/locations/us-central1/pipelineJobs/motor-vehicle-crashes-pipeline-20250202221204
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/270973361065/locations/us-central1/pipelineJobs/motor-vehicle-crashes-pipeline-20250202221204')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/motor-vehicle-crashes-pipeline-20250202221204?project=270973361065
