Project by:
- Jack Chen 4427737
- Joost Litjes 4540700
- Felicia Hung 7568479

In [263]:
import numpy as np
import pandas as pd

import os

import sklearn

from scipy import stats

import plotly.express as px 
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.io as pio

In [264]:
# px image size
px.defaults.width = 600
px.defaults.height = 600

Task 1

In [265]:
# Import database
db = pd.read_csv("blood_transfusion.csv")
db.describe()

Unnamed: 0,months_since_last_donation,total_number_of_donations,total_blood_donated,months_since_first_donation,class
count,748.0,748.0,748.0,748.0,748.0
mean,9.506684,5.514706,1378.676471,34.282086,0.237968
std,8.095396,5.839307,1459.826781,24.376714,0.426124
min,0.0,1.0,250.0,2.0,0.0
25%,2.75,2.0,500.0,16.0,0.0
50%,7.0,4.0,1000.0,28.0,0.0
75%,14.0,7.0,1750.0,50.0,0.0
max,74.0,50.0,12500.0,98.0,1.0


Task 2

In [266]:
# Features Seperated
numeric_features = [
    "months_since_last_donation",
    "total_number_of_donations",
    "total_blood_donated",
    "months_since_first_donation",
]

categorical_features = [
    "class",
]

In [267]:
# Export px image to html page
def exportImage(plot, name):
    pio.write_html(plot, os.path.join("plots", name + '.html'))

# Normalization function
def normalize_column(column):
    min_val = column.min()
    max_val = column.max()
    return (column - min_val) / (max_val - min_val)

for column in numeric_features:
    db[column] = normalize_column(db[column])


In [268]:
# Convert categorical features to strings from numeric if needed
# This helps px display images correctly
db = db.astype({col: str for col in db.columns if col in categorical_features})

# Normalize numeric data
for column in numeric_features:
    db[column] = normalize_column(db[column])

# Split data for class 0 and class 1
class_0_df = db[db['class'] == "0"]
class_1_df = db[db['class'] == "1"]

In [269]:
# Dataset overview comparing class 0 and class 1 
fig = go.Figure()
for i, column in enumerate(numeric_features):
    fig.add_trace(
        go.Box(x=db['class'], 
        y=db[column], 
        name=column),
    )

fig.update_layout(
    boxmode='group',
    width=len(numeric_features)*200, height=400, title_text="Comparing trends between class 0 and class 1 for Numeric Features")
exportImage(fig, "Comparing trends between class 0 and class 1 for Numeric Features")

In [270]:
# Compare distributions of features between class 0 and class 1
for df_name, data in zip(["class 0", "class 1"], [class_0_df, class_1_df]):
    fig = make_subplots(rows=len(numeric_features), cols=len(numeric_features))

    # Each subfigure
    for i, feature_to_plot_y in enumerate(numeric_features):
        for j, feature_to_plot_x in enumerate(numeric_features):
            trace = go.Scatter(x=data[feature_to_plot_x], y=data[feature_to_plot_y], text="", mode='markers', showlegend=False)
            fig.add_trace(trace, row=j+1, col=i+1)

    # Add x and y labels to the subplots
    for i, feature in enumerate(numeric_features):
        fig.update_xaxes(title_text=feature, row=len(numeric_features), col=i+1)
        fig.update_yaxes(title_text=feature, row=i+1, col=1)

    fig.update_layout(height=len(numeric_features)*250, width=len(numeric_features)*250, title_text=f"Comparing feature relations with {df_name}")
    exportImage(fig, f"Comparing feature relations with {df_name}")

In [271]:
# Convert categorical features to numerical using one-hot encoding
data_encoded = pd.get_dummies(db, columns=['class'], drop_first=False)

# calculate the column indexes of numeric / categorical features (after one-hot encoding!)
correlation_matrix_categorical = list(data_encoded[
    data_encoded.columns.difference(numeric_features)
].columns)
correlation_matrix = data_encoded[numeric_features + correlation_matrix_categorical].corr()
numeric_features_indexes = [correlation_matrix.columns.get_loc(col) for col in numeric_features]
categorical_features_indexes = [correlation_matrix.columns.get_loc(col) for col in correlation_matrix_categorical]

# Correlation Heatmap A
data = correlation_matrix.iloc[numeric_features_indexes, numeric_features_indexes]
fig = px.imshow(
    data,
    labels=dict(x="Numeric Features", y="Numeric Features", color="Correlation"),
    title="Correlation Heatmap of Numerical Features",
)
exportImage(fig, "Correlation Heatmap of Numerical Features")

# Correlation Heatmap B
data = correlation_matrix.iloc[numeric_features_indexes, categorical_features_indexes]
fig = px.imshow(
    data,
    labels=dict(x="Numeric Features", y="Categorical Features", color="Correlation"),
    title="Correlation Heatmap of Numerical vs Categorical Features",
)
exportImage(fig, "Correlation Heatmap of Numerical vs Categorical Features")


Task 3

In [272]:
from sklearn.model_selection import train_test_split

# train_test_split process
def split_data(db, train_features, label_feature, test_size, seed = 101):
    return train_test_split(db[train_features].to_numpy(), db[label_feature].to_numpy(), test_size=test_size, random_state=seed)

# Define the list of test sizes that will be tested
test_sizes = [
        0.15,
        0.35, 
        0.50, 
        0.65, 
        0.85
        ]

myDataVariants = []
for test_size in test_sizes:
    X_train, X_test, y_train, y_test = split_data(db, numeric_features, "class", test_size)
    myDataVariants.append(
        {
            "test_size": test_size,
            "X_train": X_train,
            "X_test": X_test,
            "y_train": y_train,
            "y_test": y_test
        }
    )

In [273]:
# Manual KNNClassifier class
# Is used the same as any other sklearn classifier when fitting and predicting data 

class KNNClassifier:
    def __init__(self, k=3):
        self.k = k

    def fit(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    # Euclidean distance taking into account 2+D vectors
    def euclidean_distance(self, x1, x2):
        return np.linalg.norm(x1 - x2)

    # Predict based on fitted data
    def predict(self, X):
        y_pred = []
        for x in X:
            distances = []
            for i in range(len(self.X_train)):
                distance = self.euclidean_distance(x, self.X_train[i])
                distances.append((distance, self.y_train[i]))

            # Sort the distances and select the k-nearest neighbors
            distances.sort(key=lambda x: x[0])
            neighbors = distances[:self.k]

            # Count the votes from the k-nearest neighbors
            class_votes = {}
            for neighbor in neighbors:
                label = neighbor[1]
                if label in class_votes:
                    class_votes[label] += 1
                else:
                    class_votes[label] = 1

            # Return the class with the most votes as the prediction
            predicted_class = max(class_votes, key=class_votes.get)
            y_pred.append(predicted_class)

        return y_pred

In [274]:
# Manual implementation of a confusion matrix counter
def confusion_matrix(y_true, y_pred):
    TP = TN = FP = FN = 0
    for true, pred in zip(y_true, y_pred):
        true = int(true)
        pred = int(pred)
        if true == 1:
            if pred == 1:
                TP += 1
            else:
                FN += 1
        else:
            if pred == 1:
                FP += 1
            else:
                TN += 1

    return {
        "TP": TP,
        "TN": TN,
        "FP": FP,
        "FN": FN
    }

from sklearn.metrics import classification_report
from sklearn.metrics import fbeta_score

In [275]:
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
import random

# Function that performs the classification tests from a list of classifiers and a database of variable training and test data splits
def classifierTestsA(dataVariants, classifiers, method = 'default'):
    results = {}
    for data in dataVariants:
        for classifier in classifiers:
            # Train the classifier on the main dataset
            classifier.fit(data['X_train'], data['y_train'])

            # Predict on the alternative dataset
            y_pred = classifier.predict(data['X_test'])

            # Compare the predicted labels with the actual labels
            conf_matrix = confusion_matrix(data['y_test'], y_pred)
            class_report = classification_report(data['y_test'], y_pred, output_dict=True, zero_division=1)
            fbeta = fbeta_score(data['y_test'], y_pred, average='macro', beta=0.5)

            if classifier.__class__.__name__ not in results:
                results[classifier.__class__.__name__] = [] 
            
            results[classifier.__class__.__name__].append({
                'method': method,	
                'classifier': classifier.__class__.__name__,
                'test_size': data['test_size'],	
                'confusion_matrix': conf_matrix,
                'classification_report': class_report,
                'f_beta_score': fbeta,
            })

    return results


In [276]:
import json

# Define the list of classifiers that will be tested
classifiers = [
    KNNClassifier(),
    GaussianNB(),
    SVC(),
    MLPClassifier(),
    ]

resultsA = classifierTestsA(myDataVariants, classifiers)


Stochastic Optimizer: Maximum iterations (200) reached and the optimization hasn't converged yet.


Stochastic Optimizer: Maximum iterations (200) reached and the optimization hasn't converged yet.


Stochastic Optimizer: Maximum iterations (200) reached and the optimization hasn't converged yet.


Stochastic Optimizer: Maximum iterations (200) reached and the optimization hasn't converged yet.



In [277]:
# Plot the groups of scores that are contained in a single visualization
score_groups = [
    [
        'confusion_matrix_TP',
        'confusion_matrix_TN',
        'confusion_matrix_FP',
        'confusion_matrix_FN',
    ],
    [
        'classification_report_0_precision',
        'classification_report_0_recall',
        'classification_report_0_f1-score',
        'classification_report_1_precision',
        'classification_report_1_recall',
        'classification_report_1_f1-score',
        'f_beta_score',
    ],
    [
        'classification_report_accuracy',
        'classification_report_macro avg_precision',
        'classification_report_macro avg_recall',
        'classification_report_macro avg_f1-score',
        'classification_report_weighted avg_precision',
        'classification_report_weighted avg_recall',
        'classification_report_weighted avg_f1-score',
        'f_beta_score',
    ]
]

In [301]:
# This function will convert results from into a classifierTests function into a grouped database by mean
# It will then compare the results of the groups using the predifined scores in the results
def compareResults(results, grouper, score_groups):
    result_df = pd.concat([pd.json_normalize(results[key], sep='_') for key in results], ignore_index=True)
    result_df = result_df.groupby(grouper).mean().reset_index()

    for test_size in [0.15, 0.35, 0.50]:
        if 'test_size' not in grouper:
            _result_df = result_df[result_df['test_size'] == test_size]
        else:
            _result_df = result_df.copy()

        for yList in score_groups:
            fig = px.line(_result_df, x=grouper[0], y=yList,
                        markers=True, title=f'Scores by {grouper[0]} with test size {test_size}',
                        labels={'value': 'Score'})
            exportImage(fig, f'Scores by {grouper[0]} with test size {test_size} and methods {yList[0]}')

    if len(grouper) > 1:
        fig = go.Figure(data=[go.Surface(
        z=result_df.pivot(grouper[0], grouper[1], 'f_beta_score').values,
        x=result_df[grouper[0]].unique(),
        y=result_df[grouper[1]].unique()
        )])

        fig.update_layout(
            title=f'f-beta-score by {grouper[0]} and {grouper[1]}',
            scene=dict(
                xaxis_title=grouper[0],
                yaxis_title=grouper[1],
                zaxis_title='f_beta_score'
        ))
        exportImage(fig, f'f-beta-score by {grouper[0]} and {grouper[1]}')

In [302]:
# Compare the results grouped by classifier and test size
compareResults(resultsA, ['classifier', 'test_size'], score_groups)
compareResults(resultsA, ['test_size'], score_groups)

In [280]:
import inspect
from sklearn.model_selection import KFold

# Function that performs the classification tests from a list of classifiers and a database of variable training and test data splits
# This version uses kFolds to look for the best parameters
def classifierTestsB(dataVariants, classifiers, n_splits):
    results = {}
    kf = KFold(n_splits=n_splits, random_state=None, shuffle=False)
    for data in dataVariants:
        for i, (train_index, test_index) in enumerate(kf.split(data['X_train'])):
            X_train, X_test = data['X_train'][train_index], data['X_train'][test_index]
            y_train, y_test = data['y_train'][train_index], data['y_train'][test_index]

            for classifier in classifiers:
                classifier.fit(X_train, y_train)
                y_pred = classifier.predict(X_test)

                conf_matrix = confusion_matrix(y_test, y_pred)
                class_report = classification_report(y_test, y_pred, output_dict=True, zero_division=1)
                fbeta = fbeta_score(y_test, y_pred, average='macro', beta=0.5)
                
                if classifier.__class__.__name__ not in results:
                    results[classifier.__class__.__name__] = []

                attributes = {}
                for attr_name, attr_value in inspect.getmembers(classifier):
                    if isinstance(attr_value, (int, float, str)):
                        if attr_name != "__module__":
                            attributes[attr_name] = attr_value
                
                results[classifier.__class__.__name__].append({
                    'classifier': classifier.__class__.__name__,
                    'test_size': data['test_size'],	
                    'fold': i,
                    'parameters': attributes,
                    'confusion_matrix': conf_matrix,
                    'classification_report': class_report,
                    'f_beta_score': fbeta,
                })

    return results

In [281]:
# Define the list of classifiers that will be tested
classifiers = [SVC(kernel=kernel) for kernel in ['linear', 'poly', 'rbf', 'sigmoid']]
resultsB = classifierTestsB(myDataVariants, classifiers, 4)


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (renaming of 0.26).


Attribute `_pairwise` was deprecated in version 0.24 and will be removed in 1.1 (

In [282]:
# Compare the results grouped by parameters_kernel and fold
compareResults(resultsB, ['parameters_kernel', 'test_size'], score_groups)
compareResults(resultsB, ['fold', 'test_size'], score_groups)

In [283]:
# Perform the first classifier test again using default and kFold tuned parameters
_defResultsB = classifierTestsA(myDataVariants, [SVC()])
_kFresultsB = classifierTestsA(myDataVariants, [SVC(kernel='poly')], 'kFold')

# Merge the two results
results = {}
for key in _defResultsB.keys() | _kFresultsB.keys():
    results[key] = _defResultsB.get(key, []) + _kFresultsB.get(key, [])

In [284]:
# Compare the results grouped by the method; kFold tuned paramaters or default parameters
compareResults(results, ['method', 'test_size'], score_groups)