In [31]:
import base64
import json
import os
import pickle

import mlflow
import pandas as pd
from dotenv import load_dotenv
from mlflow import MlflowClient

In [28]:
# Pull settings from a local .env file (via python-dotenv) into the process
# environment before reading them below.
load_dotenv()

BUCKET_NAME = os.getenv("BUCKET_NAME")
EXPERIMENT_NAME = os.getenv("EXPERIMENT_NAME")
RUN_ID = os.getenv("RUN_ID")
ARTIFACT_FOLDER = os.getenv("ARTIFACT_FOLDER")
MODEL_FOLDER = os.getenv("MODEL_FOLDER")
# Optional: handed to MLflow as-is, even when unset.
MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI")

# Fail fast on incomplete configuration. os.getenv returns None for an unset
# variable, which would otherwise be formatted into URIs such as
# "s3://None/..." and only fail much later with a misleading error.
_required_settings = {
    "BUCKET_NAME": BUCKET_NAME,
    "EXPERIMENT_NAME": EXPERIMENT_NAME,
    "RUN_ID": RUN_ID,
    "ARTIFACT_FOLDER": ARTIFACT_FOLDER,
    "MODEL_FOLDER": MODEL_FOLDER,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise ValueError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

In [21]:
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

# Resolve the experiment ID from its name. Take the first match, and raise a
# descriptive error when nothing matches instead of the bare
# "list index out of range" produced by indexing an empty list.
experiment_id = next(
    (experiment.experiment_id for experiment in mlflow.search_experiments()
     if experiment.name == EXPERIMENT_NAME),
    None,
)
if experiment_id is None:
    # IndexError is kept so the failure type matches the previous `[...][0]` lookup.
    raise IndexError(
        f"No MLflow experiment named {EXPERIMENT_NAME!r} was found "
        f"(tracking URI: {MLFLOW_TRACKING_URI!r})"
    )

In [26]:
# Artifact root of the run, assembled by hand from the bucket name, the
# experiment ID resolved above and the run ID.
# NOTE(review): assumes the tracking server stores run artifacts under
# s3://<bucket>/<experiment_id>/<run_id>/artifacts — confirm against the
# server's configured artifact root.
artefacts_uri = f's3://{BUCKET_NAME}/{experiment_id}/{RUN_ID}/artifacts'

# The model is expected in the MODEL_FOLDER sub-path of the run's artifacts;
# it is loaded through MLflow's pyfunc interface.
model_uri = f'{artefacts_uri}/{MODEL_FOLDER}'
model = mlflow.pyfunc.load_model(model_uri)

Downloading artifacts: 100%|██████████| 9/9 [00:02<00:00,  4.37it/s]


In [29]:
# Download the ARTIFACT_FOLDER artifacts of run RUN_ID into the current
# working directory ('.').
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
client.download_artifacts(run_id=RUN_ID, path=ARTIFACT_FOLDER, dst_path='.')

# SECURITY: pickle.load can execute arbitrary code contained in the file.
# Only load artifacts that come from a trusted run / bucket.
with open(f"{ARTIFACT_FOLDER}/minmax_scaler.bin", "rb") as f_in:
    scaler = pickle.load(f_in)

Downloading artifacts: 100%|██████████| 1/1 [00:00<00:00,  4.86it/s]


In [None]:
def load_scaler_and_model():
    """Load the prediction model and the pickled scaler of one MLflow run.

    Configuration is read from environment variables:

    * ``BUCKET_NAME``, ``EXPERIMENT_NAME``, ``RUN_ID``, ``ARTIFACT_FOLDER`` and
      ``MODEL_FOLDER`` are required.
    * ``MLFLOW_TRACKING_URI`` is passed to MLflow unchanged, even when unset.

    The model is loaded with ``mlflow.pyfunc.load_model`` from
    ``s3://<bucket>/<experiment_id>/<run_id>/artifacts/<MODEL_FOLDER>``.
    The scaler is unpickled from ``minmax_scaler.bin`` inside the downloaded
    ``ARTIFACT_FOLDER`` (downloaded into the current working directory).

    Returns:
        tuple: ``(model, scaler)``.

    Raises:
        ValueError: if a required environment variable is unset or empty.
        IndexError: if no experiment named ``EXPERIMENT_NAME`` exists (same
            exception type as the previous ``[...][0]`` lookup, now with a
            descriptive message).
    """
    required_names = (
        "BUCKET_NAME",
        "EXPERIMENT_NAME",
        "RUN_ID",
        "ARTIFACT_FOLDER",
        "MODEL_FOLDER",
    )
    settings = {name: os.getenv(name) for name in required_names}

    # An unset variable would otherwise be formatted into the URI as the
    # literal text "None" and fail much later with a misleading error.
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    experiment_name = settings["EXPERIMENT_NAME"]
    run_id = settings["RUN_ID"]
    artifact_folder = settings["ARTIFACT_FOLDER"]

    mlflow.set_tracking_uri(tracking_uri)
    experiment_id = next(
        (experiment.experiment_id for experiment in mlflow.search_experiments()
         if experiment.name == experiment_name),
        None,
    )
    if experiment_id is None:
        raise IndexError(
            f"No MLflow experiment named {experiment_name!r} was found "
            f"(tracking URI: {tracking_uri!r})"
        )

    # NOTE(review): assumes the server's artifact root follows this
    # bucket/experiment/run layout — confirm.
    artefacts_uri = f"s3://{settings['BUCKET_NAME']}/{experiment_id}/{run_id}/artifacts"
    model_uri = f"{artefacts_uri}/{settings['MODEL_FOLDER']}"
    model = mlflow.pyfunc.load_model(model_uri)

    client = MlflowClient(tracking_uri=tracking_uri)
    # download_artifacts returns the local path of what it downloaded; use it
    # rather than re-deriving the location from the artifact path.
    local_artifact_dir = client.download_artifacts(
        run_id=run_id, path=artifact_folder, dst_path='.'
    )

    # SECURITY: pickle.load can execute arbitrary code contained in the file.
    # Only load artifacts that come from a trusted run / bucket.
    with open(os.path.join(local_artifact_dir, "minmax_scaler.bin"), "rb") as f_in:
        scaler = pickle.load(f_in)

    return model, scaler

In [47]:
def preprocessing(scaler, raw_data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``raw_data`` with the scaled feature columns replaced.

    Args:
        scaler: object exposing ``transform``; it is called with the four
            columns listed in ``minmax_cols`` (in that order).
        raw_data: input frame; must contain the ``minmax_cols`` columns.
            It is not modified.

    Returns:
        A new DataFrame with the same columns as ``raw_data`` in which the
        ``minmax_cols`` columns hold the scaler's output.

    Raises:
        KeyError: if any of the ``minmax_cols`` columns is missing.
    """
    minmax_cols = ['ParentalEducation', 'StudyTimeWeekly',
                   'Absences', 'ParentalSupport']

    new_data = raw_data.copy()
    scaled = scaler.transform(raw_data.loc[:, minmax_cols])

    # Replace the columns outright instead of writing through `.loc[:, cols]`.
    # `.loc` sets values in place, so float results written into integer-typed
    # columns trigger pandas' incompatible-dtype deprecation (an error in
    # newer releases). Whole-column assignment lets each column take the
    # dtype of the scaled values.
    new_data[minmax_cols] = scaled

    return new_data

In [48]:
def predict(model, scaler, raw_data):
    """Scale ``raw_data`` and return the model's first prediction as a float.

    Args:
        model: object exposing ``predict``; called with the preprocessed frame.
        scaler: scaler forwarded to ``preprocessing``.
        raw_data: anything ``pd.DataFrame`` accepts (e.g. a list of record
            dicts). A single record given as a flat ``{column: scalar}``
            mapping is also accepted and treated as one row.

    Returns:
        float: the first element of the model output. Any further rows in
        ``raw_data`` are scored but their predictions are not returned.
    """
    # pd.DataFrame rejects a dict made only of scalars ("If using all scalar
    # values, you must pass an index"), so wrap a single record as one row.
    # Column-oriented dicts (values are lists/arrays) are left untouched.
    if (isinstance(raw_data, dict) and raw_data
            and all(pd.api.types.is_scalar(value) for value in raw_data.values())):
        raw_data = [raw_data]

    df_data = pd.DataFrame(raw_data)
    features = preprocessing(scaler, df_data)
    pred = model.predict(features)
    return float(pred[0])


In [50]:
# Single sample record used to smoke-test predict().
# NOTE(review): ParentalEducation, StudyTimeWeekly, Absences and
# ParentalSupport all lie in [0, 1] here, yet predict() passes these columns
# through scaler.transform again — confirm this sample is raw (unscaled) data.
raw_data = [{'StudentID': 2566.0,
 'Age': 17.0,
 'Gender': 0.0,
 'Ethnicity': 0.0,
 'ParentalEducation': 0.5,
 'StudyTimeWeekly': 0.41918744404386094,
 'Absences': 0.3103448275862069,
 'Tutoring': 0.0,
 'ParentalSupport': 0.75,
 'Extracurricular': 0.0,
 'Sports': 0.0,
 'Music': 0.0,
 'Volunteering': 0.0}]

# Relies on `model` and `scaler` created by earlier cells.
predict(model, scaler, raw_data)

2.0

In [None]:
def lambda_handler(event, context):
    """Score every Kinesis record in ``event`` and return the prediction events.

    For each entry of ``event['Records']`` the base64 payload under
    ``record['kinesis']['data']`` is decoded as UTF-8 JSON. Its ``'ride'``
    value is turned into features and scored, and a prediction event is built.
    Unless ``TEST_RUN`` is truthy, each event is also written to the
    ``PREDICTIONS_STREAM_NAME`` stream through ``kinesis_client``.

    Args:
        event: mapping with a ``'Records'`` list as described above.
        context: unused.

    Returns:
        dict: ``{'predictions': [...]}`` with one prediction event per record,
        in input order.

    NOTE(review): ``prepare_features``, ``TEST_RUN``, ``kinesis_client`` and
    ``PREDICTIONS_STREAM_NAME`` are not defined in the visible part of this
    notebook — they must be provided before this handler can process records.
    NOTE(review): the payload keys (``'ride'``, ``'ride_id'``), the model name
    and the hard-coded version look carried over from a different project —
    confirm them against the real event schema.
    """
    predictions_events = []

    for record in event['Records']:
        encoded_data = record['kinesis']['data']
        decoded_data = base64.b64decode(encoded_data).decode('utf-8')
        ride_event = json.loads(decoded_data)

        ride = ride_event['ride']
        ride_id = ride_event['ride_id']

        features = prepare_features(ride)
        # predict() in this notebook takes (model, scaler, raw_data); the old
        # one-argument call raised TypeError. `model` and `scaler` are the
        # module-level objects loaded earlier (see load_scaler_and_model).
        # predict() builds a DataFrame from its third argument, so `features`
        # must be something pd.DataFrame accepts as rows.
        prediction = predict(model, scaler, features)

        prediction_event = {
            'model': 'ride_duration_prediction_model',
            'version': '123',
            'prediction': {
                'ride_duration': prediction,
                'ride_id': ride_id
            }
        }

        if not TEST_RUN:
            kinesis_client.put_record(
                StreamName=PREDICTIONS_STREAM_NAME,
                Data=json.dumps(prediction_event),
                PartitionKey=str(ride_id)
            )

        predictions_events.append(prediction_event)

    return {
        'predictions': predictions_events
    }