# 🔹 Setup and Imports
This section installs and imports required libraries for the ETL and ML pipeline.

# Google Cloud ETL Pipeline with Machine Learning Model Deployment

### SQL - BigQuery - VertexAi 

In [None]:
# STEP 0: Setup
PROJECT_ID = "gcloud-flow-123"
REGION = "us-central1"
INSTANCE_NAME = "sales-db"
DB_NAME = "diamonds_db"
BUCKET_NAME = "diamonds-etl-bucket-123"
BQ_DATASET = "diamonds_wh"
BQ_TABLE = "diamonds_parent"
USER = "etl_user"


In [None]:
!gcloud sql instances describe $INSTANCE_NAME \
  --project=$PROJECT_ID \
  --format="value(serviceAccountEmailAddress)"


In [None]:
!gsutil iam ch serviceAccount:pxxxxxxxxxxxxxxxxx-qir44b@gcp-sa-cloud-sql.iam.gserviceaccount.com:roles/storage.objectAdmin gs://$BUCKET_NAME


In [None]:
!gcloud sql export csv $INSTANCE_NAME gs://$BUCKET_NAME/diamonds_ideal.csv \
  --database=$DB_NAME \
  --query="SELECT * FROM diamonds_ideal" \
  --project=$PROJECT_ID



# 🔹 Load Raw Data
Here we load the raw sales or diamonds dataset from CSV/BigQuery/Cloud Storage.

In [None]:
!gcloud sql export csv $INSTANCE_NAME gs://$BUCKET_NAME/diamonds_premium.csv \
  --database=$DB_NAME \
  --query="SELECT * FROM diamonds_premium" \
  --project=$PROJECT_ID



In [None]:
!gcloud sql export csv $INSTANCE_NAME gs://$BUCKET_NAME/diamonds_good.csv \
  --database=$DB_NAME \
  --query="SELECT * FROM diamonds_good" \
  --project=$PROJECT_ID



In [None]:
!gcloud sql export csv $INSTANCE_NAME gs://$BUCKET_NAME/diamonds_very_good.csv \
  --database=$DB_NAME \
  --query="SELECT * FROM diamonds_very_good" \
  --project=$PROJECT_ID



In [None]:
!gcloud sql export csv $INSTANCE_NAME gs://$BUCKET_NAME/diamonds_fair.csv \
  --database=$DB_NAME \
  --query="SELECT * FROM diamonds_fair" \
  --project=$PROJECT_ID



In [None]:
!gcloud storage buckets list --project=$PROJECT_ID


# 🔹 Data Transformation
We perform cleaning and transformation (renaming columns, aggregating tables, etc.).

In [None]:
!gcloud storage ls gs://$BUCKET_NAME


In [None]:
!bq mk --dataset $PROJECT_ID:$BQ_DATASET

In [None]:
!bq load \
  --autodetect \
  --source_format=CSV \
  gcloud-flow-123:diamonds_wh.diamonds_ideal \
  gs://$BUCKET_NAME/diamonds_ideal.csv


In [None]:
!bq load \
  --autodetect \
  --source_format=CSV \
  gcloud-flow-123:diamonds_wh.diamonds_premium \
  gs://$BUCKET_NAME/diamonds_premium.csv


In [None]:
!bq load \
  --autodetect \
  --source_format=CSV \
  gcloud-flow-123:diamonds_wh.diamonds_good \
  gs://$BUCKET_NAME/diamonds_good.csv


# 🔹 Load into BigQuery
We create or replace tables in BigQuery with the processed data.

In [None]:
!bq load \
  --autodetect \
  --source_format=CSV \
  gcloud-flow-123:diamonds_wh.diamonds_very_good \
  gs://$BUCKET_NAME/diamonds_very_good.csv


In [None]:
!bq load \
  --autodetect \
  --source_format=CSV \
  gcloud-flow-123:diamonds_wh.diamonds_fair \
  gs://$BUCKET_NAME/diamonds_fair.csv


In [None]:
!bq ls gcloud-flow-123:diamonds_wh


In [None]:
!bq query --use_legacy_sql=false \
"CREATE OR REPLACE TABLE \`gcloud-flow-123.diamonds_wh.diamonds_full\` AS \
SELECT double_field_0 AS carat,string_field_1 AS cut,string_field_2 AS color, string_field_3 AS clarity, double_field_4 AS depth, double_field_5 AS table, double_field_6 AS x, double_field_7 AS y, double_field_8 AS z, int64_field_9 AS price \
FROM \`gcloud-flow-123.diamonds_wh.diamonds_fair\` \
UNION ALL \
SELECT double_field_0, string_field_1, string_field_2, string_field_3,double_field_4, double_field_5, double_field_6, double_field_7, double_field_8, int64_field_9 \
FROM \`gcloud-flow-123.diamonds_wh.diamonds_good\` \
UNION ALL \
SELECT double_field_0, string_field_1, string_field_2, string_field_3,double_field_4, double_field_5, double_field_6, double_field_7, double_field_8, int64_field_9 \
FROM \`gcloud-flow-123.diamonds_wh.diamonds_ideal\` \
UNION ALL \
SELECT double_field_0, string_field_1, string_field_2, string_field_3,double_field_4, double_field_5, double_field_6, double_field_7, double_field_8, int64_field_9 \
FROM \`gcloud-flow-123.diamonds_wh.diamonds_premium\` \
UNION ALL \
SELECT double_field_0, string_field_1, string_field_2, string_field_3,double_field_4, double_field_5, double_field_6, double_field_7, double_field_8, int64_field_9 \
FROM \`gcloud-flow-123.diamonds_wh.diamonds_very_good\`;" 


In [None]:
# !bq show --format=prettyjson gcloud-flow-123:diamonds_wh.diamonds_fair


# 🔹 Train ML Model in Vertex AI
Using AutoML Tabular Regression to train a price prediction model.

In [None]:
!bq query --use_legacy_sql=false \
"SELECT * FROM \`gcloud-flow-123.diamonds_wh.diamonds_full\` LIMIT 10"


In [None]:
PROJECT_ID="gcloud-flow-123"
REGION="us-central1"
BQ_TABLE="diamonds_full"
MODEL_DISPLAY_NAME="diamond_price_predictor"


In [None]:
!gcloud services enable aiplatform.googleapis.com


In [None]:
!export GRPC_VERBOSITY=ERROR


In [None]:
from google.cloud import aiplatform

PROJECT_ID = "gcloud-flow-123"
REGION = "us-central1"
BQ_TABLE = f"bq://{PROJECT_ID}.diamonds_wh.diamonds_full"
MODEL_DISPLAY_NAME = "diamond_price_predictor"

aiplatform.init(project=PROJECT_ID, location=REGION)

# Create Tabular Dataset using the correct URI
dataset = aiplatform.TabularDataset.create(
    display_name="diamonds_dataset",
    bq_source=BQ_TABLE
)


# 🔹 Deploy Model and Predict
Deploy the trained model to a Vertex AI endpoint and run predictions.

In [None]:
model = aiplatform.AutoMLTabularTrainingJob(
    display_name=MODEL_DISPLAY_NAME,
    optimization_prediction_type="regression"
)

model = model.run(
    dataset=dataset,
    target_column="price",
    budget_milli_node_hours=1000  # 1 hour
)


In [None]:
endpoint = model.deploy(
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=1,
)


In [None]:
instances = [
    {
        "carat": 6.0,
        "cut": "Ideal",
        "color": "H",
        "clarity": "VS2",
        "depth": 61.5,
        "table": 55.0,
        "x": 6.5,
        "y": 6.5,
        "z": 4.0
    }
]

prediction = endpoint.predict(instances=instances)

print("Prediction:", prediction)


In [None]:
batch_prediction_job = model.batch_predict(
    job_display_name="diamonds_batch_predict",
    bigquery_source="bq://gcloud-flow-123.sales_dw.diamonds_full",
    bigquery_destination_prefix="bq://gcloud-flow-123.sales_dw.diamonds_predictions",
    machine_type="n1-standard-4",
    sync=True
)
