# ARESD - CO2 emissions from the electricity mix

#### The objective of this class is to predict the carbon quality of the electricity generation mix (Carbonised, Normal, Decarbonised) from weather and calendar variables, using different models and comparing their accuracy rates to decide which one is the most reliable.
#### Throughout this report, we will explore the principles of the Bayes Classifier, kNN, and Decision Tree, aiming to gain insights into their roles in modern data analysis and classification tasks.


## Library import and data upload

In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn 
from tqdm.notebook import tqdm 

In [None]:
#benchmark = pd.read_csv("benchmark.csv")
path_to_data = "/kaggle/input/are-sd-2024-emissions-co2-du-mix-electrique/"
# Both CSV files live in the same directory: the training set and the file to predict on.
train, test = (
    pd.read_csv(path_to_data + file_name) for file_name in ("train.csv", "prev.csv")
)

## Functions needed to study the models

In [None]:
#Accuracy rate (percentage of correct predictions)
def ARE(prev,true,v = 'MixProdElec'):   #The variable v is the one to predict, here our study concerns the variable "MixProdElec"
    """
    Calculates the accuracy rate of a set of predictions, as a percentage.

    Parameters :
        prev : the predicted labels (array-like)
        true : the true labels (array-like, same shape as prev)
        v : name of the predicted variable; kept for backward compatibility,
            it is not used in the computation

    Returns :
        float : the percentage (between 0 and 100) of positions where prev equals true

    Raises :
        ValueError : if the inputs are empty or do not have the same shape
    """
    # Work on plain arrays so the comparison is positional: two Python lists
    # would be compared as a whole (a single bool), and two pandas Series
    # would be aligned on their index instead of their position.
    true_values = np.asarray(true)
    predicted_values = np.asarray(prev)

    if true_values.shape != predicted_values.shape:
        raise ValueError("prev and true must have the same shape")
    if true_values.size == 0:
        raise ValueError("cannot compute an accuracy rate on empty inputs")

    return (100/len(true_values))*(np.sum(true_values==predicted_values))

In [None]:
#Confusion matrix
def confusion_matrix(actual, predicted):
    """
    Generates a confusion matrix

    The label set is the sorted union of the values found in actual and
    predicted, so the function works with string labels, float-typed
    predictions, non-contiguous integer labels and classes that only show
    up in the predictions.

    Parameters:
        actual: the true labels (array-like)
        predicted: the predicted labels (array-like, same length as actual)

    Returns:
        np.array: the confusion matrix, of shape (n_labels, n_labels);
        entry [i, j] counts the samples whose true label is the i-th sorted
        label and whose predicted label is the j-th sorted label

    Raises:
        ValueError: if actual and predicted do not have the same length
    """
    actual_values = np.asarray(actual).ravel()
    predicted_values = np.asarray(predicted).ravel()
    if actual_values.shape != predicted_values.shape:
        raise ValueError("actual and predicted must have the same length")

    labels = np.unique(np.concatenate((actual_values, predicted_values)))
    conf = np.zeros((len(labels), len(labels)))

    # labels is sorted, so searchsorted returns the position of each value.
    rows = np.searchsorted(labels, actual_values)
    cols = np.searchsorted(labels, predicted_values)
    # add.at accumulates repeated (row, col) pairs, unlike conf[rows, cols] += 1.
    np.add.at(conf, (rows, cols), 1)

    return conf

In [None]:
#Split dataset
def split_train_test(X,y,ratio_train=0.8):
    """
    Split X and y into a leading training part and a trailing test part.

    The split is positional (no shuffling): the first ratio_train fraction
    of each input goes to the training part, the rest to the test part.

    Returns :
        tuple : (X_train, X_test, y_train, y_test)
    """
    cut_X = int(ratio_train * len(X))
    cut_y = int(ratio_train * len(y))
    return X[:cut_X], X[cut_X:], y[:cut_y], y[cut_y:]

In [None]:
#Label encoder
def label_encoder(feature_vec):
    """
    Encoding categorical labels to integers.

    Codes follow the sorted order of the distinct labels, so the mapping is
    deterministic from one run to the next (for example for the feature
    "MixProdElec" : ["Normal", "Carbonne", "Decarbonne"] -> [2, 0, 1]).
    If the labels cannot be ordered (mixed, non-comparable types), the
    order of first appearance is used instead.

    Parameters :
        feature_vec : iterable of hashable labels

    Returns :
        np.array : the integer code of each label, in input order
    """
    values = list(feature_vec)
    # dict.fromkeys removes duplicates while keeping first-seen order; a
    # set would give an arbitrary order for strings.
    distinct_labels = list(dict.fromkeys(values))
    try:
        distinct_labels = sorted(distinct_labels)
    except TypeError:
        # Non-comparable labels: keep the first-seen order.
        pass
    label_to_index = {label: code for code, label in enumerate(distinct_labels)}
    return np.array([label_to_index[label] for label in values])

In [None]:
#Label decoder
def label_decoder(encoded_labels, original_labels):
    """
    Turn integer codes back into categorical labels.

    The code i is translated into original_labels[i], i.e. the position of
    a label in original_labels is its integer code.
    """
    code_to_label = dict(enumerate(original_labels))
    return np.array([code_to_label[code] for code in encoded_labels])

## 1. Bayes Classifier

In [None]:
#Joint probability table of two variables
def cond_prob_table(data, v1, v2 = "MixProdElec"):
    """
    Calculates the joint (crossed) probability law of 2 variables

    Despite the function name, the table holds joint empirical frequencies,
    not conditional probabilities: entry [i, j] is the fraction of rows
    where v1 takes its i-th sorted distinct value AND v2 takes its j-th
    sorted distinct value.

    Parameters:
        data : the dataset (anything indexable by column name)
        v1 : the first variable
        v2 : the second variable

    Returns :
        np.array : the joint probability table, of shape
        (number of distinct values of v1, number of distinct values of v2)
    """
    x = np.asarray(data[v1])
    y = np.asarray(data[v2])
    # The distinct values are computed once instead of once per table cell.
    x_values = np.unique(x)
    y_values = np.unique(y)

    n_samples = len(x)
    if n_samples == 0:
        return np.zeros((len(x_values), len(y_values)))

    # One indicator column per distinct value: row r, column i is 1.0 when
    # sample r takes the i-th value.
    x_indicator = (x[:, np.newaxis] == x_values).astype(float)
    y_indicator = (y[:, np.newaxis] == y_values).astype(float)

    # The matrix product counts the co-occurrences of every pair of values.
    pxy = x_indicator.T @ y_indicator / n_samples
    return pxy

In [None]:
#Independence test
def independence_test(pxy, tol=1e-9):
    """
    Checks whether two variables are independent

    Two variables are independent when every entry of their joint
    probability table equals the product of the corresponding marginals.
    The comparison uses an absolute tolerance, because the marginals are
    sums of floats and exact equality would fail on rounding error alone.

    Parameters :
        pxy : the joint probability table (2D array-like)
        tol : largest absolute difference tolerated between a joint
              probability and the product of its marginals

    Returns :
        bool : True if the table matches the product of its marginals
    """
    joint = np.asarray(pxy, dtype=float)
    px = joint.sum(axis=1)
    py = joint.sum(axis=0)
    # np.outer(px, py)[i, j] = px[i] * py[j]
    return bool(np.allclose(joint, np.outer(px, py), rtol=0.0, atol=tol))

In [None]:
#Bayes Classifier
def Bayes_Classif(train,test,v):
    """
    Single-feature Bayes classifier

    For each test sample, the predicted class is the one with the highest
    joint frequency with the sample's value of v in the training set, as
    given by cond_prob_table(train, v).

    Parameters:
        train : the training dataset (the function will be trained on this dataset)
        test : the test dataset (the function will make predictions on this dataset)
        v : the variable for which the classification is performed

    Returns :
        np.array : the predicted classes from the test dataset, as integer
        indices into the sorted distinct values of train["MixProdElec"]

    Raises :
        ValueError : if predictions are requested while train holds no value for v
    """
    X = np.asarray(test[v])
    if X.size == 0:
        return np.empty(0, dtype=int)

    pxy = cond_prob_table(train,v)
    values = np.unique(train[v])
    if len(values) == 0:
        raise ValueError("the training set contains no value for " + repr(v))

    # Most likely class for each known value of v (one entry per table row).
    best_class_per_value = np.argmax(pxy, axis=1)
    # Class with the largest overall frequency, used for values of v that
    # never occur in the training set (they have no row in the table).
    most_frequent_class = np.argmax(pxy.sum(axis=0))

    matches = X[:, np.newaxis] == values          # shape (n_test, n_values)
    seen_in_train = matches.any(axis=1)
    row_of_value = matches.argmax(axis=1)

    pred = np.where(seen_in_train, best_class_per_value[row_of_value], most_frequent_class)
    # Class indices are integers; a float array could not be used as an index.
    return pred.astype(int)

## 2. k-Nearest Neighbors

In [None]:
#Modulo distance
def modulo_distance(xTrain, xTest, modulo = 7):
    """
    Pairwise cyclic distance between values that wrap around every `modulo`.

    With the default modulo of 7, the values 0 and 6 are at distance 1,
    because going around the cycle is shorter than the direct difference.

    Parameters :
        xTrain : array-like of values; its first axis is paired against xTest
        xTest : array-like of values
        modulo : length of the cycle

    Returns :
        np.array : float array of the broadcast shape of
        xTrain[:, np.newaxis] - xTest, holding the cyclic distances
    """
    diff = np.abs(np.asarray(xTrain)[:, np.newaxis] - np.asarray(xTest))
    # Position of the difference within one cycle; the operands are in this
    # order because the difference is what gets reduced by the modulo.
    wrapped = np.mod(diff, modulo)
    # The shorter of the two ways around the cycle.
    distances = np.minimum(wrapped, modulo - wrapped)
    return distances.astype(float)

In [None]:
#Euclidean distance
def euclidean_distance(xTrain, xTest):
    """
    Pairwise Euclidean distances between the rows of xTrain and xTest.

    Returns an array whose entry [i, j] is the distance between xTrain[i]
    and xTest[j] when both inputs are 2D arrays of row vectors.
    """
    offsets = xTrain[:, np.newaxis] - xTest
    squared_norms = np.sum(offsets ** 2, axis=-1)
    distances = np.sqrt(squared_norms)
    # Clamp any negative entry to zero, in place.
    np.maximum(distances, 0, out=distances)
    return distances

In [None]:
#Hamming distance
def hamming_distance(xTrain, xTest):
    """
    Hamming distance: the number of positions at which two equal-length
    sequences hold different symbols.

    The inputs are compared element by element and the mismatches are
    counted along the last axis.
    """
    mismatches = xTrain != xTest
    return np.sum(mismatches, axis=-1)

In [None]:
def knn(X_train, X_test, k):
    """
    Return, for every test sample, the indices of its k closest training samples.

    Distances come from euclidean_distance; row i of the result lists the
    positions in X_train of the k nearest neighbours of X_test[i], closest
    first.
    """
    test_to_train = euclidean_distance(X_test, X_train)
    ranked = np.argsort(test_to_train, axis=1)
    return ranked[:, :k]

In [None]:
#kNN Classification
def knn_classification(X_train, y_train, X_test, k, batch_size=100):
    """
    Predict a label for every test sample by majority vote among its
    k nearest training samples.

    The test set is processed batch_size samples at a time, with a progress
    bar over the batches.
    """
    total = X_test.shape[0]
    y_pred = np.empty(total, dtype=int)

    for start in tqdm(range(0, total, batch_size)):
        stop = start + batch_size
        neighbour_sets = knn(X_train, X_test[start:stop], k)
        # bincount counts the votes per label; argmax picks the most voted one.
        majority = [np.bincount(y_train[idx]).argmax() for idx in neighbour_sets]
        y_pred[start:stop] = np.array(majority)

    return y_pred

In [None]:
#Preparing the data
# NOTE(review): `data` is not defined in the cells visible here; presumably
# it is the full labelled dataset loaded elsewhere. Confirm before running.
train = data[:int((len(data)*1)*.36505)+1]
test = data[int((len(data)*1)*.63495):]

# Shuffle the training rows once, then take features and target from the
# same shuffled frame so that they stay aligned.
n = train.shape[0]
shuffled_train = train.sample(n, random_state = 2023)
# .copy() gives independent frames, so the column assignments below do not
# write into a slice of the original data.
X_train = shuffled_train.iloc[:,2:].copy()
y_train = shuffled_train.iloc[:,0]
nt = test.shape[0]
X_test = test.iloc[:nt,2:].copy()
y_test = test.iloc[:nt,0]

# Categorical columns to encode as integers.
# NOTE(review): 'Jour' was listed twice in the original list; the duplicate
# is removed here. Check whether another column name was intended.
labels = ['Jour', 'Mois', 'JourFerie','JourFerieType',
 'VacancesZoneA','VacancesZoneB','VacancesZoneC' ]

# Encode train and test together: fitting the encoder separately on each
# part can give the same category different codes in train and in test.
for column in labels:
    codes = label_encoder(np.concatenate([X_train[column].values, X_test[column].values]))
    X_train[column] = codes[:n]
    X_test[column] = codes[n:]

target_codes = label_encoder(np.concatenate([y_train.values, y_test.values]))
y_train = target_codes[:n]
y_test = target_codes[n:]

In [None]:
#Using knn
# Features are cast to float32 and labels to int8 before running kNN with k = 5.
y_pred = knn_classification(
    X_train=X_train.values.astype(np.float32),
    y_train=y_train.astype(np.int8),
    X_test=X_test.values.astype(np.float32),
    k=5,
)
# The position of each name in this list must match the integer code that
# label_encoder assigned to it.
decoded_y_pred = label_decoder(y_pred, ['Carbonne', 'Decarbonne', 'Normal'])

## 3. Decision Tree

### Gini impurity : measures the probability of incorrectly classifying a randomly chosen element if it were randomly labeled according to the distribution of labels in the subset.

In [None]:
def gini_impurity(labels):
    """
    Gini impurity of a collection of labels.

    Computed as 1 minus the sum of the squared class frequencies: 0 when
    all labels are identical, larger when the classes are more mixed.

    Parameters:
        labels (numpy.ndarray): An array of labels.

    Returns:
        float: The Gini impurity value.
    """
    class_counts = np.unique(labels, return_counts=True)[1]
    class_frequencies = class_counts / len(labels)
    return 1 - np.square(class_frequencies).sum()

### Split data

In [None]:
def split_data(data, labels, feature_index, threshold):
    """
    Partition samples according to one feature and a threshold.

    Rows whose feature value is strictly below the threshold go to the left
    part; all other rows go to the right part.

    Args:
        data (numpy.ndarray): The input data.
        labels (numpy.ndarray): The corresponding labels.
        feature_index (int): The index of the feature to split on.
        threshold (float): The threshold value for splitting.

    Returns:
        tuple: (left data, left labels, right data, right labels).
    """
    goes_left = data[:, feature_index] < threshold
    goes_right = np.logical_not(goes_left)
    left_part = (data[goes_left], labels[goes_left])
    right_part = (data[goes_right], labels[goes_right])
    return (*left_part, *right_part)

### Best split : iterates over all features and thresholds to find the split that minimizes the Gini impurity.

In [None]:
def find_best_split(data, labels):
    """
    Search every feature and every observed value for the best split.

    The best split is the one with the lowest size-weighted Gini impurity
    of its two parts. Splits that leave one part empty are ignored; on
    ties, the first split encountered is kept.

    Args:
        data (numpy.ndarray): The input data.
        labels (numpy.ndarray): The corresponding labels.

    Returns:
        tuple: (feature index, threshold) of the best split, or
        (None, None) when no valid split exists.
    """
    winner = (None, None)
    lowest_gini = float('inf')

    for column in range(data.shape[1]):
        for candidate in np.unique(data[:, column]):
            parts = split_data(data, labels, column, candidate)
            below, above = parts[1], parts[3]
            if len(below) == 0 or len(above) == 0:
                continue
            weighted_gini = (len(below) * gini_impurity(below) +
                             len(above) * gini_impurity(above)) / len(labels)
            if weighted_gini < lowest_gini:
                lowest_gini = weighted_gini
                winner = (column, candidate)

    return winner

### Tree : it recursively builds the decision tree by finding the best split at each node until the maximum depth is reached or the minimum number of samples is not met.

In [None]:
def build_tree(data, labels, max_depth, min_samples_split, depth=0):
    """
    Grow a decision tree recursively.

    A node becomes a leaf when the maximum depth is reached, when it holds
    fewer than min_samples_split samples, when all its labels are identical
    or when no valid split exists. A leaf predicts the most frequent label
    of its samples.

    Args:
        data (numpy.ndarray): The input data.
        labels (numpy.ndarray): The corresponding labels.
        max_depth (int): The maximum depth of the tree.
        min_samples_split (int): The minimum number of samples required to split a node.
        depth (int, optional): The current depth of the tree. Defaults to 0.

    Returns:
        dict: Either a leaf {'prediction': label} or an internal node with
        the keys 'feature', 'threshold', 'left' and 'right'.
    """
    def make_leaf():
        return {'prediction': np.argmax(np.bincount(labels))}

    must_stop = (depth == max_depth
                 or len(labels) < min_samples_split
                 or gini_impurity(labels) == 0)
    if must_stop:
        return make_leaf()

    feature, threshold = find_best_split(data, labels)
    if feature is None:
        return make_leaf()

    lower_data, lower_labels, upper_data, upper_labels = split_data(data, labels, feature, threshold)
    node = {'feature': feature, 'threshold': threshold}
    node['left'] = build_tree(lower_data, lower_labels, max_depth, min_samples_split, depth + 1)
    node['right'] = build_tree(upper_data, upper_labels, max_depth, min_samples_split, depth + 1)
    return node

### Prediction : it traverses the decision tree based on the feature values of the data point and returns the predicted class label.

In [None]:
def predict(tree, data_point):
    """
    Predict the class of one data point by walking down the decision tree.

    At each internal node the point goes left when its value for the node's
    feature is strictly below the node's threshold, and right otherwise,
    until a leaf is reached.

    Args:
        tree (dict): The decision tree represented as a dictionary.
        data_point (numpy.ndarray): The input data point.

    Returns:
        int: The predicted class label.
    """
    node = tree
    while 'prediction' not in node:
        goes_left = data_point[node['feature']] < node['threshold']
        node = node['left'] if goes_left else node['right']
    return node['prediction']