In [1]:
from implementations import *
from helpers import *
import numpy as np
import matplotlib.pyplot as plt

## Load the data ###

In [2]:
data_path = './dataset_to_release/'
train_data_path = "./dataset_to_release/x_train.csv"
test_data_path = "./dataset_to_release/x_test.csv"

x_train, x_test, y_train, train_ids, test_ids =load_csv_data_all(data_path, sub_sample=False)

# Change the label -1 by 0
y_train = np.where(y_train == -1, 0, y_train)

## Data Preprocessing

In [3]:
# Handling missing values

def replace_nan_by_mean(data):
    ''' function that handels the missing values by replacing them with the column means'''
    nan_indices = np.isnan(data)
    column_means = np.nanmean(data, axis=0)
    data[nan_indices] = np.take(column_means, np.where(nan_indices)[1])
    return data

data_train = replace_nan_by_mean(x_train)

In [4]:
# Data filtering

features_to_keep = ["_AGE80", "_AGE65YR", "_AGEG5YR", "_AGE_G", "_AIDTST3", "_ASTHMS1", "_BMI5", "_BMI5CAT",
                    "_CASTHM1", "_CHLDCNT", "_CHOLCHK", "_DRDXAR1", "_DRNKWEK", "_FLSHOT6", "_FRT16", 
                    "_FRTLT1", "_FRTRESP", "_FRUITEX", "_FRUTSUM", "_HCVU651", "_LMTACT1", "_LMTSCL1", 
                    "_LMTWRK1", "_LTASTH1", "_MINAC11", "_MINAC21", "_MISFRTN", "_MISVEGN", "_PA30021", 
                    "_PA150R2", "_PACAT1", "_PAINDX1", "_PASTAE1", "_PASTRNG", "_PNEUMO2", "_RFBING5", 
                    "_RFBMI5", "_RFCHOL", "_RFDRHV5", "_RFHLTH", "_RFHYPE5", "_RFSMOK3", "_SMOKER3", 
                    "_TOTINDA", "_VEG23", "_VEGESUM", "_VEGETEX", "_VEGLT1", "_VEGRESP"]

def filtering(data,data_path):
    columns = extract_first_line(data_path).split(',')
    columns.pop(0)
    filtered_columns = [col for col in columns if col in features_to_keep]
    indices_to_keep = [columns.index(c) for c in filtered_columns]
    data_f = data[:, indices_to_keep]
    return(data_f)

data_train_filtered=filtering(data_train, train_data_path)

In [5]:
# Standardization of the data
def standardize(data):
    small_value = 1*10**(-9)
    mean = np.mean(data, axis=0)
    std = np.std(data, axis=0)+small_value
    return((data - mean) / (std))

data_train_standard = standardize(data_train_filtered)

In [6]:
# Feature augmentation
def feature_expansion(data, degree):
    augmented_features = []
    for i in range(data.shape[1]):
        feature = data[:,i]
        augmented_feature = build_poly(feature, degree)
        augmented_features.append(augmented_feature)

    # Stack the augmented features horizontally
    augmented_data = np.hstack(augmented_features)
    return(augmented_data)

In [7]:
def apply_model(test, model):
    pred = (sigmoid(test.dot(model))>=0.14).astype(int)
    return(pred)

In [8]:
# Undersampling to balance the data 

def undersample(X, y, class_):
    """
    Undersample two arrays X and Y according to a class 'class_'.

    Parameters:
    X (array) : array of training data
    Y (array) : array of training data

    Returns:
    X_undersampled (array): Undersampled array of the concerned class
    Y_undersampled (array): Undersampled array of the concerned class
    """
    indices = np.where(y == class_)[0]
    no_indices = np.where(y != class_)[0]
    number_no_indices = len(no_indices)
    undersample_indices = np.random.choice(indices, number_no_indices, replace=False)
    
    keep_indices = np.concatenate([undersample_indices, no_indices])
    X_undersampled = X[keep_indices]
    y_undersampled = y[keep_indices]
    
    return X_undersampled, y_undersampled

x_undersampled, y_undersampled =undersample(data_train_standard, y_train, 0)

## Validation metrics

In [9]:
def compute_f1_score(true_labels, predicted_labels):
    """
    Computes the F1 score for a classification model using NumPy.

    Parameters:
    true_labels (numpy.ndarray): True labels for the data.
    predicted_labels (numpy.ndarray): Predicted labels from the model.

    Returns:
    f1 (float): The F1 score.
    """
    true_positive = np.sum(np.logical_and(true_labels == 1, predicted_labels == 1))
    false_positive = np.sum(np.logical_and(true_labels == 0, predicted_labels == 1))
    false_negative = np.sum(np.logical_and(true_labels == 1, predicted_labels == 0))
    
    precision = true_positive / (true_positive + false_positive)
    recall = true_positive / (true_positive + false_negative)
    
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    return f1

# Compute the accuracy of predictions

def compute_accuracy(pred, label):
    correct_predictions = np.sum((pred == label))
    total_samples = len(label)
    accuracy = correct_predictions / total_samples
    return(accuracy)

## Cross-Validation

In [10]:
def build_k_indices(y, k_fold, seed):
    """build k indices for k-fold.

    Args:
        y:      shape=(N,)
        k_fold: K in K-fold, i.e. the fold num
        seed:   the random seed

    Returns:
        A 2D array of shape=(k_fold, N/k_fold) that indicates the data indices for each fold

    """
    num_row = y.shape[0]
    interval = int(num_row / k_fold)
    np.random.seed(seed)
    indices = np.random.permutation(num_row)
    k_indices = [indices[k * interval : (k + 1) * interval] for k in range(k_fold)]
    
    return np.array(k_indices)

In [14]:
def cross_validation(y, x, k_indices, k, lambda_, degree):
    """return the loss of ridge regression for a fold corresponding to k_indices

    Args:
        y:          shape=(N,)
        x:          shape=(N,)
        k_indices:  2D array returned by build_k_indices()
        k:          scalar, the k-th fold (N.B.: not to confused with k_fold which is the fold nums)
        lambda_:    scalar, cf. ridge_regression()
        degree:     scalar, cf. build_poly()

    Returns:
        train and test root mean square errors rmse = sqrt(2 mse)

    """

    # ***************************************************
    # get k'th subgroup in test, others in train:
    train_idx = np.reshape(k_indices[[i for i in range(len(k_indices)) if i != k]], -1)
    test_idx = k_indices[k]

    x_train = x[train_idx, :]
    y_train = y[train_idx]
    x_test = x[test_idx, :]
    y_test = y[test_idx]

    y_tr = np.expand_dims(y_train, 1)
    y_te = np.expand_dims(y_test, 1)

    y_tr = np.where(y_tr == -1, 0, y_tr)
    y_te = np.where(y_te == -1, 0, y_te)

    max_iters = 1000
    gamma = 0.5

    # ***************************************************
    # form data with polynomial degree :

    train_data = feature_expansion(x_train, degree)
    test_data = feature_expansion(x_test, degree)
    train_data = standardize(train_data)
    test_data = standardize(test_data)
    # ***************************************************
    # build tx :
    tx_tr = np.c_[np.ones((y_train.shape[0], 1)), train_data]
    tx_te = np.c_[np.ones((test_data.shape[0], 1)), test_data]
    initial_w = np.zeros((tx_tr.shape[1], 1))

    # reg logistic regression :
    w = reg_logistic_regression(y_tr, tx_tr, lambda_, initial_w, max_iters, gamma)[0]
    y_pred = apply_model(tx_te, w)
    # calculate f1 score on test :
    f1_te = compute_f1_score(y_te, y_pred)

    return f1_te

In [16]:
def cross_validation_demo(degree, k_fold, lambdas):
    """cross validation over regularisation parameter lambda.

    Args:
        degree: integer, degree of the polynomial expansion
        k_fold: integer, the number of folds
        lambdas: shape = (p, ) where p is the number of values of lambda to test
    Returns:
        best_lambda: scalar, value of the best lambda
        best_rmse: scalar, the associated root mean squared error for the best lambda
    """

    seed = 12
    k_fold = k_fold  # Removed the duplicate k_fold assignment
    k_indices = build_k_indices(y_train, k_fold, seed)
    f1_score = np.zeros((len(degree), len(lambdas)))
    
    for i in range(len(degree)):
        d = degree[i]
        for j in range(len(lambdas)):
            lambda_ = lambdas[j]
            cross_val = [cross_validation(y_train, data_train_filtered_2, k_indices, k, lambda_, d) for k in range(k_fold)]
            f1 = np.mean(cross_val)
            f1_score[i, j] = f1
            
    best_degree = degree[np.unravel_index(np.argmax(f1_score, axis=None), f1_score.shape)[0]]
    best_lambda = lambdas[np.unravel_index(np.argmax(f1_score, axis=None), f1_score.shape)[1]]
    best_f1 = np.max(f1_score)
    
    return best_degree, best_f1, best_lambda, f1_score

# best_degree, best_f1, best_lambda, f1_score = cross_validation_demo(np.array([1]).astype(int), 4, np.array([0]))

## Training

In [17]:
def split_data(x, y, ratio, seed=1):
    """
    Split the dataset based on the split ratio. If the ratio is 0.8,
    you will have 80% of your dataset dedicated to training,
    and the rest is dedicated to testing. If the ratio times the number of samples is not a whole number,
    you can use np.floor. Also, check the documentation for np.random.permutation;
    it could be useful.

    Args:
        x: numpy array of shape (N,), N is the number of samples.
        y: numpy array of shape (N,).
        ratio: scalar in [0,1]
        seed: integer.

    Returns:
        x_tr: numpy array containing the train data.
        x_te: numpy array containing the test data.
        y_tr: numpy array containing the train labels.
        y_te: numpy array containing the test labels.
    """
    N = int(ratio * len(x))
    np.random.seed(seed)
    shuffled_data = np.random.permutation(x)
    np.random.seed(seed)
    shuffled_labels = np.random.permutation(y)
    x_tr = shuffled_data[:N]  # train data
    x_te = shuffled_data[N:]  # test data
    y_tr = shuffled_labels[:N]  # train labels
    y_te = shuffled_labels[N:]  # test labels

    return x_tr, x_te, y_tr, y_te

x_tr, x_te, y_tr, y_te = split_data(data_train_standard, y_train, ratio=0.8)
y_tr = np.expand_dims(y_tr, 1)

In [18]:
max_iters = 5000
gamma = 0.5

# Build tx
tx_tr = np.c_[np.ones((y_tr.shape[0], 1)), x_tr]
initial_w = np.zeros((tx_tr.shape[1], 1))

# Binary classification using logistic regression
w, loss = logistic_regression(y_tr, tx_tr, initial_w, max_iters, gamma = 0.5)

# Regularized binary classification using logistic regression
# lambda_ = 10e-4
# w_reg,loss_reg = reg_logistic_regression(y_tr, tx_tr, lambda_, initial_w, max_iters, gamma)

Current iteration=0, loss=0.6035724632042655
Current iteration=100, loss=0.25252331440624926
Current iteration=200, loss=0.25092710997432416
Current iteration=300, loss=0.2506468243763326
Current iteration=400, loss=0.25056331447293845
Current iteration=500, loss=0.2505249162600111
Current iteration=600, loss=0.2504997167097189
Current iteration=700, loss=0.25047978984147706
Current iteration=800, loss=0.25046284797009216
Current iteration=900, loss=0.2504480543493292
Current iteration=1000, loss=0.2504349871295834
Current iteration=1100, loss=0.2504233678584978
Current iteration=1200, loss=0.2504129842565934
Current iteration=1300, loss=0.25040366442065337
Current iteration=1400, loss=0.2503952656280911
Current iteration=1500, loss=0.2503876679328799
Current iteration=1600, loss=0.25038076976501
Current iteration=1700, loss=0.25037448462413575
Current iteration=1800, loss=0.25036873849400937
Current iteration=1900, loss=0.2503634677794887
Current iteration=2000, loss=0.250358617639573

## Compute accuracy and F1-score

In [23]:
tx_te = np.c_[np.ones((x_te.shape[0], 1)), x_te]
y_te = np.expand_dims(y_te, 1)
y_pred = tx_te.dot(w)

# Compute accuracy of training
compute_accuracy(y_te,y_pred)

# Compute F1-score of training
compute_f1_score(y_te, y_pred)

0.0

## Test

In [21]:
# Preprocessing steps

xt = replace_nan_by_mean(x_test)
xt_filtered = filtering(xt, test_data_path)
xt_standardized = standardize(xt_filtered)
xtest = np.c_[np.ones((xt.shape[0], 1)), xt_standardized]

In [22]:
predictions = apply_model(xtest, w)
predictions = np.where(predictions==0,-1, predictions)

create_csv_submission(test_ids, predictions, 'predictions_name.csv')