In [1]:
from google.cloud import aiplatform, storage
import tempfile
import joblib
import os
import pandas as pd
from app.config import PROCESSOR_DIR, PROCESSOR_FILENAME, GCS_BUCKET_NAME

In [2]:
def load_preprocessor_from_gcs(bucket_name, blob_path):
    """Download a joblib-serialised preprocessor from GCS and deserialise it.

    Args:
        bucket_name: Name of the GCS bucket holding the artifact.
        blob_path: Object path of the artifact inside ``bucket_name``.

    Returns:
        Whatever object ``joblib.load`` reconstructs from the downloaded blob.

    Note:
        ``joblib.load`` unpickles the file, which can execute arbitrary code;
        only load artifacts from a bucket you trust.
    """
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_path)

    # Download into a throwaway directory instead of NamedTemporaryFile(delete=False):
    # the directory and the downloaded file are removed when the block exits
    # (no leaked copies of the artifact in /tmp), and the file is not held open
    # while the GCS client writes to it (which fails on Windows).
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, "preprocessor_artifact")
        blob.download_to_filename(local_path)
        # The object is fully materialised in memory here, so it stays valid
        # after the temporary file is deleted.
        return joblib.load(local_path)


def process_and_upload_file_to_gcs(gcs_input_uri, preprocessor, bucket_name, source_file, destination_blob):
    """Transform a raw CSV with a fitted preprocessor and upload the result to GCS.

    Args:
        gcs_input_uri: Location of the raw CSV; anything ``pd.read_csv`` accepts.
            NOTE(review): reading a ``gs://`` URI through pandas relies on the
            optional gcsfs package being installed — confirm in the environment.
        preprocessor: Fitted object exposing ``transform(DataFrame)``.
        bucket_name: Destination GCS bucket.
        source_file: Local path the transformed CSV is written to before upload.
        destination_blob: Object path inside ``bucket_name`` for the upload.

    Returns:
        The ``gs://<bucket_name>/<destination_blob>`` URI of the uploaded CSV.
    """
    raw_df = pd.read_csv(gcs_input_uri)
    processed = preprocessor.transform(raw_df)
    # Some transformers (e.g. one-hot encoders) return a SciPy sparse matrix,
    # which pd.DataFrame cannot wrap directly; densify it first.
    if hasattr(processed, "toarray"):
        processed = processed.toarray()
    transformed_df = pd.DataFrame(processed)

    # Write the CSV to `source_file` itself (creating its directory if needed)
    # so that the file uploaded below is exactly the one just produced.
    parent_dir = os.path.dirname(source_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    transformed_df.to_csv(source_file, index=False)

    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(destination_blob)
    blob.upload_from_filename(source_file)

    return f"gs://{bucket_name}/{destination_blob}"
        

In [None]:
# Raw batch input, and the prefix Vertex AI writes prediction results under
# (passed as gcs_destination_prefix below — it is not the input bucket).
gcs_input_uri = 'gs://flight_price_data/data/airlines_flights_data_small_batch.csv'
BUCKET_URI = "gs://flight_price_data/output"

# Deployment identifiers. NOTE(review): these pin the notebook to a single GCP
# project; consider moving them into app.config next to GCS_BUCKET_NAME.
MODEL_RESOURCE_NAME = (
    "projects/flight-price-prediction-470515/locations/us-central1/"
    "models/3970181456759619584"
)
BATCH_SERVICE_ACCOUNT = "vertexai-sa@flight-price-prediction-470515.iam.gserviceaccount.com"

# Local staging file for the transformed CSV, and its object path in GCS_BUCKET_NAME.
LOCAL_TRANSFORMED_CSV = "processed_batch_data/transformed_input.csv"
TRANSFORMED_BLOB = "data/transformed_input.csv"

PREPROCESSOR_BLOB_PATH = f"{PROCESSOR_DIR}/{PROCESSOR_FILENAME}"
preprocessor = load_preprocessor_from_gcs(GCS_BUCKET_NAME, PREPROCESSOR_BLOB_PATH)

model = aiplatform.Model(model_name=MODEL_RESOURCE_NAME)

transformed_data_gcs_uri = process_and_upload_file_to_gcs(
    gcs_input_uri,
    preprocessor,
    GCS_BUCKET_NAME,
    LOCAL_TRANSFORMED_CSV,
    TRANSFORMED_BLOB,
)

# sync=False submits the job and returns immediately; the SDK keeps logging the
# job state while the kernel stays free. Wait for the job to finish before
# reading anything under BUCKET_URI.
# NOTE(review): generate_explanation=True presumably requires the model to have
# been uploaded with an explanation spec — confirm.
batch_predict_job = model.batch_predict(
    job_display_name="Flight_Price_Prediction_Batch_Job",
    gcs_source=transformed_data_gcs_uri,
    gcs_destination_prefix=BUCKET_URI,
    instances_format="csv",
    predictions_format="jsonl",
    machine_type="n1-standard-4",
    starting_replica_count=1,
    max_replica_count=1,
    service_account=BATCH_SERVICE_ACCOUNT,
    generate_explanation=True,
    sync=False,
)

Creating BatchPredictionJob


BatchPredictionJob created. Resource name: projects/373714928690/locations/us-central1/batchPredictionJobs/247598697041362944
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/373714928690/locations/us-central1/batchPredictionJobs/247598697041362944')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/247598697041362944?project=373714928690
BatchPredictionJob projects/373714928690/locations/us-central1/batchPredictionJobs/247598697041362944 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/373714928690/locations/us-central1/batchPredictionJobs/247598697041362944 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/373714928690/locations/us-central1/batchPredictionJobs/247598697041362944 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/373714928690/locations/us-central1/batchPredictionJobs/247598697041362944 current state:
