# Takes .qual File and Generates the Corresponding CSV

In [None]:
import csv

input_file = r"D:\Malak\iped\validation\filtered_mockfb_clean.qual"
output_file = r"D:\Malak\iped\validation\mockb_data_qual.csv"

with open(input_file, "r") as infile, open(output_file, "w", newline="") as outfile:
    writer = csv.writer(outfile)
    writer.writerow(["Sequence_ID", "Quality_Scores"])

    seq_id = None
    scores = []

    for line in infile:
        line = line.strip()
        if line.startswith(">"):
            if seq_id is not None:
                writer.writerow([seq_id, " ".join(scores)])
            seq_id = line[1:]  # remove ">"
            scores = []
        else:
            scores.extend(line.split())

    # Write the last sequence
    if seq_id is not None:
        writer.writerow([seq_id, " ".join(scores)])


# Takes .error File and Generates the Corresponding CSV

In [None]:
import csv

input_file = r"D:\Malak\iped\validation\filtered_mockfb_clean.error"
output_file = r"D:\Malak\iped\validation\mockb_sequences.csv"

with open(input_file, "r") as infile, open(output_file, "w", newline="") as outfile:
    writer = csv.writer(outfile)
    writer.writerow(["Sequence_ID2", "Sequence"])

    seq_id = ""
    sequence = ""

    for line in infile:
        line = line.strip()
        if line.startswith(">"):
            if seq_id:
                writer.writerow([seq_id, sequence])
            seq_id = line[1:]  # remove '>'
            sequence = ""
        else:
            sequence += line

    # Write last sequence
    if seq_id:
        writer.writerow([seq_id, sequence])


# Merge the 2 Generated CSVs to a Single File with 3 Columns (ID, Sequence, Errors)

In [None]:
import pandas as pd

# Load your file
df = pd.read_csv(r"D:\Malak\iped\validation\data.csv")  # replace with actual filename

# Clean Sequence_ID2: remove " ref:..." to get the comparable ID
df["cleaned_ID2"] = df["Sequence_ID2"].str.extract(r"^([^\s]+)")

# Create a mapping: cleaned_ID2 → Sequence
seq_dict = df.set_index("cleaned_ID2")["Sequence"].to_dict()

# Now build result: use Sequence_ID and Quality_Scores directly
# Use Sequence_ID to look up matching Sequence from Sequence_ID2 side
df["matched_sequence"] = df["Sequence_ID"].map(seq_dict)

# Prepare final result
final = df[["Sequence_ID", "Quality_Scores", "matched_sequence"]].dropna()
final.columns = ["Sequence_ID", "Quality_Scores", "Sequence"]

# Save to CSV
final.to_csv(r"D:\Malak\iped\validation\final_matched_output.csv", index=False)

print("✅ Matching completed and saved to 'final_matched_output.csv'")


# Process the File, Separate Bases, and Generate Features

In [None]:
import pandas as pd
import re
import csv
import math
from collections import Counter, defaultdict

def compute_window_abundance(base_seq, window_size=21):
    counts = Counter()
    half = window_size // 2
    padded_seq = ['X'] * half + base_seq + ['X'] * half
    for i in range(len(base_seq)):
        window = ''.join(padded_seq[i:i+window_size])
        counts[window] += 1
    return counts

def assign_window_abundance(base_seq, abundance_dict, window_size=21):
    half = window_size // 2
    padded_seq = ['X'] * half + base_seq + ['X'] * half
    abundances = []
    for i in range(len(base_seq)):
        window = ''.join(padded_seq[i:i+window_size])
        abundances.append(abundance_dict.get(window, 0))
    return abundances

def parse_contig_data(contig_str, matching_string, contig_index):
    contig_str = contig_str.replace('-', '.')
    entries = contig_str.strip().split()
    parsed = []
    for entry in entries:
        if re.match(r'^\d+[ACGT]{1}\.0$', entry):  # Forward-only
            phred = int(entry[:-3])
            base = entry[-3]
            parsed.append({'region': 'forward', 'base': base, 'forward_phred': phred, 'reverse_phred': None})
        elif re.match(r'^\d+[ACGT]{1,2}\d+$', entry):  # Overlap
            m = re.match(r'^(\d+)([ACGT]{1,2})(\d+)$', entry)
            f_phred, base, r_phred = int(m[1]), m[2], int(m[3])
            parsed.append({'region': 'overlap', 'base': base, 'forward_phred': f_phred, 'reverse_phred': r_phred})
        elif re.match(r'^0\.[ACGT]\d+$', entry):  # Reverse-only
            base = entry[2]
            r_phred = int(entry[3:])
            parsed.append({'region': 'reverse', 'base': base, 'forward_phred': None, 'reverse_phred': r_phred})
        else:
            print(f"⚠️ Unknown format in contig {contig_index}: {entry}")
    if len(parsed) != len(matching_string):
        print(f"❌ Contig {contig_index}: Length mismatch ({len(parsed)} vs {len(matching_string)})")
        return []
    for i, letter in enumerate(matching_string):
        parsed[i]['match_letter'] = letter
    return parsed

def normalize_base(base):
    return base.replace('GG', 'G').replace('AA', 'A').replace('TT', 'T').replace('CC', 'C')

def assign_homopolymer_positions(base_list, reverse=False):
    result = []
    if reverse:
        base_list = base_list[::-1]
    for i in range(len(base_list)):
        curr = base_list[i]
        prev = base_list[i - 1] if i - 1 >= 0 else None
        prev2 = base_list[i - 2] if i - 2 >= 0 else None
        prev3 = base_list[i - 3] if i - 3 >= 0 else None
        next_ = base_list[i + 1] if i + 1 < len(base_list) else None
        if curr != prev and curr != next_:
            pos = "solo"
        elif curr != prev and curr == next_:
            pos = "first"
        elif curr == prev and curr != prev2:
            pos = "second"
        elif curr == prev == prev2 and curr != prev3:
            pos = "third"
        else:
            count = 0
            for j in range(i - 1, -1, -1):
                if base_list[j] == curr:
                    count += 1
                else:
                    break
            pos = f"{count+1}th"
        result.append(pos)
    if reverse:
        result = result[::-1]
    return result

def count_homopolymer_length(base_list):
    lengths = []
    count = 1
    for i in range(len(base_list)):
        if i == 0:
            lengths.append(1)
        else:
            if base_list[i] == base_list[i - 1]:
                count += 1
            else:
                count = 1
            lengths.append(count)
    return lengths

def detect_gcc_motif(base_list, reverse=False):
    motif_flags = []
    if reverse:
        base_list = base_list[::-1]
    for i in range(len(base_list)):
        if i >= 3 and ''.join(base_list[i - 3:i]) == 'GCC':
            motif_flags.append(1)
        else:
            motif_flags.append(0)
    if reverse:
        motif_flags = motif_flags[::-1]
    return motif_flags

def compute_gc_and_entropy(seq, window=5):
    half = window // 2
    gc_list = []
    entropy_list = []
    for i in range(len(seq)):
        start = max(0, i - half)
        end = min(len(seq), i + half + 1)
        window_seq = seq[start:end]
        gc = sum(1 for b in window_seq if b in ['G', 'C']) / len(window_seq)
        freqs = Counter(window_seq)
        entropy = -sum((v / len(window_seq)) * math.log2(v / len(window_seq)) for v in freqs.values())
        gc_list.append(gc)
        entropy_list.append(entropy)
    return gc_list, entropy_list

# === Main function ===
def process_all_sequences(input_csv, output_csv, window=5):
    df = pd.read_csv(input_csv)
    header_written = False

    # === Precompute global abundance of full sequences ===
    global_sequence_counts = df["Sequence"].value_counts().to_dict()

    with open(output_csv, mode='w', newline='') as f_out:
        writer = None

        forward_risk_map = {
            'solo': 0.013018963, 'first': 0.0102533, 'second': 0.008364577, 'third': 0.010356342,
            '4th': 0.025978121, '5th': 0.00977431, '6th': 0.012274475, '7th': 0.119402985, '8th': 0.0
        }
        reverse_risk_map = {
            'solo': 0.013018963, 'first': 0.008704605, 'second': 0.00988321, 'third': 0.013463661,
            '4th': 0.013611362, '5th': 0.007787314, '6th': 0.003812876, '7th': 0.229508197, '8th': 0.0
        }

        for idx, row in df.iterrows():
            contig_str = str(row['Quality_Scores'])
            matching_str = str(row['Sequence']).strip()

            parsed = parse_contig_data(contig_str, matching_str, idx)
            if not parsed:
                continue

            for p in parsed:
                p['base'] = normalize_base(p['base'])

            base_seq = [p['base'] for p in parsed]

            # Feature: window abundance
            window_abundance_map = compute_window_abundance(base_seq, window_size=21)
            window_abundances = assign_window_abundance(base_seq, window_abundance_map, window_size=21)

            # Feature: global + local sequence abundance
            global_abundance = global_sequence_counts.get(matching_str, 1)
            local_abundance = contig_str.strip().split().count(contig_str.strip().split()[0])  # crude: use first token type count

            hp_forward = assign_homopolymer_positions(base_seq, reverse=False)
            hp_reverse = assign_homopolymer_positions(base_seq, reverse=True)
            motif_forward = detect_gcc_motif(base_seq, reverse=False)
            motif_reverse = detect_gcc_motif(base_seq, reverse=True)

            total_len = len(parsed)
            forward_phred = [p['forward_phred'] if p['forward_phred'] is not None else 0 for p in parsed]
            reverse_phred = [p['reverse_phred'] if p['reverse_phred'] is not None else 0 for p in parsed]
            phred_delta_forward = [0] + [forward_phred[i] - forward_phred[i - 1] for i in range(1, total_len)]
            phred_delta_reverse = [0] + [reverse_phred[i] - reverse_phred[i - 1] for i in range(1, total_len)]

            hp_len_fwd = count_homopolymer_length(base_seq)
            hp_len_rev = count_homopolymer_length(base_seq[::-1])[::-1]
            gc_forward, entropy_forward = compute_gc_and_entropy(base_seq, window)
            gc_reverse, entropy_reverse = compute_gc_and_entropy(base_seq[::-1], window)
            gc_reverse = gc_reverse[::-1]
            entropy_reverse = entropy_reverse[::-1]

            for i in range(total_len):
                parsed[i]['normalized_position_forward'] = i / (total_len - 1) if total_len > 1 else 0.0
                parsed[i]['normalized_position_reverse'] = (total_len - 1 - i) / (total_len - 1) if total_len > 1 else 0.0
                parsed[i]['homopolymer_position_forward'] = hp_forward[i]
                parsed[i]['homopolymer_position_reverse'] = hp_reverse[i]
                parsed[i]['homopolymer_error_risk_forward'] = forward_risk_map.get(hp_forward[i], 0)
                parsed[i]['homopolymer_error_risk_reverse'] = reverse_risk_map.get(hp_reverse[i], 0)
                parsed[i]['homopolymer_length_forward'] = hp_len_fwd[i]
                parsed[i]['homopolymer_length_reverse'] = hp_len_rev[i]
                parsed[i]['GC_content_forward'] = gc_forward[i]
                parsed[i]['GC_content_reverse'] = gc_reverse[i]
                parsed[i]['entropy_forward'] = entropy_forward[i]
                parsed[i]['entropy_reverse'] = entropy_reverse[i]
                parsed[i]['phred_delta_forward'] = phred_delta_forward[i]
                parsed[i]['phred_delta_reverse'] = phred_delta_reverse[i]
                parsed[i]['is_overlap'] = int(parsed[i]['region'] == 'overlap')
                parsed[i]['has_GCC_prev3_forward'] = motif_forward[i]
                parsed[i]['has_GCC_prev3_reverse'] = motif_reverse[i]
                parsed[i]['is_last_in_contig'] = int(i == len(parsed) - 1)
                parsed[i]['window_abundance'] = window_abundances[i]
                parsed[i]['global_abundance'] = global_abundance
                parsed[i]['local_abundance'] = len(parsed)

            if not header_written:
                writer = csv.DictWriter(f_out, fieldnames=parsed[0].keys())
                writer.writeheader()
                header_written = True

            writer.writerows(parsed)

    print("✅ File saved:", output_csv)
    # === Run it ===
process_all_sequences(
    input_csv=r"D:\Malak\iped\validation\final_matched_output.csv",
    output_csv=r"D:\Malak\iped\validation\final_parsed_with_motifs.csv"
) 

# Extract Features of Interest

In [None]:
import pandas as pd

input_path = r"D:\Malak\iped\validation\final_parsed_with_motifs.csv"
output_path = r"D:\Malak\iped\validation\ml_ready_dataset.csv"

columns_to_keep = [
    'homopolymer_length_forward',
    'homopolymer_length_reverse',
    'normalized_position_forward',
    'normalized_position_reverse',
    'forward_phred',
    'reverse_phred',
    'homopolymer_error_risk_forward',
    'homopolymer_error_risk_reverse',
    'is_overlap',
    'has_GCC_prev3_forward',
    'has_GCC_prev3_reverse',
    'match_letter',
    'GC_content_forward',
    'GC_content_reverse',
    'entropy_forward',
    'entropy_reverse',
    'window_abundance',
    'global_abundance',
    'local_abundance'
]

# Process in chunks
chunk_size = 100_000
first_chunk = True

for chunk in pd.read_csv(input_path, chunksize=chunk_size, usecols=columns_to_keep):
    chunk['match_letter'] = chunk['match_letter'].replace("i", "s")
    
    if first_chunk:
        chunk.to_csv(output_path, index=False, mode='w')
        first_chunk = False
    else:
        chunk.to_csv(output_path, index=False, mode='a', header=False)

print(f"✅ Processed file saved to: {output_path}")

# RF Classifier

In [None]:
import pandas as pd
from sklearn.utils import resample
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, accuracy_score

input_path = r"D:\Malak\iped\ml_ready_dataset.csv"
balanced_output_path = r"D:\Malak\iped\balanced_subset.csv"

# === 1. Process in chunks and build balanced subset ===
chunk_size = 100_000
minority_rows = []
majority_rows = []

for chunk in pd.read_csv(input_path, chunksize=chunk_size):
    chunk["label"] = chunk["match_letter"].map({"m": 0, "s": 1})
    chunk.drop(columns=["match_letter"], inplace=True)

    minority = chunk[chunk["label"] == 1]
    majority = chunk[chunk["label"] == 0]

    minority_rows.append(minority)
    if len(minority) > 0:
        downsampled_majority = resample(
            majority,
            replace=False,
            n_samples=len(minority) * 5,
            random_state=42
        )
        majority_rows.append(downsampled_majority)

# Concatenate and save balanced set
df_minority = pd.concat(minority_rows, ignore_index=True)
df_majority = pd.concat(majority_rows, ignore_index=True)
df_balanced = pd.concat([df_minority, df_majority], ignore_index=True)
df_balanced.to_csv(balanced_output_path, index=False)
print(f"✅ Balanced subset saved: {balanced_output_path}")

# === 2. Load small balanced file ===
df = pd.read_csv(balanced_output_path)
X = df.drop(columns=["label"])
y = df["label"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, test_size=0.2, random_state=42
)

# === 3. Train model ===
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)
y_proba = clf.predict_proba(X_test)[:, 1]

# === 4. Threshold evaluation ===
thresholds = [0.8,0.7,0.6,0.59,0.58,0.57,0.56,0.55,0.54,0.53,0.52,0.51,0.50,0.49,0.48,0.47,0.46,0.45,0.44,0.43,0.42,0.41, 0.40, 0.35, 0.30, 0.25, 0.20, 0.15, 0.10]

print("=== Threshold Evaluation Report ===")
print("Threshold | Accuracy | Sensitivity | Specificity")
print("-----------------------------------------------")

for threshold in thresholds:
    y_pred = (y_proba >= threshold).astype(int)
    cm = confusion_matrix(y_test, y_pred)
    TN, FP, FN, TP = cm.ravel()

    acc = accuracy_score(y_test, y_pred)
    sensitivity = TP / (TP + FN) if (TP + FN) > 0 else 0
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

    print(f"{threshold:9.2f} | {acc:.4f}   | {sensitivity:.4f}     | {specificity:.4f}")

# Voting Combos

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier
import itertools
import numpy as np

# === Load Data ===
balanced_output_path = r"D:\Malak\iped\balanced_subset.csv"
df = pd.read_csv(balanced_output_path)

# Impute missing values (mean for numeric)
X = df.drop(columns=["label"])
imputer = SimpleImputer(strategy='mean')
X = pd.DataFrame(imputer.fit_transform(X), columns=X.columns)

y = df["label"]

# Train/Test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, test_size=0.2, random_state=42
)


# === Train Models ===
rf = RandomForestClassifier(n_estimators=100, random_state=42)
mlp = MLPClassifier(hidden_layer_sizes=(100,), max_iter=300, random_state=42)
xgb = XGBClassifier(use_label_encoder=False, eval_metric="logloss", random_state=42)

print("⏳ Training models...")
rf.fit(X_train, y_train)
mlp.fit(X_train, y_train)
xgb.fit(X_train, y_train)

# === Get Probabilities ===
rf_proba = rf.predict_proba(X_test)[:, 1]
mlp_proba = mlp.predict_proba(X_test)[:, 1]
xgb_proba = xgb.predict_proba(X_test)[:, 1]

# === Voting Combinations ===
model_names = ['RF', 'MLP', 'XGB']
probas = {'RF': rf_proba, 'MLP': mlp_proba, 'XGB': xgb_proba}
combinations = [combo for i in range(1, 4) for combo in itertools.combinations(model_names, i)]

thresholds = [0.8, 0.7, 0.6, 0.59, 0.58, 0.57, 0.56, 0.55, 0.54, 0.53, 0.52,
              0.51, 0.50, 0.49, 0.48, 0.47, 0.46, 0.45, 0.44, 0.43, 0.42,
              0.41, 0.40, 0.35, 0.30, 0.25, 0.20, 0.15, 0.10]

# === Evaluate Each Combination ===
print("\n=== Voting Ensemble Threshold Evaluation ===")
for combo in combinations:
    avg_proba = np.mean([probas[model] for model in combo], axis=0)
    print(f"\n🧠 Combo: {' + '.join(combo)}")
    print("Threshold | Accuracy | Sensitivity | Specificity")
    print("-----------------------------------------------")

    for threshold in thresholds:
        y_pred = (avg_proba >= threshold).astype(int)
        cm = confusion_matrix(y_test, y_pred)
        TN, FP, FN, TP = cm.ravel()

        acc = accuracy_score(y_test, y_pred)
        sensitivity = TP / (TP + FN) if (TP + FN) > 0 else 0
        specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

        print(f"{threshold:9.2f} | {acc:.4f}   | {sensitivity:.4f}     | {specificity:.4f}")


# Generate Models (RF + XGB) and Imputer

In [None]:
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.metrics import confusion_matrix, accuracy_score

# === Config ===
input_path = r"D:\Malak\iped\ml_ready_dataset.csv"
chunk_size = 1_000_000
threshold = 0.48
balance_ratio = 4  # 1 error : 3 correct

# === Prepare growing training set ===
balanced_chunks = []

print("🔄 Building balanced training set...")
for chunk in pd.read_csv(input_path, chunksize=chunk_size):
    chunk["label"] = chunk["match_letter"].map({"m": 0, "s": 1})
    chunk.drop(columns=["match_letter"], inplace=True)

    # Get all errors and sampled corrects
    errors = chunk[chunk["label"] == 1]
    corrects = chunk[chunk["label"] == 0].sample(
        n=min(len(errors) * balance_ratio, len(chunk[chunk["label"] == 0])),
        random_state=42
    )

    balanced_chunk = pd.concat([errors, corrects])
    balanced_chunks.append(balanced_chunk)

# Combine all balanced chunks
balanced_df = pd.concat(balanced_chunks).sample(frac=1, random_state=42).reset_index(drop=True)
print(f"✅ Final balanced training set: {balanced_df.shape}")

# === Prepare training data ===
X_train = balanced_df.drop(columns=["label"])
y_train = balanced_df["label"]

# Impute missing values
imputer = SimpleImputer(strategy='mean')
X_train = pd.DataFrame(imputer.fit_transform(X_train), columns=X_train.columns)

# Train models
rf = RandomForestClassifier(n_estimators=100, random_state=42)
xgb = XGBClassifier(use_label_encoder=False, eval_metric="logloss", random_state=42)

print("⏳ Training models on full balanced data...")
rf.fit(X_train, y_train)
xgb.fit(X_train, y_train)

# === Load full dataset for evaluation ===
print("\n🔍 Evaluating on full dataset...")

full_df = pd.read_csv(input_path)
full_df["label"] = full_df["match_letter"].map({"m": 0, "s": 1})
full_df.drop(columns=["match_letter"], inplace=True)
full_df = full_df.dropna(subset=["label"])

X_full = full_df.drop(columns=["label"])
y_full = full_df["label"].values

# Impute missing values
X_full_imputed = pd.DataFrame(imputer.transform(X_full), columns=X_full.columns)

# Get predictions
rf_proba = rf.predict_proba(X_full_imputed)[:, 1]
xgb_proba = xgb.predict_proba(X_full_imputed)[:, 1]
avg_proba = (rf_proba + xgb_proba) / 2
y_pred = (avg_proba >= threshold).astype(int)

# Evaluate
cm = confusion_matrix(y_full, y_pred, labels=[0, 1])
TN, FP, FN, TP = cm.ravel()

acc = accuracy_score(y_full, y_pred)
sensitivity = TP / (TP + FN) if (TP + FN) > 0 else 0
specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

# Print results
print("\n=== Final Evaluation at Threshold 0.46 ===")
print(f"Samples      : {len(y_full)}")
print(f"Accuracy     : {acc:.4f}")
print(f"Sensitivity  : {sensitivity:.4f}")
print(f"Specificity  : {specificity:.4f}")
print(f"TP = {TP}, FP = {FP}, TN = {TN}, FN = {FN}")

import joblib
import os

# === Save models and imputer ===
model_dir = r"D:\Malak\iped\models"
os.makedirs(model_dir, exist_ok=True)

joblib.dump(rf, os.path.join(model_dir, "rf_model.pkl"))
joblib.dump(xgb, os.path.join(model_dir, "xgb_model.pkl"))
joblib.dump(imputer, os.path.join(model_dir, "imputer.pkl"))

print(f"\n💾 Models saved to: {model_dir}")

# Test on New Mock Using Models

In [None]:
import pandas as pd
import joblib
import numpy as np
import os

# === Config ===
model_dir = r"D:\Malak\iped\models"
feature_file = r"D:\Malak\iped\validation\ml_ready_dataset.csv"
output_file = r"D:\Malak\iped\validation\validation_predictions_for_comparison.csv"
threshold = 0.46

# === Step 1: Load models and imputer ===
rf = joblib.load(os.path.join(model_dir, "rf_model.pkl"))
xgb = joblib.load(os.path.join(model_dir, "xgb_model.pkl"))
imputer = joblib.load(os.path.join(model_dir, "imputer.pkl"))

# === Step 2: Define the same features as in training ===
features = [
    'forward_phred',
    'reverse_phred',
    'normalized_position_forward',
    'normalized_position_reverse', 
    'homopolymer_error_risk_forward',
    'homopolymer_error_risk_reverse',
    'homopolymer_length_forward',
    'homopolymer_length_reverse',
    'GC_content_forward',
    'GC_content_reverse',
    'entropy_forward',
    'entropy_reverse',
    'is_overlap',
    'has_GCC_prev3_forward',
    'has_GCC_prev3_reverse',
    'window_abundance',
    'global_abundance',
    'local_abundance'
]

# === Step 3: Load validation data ===
df = pd.read_csv(feature_file)

# Get ground truth match letters
true_labels = df["match_letter"].replace("i", "s").values

# Prepare features for prediction
X_val = df[features]
X_val_imputed = pd.DataFrame(imputer.transform(X_val), columns=features)

# === Step 4: Model predictions ===
rf_proba = rf.predict_proba(X_val_imputed)[:, 1]
xgb_proba = xgb.predict_proba(X_val_imputed)[:, 1]
avg_proba = (rf_proba + xgb_proba) / 2

# Apply voting threshold
y_pred = (avg_proba >= threshold).astype(int)
predicted_labels = np.where(y_pred == 0, 'm', 's')

# === Step 5: Save for comparison ===
comparison_df = pd.DataFrame({
    "match_letter": true_labels,
    "predicted_label": predicted_labels
})

comparison_df.to_csv(output_file, index=False)
print(f"✅ Saved prediction vs match_letter comparison to: {output_file}")

# Compare Results of New Mock Classification to its Correct Classification

In [None]:
import pandas as pd
from sklearn.metrics import confusion_matrix, accuracy_score

# Load the CSV file
file_path = r"D:\Malak\iped\validation\validation_predictions_for_comparison.csv"
df = pd.read_csv(file_path)

# Ensure expected columns exist
assert 'match_letter' in df.columns and 'predicted_label' in df.columns, "Missing required columns."

# Step 1: Unique values
true_unique = df['match_letter'].unique()
pred_unique = df['predicted_label'].unique()

# Step 2: Counts
true_counts = df['match_letter'].value_counts()
pred_counts = df['predicted_label'].value_counts()

# Step 3: Confusion matrix
conf_matrix = pd.crosstab(df['match_letter'], df['predicted_label'], rownames=['Actual'], colnames=['Predicted'], dropna=False)

# Step 4: Metrics
y_true = df['match_letter'].map({'m': 0, 's': 1}).values
y_pred = df['predicted_label'].map({'m': 0, 's': 1}).values
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
TN, FP, FN, TP = cm.ravel()

accuracy = accuracy_score(y_true, y_pred)
sensitivity = TP / (TP + FN) if (TP + FN) > 0 else 0  # Recall for class 's'
specificity = TN / (TN + FP) if (TN + FP) > 0 else 0  # Recall for class 'm'

# Display
print("✅ Unique values in 'match_letter':", true_unique)
print("✅ Unique values in 'predicted_label':", pred_unique)
print("\n📊 Count of each class in 'match_letter':\n", true_counts)
print("\n📊 Count of each class in 'predicted_label':\n", pred_counts)
print("\n🧮 Confusion Matrix:\n", conf_matrix)
print(f"\n✅ Accuracy     : {accuracy:.4f}")
print(f"✅ Sensitivity  : {sensitivity:.4f} (True 's' recall)")
print(f"✅ Specificity  : {specificity:.4f} (True 'm' recall)")
