In [None]:
import re
import pickle
import nltk

import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.svm import LinearSVC
from abc import ABC, abstractmethod
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectFromModel, SequentialFeatureSelector
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, StandardScaler


# Fetch the NLTK "stopwords" corpus when the imports cell runs. nltk.download
# returns a truthy value on success, so the assert stops the notebook early
# when the download fails.
# NOTE(review): asserts are stripped under `python -O`, which would also skip
# the download itself; nltk is not used in the cells visible here — confirm it
# is still needed.
assert nltk.download('stopwords')

In [None]:
# Location of the raw training CSV, relative to the notebook's working directory.
path_src_dataset = Path("./data/src/X_train_Hi5.csv")

# Only the first 10,000 rows are read: this frame is a small sample used to
# try out the transformers defined in the following cells.
df = pd.read_csv(path_src_dataset, nrows=10000) # Dataframe used to test functions, we can only take few rows

In [None]:
# Display the first rows of the sample as a quick check of the load.
df.head()

In [None]:
class Transformer(TransformerMixin, BaseEstimator, ABC):
    """Abstract base class for the custom DataFrame transformers of this notebook.

    Subclasses must implement ``__init__``, ``fit`` and ``transform``;
    ``fit_transform`` is inherited from ``TransformerMixin`` and parameter
    introspection (``get_params`` / ``set_params``) from ``BaseEstimator``.

    The mixin is listed *before* ``BaseEstimator`` on purpose: scikit-learn
    expects mixins on the left of ``BaseEstimator`` so that the mixin's
    overrides take precedence in the method resolution order.
    """

    @abstractmethod
    def __init__(self):
        super().__init__()

    @abstractmethod
    def fit(self, X: pd.DataFrame, y=None):
        """Learn whatever state ``transform`` needs from ``X``; must return ``self``."""

    @abstractmethod
    def transform(self, X: pd.DataFrame):
        """Return the transformed version of ``X``."""

# TEMPLATE: copy this class to start a new cleaning / processing transformer
class NewTransformer(Transformer):
    """Skeleton transformer: learns nothing and hands its input back unchanged."""

    def __init__(self):
        # TODO: store the hyper-parameters of the transformer here
        pass

    def fit(self, X, y=None):
        """Placeholder fit step; returns ``self`` so calls can be chained."""
        # TODO: learn the state needed by transform()
        return self

    def transform(self, X):
        """Placeholder transform step; currently returns ``X`` as received."""
        # TODO: apply the actual transformation
        return X

### Dropping Columns Transformer

In [None]:
class DropColumns(Transformer):
    """Remove a fixed set of columns from a DataFrame.

    Parameters
    ----------
    cols_to_drop : list of str, optional
        Labels of the columns to remove. ``None`` (the default) means that no
        column is removed. A ``None`` sentinel is used instead of ``[]`` so
        that instances never share one mutable default list.
    """

    def __init__(self, cols_to_drop=None):
        # Stored exactly as received so BaseEstimator.get_params() can read it back.
        self.cols_to_drop = cols_to_drop

    def _columns(self):
        """Return the configured columns, mapping the ``None`` default to an empty list."""
        return [] if self.cols_to_drop is None else self.cols_to_drop

    def fit(self, X, y=None):
        """Nothing is learned; logs the configured columns and returns ``self``."""
        print(f">> (Info) Dropped columns : {self._columns()}")
        return self

    def transform(self, X):
        """Return a copy of ``X`` without the configured columns.

        Raises ``KeyError`` (from ``DataFrame.drop``) when one of the columns
        is not present in ``X``.
        """
        return X.drop(columns=self._columns())

# Candidate list of columns to drop, kept for reference (currently disabled):
# cols_to_drop = ['piezo_station_department_name',
#                 'piezo_station_update_date', 'piezo_station_commune_code_insee', 'piezo_station_pe_label', 'piezo_station_bdlisa_codes', 'piezo_station_bss_code', 'piezo_station_bss_id', 'piezo_bss_code', 'piezo_measurement_date', 'piezo_producer_name', 'piezo_measure_nature_code', 'meteo_name', ]
# Only the station update date is dropped for now.
drop_col = DropColumns(cols_to_drop=['piezo_station_update_date'])
# NOTE(review): `df` is overwritten, so re-running this cell fails with a
# KeyError because the column is already gone; reload `df` first if needed.
df = drop_col.fit_transform(df)

### Formatting dates transformer

In [None]:
class DateTransformer(Transformer):
    """Replace the date columns of a DataFrame by one cyclic seasonal feature.

    During ``fit`` every column whose name contains ``'date'`` is recorded.
    During ``transform``:

    * ``meteo_date`` is parsed as a datetime and replaced by
      ``cos(2 * pi * day_of_year / 365.25)``, then renamed to ``date``;
      values that cannot be parsed become ``NaN``;
    * every other recorded date column is dropped.
    """

    def __init__(self):
        # Filled by fit(); empty until then, which makes transform() a plain copy.
        self.date_cols = []

    def fit(self, X, y=None):
        """Record the names of the columns containing ``'date'``; returns ``self``."""
        self.date_cols = [col for col in X.columns if 'date' in col]
        return self

    def transform(self, X):
        """Return a transformed copy of ``X``; the input frame is left untouched."""
        X = X.copy()

        has_meteo_date = 'meteo_date' in self.date_cols
        if has_meteo_date:
            # Timestamps cannot be multiplied by a number, so the angle is built
            # from the day of the year (NaN for unparseable dates).
            day_of_year = pd.to_datetime(X['meteo_date'], errors='coerce').dt.dayofyear
            X['meteo_date'] = np.cos(day_of_year * 2 * np.pi / 365.25)

        other_date_cols = [col for col in self.date_cols if col != 'meteo_date']
        X = X.drop(columns=other_date_cols)

        # Rename once, after all columns were processed: renaming inside the
        # loop could hide 'meteo_date' before it had been encoded.
        if has_meteo_date:
            X = X.rename(columns={'meteo_date': 'date'})
        return X
    
# Try the date transformer on the sample: fit records the date columns,
# transform returns a new frame (stored in df_dates; `df` itself is not reassigned).
date_transformer = DateTransformer()
date_transformer.fit(df)
df_dates = date_transformer.transform(df)

# Data Processing

Create a *Pipeline*, which is a series of *Transformers*.

## Transformers

In [None]:
class SumCols(Transformer):
    """Add a column holding the weighted sum of several existing columns.

    Parameters
    ----------
    columns : list of str
        Columns to combine; at least two are required.
    weights : list of float, optional
        One weight per column. When omitted (``None`` or empty) every column
        gets a weight of 1, i.e. a plain sum.
    new_col_name : str, optional
        Name of the created column. Defaults to the column names joined
        with ``"_+_"``.
    remove_cols_in : bool, default False
        When True the source columns are dropped from the result.

    Raises
    ------
    AssertionError
        If fewer than two columns are given, or if ``weights`` is non-empty
        and its length differs from the number of columns.
    """

    def __init__(
            self,
            columns: list[str],
            weights: list[float]=None,
            new_col_name: str=None,
            remove_cols_in: bool=False,
        ):

        assert len(columns) > 1, ">> (ERROR - SumCols) 2 columns are required"
        self.columns = columns

        # None replaces the former mutable `[]` default; both mean "no weights".
        weights = [] if weights is None else weights
        assert len(weights) == 0 or len(weights) == len(self.columns), ">> (ERROR - SumCols) columns and weights must have same dimensions."
        # Keep the caller's weights when provided, fall back to ones otherwise.
        self.weights = weights if len(weights) > 0 else [1]*len(columns)

        self.new_col_name = new_col_name if new_col_name is not None else "_+_".join(self.columns)
        self.remove_cols_in = remove_cols_in

    def fit(self, X, y=None):
        """Nothing to learn; returns ``self``."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the weighted-sum column added."""
        # Work on a copy so the caller's DataFrame does not silently gain a column.
        X = X.copy()

        X[self.new_col_name] = np.dot(X[self.columns], self.weights)

        print(f">> (INFO - SumCols) columns {self.columns} has been sumed in a new column : {self.new_col_name}")

        if self.remove_cols_in:
            X = X.drop(columns=self.columns)

        return X
    
### TEST ###
# Smoke test of SumCols: combine two columns with weights 0 and 2 and drop the inputs.
# NOTE(review): this requires "duration_ms" and "popularity" to exist in `df`;
# `df` was loaded from X_train_Hi5.csv above — confirm these columns are present.

transfo = SumCols(["duration_ms", "popularity"], [0, 2], remove_cols_in=True)
df_test = transfo.fit_transform(df)
df_test.head()

In [None]:
class PartialOneHotEncoder(Transformer):
    """One-hot encode a subset of columns and leave the other columns as they are.

    "Partial" because only the columns listed in ``columns`` are encoded. In
    the result the encoded source columns are removed and the generated
    one-hot columns are appended after the remaining columns.

    Parameters
    ----------
    columns : list of str
        Columns to encode.
    categories, drop, handle_unknown, min_frequency, max_categories
        Forwarded unchanged to ``sklearn.preprocessing.OneHotEncoder``
        (always created with ``sparse_output=False``).
    """

    def __init__(
            self,
            columns: list[str],
            *,
            categories="auto",
            drop='if_binary',
            handle_unknown="ignore",
            min_frequency=None,
            max_categories=None,
        ):
        # Every constructor argument is kept under its own name:
        # BaseEstimator.get_params() (used by repr() and clone()) reads them back.
        self.columns = columns
        self.categories = categories
        self.drop = drop
        self.handle_unknown = handle_unknown
        self.min_frequency = min_frequency
        self.max_categories = max_categories

        self.encoder = OneHotEncoder(
            categories=categories,
            drop=drop,
            sparse_output=False,
            handle_unknown=handle_unknown,
            min_frequency=min_frequency,
            max_categories=max_categories,
        )

    def fit(self, X, y=None):
        """Fit the wrapped encoder on the selected columns; returns ``self``."""
        self.encoder = self.encoder.fit(X[self.columns])

        return self

    def transform(self, X):
        """Return ``X`` with the selected columns replaced by their one-hot encoding."""
        encoded = self.encoder.transform(X[self.columns])

        # Reuse X's index, otherwise concat would align on a fresh 0..n-1 index
        # and produce NaN rows for frames with any other index.
        encoded_df = pd.DataFrame(
            encoded,
            columns=self.encoder.get_feature_names_out(),
            index=X.index,
        )

        # Combine with the frame being transformed (not a notebook-level global).
        X = pd.concat([X.drop(columns=self.columns), encoded_df], axis=1)

        print(f">> (INFO - PartialOneHotEncoder) {self.encoder.feature_names_in_} features one hot encoded as : {self.encoder.get_feature_names_out()}")

        return X
    
### TEST ###
# Smoke test of PartialOneHotEncoder on a single column.
# NOTE(review): requires an "explicit" column in `df` — confirm it exists in the loaded dataset.

transfo = PartialOneHotEncoder(columns=["explicit"])
df_test = transfo.fit_transform(df)
df_test.head()

In [None]:
class PartialStandardScaler(Transformer):
    """Standardize a subset of columns and leave the other columns as they are.

    "Partial" because only the columns listed in ``columns`` are standardized.
    In the result the standardized columns are moved after the remaining
    columns.

    Parameters
    ----------
    columns : list of str
        Columns to standardize.
    copy, with_mean, with_std
        Forwarded unchanged to ``sklearn.preprocessing.StandardScaler``.
    """

    def __init__(
            self,
            columns: list[str],
            *,
            copy: bool = True,
            with_mean: bool = True,
            with_std: bool = True
        ):
        # Every constructor argument is kept under its own name:
        # BaseEstimator.get_params() (used by repr() and clone()) reads them back.
        self.columns = columns
        self.copy = copy
        self.with_mean = with_mean
        self.with_std = with_std

        self.standardizer = StandardScaler(
            copy=copy,
            with_mean=with_mean,
            with_std=with_std,
        )

    def fit(self, X, y=None):
        """Fit the wrapped scaler on the selected columns; returns ``self``."""
        self.standardizer.fit(X[self.columns])

        return self

    def transform(self, X):
        """Return ``X`` with the selected columns replaced by their standardized values."""
        scaled = self.standardizer.transform(X[self.columns])

        # Reuse X's index, otherwise concat would align on a fresh 0..n-1 index
        # and produce NaN rows for frames with any other index.
        scaled_df = pd.DataFrame(
            scaled,
            columns=self.standardizer.get_feature_names_out(),
            index=X.index,
        )

        # Combine with the frame being transformed (not a notebook-level global).
        X = pd.concat([X.drop(columns=self.columns), scaled_df], axis=1)

        print(f">> (INFO - PartialStandardScaler) columns {self.columns} have bean standardized")

        return X
    
### TEST ###
# Smoke test of PartialStandardScaler on a single column.
# NOTE(review): requires an "energy" column in `df` — confirm it exists in the loaded dataset.

transfo = PartialStandardScaler(columns=["energy"])
df_test = transfo.fit_transform(df)
df_test.head()

In [None]:
class DropCols(Transformer):
    """Stateless transformer that removes a fixed list of columns from a DataFrame."""

    def __init__(self, columns: list[str]):
        # Labels of the columns removed by transform().
        self.columns = columns

    def fit(self, X, y=None):
        """Nothing to learn; returns ``self`` so the step can be chained."""
        return self

    def transform(self, X):
        """Return ``X`` without the configured columns and log what was removed."""
        remaining = X.drop(columns=self.columns)
        print(f">> (INFO - DropCols) columns {self.columns} is/are droped.")
        return remaining
    
# Smoke test of DropCols on a single column.
# NOTE(review): requires an "explicit" column in `df` — confirm it exists in the loaded dataset.
transfo = DropCols(columns=["explicit"])
df_test = transfo.fit_transform(df)
df_test.head()

## Pipeline for processing

In [None]:
# Input: the cleaned dataset. Outputs: folders receiving the processed dataset
# and the pickled pipelines. All paths are relative to the working directory.
path_src_dataset = Path("./data/cleaned/TODELETE.csv")
out_folder_dataset = Path("./data/processed")
out_folder_config = Path("./data/processed/pipelines")

# From here on `df` is the full cleaned dataset; it replaces the 10,000-row
# sample loaded at the top of the notebook.
df = pd.read_csv(path_src_dataset)

### Create a new pipeline

Split Continuous / Categorical features

In [None]:
# Columns stored with dtype "object" are treated as categorical.
categorical_features = df.select_dtypes(include=["object"]).columns.to_list()
num_categorical_features = ["key","mode","time_signature"] # Add numerical data but with a categorical meaning (ex: color of car  => red=0, blue=1, green=2)
categorical_features.extend(num_categorical_features)

# Every remaining column is considered numerical.
numerical_features = df.drop(columns=categorical_features).columns.to_list()

In [None]:
# Processing pipeline: custom DataFrame transformers followed by an L1-based
# feature selection keeping at most 30 features.
pipeline = Pipeline([
    ('SumCols', SumCols(columns=["duration_ms", "popularity"], weights=[0, 2], remove_cols_in=True)),
    ('DropCols', DropCols(["key"])),
    ('OneHotEncoder', PartialOneHotEncoder(columns=["explicit"])),
    ('PartialStandardScaler', PartialStandardScaler(columns=["energy"])),
    # ... Add other Transformers
    # The L1 penalty of LinearSVC is only available with the primal
    # formulation, hence the explicit dual=False.
    ('FeatureSelection', SelectFromModel(LinearSVC(penalty="l1", dual=False), max_features=30)) # Feature selection
])

# NOTE(review): SelectFromModel fits the wrapped LinearSVC, a supervised
# estimator, but no target `y` is passed here — confirm where the labels should
# come from. Its output is presumably a NumPy array (no `.head()`) unless the
# pipeline is configured with set_output(transform="pandas") — verify.
df_processed = pipeline.fit_transform(df)
df_processed.head()

### Load an existing pipeline

In [None]:
# Name (without extension) of the pickled pipeline to read from out_folder_config.
pipeline_name = "TODELETE"

# SECURITY: pickle.load can execute arbitrary code stored in the file; only
# load pipeline files that were produced by this project.
with open(out_folder_config / Path(pipeline_name + ".pkl"), 'rb') as file:
    pipeline: Pipeline = pickle.load(file)


# NOTE(review): fit_transform re-fits every step of the loaded pipeline on `df`;
# if the goal is to reuse the already fitted pipeline, transform() is
# presumably what is wanted — confirm.
df_processed = pipeline.fit_transform(df)
# df_processed.head()

### Save Processed Dataset + Pipeline

In [None]:
# Base name shared by the processed CSV and the pickled pipeline.
df_processed_name = "TODELETE"

# Write the processed dataset as CSV into out_folder_dataset.
df_processed.to_csv(out_folder_dataset / Path(df_processed_name + ".csv"))

# Serialize the pipeline object with pickle into out_folder_config (<name>.pkl).
with open(out_folder_config / Path(df_processed_name + ".pkl"), "wb") as file:
    pickle.dump(pipeline, file)