Added Monitoring into predict.py
Thomas-George-T committed Dec 13, 2023
1 parent 42e7edd commit 5161bcb
Showing 5 changed files with 118 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -91,3 +91,5 @@ logs

# environment files
.env

ecommerce-mlops-406821-40598235283c.json
2 changes: 1 addition & 1 deletion README.md
@@ -266,7 +266,7 @@ In managing models for Staging, Production, and Archiving, we rely on MLflow.

# Cost Analysis

Breakdown of the costs associated with the Machine Learning pipeline on Google Cloud Platform hosting on US East1
Breakdown of the costs associated with the Machine Learning pipeline on Google Cloud Platform (GCP), hosted in the US East1 region.

## Initial Cost Analysis

6 changes: 4 additions & 2 deletions gcpdeploy/src/serve/Dockerfile
@@ -6,16 +6,18 @@ WORKDIR /app
# Copy the current directory contents into the container at /app
COPY serve/predict.py /app/

COPY serve/ecommerce-mlops-406821-40598235283c.json /app/

# Install Flask, the Google Cloud client libraries, and model dependencies
RUN pip install Flask google-cloud-storage joblib scikit-learn grpcio gcsfs python-dotenv pandas flask
RUN pip install Flask google-cloud-storage joblib scikit-learn grpcio gcsfs python-dotenv pandas google-cloud-logging google-cloud-bigquery google-auth

ENV AIP_STORAGE_URI=gs://ecommerce_retail_online_mlops/model
ENV AIP_HEALTH_ROUTE=/ping
ENV AIP_PREDICT_ROUTE=/predict
ENV AIP_HTTP_PORT=8080
ENV BUCKET_NAME=ecommerce_retail_online_mlops
ENV PROJECT_ID=ecommerce-mlops-406821

ENV BIGQUERY_TABLE_ID=ecommerce-mlops-406821.mlops_project_dataset.model_monitoring_copy

# Run predict.py when the container launches
ENTRYPOINT ["python", "predict.py"]
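
A quick local smoke test of this container can exercise both routes before deploying. The sketch below is a minimal client, assuming the image is already running with port 8080 published (e.g. via docker run -p 8080:8080); the /ping and /predict routes and the "instances" payload shape come from the Dockerfile ENV values and predict.py, while the script name and the PC1-PC6 sample values are made up for illustration.

# smoke_test.py -- hypothetical local check of the serving container (not part of this commit)
import requests

BASE_URL = "http://localhost:8080"  # assumes the container is up and port 8080 is published

# Health route, as configured by AIP_HEALTH_ROUTE
resp = requests.get(f"{BASE_URL}/ping")
print("health:", resp.status_code)

# Predict route: payload shape matches request_json['instances'] in predict.py;
# the PC1..PC6 values are placeholder numbers, not real data.
payload = {
    "instances": [
        {"PC1": 0.1, "PC2": -1.2, "PC3": 0.5, "PC4": 0.0, "PC5": 2.3, "PC6": -0.7}
    ]
}
resp = requests.post(f"{BASE_URL}/predict", json=payload)
print("predictions:", resp.json())  # expected shape: {'predictions': [{'cluster': ...}]}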
109 changes: 108 additions & 1 deletion gcpdeploy/src/serve/predict.py
@@ -1,10 +1,19 @@
from flask import Flask, jsonify, request
from google.cloud import storage
# from google.cloud import storage
import joblib
import os
import json
from dotenv import load_dotenv
import pandas as pd
# Experimental start
import time
from datetime import datetime
from google.cloud import storage, logging, bigquery
from google.cloud.bigquery import SchemaField
from google.api_core.exceptions import NotFound
from google.oauth2 import service_account
from google.logging.type import log_severity_pb2 as severity
# Experimental end

load_dotenv()

@@ -13,6 +22,57 @@

app = Flask(__name__)

## Experimental start
# Set up Google Cloud logging
service_account_file = 'ecommerce-mlops-406821-40598235283c.json'
credentials = service_account.Credentials.from_service_account_file(service_account_file)
client = logging.Client(credentials=credentials)
logger = client.logger('training_pipeline')
# Initialize BigQuery client
bq_client = bigquery.Client(credentials=credentials)
table_id = os.environ['BIGQUERY_TABLE_ID']


def get_table_schema():
    """Build the table schema for the output table
    Returns:
        List: List of `SchemaField` objects"""
    return [
        SchemaField("PC1", "FLOAT", mode="NULLABLE"),
        SchemaField("PC2", "FLOAT", mode="NULLABLE"),
        SchemaField("PC3", "FLOAT", mode="NULLABLE"),
        SchemaField("PC4", "FLOAT", mode="NULLABLE"),
        SchemaField("PC5", "FLOAT", mode="NULLABLE"),
        SchemaField("PC6", "FLOAT", mode="NULLABLE"),
        SchemaField("prediction", "FLOAT", mode="NULLABLE"),
        SchemaField("timestamp", "TIMESTAMP", mode="NULLABLE"),
        SchemaField("latency", "FLOAT", mode="NULLABLE"),
    ]


def create_table_if_not_exists(client, table_id, schema):
    """Create a BigQuery table if it doesn't exist
    Args:
        client (bigquery.client.Client): A BigQuery Client
        table_id (str): The ID of the table to create
        schema (List): List of `SchemaField` objects
    Returns:
        None"""
    try:
        client.get_table(table_id)  # Make an API request.
        print("Table {} already exists.".format(table_id))
    except NotFound:
        print("Table {} is not found. Creating table...".format(table_id))
        table = bigquery.Table(table_id, schema=schema)
        client.create_table(table)  # Make an API request.
        print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

## Experimental end

def initialize_variables():
    """
    Initialize environment variables.
@@ -101,8 +161,50 @@ def predict():

    request_instances = request_json['instances']

    ## Experimental start
    logger.log_text("Received prediction request.", severity='INFO')

    prediction_start_time = time.time()
    current_timestamp = datetime.now().isoformat()
    ## Experimental end

    prediction = model.predict(pd.DataFrame(list(request_instances)))

    ## Experimental start
    prediction_end_time = time.time()
    prediction_latency = prediction_end_time - prediction_start_time
    ## Experimental end

    prediction = prediction.tolist()

    ## Experimental start

    logger.log_text(f"Prediction results: {prediction}", severity='INFO')

    rows_to_insert = [
        {
            "PC1": instance['PC1'],
            "PC2": instance['PC2'],
            "PC3": instance['PC3'],
            "PC4": instance['PC4'],
            "PC5": instance['PC5'],
            "PC6": instance['PC6'],
            "prediction": pred,
            "timestamp": current_timestamp,
            "latency": prediction_latency
        }
        for instance, pred in zip(request_instances, prediction)
    ]

    errors = bq_client.insert_rows_json(table_id, rows_to_insert)
    if not errors:
        logger.log_text("New predictions inserted into BigQuery.", severity='INFO')
    else:
        logger.log_text(f"Encountered errors inserting predictions into BigQuery: {errors}", severity='ERROR')

    ## Experimental end
    # print("prediction",prediction)
    output = {'predictions': [{'cluster': pred} for pred in prediction]}
    return jsonify(output)

@@ -112,6 +214,11 @@ def predict():

model = load_model(bucket, bucket_name)

## Experimental start
schema = get_table_schema()
create_table_if_not_exists(bq_client, table_id, schema)
## Experimental end


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
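
With predictions landing in BigQuery, the monitoring table can be queried directly for request volume and latency. A minimal read-back sketch, assuming the same service-account key and the BIGQUERY_TABLE_ID value set in the Dockerfile; the script name and the aggregation query are illustrative, not part of this commit.

# monitor_check.py -- hypothetical read-back of the monitoring table (not part of this commit)
from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "ecommerce-mlops-406821-40598235283c.json"
)
bq_client = bigquery.Client(credentials=credentials)

# Table ID as set by ENV BIGQUERY_TABLE_ID in the Dockerfile
table_id = "ecommerce-mlops-406821.mlops_project_dataset.model_monitoring_copy"

# Prediction count, average latency, and most recent prediction over the last day
query = f"""
    SELECT
        COUNT(*) AS n_predictions,
        AVG(latency) AS avg_latency_s,
        MAX(timestamp) AS last_prediction
    FROM `{table_id}`
    WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
"""

for row in bq_client.query(query).result():
    print(dict(row))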
3 changes: 3 additions & 0 deletions requirements.txt
@@ -43,3 +43,6 @@ gcsfs
python-dotenv
kaleido==0.2.1
grpcio==1.51.3
google-cloud-logging
google-cloud-bigquery
google-auth
