# Machine Learning Pipelines and Orchestration with Prefect


In [None]:
# lib/config.py

# Names of the columns handled as categorical features. The preprocessing
# cell below defines an identical list under the same name.
CATEGORICAL_COLS = ['Pclass', 'Sex', 'Embarked']
# NOTE(review): not referenced by the code visible in this file; presumably
# the directory holding the input data files - confirm against callers.
DATA_FOLDER = "../data"
# NOTE(review): not referenced by the code visible in this file; presumably
# the directory where pickled artifacts are written - confirm against callers.
MODELS_DIRPATH = "../models"

In [8]:
# lib/preprocessing.py

from typing import List, Tuple
import numpy as np
import pandas as pd
import scipy.sparse
from loguru import logger
from sklearn.feature_extraction import DictVectorizer
from sklearn.preprocessing import LabelEncoder

# Columns that encode_categorical_cols label-encodes and that extract_x_y
# vectorizes by default. Duplicates the list defined in the lib/config.py cell.
CATEGORICAL_COLS = ['Pclass', 'Sex', 'Embarked']

def encode_categorical_cols(df):
    """Fit one ``LabelEncoder`` per categorical column and encode a copy of ``df``.

    For every column named in ``CATEGORICAL_COLS``, missing values are replaced
    by the string ``"Unknown"`` and the column is then replaced by the integer
    codes produced by a freshly fitted ``LabelEncoder``.

    Args:
        df: DataFrame containing every column listed in ``CATEGORICAL_COLS``.

    Returns:
        A ``(encoded_df, label_encoders)`` tuple. ``encoded_df`` is a copy of
        ``df`` with the categorical columns encoded; ``label_encoders`` maps
        each column name to the encoder fitted on it.
    """
    # Encode a copy so the caller's DataFrame is not rewritten as a side effect.
    encoded = df.copy()
    label_encoders = {}
    for col in CATEGORICAL_COLS:
        encoder = LabelEncoder()
        encoded[col] = encoder.fit_transform(encoded[col].fillna("Unknown"))
        label_encoders[col] = encoder
    return encoded, label_encoders

def extract_x_y(df, categorical_cols=None, dv=None, with_target=True):
    """Vectorize the categorical columns of ``df`` and optionally read the target.

    Args:
        df: DataFrame holding the feature columns (and the ``"2urvived"``
            column when ``with_target`` is true).
        categorical_cols: Columns to vectorize; defaults to ``CATEGORICAL_COLS``.
        dv: An already fitted vectorizer to reuse. When omitted and
            ``with_target`` is true, a new ``DictVectorizer`` is fitted on
            ``df``. It is required when ``with_target`` is false.
        with_target: Whether to read the target values from ``df``.

    Returns:
        A ``(x, y, dv)`` tuple: the transformed features, the target values
        (``None`` when ``with_target`` is false) and the vectorizer used.

    Raises:
        ValueError: If ``with_target`` is false and no ``dv`` is supplied.
    """
    if categorical_cols is None:
        categorical_cols = CATEGORICAL_COLS

    # A vectorizer is only ever fitted on labelled data; without one there is
    # nothing to transform with, so fail with a clear message instead of an
    # AttributeError on ``None``.
    if dv is None and not with_target:
        raise ValueError(
            "A fitted vectorizer (dv) is required when with_target is False"
        )

    dicts = df[categorical_cols].to_dict(orient="records")
    y = None

    if with_target:
        if dv is None:
            dv = DictVectorizer()
            dv.fit(dicts)
        y = df["2urvived"].values

    x = dv.transform(dicts)
    return x, y, dv

def transform_categorical_cols(df, encoders):  # for prediction
    """Encode categorical columns of a copy of ``df`` with already fitted encoders.

    For every ``column -> encoder`` pair in ``encoders``, missing values in the
    column are replaced by the string ``"Unknown"`` and the column is then
    replaced by ``encoder.transform(...)`` of its values.

    Args:
        df: DataFrame containing every column that ``encoders`` has a key for.
        encoders: Mapping of column name to a fitted encoder exposing
            ``transform``.

    Returns:
        A copy of ``df`` with the listed columns encoded.

    Raises:
        ValueError: If ``encoders`` is ``None``.
    """
    if encoders is None:
        raise ValueError("encoders must be provided to transform categorical columns")

    # Transform a copy so the caller's DataFrame is not rewritten as a side effect.
    transformed = df.copy()
    for col, encoder in encoders.items():
        transformed[col] = encoder.transform(transformed[col].fillna("Unknown"))
    return transformed

def process_data(
    filepath: str,
    dv=None,
    with_target: bool = True,
    label_encoders=None,
) -> Tuple[scipy.sparse.csr_matrix, "Optional[np.ndarray]", "DictVectorizer", dict]:
    """Load a CSV file and turn it into model-ready features.

    The file is read with ``pandas.read_csv``, restricted to the columns this
    pipeline uses, and rows containing missing values are dropped. Categorical
    columns are label-encoded and then vectorized by ``extract_x_y``.

    NOTE(review): ``extract_x_y`` is called with its default column list, so
    only ``CATEGORICAL_COLS`` reach the vectorizer even though the numeric
    columns are selected here too - confirm this is intended.

    Args:
        filepath: Path of the CSV file to read.
        dv: Fitted vectorizer to reuse. Required when ``with_target`` is false;
            fitted on this data when omitted in labelled mode.
        with_target: True for labelled data (the ``"2urvived"`` column is read
            as the target), False for prediction data.
        label_encoders: Mapping of column name to fitted encoder. Required when
            ``with_target`` is false. In labelled mode, supplied encoders are
            reused; new ones are fitted only when none are given.

    Returns:
        A ``(x, y, dv, label_encoders)`` tuple. ``y`` is ``None`` when
        ``with_target`` is false. Because rows with missing values are
        dropped, ``x`` may have fewer rows than the input file.

    Raises:
        ValueError: If ``with_target`` is false and ``dv`` or
            ``label_encoders`` is missing.
    """
    if not with_target and (dv is None or label_encoders is None):
        raise ValueError(
            "dv and label_encoders are required when with_target is False"
        )

    feature_cols = ['Age', 'Fare', 'Sex', 'sibsp', 'Parch', 'Pclass', 'Embarked']
    # The target column is only needed for labelled data; requiring it for
    # prediction would reject input files that have no labels.
    selected_cols = feature_cols + ['2urvived'] if with_target else feature_cols

    df = pd.read_csv(filepath)
    df = df[selected_cols].dropna()

    logger.debug(f"{filepath} | Encoding categorical columns...")
    if with_target and label_encoders is None:
        # First pass over labelled data: fit the encoders here.
        df1, label_encoders = encode_categorical_cols(df)
    else:
        # Reuse the supplied encoders so the codes match the ones the
        # vectorizer and model were fitted with.
        df1 = transform_categorical_cols(df, label_encoders)

    logger.debug(f"{filepath} | Extracting X and y...")
    x, y, dv = extract_x_y(df1, dv=dv, with_target=with_target)
    return x, y, dv, label_encoders

In [9]:
# lib/helpers.py

from typing import Any
import pickle

def load_pickle(path: str):
    """Read the pickle file at ``path`` and return the object stored in it.

    NOTE: unpickling can execute arbitrary code, so only load files that come
    from a trusted source.
    """
    with open(path, "rb") as handle:
        return pickle.load(handle)


def save_pickle(path: str, obj: Any):
    """Serialize ``obj`` with pickle and write it to ``path``.

    Missing parent directories of ``path`` are created first, so saving into a
    fresh artifacts folder does not fail with ``FileNotFoundError``.

    Args:
        path: Destination file path; an existing file is overwritten.
        obj: Any picklable object.
    """
    # Imported locally because this cell's import block does not bring in ``os``.
    import os

    parent = os.path.dirname(path)
    if parent:  # empty when ``path`` is a bare file name in the current directory
        os.makedirs(parent, exist_ok=True)

    with open(path, "wb") as f:
        pickle.dump(obj, f)

In [10]:
# lib/workflows.py

import os
from typing import Optional
import numpy as np
import pandas as pd
import scipy.sparse
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def train_model(
    X: scipy.sparse.csr_matrix,
    y: np.ndarray,
    random_state: Optional[int] = None,
) -> RandomForestClassifier:
    """Fit a ``RandomForestClassifier`` on ``X`` / ``y`` and return it.

    Args:
        X: Training features.
        y: Training target values.
        random_state: Seed forwarded to ``RandomForestClassifier``. The
            default ``None`` matches the estimator's own default; pass an
            integer to make training reproducible.

    Returns:
        The fitted classifier.
    """
    model = RandomForestClassifier(random_state=random_state)
    model.fit(X, y)
    return model


def predict(X: scipy.sparse.csr_matrix, model: RandomForestClassifier) -> np.ndarray:
    """
    Run the given model on a feature matrix
    Args:
        X: Features to score
        model: Fitted RandomForestClassifier
    Returns:
        Whatever ``model.predict(X)`` yields (one prediction per row of X)
    """
    predictions = model.predict(X)
    return predictions


def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Score predictions against the true labels with ``accuracy_score``
    Args:
        y_true: True target values
        y_pred: Predicted target values
    Returns:
        Accuracy score
    """
    score = accuracy_score(y_true, y_pred)
    return score


def train_model_workflow(
    data_filepath: str,
    artifacts_filepath: Optional[str] = None,
    test_size: float = 0.2,
    random_state: Optional[int] = None,
) -> dict:
    """Train a model on a labelled CSV file, evaluate it and optionally save it.

    The file is processed once, then split into train and test parts so the
    reported accuracy is measured on rows the model was not fitted on.
    (Previously the same file was used for both fitting and evaluation.)
    The encoders and vectorizer are still fitted on the whole file before the
    split.

    Args:
        data_filepath: Path of the labelled CSV file.
        artifacts_filepath: Directory in which to store ``dv.pkl``,
            ``label_encoders.pkl`` and ``model.pkl``. Nothing is saved when
            ``None``. The directory is created if it does not exist.
        test_size: Share of the rows held out for evaluation, forwarded to
            ``train_test_split``.
        random_state: Seed forwarded to ``train_test_split``; pass an integer
            for a reproducible split.

    Returns:
        A dict with the fitted ``model``, ``dv`` and ``label_encoders`` plus
        the hold-out accuracy under ``accuracy``. The misspelled legacy key
        ``accuarcy`` carries the same value for existing callers.
    """
    logger.info("Processing data...")
    X, y, dv, label_encoders = process_data(filepath=data_filepath, with_target=True)

    logger.info("Splitting data into train and test sets...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    logger.info("Training model...")
    model = train_model(X_train, y_train)

    logger.info("Making predictions and evaluating...")
    y_pred = predict(X_test, model)
    accuracy = evaluate_model(y_test, y_pred)

    if artifacts_filepath is not None:
        logger.info(f"Saving artifacts to {artifacts_filepath}...")
        # Make sure the target directory exists before writing into it.
        os.makedirs(artifacts_filepath, exist_ok=True)
        save_pickle(os.path.join(artifacts_filepath, "dv.pkl"), dv)
        save_pickle(os.path.join(artifacts_filepath, "label_encoders.pkl"), label_encoders)
        save_pickle(os.path.join(artifacts_filepath, "model.pkl"), model)

    return {
        "model": model,
        "dv": dv,
        "label_encoders": label_encoders,
        "accuracy": accuracy,
        # Legacy misspelled key kept so existing callers keep working.
        "accuarcy": accuracy,
    }

def batch_predict_workflow(
    input_filepath: str,
    model: Optional[RandomForestClassifier] = None,
    dv: Optional[DictVectorizer] = None,
    label_encoders: Optional[dict] = None,
    artifacts_filepath: Optional[str] = None,
) -> np.ndarray:
    """Make predictions on a new dataset.

    Any of ``model``, ``dv`` and ``label_encoders`` that is not passed in is
    loaded from ``artifacts_filepath`` (``model.pkl``, ``dv.pkl`` and
    ``label_encoders.pkl`` respectively).

    Args:
        input_filepath: Path of the CSV file to predict on.
        model: Fitted classifier, or ``None`` to load it from disk.
        dv: Fitted vectorizer, or ``None`` to load it from disk.
        label_encoders: Fitted encoders by column name, or ``None`` to load
            them from disk.
        artifacts_filepath: Directory holding the pickled artifacts. Required
            whenever at least one of the three objects above is omitted.

    Returns:
        The model's predictions for the processed rows of the input file.

    Raises:
        ValueError: If an artifact is missing and ``artifacts_filepath`` is
            ``None``.
    """
    missing = [
        name
        for name, value in (("dv", dv), ("model", model), ("label_encoders", label_encoders))
        if value is None
    ]
    if missing and artifacts_filepath is None:
        # Without this check os.path.join(None, ...) fails with an opaque TypeError.
        raise ValueError(
            "artifacts_filepath is required when these arguments are not provided: "
            + ", ".join(missing)
        )

    # NOTE: the artifacts are unpickled; only point this at trusted files.
    if dv is None:
        dv = load_pickle(os.path.join(artifacts_filepath, "dv.pkl"))
    if model is None:
        model = load_pickle(os.path.join(artifacts_filepath, "model.pkl"))
    if label_encoders is None:
        label_encoders = load_pickle(os.path.join(artifacts_filepath, "label_encoders.pkl"))

    X, _, _, _ = process_data(filepath=input_filepath, with_target=False, dv=dv, label_encoders=label_encoders)
    y_pred = predict(X, model)

    return y_pred