In [None]:
import dotenv
import os
from dotenv import load_dotenv
# Load settings from a local .env file (if one exists) into the process environment.
load_dotenv()

PROJECT_ID = os.getenv('PROJECT_ID')
REGION = os.getenv('REGION')
BUCKET_NAME = os.getenv('BUCKET_NAME')
PIPELINE_JSON_PATH = os.getenv('PIPELINE_JSON_PATH')
#SERVICE_ACCOUNT = os.getenv('SERVICE_ACCOUNT')

# os.getenv returns None for unset variables, which only fails much later
# (bucket check, pipeline submission). Surface the gap right away instead.
_unset_settings = [
    name
    for name, value in (
        ('PROJECT_ID', PROJECT_ID),
        ('REGION', REGION),
        ('BUCKET_NAME', BUCKET_NAME),
        ('PIPELINE_JSON_PATH', PIPELINE_JSON_PATH),
    )
    if not value
]
if _unset_settings:
    print(f"Warning: environment variables not set: {', '.join(_unset_settings)}")

from google.cloud import aiplatform

# staging_bucket is expected as a gs:// URI. BUCKET_NAME itself is left
# untouched because later cells read it directly.
_staging_bucket_uri = BUCKET_NAME
if _staging_bucket_uri and not _staging_bucket_uri.startswith("gs://"):
    _staging_bucket_uri = f"gs://{_staging_bucket_uri}"

aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=_staging_bucket_uri,
    #service_account=SERVICE_ACCOUNT
)
print("Vertex AI initialized successfully!")

In [None]:
from google.cloud import storage

# Initialize a storage client
storage_client = storage.Client()

if not BUCKET_NAME:
    raise ValueError("BUCKET_NAME is not set; cannot verify bucket access.")

# The storage client addresses buckets by bare name, while other cells pass
# BUCKET_NAME where a gs:// URI is expected. Accept either form here by
# dropping an optional "gs://" scheme and anything after the bucket segment.
bucket_name = BUCKET_NAME[len("gs://"):] if BUCKET_NAME.startswith("gs://") else BUCKET_NAME
bucket_name = bucket_name.split("/", 1)[0]
bucket = storage_client.bucket(bucket_name)
# Check if you can access the bucket
# NOTE(review): a permission failure may surface as an exception from
# exists() rather than a False result -- confirm against the client docs.
if bucket.exists():
    print(f"Access to bucket '{bucket_name}' verified successfully.")
else:
    print(f"Cannot access bucket '{bucket_name}'. Check permissions.")


In [None]:
from kfp.v2 import dsl
from kfp.v2.dsl import component, Output, Input, Dataset, Model
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Step 1: Data extraction from BigQuery
@component(
    base_image="python:3.9",  # Specify a base image with Python 3.9
    # db-dtypes and pyarrow are needed by the BigQuery client to build a
    # pandas DataFrame from query results; without them to_dataframe() fails.
    packages_to_install=["google-cloud-bigquery", "pandas", "db-dtypes", "pyarrow"]
)
def extract_data_from_bigquery(
    query: str,
    output_data_path: Output[Dataset]  # Output to save the file
):
    """Run a BigQuery SQL query and store the result as a CSV artifact.

    Args:
        query: SQL statement to execute with the default BigQuery client.
        output_data_path: Dataset artifact; the query result is written to
            its ``path`` as CSV without the index column.
    """
    # Imported inside the function because the component body executes in
    # its own container, not in the notebook kernel.
    from google.cloud import bigquery
    client = bigquery.Client()

    # Fetch data
    df = client.query(query).to_dataframe()

    # Save the data to a CSV file
    df.to_csv(output_data_path.path, index=False)
    print(f"Data extracted and saved to {output_data_path.path}")

# Step 2: Preprocessing data
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_data_path: Input[Dataset],  # Input path from the extraction task
    target_column: str,
    train_data_path: Output[Dataset],  # Output paths for training and testing data
    test_data_path: Output[Dataset]
):
    """Clean the extracted dataset and split it into train and test CSVs.

    Missing values are replaced with 0, the ``state`` column (when present)
    is converted to integer category codes, and the rows are split 80/20
    with a fixed random seed.

    Args:
        input_data_path: Dataset artifact holding the raw CSV.
        target_column: Name of the column to predict.
        train_data_path: Dataset artifact receiving the training rows
            (features plus target) as CSV.
        test_data_path: Dataset artifact receiving the test rows
            (features plus target) as CSV.

    Raises:
        ValueError: If ``target_column`` is not a column of the input data.
    """
    # The component body runs in its own container, so names imported at
    # notebook level are not available there; import what is used locally.
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Load the dataset
    df = pd.read_csv(input_data_path.path)

    if target_column not in df.columns:
        raise ValueError(
            f"Target column '{target_column}' not found in input data; "
            f"available columns: {list(df.columns)}"
        )

    # Preprocessing steps (fill missing values, encode, etc.)
    df = df.fillna(0)
    if 'state' in df.columns:
        # Encoding categorical variables
        df['state'] = df['state'].astype('category').cat.codes

    # Split into features and target
    features = df.drop(columns=[target_column])
    target = df[target_column]

    # Split into train and test datasets
    X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.2, random_state=42)

    # Save the processed data to output paths
    train_data = pd.concat([X_train, y_train], axis=1)
    test_data = pd.concat([X_test, y_test], axis=1)

    train_data.to_csv(train_data_path.path, index=False)
    test_data.to_csv(test_data_path.path, index=False)
    print(f"Data processed and saved to {train_data_path.path} and {test_data_path.path}")

# Step 3: Train the regression model
@component(
    base_image="python:3.9",
    # scikit-learn is required by XGBoost's scikit-learn style estimators.
    packages_to_install=["pandas", "xgboost", "scikit-learn"]
)
def train_model(
    train_data_path: Input[Dataset],  # Input training data
    model_output_path: Output[Model],  # Output path to save the model
    target_column: str = "hospitalized"
):
    """Fit an XGBoost regressor on the training CSV and save the model.

    Args:
        train_data_path: Dataset artifact holding the training CSV, with
            the target stored as one of its columns.
        model_output_path: Model artifact; the fitted model is saved to
            its ``path``.
        target_column: Name of the target column in the training data.
            Defaults to ``"hospitalized"``.
    """
    # The component body runs in its own container, so names imported at
    # notebook level are not available there; import what is used locally.
    import pandas as pd
    import xgboost as xgb

    # Load the training data
    train_data = pd.read_csv(train_data_path.path)

    # Separate the target; the remaining columns are the features
    y_train = train_data.pop(target_column)

    # Train the model using XGBoost (Regressor if it's a regression problem)
    model = xgb.XGBRegressor()
    model.fit(train_data, y_train)

    # Save the trained model
    model.save_model(model_output_path.path)
    print(f"Model trained and saved to {model_output_path.path}")

# Step 4: Evaluate the model's performance
@component(
    base_image="python:3.9",
    # scikit-learn is required by XGBoost's scikit-learn style estimators.
    packages_to_install=["pandas", "xgboost", "scikit-learn"]
)
def evaluate_model(
    model_path: Input[Model],  # Input trained model
    test_data_path: Input[Dataset],  # Input test data
    target_column: str = "hospitalized"
):
    """Score the trained regressor on the held-out test CSV.

    Prints regression metrics (MAE, RMSE, R^2) and, as before, the
    accuracy of the predictions after rounding to the nearest integer.

    Args:
        model_path: Model artifact holding the saved XGBoost model.
        test_data_path: Dataset artifact holding the test CSV, with the
            target stored as one of its columns.
        target_column: Name of the target column in the test data.
            Defaults to ``"hospitalized"``.
    """
    # The component body runs in its own container, so names imported at
    # notebook level are not available there; import what is used locally.
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import (
        accuracy_score,
        mean_absolute_error,
        mean_squared_error,
        r2_score,
    )

    # Load the model and test data
    model = xgb.XGBRegressor()
    model.load_model(model_path.path)

    test_data = pd.read_csv(test_data_path.path)

    # Separate features and target
    y_test = test_data.pop(target_column)

    # Make predictions
    y_pred = model.predict(test_data)

    # Error-based metrics suit a regressor's continuous output.
    mae = mean_absolute_error(y_test, y_pred)
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    r2 = r2_score(y_test, y_pred)
    print(f"Model MAE: {mae:.4f}, RMSE: {rmse:.4f}, R2: {r2:.4f}")

    # Kept for continuity with earlier runs: exact-match rate after rounding.
    accuracy = accuracy_score(y_test, y_pred.round())  # If regression, round predictions for accuracy
    print(f"Model Accuracy: {accuracy:.4f}")

# Define the pipeline

@dsl.pipeline(
    name="mlcov_pipeline",
    description="A pipeline to train and evaluate a regression model",
    # The pipeline root must be a gs:// URI; add the scheme when BUCKET_NAME
    # holds a bare bucket name, and pass an unset value through unchanged.
    pipeline_root=(
        BUCKET_NAME
        if not BUCKET_NAME or BUCKET_NAME.startswith("gs://")
        else f"gs://{BUCKET_NAME}"
    )
)
def regression_pipeline(
    bq_query: str = "SELECT * FROM `data-id`",
    target_column: str = "hospitalized"
):
    """Extract data, preprocess it, train a model and evaluate it.

    Args:
        bq_query: SQL query handed to the extraction step.
        target_column: Name of the target column; forwarded to the
            preprocessing step.
    """
    # Step 1: Extract data from BigQuery
    extract_task = extract_data_from_bigquery(query=bq_query)

    # Step 2: Preprocess data
    preprocess_task = preprocess_data(
        input_data_path=extract_task.outputs['output_data_path'],
        target_column=target_column
    )

    # Step 3: Train the model
    train_task = train_model(
        train_data_path=preprocess_task.outputs['train_data_path']
    )

    # Step 4: Evaluate the model (its task handle is not needed afterwards)
    evaluate_model(
        model_path=train_task.outputs['model_output_path'],
        test_data_path=preprocess_task.outputs['test_data_path']
    )

  from kfp.v2 import dsl
  return component_factory.create_component_from_func(


In [4]:
#compile
from kfp.v2 import compiler

# Serialize the pipeline definition into a JSON spec written to
# "mlcov_pipeline.json" (a path relative to the working directory).
compiler.Compiler().compile(
    package_path="mlcov_pipeline.json",
    pipeline_func=regression_pipeline,
)

In [5]:
from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json

In [None]:
# Initialize the Vertex AI SDK with the project and region from the config cell
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,  # e.g., "us-central1"
)

# The pipeline root must be a gs:// URI; add the scheme when BUCKET_NAME
# holds a bare bucket name, and pass an unset value through unchanged.
_pipeline_root = BUCKET_NAME
if _pipeline_root and not _pipeline_root.startswith("gs://"):
    _pipeline_root = f"gs://{_pipeline_root}"

# Define the job specification
# NOTE(review): template_path comes from PIPELINE_JSON_PATH, while the compile
# cell writes "mlcov_pipeline.json" -- confirm both refer to the same spec.
job = aiplatform.PipelineJob(
    display_name="mlcov_inc_pipeline",
    template_path=PIPELINE_JSON_PATH,  # Path to the compiled pipeline spec
    pipeline_root=_pipeline_root,  # Where to store pipeline artifacts
    parameter_values={
        "bq_query": "SELECT * FROM `data-id`",
        "target_column": "hospitalized"
    }
)

# Run the pipeline and block until it finishes
job.run(sync=True)