In [None]:
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
import logging
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler
from sklearn import datasets
import torch
import torch.quantization
# Fetch the bundled Iris dataset, then pull out the feature matrix and label vector
iris = load_iris()
X, y = iris.data, iris.target

In [25]:
class IrisDataProcessor:
    """Hold a labelled table and derive scaled train/test splits and feature statistics.

    Parameters
    ----------
    data : array-like
        Two-dimensional input handed to ``pd.DataFrame``. Its columns are labelled
        with ``feature_columns`` followed by ``target_column``.
    feature_columns : list
        Names of the feature columns, in column order.
    target_column : str
        Name of the label column (the last column of ``data``).
    test_size : float, default 0.2
        Forwarded to ``train_test_split``.
    random_state : int, default 42
        Forwarded to ``train_test_split``.
    """

    def __init__(self, data, feature_columns, target_column, test_size=0.2, random_state=42):
        # Label the raw table: features first, target last
        column_names = feature_columns + [target_column]
        self.data = pd.DataFrame(data, columns=column_names)

        self.feature_columns = feature_columns
        self.target_column = target_column

        # Split configuration reused by every prepare_data() call
        self.test_size = test_size
        self.random_state = random_state

        # Scaler is (re)fitted inside prepare_data()
        self.scaler = StandardScaler()

    def prepare_data(self):
        """Split the table and standardise the features.

        Returns
        -------
        tuple
            ``(train_features_scaled, test_features_scaled, train_target, test_target)``.
            The scaled features are whatever the scaler returns; the targets are
            the corresponding slices of the target column.
        """
        features = self.data[self.feature_columns]
        target = self.data[self.target_column]

        # Hold out a test partition using the configured size and seed
        partitions = train_test_split(
            features, target, test_size=self.test_size, random_state=self.random_state
        )
        train_features, test_features, train_target, test_target = partitions

        logging.info("Data split into training and test sets with test size of %.2f", self.test_size)

        # Fit the scaler on the training features only, then reuse it for the test features
        train_scaled = self.scaler.fit_transform(train_features)
        test_scaled = self.scaler.transform(test_features)

        logging.info("Feature scaling applied to training and test data.")

        return train_scaled, test_scaled, train_target, test_target

    def get_feature_stats(self):
        """Return a DataFrame with one row per feature and its mean and standard deviation.

        The columns are named ``'Mean'`` and ``'Standard Deviation'``.
        """
        per_feature = self.data[self.feature_columns].agg(['mean', 'std']).transpose()
        summary = per_feature.rename(columns={'mean': 'Mean', 'std': 'Standard Deviation'})

        logging.info("Feature statistics computed.")

        return summary

        


### Experiment Tracking and Model Development

In [27]:
class IrisExperiment:
    """Train, evaluate, and MLflow-track a fixed set of scikit-learn classifiers.

    Parameters
    ----------
    data_processor : object
        Must expose ``prepare_data()`` returning
        ``(X_train, X_test, y_train, y_test)``.
    tracking_uri : str or None, default "http://127.0.0.1:5000"
        MLflow tracking URI. Pass ``None`` to leave MLflow's current tracking
        URI untouched.
    experiment_name : str, default "Iris_Classification_Experiments"
        Name of the MLflow experiment that runs are recorded under.
    """

    def __init__(self, data_processor, tracking_uri="http://127.0.0.1:5000",
                 experiment_name="Iris_Classification_Experiments"):
        self.data_processor = data_processor
        self.tracking_uri = tracking_uri
        self.experiment_name = experiment_name

        # Models keyed by the display name used for the MLflow run and artifact
        self.models = {
            "Logistic Regression": LogisticRegression(max_iter=200),
            "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42)
        }

        # The tracking URI has to be configured BEFORE the experiment is selected:
        # set_experiment() resolves the experiment against whichever store is
        # current at call time, so the reverse order binds the experiment to the
        # wrong store on a fresh kernel.
        if tracking_uri is not None:
            mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)

    def run_experiment(self):
        """Cross-validate, fit, and evaluate every model, logging one MLflow run per model."""
        X_train, X_test, y_train, y_test = self.data_processor.prepare_data()

        for model_name, model in self.models.items():
            with mlflow.start_run(run_name=model_name):
                # 5-fold cross-validation on the training split, then fit on all of it
                cv_score = np.mean(cross_val_score(model, X_train, y_train, cv=5))
                model.fit(X_train, y_train)

                # Held-out evaluation; macro averaging weights every class equally
                y_pred = model.predict(X_test)
                metrics = {
                    "CV_Accuracy": cv_score,
                    "Test_Accuracy": accuracy_score(y_test, y_pred),
                    "Precision": precision_score(y_test, y_pred, average="macro"),
                    "Recall": recall_score(y_test, y_pred, average="macro")
                }

                self.log_results(model, model_name, metrics)

    def log_results(self, model, model_name, metrics):
        """Log the model type, metrics, and fitted model to the active MLflow run.

        Parameters
        ----------
        model : fitted scikit-learn estimator
        model_name : str
            Logged as the ``Model_Type`` parameter and used as the artifact path.
        metrics : dict
            Metric name to numeric value.
        """
        mlflow.log_params({"Model_Type": model_name})
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, model_name)
        print(f"{model_name} logged with metrics: {metrics}")

# Usage example:
# Build the processor inputs from the Iris arrays (X, y) loaded in the first cell.
# `data`, `feature_columns` and `target_column` are not defined at module level
# anywhere else in the notebook, so without these lines the cell fails on a fresh kernel.
feature_columns = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth']
target_column = 'Species'
data = np.column_stack((X, y))  # feature columns followed by the target column

data_processor = IrisDataProcessor(data, feature_columns, target_column)
experiment = IrisExperiment(data_processor)
experiment.run_experiment()




MlflowException: API request to http://127.0.0.1:5000/api/2.0/mlflow/experiments/get-by-name failed with exception HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /api/2.0/mlflow/experiments/get-by-name?experiment_name=Iris_Classification_Experiments (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002AB87109430>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))

In [20]:
class IrisModelOptimizer:
    """Convert the experiment's fitted logistic regression to PyTorch and quantize it.

    A scikit-learn estimator cannot be passed to ``torch.jit.script`` or
    ``quantize_dynamic`` directly, so the fitted coefficients are copied into
    an equivalent ``torch.nn.Linear`` layer first.

    Parameters
    ----------
    experiment : object
        Must expose ``models`` (a dict containing a fitted
        ``"Logistic Regression"`` entry) and ``data_processor.prepare_data()``.
    """

    def __init__(self, experiment):
        self.experiment = experiment
        # Populated by quantize_model(); stays None until then
        self.quantized_model = None

    @staticmethod
    def _to_torch_module(sk_model):
        """Build a float ``torch.nn.Module`` producing the linear model's per-class scores.

        Raises
        ------
        RuntimeError
            If ``sk_model`` has no ``coef_``/``intercept_`` (i.e. it is not fitted).
        """
        if not (hasattr(sk_model, "coef_") and hasattr(sk_model, "intercept_")):
            raise RuntimeError(
                "Logistic Regression model is not fitted; run the experiment before quantizing."
            )

        weights = np.atleast_2d(np.asarray(sk_model.coef_, dtype=np.float32))
        bias = np.asarray(sk_model.intercept_, dtype=np.float32).reshape(-1)

        if weights.shape[0] == 1:
            # Binary models store a single score row (positive class). Prepend an
            # all-zero row so argmax over two outputs matches the sign of the score.
            weights = np.vstack([np.zeros_like(weights), weights])
            bias = np.concatenate([np.zeros_like(bias), bias])

        n_classes, n_features = weights.shape
        linear = torch.nn.Linear(n_features, n_classes)
        with torch.no_grad():
            linear.weight.copy_(torch.tensor(weights))
            linear.bias.copy_(torch.tensor(bias))

        # quantize_dynamic replaces child modules, not the root module itself,
        # so the Linear layer must live inside a container to be quantized.
        return torch.nn.Sequential(linear).eval()

    def quantize_model(self, log_to_mlflow=True):
        """Dynamically quantize the logistic regression to int8 and optionally log it.

        Parameters
        ----------
        log_to_mlflow : bool, default True
            When True, the quantized model is logged to MLflow inside its own run.

        Returns
        -------
        torch.nn.Module
            The quantized model (also stored on ``self.quantized_model``).

        Raises
        ------
        RuntimeError
            If the logistic regression has not been fitted yet.
        """
        sk_model = self.experiment.models["Logistic Regression"]
        float_model = self._to_torch_module(sk_model)

        self.quantized_model = torch.quantization.quantize_dynamic(
            float_model, {torch.nn.Linear}, dtype=torch.qint8
        )

        if log_to_mlflow:
            # Local import: the notebook's import cell only imports mlflow and mlflow.sklearn
            import mlflow.pytorch

            # Log inside an explicit run so no implicitly-created run is left open
            with mlflow.start_run(run_name="Quantized Logistic Regression"):
                mlflow.pytorch.log_model(self.quantized_model, "Quantized_Logistic_Regression")
            print("Quantized Logistic Regression model logged to MLflow.")

        return self.quantized_model

    def run_tests(self):
        """Check the quantized model's accuracy on the test split.

        Returns
        -------
        float
            Accuracy on the test split.

        Raises
        ------
        RuntimeError
            If ``quantize_model()`` has not been called.
        AssertionError
            If accuracy is not above 0.5.
        """
        if self.quantized_model is None:
            raise RuntimeError("No quantized model available; call quantize_model() first.")

        _, X_test, _, y_test = self.experiment.data_processor.prepare_data()

        # The torch module needs a float32 tensor, not the NumPy array from the scaler
        inputs = torch.as_tensor(np.asarray(X_test), dtype=torch.float32)
        with torch.no_grad():
            class_idx = self.quantized_model(inputs).argmax(dim=1).numpy()

        # Map output positions back to the labels the source model was trained on
        classes = getattr(self.experiment.models["Logistic Regression"], "classes_", None)
        y_pred = class_idx if classes is None else np.asarray(classes)[class_idx]

        accuracy = accuracy_score(y_test, y_pred)

        assert accuracy > 0.5, "Accuracy is lower than expected!"
        print(f"Quantized model accuracy test passed with {accuracy:.2f} accuracy.")
        return accuracy

        

# Example Usage

In [None]:
def main():
    """Run the whole Iris workflow: preprocessing, tracked training, quantization, tests.

    Reads the module-level ``X`` (features) and ``y`` (labels) arrays.
    """
    # Column labels for the combined feature/target table
    label_name = 'Species'
    feature_names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth']

    # Append the label vector to the feature matrix as the final column
    iris_table = np.column_stack((X, y))

    # Step 1: build the processor and make sure the split/scale step runs
    processor = IrisDataProcessor(
        data=iris_table,
        feature_columns=feature_names,
        target_column=label_name,
    )
    processor.prepare_data()  # result is not used here; the experiment prepares its own splits
    print("Data processing complete. Training and test sets prepared.")

    # Step 2: train and track every model in the experiment
    tracked_experiment = IrisExperiment(processor)
    tracked_experiment.run_experiment()

    # Step 3: quantize the trained model, then check its accuracy
    model_optimizer = IrisModelOptimizer(tracked_experiment)
    model_optimizer.quantize_model()
    model_optimizer.run_tests()


if __name__ == "__main__":
    main()


    


Data processing complete. Training and test sets prepared.




MlflowException: API request to http://127.0.0.1:5000/api/2.0/mlflow/experiments/get-by-name failed with exception HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /api/2.0/mlflow/experiments/get-by-name?experiment_name=Iris_Classification_Experiments (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002AB8868FA40>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))