In [None]:
!pip install -q -U google-cloud-aiplatform kfp google-cloud-pipeline-components

  Preparing metadata (setup.py) ... done


In [None]:
from google.cloud.aiplatform import gapic as aiplatform_gapic
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component
from google.cloud import storage, aiplatform
from google.oauth2 import service_account

In [None]:
# Google Cloud settings shared by the rest of the notebook.
PROJECT_ID = "mlops-pipeline-427209"  # project passed to aiplatform.init and to the pipeline
REGION = "us-central1"  # location passed to aiplatform.init
BUCKET_NAME = "mlops_vertexai_hh"  # bucket name used to build the gs:// paths

In [None]:
# Cloud Storage locations, all derived from the bucket root.
BUCKET_PATH = f"gs://{BUCKET_NAME}"
PIPELINE_ROOT = f"{BUCKET_PATH}/pipeline_root"  # root used by the pipeline definition and job
MODEL_DIR = f"{BUCKET_PATH}/model"
DATA_DIR = f"{BUCKET_PATH}/data"

In [None]:
# Load service-account credentials from the key file "credentials.json"
# (path is relative to the notebook's working directory).
credentials = service_account.Credentials.from_service_account_file("credentials.json")

In [None]:
# Configure the Vertex AI SDK with the project, region and service-account
# credentials so later aiplatform calls in this notebook use them.
aiplatform.init(project=PROJECT_ID, location=REGION, credentials=credentials)

In [None]:
# Authenticate the gcloud CLI with the same key file.
# NOTE(review): the recorded output shows an interactive "Y/n" confirmation when
# credentials already exist, so this cell may block an unattended "Run All".
!gcloud auth login --cred-file=credentials.json


You are already authenticated with 
'1048017730256-compute@developer.gserviceaccount.com'.
Do you wish to proceed and overwrite existing credentials?

Do you want to continue (Y/n)?  Y


Authenticated with service account credentials for: [1048017730256-compute@developer.gserviceaccount.com].
Your current project is [None].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID


In [None]:
@component(base_image="python:3.11", packages_to_install=["google-cloud-bigquery"])
def export_data_to_gcs(
    project_id: str, dataset_id: str, table_id: str, gcs_bucket_path: str
) -> str:
    """Extract a BigQuery table to Cloud Storage and return the destination URI.

    Args:
        project_id: Project that owns the dataset and runs the extract job.
        dataset_id: BigQuery dataset containing the table.
        table_id: Table to extract.
        gcs_bucket_path: Prefix under which the export is written.

    Returns:
        The wildcard URI ``<gcs_bucket_path>/netflix_data_*.csv`` that the
        extract job was pointed at.
    """
    from google.cloud import bigquery

    destination_uri = f"{gcs_bucket_path}/netflix_data_*.csv"
    source_table = bigquery.DatasetReference(project_id, dataset_id).table(table_id)

    bq_client = bigquery.Client(project=project_id)
    job = bq_client.extract_table(source_table, destination_uri, location="US")
    # Block until the extract job has finished (raises if the job failed).
    job.result()

    print(f"Data exported to {destination_uri}")
    return destination_uri

In [None]:
@component(
    base_image="python:3.11",
    packages_to_install=[
        "torch",
        "torchvision",
        "numpy",
        "pandas",
        "gcsfs",
        "google-cloud-storage",
    ],
)
def train_model(project_id: str, gcs_bucket_path: str, data_path: str) -> str:
    """Train the rating-prediction network and store its weights in Cloud Storage.

    Reads every CSV matching ``data_path`` (columns ``User_Id``, ``Movie_Id`` and
    ``Rating`` are required), trains ``RecommenderNN`` for 5 epochs with Adam and
    an MSE loss, and uploads the resulting ``state_dict`` to
    ``<gcs_bucket_path>/model.pth``.

    Args:
        project_id: Project used to create the GCS filesystem client.
        gcs_bucket_path: ``gs://`` prefix under which ``model.pth`` is written.
        data_path: Glob pattern of the training CSV files on GCS.

    Returns:
        The GCS path of the saved model weights.

    Raises:
        ValueError: If no file matches ``data_path`` or the files contain no rows.
    """
    import os
    import tempfile

    import gcsfs
    import numpy as np
    import pandas as pd
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torch.utils.data import DataLoader, Dataset

    class NetflixDataset(Dataset):
        """In-memory (user, movie, rating) triples taken from a DataFrame."""

        def __init__(self, data):
            self.users = torch.tensor(data["User_Id"].values.astype(np.int32))
            self.movies = torch.tensor(data["Movie_Id"].values.astype(np.int32))
            self.ratings = torch.tensor(data["Rating"].values.astype(np.float32))

        def __len__(self):
            return len(self.users)

        def __getitem__(self, idx):
            # Samples stay on the CPU; whole batches are moved to the training
            # device in the loop below instead of one element at a time.
            return self.users[idx], self.movies[idx], self.ratings[idx]

    class RecommenderNN(nn.Module):
        """User/movie embeddings concatenated and fed through a 4-layer MLP."""

        def __init__(self, num_users, num_movies, emb_size=100):
            super().__init__()
            # +1 so that an id equal to num_users / num_movies is a valid index.
            self.user_emb = nn.Embedding(num_users + 1, emb_size)
            self.movie_emb = nn.Embedding(num_movies + 1, emb_size)
            self.fc1 = nn.Linear(2 * emb_size, 256)
            self.fc2 = nn.Linear(256, 128)
            self.fc3 = nn.Linear(128, 64)
            self.fc4 = nn.Linear(64, 1)

        def forward(self, user, movie):
            user_emb = self.user_emb(user)
            movie_emb = self.movie_emb(movie)
            x = torch.cat([user_emb, movie_emb], dim=1)
            x = F.relu(self.fc1(x))
            x = F.relu(self.fc2(x))
            x = F.relu(self.fc3(x))
            # Drop only the trailing feature dimension so a batch of size 1
            # keeps shape (1,) and still lines up with the target tensor.
            return self.fc4(x).squeeze(-1)

    def calculate_accuracy(y_pred, y_true):
        """Fraction of predictions that equal the true rating after rounding."""
        y_pred_rounded = torch.round(y_pred)
        correct_predictions = (y_pred_rounded == y_true).float().sum()
        accuracy = correct_predictions / y_true.shape[0]
        return accuracy.item()

    device = "cuda" if torch.cuda.is_available() else "cpu"
    # NOTE(review): the id bounds are hard-coded; presumably the maximum user and
    # movie ids of the dataset -- confirm against the exported table.
    model = RecommenderNN(num_users=2649429, num_movies=17770).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    fs = gcsfs.GCSFileSystem(project=project_id)
    files = fs.glob(data_path)
    if not files:
        raise ValueError(f"No training files matched {data_path!r}")

    frames = []
    for file_ in files:
        # Close each remote handle as soon as its shard has been parsed.
        with fs.open(file_) as handle:
            frames.append(pd.read_csv(handle))
    data = pd.concat(frames, ignore_index=True)
    if data.empty:
        raise ValueError(f"Training files matched by {data_path!r} contain no rows")

    dataset = NetflixDataset(data)
    data_loader = DataLoader(dataset, batch_size=512, shuffle=True)

    for epoch in range(5):
        model.train()
        total_loss = 0.0
        total_accuracy = 0.0
        count_batches = 0

        for users, movies, ratings in data_loader:
            users = users.to(device)
            movies = movies.to(device)
            ratings = ratings.to(device)

            optimizer.zero_grad()
            outputs = model(users, movies)
            loss = criterion(outputs, ratings)
            loss.backward()
            optimizer.step()

            # The metric needs no gradient tracking.
            accuracy = calculate_accuracy(outputs.detach(), ratings)
            total_loss += loss.item()
            total_accuracy += accuracy
            count_batches += 1

        avg_loss = total_loss / count_batches
        avg_accuracy = total_accuracy / count_batches

        print(f"Epoch {epoch+1}: Loss = {avg_loss}, Accuracy = {avg_accuracy}")

    model_path = f"{gcs_bucket_path}/model.pth"
    # torch.save cannot open a gs:// URI as a local path, so serialize to a
    # temporary local file first and upload that file through gcsfs.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, "model.pth")
        torch.save(model.state_dict(), local_path)
        fs.put_file(local_path, model_path)

    return model_path

In [None]:
@component(
    base_image="python:3.11",
    # The plain python image does not ship the Vertex AI SDK, so it has to be
    # installed for the in-function import below to succeed.
    packages_to_install=["google-cloud-aiplatform"],
)
def deploy_model_op(project_id: str, model_path: str):
    """Register the trained model in Vertex AI and deploy it to an endpoint.

    Args:
        project_id: Project in which the model is uploaded.
        model_path: GCS location of the trained model artifact(s), passed to
            ``Model.upload`` as ``artifact_uri``.
    """
    from google.cloud import aiplatform

    # NOTE(review): Vertex AI documents `artifact_uri` as a directory of model
    # artifacts, while the training step returns the path of a single .pth
    # file -- confirm the serving container can load what is stored there.
    model = aiplatform.Model.upload(
        project=project_id,
        display_name="netflix-recommender",
        artifact_uri=model_path,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.2-1:latest",
    )
    # No endpoint is passed, so the SDK is left to pick the endpoint; the
    # returned handle is not needed by this component.
    model.deploy(
        machine_type="n1-standard-2",
        min_replica_count=1,
        max_replica_count=2,
    )

In [None]:
@pipeline(name="netflix-recommender-pipeline", pipeline_root=PIPELINE_ROOT)
def netflix_recommender_pipeline(project_id: str, gcs_bucket_path: str):
    """Chain the export, training and deployment components.

    Args:
        project_id: Project forwarded to every step.
        gcs_bucket_path: ``gs://`` prefix used for the exported data and the model.
    """
    # Step 1: dump the BigQuery table to Cloud Storage.
    export_task = export_data_to_gcs(
        project_id=project_id,
        dataset_id="dataflow",
        table_id="netflix_prize_data",
        gcs_bucket_path=gcs_bucket_path,
    )

    # Step 2: train on the exported files with a raised memory limit.
    # GPU scheduling is left disabled, as in the original:
    # .add_node_selector_constraint("NVIDIA_TESLA_T4")
    train_task = train_model(
        project_id=project_id,
        gcs_bucket_path=gcs_bucket_path,
        data_path=export_task.output,
    )
    train_task.set_memory_limit("32Gi")

    # Step 3: upload and deploy the model produced by the training step.
    deploy_model_op(project_id=project_id, model_path=train_task.output)

In [None]:
# Compile the pipeline function into a local job specification file.
compiler.Compiler().compile(
    package_path="pipeline_job.yaml",
    pipeline_func=netflix_recommender_pipeline,
)

# Build the Vertex AI pipeline job from the compiled spec, supplying the
# runtime values for the two pipeline parameters.
job = aiplatform.PipelineJob(
    template_path="pipeline_job.yaml",
    display_name="netflix_recommender_pipeline",
    parameter_values={"project_id": PROJECT_ID, "gcs_bucket_path": BUCKET_PATH},
    pipeline_root=PIPELINE_ROOT,
)

# Submit the run under the given service account.
job.submit(service_account="1048017730256-compute@developer.gserviceaccount.com")

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/1048017730256/locations/us-central1/pipelineJobs/netflix-recommender-pipeline-20240623061045
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/1048017730256/locations/us-central1/pipelineJobs/netflix-recommender-pipeline-20240623061045')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/netflix-recommender-pipeline-20240623061045?project=1048017730256
