# Load data

In [None]:
import pandas as pd

# Column dtypes forced when the polling-station CSV is read.
# The department and polling-station codes stay strings ('object').
dtypesBvot = {
    'NUMTOUR': 'int64',
    'CODDPT': 'object',
    'CODSUBCOM': 'int64',
    'LIBSUBCOM': 'object',
    'CODBURVOT': 'object',
    'CODCAN': 'int64',
    'LIBCAN': 'object',
    'NBRINS': 'int64',
    'NBRVOT': 'int64',
    'NBREXP': 'int64',
    'NUMDEPCAND': 'int64',
    'LIBLISEXT': 'object',
    'CODNUA': 'object',
    'NBRVOIX': 'int64',
}

# Department codes kept by loadDatas: zero-padded '01'..'95' without '20' and '75'.
departements = [f'{code:02d}' for code in range(1, 96) if code not in (20, 75)]

################################# Load data #################################
def loadDatas(dataPaths, sep=';', dtype=dtypesBvot, col_dep='CODDPT', col_tour='NUMTOUR'):
    '''
        Read the result files and split their rows by election round.

        Every file is read with pd.read_csv, restricted to the rows whose
        col_dep value is listed in the module-level `departements`, then
        split on the value of col_tour (1 or 2).

        dataPaths : iterable of CSV paths
        sep       : field separator forwarded to pd.read_csv
        dtype     : column dtypes forwarded to pd.read_csv
        col_dep   : name of the department-code column
        col_tour  : name of the round-number column

        return : (dataT1Bvot, dataT2Bvot), the rows of round 1 and of round 2.
                 Row labels of the source files are kept as they are.
                 Both are empty DataFrames when dataPaths is empty.
    '''
    firstRound = []
    secondRound = []
    for path in dataPaths:
        dataBvot = pd.read_csv(path, sep=sep, dtype=dtype)
        dataBvot = dataBvot[dataBvot[col_dep].isin(departements)]
        firstRound.append(dataBvot[dataBvot[col_tour] == 1])
        secondRound.append(dataBvot[dataBvot[col_tour] == 2])

    # One concat per round: growing a DataFrame inside the loop re-copies all
    # previous rows on each file and relies on concatenating an empty frame.
    dataT1Bvot = pd.concat(firstRound) if firstRound else pd.DataFrame()
    dataT2Bvot = pd.concat(secondRound) if secondRound else pd.DataFrame()

    return dataT1Bvot, dataT2Bvot

################################# fonction utile #############################
def saveData(data, loc):
    '''
        Write the DataFrame `data` to the Excel file `loc`.

        data : DataFrame to export (written with its index, default sheet)
        loc  : destination path handed to pd.ExcelWriter
    '''
    # The context manager saves and closes the workbook, even if to_excel
    # raises. ExcelWriter.save() no longer exists in pandas >= 2.0.
    with pd.ExcelWriter(loc) as writer:
        data.to_excel(writer)



############################# Format des donnees brute ##############################

def getNbBinomes(data):
    '''
        Count the columns of `data` whose name contains "Binôme".
    '''
    nbBinomes = 0
    for header in data.columns:
        if "Binôme" in header:
            nbBinomes += 1
    return nbBinomes

def explodeLines(data):
    '''
        Turn a "one row per canton" table into "one row per binôme".

        The canton-level columns are repeated next to each group of
        per-binôme columns. The first group uses the bare header names, the
        following ones the same names suffixed with '.1', '.2', ...
        (presumably the suffixes added when the raw file was read with
        repeated headers; to be confirmed against the loader).

        return : DataFrame with the canton columns followed by `headers`.
                 Rows where every per-binôme column is missing are dropped.
                 Original row labels are kept, so a label appears once per
                 binôme. An empty frame with those columns is returned when
                 `data` has no "Binôme" column.
    '''
    initdf = data[['Code du département', 'Libellé du département', 'Code du canton',
                   'Libellé du canton', 'Inscrits', 'Abstentions', '% Abs/Ins', 'Votants',
                   '% Vot/Ins', 'Blancs', '% Blancs/Ins', '% Blancs/Vot', 'Nuls', '% Nuls/Ins',
                   '% Nuls/Vot', 'Exprimés', '% Exp/Ins', '% Exp/Vot']]

    headers = ['N°Panneau', 'Nuance', 'Binôme', 'Sièges', 'Voix', '% Voix/Ins', '% Voix/Exp']
    allColumns = initdf.columns.tolist() + headers

    frames = []
    for i in range(getNbBinomes(data)):
        suffixed = [h if i == 0 else h + '.' + str(i) for h in headers]
        partidf = pd.concat([initdf, data[suffixed]], axis=1)
        # Drop the '.i' suffixes so that all groups stack in the same columns
        partidf.columns = pd.Index(allColumns)
        frames.append(partidf)

    # Without any binôme group there is nothing to stack; dropna on a
    # column-less frame would raise KeyError.
    if not frames:
        return pd.DataFrame(columns=allColumns)

    # Single concat instead of growing a DataFrame inside the loop
    df = pd.concat(frames)

    # Remove rows that carry no binôme information at all
    return df.dropna(how='all', subset=headers)

In [None]:
# Raw result file(s) to read (presumably both rounds live in this one file)
loadDataPath = ['../dataset/raw/DP15_Bvot_T1T2.csv']

# Load the data, split by round
dataT1Bvot, dataT2Bvot = loadDatas(loadDataPath)

# `.empty` also rejects frames that have columns but no rows, which the
# former `shape != (0,0)` test accepted. An explicit raise is not stripped
# when Python runs with -O.
if dataT1Bvot.empty or dataT2Bvot.empty:
    raise ValueError('no round-1 or round-2 rows were loaded from ' + str(loadDataPath))

# Dictionnaire des duels

In [None]:
#################################### Dictionnaire des duels #####################################

def getNuanceOfElected(data, col_siege='Sièges', col_nuance='Nuance'):
    '''
        Return the nuances of the rows whose col_siege value is 'Elus'.

        return : list of nuances, or None when no row is marked 'Elus'
    '''
    isElected = data[col_siege] == 'Elus'
    if not isElected.any():
        return None
    return list(data.loc[isElected, col_nuance])

def filterBestNuances(data, col_nuance='Nuance', criteria=12.50):
    '''
        Return the nuances whose '% Voix/Ins' reaches `criteria`.

        When fewer than two rows reach the threshold, the nuances of the two
        rows with the most 'Voix' are returned instead.
    '''
    qualified = data[data['% Voix/Ins'] >= criteria]

    if len(qualified) < 2:
        # Fallback: two best scores, whatever their share of registered voters
        qualified = data.sort_values(by='Voix', ascending=False).head(2)

    return list(qualified[col_nuance])

def getDuelsFromDep(data_dep, col_dep='CODDPT', col_canton='CODCAN', col_nuance='CODNUA'):
    '''
        Map every canton of data_dep to the distinct nuances found in its rows.

        return : {'canton_1': [P1, ..., Pk], ..., 'canton_n': [P1, ..., PK]}
                 keys are str(canton), in order of first appearance

        col_dep is accepted but not used here.
    '''
    cantons = data_dep[col_canton]
    return {
        str(canton): list(data_dep.loc[cantons == canton, col_nuance].unique())
        for canton in cantons.unique()
    }

def optimizedDuelDict(duels):
    '''
        Re-index {dep: {canton: [nuances]}} by duel.

        return : {'duel_1': {'dep_a': [canton, ...], ..., 'dep_z': [canton, ...]},
                  ...,
                  'duel_N': {'dep_a': [canton, ...], ..., 'dep_z': [canton, ...]}
                  }
            with :
                  - duel = 'P1:P2:...:Pn' (nuances sorted, joined by ':')
                  - canton converted to int
    '''
    byDuel = dict()
    for dep, cantonsOfDep in duels.items():
        for canton, nuances in cantonsOfDep.items():
            duelKey = ':'.join(sorted(nuances))
            # Create the duel entry and the department list on first use
            byDuel.setdefault(duelKey, dict()).setdefault(dep, []).append(int(canton))
    return byDuel

In [None]:
# Duels per department: {dep: {canton: [nuances]}}, built from the round-2 rows
duels = dict()
for dep in dataT2Bvot['CODDPT'].unique():
    duels[str(dep)]= getDuelsFromDep(dataT2Bvot[dataT2Bvot['CODDPT']==dep])
    # getDuelsFromDep returns a dict, so the former comparison with [] was
    # always False; test for emptiness instead.
    if not duels[str(dep)]:
        print('empty list for dep : ', dep)

# Same information keyed by duel: {duel: {dep: [cantons]}}
optDuels = optimizedDuelDict(duels)

In [None]:
# Number of distinct duel keys found in the round-2 data
len(optDuels)

# Préparation des données et entraînement du réseau de neurones

In [None]:
#################################### Data Processing ####################################

def prepareInputDataExploded(data):
    '''
        Build the model input table, one row per polling station.

        data : rows in "exploded" form (one row per station and nuance) with
               the columns selected below.

        return : DataFrame indexed by (CODDPT, CODCAN, CODSUBCOM, CODBURVOT),
                 sorted on that index, NaN replaced by 0, holding
                   - NBRINS, NBREXP, %ABS/INS, %BLCNUL/VOT, %EXP/VOT
                   - one column per nuance returned by getAllNuances():
                     votes of that nuance divided by NBREXP

        NOTE(review): the positional re-labelling of `voix` below assumes a
        single NBREXP value per station. Confirm against the data.
    '''
    tmp = data[['NUMTOUR', 'CODDPT', 'CODSUBCOM', 'LIBSUBCOM', 'CODBURVOT', 'CODCAN',
            'LIBCAN', 'NBRINS', 'NBRVOT', 'NBREXP', 'CODNUA', 'NBRVOIX']].copy()

    # Derived counts and ratios
    tmp['NBRABS'] = tmp['NBRINS'] - tmp['NBRVOT']
    tmp['NBRBLCNUL'] = tmp['NBRVOT'] - tmp['NBREXP']
    tmp['%ABS/INS'] = tmp['NBRABS'] / tmp['NBRINS']
    tmp['%BLCNUL/VOT'] = tmp['NBRBLCNUL'] / tmp['NBRVOT']
    tmp['%EXP/VOT'] = tmp['NBREXP'] / tmp['NBRVOT']
    tmp['%VOIX/EXP'] = tmp['NBRVOIX'] / tmp['NBREXP']

    nuances = getAllNuances()
    statsFeatures = ['NBRINS', 'NBREXP', '%ABS/INS', '%BLCNUL/VOT', '%EXP/VOT']
    idFeatures = ['CODDPT', 'CODCAN', 'CODSUBCOM', 'CODBURVOT']

    # One line per station; each keeps the row label of its first occurrence
    exprimes = tmp[idFeatures + ['NBREXP']].drop_duplicates().sort_values(idFeatures)['NBREXP']
    stats = tmp[idFeatures + statsFeatures].drop_duplicates()[statsFeatures]
    ids = tmp[idFeatures].drop_duplicates()

    # One vote column per nuance, filled row by row
    voix = pd.DataFrame(0, index=data.index, columns=nuances)
    for parti in data.CODNUA.unique():
        rows = (data['CODNUA'] == parti).to_numpy()
        # Single .loc assignment: the former chained form voix[parti][mask] = ...
        # writes to a temporary object under pandas copy-on-write.
        voix.loc[rows, parti] = tmp.loc[rows, 'NBRVOIX'].to_numpy()
    voix = pd.concat([tmp[idFeatures], voix], axis=1).groupby(idFeatures).sum()[nuances]
    # groupby output and `exprimes` are both sorted on idFeatures: re-label
    # the rows positionally so that they align with `stats` and `exprimes`
    voix.index = exprimes.index

    # Votes become shares of the expressed votes, then join the station stats
    voix = voix.divide(exprimes, axis=0)
    X = pd.concat([stats, voix], axis=1)
    X.index = pd.MultiIndex.from_frame(ids)
    return X.sort_values(['CODDPT', 'CODCAN', 'CODSUBCOM', 'CODBURVOT']).fillna(0)

def getAllNuances(data=dataT1Bvot, colNuance='CODNUA', fmt='exploded'):
    '''
        Return the sorted list of distinct nuances found in `data`.

        data      : source DataFrame. The default is the dataT1Bvot object
                    that exists when this definition is executed.
        colNuance : column name (fmt='exploded'), or a selection of several
                    columns (fmt='line')
        fmt       : 'exploded' -> distinct values of the single column
                    'line'     -> distinct values over all selected columns,
                                  missing values ignored

        raise : ValueError when fmt is neither 'exploded' nor 'line'
    '''
    if fmt not in ['exploded', 'line']:
        raise ValueError("format parameter must be 'exploded' or 'line'")

    if fmt == 'exploded':
        nuances = data[colNuance].unique()

    if fmt == 'line':
        # No numpy import is visible at file level, so `np` was undefined
        # here; import it locally to keep this branch usable.
        import numpy as np

        nuances = np.array([])
        # Missing values become 0 so that they can be filtered out below
        nuances_tmp = data[colNuance].fillna(0)
        for c in nuances_tmp:
            nuances = np.append(nuances, nuances_tmp[c])
        nuances = np.unique(nuances[nuances!=0])

    return sorted(nuances)

# Return the raw rows associated with a duel
def extractDuelRaw(duel, X=dataT1Bvot, y=dataT2Bvot, col_canton='CODCAN', col_dep='CODDPT'):
    '''
        Select, in X and in y, the rows of the cantons where `duel` occurred.

        duel : iterable of nuances. It is sorted and joined with ':' to look
               up the module-level `optDuels` (KeyError if the duel is unknown).
        X, y : DataFrames to filter. The defaults are the dataT1Bvot /
               dataT2Bvot objects that exist when this definition is executed.

        return : (X_duel, y_duel), the matching rows of X and of y
    '''
    # Department keys '1'..'9' are matched against the zero-padded codes
    correction = [str(i) for i in range(1,10)]
    X_parts = []
    y_parts = []

    # Only the cantons listed in optDuels are kept
    for dep, cantonList in optDuels[':'.join(sorted(duel))].items():
        depCode = dep if dep not in correction else '0'+dep
        X_parts.append(X.loc[(X[col_dep] == depCode) & (X[col_canton].isin(cantonList))])
        y_parts.append(y.loc[(y[col_dep] == depCode) & (y[col_canton].isin(cantonList))])

    # Single concat per side instead of growing a DataFrame inside the loop
    X_duel = pd.concat(X_parts) if X_parts else pd.DataFrame()
    y_duel = pd.concat(y_parts) if y_parts else pd.DataFrame()

    return (X_duel, y_duel)

def prepareLabelsExploded(data, oneHotEncode=False):
    '''
        Build the label table: share of expressed votes per nuance and station.

        data         : rows in "exploded" form (one row per station and nuance)
        oneHotEncode : accepted but not used here

        return : DataFrame indexed by (CODDPT, CODCAN, CODSUBCOM, CODBURVOT)
                 with one column per nuance present in `data`; NaN replaced by 0
    '''
    nuances = getAllNuances(data)
    idFeatures = ['CODDPT', 'CODCAN', 'CODSUBCOM', 'CODBURVOT']

    # First NBREXP value met for each station
    exprimes = data[idFeatures+['NBREXP']].groupby(idFeatures).first()

    # One vote column per nuance, filled row by row
    voix = pd.DataFrame(0, index=data.index, columns=nuances)
    for parti in nuances:
        rows = (data['CODNUA'] == parti).to_numpy()
        # Single .loc assignment: the former chained form voix[parti][mask] = ...
        # writes to a temporary object under pandas copy-on-write.
        voix.loc[rows, parti] = data.loc[rows, 'NBRVOIX'].to_numpy()
    voix = pd.concat([data[idFeatures], voix], axis=1).groupby(idFeatures).sum().sort_values(idFeatures)[nuances]

    # Votes become shares of the expressed votes
    y = voix.divide(exprimes['NBREXP'], axis=0)
    return y.fillna(0)

In [None]:
import tensorflow as tf
import os
from matplotlib import pyplot
from sklearn.model_selection import train_test_split



def testna(data):
    return len(data[data.isna().any(axis=1)])


def getTestAccuracy(duel):
    '''Return the test accuracy stored for `duel` (item 1 of its 'accuracy' entry).'''
    accuracy = models[duel]['accuracy']
    return accuracy[1]

def getTrainAccuracy(duel):
    '''Return the train accuracy stored for `duel` (item 0 of its 'accuracy' entry).'''
    accuracy = models[duel]['accuracy']
    return accuracy[0]

def getActivation(duel):
    '''Return the activation name stored for `duel` in `models`.'''
    record = models[duel]
    return record['activation']

def getLoss(duel):
    '''Return the loss value stored for `duel` in `models`.'''
    record = models[duel]
    return record['loss']

def getHistory(duel):
    '''Return the training history stored for `duel` in `models`.'''
    record = models[duel]
    return record['history']

def getModel(duel):
    '''Return the model object stored for `duel` in `models`.'''
    record = models[duel]
    return record['model']

def getDataset(duel):
    '''Return the 'dataset' entry stored for `duel` in `models`.'''
    record = models[duel]
    return record['dataset']

def criteriaRespected(test_acc, loss, test_criteria=0.55, loss_criteria=0.14):
    '''
        Tell whether a model is acceptable: test accuracy at least
        test_criteria and loss at most loss_criteria.
    '''
    accurateEnough = test_acc >= test_criteria
    return accurateEnough and loss <= loss_criteria

def betterModel(duel, test_acc, loss, epsilon=0.001):
    '''
        Tell whether a candidate beats the model currently stored for `duel`:
        test accuracy not lower, and loss not higher than the stored loss
        plus epsilon.
    '''
    storedAcc = getTestAccuracy(duel)
    storedLoss = getLoss(duel)
    accuracyNotWorse = test_acc >= storedAcc
    return accuracyNotWorse and loss <= storedLoss + epsilon
    
def saveModel(model, save_name, duel):
    '''
        Save `model` as models/<duel>/<save_name>.

        model     : object exposing save(path)
        save_name : file name used inside the duel directory
        duel      : directory name of the duel under 'models/'
    '''
    model_dir = 'models/'+duel
    # The directory is not guaranteed to exist when this is called; create it
    # (and 'models' itself) if needed.
    os.makedirs(model_dir, exist_ok=True)
    model.save(model_dir+'/'+save_name)
    

def showPlot(history, save=False, save_name=None, duel=None):
    '''
        Plot the loss and accuracy curves recorded in `history`.

        history   : object whose .history dict holds 'loss', 'val_loss',
                    'accuracy' and 'val_accuracy'
        save      : when True, the figure is also written to
                    models/<duel>/<save_name>.png
        save_name : file name without extension (needed when save is True)
        duel      : directory name under 'models/' (needed when save is True)
    '''
    # Loss during training
    pyplot.subplot(211)
    pyplot.title('Loss')
    pyplot.plot(history.history['loss'], label='train')
    pyplot.plot(history.history['val_loss'], label='test')
    pyplot.legend()

    # Accuracy during training
    pyplot.subplot(212)
    pyplot.title('Accuracy')
    pyplot.plot(history.history['accuracy'], label='train')
    pyplot.plot(history.history['val_accuracy'], label='test')
    pyplot.legend()

    if save:
        fig_dir = 'models/'+duel
        # makedirs also creates the parent 'models' directory and accepts an
        # existing target; os.mkdir failed when 'models' was missing.
        os.makedirs(fig_dir, exist_ok=True)

        fig_path = fig_dir+'/'+save_name+'.png'
        pyplot.savefig(fig_path)

    pyplot.show()

def getDuelData(duel):
    '''
        Build the (X, y) training tables of a duel.

        duel : list of nuances handed to extractDuelRaw

        return : (X, y) from prepareInputDataExploded / prepareLabelsExploded,
                 or two empty DataFrames when y has fewer than two columns.
                 Missing values are only reported, not removed.
    '''
    # Raw rows of both rounds for the cantons of this duel
    rawInputs, rawLabels = extractDuelRaw(duel)

    print('Preparing input data... ', end='')
    X = prepareInputDataExploded(rawInputs)
    print('OK')

    print('Preparing labels... ', end='')
    y = prepareLabelsExploded(rawLabels)
    print('OK')

    # Inputs and labels must describe the same number of stations
    assert len(X) == len(y)

    if len(y.columns) < 2:
        print("duel with same nuance !")
        print("skipped")
        return (pd.DataFrame(), pd.DataFrame())

    naCounts = (testna(rawInputs), testna(rawLabels), testna(X), testna(y))
    if any(naCounts):
        print('nan in X_duel', naCounts[0])
        print('nan in y_duel', naCounts[1])
        print('nan in X', naCounts[2])
        print('nan in y', naCounts[3])

    return X, y


def configureModel(act, X=None, y=None, X_train=None, X_test=None, y_train=None, y_test=None):
    '''
        Build, compile and fit a network with one hidden layer.

        act : activation of the 32-unit hidden layer
        Two ways of supplying data:
          - X_train, X_test, y_train, y_test : fit on the train part with the
            test part as validation data, then evaluate on both
          - X, y : fit on the whole dataset (used when no complete split is given)

        return : (model, train_acc, test_acc, history) when neither X nor y
                 is given, otherwise the model alone

        raise : ValueError when neither a complete split nor both X and y
                are provided
    '''
    hasSplit = all(part is not None for part in (X_train, X_test, y_train, y_test))
    hasFull = X is not None and y is not None
    # Fail early with a clear message: incomplete arguments used to end in an
    # AttributeError, an UnboundLocalError or a model returned without training.
    if not hasSplit and not hasFull:
        raise ValueError('configureModel needs either X_train, X_test, y_train and y_test, or X and y')

    tf.keras.backend.clear_session()

    in_shape  = (X_train.shape[1],) if X_train is not None else (X.shape[1],)
    out_shape = y_train.shape[1] if y_train is not None else y.shape[1]

    model = tf.keras.Sequential()
    model.add(tf.keras.layers.InputLayer(input_shape=in_shape))
    model.add(tf.keras.layers.Dense(32, activation=act))
    model.add(tf.keras.layers.Dense(out_shape, activation='softmax'))

    model.compile(loss='mse', optimizer='adamax', metrics=['accuracy'])

    if hasSplit:
        history = model.fit(X_train, y_train, batch_size=32, validation_data=(X_test, y_test), epochs=100, verbose=0)
        _, train_acc = model.evaluate(X_train, y_train, verbose=0)
        _, test_acc  = model.evaluate(X_test , y_test , verbose=0)
    else:
        # hasFull is guaranteed here by the check above
        history = model.fit(X, y, batch_size=32, epochs=100, verbose=0)

    return (model, train_acc, test_acc, history) if (X is None and y is None) else model

def setModel(model, duel):
    '''Replace the 'model' entry of the existing `models` record of `duel`.'''
    record = models[duel]
    record['model'] = model

    


In [14]:
# Trial run: train one model on a single duel and print its predictions next to the labels
print('--------------------------- Fake prediction ---------------------------')
x, y = getDuelData(['BC-FN', 'BC-UD'])
# 80/20 shuffled split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2,train_size=0.8, random_state=42, shuffle=True)
model, train_acc, test_acc, history = configureModel('relu', X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test)
print('--------------------------- prediction -------------------------')
# NOTE(review): the prediction frame gets a fresh 0..n-1 index and hard-coded
# column names, while y_test keeps its own index and columns; rows of the two
# printouts correspond by position only.
print(pd.DataFrame(model.predict(X_test), columns=['FN', 'UD']))
print('------------------------ y_test -----------------------------')
print(y_test)


--------------------------- Fake prediction ---------------------------
Preparing input data... OK
Preparing labels... OK
--------------------------- prediction -------------------------
            FN        UD
0     0.301603  0.698397
1     0.374333  0.625667
2     0.197483  0.802517
3     0.460452  0.539548
4     0.339063  0.660937
...        ...       ...
2497  0.409616  0.590383
2498  0.424450  0.575550
2499  0.447672  0.552328
2500  0.355609  0.644391
2501  0.290984  0.709016

[2502 rows x 2 columns]
------------------------ y_test -----------------------------
                                      BC-FN     BC-UD
CODDPT CODCAN CODSUBCOM CODBURVOT                    
56     13     165       0002       0.257764  0.742236
76     21     740       0001       0.321608  0.678392
72     21     127       0002       0.177570  0.822430
34     1      3         0020       0.491124  0.508876
59     22     152       0005       0.318538  0.681462
...                                     ...     

In [None]:
# Best record kept for each duel key (filled by getBestModels)
models = {}
# Duel keys for which no trained model met the acceptance criteria
noModel = list()
# Hidden-layer activations tried for every duel
activations = ['relu', 'elu', 'selu']

In [None]:
################################################ Creation des modeles #######################################3
def getBestModels(duelList, save=False, nbModel=None):
    '''
        Train one network per activation for every duel and keep the best one.

        For each duel key ('P1:P2:...') with at least two nuances, the data
        are split 80/20 and one model is trained per entry of `activations`.
        A model meeting criteriaRespected is stored in `models` when the duel
        has no model yet or when betterModel says it beats the stored one.
        Duels for which nothing is stored are listed in `noModel`.

        duelList : iterable of duel keys (may be `noModel` itself)
        save     : forwarded to showPlot to also write the training figure
        nbModel  : maximum number of duels to process (None = all)
    '''
    # Work on a snapshot: `noModel` is modified below and may be the very
    # list that was passed in.
    duelList = list(duelList)
    nbSmallDataset = 0
    nbDataset = 0
    i=0
    for duel in duelList:
        if nbModel is not None and nbModel<=i:
            break
        print('\n---------------------------------------- Duel :',duel , '--------------------------------------')

        i+=1
        duel_ = sorted(duel.split(':'))
        if len(duel_)>=2:

            X, y = getDuelData(duel_)

            if X.empty and y.empty:
                continue

            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,train_size=0.8, random_state=42, shuffle=True)
            assert X_train.shape[0]==y_train.shape[0] and y_test.shape[0]== X_test.shape[0]

            for act in activations:
                model, train_acc, test_acc, history = configureModel(act, X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test)
                loss = history.history['loss'][-1]
                scores = 'Train: %.3f, Test: %.3f loss:%.4f' % (train_acc, test_acc, loss)

                if not criteriaRespected(test_acc, loss):
                    print(act, 'funtion is worst !', scores)
                    if duel not in noModel and duel not in models:
                        noModel.append(duel)
                    continue

                isNew = duel not in models
                if isNew or betterModel(duel, test_acc, loss):
                    if isNew:
                        print(act, 'function used for the new model', scores)
                    else:
                        print(act, 'function is better', scores)
                    models[duel] = {'model': model, 'accuracy': (train_acc, test_acc), 'loss': loss,
                                    'activation': act, 'history': history, 'dataset': (X, y)}
                    if isNew and duel in noModel:
                        # The duel now has a model
                        noModel.remove(duel)
                else:
                    print(act, 'function is not better', scores)

            if duel not in noModel:
                # Built unconditionally: showPlot receives it even when save
                # is False, which used to fail on an undefined name.
                saveName = duel.replace(':','_') + '_train_' + f'{getTrainAccuracy(duel):.2f}'+ '_test_' + \
                            f'{getTestAccuracy(duel):.2f}'+'_loss_'+ f'{getLoss(duel):.4f}'

                showPlot(getHistory(duel), save=save, save_name=saveName.replace('.',','), duel=duel.replace(':', '_') )
                print('Train: %.3f, Test: %.3f loss:%.4f activation: %s' %
                        (getTrainAccuracy(duel), getTestAccuracy(duel), getLoss(duel), getActivation(duel)))
            else:
                print('no model for this duel !')

            if X_train.shape[0]<500:
                nbSmallDataset+=1

            nbDataset+=1

            # Printed only where the split exists: outside this branch the
            # names are undefined (or left over from a previous duel).
            print(f'shape xtrain {X_train.shape} shape ytrain {y_train.shape} || shape xtest {X_test.shape} shape ytest {y_test.shape}')

        print('Progression..........................................................................................................',
                f'{(i/len(duelList))*100:.2f}%')

    # No percentage can be computed when no dataset was trained
    if nbDataset:
        print("pourcentage de petit dataset : ", (nbSmallDataset/nbDataset)*100)
    print('duel with no model : ', noModel)
    print('duel with no model : ', noModel)


# Search a model for every duel key; save=True also writes the training figures
getBestModels(optDuels.keys(), save=True)


In [None]:
# Second attempt for the duels left without a model.
# Note: getBestModels also updates noModel during this call.
getBestModels(noModel, save=True)

In [None]:
#train the model with all the dataset
def trainFinalModels(save=False):
    '''
        Refit every model kept in `models` on its complete dataset.

        The stored activation and dataset of each duel are reused, and the
        refitted model replaces the stored one. With save=True it is also
        written through saveModel, under a name built from the stored
        accuracies and loss.
    '''
    for duel in models:
        X, y = getDataset(duel)
        finalModel = configureModel(getActivation(duel), X=X, y=y)
        setModel(finalModel, duel)
        if not save:
            continue
        saveName = duel.replace(':','_') + '_train_' + f'{getTrainAccuracy(duel):.2f}'+ '_test_' + \
                                f'{getTestAccuracy(duel):.2f}'+'_loss_'+ f'{getLoss(duel):.4f}'
        saveModel(getModel(duel), saveName.replace('.',','), duel.replace(':', '_'))

In [None]:
# Refit every kept model on its full dataset and save it
trainFinalModels(save=True)