In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import Ridge
from sklearn import preprocessing
from scipy.spatial.distance import cdist
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import GridSearchCV, cross_val_score, RandomizedSearchCV,train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.decomposition import PCA
from torch.utils.data import TensorDataset, DataLoader
from scipy.stats import uniform, randint, loguniform


**Preprocessing**

**Linear Model**

In [None]:
# Retrieve the data: the training table and the table of reference substances.
data = pd.read_csv("./train.csv")
subspures = pd.read_csv("./substances.csv")
pure_heroin = subspures[(subspures['substance'] == 'heroin (white)') | (subspures["substance"]=="heroin (brown)")]

# Feature columns of the training table (everything from column index 6 onward).
train_features = data.iloc[:, 6:]

# Euclidean distance from every training row to each pure-heroin reference row.
# NOTE(review): dist_her is computed here but is not fed to the ridge model below.
distances = cdist(train_features.to_numpy(), pure_heroin.iloc[:, 1:].to_numpy(), metric = 'euclidean')
dist_her = pd.DataFrame(distances, index = train_features.index, columns=pure_heroin.index)

# Model inputs: dot product of each training row with every reference-substance row.
data_new_features2 = train_features.values.dot(subspures.iloc[:,1:].values.T)


X = data_new_features2
y = data['PURITY']
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.15, random_state=42)


model = Ridge() # ridge regression model
param_grid = {'alpha': np.logspace(-7, 0, 100)} # hyperparameter alpha (regularization strength)


# Tune alpha on the training split only: searching on the full (X, y) would let
# the held-out validation rows influence the chosen hyperparameter.
grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=5, scoring='neg_mean_squared_error')
grid_search.fit(X_train, y_train)


# With the default refit=True the best estimator is already refitted on all of
# (X_train, y_train), so no extra call to fit() is required.
mach1 = grid_search.best_estimator_
predictions = mach1.predict(X_valid)


# Cross-validated RMSE for every candidate alpha (cv_results_ follows the grid order).
plt.figure()
plt.scatter(param_grid['alpha'],
            np.sqrt(-grid_search.cv_results_['mean_test_score']))
plt.xlabel("lambda")
plt.ylabel("RMSE")
plt.xscale("log")
plt.title("Ridge: cross-validated RMSE vs. regularization strength")
plt.show()


y_pred1 = mach1.predict(X_train)
y_pred2 = predictions
# Score = fraction of predictions whose absolute error is at most 5.
# The comparison must be applied AFTER np.abs; `np.abs(err <= 5)` would take the
# absolute value of a boolean mask and count every under-prediction as a hit.
train_score = np.mean(np.abs(y_pred1 - y_train) <= 5)
test_score = np.mean(np.abs(y_pred2 - y_valid) <= 5)
print("Train score ridge :", train_score)
print("Test score ridge :", test_score)

**Non-linear model**

In [None]:
data_train = pd.read_csv("./train.csv")
data_test = pd.read_csv("./test.csv")
X_train = data_train.iloc[:, 6:]
# NOTE(review): the feature offset differs from the training table (5 vs 6);
# presumably test.csv has no PURITY column — confirm against the files.
X_test = data_test.iloc[:, 5:]

# Target divided by 100 so it is comparable with the sigmoid output of the network.
y = data_train["PURITY"]/100

# Split BEFORE fitting any preprocessing so that the validation rows never
# influence the PCA basis or the scaling statistics (avoids data leakage).
X_train_raw, X_valid_raw, y_train, y_valid = train_test_split(X_train, y, test_size=0.2, random_state=42)

pca = PCA(n_components=16)
pca.fit(X_train_raw)
print(pca.explained_variance_ratio_) # to find the number of components to keep


def add_pca_features(features):
    """Return `features` with the projections of the fitted `pca` appended as extra columns."""
    # Reuse the caller's index so that the column-wise concat aligns row by row.
    components = pd.DataFrame(pca.transform(features), index=features.index)
    augmented = pd.concat([features, components], axis=1)
    # PCA columns are integers; the scaler needs homogeneous (string) column names.
    augmented.columns = augmented.columns.astype(str)
    return augmented


standardizer = StandardScaler()
X_train = standardizer.fit_transform(add_pca_features(X_train_raw))
X_valid = standardizer.transform(add_pca_features(X_valid_raw))

# Convert to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).reshape(-1, 1)
X_valid_tensor = torch.tensor(X_valid, dtype=torch.float32)
y_valid_tensor = torch.tensor(y_valid.values, dtype=torch.float32).reshape(-1, 1)

# Apply the already-fitted PCA and scaler to the test table.
X_new = add_pca_features(X_test)

X_st = standardizer.transform(X_new)
X_test_tensor = torch.tensor(X_st, dtype=torch.float32)

# Define the FeedForwardNN class
class FeedForwardNN(nn.Module):
    """Fully connected network with a sigmoid-squashed output.

    Every hidden layer is ``Linear -> activation -> Dropout``; the final linear
    layer is followed by a sigmoid, so outputs lie in (0, 1).

    Args:
        input_size: number of input features.
        lin_layer_sizes: non-empty sequence with the width of each hidden layer.
        outpout_size: number of output units.
        lin_layer_dropouts: dropout probability per hidden layer. If it is
            shorter than ``lin_layer_sizes`` the remaining layers use no
            dropout (p=0); surplus entries are ignored.
        activation: integer code, 0=ReLU, 1=SiLU, 2=Tanh, 3=LeakyReLU.

    Raises:
        ValueError: if ``lin_layer_sizes`` is empty or ``activation`` is not a
            known code.
    """

    # Supported activation codes and the module class each one selects.
    _ACTIVATIONS = {0: nn.ReLU, 1: nn.SiLU, 2: nn.Tanh, 3: nn.LeakyReLU}

    def __init__(self, input_size, lin_layer_sizes,
                 outpout_size, lin_layer_dropouts, activation):

        super().__init__()

        if activation not in self._ACTIVATIONS:
            raise ValueError(
                f"activation must be one of {sorted(self._ACTIVATIONS)}, got {activation!r}"
            )
        self.activation = self._ACTIVATIONS[activation]()

        hidden_sizes = [int(size) for size in lin_layer_sizes]
        if not hidden_sizes:
            raise ValueError("lin_layer_sizes must contain at least one hidden layer size")

        # Linear layers: input -> h0 -> h1 -> ... (created in this order so the
        # random initialisation sequence matches a plain layer-by-layer build).
        widths = [input_size] + hidden_sizes
        self.lin_layers = nn.ModuleList(
            [nn.Linear(n_in, n_out) for n_in, n_out in zip(widths[:-1], widths[1:])]
        )

        # Output layer
        self.outpout_layer = nn.Linear(hidden_sizes[-1], outpout_size)

        # Dropout layers: exactly one per hidden layer. forward() iterates over
        # (linear, dropout) pairs, so a shorter dropout list would otherwise
        # silently skip the trailing linear layers.
        rates = [float(rate) for rate in lin_layer_dropouts][:len(hidden_sizes)]
        rates.extend([0.0] * (len(hidden_sizes) - len(rates)))
        self.dropout_layers = nn.ModuleList([nn.Dropout(rate) for rate in rates])

    def forward(self, x):
        """Run `x` through every hidden layer and return the sigmoid of the output layer."""
        for lin_layer, dropout_layer in zip(self.lin_layers, self.dropout_layers):
            x = dropout_layer(self.activation(lin_layer(x)))

        return torch.sigmoid(self.outpout_layer(x))
    
# Define the NeuralNetRegressor
class NeuralNetRegressor(BaseEstimator, RegressorMixin):
    """scikit-learn style regressor that trains a FeedForwardNN with Adam and an L1 loss.

    Args:
        input_size: number of input features.
        random_state: seed passed to ``torch.manual_seed`` at the start of
            ``fit`` (skipped when ``None``).
        eta: learning rate of the Adam optimizer.
        max_epochs: number of passes over the training data.
        batch: mini-batch size.
        lin_layer_sizes: hidden layer widths forwarded to FeedForwardNN.
        outpout_size: number of output units.
        lin_layer_dropouts: dropout rates forwarded to FeedForwardNN.
        activation: activation code forwarded to FeedForwardNN.
    """

    def __init__(self, input_size, random_state, eta=0.001, max_epochs=100, batch=10, lin_layer_sizes = [50, 50],
                 outpout_size = 1, lin_layer_dropouts = [0.4, 0.4], activation = 0):
        # Hyperparameters are stored verbatim (never copied or mutated): the list
        # defaults are shared between instances, and scikit-learn's clone()
        # expects the constructor to keep the exact objects it was given.
        self.input_size = input_size
        self.random_state = random_state
        self.eta = eta
        self.max_epochs = max_epochs
        self.batch = batch
        self.lin_layer_sizes = lin_layer_sizes
        self.outpout_size = outpout_size
        self.lin_layer_dropouts = lin_layer_dropouts
        self.activation = activation
        # Initial network so that parameters() and predict() work before fit();
        # fit() replaces it with one built from the then-current hyperparameters.
        self.model = self._build_model()
        self.criterion = nn.L1Loss()

    def _build_model(self):
        """Create a fresh network from the current hyperparameter attributes."""
        return FeedForwardNN(self.input_size, self.lin_layer_sizes,
                             self.outpout_size, self.lin_layer_dropouts, self.activation)

    @staticmethod
    def _to_float_tensor(values):
        """Convert a tensor or array-like to a float32 tensor without copy warnings."""
        if isinstance(values, torch.Tensor):
            return values.detach().to(dtype=torch.float32)
        return torch.as_tensor(np.asarray(values), dtype=torch.float32)

    def fit(self, X, y, do_print=False):
        """Train a freshly initialised network on (X, y).

        Args:
            X: samples, one row per sample.
            y: targets; a 1-D input is reshaped to one column so that it lines
                up with the network output instead of broadcasting in the loss.
            do_print: when True, print the mean batch loss after every epoch.

        Returns:
            self
        """
        if self.random_state is not None:
            torch.manual_seed(self.random_state)
        # Rebuild here rather than relying on the network made in __init__:
        # scikit-learn applies searched hyperparameters through set_params()
        # AFTER construction, so only a rebuild makes lin_layer_sizes,
        # lin_layer_dropouts and activation take effect.
        self.model = self._build_model()
        optimizer = optim.Adam(self.model.parameters(), lr=self.eta)
        X_tensor = self._to_float_tensor(X)
        y_tensor = self._to_float_tensor(y).reshape(X_tensor.shape[0], -1)
        dataset = TensorDataset(X_tensor, y_tensor)
        dataloader = DataLoader(dataset, batch_size=int(self.batch), shuffle=True)
        self.model.train()

        # Training loop
        for epoch in range(self.max_epochs):
            epoch_loss = 0.0
            for batch_X, batch_y in dataloader:
                optimizer.zero_grad()  # Reset gradients
                outputs = self.model(batch_X)  # Forward pass
                loss = self.criterion(outputs, batch_y)  # Compute loss
                loss.backward()  # Backward pass
                optimizer.step()  # Update parameters
                epoch_loss += loss.item()
            if do_print:
                print(f"Epoch {epoch+1}/{self.max_epochs}, Loss: {epoch_loss / len(dataloader)}")
        return self

    def predict(self, X):
        """Return the network outputs for X as a flat NumPy array (dropout disabled)."""
        self.model.eval()
        X_tensor = self._to_float_tensor(X)
        with torch.no_grad():
            outputs = self.model(X_tensor).flatten()
        return outputs.numpy()

    def parameters(self):
        """Expose the trainable parameters of the underlying network."""
        return self.model.parameters()
    
# Initialize the model
seed = 43
np.random.seed(seed)
torch.manual_seed(seed)

input_size = X_train_tensor.shape[1]
net = NeuralNetRegressor(input_size=input_size, random_state=seed)

# Candidate architectures for the randomized search.
# RandomizedSearchCV can only pick list-valued parameters from an explicit list
# of candidates, so several architectures are pre-generated here (a single
# pre-drawn entry would mean the architecture is never actually searched).
# Every dropout candidate has MAX_HIDDEN_LAYERS entries so that it covers any
# sampled depth; rates beyond the number of hidden layers are not used.
MAX_HIDDEN_LAYERS = 3   # 1 to 3 hidden layers
N_ARCHITECTURES = 20
arch_rng = np.random.RandomState(seed)
layer_size_candidates = [
    # between 32 and 127 units per layer
    [int(width) for width in arch_rng.randint(32, 128, size=arch_rng.randint(1, MAX_HIDDEN_LAYERS + 1))]
    for _ in range(N_ARCHITECTURES)
]
dropout_candidates = [
    # dropout between 0 and 0.5 for each layer
    [float(rate) for rate in arch_rng.uniform(0, 0.5, size=MAX_HIDDEN_LAYERS)]
    for _ in range(N_ARCHITECTURES)
]

# Define the parameter distributions for the randomized search
params_dist = {
    'eta': loguniform(1e-4, 1e-1),
    'max_epochs': randint(50, 150),
    'batch': randint(32, 70),
    'lin_layer_sizes': layer_size_candidates,
    'lin_layer_dropouts': dropout_candidates,
    'activation': randint(0, 4),
}

# Initialize RandomizedSearchCV
random_search = RandomizedSearchCV(net, params_dist, refit=True, cv=5, random_state=43, scoring='neg_mean_squared_error', verbose=0)

# Run the search; with refit=True the best configuration is retrained on the full training split
random_grid_result = random_search.fit(X_train_tensor, y_train_tensor)
nouveau_model = random_grid_result.best_estimator_

# best_score_ is the NEGATED mean squared error (scoring='neg_mean_squared_error'),
# so flip the sign to report an actual MSE (on the target divided by 100).
print("Best MSE: %f using %s" % (-random_grid_result.best_score_, random_grid_result.best_params_))

y_pred = nouveau_model.predict(X_valid_tensor)
y_pred_train = nouveau_model.predict(X_train_tensor)
predictions = nouveau_model.predict(X_test_tensor)*100

# Verify that there are no NaN values in the predictions
print("y_pred contains NaN:", np.isnan(y_pred).any())

# Calculate the loss on the validation split, back on the original (x100) scale
mse = np.mean(((y_pred - y_valid)*100)**2)
print("MSE :", mse)

# Calculate the t_score: fraction of predictions whose absolute error is at most 5
# on the original scale. The comparison has to be applied AFTER np.abs; taking the
# absolute value of the boolean mask would count every under-prediction as a hit.
train_score = np.mean(np.abs((y_pred_train - y_train)*100) <= 5)
test_score = np.mean(np.abs((y_pred - y_valid)*100) <= 5)
print("t_score train :", train_score)
print("t_score test :", test_score)

ids = np.arange(1, len(predictions) + 1)

# Create a DataFrame for the output
output_df = pd.DataFrame({
    'ID': ids,
    'PURITY': predictions
})

# Save the DataFrame to a CSV file
output_df.to_csv('predictions.csv', index=False)