# 1. Import Dependencies

In [12]:
import os
import pickle
import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, Perceptron

# 2. Load Data

In [13]:
# Load the breast cancer dataset
X, y = datasets.load_breast_cancer(return_X_y=True)

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.9, random_state=0)

# Save X_test and y_test as separate CSV files (Optional)
# np.savetxt("X_test.csv", X_test, delimiter=",", header=",".join([f"feature_{i}" for i in range(X_test.shape[1])]), comments="")
# np.savetxt("y_test.csv", y_test, delimiter=",", header="target", comments="")

# Combine X_test and y_test into a single DataFrame
X_test_df = pd.DataFrame(X_test, columns=[f"feature_{i}" for i in range(X_test.shape[1])])
y_test_df = pd.DataFrame(y_test, columns=["target"])

df_test = pd.concat([X_test_df, y_test_df], axis=1)

# Save the combined DataFrame as a CSV file
df_test.to_csv("data/testing_data.csv", index=False)

# 3. Model Training

In [14]:
# Define the path to store models
model_path = "models/"

# Create the folder if it doesn't exist
if not os.path.exists(model_path):
    os.makedirs(model_path)

# Define a list of classifier parameters
parameters = [
    {"clf": LogisticRegression(solver="liblinear", multi_class="ovr"), "name": f"{model_path}/binary-lr.joblib"},
    {"clf": Perceptron(eta0=0.1, random_state=0), "name": f"{model_path}/binary-percept.joblib"},
]

# Iterate through each parameter configuration
for param in parameters:
    clf = param["clf"]  # Retrieve the classifier from the parameter dictionary
    clf.fit(X_train, y_train)  # Fit the classifier on the training data
    
    # Save the trained model to a file using pickle
    model_filename = f"{param['name']}"
    with open(model_filename, 'wb') as model_file:
        pickle.dump(clf, model_file)
    
    print(f"Model saved in {model_filename}")

Model saved in models//binary-lr.joblib
Model saved in models//binary-percept.joblib


# 4. Simple Model Evaluation

In [15]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Example of predictions
model_path = 'models/binary-lr.joblib'
with open(model_path, 'rb') as model_file:
    loaded_model = pickle.load(model_file)

# Make predictions using the loaded model
predictions = loaded_model.predict(X_test)

# Calculate accuracy
accuracy = accuracy_score(y_test, predictions)

# Calculate precision
precision = precision_score(y_test, predictions)

# Calculate recall
recall = recall_score(y_test, predictions)

# Calculate F1-score
f1 = f1_score(y_test, predictions)

print(f"Model loaded from {model_path}")
print(f"Accuracy on the testing dataset: {accuracy:.2f}")
print(f"Precision on the testing dataset: {precision:.2f}")
print(f"Recall on the testing dataset: {recall:.2f}")
print(f"F1-score on the testing dataset: {f1:.2f}")


Model loaded from models/binary-lr.joblib
Accuracy on the testing dataset: 0.93


# 5. Model Deployment

## 5.1. Creation Model Class

In [111]:
%%writefile DockerModel.py

import joblib
import logging

class Score:
    """
    Class to hold metrics for binary classification, including true positives (TP), false positives (FP),
    true negatives (TN), and false negatives (FN).
    """
    def __init__(self, TP=0, FP=0, TN=0, FN=0):
        self.TP = TP  # True Positives
        self.FP = FP  # False Positives
        self.TN = TN  # True Negatives
        self.FN = FN  # False Negatives


class DockerModel:
    """
    Class for loading and predicting using a pre-trained model, handling feedback to update metrics,
    and providing those metrics.
    """
    result = {}  # Dictionary to store input data

    def __init__(self, model_name="models/binary-lr.joblib"):
        """
        Initialize DockerModel with metrics and model name.
        :param model_name: Path to the pre-trained model.
        """
        self.scores = Score(0, 0, 0, 0)
        self.loaded = False
        self.model_name = model_name

    def load(self):
        """
        Load the model from the provided path.
        """
        self.model = joblib.load(self.model_name)
        logging.info(f"Model {self.model_name} Loaded")

    def predict(self, X, features_names=None, meta=None):
        """
        Predict the target using the loaded model.
        :param X: Features for prediction.
        :param features_names: Names of the features, optional.
        :param meta: Additional metadata, optional.
        :return: Predicted target values.
        """
        self.result['shape_input_data'] = str(X.shape)
        logging.info(f"Received request: {X}")
        if not self.loaded:
            self.load()
            self.loaded = True
        predictions = self.model.predict(X)
        return predictions

    def send_feedback(self, features, feature_names, reward, truth, routing=""):
        """
        Provide feedback on predictions and update the metrics.
        :param features: Features used for prediction.
        :param feature_names: Names of the features.
        :param reward: Reward signal, not used in this context.
        :param truth: Ground truth target values.
        :param routing: Routing information, optional.
        :return: Empty list as return value is not used.
        """
        predicted = self.predict(features)
        print(f"Predicted: {predicted[0]}, Truth: {truth[0]}")
        if int(truth[0]) == 1:
            if int(predicted[0]) == int(truth[0]):
                self.scores.TP += 1
            else:
                self.scores.FN += 1
        else:
            if int(predicted[0]) == int(truth[0]):
                self.scores.TN += 1
            else:
                self.scores.FP += 1
        return []  # Ignore return statement as its not used

    def calculate_metrics(self):
        """
        Calculate the accuracy, precision, recall, and F1-score.
        :return: accuracy, precision, recall, f1_score
        """
        total_samples = self.scores.TP + self.scores.TN + self.scores.FP + self.scores.FN

        # Check if there are any samples to avoid division by zero
        if total_samples == 0:
            logging.warning("No samples available to calculate metrics.")
            return 0, 0, 0, 0  # Return zeros for all metrics if no samples

        accuracy = (self.scores.TP + self.scores.TN) / total_samples

        # Check if there are any positive predictions to calculate precision
        positive_predictions = self.scores.TP + self.scores.FP
        precision = self.scores.TP / positive_predictions if positive_predictions != 0 else 0

        # Check if there are any actual positives to calculate recall
        actual_positives = self.scores.TP + self.scores.FN
        recall = self.scores.TP / actual_positives if actual_positives != 0 else 0

        # Check if precision and recall are non-zero to calculate F1-score
        if precision + recall == 0:
            f1_score = 0
        else:
            f1_score = 2 * (precision * recall) / (precision + recall)

        # Return the calculated metrics
        return accuracy, precision, recall, f1_score


    def metrics(self):
        """
        Generate metrics for monitoring.
        :return: List of dictionaries containing accuracy, precision, recall, and f1_score.
        """
        accuracy, precision, recall, f1_score = self.calculate_metrics()
        return [
            {"type": "GAUGE", "key": "accuracy", "value": accuracy},
            {"type": "GAUGE", "key": "precision", "value": precision},
            {"type": "GAUGE", "key": "recall", "value": recall},
            {"type": "GAUGE", "key": "f1_score", "value": f1_score},
        ]
        
    def tags(self):
        """
        Retrieve input data used for predictions.
        :return: Dictionary containing input data.
        """
        return self.result


Overwriting DockerModel.py


In [112]:
# Test Model Locally
from DockerModel import DockerModel

demoModel = DockerModel()

In [113]:
demoModel.predict(X_test)

[0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 1,
 1,
 0,
 0,
 1,
 0,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 1,
 0,
 0,
 1,
 1,
 1,
 1,
 1,
 0,
 0,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 0,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 1,
 0,
 0,
 0,
 1,
 1,
 1,
 0,
 1,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 0,
 1,
 1,
 0,
 0,
 1,
 1,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 0,
 0,
 0,
 1,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 0,


In [114]:
demoModel.send_feedback(X_test, truth = y_test)

[]

In [115]:
%%writefile requirements.txt

pandas==1.3.5
requests==2.28.1
numpy==1.20
seldon-core==1.14.1
scikit-learn==1.0.2

Overwriting requirements.txt


In [116]:
%%writefile Dockerfile

# Use an official Python runtime as a parent image.
# Using a slim image for a smaller final size and reduced attack surface.
FROM python:3.9-slim

# Set the maintainer label for metadata.
LABEL maintainer="fernandodorado.rueda@ingka.com"

# Set environment variables for a consistent build behavior.
# Disabling the buffer helps to log messages synchronously.
ENV PYTHONUNBUFFERED=1

# Set a working directory inside the container to store all our project files.
WORKDIR /app

# First, copy the requirements file to leverage Docker's cache for dependencies.
# By doing this first, changes to the code will not invalidate the cached dependencies.
COPY requirements.txt requirements.txt

# Install the required packages listed in the requirements file.
# It's a good practice to include the --no-cache-dir flag to prevent the caching of dependencies
# that aren't necessary for executing the application.
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the code and model files into the image.
COPY DockerModel.py DockerModel.py
COPY models/    models/

# Expose ports that the application will run on.
# Port 5000 for GRPC
# Port 9000 for REST
EXPOSE 5000 9000

# Set environment variables used by the application.
ENV MODEL_NAME DockerModel
ENV SERVICE_TYPE MODEL

# Change the owner of the directory to user 8888 for security purposes.
# It can prevent unauthorized write access by the application itself.
# Make sure to run the application as this non-root user later if applicable.
RUN chown -R 8888 /app

# Use the exec form of CMD so that the application you run will receive UNIX signals.
# This is helpful for graceful shutdown.
# Here we're using seldon-core-microservice to serve the model.
CMD exec seldon-core-microservice $MODEL_NAME --service-type $SERVICE_TYPE


Overwriting Dockerfile


In [117]:
!docker build . -t docker-model:3.0.0

[1A[1B[0G[?25l[+] Building 0.0s (0/1)                                                         
[?25h[1A[0G[?25l[+] Building 0.2s (2/2)                                                         
[34m => [internal] load build definition from Dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 1.88kB                                     0.0s
[0m[34m => [internal] load .dockerignore                                          0.0s
[0m[34m => => transferring context: 2B                                            0.0s
[0m[?25h[1A[1A[1A[1A[1A[0G[?25l[+] Building 0.3s (2/3)                                                         
[34m => [internal] load build definition from Dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 1.88kB                                     0.0s
[0m[34m => [internal] load .dockerignore                                          0.0s
[0m[34m => => transferring context: 2B                        

In [118]:
# docker run --rm --name docker-model -p 5005:5000 docker-model:1.0.0

In [120]:
import requests
import json

URL = "http://localhost:5005/api/v1.0/predictions"

def send_prediction_request(data):
    
    # Create the headers for the request
    headers = {'Content-Type': 'application/json'}

    try:
        # Send the POST request
        response = requests.post(URL, headers=headers, json=data)
        
        # Check if the request was successful
        response.raise_for_status() # Will raise HTTPError if the HTTP request returned an unsuccessful status code
        
        # If successful, return the JSON data
        return response.json()
    except requests.ConnectionError:
        raise Exception("Failed to connect to the server. Is it running?")
    except requests.Timeout:
        raise Exception("Request timed out. Please try again later.")
    except requests.RequestException as err:
        # For any other requests exceptions, re-raise it
        raise Exception(f"An error occurred with your request: {err}")

X_test 

# Define the data payload
data_payload = {
    "data": {
        "ndarray": [
            [
                1.340e+01, 2.052e+01, 8.864e+01, 5.567e+02, 1.106e-01, 1.469e-01,
                1.445e-01, 8.172e-02, 2.116e-01, 7.325e-02, 3.906e-01, 9.306e-01,
                3.093e+00, 3.367e+01, 5.414e-03, 2.265e-02, 3.452e-02, 1.334e-02,
                1.705e-02, 4.005e-03, 1.641e+01, 2.966e+01, 1.133e+02, 8.444e+02,
                1.574e-01, 3.856e-01, 5.106e-01, 2.051e-01, 3.585e-01, 1.109e-01
            ]
        ]
    }
}

# Define the data payload
data_payload = {
    "data": {
        "ndarray": X_test[0:1].tolist()
    }
}

# Get the response and print it
try:
    response = send_prediction_request(data_payload)
    pretty_json_response = json.dumps(response, indent=4)  # Pretty-print JSON
    print(pretty_json_response)
except Exception as err:
    print(err)


{
    "data": {
        "names": [],
        "ndarray": [
            0
        ]
    },
    "meta": {
        "metrics": [
            {
                "key": "accuracy",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "precision",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "recall",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "f1_score",
                "type": "GAUGE",
                "value": 1.0
            }
        ],
        "tags": {
            "shape_input_data": [
                1,
                30
            ]
        }
    }
}


In [125]:
X_test[i]

NameError: name 'i' is not defined

513

In [142]:
import requests
import json

URL = "http://localhost:5005/api/v1.0/feedback"

def send_prediction_feedback(data):
    
    # Create the headers for the request
    headers = {'Content-Type': 'application/json'}

    try:
        # Send the POST request
        response = requests.post(URL, headers=headers, json=data)
        
        # Check if the request was successful
        response.raise_for_status() # Will raise HTTPError if the HTTP request returned an unsuccessful status code
        
        # If successful, return the JSON data
        return response.json()
    except requests.ConnectionError:
        raise Exception("Failed to connect to the server. Is it running?")
    except requests.Timeout:
        raise Exception("Request timed out. Please try again later.")
    except requests.RequestException as err:
        # For any other requests exceptions, re-raise it
        raise Exception(f"An error occurred with your request: {err}")


# payload = {'request': {'data': {'ndarray': [[12.96, 18.29, 84.18, 525.2, 0.07351, 0.07899, 0.04057, 0.01883, 0.1874, 0.05899, 0.2357, 1.299, 2.397, 20.21, 0.003629, 0.03713, 0.03452, 0.01065, 0.02632, 0.003705, 14.13, 24.61, 96.31, 621.9, 0.09329, 0.2318, 0.1604, 0.06608, 0.3207, 0.07247]]}}, 'truth': {'data': {'ndarray': [1]}}}


for i in range(len(X_test)):
    payload = {'request': {'data': {'ndarray': [X_test[i].tolist()]}}, 'truth': {'data': {'ndarray': [int(y_test[i])]}}}

    # Get the response and print it
    try:
        response = send_prediction_feedback(payload)
        pretty_json_response = json.dumps(response, indent=4)  # Pretty-print JSON
        print(pretty_json_response)
    except Exception as err:
        print(err)


{
    "data": {
        "ndarray": []
    },
    "meta": {
        "metrics": [
            {
                "key": "accuracy",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "precision",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "recall",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "f1_score",
                "type": "GAUGE",
                "value": 1.0
            }
        ],
        "tags": {
            "shape_input_data": "(1, 30)"
        }
    }
}
{
    "data": {
        "ndarray": []
    },
    "meta": {
        "metrics": [
            {
                "key": "accuracy",
                "type": "GAUGE",
                "value": 1.0
            },
            {
                "key": "precision",
                "type": "GAUGE",
                "value": 1.0
           

In [143]:
import requests
import json

URL = "http://localhost:5005/api/v1.0/feedback"

def send_prediction_feedback(data):
    
    # Create the headers for the request
    headers = {'Content-Type': 'application/json'}

    try:
        # Send the POST request
        response = requests.post(URL, headers=headers, json=data)
        
        # Check if the request was successful
        response.raise_for_status() # Will raise HTTPError if the HTTP request returned an unsuccessful status code
        
        # If successful, return the JSON data
        return response.json()
    except requests.ConnectionError:
        raise Exception("Failed to connect to the server. Is it running?")
    except requests.Timeout:
        raise Exception("Request timed out. Please try again later.")
    except requests.RequestException as err:
        # For any other requests exceptions, re-raise it
        raise Exception(f"An error occurred with your request: {err}")




payload = {'request': {'data': {'ndarray': [X_test[0].tolist()]}}, 'truth': {'data': {'ndarray': [0]}}}

# Get the response and print it
try:
    response = send_prediction_feedback(payload)
    pretty_json_response = json.dumps(response, indent=4)  # Pretty-print JSON
    print(pretty_json_response)
except Exception as err:
    print(err)


{
    "data": {
        "ndarray": []
    },
    "meta": {
        "metrics": [
            {
                "key": "accuracy",
                "type": "GAUGE",
                "value": 0.9263566
            },
            {
                "key": "precision",
                "type": "GAUGE",
                "value": 0.953125
            },
            {
                "key": "recall",
                "type": "GAUGE",
                "value": 0.92987806
            },
            {
                "key": "f1_score",
                "type": "GAUGE",
                "value": 0.94135803
            }
        ],
        "tags": {
            "shape_input_data": "(1, 30)"
        }
    }
}
