In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
from src.preprocessing import display_missing_values
import matplotlib.pyplot as plt
# Notebook-wide display settings: show every column of wide frames and
# render floats with two decimals.
pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', '{:.2f}'.format)
# Seaborn theme used by every plot drawn below.
sns.set_theme(style="ticks", palette="pastel")

from sklearn.preprocessing import PolynomialFeatures
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_absolute_error
from sklearn.ensemble import RandomForestRegressor, HistGradientBoostingRegressor
from sklearn.metrics import mean_absolute_error

In [2]:
from fonctions import Dataset,group_fuel_types
from src.preprocessing import display_missing_values

In [3]:
# Wrap each CSV in the project's Dataset helper and load it.
# NOTE(review): load_data() presumably returns a pandas DataFrame (later cells
# call .columns / .drop on the result) — confirm in fonctions.py.
train=Dataset("data/train.csv")
data_train=train.load_data()
test=Dataset("data/test.csv")
data_test=test.load_data()

In [4]:
from fonctions import Preprocessor,TrainPreprocessor,TestPreprocessor
# One preprocessor per split, each built from the frame loaded above.
# NOTE(review): whether the preprocessors copy the frame or keep a reference
# to it is not visible here — later in-place edits of data_train/data_test
# may or may not be seen by them; confirm in fonctions.py.
train_preprocessor=TrainPreprocessor(data_train)
test_preprocessor=TestPreprocessor(data_test)

In [None]:
# Imputation pipeline: the same ordered list of steps is executed on the
# train preprocessor first, then on the test preprocessor.
IMPUTATION_STEPS = (
    "fill_fuel_consumption",
    "fill_electric_range",
    "fill_engine_capacity",
    "fill_electric_consumption",
    "fill_category_type",
    "fill_wheel_base",
    "fill_At_1",
    "fill_At_2",
    "fill_mass",
    "fill_engine_power",
    "last_step",
)

for preprocessor in (train_preprocessor, test_preprocessor):
    for step_name in IMPUTATION_STEPS:
        # Each step is a no-argument method; its return value is ignored.
        getattr(preprocessor, step_name)()

### Test de non-linéarité des variables quantitatives 

In [None]:
# Continuous columns that will receive the polynomial expansion.
# NOTE(review): "Fuel consumption " carries a trailing space — presumably the
# exact column name in the CSV; verify before "cleaning" it.
variables_continues=["Fuel consumption ","Electric range (km)","ec (cm3)","z (Wh/km)","W (mm)"]
# Every other column of the training frame (computed from data_train, so it
# includes any column that exists only in the training data).
others=[col for col in data_train.columns if col not in variables_continues]

In [None]:
# Degree-2 polynomial expansion (squares and pairwise products, no constant
# column) of the continuous variables.
poly = PolynomialFeatures(degree=2, include_bias=False)

# Apply the transformation to the continuous variables of the training frame.
variables_continues_transformees = poly.fit_transform(data_train[variables_continues])

# Reuse data_train's index: pd.concat(axis=1) aligns on index labels, so a
# fresh RangeIndex would silently misalign rows (and create NaN rows) whenever
# data_train is not indexed 0..n-1.
data_train_transforme = pd.DataFrame(
    variables_continues_transformees,
    columns=poly.get_feature_names_out(variables_continues),
    index=data_train.index,
)
data_train = pd.concat([data_train[others], data_train_transforme], axis=1)



In [None]:
# Expand the test set with the transformer already fitted on the training
# data (transform, not fit_transform, so nothing is re-learned from test).
variables_continues_transformees = poly.transform(data_test[variables_continues])

# Keep data_test's index so the column-wise concat below aligns row by row.
data_test_transforme = pd.DataFrame(
    variables_continues_transformees,
    columns=poly.get_feature_names_out(variables_continues),
    index=data_test.index,
)

# The target exists only in the training data. list.remove() returns None and
# mutates `others`, so build a filtered copy instead of indexing with its result.
others_test = [col for col in others if col != 'Ewltp (g/km)']

# Concatenate with the *test* polynomial features (the original used the
# training ones by mistake).
data_test = pd.concat([data_test[others_test], data_test_transforme], axis=1)

In [None]:
# Encode the "Ct" and "Cr" columns through the preprocessors and rebind
# data_train / data_test to whatever encode_that_var returns.
# NOTE(review): if encode_that_var returns the preprocessor's own frame, the
# polynomial columns concatenated onto data_train/data_test in the cells above
# would be discarded here — verify against fonctions.py.
data_train = train_preprocessor.encode_that_var("Ct")
data_train = train_preprocessor.encode_that_var("Cr")
data_test = test_preprocessor.encode_that_var("Ct")
data_test = test_preprocessor.encode_that_var("Cr")

In [None]:
def create_conforme(df):
    """Add a binary ``conforme`` column and drop ``Tan``, in place.

    ``conforme`` is 1 where ``Tan`` holds a value and 0 where it is missing
    (NaN/None).

    Args:
        df: DataFrame containing a ``Tan`` column; it is modified in place.

    Returns:
        None.
    """
    # Vectorised equivalent of "1 if not missing else 0".
    df['conforme'] = df['Tan'].notna().astype(int)
    df.drop(columns='Tan', inplace=True)
def compute_surface(obs):
    """Return ``W (mm)`` multiplied by the larger of ``At1 (mm)`` / ``At2 (mm)``.

    Args:
        obs: a mapping-like row (e.g. a DataFrame row) exposing the keys
            ``'W (mm)'``, ``'At1 (mm)'`` and ``'At2 (mm)'``.

    Returns:
        The product of ``W (mm)`` and whichever of the two ``At`` values the
        built-in ``max`` selects (``At1`` on ties).
    """
    at1 = obs['At1 (mm)']
    at2 = obs['At2 (mm)']
    widest = max(at1, at2)
    if widest == at1:
        return obs['W (mm)'] * at1
    return obs['W (mm)'] * at2

def create_surface(df):
    """Add a ``surface`` column to ``df`` in place.

    ``surface`` is ``W (mm)`` multiplied by the larger of ``At1 (mm)`` and
    ``At2 (mm)`` on each row. When one of the two ``At`` values is missing the
    other one is used; when both are missing the result is NaN.

    Args:
        df: DataFrame with ``W (mm)``, ``At1 (mm)`` and ``At2 (mm)`` columns.

    Returns:
        None.
    """
    # Vectorised replacement for a row-wise df.apply(..., axis=1): the
    # row-wise max skips NaN, which matches the per-row helper's behaviour.
    widest_track = df[['At1 (mm)', 'At2 (mm)']].max(axis=1)
    df['surface'] = df['W (mm)'] * widest_track

def group_fuel_types(category: str):
    """Map a raw fuel-type label to a coarser fuel group.

    Args:
        category: raw fuel-type label.

    Returns:
        ``"HYBRID"``, ``"BIO-FUEL"`` or ``'PETROL'`` when the label belongs to
        one of the groups below; otherwise the input is returned unchanged.
    """
    groupings = (
        ("HYBRID", ('PETROL/ELECTRIC', 'DIESEL/ELECTRIC')),
        ("BIO-FUEL", ('NG-BIOMETHANE', 'HYDROGEN', 'NG', 'E85')),
        ('PETROL', ('PETROL', 'LPG')),
    )
    for label, members in groupings:
        if category in members:
            return label
    # Anything not listed keeps its original label.
    return category
def create_carburant(df):
    """Add a grouped ``carburant`` column derived from ``Ft`` and drop ``Ft``.

    The frame is modified in place; nothing is returned.
    """
    fuel_groups = df['Ft'].apply(group_fuel_types)
    df['carburant'] = fuel_groups
    df.drop(columns=['Ft'], inplace=True)

In [None]:
# Apply the three in-place feature builders to the training frame, then to
# the test frame, in the same order for both.
for frame in (data_train, data_test):
    create_conforme(frame)
    create_surface(frame)
    create_carburant(frame)

In [None]:
# Encode the freshly created "carburant" column through the preprocessors.
# NOTE(review): this only works if the preprocessors operate on the same frame
# objects that were mutated in place above — confirm in fonctions.py.
data_train = train_preprocessor.encode_that_var("carburant")
data_test = test_preprocessor.encode_that_var("carburant")

In [None]:
# Columns removed from both frames before modelling. 'W (mm)', 'At1 (mm)' and
# 'At2 (mm)' were already combined into 'surface' above.
drop_this=['VFN', 'Mp', 'Mh', 'Man', 'T', 'Mk', 'Cn','Mt','W (mm)', 'At1 (mm)', 'At2 (mm)','Fm','Erwltp (g/km)']

# In-place drops: re-running this cell raises KeyError because the columns
# are already gone.
data_train.drop(columns=drop_this, inplace=True)
data_test.drop(columns=drop_this, inplace=True)

In [None]:
# 'ID' is dropped from the training frame only; data_test keeps it because the
# submission cells further down read data_test["ID"].
data_train.drop(columns='ID',inplace=True)

In [None]:
# Target plus the numeric columns included in the correlation heatmap below.
list_vars=["Ewltp (g/km)", "m (kg)","ec (cm3)", "ep (KW)", "z (Wh/km)", "Fuel consumption ","Electric range (km)", "surface"]

In [None]:
# Create the figure first so the heatmap is drawn on the 20x20 canvas
# (calling plt.figure afterwards only opens a second, empty figure).
plt.figure(figsize=(20,20))
# Spearman rank correlations between the target and the numeric features.
sns.heatmap(data_train[list_vars].corr(method='spearman'),annot=True)
# plt.show must be called; a bare `plt.show` is just a function reference.
plt.show()

In [None]:
# Remove 'z (Wh/km)' from both frames (in place).
# NOTE(review): presumably dropped because of what the heatmap above showed —
# the reason is not recorded in the notebook; confirm and document it.
data_train.drop(columns='z (Wh/km)',inplace=True)
data_test.drop(columns='z (Wh/km)',inplace=True)

### Feature selection and model training

In [None]:
from sklearn.linear_model import LassoCV, ElasticNetCV

In [None]:
# Hold out 33 % of the labelled data for evaluation. Dedicated names are used
# so the Dataset objects bound to `train` / `test` in the loading cell are not
# overwritten by DataFrames.
train_split, test_split = train_test_split(data_train, test_size=0.33, random_state=42)

# Fresh 0..n-1 indexes for both splits.
train_split = train_split.reset_index(drop=True)
test_split = test_split.reset_index(drop=True)

# Separate the features from the target "Ewltp (g/km)".
X_train, y_train = train_split.drop(columns=["Ewltp (g/km)"]), train_split["Ewltp (g/km)"]
X_test, y_test = test_split.drop(columns=["Ewltp (g/km)"]), test_split["Ewltp (g/km)"]

In [None]:
# Plain cross-validated LASSO (5 folds): the penalty strength is chosen by CV.
lasso = LassoCV(cv=5,)
lasso.fit(X_train, y_train)

# Show the features whose coefficient was not shrunk to exactly zero.
selected_features = X_train.columns[lasso.coef_ != 0]
# The estimator is LassoCV, not an adaptive LASSO, so the label says so.
print("Variables sélectionnées par LASSO (LassoCV) :")
print(selected_features)

In [None]:
# Cross-validated Elastic Net (5 folds) with an 80 % L1 / 20 % L2 penalty mix.
elastic_net = ElasticNetCV(cv=5, random_state=42, l1_ratio=0.8)
elastic_net.fit(X_train, y_train)

# Show the features whose coefficient was not shrunk to exactly zero.
selected_features = X_train.columns[elastic_net.coef_ != 0]
# The label previously said "Adaptive LASSO", copied from the LASSO cell.
print("Variables sélectionnées par Elastic Net :")
print(selected_features)

In [None]:
import xgboost as xgb

In [None]:
# Degree-3 polynomial expansion; interaction_only=False keeps pure powers as
# well as cross terms, and the bias column is left at its default.
polynome=PolynomialFeatures(degree=3, interaction_only=False)

In [None]:
# Fit the expansion on the training split only, then apply that fitted
# transformer to the hold-out split (transform, not fit_transform).
# NOTE(review): neither array is used by any later cell visible in this
# notebook, and a degree-3 expansion of every feature can be very large.
X_train_polynome=polynome.fit_transform(X_train)
X_test_polynome=polynome.transform(X_test)

In [None]:
import numba
from numba import cuda

In [None]:
def evaluate_model(model, X_test, y_test):
    """Return the mean absolute error of ``model`` on a hold-out set.

    The function is deliberately not decorated with ``numba.jit``: numba
    cannot compile calls on arbitrary Python objects such as a fitted
    estimator, so the decorator either fails or adds no speed-up.

    Args:
        model: fitted estimator exposing ``predict``.
        X_test: features to predict on.
        y_test: true target values aligned with ``X_test``.

    Returns:
        The mean absolute error between ``y_test`` and the predictions.
    """
    y_pred = model.predict(X_test)
    return mean_absolute_error(y_test, y_pred)

In [None]:
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe

In [None]:
# Hyperopt search space for the regularised XGBoost objective.
# quniform(label, low, high, q) yields floats rounded to multiples of q, so
# integer-valued parameters must be cast with int() by the consumer.
# NOTE: the fmin call in this notebook uses `space_bis`, not this space.
space={'max_depth': hp.quniform("max_depth", 3, 18, 1),
        'gamma': hp.uniform ('gamma', 1,9),
        'reg_alpha' : hp.quniform('reg_alpha', 40,180,1),
        'reg_lambda' : hp.uniform('reg_lambda', 0,1),
        'colsample_bytree' : hp.uniform('colsample_bytree', 0.5,1),
        'min_child_weight' : hp.quniform('min_child_weight', 0, 10, 1),
        # Fixed (not searched) values.
        'n_estimators': 1000,
        'seed': 0
    }

In [None]:
# Smaller Hyperopt search space (tree depth and learning rate) with a fixed
# number of trees; this is the space passed to fmin further down.
space_bis={'max_depth': hp.quniform("max_depth", 3, 18, 1),
           'n_estimators': 1500,
           'learning_rate':hp.uniform("learning_rate",0.1,0.30)
           

}

In [None]:
def objective(space):
    """Hyperopt objective: fit an XGBoost regressor and return its hold-out MAE.

    Reads the module-level ``X_train``, ``y_train``, ``X_test`` and ``y_test``.
    Not decorated with ``numba.jit``: numba cannot compile code that builds
    and fits an XGBoost model.

    Args:
        space: dict sampled by Hyperopt with the keys ``n_estimators``,
            ``max_depth``, ``gamma``, ``reg_alpha``, ``reg_lambda``,
            ``min_child_weight``, ``colsample_bytree`` and ``seed``.

    Returns:
        ``{'loss': <MAE on X_test>, 'status': STATUS_OK}``.
    """
    clf = xgb.XGBRegressor(
        objective="reg:squarederror",
        n_estimators=space['n_estimators'],
        # quniform samples floats; these parameters must be integers.
        max_depth=int(space['max_depth']),
        gamma=space['gamma'],
        reg_alpha=int(space['reg_alpha']),
        # Sampled by the search space but previously never forwarded.
        reg_lambda=space['reg_lambda'],
        min_child_weight=int(space['min_child_weight']),
        # Must stay a float in (0, 1]: int() truncated every sampled value
        # in [0.5, 1) down to 0, which is not a valid column fraction.
        colsample_bytree=space['colsample_bytree'],
        random_state=space['seed'],
        eval_metric="mae",
    )

    evaluation = [(X_train, y_train), (X_test, y_test)]

    clf.fit(X_train, y_train,
            eval_set=evaluation,
            verbose=False)

    y_pred = clf.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print ("SCORE:", mae)
    return {'loss': mae, 'status': STATUS_OK }

In [None]:
def objective_bis(space):
    """Hyperopt objective over tree depth and learning rate for XGBoost.

    Reads the module-level ``X_train``, ``y_train``, ``X_test`` and ``y_test``.
    Not decorated with ``numba.jit``: numba cannot compile code that builds
    and fits an XGBoost model.

    Args:
        space: dict sampled by Hyperopt with the keys ``n_estimators``,
            ``max_depth`` and ``learning_rate``.

    Returns:
        ``{'loss': <MAE on X_test>, 'status': STATUS_OK}``.
    """
    clf = xgb.XGBRegressor(
        objective="reg:squarederror",
        n_estimators=space['n_estimators'],
        # quniform samples floats; max_depth must be an integer.
        max_depth=int(space['max_depth']),
        # The sampled learning rate was previously ignored, so part of the
        # search had no effect. `max_iter` was removed: it is not an XGBoost
        # parameter (the number of trees is `n_estimators`).
        learning_rate=space['learning_rate'],
        eval_metric="mae",
    )

    evaluation = [(X_train, y_train), (X_test, y_test)]

    clf.fit(X_train, y_train,
            eval_set=evaluation,
            verbose=False)

    y_pred = clf.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print ("SCORE:", mae)
    return {'loss': mae, 'status': STATUS_OK }

In [None]:
# Collects the result of every evaluation made by fmin.
trials = Trials()

# Run 100 TPE-guided evaluations of objective_bis over space_bis and keep the
# best sampled values.
# NOTE(review): no random state is passed to fmin, so the search is presumably
# not reproducible between runs — confirm against the installed hyperopt.
best_hyperparams = fmin(fn = objective_bis,
                        space = space_bis,
                        algo = tpe.suggest,
                        max_evals = 100,
                        trials = trials)

In [None]:
def train_xgboost(n_estimators=1000 ,objective='reg:squarederror',max_depth=10):
    """Fit an XGBoost regressor on the module-level ``X_train`` / ``y_train``.

    Not decorated with ``numba.jit``: numba cannot compile code that builds
    and fits an XGBoost model.

    Args:
        n_estimators: number of boosted trees.
        objective: XGBoost learning objective.
        max_depth: maximum depth of each tree.

    Returns:
        The fitted ``xgb.XGBRegressor`` (row subsample 0.75, MAE eval metric).
    """
    xgboost_model = xgb.XGBRegressor(
        n_estimators=n_estimators,
        objective=objective,
        max_depth=max_depth,
        subsample=0.75,
        eval_metric='mae',
    )
    xgboost_model.fit(X_train, y_train)
    return xgboost_model

In [None]:
# Train XGBoost with the helper's default hyper-parameters.
xgboost_model=train_xgboost()

In [None]:
# Mean absolute error of that model on the hold-out split.
mae=evaluate_model(xgboost_model,X_test,y_test)

In [None]:
# Display the hold-out MAE computed in the previous cell.
mae

In [None]:
# Rebind xgboost_model to a much larger ensemble (10,000 trees, fixed seed).
# This is the model later used to write the submission file.
xgboost_model = xgb.XGBRegressor(n_estimators=10000, objective='reg:squarederror', random_state=0,max_depth=10)

# Train the model on the training split
xgboost_model.fit(X_train, y_train)

# Predict on the hold-out split
y_pred = xgboost_model.predict(X_test)

# Evaluate the model with the MAE (Mean Absolute Error)
mae = mean_absolute_error(y_test, y_pred)
print("Mean Absolute Error:", mae)

In [None]:
def train_random_forest(n_estimators=1000,max_samples=10000 ,criterion='absolute_error',max_depth=10):
    """Fit a random-forest regressor on the module-level ``X_train`` / ``y_train``.

    Not decorated with ``numba.jit(nopython=True)``: nopython mode cannot
    compile code that creates or fits scikit-learn estimators, so the
    decorated function failed as soon as it was called.

    Args:
        n_estimators: number of trees.
        max_samples: number of rows drawn to train each tree.
        criterion: split-quality criterion.
        max_depth: maximum depth of each tree.

    Returns:
        The fitted ``RandomForestRegressor``.
    """
    rf_model = RandomForestRegressor(
        n_estimators=n_estimators,
        max_samples=max_samples,
        criterion=criterion,
        max_depth=max_depth,
    )
    rf_model.fit(X_train, y_train)
    return rf_model

In [None]:
# Train the random forest (arguments repeat the helper's defaults explicitly).
rf_model=train_random_forest(n_estimators=1000, max_samples=10000,criterion='absolute_error',max_depth=10)

# Hold-out MAE of the random forest.
mae=evaluate_model(rf_model,X_test,y_test)
print("Mean Absolute Error:", mae)

In [None]:
def train_hist_gradient_boosting(loss="absolute_error",max_iter=3000, learning_rate=0.22,max_depth=10):
    """Fit a histogram gradient-boosting regressor on ``X_train`` / ``y_train``.

    Reads the module-level ``X_train`` and ``y_train``. Not decorated with
    ``numba.jit``: numba cannot compile code that creates or fits
    scikit-learn estimators.

    Args:
        loss: loss function optimised by the booster.
        max_iter: maximum number of boosting iterations.
        learning_rate: shrinkage applied to each iteration.
        max_depth: maximum depth of each tree.

    Returns:
        The fitted ``HistGradientBoostingRegressor`` (built with
        ``interaction_cst="pairwise"``).
    """
    hgbgb_model = HistGradientBoostingRegressor(
        loss=loss,
        max_iter=max_iter,
        learning_rate=learning_rate,
        max_depth=max_depth,
        interaction_cst="pairwise",
    )
    hgbgb_model.fit(X_train, y_train)
    return hgbgb_model

In [None]:
# Train the histogram gradient-boosting model with the helper's defaults.
hgbgb_model=train_hist_gradient_boosting()

# Hold-out MAE of that model.
mae=evaluate_model(hgbgb_model,X_test,y_test)
print(f"Mean Absolute Error : {mae}")

In [None]:
# Display the fitted model's n_iter_ attribute (iterations actually run).
hgbgb_model.n_iter_

In [None]:
from sklearn.ensemble import AdaBoostRegressor
from sklearn.tree import DecisionTreeRegressor


In [None]:
# AdaBoost built on top of a base regression model (a depth-limited tree).
def train_adaboost_regressor(max_depth=7,n_estimators=1000, learning_rate=.2, random_state=42) -> AdaBoostRegressor:
    """Fit an AdaBoost regressor with decision-tree base learners.

    Reads the module-level ``X_train`` and ``y_train``. Not decorated with
    ``numba.jit``: numba cannot compile code that creates or fits
    scikit-learn estimators.

    Args:
        max_depth: maximum depth of each base decision tree.
        n_estimators: maximum number of boosting rounds.
        learning_rate: weight applied to each base learner.
        random_state: seed forwarded to AdaBoost (it used to be ignored in
            favour of a hard-coded 42; the default keeps that value).

    Returns:
        The fitted ``AdaBoostRegressor``.
    """
    base_regressor = DecisionTreeRegressor(max_depth=max_depth)

    # Passed positionally so this works whether the keyword is named
    # `estimator` or `base_estimator` in the installed scikit-learn.
    adaboost_regressor = AdaBoostRegressor(
        base_regressor,
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        random_state=random_state,
    )

    # Train the AdaBoost model on the training split.
    adaboost_regressor.fit(X_train, y_train)
    return adaboost_regressor


In [None]:
# Train AdaBoost with the helper's defaults.
adaboost_model=train_adaboost_regressor()

# Hold-out MAE of the AdaBoost model.
mae=evaluate_model(adaboost_model,X_test,y_test)
print(f"Mean Absolute Error : {mae}")

In [None]:
# Feature matrix for the real (unlabelled) test set: exactly the training
# features, in training order. Selecting by X_train.columns leaves out 'ID',
# fails loudly if a training feature is missing from data_test, and stays
# correct when the cell is re-run after other columns were added to data_test.
X_true_test=data_test[X_train.columns]

In [None]:
# Convert every ID to a Python int, element by element.
data_test["ID"]=data_test["ID"].map(int)

In [None]:
# Build the submission in its own frame instead of adding the prediction to
# data_test: writing the target into data_test made the feature-matrix cell
# above pick it up as an extra feature when the notebook cells were re-run.
submission = data_test[["ID"]].copy()
submission["Ewltp (g/km)"] = xgboost_model.predict(X_true_test)
submission.to_csv("data/xgboost_results3.csv", index=False)

In [None]:
from sklearn.ensemble import HistGradientBoostingRegressor
# Second histogram gradient-boosting configuration (shallower trees, L2
# regularisation, fixed seed).
# NOTE(review): validation_fraction presumably only matters when early
# stopping is active — confirm against the installed scikit-learn docs.
hgboost_model=HistGradientBoostingRegressor(max_iter=1000,learning_rate=0.23,max_depth=7,loss="absolute_error",random_state=42,l2_regularization=0.5, validation_fraction=0.2)

In [None]:
# Train the model on the training split
hgboost_model.fit(X_train, y_train)

# Predict on the hold-out split
y_pred = hgboost_model.predict(X_test)

# Evaluate the model with the MAE (Mean Absolute Error)
mae = mean_absolute_error(y_test, y_pred)
print("Mean Absolute Error:", mae)

In [None]:
from sklearn.tree import DecisionTreeRegressor

In [None]:
# Single decision tree, splitting on absolute error; no depth limit or seed
# is set, so all other settings are the library defaults.
dtregressor_model=DecisionTreeRegressor(criterion='absolute_error')

In [None]:
# Train the decision tree on the training split
dtregressor_model.fit(X_train, y_train)

# Predict on the hold-out split
y_pred = dtregressor_model.predict(X_test)

# Evaluate the model with the MAE (Mean Absolute Error)
mae = mean_absolute_error(y_test, y_pred)
print("Mean Absolute Error:", mae)

In [None]:
import lightgbm as lgb

In [None]:

# Build a LightGBM dataset from the training split
train_data = lgb.Dataset(X_train, label=y_train)

# Model parameters
params = {
    "objective": "regression",  # Regression task
    "metric": "mae",  # Evaluation metric: Mean Absolute Error
    "boosting_type": "gbdt",  # Boosting type (Gradient Boosting Decision Tree)
    "num_leaves": 31,  # Maximum number of leaves per tree
    "learning_rate": 0.1,
    "feature_fraction": 0.9,
    "bagging_fraction": 0.8,
    "bagging_freq": 10,
    "verbose": -1
}

# Train the model
num_round = 1000  # Number of boosting iterations (adjust as needed)
bst = lgb.train(params, train_data, num_round)

# Predict on the hold-out split
# NOTE(review): no validation set or early stopping is configured in the
# lgb.train call above, so bst.best_iteration presumably does not restrict the
# number of trees used here — confirm against the LightGBM documentation.
y_pred = bst.predict(X_test, num_iteration=bst.best_iteration)

# Evaluate the model
mae = mean_absolute_error(y_test, y_pred)
print(f"Mean Absolute Error (mae): {mae}")

