In [648]:
# THIS CODE IS MY OWN WORK, IT WAS WRITTEN WITHOUT
#CONSULTING CODE WRITTEN BY OTHER STUDENTS. Jonatan Peguero 
import math
import random
from collections import Counter, defaultdict

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split

In [650]:
def load_data_custom(data_path):
    """Read the heart-disease CSV into a dictionary keyed by person ID.

    Column names are stripped of surrounding whitespace, and the target
    column is stripped and lower-cased. Every column other than
    "person ID" and the target is treated as a feature.

    Args:
        data_path: Path (or anything ``pd.read_csv`` accepts) of the CSV file.

    Returns:
        A ``(data_dict, feature_names)`` tuple where ``data_dict`` maps
        ``str(person ID)`` to ``{"features": {column: value}, "label": label}``
        and ``feature_names`` lists the feature columns in file order.
    """
    id_col = "person ID"
    target_col = "Has heart disease? (Prediction Target)"

    frame = pd.read_csv(data_path)
    frame.columns = frame.columns.str.strip()  # header cells may carry stray spaces
    frame[target_col] = frame[target_col].str.strip().str.lower()

    feature_names = [
        name for name in frame.columns
        if name != id_col and name != target_col
    ]

    # One entry per row; a repeated person ID keeps the values of its last row.
    data_dict = {
        str(record[id_col]): {
            "features": {name: record[name] for name in feature_names},
            "label": record[target_col],
        }
        for _, record in frame.iterrows()
    }
    return data_dict, feature_names


In [652]:
def read_ids_from_file(filepath):
    """Return the non-blank lines of ``filepath`` as a list of stripped strings.

    Args:
        filepath: Path of a text file holding one ID per line.

    Returns:
        The IDs in file order, with surrounding whitespace removed and
        blank (or whitespace-only) lines skipped.
    """
    with open(filepath, 'r') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [entry for entry in stripped_lines if entry]

In [654]:
def entropy(examples):
    """Return the Shannon entropy, in bits, of the labels in ``examples``.

    Args:
        examples: Iterable of dicts that each carry a ``'label'`` key. Any
            iterable is accepted (list, tuple, generator, ...), because the
            total is derived from the label counts rather than ``len()``.

    Returns:
        ``-sum(p * log2(p))`` over the label proportions, as a float.
        An empty input yields ``0.0``.
    """
    label_counts = Counter(ex['label'] for ex in examples)
    total = sum(label_counts.values())
    # Every counted label occurs at least once, so each proportion is
    # strictly positive and log2 is always defined; no zero guard is needed.
    return sum(
        (-(count / total) * math.log2(count / total) for count in label_counts.values()),
        0.0,
    )

In [656]:
def information_gain(examples, attribute):
    """Return the entropy reduction obtained by splitting on ``attribute``.

    Examples are grouped by their exact value of ``attribute`` (examples
    lacking the attribute are grouped under ``None``). The gain is the
    entropy of the whole set minus the size-weighted entropy of the groups.

    Args:
        examples: List of dicts with ``'features'`` (a dict) and ``'label'``.
        attribute: Feature name to split on.

    Returns:
        The information gain as a float.
    """
    parent_entropy = entropy(examples)

    # Partition the examples by attribute value.
    groups = {}
    for example in examples:
        value = example['features'].get(attribute, None)
        groups.setdefault(value, []).append(example)

    n_examples = len(examples)
    children_entropy = sum(
        ((len(group) / n_examples) * entropy(group) for group in groups.values()),
        0.0,
    )
    return parent_entropy - children_entropy

In [658]:
def choose_best_attribute(examples, attributes):
    """Return the attribute with the highest information gain.

    Ties go to the attribute that appears first in ``attributes``.

    Args:
        examples: List of example dicts passed through to ``information_gain``.
        attributes: Iterable of candidate feature names.

    Returns:
        The winning attribute, or ``None`` when ``attributes`` is empty.
    """
    winner = None
    winning_gain = -1  # sentinel below any gain a candidate is expected to score
    for candidate in attributes:
        candidate_gain = information_gain(examples, candidate)
        if not (candidate_gain > winning_gain):
            continue  # not strictly better: keep the earlier winner
        winner, winning_gain = candidate, candidate_gain
    return winner


In [660]:
class TreeNode:
    """A node of the decision tree.

    A node whose ``label`` is not ``None`` acts as a leaf holding a class
    prediction; otherwise ``attribute`` names the feature tested at this
    node and ``children`` maps each feature value to a sub-tree.
    """

    def __init__(self, attribute=None, label=None):
        self.attribute, self.label = attribute, label
        # Each node gets its own mapping of feature value -> child TreeNode.
        self.children = dict()

In [662]:
def build_tree(examples, attributes, depth=0, max_depth=5, min_samples_split=5):
    """Recursively grow a multiway decision tree over ``examples``.

    Args:
        examples: List of dicts with ``'features'`` (a dict) and ``'label'``.
        attributes: Feature names still available for splitting.
        depth: Depth of the node being built (0 for the root).
        max_depth: Depth at which growth stops and a majority leaf is emitted.
        min_samples_split: Fewer examples than this produce a majority leaf.

    Returns:
        A ``TreeNode``: a leaf carrying a label, or an internal node with one
        child per observed value of the chosen attribute.
    """
    def majority_leaf():
        # Leaf labelled with the most frequent label among the current examples.
        votes = Counter(ex['label'] for ex in examples)
        return TreeNode(label=votes.most_common(1)[0][0])

    # No data at all: fall back to the default "no" label.
    if not examples:
        return TreeNode(label="no")

    # Too few examples to justify a split.
    if len(examples) < min_samples_split:
        return majority_leaf()

    # Pure node: every example agrees on the label.
    distinct_labels = {ex['label'] for ex in examples}
    if len(distinct_labels) == 1:
        return TreeNode(label=distinct_labels.pop())

    # Nothing left to split on, or the depth budget is exhausted.
    if not attributes or depth >= max_depth:
        return majority_leaf()

    split_attr = choose_best_attribute(examples, attributes)
    node = TreeNode(attribute=split_attr)

    # Group the examples by their value of the chosen attribute.
    groups = defaultdict(list)
    for ex in examples:
        groups[ex['features'].get(split_attr, None)].append(ex)

    # The chosen attribute is not reused further down this branch.
    unused_attrs = [name for name in attributes if name != split_attr]
    for value, group in groups.items():
        node.children[value] = build_tree(
            group, unused_attrs, depth + 1, max_depth, min_samples_split
        )

    return node

In [664]:
def predict(tree, example):
    """Classify ``example`` by walking ``tree`` from the root.

    At each internal node the example's value for the node's attribute
    selects the child to follow. If that value has no matching child, the
    prediction is the label carried by the most leaves in the sub-tree below
    the current node (ties go to the label met first in depth-first order),
    or ``"no"`` when that sub-tree has no labelled node.

    Args:
        tree: Root node exposing ``label``, ``attribute`` and ``children``.
        example: Dict with a ``'features'`` dict.

    Returns:
        The predicted label.
    """
    node = tree
    while node.label is None:
        branch_value = example['features'].get(node.attribute, None)
        if branch_value in node.children:
            node = node.children[branch_value]
            continue

        # Unseen value: tally leaf labels beneath this node in pre-order.
        votes = Counter()
        pending = [node]
        while pending:
            current = pending.pop()
            if current.label is not None:
                votes[current.label] += 1
            # Push children reversed so they pop in their original order.
            pending.extend(reversed(list(current.children.values())))
        if not votes:
            return "no"
        return votes.most_common(1)[0][0]
    return node.label

In [666]:
def evaluate_model(ground_truth, pred_labels):
    """Score "yes"/"no" predictions against the true labels.

    Args:
        ground_truth: True labels, with "yes" as the positive class.
        pred_labels: Predicted labels, aligned with ``ground_truth``.

    Returns:
        ``(accuracy, precision, recall, f1, specificity, sensitivity)``.
        Sensitivity is the same value as recall; specificity is
        TN / (TN + FP), or 0 when there are no negatives to score.
    """
    scores = [accuracy_score(ground_truth, pred_labels)]
    for metric in (precision_score, recall_score, f1_score):
        scores.append(metric(ground_truth, pred_labels, pos_label="yes"))
    accuracy, precision, recall, f_measure = scores

    # With labels=["yes", "no"] the flattened matrix reads TP, FN, FP, TN.
    matrix = confusion_matrix(ground_truth, pred_labels, labels=["yes", "no"])
    _true_pos, _false_neg, false_pos, true_neg = matrix.ravel()

    negatives = true_neg + false_pos
    specificity = true_neg / negatives if negatives > 0 else 0
    return accuracy, precision, recall, f_measure, specificity, recall

In [668]:
# Load the raw table and tidy the header text before any column lookups.
df = pd.read_csv("hw3data.csv")
df.columns = df.columns.str.strip()
print("Columns in CSV:", df.columns.tolist())

# Label column (adjust the name if the CSV header differs).
target_col = "Has heart disease? (Prediction Target)"

# The target and the two yes/no features share one encoding:
# trim, lower-case, then "yes" -> 1 and "no" -> 0.
for yes_no_col in (target_col, "fasting blood sugar > 120 mg/dl", "exercise induced angina"):
    cleaned = df[yes_no_col].str.strip().str.lower()
    df[yes_no_col] = cleaned.map({"yes": 1, "no": 0})

# Gender is trimmed and upper-cased before mapping: "M" -> 1, "F" -> 0.
gender_codes = {"M": 1, "F": 0}
df["gender"] = df["gender"].str.strip().str.upper().map(gender_codes)

# One-hot encode the multi-valued categorical columns, dropping each first level.
categorical_cols = ["chest pain type", "resting electrocardiographic results"]
df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)

# Optional: Bin the "age" column into categories
def bin_age(age):
    """Map a numeric age to its bracket name.

    Returns "Under_40" for ages below 40, "40_59" for ages below 60, and
    "60_plus" for everything else (any value that is not below 60).
    """
    # Exclusive upper bounds, checked in ascending order.
    for upper_bound, bracket in ((40, "Under_40"), (60, "40_59")):
        if age < upper_bound:
            return bracket
    return "60_plus"

# Swap the numeric age for one-hot age-bracket indicators (first bracket dropped).
df = pd.get_dummies(
    df.assign(age_bin=df["age"].apply(bin_age)),
    columns=["age_bin"],
    drop_first=True,
).drop(columns=["age"])

# Person IDs as strings, in row order, so they can be split alongside X and y.
ids = df["person ID"].astype(str).tolist()

# Everything except the ID and the target is a model feature.
non_feature_cols = ("person ID", target_col)
feature_cols = [name for name in df.columns if name not in non_feature_cols]
y = df[target_col]

# Force every feature to numeric (unparseable cells become NaN), then fill gaps
# with the column median.
# NOTE(review): the median is computed over all rows here, i.e. before the
# train/test split made in a later cell — confirm that is acceptable.
X = df[feature_cols].apply(pd.to_numeric, errors='coerce')
X = X.fillna(X.median())


Columns in CSV: ['person ID', 'age', 'gender', 'chest pain type', 'resting blood pressure', 'serum cholesterol in mg/dl', 'fasting blood sugar > 120 mg/dl', 'resting electrocardiographic results', 'maximum heart rate achieved', 'exercise induced angina', 'oldpeak = ST depression induced by exercise relative to rest', 'the slope of the peak exercise ST segment', 'number of major vessels (0-3) colored by flourosopy', 'Has heart disease? (Prediction Target)']


In [670]:
# Stratified 70/30 hold-out split; the ID list is split in step with X and y.
split_parts = train_test_split(X, y, ids, test_size=0.3, random_state=42, stratify=y)
X_train, X_test, y_train, y_test, ids_train, ids_test = split_parts

print(f"Total records: {len(ids)}")
print(f"Training set size: {len(ids_train)}")
print(f"Test set size: {len(ids_test)}")

# Write the training IDs and then the test IDs, one ID per line.
for out_path, id_list in (("para2_file.txt", ids_train), ("para3_file.txt", ids_test)):
    with open(out_path, "w") as f:
        for pid in id_list:
            f.write(pid + "\n")

print("Random 70-30 train/test split saved to para2_file.txt and para3_file.txt")

Total records: 303
Training set size: 212
Test set size: 91
Random 70-30 train/test split saved to para2_file.txt and para3_file.txt


In [677]:
# Fit the forest on the training split and predict the held-out rows.
rf = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42).fit(X_train, y_train)
rf_pred = rf.predict(X_test)

# One "<person ID> <yes|no>" line per test record.
with open("para4_file.txt", "w") as f:
    for pid, pred in zip(ids_test, rf_pred):
        if pred == 1:
            label_str = "yes"
        else:
            label_str = "no"
        f.write(f"{pid} {label_str}\n")
print("Random Forest predictions have been written to para4_file.txt")

# Hold-out metrics, with 1 (has heart disease) as the positive class.
rf_acc = accuracy_score(y_test, rf_pred)
rf_prec = precision_score(y_test, rf_pred, pos_label=1)
rf_rec = recall_score(y_test, rf_pred, pos_label=1)
rf_f1 = f1_score(y_test, rf_pred, pos_label=1)

# With labels=[1, 0] the flattened matrix reads TP, FN, FP, TN.
cm_rf = confusion_matrix(y_test, rf_pred, labels=[1, 0])
TP, FN, FP, TN = cm_rf.ravel()
actual_negatives = TN + FP
rf_spec = TN / actual_negatives if actual_negatives > 0 else 0

print("\nRandom Forest Evaluation (70/30 Hold-Out):")
print(f"Accuracy:        {rf_acc:.4f}")
print(f"Precision:       {rf_prec:.4f}")
print(f"Recall (Sensitivity): {rf_rec:.4f}")
print(f"F-measure (F1 Score):   {rf_f1:.4f}")
print(f"Specificity:     {rf_spec:.4f}")
print(f"Sensitivity:     {rf_rec:.4f}")

Random Forest predictions have been written to para4_file.txt

Random Forest Evaluation (70/30 Hold-Out):
Accuracy:        0.8132
Precision:       0.8000
Recall (Sensitivity): 0.8800
F-measure (F1 Score):   0.8381
Specificity:     0.7317
Sensitivity:     0.8800
