In [None]:
# Trains a LightGBM multiclass classifier and evaluates it on a held-out test set of 214 samples. Reported results: 0.939 test accuracy and 0.967 validation accuracy.

In [None]:
import numpy as np
import pandas as pd
import math
import re
from scipy.stats import skew, kurtosis
import logging
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, log_loss)
from sklearn.preprocessing import StandardScaler
from lightgbm import LGBMClassifier
import joblib
from lightgbm import early_stopping


In [None]:
# --- Paths -------------------------------------------------------------------
Data_Dir = Path("/content/drive/MyDrive/Dirichlet_Mod_Cond")
Train_Dir = Data_Dir / "Zeros_per_Modulus_Train"
Test_Dir = Data_Dir / "Zeros_per_Modulus_Test"
Out_Dir = Data_Dir / "results"

# --- Experiment settings -----------------------------------------------------
Seed = 42
FFT_Comp = 30  # number of FFT magnitude features kept per sample
Test_Sz = 0.2  # fraction of the training data held out as the validation set
Early_Stoping_Rounds = 75


# The learning rate was tuned after several experiments.
# NOTE: "class_weight" used to appear twice in this literal; Python silently
# keeps the last occurrence, so the duplicate was removed without changing the
# resulting dictionary.
Model_Params = {
    "objective": "multiclass",
    "num_leaves": 127,
    "random_state": Seed,
    "num_class": 140,
    "bagging_freq": 5,
    "class_weight": "balanced",
    "learning_rate": 0.005,
    "n_estimators": 1500,
    "feature_fraction": 0.85,
    "bagging_fraction": 0.85,
    "n_jobs": -1,
    "verbose": 0,
}

In [None]:
# Each text file is named "<q>.txt", where the integer q is the label (the modulus).
Fixed_Zero_Count = 25  # keep exactly the first 25 zeros of every sample


def load_data(folder: Path) -> pd.DataFrame:
    """Read every ``*.txt`` file in ``folder`` into one table of zeros.

    Each line that contains a ``:`` yields one sample: the text before the
    first ``:`` is ignored and every decimal number after it is parsed as a
    zero. Only the first ``Fixed_Zero_Count`` zeros are kept; shorter samples
    are right-padded with NaN so that every row has the same width.

    Lines without a ``:`` (for example blank lines) and lines that contain no
    decimal number after the ``:`` are skipped instead of raising.

    Parameters
    ----------
    folder : Path
        Directory holding the ``<label>.txt`` files.

    Returns
    -------
    pd.DataFrame
        Columns ``label, zero_1, ..., zero_<Fixed_Zero_Count>``; one row per
        parsed line. Empty (but with the right columns) if nothing was parsed.

    Raises
    ------
    ValueError
        If a file stem is not an integer.
    """
    pattern = re.compile(r"[-+]?\d*\.\d+(?:[eE][-+]?\d+)?")
    rows = []

    # glob() yields files in arbitrary order; sorting keeps the row order, and
    # therefore the seeded train/validation split downstream, reproducible.
    for file_path in sorted(folder.glob("*.txt")):
        label = int(file_path.stem)
        with file_path.open() as f:
            for line in f:
                _, separator, payload = line.partition(":")
                if not separator:
                    continue  # no "prefix: values" structure on this line

                zeros = [float(z) for z in pattern.findall(payload)][:Fixed_Zero_Count]
                if not zeros:
                    continue  # nothing usable on this line

                # Pad short samples so the row always matches the column list.
                zeros += [float("nan")] * (Fixed_Zero_Count - len(zeros))
                rows.append([label] + zeros)

    columns = ["label"] + [f"zero_{i+1}" for i in range(Fixed_Zero_Count)]
    return pd.DataFrame(rows, columns=columns)

In [None]:

#  statistical moments
# first and second differences
# FFT magnitudes (first FFT_Comp components)
def _mean_and_var(values: np.ndarray) -> tuple:
    """Return (mean, population variance) of ``values``; (nan, nan) if empty."""
    if values.size == 0:
        return float("nan"), float("nan")
    return float(values.mean()), float(values.var())


def extract_features(data: pd.DataFrame, fft_comp: "int | None" = None) -> pd.DataFrame:
    """Turn each row of zeros into a fixed-length feature vector.

    Per row, the features are computed from the finite, non-zero entries only:

    * sample moments: mean, variance, skewness, kurtosis, RMS;
    * mean and variance of the first and second differences;
    * ``mean_pg``: mean periodogram power over the non-DC FFT bins;
    * the magnitudes of the first ``fft_comp`` non-DC real-FFT bins
      (right-padded with 0 when the row yields fewer bins).

    Statistics that cannot be computed for a row (too few values) are NaN.

    NOTE(review): the original cell referenced ``nonzero`` and ``mean_pg``
    without ever defining them, so it raised ``NameError``. ``nonzero`` is
    reconstructed here as "entries that are finite and != 0" and ``mean_pg``
    as the mean periodogram power. Both are reconstructions - confirm against
    the original experiment before relying on previously reported scores.

    Parameters
    ----------
    data : pd.DataFrame
        Must contain a ``label`` column; every other column is treated as a
        zero value of the sample.
    fft_comp : int, optional
        Number of FFT magnitude features. Defaults to the module-level
        ``FFT_Comp`` when omitted.

    Returns
    -------
    pd.DataFrame
        Columns ``label``, the ten statistics listed above, then
        ``FFT_1 ... FFT_<fft_comp>``; one row per input row.
    """
    n_fft = FFT_Comp if fft_comp is None else int(fft_comp)
    nan = float("nan")

    labels = data["label"].to_numpy()
    zero_matrix = data.drop(columns=["label"]).to_numpy(dtype=float)

    feature_list = []
    for raw_label, zeros in zip(labels, zero_matrix):
        label = int(raw_label)

        # Drop padding / missing entries before computing any statistic.
        nonzero = zeros[np.isfinite(zeros) & (zeros != 0)]
        count = nonzero.size

        # Sample moments
        mean_z, var_z = _mean_and_var(nonzero)
        skew_z = float(skew(nonzero)) if count >= 2 else nan
        kurt_z = float(kurtosis(nonzero)) if count >= 2 else nan
        rms_z = float(np.sqrt((nonzero**2).mean())) if count else nan

        # Differences
        diff1 = np.diff(nonzero)
        diff2 = np.diff(diff1)
        mean_d1, var_d1 = _mean_and_var(diff1)
        mean_d2, var_d2 = _mean_and_var(diff2)

        # FFT features (np.fft.rfft rejects empty input, hence the guard).
        # Bin 0 (the DC term) is excluded, as in the original slice [1:FFT_Comp+1].
        if count:
            non_dc = np.abs(np.fft.rfft(nonzero))[1:]
        else:
            non_dc = np.empty(0, dtype=float)

        # NOTE(review): reconstructed definition - see docstring.
        mean_pg = float(np.mean(non_dc**2) / count) if non_dc.size else nan

        mags = non_dc[:n_fft]
        if mags.size < n_fft:
            mags = np.pad(mags, (0, n_fft - mags.size))

        features = [
            label, mean_z, var_z, skew_z, kurt_z, rms_z,
            mean_d1, var_d1, mean_d2, var_d2, mean_pg,
            *mags.tolist(),
        ]
        feature_list.append(features)

    # Column names
    stats_cols = [
        "label", "mean_z", "var_z", "skew_z", "kurt_z", "rms_z",
        "mean_d1", "var_d1", "mean_d2", "var_d2", "mean_pg",
    ]
    fft_cols = [f"FFT_{i+1}" for i in range(n_fft)]

    return pd.DataFrame(feature_list, columns=stats_cols + fft_cols)


In [None]:
def compute_and_save_metrics(y_true, y_pred, y_proba, classes, tag):
    """Report accuracy and log-loss for one split and write them to ``Out_Dir``.

    Three CSV files are written, all prefixed with ``tag``:

    * ``<tag>_probs.csv``        - true label, predicted label, one probability
      column per entry of ``classes``;
    * ``<tag>_true_vs_pred.csv`` - true and predicted labels only;
    * ``<tag>_summary.csv``      - a single row with accuracy and log-loss.

    ``Out_Dir`` is created if it does not exist yet, so the function also
    works on a fresh run.

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels (a pandas Series, list or ndarray).
    y_pred : array-like
        Predicted labels, aligned with ``y_true``.
    y_proba : array-like
        Predicted probabilities; its columns are labelled with ``classes``.
    classes : array-like
        Class labels, passed to ``log_loss`` as ``labels`` and used as the
        probability column names.
    tag : str
        Split name used in the log line and as file-name prefix.
    """
    # np.asarray accepts Series, lists and arrays alike (``.values`` only
    # exists on pandas objects).
    true_labels = np.asarray(y_true)

    acc = accuracy_score(true_labels, y_pred)
    ll = log_loss(true_labels, y_proba, labels=classes)
    logging.info(f"{tag}: Accuracy={acc:.4f}, LogLoss={ll:.4f}")
    print(f"[{tag}] Accuracy: {acc:.4f}, LogLoss: {ll:.4f}")

    # to_csv does not create missing directories.
    Out_Dir.mkdir(parents=True, exist_ok=True)

    # probabilities and true vs predicted
    df_probs = pd.DataFrame(y_proba, columns=classes)
    df_probs.insert(0, "predicted", y_pred)
    df_probs.insert(0, "true", true_labels)
    df_probs.to_csv(Out_Dir / f"{tag}_probs.csv", index=False)

    df_tp = pd.DataFrame({"true": true_labels, "predicted": y_pred})
    df_tp.to_csv(Out_Dir / f"{tag}_true_vs_pred.csv", index=False)

    pd.DataFrame([{"accuracy": acc, "log_loss": ll}]) \
      .to_csv(Out_Dir / f"{tag}_summary.csv", index=False)

In [None]:
if __name__ == "__main__":
    # Load the raw zeros, then convert every sample into its feature vector.
    train_data, test_data = load_data(Train_Dir), load_data(Test_Dir)
    train_features, test_features = (
        extract_features(train_data),
        extract_features(test_data),
    )

    # Separate the feature matrix from the label column.
    X_train, y_train = train_features.drop(columns=["label"]), train_features["label"]
    X_test, y_test = test_features.drop(columns=["label"]), test_features["label"]

    # Stratified train/validation split of the training data.
    X_tr, X_val, y_tr, y_val = train_test_split(
        X_train,
        y_train,
        test_size=Test_Sz,
        stratify=y_train,
        random_state=Seed,
    )

    # The scaler is fitted on the training part only and then applied to all
    # three splits.
    scaler = StandardScaler()
    scaler.fit(X_tr)
    X_tr_scaled, X_val_scaled, X_test_scaled = (
        scaler.transform(part) for part in (X_tr, X_val, X_test)
    )

    # Train, monitoring multi-class log-loss on the validation split and
    # stopping early once it has not improved for Early_Stoping_Rounds rounds.
    model = LGBMClassifier(**Model_Params)
    model.fit(
        X_tr_scaled,
        y_tr,
        eval_set=[(X_val_scaled, y_val)],
        eval_metric="multi_logloss",
        callbacks=[early_stopping(stopping_rounds=Early_Stoping_Rounds)],
    )

In [None]:
# Score the fitted model on the validation and test splits and persist the
# per-split metrics and predictions.
for split, X_split, y_split in (
    ("Val", X_val_scaled, y_val),
    ("Test", X_test_scaled, y_test),
):
    preds = model.predict(X_split)
    probs = model.predict_proba(X_split)
    compute_and_save_metrics(
        y_true=y_split,
        y_pred=preds,
        y_proba=probs,
        classes=model.classes_,
        tag=split,
    )