Logistic Regression

In [22]:
import numpy as np
import matplotlib.pyplot as plt

class LogisticRegression:
    """Binary logistic regression trained with batch gradient descent.

    The decision function is ``sigmoid(X @ theta)``; no separate intercept is
    learned, so callers that need a bias term must supply a constant column
    in ``X`` themselves.

    Parameters
    ----------
    learning_rate : float
        Step size applied to the gradient on every iteration.
    num_iterations : int
        Maximum number of gradient-descent iterations.
    tolerance : float
        Training stops early once the absolute change of the cost between two
        consecutive iterations drops below this value.

    Attributes
    ----------
    theta : numpy.ndarray
        Weight vector of shape ``(n_features,)``; only set after ``fit``.
    """

    # Probabilities are clipped to [_EPS, 1 - _EPS] before taking logarithms,
    # so a saturated sigmoid can never produce log(0) = -inf (and 0 * -inf = NaN).
    _EPS = 1e-15
    # float64 exp() overflows a little above 709; clipping the logits keeps
    # sigmoid() warning-free without changing results for ordinary inputs.
    _MAX_ABS_Z = 500.0

    def __init__(self, learning_rate=0.01, num_iterations=1000, tolerance=1e-4):
        self.learning_rate = learning_rate
        self.num_iterations = num_iterations
        self.tolerance = tolerance

    def sigmoid(self, z):
        """Return the logistic function ``1 / (1 + exp(-z))`` element-wise.

        Inputs are clipped to ``[-_MAX_ABS_Z, _MAX_ABS_Z]`` first so that
        ``exp`` cannot overflow for very negative ``z``.
        """
        z = np.clip(z, -self._MAX_ABS_Z, self._MAX_ABS_Z)
        return 1 / (1 + np.exp(-z))

    def calculate_gradient(self, X, y, theta):
        """Return the gradient of the mean cross-entropy cost w.r.t. ``theta``.

        Parameters
        ----------
        X : array of shape (m, n)
        y : array of shape (m,) holding 0/1 labels
        theta : array of shape (n,)

        Returns
        -------
        numpy.ndarray of shape (n,)
        """
        m = len(y)
        z = np.dot(X, theta)
        h = self.sigmoid(z)
        gradient = np.dot(X.T, (h - y)) / m
        return gradient

    def _cost(self, X, y):
        """Mean negative log-likelihood (binary cross-entropy) at ``self.theta``."""
        m = len(y)
        h = self.sigmoid(np.dot(X, self.theta))
        # Keep both log() arguments strictly inside (0, 1).
        h = np.clip(h, self._EPS, 1 - self._EPS)
        return -(1 / m) * np.sum(y * np.log(h) + (1 - y) * np.log(1 - h))

    def fit(self, X, y):
        """Learn ``theta`` from the training data.

        Parameters
        ----------
        X : array-like of shape (m, n)
            Numeric feature matrix.
        y : array-like of shape (m,) or (m, 1)
            Binary targets encoded as 0/1.

        Returns
        -------
        LogisticRegression
            ``self``, so calls can be chained.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, is empty, or ``X`` and ``y`` disagree on the
            number of samples.
        """
        X = np.asarray(X, dtype=float)
        # Flatten so that a column-vector target cannot broadcast against the
        # (m,) prediction vector and silently produce an (m, m) matrix.
        y = np.asarray(y, dtype=float).ravel()

        if X.ndim != 2:
            raise ValueError(f"X must be 2-dimensional, got {X.ndim} dimension(s)")
        m, n = X.shape
        if m == 0:
            raise ValueError("X must contain at least one sample")
        if y.shape[0] != m:
            raise ValueError(
                f"X and y have inconsistent sample counts: {m} != {y.shape[0]}"
            )

        self.theta = np.zeros(n)  # Initialize weights with zeros
        # Starting from infinity guarantees the convergence test cannot fire
        # on the very first iteration.
        prev_cost = float('inf')

        for iteration in range(self.num_iterations):
            gradient = self.calculate_gradient(X, y, self.theta)
            self.theta -= self.learning_rate * gradient

            cost = self._cost(X, y)

            # Check for convergence based on tolerance
            if abs(prev_cost - cost) < self.tolerance:
                print(f"Converged after {iteration + 1} iterations.")
                break

            prev_cost = cost

        return self

    def predict(self, X):
        """Return hard 0/1 class labels using a probability threshold of 0.5."""
        return (self.predict_proba(X) >= 0.5).astype(int)

    def predict_proba(self, X):
        """Return the estimated probability of class 1 for every row of ``X``."""
        z = np.dot(np.asarray(X, dtype=float), self.theta)
        return self.sigmoid(z)

In [23]:
import pandas as pd
# Load the heart-disease CSV into a DataFrame; the path is relative to the
# directory the notebook kernel is started from.
df = pd.read_csv("./datasets/heartdisease.csv")

In [24]:
# Split the column names by dtype: string-like (object) vs numeric.
categorical = df.select_dtypes(include=['object']).columns.tolist()
numerical = df.select_dtypes(include=['number']).columns.tolist()
print(categorical)
print(numerical)

# Low-cardinality columns (fewer than 10 distinct values) are the ones that
# get encoded; the target column is excluded from that set.
unique_counts = df.nunique()
columns_with_less_than_10_unique = unique_counts.loc[unique_counts < 10]
encoding_df = df.loc[:, columns_with_less_than_10_unique.index].drop(columns="HeartDisease")
print(encoding_df.columns.tolist())

# Tabulate the distinct values observed in each column selected for encoding.
distinct_values = []
for col in encoding_df.columns:
    distinct_values.append([col, encoding_df[col].unique()])
distinct_df = pd.DataFrame(distinct_values, columns=['Column Name', 'Distinct Values'])
print(distinct_df)

['Sex', 'ChestPainType', 'RestingECG', 'ExerciseAngina', 'ST_Slope']
['Age', 'RestingBP', 'Cholesterol', 'FastingBS', 'MaxHR', 'Oldpeak', 'HeartDisease']
['Sex', 'ChestPainType', 'FastingBS', 'RestingECG', 'ExerciseAngina', 'ST_Slope']
      Column Name      Distinct Values
0             Sex               [M, F]
1   ChestPainType  [ATA, NAP, ASY, TA]
2       FastingBS               [0, 1]
3      RestingECG    [Normal, ST, LVH]
4  ExerciseAngina               [N, Y]
5        ST_Slope     [Up, Flat, Down]


In [83]:
dataset = df.copy()
print("total #features with target:",len(dataset.columns.to_list()))
print(dataset.shape)
target_column = "HeartDisease"
# Define the split ratio (e.g., 80% train, 20% test)
split_ratio = 0.8
# Fixed seed so that re-running the notebook reproduces the same split
# (and therefore the same downstream accuracy figures).
SPLIT_SEED = 42
# Calculate the number of rows for the training set
train_size = int(len(dataset) * split_ratio)
# Create a seeded random permutation of row positions
split_rng = np.random.default_rng(SPLIT_SEED)
indices = split_rng.permutation(len(dataset))
# Split the positions into training and testing positions
train_indices, test_indices = indices[:train_size], indices[train_size:]

# Separate features from the target once, then slice both by position.
features_df = dataset.drop(columns=[target_column])
target_series = dataset[target_column]

# reset_index(drop=True) returns fresh objects with a 0..n-1 index, which keeps
# this cell idempotent (no in-place mutation of slices).
X_train_df = features_df.iloc[train_indices].reset_index(drop=True)
X_test_df = features_df.iloc[test_indices].reset_index(drop=True)
y_train_df = target_series.iloc[train_indices].reset_index(drop=True)
y_test_df = target_series.iloc[test_indices].reset_index(drop=True)

# Sanity checks: every row lands in exactly one of the two splits.
assert len(X_train_df) + len(X_test_df) == len(dataset)
assert len(X_train_df) == len(y_train_df) and len(X_test_df) == len(y_test_df)

print(X_train_df[:2].to_numpy(), y_train_df[:2], sep="\n")


total #features with target: 12
(918, 12)
[[74 'M' 'ASY' 150 258 1 'ST' 130 'Y' 4.0 'Down']
 [61 'M' 'ASY' 148 203 0 'Normal' 161 'N' 0.0 'Up']]
0    1
1    1
Name: HeartDisease, dtype: int64


In [87]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier

# Columns that are one-hot encoded before reaching the classifier.
categorical_features = list(encoding_df.columns)

# Preprocessing: dense one-hot encoding; categories not seen during fit are
# encoded as all zeros instead of raising (handle_unknown='ignore').
onehot_encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
categorical_transformer = Pipeline(steps=[('onehot-encoder', onehot_encoder)])

# NOTE(review): only `categorical_features` are routed through a transformer
# here; presumably every other column is discarded by ColumnTransformer's
# default remainder handling - confirm that dropping the numeric features is
# intended.
preprocessor = ColumnTransformer(
    transformers=[
        ('onehot-transformer', categorical_transformer, categorical_features),
    ]
)

# Full pipeline: preprocessing followed by the logistic-regression class
# defined earlier in this notebook.
classifier = LogisticRegression(learning_rate=0.01, num_iterations=1000, tolerance=1e-4)
model = Pipeline(steps=[
    ('data-preprocessor', preprocessor),
    ('model', classifier),
])

In [88]:
# Fit the preprocessing + classifier pipeline on the training split.
model.fit(X_train_df, y_train_df)

Converged after 747 iterations.


In [89]:
# Predict on the held-out rows with the fitted pipeline
y_pred = model.predict(X_test_df)

# Accuracy = fraction of test rows whose prediction equals the label
accuracy = (y_pred == y_test_df).mean()
print(f"Accuracy: {accuracy*100:.2f}%")

Accuracy: 85.87%


In [152]:
# X_test_df[:1].to_dict(orient='list') #.tolist()

In [151]:
# Define a function for inference
def predict_with_model(input_data, model, columns=None):
    """Run ``model.predict`` on input given in one of several convenient shapes.

    Parameters
    ----------
    input_data : pandas.DataFrame, dict, list or tuple
        Accepted layouts:

        * a DataFrame - used as is;
        * a dict of scalars - one record;
        * a dict of sequences (column name -> values) - several records;
        * a list/tuple of dicts - one record per dict;
        * a list/tuple of scalars - one record, values ordered like ``columns``;
        * a list/tuple of lists/tuples - several records ordered like ``columns``.
    model : object
        Anything exposing ``predict(DataFrame)``.
    columns : list of str, optional
        Column names used for positional (non-dict) records. Defaults to the
        eleven feature columns used elsewhere in this notebook.

    Returns
    -------
    Whatever ``model.predict`` returns for the assembled DataFrame.

    Raises
    ------
    ValueError
        If ``input_data`` is an empty list/tuple.
    TypeError
        If ``input_data`` is of an unsupported type.
    """
    if columns is None:
        columns = ['Age', 'Sex', 'ChestPainType', 'RestingBP', 'Cholesterol', 'FastingBS', 'RestingECG', 'MaxHR', 'ExerciseAngina', 'Oldpeak', 'ST_Slope']

    sequence_types = (list, tuple, np.ndarray, pd.Series)

    if isinstance(input_data, pd.DataFrame):
        input_df = input_data
    elif isinstance(input_data, dict):
        # Decide the layout up front rather than building a one-row frame with
        # list-valued cells and relying on the model to raise TypeError.
        if any(isinstance(value, sequence_types) for value in input_data.values()):
            input_df = pd.DataFrame(input_data)
        else:
            input_df = pd.DataFrame([input_data])
    elif isinstance(input_data, (list, tuple)):
        if len(input_data) == 0:
            raise ValueError("input_data must contain at least one record")
        first = input_data[0]
        if isinstance(first, dict):
            input_df = pd.DataFrame(list(input_data))
        elif isinstance(first, sequence_types):
            # Several positional records
            input_df = pd.DataFrame(list(input_data), columns=columns)
        else:
            # A single positional record
            input_df = pd.DataFrame([list(input_data)], columns=columns)
    else:
        raise TypeError(
            "input_data must be a DataFrame, dict, list or tuple, "
            f"got {type(input_data).__name__}"
        )

    return model.predict(input_df)

# Example usage
# 1. Single record as a dictionary of scalars
input_dict1 = dict(
    Age=58,
    Sex='F',
    ChestPainType='ATA',
    RestingBP=180,
    Cholesterol=393,
    FastingBS=0,
    RestingECG='Normal',
    MaxHR=110,
    ExerciseAngina='Y',
    Oldpeak=1.0,
    ST_Slope='Flat',
)

# 2. List of record dictionaries: a copy of the first record plus a second
#    one whose ChestPainType is the lower-case spelling 'ata'
input_dict2 = [
    dict(input_dict1),
    dict(
        Age=23,
        Sex='F',
        ChestPainType='ata',
        RestingBP=180,
        Cholesterol=213,
        FastingBS=1,
        RestingECG='Normal',
        MaxHR=119,
        ExerciseAngina='N',
        Oldpeak=0.0,
        ST_Slope='Flat',
    ),
]

# 3. Column-oriented dictionary (column name -> list of values) holding the
#    same two records as input_dict2
input_dict3 = {
    column: [record[column] for record in input_dict2]
    for column in input_dict2[0]
}

# 4. Single record as a positional list, in the same column order as input_dict1
input_list = list(input_dict1.values())

output1 = predict_with_model(input_dict1, model)
output2 = predict_with_model(input_dict2, model)
output3 = predict_with_model(input_dict3, model)
output4 = predict_with_model(input_list, model)

for output in (output1, output2, output3, output4):
    print(output)

[1]
[1 1]
[1 1]
[1]


In [None]:
# Synthetic two-feature problem; the legacy global seed makes the draw repeatable
np.random.seed(0)
X = np.random.randn(100, 2)
# Label is 1 exactly when the two features sum to a positive number,
# i.e. the classes are separated by a straight line.
feature_sum = X[:, 0] + X[:, 1]
y = (feature_sum > 0).astype(int)

# Positional hold-out: first 80 rows train, last 20 test
# (a more robust splitting method should be used in practice)
X_train, X_test = np.split(X, [80])
y_train, y_test = np.split(y, [80])

# Train the from-scratch logistic regression on the training rows
model = LogisticRegression(learning_rate=0.01, num_iterations=10000, tolerance=1e-4)
model.fit(X_train, y_train)

# Hard 0/1 predictions for the held-out rows
y_pred = model.predict(X_test)

# Share of held-out rows classified correctly
accuracy = (y_pred == y_test).mean()
print(f"Accuracy: {accuracy*100:.2f}%")

Metrics

In [None]:
def calculate_accuracy(y_pred, y_true):
    """Return the fraction of predictions that equal the true labels.

    Both inputs are converted to flat NumPy arrays first, so plain lists are
    compared element-wise (``[1, 0] == [1, 1]`` on lists would otherwise be a
    single ``False``) and pandas objects are compared by position rather than
    by index label.

    Raises
    ------
    ValueError
        If the two inputs do not contain the same number of elements.
    """
    y_pred = np.asarray(y_pred).ravel()
    y_true = np.asarray(y_true).ravel()
    if y_pred.shape != y_true.shape:
        raise ValueError(
            f"y_pred and y_true differ in length: {y_pred.size} != {y_true.size}"
        )
    return np.mean(y_pred == y_true)

def calculate_precision(y_pred, y_true):
    """Return precision = TP / (TP + FP) for the positive class (label 1).

    Inputs are converted to flat NumPy arrays so lists and pandas objects are
    compared element-wise by position. When nothing is predicted positive the
    ratio is undefined; ``0.0`` is returned instead of dividing by zero.
    """
    y_pred = np.asarray(y_pred).ravel()
    y_true = np.asarray(y_true).ravel()
    predicted_positive = y_pred == 1
    true_positives = np.sum(predicted_positive & (y_true == 1))
    false_positives = np.sum(predicted_positive & (y_true == 0))
    denominator = true_positives + false_positives
    if denominator == 0:
        return 0.0
    return true_positives / denominator

def calculate_recall(y_pred, y_true):
    """Return recall = TP / (TP + FN) for the positive class (label 1).

    Inputs are converted to flat NumPy arrays so lists and pandas objects are
    compared element-wise by position. When there are no actual positives the
    ratio is undefined; ``0.0`` is returned instead of dividing by zero.
    """
    y_pred = np.asarray(y_pred).ravel()
    y_true = np.asarray(y_true).ravel()
    actual_positive = y_true == 1
    true_positives = np.sum((y_pred == 1) & actual_positive)
    false_negatives = np.sum((y_pred == 0) & actual_positive)
    denominator = true_positives + false_negatives
    if denominator == 0:
        return 0.0
    return true_positives / denominator

def calculate_f1_score(y_pred, y_true):
    """Return the F1 score (harmonic mean of precision and recall) for label 1.

    Computed directly from the confusion counts as
    ``2*TP / (2*TP + FP + FN)``, which is algebraically the same as
    ``2 * precision * recall / (precision + recall)`` but stays defined when
    either ratio would be 0/0. If there are no true positives, false positives
    or false negatives at all, ``0.0`` is returned.
    """
    y_pred = np.asarray(y_pred).ravel()
    y_true = np.asarray(y_true).ravel()
    true_positives = np.sum((y_pred == 1) & (y_true == 1))
    false_positives = np.sum((y_pred == 1) & (y_true == 0))
    false_negatives = np.sum((y_pred == 0) & (y_true == 1))
    denominator = 2 * true_positives + false_positives + false_negatives
    if denominator == 0:
        return 0.0
    return 2 * true_positives / denominator

def calculate_metric(metric_name, y_pred, y_true):
    """Compute one metric selected by name.

    ``metric_name`` is one of "accuracy", "precision", "recall" or "f1_score".
    The special name "list_metrics" returns the list of those names instead
    of computing anything (``y_pred`` / ``y_true`` are ignored in that case).

    Raises
    ------
    ValueError
        For any other name; the message enumerates the available metrics.
    """
    available = ["accuracy", "precision", "recall", "f1_score"]

    if metric_name == "list_metrics":
        return available

    if metric_name not in available:
        raise ValueError(f"Invalid metric name. Available metrics: {', '.join(calculate_metric('list_metrics', None, None))}")

    # Name -> implementation lookup replaces the if/elif ladder.
    handlers = {
        "accuracy": calculate_accuracy,
        "precision": calculate_precision,
        "recall": calculate_recall,
        "f1_score": calculate_f1_score,
    }
    return handlers[metric_name](y_pred, y_true)

def calculate_metrics(y_pred, y_true):
    """Return accuracy, precision, recall and F1 score in one dictionary.

    The keys are "accuracy", "precision", "recall" and "f1_score", in that
    order; each value comes from the matching ``calculate_*`` function.
    """
    return {
        "accuracy": calculate_accuracy(y_pred, y_true),
        "precision": calculate_precision(y_pred, y_true),
        "recall": calculate_recall(y_pred, y_true),
        "f1_score": calculate_f1_score(y_pred, y_true),
    }

def calculate_roc_auc(y_prob, y_true, plot=False, num_thresholds=100):
    """Approximate the area under the ROC curve on an even threshold grid.

    Parameters
    ----------
    y_prob : array-like of shape (n,)
        Predicted probabilities of the positive class.
    y_true : array-like of shape (n,)
        True labels; 1 is the positive class and 0 the negative class.
    plot : bool, default False
        When true, draw the ROC curve with matplotlib.
    num_thresholds : int, default 100
        Number of evenly spaced thresholds in [0, 1] that are evaluated.

    Returns
    -------
    float
        The trapezoidal AUC, or ``nan`` when ``y_true`` lacks either class
        (the curve is undefined then, and nothing is plotted).

    Raises
    ------
    ValueError
        If ``y_prob`` and ``y_true`` differ in length.
    """
    y_prob = np.asarray(y_prob, dtype=float).ravel()
    y_true = np.asarray(y_true).ravel()
    if y_prob.shape != y_true.shape:
        raise ValueError(
            f"y_prob and y_true differ in length: {y_prob.size} != {y_true.size}"
        )

    positives = y_true == 1
    negatives = y_true == 0
    n_positives = np.sum(positives)
    n_negatives = np.sum(negatives)
    if n_positives == 0 or n_negatives == 0:
        # TPR or FPR would be 0/0 for every threshold.
        return float("nan")

    # Sweep thresholds from high to low so that FPR is non-decreasing along
    # the curve; integrating in the opposite direction flips the sign of the
    # trapezoid widths and yields a negated AUC.
    thresholds = np.linspace(0, 1, num_thresholds)[::-1]
    # Row i holds the thresholded predictions for thresholds[i].
    predicted_positive = y_prob[np.newaxis, :] >= thresholds[:, np.newaxis]
    tpr = (predicted_positive & positives).sum(axis=1) / n_positives
    fpr = (predicted_positive & negatives).sum(axis=1) / n_negatives

    # Anchor the curve at (0, 0); a probability of exactly 1.0 would otherwise
    # make the first evaluated point start above the origin.
    tpr_list = [0.0] + tpr.tolist()
    fpr_list = [0.0] + fpr.tolist()

    auc = calculate_auc(tpr_list, fpr_list)

    if plot:
        plt.figure(figsize=(8, 6))
        plt.plot(fpr_list, tpr_list, linestyle='-', marker='.')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title(f'ROC Curve (AUC: {auc:.2f})')  # Include AUC score in the title
        plt.grid(True)
        # Add AUC score as text annotation on the plot
        plt.annotate(f'AUC = {auc:.2f}', xy=(0.6, 0.4), xytext=(0.6, 0.7),
                     fontsize=12, color='black', backgroundcolor='white')
        plt.show()

    return auc

def calculate_auc(tpr, fpr):
    """Integrate TPR over FPR with the trapezoidal rule.

    The points are ordered by ascending FPR (ties broken by ascending TPR)
    before integrating, so the result does not depend on whether the caller
    lists the curve from low to high or from high to low thresholds.

    Parameters
    ----------
    tpr, fpr : array-like of equal length
        True-positive and false-positive rates of the curve points.

    Returns
    -------
    float
        The area under the curve; ``0.0`` when fewer than two points are given.

    Raises
    ------
    ValueError
        If ``tpr`` and ``fpr`` differ in length.
    """
    tpr = np.asarray(tpr, dtype=float).ravel()
    fpr = np.asarray(fpr, dtype=float).ravel()
    if tpr.shape != fpr.shape:
        raise ValueError(
            f"tpr and fpr differ in length: {tpr.size} != {fpr.size}"
        )
    if tpr.size < 2:
        return 0.0

    # lexsort uses the LAST key as the primary one: sort by fpr, then tpr.
    order = np.lexsort((tpr, fpr))
    fpr = fpr[order]
    tpr = tpr[order]

    widths = np.diff(fpr)
    average_heights = (tpr[1:] + tpr[:-1]) / 2
    return float(np.sum(widths * average_heights))


# Example usage:
if __name__ == "__main__":
    # Demo of the metric helpers. It relies on `model`, `X_test`, `y_test` and
    # `y_pred` created by the training/prediction code earlier in the notebook.

    # Calculate and print the requested metric or list available metrics
    metric_name = "list_metrics"  # Change this to the metric you want to calculate or "list_metrics"

    if metric_name == "list_metrics":
        print("Available metrics:", calculate_metric(metric_name, None, None))
    else:
        metric_value = calculate_metric(metric_name, y_pred, y_test)
        print(f"{metric_name.capitalize()}: {metric_value*100:.2f}%")

    # Calculate and print a specific metric
    metric_name = "accuracy"  # Change this to the metric you want
    metric_value = calculate_metric(metric_name, y_pred, y_test)
    print(f"{metric_name.capitalize()}: {metric_value*100:.2f}%")

    # Calculate and print all eligible metrics
    calculated_metrics = calculate_metrics(y_pred, y_test)
    # Separate loop names so the `metric_name` / `metric_value` selected above
    # are not silently overwritten.
    for name, value in calculated_metrics.items():
        print(f"{name.capitalize()}: {value*100:.2f}%")

    # The ROC curve needs class-1 probabilities rather than hard labels;
    # obtain them from the fitted model (previously `y_prob` was never
    # assigned, so the call below raised NameError).
    y_prob = model.predict_proba(X_test)

    # Calculate and print ROC AUC score
    auc_score = calculate_roc_auc(y_prob, y_test, plot=True)
    print(f"ROC AUC Score: {auc_score:.2f}")