In [98]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import numpy as np

# Step 1: Read the semicolon-separated training and test sets from disk
training_data = pd.read_csv('../data/training_data.csv', sep=';')
test_data = pd.read_csv('../data/test_data_no_target.csv', sep=';')

# Step 2: Convert Numerical Columns from strings to floats
def convert_to_float(df, skip_cols=('Group', 'Class', 'Perform')):
    """
    Parse text columns that hold decimal-comma numbers (e.g. "1,5") into floats.

    Args:
    df (pandas.DataFrame): Frame whose text columns contain numbers written
        with a comma as the decimal separator.
    skip_cols (iterable of str): Columns left untouched even when they hold
        text. Defaults to the columns this function has always skipped.

    Returns:
    pandas.DataFrame: A new frame with the text columns converted to float.
        The frame passed in is not modified.

    Raises:
    ValueError: If a value is still not a valid number after the comma has
        been replaced by a dot.
    """
    # Work on a copy so the caller's frame is never changed behind its back.
    converted = df.copy()
    skipped = set(skip_cols)

    for col in converted.columns:
        if col in skipped:
            continue
        column = converted[col]
        # Text may be stored as plain object dtype or as a dedicated string dtype.
        holds_text = (
            pd.api.types.is_object_dtype(column)
            or pd.api.types.is_string_dtype(column)
        )
        if holds_text:
            # regex=False: the comma is a literal character, not a pattern.
            converted[col] = column.str.replace(',', '.', regex=False).astype(float)
    return converted

# Run the decimal-comma conversion on both the training and the test frame
training_data = convert_to_float(training_data)
test_data = convert_to_float(test_data)

# Step 4: One-hot encode the categorical 'Group' column in both frames
training_data = pd.get_dummies(training_data, columns=['Group'])
test_data = pd.get_dummies(test_data, columns=['Group'])

# Align the test frame with the training features: every training column the
# test frame lacks (other than 'Class' and 'Perform') is added as an all-zero
# column, then the test columns are selected in the training frame's order.
missing_cols = set(training_data.columns).difference(test_data.columns, {'Class', 'Perform'})
for col in missing_cols:
    test_data[col] = 0
feature_columns = training_data.columns.drop(['Class', 'Perform'])
test_data = test_data[feature_columns]

In [99]:
import numpy as np
from sklearn.metrics import confusion_matrix

# Misclassification costs: entry [i, j] is the cost of predicting class j when
# the ground truth is class i (0 on the diagonal, growing with the distance
# between the two classes).
cost_matrix = np.array([[0, 1, 2],
                        [1, 0, 1],
                        [2, 1, 0]])
def calculate_custom_error(preds, gt, cost_matrix=cost_matrix, labels=None):
    """
    Calculate the average misclassification cost of a set of predictions.

    Every sample contributes cost_matrix[i, j], where i is the position of its
    ground-truth label and j the position of its predicted label in the label
    ordering. This is the same value as summing the element-wise product of
    the confusion matrix (rows = ground truth, columns = predictions) with the
    cost matrix and dividing by the number of samples.

    Args:
    preds (array-like): Predicted labels.
    gt (array-like): Ground truth (actual) labels.
    cost_matrix (numpy.ndarray): A square matrix of costs associated with misclassifications.
    labels (array-like, optional): The full label set, in the order of the
        cost matrix rows/columns. When omitted, the sorted distinct labels
        found in gt and preds are used; pass it explicitly when the data may
        not contain every class.

    Returns:
    float: The calculated error metric.

    Raises:
    ValueError: If gt is empty, if preds and gt differ in length, if the
        number of labels does not match the cost matrix, or if a label is not
        listed in labels.
    """
    gt_values = np.asarray(gt).ravel()
    pred_values = np.asarray(preds).ravel()
    costs = np.asarray(cost_matrix)

    # Reject empty input first: nothing below is meaningful without samples.
    total_samples = gt_values.size
    if total_samples == 0:
        raise ValueError("The length of ground truth cannot be zero.")
    if pred_values.size != total_samples:
        raise ValueError("Predictions and ground truth must have the same length.")

    if labels is None:
        classes = np.unique(np.concatenate([gt_values, pred_values])).tolist()
    else:
        classes = np.asarray(labels).ravel().tolist()

    # Validate dimensions of cost_matrix
    if costs.shape != (len(classes), len(classes)):
        raise ValueError("Cost matrix dimensions must match the confusion matrix dimensions.")

    # Translate each label into its row/column position in the cost matrix.
    position_of = {label: position for position, label in enumerate(classes)}
    try:
        gt_positions = [position_of[value] for value in gt_values.tolist()]
        pred_positions = [position_of[value] for value in pred_values.tolist()]
    except KeyError as exc:
        raise ValueError(f"Label {exc.args[0]!r} is not listed in labels.") from exc

    # Average the per-sample costs.
    error = costs[gt_positions, pred_positions].sum() / total_samples
    return error


In [100]:
from sklearn.metrics import make_scorer
def _custom_error_score(y_true, y_pred):
    """
    Adapt calculate_custom_error to scikit-learn's scorer calling convention.

    scikit-learn calls a score function as score_func(y_true, y_pred), while
    calculate_custom_error takes (preds, gt). Passing it directly would feed
    the ground truth in as predictions and vice versa, which only goes
    unnoticed while the cost matrix is symmetric.

    Args:
    y_true (array-like): Ground truth labels.
    y_pred (array-like): Predicted labels.

    Returns:
    float: The custom error of y_pred against y_true.
    """
    return calculate_custom_error(preds=y_pred, gt=y_true)


# greater_is_better=False makes the scorer return the negated error, so that
# model selection (which maximises the score) minimises the error.
custom_scorer = make_scorer(_custom_error_score, greater_is_better=False)

In [101]:
# Step 3: Handle Missing Values (fill with the median)
# The medians are computed on the training data only and reused for the test
# data, so both sets are imputed with the same values and no statistic is
# derived from the test set. Median entries for columns the test frame does
# not have are ignored by fillna.
train_medians = training_data.median(numeric_only=True)
training_data = training_data.fillna(train_medians)
test_data = test_data.fillna(train_medians)

In [102]:

# Step 5: Split the training frame into the target and the feature matrix
y_train = training_data['Class']
X_train = training_data.drop(columns=['Class', 'Perform'])

# Step 6: Standardise the features. The scaler is fitted on the training
# features and the same fitted scaler is applied to the test features; both
# results are wrapped back into DataFrames carrying the feature names.
scaler = StandardScaler()
X_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)
X_test_scaled = pd.DataFrame(scaler.transform(test_data), columns=X_train.columns)

# Step 7: Hold out 20% of the scaled training rows as a validation set
# NOTE(review): the scaler above is fitted before this split, so the
# validation rows contribute to the scaling statistics.
X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(
    X_train_scaled,
    y_train,
    test_size=0.2,
    random_state=42,
)


In [103]:
from sklearn.feature_selection import SelectFromModel, SelectKBest
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
import xgboost as xgb

In [104]:
from imblearn.over_sampling import SMOTE

# Step 6: Oversample class 0 with SMOTE until it has twice its current number
# of rows in the training split; no other class is listed in the strategy.
# NOTE(review): the split is overwritten with the resampled data, so running
# this cell again doubles class 0 once more. The grid search further down
# cross-validates on this already-resampled split.
smote = SMOTE(
    sampling_strategy={0: 2 * int((y_train_split == 0).sum())},
    random_state=42,
)
X_train_split, y_train_split = smote.fit_resample(X_train_split, y_train_split)

In [105]:
# Shift the labels up by one so that the smallest class (-1) becomes 0.
# The shift is applied only while a negative label is still present, so
# running this cell a second time leaves the labels alone instead of shifting
# them again.
# NOTE(review): presumably needed because the classifier fitted on these
# labels expects 0-based class ids — confirm.
if min(y_train_split.min(), y_val_split.min()) < 0:
    y_train_split = y_train_split + 1
    y_val_split = y_val_split + 1

In [106]:
# Two-step pipeline: SelectKBest feature selection feeding an XGBoost classifier
pipeline = Pipeline(steps=[
    ("feature_selection", SelectKBest()),
    ("classification", xgb.XGBClassifier(eval_metric='mlogloss', use_label_encoder=False)),
])

# Search space: the number of selected features k, every integer from 10 to 99
grid_parameters_tune = {"feature_selection__k": [k for k in range(10, 100)]}

In [107]:
# Evaluate every grid candidate with 5-fold cross-validation, scored by the
# custom scorer and using all available cores, then fit on the training split
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=grid_parameters_tune,
    scoring=custom_scorer,
    cv=5,
    n_jobs=-1,
    verbose=10,
)
grid_search.fit(X_train_split, y_train_split)

Fitting 5 folds for each of 90 candidates, totalling 450 fits


[CV 1/5; 1/90] START feature_selection__k=10....................................
[CV 1/5; 2/90] START feature_selection__k=11....................................
[CV 2/5; 1/90] START feature_selection__k=10....................................
[CV 3/5; 1/90] START feature_selection__k=10....................................
[CV 3/5; 2/90] START feature_selection__k=11....................................
[CV 5/5; 1/90] START feature_selection__k=10....................................
[CV 4/5; 1/90] START feature_selection__k=10....................................
[CV 2/5; 2/90] START feature_selection__k=11....................................
[CV 5/5; 1/90] END ....feature_selection__k=10;, score=-0.816 total time=   0.6s
[CV 3/5; 1/90] END ....feature_selection__k=10;, score=-0.834 total time=   0.6s
[CV 2/5; 1/90] END ....feature_selection__k=10;, score=-0.865 total time=   0.7s
[CV 4/5; 1/90] END ....feature_selection__k=10;, score=-0.822 total time=   0.6s
[CV 4/5; 2/90] START feature

In [108]:
# Display the distinct labels in y_train_split together with the number of rows per label
np.unique(y_train_split, return_counts=True)

(array([0, 1, 2]), array([2474, 1840, 3006]))

In [109]:
# Report the winning grid point and its cross-validated score, then keep the
# corresponding fitted pipeline. The scorer was created with
# greater_is_better=False, hence the printed score is the negated custom error.
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best custom error: {grid_search.best_score_}")
best_model = grid_search.best_estimator_

Best parameters: {'feature_selection__k': 88}
Best custom error: -0.769672131147541


In [140]:
# Class probabilities of the best pipeline for every validation row
y_val_pred = best_model.predict_proba(X_val_split)

In [141]:
def get_class_index(probs, threshold=0.7, default_class=1):
    """
    Turn per-class probabilities into class indices, falling back to a default
    class for low-confidence rows.

    Args:
    probs (array-like): Two-dimensional, one row per sample and one column per
        class.
    threshold (float): A row keeps its most probable class only when that
        probability is strictly greater than this value.
    default_class (int): Class index assigned to rows whose highest
        probability does not exceed the threshold. Defaults to 1, the value
        this function has always used.

    Returns:
    numpy.ndarray: Integer class index (0-based column position) per row.

    Raises:
    ValueError: If probs is not two-dimensional.
    """
    probs = np.asarray(probs)
    if probs.ndim != 2:
        raise ValueError("probs must be a 2-D array of shape (n_samples, n_classes).")

    most_likely = probs.argmax(axis=1)
    # Strict comparison: a top probability equal to the threshold is not confident.
    is_confident = probs.max(axis=1) > threshold
    return np.where(is_confident, most_likely, default_class).astype(int)

In [142]:
# Count the predicted classes on the validation split. When y_val_pred still
# holds one row of class probabilities per sample, reduce each row to its most
# likely class first; counting the raw probability values would not be useful.
val_pred_classes = np.argmax(y_val_pred, axis=1) if np.ndim(y_val_pred) == 2 else y_val_pred
np.unique(val_pred_classes, return_counts=True)

(array([0, 1, 2]), array([420, 463, 717]))

In [145]:
# Evaluate the model on the validation split for a range of confidence thresholds.
# The class probabilities do not depend on the threshold, so they are computed
# once instead of on every iteration.
y_val_proba = best_model.predict_proba(X_val_split)
for threshold in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:
    print(f"Threshold: {threshold}")
    y_val_pred = get_class_index(y_val_proba, threshold=threshold)
    accuracy = accuracy_score(y_val_split, y_val_pred)
    precision = precision_score(y_val_split, y_val_pred, average='weighted')
    recall = recall_score(y_val_split, y_val_pred, average='weighted')
    f1 = f1_score(y_val_split, y_val_pred, average='weighted')
    report = classification_report(y_val_split, y_val_pred)
    # Keyword arguments keep predictions and ground truth in the right slots.
    error = calculate_custom_error(preds=y_val_pred, gt=y_val_split)

    print(f"Error: {error}")
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    print("Classification Report:")
    print(report)


Threshold: 0.1
Error: 0.879375
Accuracy: 0.4625
Precision: 0.4421656394029846
Recall: 0.4625
F1 Score: 0.4494011258296336
Classification Report:
              precision    recall  f1-score   support

           0       0.45      0.41      0.43       622
           1       0.15      0.10      0.12       216
           2       0.52      0.61      0.56       762

    accuracy                           0.46      1600
   macro avg       0.37      0.37      0.37      1600
weighted avg       0.44      0.46      0.45      1600

Threshold: 0.2
Error: 0.879375
Accuracy: 0.4625
Precision: 0.4421656394029846
Recall: 0.4625
F1 Score: 0.4494011258296336
Classification Report:
              precision    recall  f1-score   support

           0       0.45      0.41      0.43       622
           1       0.15      0.10      0.12       216
           2       0.52      0.61      0.56       762

    accuracy                           0.46      1600
   macro avg       0.37      0.37      0.37      1600
wei

In [73]:
# Confusion matrix of the validation labels against the current validation predictions
cm = confusion_matrix(y_true=y_val_split, y_pred=y_val_pred)
print(cm)

[[275  14 333]
 [ 81   6 129]
 [250  11 501]]


In [181]:
# Predict the test set: a row keeps its most probable class only when that
# probability exceeds 0.6 (class index 1 otherwise); 1 is then subtracted
# from every class index.
y_test_pred = get_class_index(best_model.predict_proba(X_test_scaled), threshold=0.6) - 1

In [182]:
# Write the test predictions to a text file, one integer per line
np.savetxt('predictions.txt', y_test_pred, fmt='%d', newline='\n')

In [183]:
# Print the distinct predicted test labels together with the number of rows per label
print(np.unique(y_test_pred, return_counts=True))

(array([-1,  0,  1]), array([ 363, 1030,  607]))


In [158]:
# Display the distinct labels in y_train together with the number of rows per label
np.unique(y_train, return_counts=True)

(array([-1,  0,  1]), array([3096, 1136, 3768]))

In [191]:
import warnings
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import numpy as np
from pytorch_tabnet.tab_model import TabNetClassifier
import joblib

# Silence FutureWarning messages
warnings.filterwarnings(action="ignore", category=FutureWarning)

# Read the semicolon-separated training and test sets from disk
training_data = pd.read_csv('../data/training_data.csv', sep=';')
test_data = pd.read_csv('../data/test_data_no_target.csv', sep=';')

# Convert Numerical Columns from strings to floats
def convert_to_float(df, skip_cols=('Group', 'Class')):
    """
    Parse text columns that hold decimal-comma numbers (e.g. "1,5") into floats.

    Args:
    df (pandas.DataFrame): Frame whose text columns contain numbers written
        with a comma as the decimal separator.
    skip_cols (iterable of str): Columns left untouched even when they hold
        text. Defaults to the columns this definition has always skipped.

    Returns:
    pandas.DataFrame: A new frame with the text columns converted to float.
        The frame passed in is not modified.

    Raises:
    ValueError: If a value is still not a valid number after the comma has
        been replaced by a dot.
    """
    # Work on a copy so the caller's frame is never changed behind its back.
    converted = df.copy()
    skipped = set(skip_cols)

    for col in converted.columns:
        if col in skipped:
            continue
        column = converted[col]
        # Text may be stored as plain object dtype or as a dedicated string dtype.
        holds_text = (
            pd.api.types.is_object_dtype(column)
            or pd.api.types.is_string_dtype(column)
        )
        if holds_text:
            # regex=False: the comma is a literal character, not a pattern.
            converted[col] = column.str.replace(',', '.', regex=False).astype(float)
    return converted

training_data = convert_to_float(training_data)
test_data = convert_to_float(test_data)

# One-Hot Encoding for the 'Group' column
training_data = pd.get_dummies(training_data, columns=['Group'])
test_data = pd.get_dummies(test_data, columns=['Group'])

# Handle Missing Values using Median Imputation
# The medians are computed on the training data only and reused for the test
# data, so both sets are imputed with the same values and no statistic is
# derived from the test set. Median entries for columns the test frame does
# not have are ignored by fillna.
train_medians = training_data.median(numeric_only=True)
training_data = training_data.fillna(train_medians)
test_data = test_data.fillna(train_medians)


# Ensure the test set has the same columns as the training set: training
# columns missing from the test frame (other than 'Class' and 'Perform') are
# added as all-zero columns, then the columns are put in training order.
missing_cols = set(training_data.columns) - set(test_data.columns) - {'Class', 'Perform'}
for col in missing_cols:
    test_data[col] = 0
test_data = test_data[training_data.columns.drop(['Class', 'Perform'])]

# Split the training frame into the target and the feature matrix
y_train = training_data['Class']
X_train = training_data.drop(columns=['Class', 'Perform'])

# Standardise the features. The scaler is fitted on the training features and
# the same fitted scaler is applied to the test features; both results are
# wrapped back into DataFrames carrying the feature names.
scaler = StandardScaler()
X_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)
X_test_scaled = pd.DataFrame(scaler.transform(test_data), columns=X_train.columns)

# Hold out 20% of the scaled training rows as a validation set
# NOTE(review): the scaler above is fitted before this split, so the
# validation rows contribute to the scaling statistics.
X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(
    X_train_scaled,
    y_train,
    test_size=0.2,
    random_state=42,
)

# Initialize and Train the TabNet Model
# Verbosity is configured on the estimator itself: in pytorch-tabnet `verbose`
# is a TabNetClassifier constructor option and is not among the keyword
# arguments of fit().
tabnet_model = TabNetClassifier(verbose=10)
tabnet_model.fit(
    X_train_split.values, y_train_split.values,
    # The validation split is handed to fit() as the evaluation set.
    eval_set=[(X_val_split.values, y_val_split.values)],
    max_epochs=100,
    patience=10,
    batch_size=1024,
    virtual_batch_size=128,
    num_workers=1,
    drop_last=False
)

# Score the trained TabNet model on the validation split: accuracy plus
# support-weighted precision, recall and F1, and the per-class report
y_val_pred = tabnet_model.predict(X_val_split.values)
accuracy = accuracy_score(y_val_split, y_val_pred)
precision, recall, f1 = (
    metric(y_val_split, y_val_pred, average='weighted')
    for metric in (precision_score, recall_score, f1_score)
)
report = classification_report(y_val_split, y_val_pred)

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')
print("Classification Report:")
print(report)

# Predict the test set with the trained TabNet model and write the labels to
# a text file, one integer per line
y_test_pred = tabnet_model.predict(X_test_scaled.values)
np.savetxt('predictions_tabnet.txt', y_test_pred, newline='\n', fmt='%d')

# Persist the fitted model to disk with joblib
joblib.dump(tabnet_model, 'tabnet_model.pkl')


