In [1]:
import pandas as pd
import numpy as np
import io
# Compute the correlation matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LinearRegression, Lasso, Ridge, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import cross_val_score

import warnings 

warnings.filterwarnings('ignore') 

In [2]:
# Lire les données à partir du fichier CSV
data_train = pd.read_csv(r'train.csv')
data_test = pd.read_csv(r'test.csv')

## Traitements des données :

In [None]:
def df_analyse(df, columns, name_df):
    """
    Initial analysis on the DataFrame.

    Parameters
    ----------
    Args:
        df (pandas.DataFrame): DataFrame to analyze.
        columns (list): Dataframe keys in list format.
        name_df (str): DataFrame name.

    Returns:
        None.
        Print the initial analysis on the DataFrame.
    """

    # Calculating the memory usage based on dataframe.info()
    buf = io.StringIO()
    df.info(buf=buf)
    memory_usage = buf.getvalue().split('\n')[-2]

    if df.empty:
        print("The", name_df, "dataset is empty. Please verify the file.")
    else:
        # identifying empty columns
        empty_cols = [col for col in df.columns if df[col].isna().all()]
        #identifying full duplicates rows
        df_rows_duplicates = df[df.duplicated()]

        # Creating a dataset based on Type object and records by columns
        type_cols = df.dtypes.apply(lambda x: x.name).to_dict()
        df_resume = pd.DataFrame(list(type_cols.items()), columns = ["Name", "Type"])
        df_resume["Records"] = list(df.count())
        df_resume["% of NaN"] = list(round((df.isnull().sum(axis = 0))/len(df),5)*100)
        df_resume["Unique"] = list(df.nunique())


        print("\nInitial Analysis of", name_df, "dataset")
        print("--------------------------------------------------------------------------")
        print("- Dataset shape:                 ", df.shape[0], "rows and", df.shape[1], "columns")
        print("- Total of NaN values:           ", df.isna().sum().sum())
        #print("- Percentage of NaN:             ", round((df.isna().sum().sum() / prod(df.shape)) * 100, 2), "%")
        print("- Total of full duplicates rows: ", df_rows_duplicates.shape[0])
        print("- Total of empty rows:           ", df.shape[0] - df.dropna(axis="rows", how="all").shape[0]) if df.dropna(axis="rows", how="all").shape[0] < df.shape[0] else \
                    print("- Total of empty rows:            0")
        print("- Total of empty columns:        ", len(empty_cols))
        print("  + The empty column is:         ", empty_cols) if len(empty_cols) == 1 else \
                    print("  + The empty column are:         ", empty_cols) if len(empty_cols) >= 1 else None

        print("\n- The key(s):", columns, "is not present multiple times in the dataframe.\n  It CAN be used as a primary key.") if df.size == df.drop_duplicates(columns).size else \
                    print("\n- The key(s):", columns, "is present multiple times in the dataframe.\n  It CANNOT be used as a primary key.")

        print("\n- Type object and records by columns         (",memory_usage,")")
        print("--------------------------------------------------------------------------")
        print(df_resume.sort_values("Records", ascending=False))

# Analyse df
df_analyse(data_train, ["Ewltp (g/km)"], "df")

In [None]:
data_train_des = (data_train.describe()).T
data_train_des

In [3]:
def Drop_Columns(column_to_drop, data):
    """
    Drop columns from the DataFrame.

    Parameters
    ----------
    Args:
        column_to_drop (list): List of columns to drop.
        X_train (pandas.DataFrame): DataFrame to train.
        X_val (pandas.DataFrame): DataFrame to validate.

    Returns:
        None.
        Drop columns from the DataFrame.
    """

    data.drop(column_to_drop, axis=1, inplace=True)

    return data

In [4]:
data_train["IT"].fillna("Missing", inplace=True)
data_train["Mp"].fillna("Missing", inplace=True)
data_train["Ct"].fillna("Missing", inplace=True)

column_to_drop = ["z (Wh/km)" ,"Electric range (km)", "Enedc (g/km)" ,"Vf" ,"De" ,"Ernedc (g/km)", "MMS", "Cr", "r", "Enedc (g/km)", "Fm", "Status", "Date of registration", "T", "Mk", "Va", "Tan", "Ve", "Cn"]

data_train = Drop_Columns(column_to_drop, data_train)
col_to_keep = ['ec (cm3)' ,'ep (KW)', 'm (kg)','Mt', 'W (mm)', 'At1 (mm)', 'At2 (mm)',  'Erwltp (g/km)','Fuel consumption ',  'ID', 'Ewltp (g/km)']

### Modèlisation :

In [5]:
# Splitting data into features (X) and target variable (y)
#col_to_keep_cat = ['Country', 'Mh', 'Man', 'VFN', 'Mp', 'IT', 'Ct', 'Ft']
#col_to_keep.append( col_to_keep_cat)
#flattened_list = []

#for item in original_list:
 #   if isinstance(item, list):
#        flattened_list.extend(item)
#    else:
#        flattened_list.append(item)
#col_to_keep = flattened_list
X = data_train[col_to_keep]
X = X.drop(columns=['Ewltp (g/km)'])
y = data_train['Ewltp (g/km)']
# Split data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.35, random_state=42)

## Imputation des valeurs manquantes et normalisation :

### Encodage et Normalisation :

In [6]:
def Impute_Missing_Value(column_to_impute, X, strategy='median'):
    """
    Impute missing values in the DataFrame.

    Parameters
    ----------
    Args:
        column_to_impute (list): List of columns to impute.
        X_train (pandas.DataFrame): DataFrame to train.
        X_val (pandas.DataFrame): DataFrame to validate.

    Returns:
        None.
        Impute missing values in the DataFrame.
    """

    # Create a SimpleImputer with the strategy 'median' and apply it only to the specific column

    imputer = SimpleImputer(strategy=strategy)
    # Fit the imputer to your training data for that specific column
    imputer.fit(X[column_to_impute])
    # Transform the column for both the training and validation datasets
    X[column_to_impute] = imputer.transform(X[column_to_impute])

    return X

def Label_Encoder(column_to_encode, X):
    """
    Encode labels with value between 0 and n_classes-1.

    Parameters
    ----------
    Args:
        column_to_encode (list): List of columns to encode.
        X_train (pandas.DataFrame): DataFrame to train.
        X_val (pandas.DataFrame): DataFrame to validate.

    Returns:
        None.
        Encode labels with value between 0 and n_classes-1.
    """

    # Create a LabelEncoder object and apply it to each column of the DataFrame
    label_encoder = LabelEncoder()
    for col in column_to_encode:
        X[col] = label_encoder.fit_transform(X[col])

    return X

def Standard_Scaler(column_to_scale, X):

    scaler = StandardScaler()
    X[column_to_scale] = scaler.fit_transform(X[column_to_scale])

    return X

In [7]:
X_train_ID = X_train['ID']
X_val_ID = X_val['ID']

del X_train['ID']
del X_val['ID']

In [8]:
column_to_impute_scale = ['ec (cm3)',
 'ep (KW)',
 'm (kg)',
 'Mt',
 'W (mm)',
 'At1 (mm)',
 'At2 (mm)',
 'Erwltp (g/km)',
 'Fuel consumption ']
#column_to_encode = ['Country', 'Mh', 'Man', 'VFN', 'Mp', 'IT', 'Ct', 'Ft']
#column_to_impute_scale =['m (kg)', 'W (mm)', 'ep (KW)', 'At1 (mm)', 'At2 (mm)', 'Mt', 'ec (cm3)', 'Fuel consumption ', 'Erwltp (g/km)']
#column_to_encode =['Country', 'Mh', 'Man', 'VFN', 'Mp', 'IT', 'Ct', 'Ft']


X_train = Impute_Missing_Value(column_to_impute_scale, X_train, strategy='median')
X_val = Impute_Missing_Value(column_to_impute_scale, X_val, strategy='median')
#X_train = Label_Encoder(column_to_encode, X_train)
#X_val = Label_Encoder(column_to_encode, X_val)
X_train = Standard_Scaler(column_to_impute_scale, X_train)
X_val = Standard_Scaler(column_to_impute_scale, X_val)

In [None]:
X_train

In [None]:
corr = X_train.select_dtypes(exclude=['object']).corr()
# Generate a mask for the upper triangle
mask = np.zeros_like(corr, dtype=bool)
mask[np.triu_indices_from(mask)] = True
f, ax = plt.subplots(figsize=(5.5, 4.5))
cmap = sns.color_palette("RdBu_r", 11)
# Draw the heatmap with the mask and correct aspect ratio
_ = sns.heatmap(corr, mask=None, cmap=cmap, vmax=1, center=0,
square=True, linewidths=.5, cbar_kws={"shrink": .5})

# convert correlation to distances
d = 2 * (1 - np.abs(corr))
from sklearn.cluster import AgglomerativeClustering
clustering = AgglomerativeClustering(n_clusters=6, linkage='single', affinity="precomputed").fit(d)
lab=0
clusters = [list(corr.columns[clustering.labels_==lab]) for lab in set(clustering.labels_)]
print(clusters)
reordered = np.concatenate(clusters)

R = corr.loc[reordered, reordered]
f, ax = plt.subplots(figsize=(5.5, 4.5))
# Draw the heatmap with the mask and correct aspect ratio
_ = sns.heatmap(R, mask=None, cmap=cmap, vmax=1, center=0,
square=True, linewidths=.5, cbar_kws={"shrink": .5})


### Régression linéaire :

In [None]:
# Create and train a Linear Regression model
model = LinearRegression()
model.fit(X_train, y_train)

# Make predictions on the validation set
y_pred = model.predict(X_val)

# Evaluate the model
mae = mean_absolute_error(y_val, y_pred)
print(f'Mean Absolute Error on Validation Set: {mae}')

In [None]:
# Create and train a Linear Regression model
model = Ridge()
model.fit(X_train, y_train)

# Make predictions on the validation set
y_pred = model.predict(X_val)

# Evaluate the model
mae = mean_absolute_error(y_val, y_pred)
print(f'Mean Absolute Error on Validation Set: {mae}')

### Decision Tree :

In [9]:
# Create and train a Linear Regression model
model = DecisionTreeRegressor(random_state=42)
model.fit(X_train, y_train)

# Make predictions on the validation set
y_pred = model.predict(X_val)

# Evaluate the model
mae = mean_absolute_error(y_val, y_pred)
print(f'Mean Absolute Error on Validation Set: {mae}')

In [None]:
# Create and train a Linear Regression model
from sklearn.ensemble import RandomForestRegressor

model = RandomForestRegressor()
model.fit(X_train, y_train)

# Make predictions on the validation set
y_pred = model.predict(X_val)

# Evaluate the model
mae = mean_absolute_error(y_val, y_pred)
print(f'Mean Absolute Error on Validation Set: {mae}')

In [12]:
import optuna
from optuna import Trial
import optuna.visualization as ov


def objective(trial: optuna.Trial):
    n_estimators = trial.suggest_int('n_estimators', 50, 250)
    max_depth = trial.suggest_int('max_depth', 1, 20)
    min_samples_split = trial.suggest_float('min_samples_split', 0.0, 1.0)
    min_samples_leaf = trial.suggest_float('min_samples_leaf', 0.1, 0.5)
    max_features = trial.suggest_categorical('max_features', ['auto', 'sqrt'])

    model = RandomForestRegressor(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        min_samples_leaf=min_samples_leaf,
        max_features=max_features
    )

    # Use cross_val_score to calculate MAE (mean absolute error)
    mae = -cross_val_score(model, X_train, y_train, cv=3, scoring='neg_mean_absolute_error').mean()
    return mae

study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=50)
best_params = study.best_params
print("Best Hyperparameters:", best_params)

# Create a RandomForestRegressor with the best hyperparameters
best_model = RandomForestRegressor(
    n_estimators=best_params['n_estimators'],
    max_depth=best_params['max_depth'],
    min_samples_split=best_params['min_samples_split'],
    min_samples_leaf=best_params['min_samples_leaf'],
    max_features=best_params['max_features'],
    criterion=best_params['criterion']
)

# Train the best model on the training data
best_model.fit(X_train, y_train)

# Make predictions on the validation dataset
y_pred = best_model.predict(X_val)

# Calculate the MAE on the validation dataset
mae = mean_absolute_error(y_val, y_pred)
print("Mean Absolute Error on Validation Data:", mae)


# Plot the best hyperparameters
optuna.visualization.plot_param_importances(study)
optuna.visualization.plot_optimization_history(study)


[I 2023-10-27 09:45:31,789] A new study created in memory with name: no-name-1aa17c4b-5dac-4b81-b362-8ca288e217e4
[I 2023-10-27 09:46:39,268] Trial 0 finished with value: 138.4416988053937 and parameters: {'n_estimators': 235, 'max_depth': 5, 'min_samples_split': 0.7765753519976455, 'min_samples_leaf': 0.37145362227371337, 'max_features': 'sqrt'}. Best is trial 0 with value: 138.4416988053937.
[I 2023-10-27 09:47:15,629] Trial 1 finished with value: 138.4492597887468 and parameters: {'n_estimators': 128, 'max_depth': 19, 'min_samples_split': 0.9025875230888355, 'min_samples_leaf': 0.10233559511815185, 'max_features': 'auto'}. Best is trial 0 with value: 138.4416988053937.
[I 2023-10-27 09:48:13,789] Trial 2 finished with value: 138.4468655040149 and parameters: {'n_estimators': 207, 'max_depth': 17, 'min_samples_split': 0.28692170320074073, 'min_samples_leaf': 0.4950949499395654, 'max_features': 'auto'}. Best is trial 0 with value: 138.4416988053937.
[I 2023-10-27 09:49:01,275] Trial 3

KeyboardInterrupt: 

In [None]:
from bayes_opt import BayesianOptimization
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import cross_val_score

# Define the objective function to optimize (in this case, cross-validated MAE)
def objective_function(max_depth, min_samples_split, min_samples_leaf):
    # Convert hyperparameters to integers when necessary
    max_depth = int(max_depth)
    min_samples_split = int(min_samples_split)
    min_samples_leaf = int(min_samples_leaf)

    # Create a DecisionTreeRegressor with the specified hyperparameters
    model = DecisionTreeRegressor(
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        min_samples_leaf=min_samples_leaf
    )

    # Use cross_val_score to calculate the mean absolute error (MAE)
    mae = -cross_val_score(model, X_train, y_train, cv=5, scoring='neg_mean_absolute_error').mean()

    return mae

# Define the hyperparameter search space
pbounds = {
    'max_depth': (1, 32),                  # Depth of the tree
    'min_samples_split': (2, 20),         # Minimum number of samples required to split an internal node
    'min_samples_leaf': (1, 10)           # Minimum number of samples required to be at a leaf node
}

# Create a BayesianOptimization instance
optimizer = BayesianOptimization(
    f=objective_function,
    pbounds=pbounds,
    random_state=42  # Set a random seed for reproducibility
)

# Perform the optimization
optimizer.maximize(init_points=5, n_iter=15, acq='ucb', kappa=2)

# Get the best hyperparameters
best_hyperparameters = optimizer.max
print("Best Hyperparameters:", best_hyperparameters)

In [None]:
import pickle
# Save the model to a file using pickle
with open('RandomForest.pkl', 'wb') as model_file:
    pickle.dump(model, model_file)

In [None]:
import pickle

# Load the model from the file
with open('model.pkl', 'rb') as model_file:
    loaded_model = pickle.load(model_file)

### Production du résultat (prédiction : )

In [None]:
data_test = pd.read_csv('test.csv')

In [None]:
data_test["IT"].fillna("Missing", inplace=True)
data_test["Mp"].fillna("Missing", inplace=True)
data_test["Ct"].fillna("Missing", inplace=True)
column_to_drop = ["z (Wh/km)" ,"Electric range (km)", "Enedc (g/km)" ,"Vf" ,"De" ,"Ernedc (g/km)", "MMS", "Cr", "r", "Enedc (g/km)", "Fm", "Status", "Date of registration", "T", "Mk", "Va", "Tan", "Ve", "Cn"]

data_test = Drop_Columns(column_to_drop, data_test)
data_test = Impute_Missing_Value(column_to_impute_scale, data_test, strategy='median')
#data_test = Label_Encoder(column_to_encode, data_test)
data_test = Standard_Scaler(column_to_impute_scale, data_test)

#column_to_encode = ['Country', 'Mh', 'Man', 'VFN', 'Mp', 'IT', 'Ct', 'Ft']
#column_to_impute_scale =['m (kg)', 'W (mm)', 'ep (KW)', 'At1 (mm)', 'At2 (mm)', 'Mt', 'ec (cm3)', 'Fuel consumption ', 'Erwltp (g/km)']
#column_to_encode =['Country', 'Mh', 'Man', 'VFN', 'Mp', 'IT', 'Ct', 'Ft']

data_test_ID=data_test["ID"]
data_test = Drop_Columns(["Country","VFN", "Mp", "Mh", "Man", "Ct","IT" ,"Ft", "ID"], data_test)
data_test = data_test[X_train.columns]


In [None]:
test_predictions = model.predict(data_test)

In [None]:
# Prepare a submission file with test predictions
submission = pd.DataFrame({'ID': data_test_ID, 'Ewltp (g/km)': test_predictions})
submission.to_csv('submission_352.csv', index=False)
submission