## **Imports required**

In [1]:
import recordlinkage
import pandas as pd
import numpy as np 
import itertools
import zipfile
import os

from zipfile import ZipFile
from codecarbon import EmissionsTracker

from recordlinkage.preprocessing import clean
from sklearn.preprocessing import LabelEncoder
from shapash.explainer.smart_explainer import SmartExplainer
from sklearn.model_selection import train_test_split
from lightgbm import LGBMClassifier
from sklearn.cluster import KMeans
from sklearn.cluster import DBSCAN

RuntimeError: module compiled against API version 0xe but this version of numpy is 0xd

In [2]:
def unzipe_file_in_directory(dir_name: str, path_to_unzipe_files: str):
    """
    Unzipe file from dir_name to path_to_unzipe_files directory.
    
    Parameters:
    -----------
        dir_name (str) : Folder in which files to unzip are 
        path_to_unzipe_files (str) : Where file have to be unzip
        
    """
    
    zip_files = os.listdir(dir_name)
    working_dir = os.path.abspath('.')
    os.chdir(dir_name) # change directory from working dir to dir with files

    for item in zip_files: # loop through items in dir
        if item.endswith(".zip"): # check for ".zip" extension
            file_name = os.path.abspath(item) # get full path of files
            zip_ref = zipfile.ZipFile(file_name) # create zipfile object
            zip_ref.extractall(path_to_unzipe_files) # extract file to dir
            zip_ref.close() # close file
            #os.remove(file_name) # delete zipped file  
    os.chdir(working_dir)

In [3]:
columns = ['variable_id', 'variable_label', 'unit', 'min_value', 'max_value', 'alias']

def concate_file_from_different_directory_to_dataframe(file_to_find:str, dir_name: str, path_to_unzipe_files: str) -> pd.DataFrame:
    """
    Create a dataframe concatenating all files named file_to_find present in all folders 
    contained in folder called dir_name.
    
    Parameters:
    -----------
        file_to_find (str) : File to find in dir_name
        dir_name (str) : Folder in which search is made
        path_to_unzipe_files (str) : where files have to be unziped
    
    Return:
    --------
        concat_files (pandas.DataFrame) : A DataFrame with all unziped files concatenated
    
    """
    unzipe_file_in_directory(dir_name, path_to_unzipe_files)
    os.chdir(path_to_unzipe_files) # change directory from working dir to dir with files
    concat_files = pd.DataFrame()

    for dir_ in os.listdir('.'):
        if os.path.isdir(dir_): 
            dir_name = os.path.abspath(dir_)
            for folder in os.listdir(dir_name):
                if file_to_find in folder:
                    tmp_data = pd.read_csv(dir_name + '/' + folder, header=None, encoding="ISO-8859-1")
                    concat_files = pd.concat([concat_files, tmp_data], ignore_index=True)
    concat_files.columns = columns
    return concat_files

In [4]:
def replace_by_nan(dataset: pd.DataFrame, char: str) -> pd.DataFrame:
    """
    Replace by NaN all char values find in dataset.
    
    Parameters:
    -----------
        dataset (pandas.DataFrame) : DataFrame in which modifications are made
        char (str) : Character to replace by nan 
        
    Return:
    -----------
        preprocessed_data (pandas.DataFrame) : DataFrame with char values replaced by NaN
    """
    
    preprocessed_data = dataset.copy()
    for col in preprocessed_data.columns:
        preprocessed_data[col] = preprocessed_data[col].apply(lambda x: np.nan if x == char else x)
    return preprocessed_data

In [5]:
def preprocessing_recordlinkage(dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Make the preprocessinf of dataset using recordlinkage preprocessing function.
    
    Parameters:
    -----------
        dataset (pandas.DataFrame) : DataFrame in which modifications are made
        
    Return:
    -----------
        preprocessed_data (pandas.DataFrame) : DataFrame with char values replaced by NaN
    """
    
    preprocessed_data = dataset.copy()
    for column in preprocessed_data.columns:
        if isinstance(preprocessed_data[column][0], str) :
            preprocessed_data[column] = recordlinkage.preprocessing.clean(preprocessed_data[column],
                                                                          replace_by_none='()', lowercase=True, 
                                                                          strip_accents=None, remove_brackets=True,
                                                                          encoding='utf-8', decode_error='strict')
    return preprocessed_data

## **Reading files in zip folders**

In [6]:
dir_name = '../datas/usine_datas/2.MariaDB_PCB_Schema_DUMP_ZIP_Files_Fab=CROLFA_Eqt=' \
'END10_Day=2020-11-02_Hrs=19h17-20h22'
extension = ".zip"
path_to_unzipe_files = '../datas/usine_datas_unziped/2.MariaDB_PCB_Schema_DUMP_ZIP_Files_Fab=CROLFA_Eqt=' \
'END10_Day=2020-11-02_Hrs=19h17-20h22'
file_to_find = 'PCB.Variables.dump'

In [7]:
variables = concate_file_from_different_directory_to_dataframe(file_to_find, dir_name, path_to_unzipe_files)

#### 1.**PREPROCESSING**

In [8]:
variables = replace_by_nan(variables, '\\N')
variables = preprocessing_recordlinkage(variables)

In [9]:
variables_subset = variables[['variable_id', 'variable_label', 'alias']]

####  2. **INDEXING**

In [10]:
def block_indexing(blocking_key: list, dataset: pd.DataFrame) -> pd.MultiIndex :
    """
    Simple function that use block indexation from recordlinkage package.
    
    Make candidate record pairs, from dataset, that agree on one or more variables of blocking_key parameter. 
    Returns all record pairs founded.
    
    Parameters:
    -----------
        blocking_key (list) : A list of variables in which block method is made
        dataset (pandas.DataFrame) : A dataframe containing at least blocking_key variables 
    
    Return:
    --------
        pairs : pandas.MultiIndex with record pairs founded
    """
    
    indexer = recordlinkage.Index()
    for key in blocking_key:
        indexer.block(on=key)
    pairs = indexer.index(dataset)
        
    return pairs

In [11]:
candidate_pairs = block_indexing(['alias'], variables_subset)

#### 3. **COMPARISON**

In [12]:
def comparison_scores(attr: list, candidate_pairs: pd.MultiIndex, dataset: pd.DataFrame, method:list=None) -> pd.DataFrame:
    """
    Compare the attributes of candidate record pairs candidate_pairs and return scores comparison 
    returned by .compute() method from recordlinkage.
    
    Parameters:
    -----------
        attr (list) : List of attributes to compare
        candidate_pairs (pandas.MultiIndex) :  MultiIndex with index of candidates pairs to compare 
        original_dataset (pandas.DataFrame) : A dataframe containing at least feature_to_comp  
        duplicate_dataset (pandas.DataFrame) :  A dataframe containing at least feature_to_comp 
        method (list) : list of method (jarowinkler, leveinshtein, etc) to use for comparison
        
    Return:
    --------
        scores (pandas.DataFrame) : A DataFrame with the comparison scores vectors
    """
    
    if method is None:
        method = ['jarowinkler']*len(attr)
    
    compare = recordlinkage.Compare()
    # initialise similarity measurement algorithms
    for i in range(len(attr)): 
        compare.string(attr[i], attr[i], label=attr[i]+'_score', method=method[i])

    # the method .compute() returns the DataFrame with the feature vectors.
    scores = compare.compute(candidate_pairs, dataset)
    
    return scores

In [13]:
scores = comparison_scores(['variable_label'], candidate_pairs, variables_subset, ['levenshtein'] )

### **DROP DOUBLON**

In [14]:
def drop_doublon(scores: pd.DataFrame, dataset: pd.DataFrame):
    """
    Drop duplicated row in dataset.
    
    Parameters:
    -----------
        scores (pd.DataFrame) : A DataFrame with scores vector
        dataset (pd.DataFrame) : DataFrame to deduplicate

    Return:
    --------
        without_doublon (pandas.DataFrame) : A DataFrame without duplicated row
    """
    
    duplicated = scores[scores[scores.columns[0]] >= 1]
    not_duplicated = scores[scores[scores.columns[0]] < 1]
    
    idx_to_keep = list(set(list(map(list, zip(*duplicated.index)))[0]) -   
                   set(list(map(list, zip(*duplicated.index)))[1])) + \
                list(set(itertools.chain(*not_duplicated.index.to_list())))
    
    without_doublon = dataset.iloc[dataset.index.isin(idx_to_keep)]
    
    return without_doublon

In [15]:
variables_without_doublon = drop_doublon(scores, variables_subset)

In [16]:
variables_without_doublon.head()

Unnamed: 0,variable_id,variable_label,alias
5,110,pvd current target life 06,counter target life
6,111,pvd current target life 07,counter target life
7,112,pvd current target life 08,counter target life
8,114,ch recipe time 00,time recipe
9,120,ch recipe time 06,time recipe


#### 4. **CLASSIFICATION**

In [19]:
scores['label'] = scores['variable_label_score'].apply(lambda x: 1 if x > 0.96 else 0)

In [20]:
train, test = train_test_split(scores, test_size=0.25)
X_train = train.drop('label', axis=1, inplace=False)
X_test = test.drop('label', axis=1, inplace=False)
y_train = train['label']
y_test = test['label']

In [21]:
def fit_and_track(project_name: str, clf, X_train: pd.DataFrame, X_test: pd.DataFrame, y_train: pd.DataFrame):
    """
    Train and fit the classifier clf with X_train, X_test and y_train and calculate carbon emissions 
    using EmissionsTracker from codecarbon
    
    Parameters:
    -----------
        project_name (str) : Name of the project for EmissionTracker function
        X_train (pandas.DataFrame) : A DataFrame for training
        X_test (pandas.DataFrame) : A DataFrame for test
        y_train (pandas.DataFrame) : A DataFrame for training
    Return:
    -----------
        predictions (numpy.array) : Array of predicted values
        
    """
    tracker = EmissionsTracker(project_name=project_name)
    tracker.start()

    clf.fit(X_train, y_train)
    predictions = clf.predict(X_test)

    tracker.stop()
    
    return predictions

In [22]:
classifier = LGBMClassifier()
predictions = fit_and_track("donnees_usine", classifier, X_train, X_test, y_train)

INFO:apscheduler.scheduler:Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO:apscheduler.scheduler:Added job "BaseEmissionsTracker._measure_power" to job store "default"
INFO:apscheduler.scheduler:Scheduler started
INFO:apscheduler.scheduler:Scheduler has been shut down


#### 4.**HUMAN REVIEW**

In [24]:
features = {
     'variable_label': 'variable_label'
}

In [25]:
def explain(X_test: pd.DataFrame,  y_test: pd.DataFrame, classifier: object, features: dict):
    """
    Parameters:
    -----------
        X_test (pd.DataFrame) : DataFrame used to train classifier
        y_test (pd.DataFrme) : DataFrame used to train classifier
        features (dict) : Features present in DataFrame
        classifier (object) : classifier used to make prediction 
        
    Return: 
    -----------
        xpl (object) : SmartExplainer 
    """
    
    xpl = SmartExplainer(features_dict=features) # optional parameter
    xpl.compile(
        x=X_test,
        model=classifier,
        y_pred= y_test
    )
    return xpl 

In [28]:
explainer = explain(X_test, y_test, classifier, features)

C extension was not built during install!
Backend: Shap TreeExplainer
