In [None]:
import pandas as pd

# Training table location, given relative to the notebook's working directory.
TRAIN_CSV_PATH = "data/detecting-reversal-points-in-us-equities/train.csv"
# low_memory=False parses the file in one pass, avoiding chunk-wise mixed dtype inference.
train_data = pd.read_csv(TRAIN_CSV_PATH, low_memory=False)

- Feature Selection

- Model Building

- Evaluation Metrics (in order of importance):
    1. Macro F1-score — treats all swing classes equally, regardless of class imbalance.
    2. Macro Balanced Accuracy
    3. Matthews Correlation Coefficient (multi-class)
    4. Inference runtime

    This ensures that winning models capture swing structure across all four categories, not just the majority class.



#### Clean data + Data inspection

In [None]:
# Inspect the identifier and target columns. A notebook only echoes the last
# expression of a cell, so each result is printed explicitly instead of being
# computed and discarded.
for column in ("train_id", "ticker_id", "class_label"):
    print(column, train_data[column].unique())
# Values noted by the author from an earlier run:
#   train_id    -> [0,    1,    2, ..., 1929, 1930, 1931]
#   ticker_id   -> [2, 3, 6, 1, 4, 5]
#   class_label -> [nan, 'HL', 'HH', 'LH', 'LL']

# Non boolean features
numerical_features = ['momentum', 'ratio', 'sm_momentum', 'sm_ratio']

In [None]:
# Rows without a label receive the literal string "None", so missing labels
# become a category of their own instead of NaN.
labels = train_data["class_label"]
train_data["class_label"] = labels.where(labels.notna(), "None")


In [None]:
# List every column that pandas stored with the generic object dtype.
object_columns = train_data.select_dtypes(include=['object']).columns
for column_name in object_columns:
    print(column_name)

#### Baseline model

In [None]:
from sklearn.calibration import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.metrics import f1_score
from sklearn.utils import compute_sample_weight
import xgboost as xgb
from sklearn.model_selection import learning_curve
import matplotlib.pyplot as plt


from sklearn.model_selection import StratifiedKFold, cross_val_score
import numpy as np
from copy import copy

# Candidate estimators evaluated by the loops below. Only the XGBoost
# classifier is active; the commented-out entries are alternative baselines
# that can be re-enabled by uncommenting them.
models = [
    # RandomForestClassifier(class_weight='balanced'), # class_weight='balanced'

    # LogisticRegression(
    #     penalty='l1', 
    #     solver='liblinear',     
    #     class_weight='balanced',
    #     max_iter=1000
    # ),
    
    # Default hyper-parameters; labels are integer-encoded before fitting this model.
    xgb.XGBClassifier(),
]

# Cell-local imports keep this cell runnable on its own:
#   * clone            -> unfitted copy of an estimator (replaces the shallow copy()).
#   * train_test_split -> imported once instead of on every loop iteration.
#   * LabelEncoder     -> taken from its public module, sklearn.preprocessing.
from sklearn.base import clone
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Baseline: for every candidate model, fit one estimator per ticker on the
# first 80% of that ticker's rows (ordered by `t`) and report macro F1 on the
# remaining 20%.
for model in models:
    model_name = model.__class__.__name__
    is_xgb = isinstance(model, xgb.XGBClassifier)
    print(f"Model: {model_name}")
    all_scores = []
    # `ticker_id` rather than `id`, which would shadow the builtin for the rest of the notebook.
    for ticker_id in sorted(train_data["ticker_id"].unique()):
        # Fresh, unfitted estimator with the same hyper-parameters for each ticker.
        tmp_model = clone(model)

        focused_df = (
            train_data[train_data["ticker_id"] == ticker_id]
            .sort_values(by="t")
            .set_index("t")
            .drop(columns=["train_id", "ticker_id"])
        )
        X, y = focused_df.drop(columns=["class_label"]), focused_df["class_label"]
        # remove duplicated columns (keeps the first occurrence of each)
        X = X.loc[:, ~X.T.duplicated()]

        print(X.shape)

        # XGBClassifier needs integer class ids, so the labels are encoded first.
        if is_xgb:
            le = LabelEncoder()
            y = le.fit_transform(y.astype(str))

        # Chronological hold-out: shuffle=False keeps the last 20% of rows for validation.
        train_X, val_X, train_y, val_y = train_test_split(X, y, test_size=0.2, shuffle=False)

        if is_xgb:
            # Re-weight samples so every class contributes equally to the loss.
            sample_weights = compute_sample_weight(class_weight='balanced', y=train_y)
            tmp_model.fit(train_X, train_y, sample_weight=sample_weights)

            # Learning curves are diagnostic only: learning_curve refits clones
            # of the estimator on subsets of X, y with its own 2-fold CV, and
            # the sample weights above are not passed to it.
            train_sizes, train_scores, test_scores = learning_curve(
                estimator=tmp_model, X=X, y=y, scoring='f1_macro', cv=2,
                train_sizes=np.linspace(0.1, 1.0, 10)
            )

            # Mean and spread of the scores across CV folds
            train_mean = np.mean(train_scores, axis=1)
            train_std = np.std(train_scores, axis=1)
            test_mean = np.mean(test_scores, axis=1)
            test_std = np.std(test_scores, axis=1)

            # The plotted metric is macro F1 (scoring='f1_macro'), so the
            # legend and axis are labelled accordingly rather than "accuracy".
            fig, ax = plt.subplots(figsize=(8, 6))
            ax.plot(train_sizes, train_mean, color='blue', marker='o', label='Training macro F1')
            ax.fill_between(train_sizes, train_mean + train_std, train_mean - train_std, alpha=0.15, color='blue')
            ax.plot(train_sizes, test_mean, color='green', marker='+', label='Validation macro F1')
            ax.fill_between(train_sizes, test_mean + test_std, test_mean - test_std, alpha=0.15, color='green')
            ax.set_title(f'Learning Curves (ticker {ticker_id})')
            ax.set_xlabel('Training set size')
            ax.set_ylabel('Macro F1')
            ax.grid()
            ax.legend(loc='lower right')
            plt.show()
            # Release the figure; one is created per ticker.
            plt.close(fig)

        else:
            tmp_model.fit(train_X, train_y)

        y_pred = tmp_model.predict(val_X)
        score = f1_score(val_y, y_pred, average='macro')
        all_scores.append(score)
        print(f"id: {ticker_id}, macro F1: {float(score):.4f}")
    print(f"Overall macro F1: {float(np.mean(all_scores)):.4f}\n")

In [None]:
# Feature matrix and target for a single ticker (id 2), ordered and indexed by `t`,
# with the identifier columns removed.
ticker_rows = train_data["ticker_id"] == 2
focused_df = (
    train_data[ticker_rows]
    .sort_values(by="t")
    .set_index("t")
    .drop(columns=["train_id", "ticker_id"])
)
y = focused_df["class_label"]
X = focused_df.drop(columns=["class_label"])

In [None]:

# Drop exact-duplicate columns with a boolean mask (the same approach as the
# baseline cell). Selecting columns this way keeps their original dtypes,
# whereas a transpose -> drop_duplicates -> transpose round-trip turns a
# mixed-dtype frame into object columns.
X_cleaned = X.loc[:, ~X.T.duplicated()]
print(X_cleaned.shape)
# Columns with at most one distinct non-null value (constant or entirely NaN)
# carry no information for a model.
cols_to_drop = X_cleaned.columns[X_cleaned.nunique() <= 1].to_list()
X_cleaned = X_cleaned.drop(columns=cols_to_drop)
print(X_cleaned.shape)
# Displayed result: the columns removed by the two steps above.
X.columns.difference(X_cleaned.columns)

In [None]:
# Shape of X itself, i.e. before any duplicate / constant-column cleanup.
X.shape

#### Feature selection

In [None]:
from sklearn.feature_selection import VarianceThreshold
from sklearn.model_selection import TimeSeriesSplit
from sklearn.base import clone

# Global safe filtering (no leakage)
def reduce_features_global(X, threshold=1e-5):
    """Remove near-constant feature columns from ``X``.

    A ``VarianceThreshold`` selector is fitted on ``X`` and applied to it.
    Only the feature values are used; no labels are involved.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Numeric feature matrix.
    threshold : float, default=1e-5
        Variance cut-off passed to ``VarianceThreshold``; only features whose
        variance is above this value are kept.

    Returns
    -------
    X_reduced : ndarray of shape (n_samples, n_kept_features)
        ``X`` restricted to the retained columns.
    sel_var : VarianceThreshold
        The fitted selector, so the same column mask can be re-applied with
        ``sel_var.transform`` or inspected with ``sel_var.get_support()``.
    """
    # Remove near-zero variance
    sel_var = VarianceThreshold(threshold=threshold)
    X_reduced = sel_var.fit_transform(X)

    # Optional: remove duplicates
    # _, unique_idx = np.unique(X_reduced, axis=1, return_index=True)
    # X_reduced = X_reduced[:, sorted(unique_idx)]

    return X_reduced, sel_var

# full_X = train_data.drop(columns=["train_id", "ticker_id", "t", "class_label"])
# X_reduced, _ = reduce_features_global(full_X.values)
# train_data_red = pd.concat([
#     train_data[["ticker_id", "t", "class_label"]].reset_index(drop=True),
#     pd.DataFrame(X_reduced)
# ], axis=1)

# === 2. Per-ticker modeling ===
# For each candidate model and each ticker: filter near-constant features,
# then score the model with a 3-fold expanding-window time-series CV (macro F1).
for base_model in models:
    all_scores = []
    # `ticker_id` rather than `id`, which would shadow the builtin.
    for ticker_id in sorted(train_data["ticker_id"].unique()):
        df = train_data[train_data["ticker_id"] == ticker_id].sort_values("t")
        # `train_id` is dropped as well, matching the baseline cell
        # (presumably a row identifier rather than a feature — confirm).
        X = df.drop(columns=["train_id", "ticker_id", "t", "class_label"])
        y = df["class_label"]

        # Cast to float so boolean and numeric columns share one numeric dtype.
        X_reduced, _ = reduce_features_global(X.to_numpy(dtype=float))
        print(X_reduced.shape[1])

        # TimeSeries CV
        tscv = TimeSeriesSplit(n_splits=3)
        fold_scores = []
        for train_idx, val_idx in tscv.split(X_reduced):
            model = clone(base_model)
            # Train and validate on the reduced matrix; previously the reduced
            # features were computed but the unreduced frame was used.
            X_tr, X_val = X_reduced[train_idx], X_reduced[val_idx]
            y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]

            # Handle XGBoost label encoding
            if isinstance(model, xgb.XGBClassifier):
                # The encoder is fitted on the training fold only. Validation
                # labels stay in their original form for scoring, so a label
                # unseen in training cannot make the encoder raise.
                le = LabelEncoder()
                y_tr_enc = le.fit_transform(y_tr)
                model.fit(X_tr, y_tr_enc)
                y_pred = le.inverse_transform(model.predict(X_val))
            else:
                model.fit(X_tr, y_tr)
                y_pred = model.predict(X_val)

            fold_scores.append(f1_score(y_val, y_pred, average='macro'))

        all_scores.append(np.mean(fold_scores))
    # Report under the configured model's name, not the last fold's clone.
    print(f"{base_model.__class__.__name__}: {np.mean(all_scores):.4f}")

In [None]:
from sklearn.feature_selection import SelectKBest, chi2, VarianceThreshold
import numpy as np

# Variance filter on the training features: only columns with variance above 0.01 are kept.
# NOTE(review): `train_X` is left over from the baseline loop (its last iteration) — confirm
# that this is the intended input.
selector_var = VarianceThreshold(threshold=0.01)
X_var = selector_var.fit_transform(train_X)
# Boolean mask of the retained columns, mapped back to their names.
kept_mask = selector_var.get_support()
feature_names_after_var = train_X.columns.to_numpy()[kept_mask]


In [None]:
# Names of the columns that passed the variance filter.
feature_names_after_var

In [None]:
# Number of rows in the training split.
len(train_X)

In [None]:
# NOTE(review): `X_selected` is not assigned in any visible cell — confirm where it is
# defined; otherwise this cell raises NameError on a fresh kernel.
X_selected

In [None]:
# Preview the training features. The visible cells only define `train_X`
# (upper-case X); the lower-case `train_x` raised NameError.
train_X.head(5)

In [None]:
# Class balance per ticker. The ids are read from the data instead of a
# hard-coded range, so the cell stays correct if tickers are added or renumbered.
for i in sorted(train_data["ticker_id"].unique()):
    df = train_data[train_data["ticker_id"] == i]
    print(f"Ticker ID: {i}")
    # dropna=False also counts rows whose label is missing.
    print(df["class_label"].value_counts(dropna=False), "\n")

In [None]:
# Preview the first five rows of the full training table.
train_data.head(5)

In [None]:
# Scratch check of str.replace: removing "3" from "123" gives "12".
"123".replace("3","")

In [None]:
from copy import copy

# Collect the distinct name prefixes of the columns. Each prefix is registered
# once with an empty list; the lists are not filled in this cell.
all_feature_names = train_data.columns.to_list()
all_feature_names_dict = {}

for raw_name in all_feature_names:
    # Strip a trailing "_<digit>" or "_10" suffix, if there is one.
    has_digit_suffix = len(raw_name) >= 2 and raw_name[-2] == '_' and raw_name[-1].isdigit()
    if has_digit_suffix:
        stem = raw_name[:-2]
    elif raw_name.endswith("_10"):
        stem = raw_name[:-3]
    else:
        stem = raw_name

    # Prefix = everything before the last underscore ("" when there is none).
    group_key = stem.rpartition("_")[0]

    if group_key in all_feature_names_dict:
        continue
    # First time this prefix is seen: show the stripped name and register it.
    print(stem)
    all_feature_names_dict[group_key] = []

# for i in list(all_feature_names_dict.keys()):
#     print(i)