# HPC Machine Learning Ensemble 
> Multi-core ensemble using MPI and the concept of bagging across multiple machines.

- toc: true 
- badges: true
- comments: true
- categories: [jupyter]
- image: images/chart-preview.png

In [1]:
!pip install mpi4py



In [4]:
%%writefile comm.py
from mpi4py import MPI
from typing import Tuple
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import StandardScaler
import numpy as np  
import pandas as pd
import copy
from sklearn.metrics import classification_report

def pre_processing(df_: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, pd.Index]:
    """Turn the raw data frame into a feature matrix and a label vector.

    The input frame is not modified; all work happens on a deep copy.

    Steps, in order:

    1. Compute absolute pairwise correlations of the numeric columns and mark
       for removal every column whose correlation with an earlier column
       exceeds 0.7. ``contact`` is always removed as well.
    2. Map ``day_of_week`` (mon..fri -> 0..4) and ``month`` to integers and
       add a sine/cosine encoding of the weekday with a period of 5.
       Values missing from the mapping tables become NaN.
    3. Replace ``'unknown'`` ages with the hard-coded value 39 and cast
       ``age`` to int.
    4. Drop the marked columns and one-hot encode the categorical columns
       (first level of each dropped).

    Args:
        df_: Frame containing at least the columns ``age``, ``job``,
            ``marital``, ``education``, ``default``, ``housing``, ``loan``,
            ``contact``, ``month``, ``day_of_week``, ``poutcome`` and ``y``.

    Returns:
        A 3-tuple ``(X, y, columns)``: ``X`` is the float feature matrix of
        shape (N, d), ``y`` is the label vector of shape (N,) with
        ``'no'`` -> 0 and ``'yes'`` -> 1, and ``columns`` holds the d feature
        names in the column order of ``X``.
    """
    df = df_.copy(deep=True)
    categorical = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'poutcome']

    # Correlation is only defined for numeric columns. Select them explicitly:
    # recent pandas raises instead of silently skipping string columns.
    numeric = df.select_dtypes(include=['number', 'bool'])
    corr_matrix = numeric.corr().abs()
    # Keep the strict upper triangle so each pair is looked at once and only
    # the later column of a correlated pair is dropped.
    # `bool` replaces the `np.bool` alias that NumPy removed.
    upper_mask = np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    upper = corr_matrix.where(upper_mask)
    dropcols = [column for column in upper.columns if (upper[column] > 0.7).any()]
    dropcols.append('contact')

    day = {'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3, 'fri': 4}
    df['day_of_week'] = df['day_of_week'].map(day)
    month = {'may': 1, 'jun': 2, 'jul': 3, 'aug': 4, 'sep': 5, 'nov': 6}
    df['month'] = df['month'].map(month)

    # Cyclical encoding of the weekday (period of 5 working days).
    # NOTE: 'wee_cos' is misspelt but is kept because it is part of the
    # returned column index.
    df['week_sin'] = np.sin(df['day_of_week'] * (2. * np.pi / 5))
    df['wee_cos'] = np.cos(df['day_of_week'] * (2. * np.pi / 5))

    # 39 is a hard-coded imputation value for unknown ages.
    # Series.replace also works when the column is already numeric.
    df['age'] = df['age'].replace('unknown', '39').astype(int)

    df = df.drop(columns=dropcols)
    df = pd.get_dummies(df, drop_first=True, columns=categorical)

    features = df.drop('y', axis=1)
    labels = df['y'].map({'no': 0, 'yes': 1})
    # Force a float matrix: newer pandas emits bool dummies, which would
    # otherwise turn `.values` into an object array.
    return features.to_numpy(dtype=float), labels.to_numpy(), features.columns
def main():
    """Train a three-model ensemble over MPI and report the averaged result.

    Must be launched with at least four MPI ranks (``mpirun -np 4 ...``).

    Rank 0 (root):
        * loads ``moro14_synth.csv`` and runs ``pre_processing`` on it,
        * shuffles the rows and holds out the last fifth as the test set,
        * standardises the features with statistics from the training rows,
        * sends each worker its own slice of training rows and feature
          columns, the matching columns of the test set, and the test labels,
        * receives each worker's positive-class probabilities, averages them,
          rounds to 0/1 and prints a classification report.

    Ranks 1-3 (workers):
        * receive their payload from root, fit one classifier on it
          (logistic regression, QDA and Gaussian naive Bayes respectively),
          print that model's own classification report and send the
          positive-class probabilities for the test set back to root.

    Any further ranks do nothing.

    Raises:
        RuntimeError: if the communicator has fewer than four ranks.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # worker rank -> (report label, classifier, tag of the reply sent to root)
    workers = {
        1: ('LR', LogisticRegression(), 5),
        2: ('QDA', QuadraticDiscriminantAnalysis(), 6),
        3: ('GNB', GaussianNB(), 7),
    }

    # Every rank performs this check before any communication, so a
    # mis-sized launch fails fast everywhere instead of blocking in
    # send/recv.
    if size <= max(workers):
        raise RuntimeError(
            'this script needs at least {} MPI ranks, got {}'.format(max(workers) + 1, size)
        )

    # root process
    if rank == 0:
        # Only root loads and splits the data. The split is random, so the
        # test labels must travel with the payload: a worker cannot rebuild
        # the same split on its own.
        X, y, _ = pre_processing(pd.read_csv('moro14_synth.csv', index_col=0))
        n_samples = X.shape[0]

        # split the dataset into train / test (last fifth is the test set)
        rnd_idx = np.random.permutation(n_samples)
        n_train = n_samples - int(n_samples / 5)
        train_idx, test_idx = rnd_idx[:n_train], rnd_idx[n_train:]
        y_train, y_test = y[train_idx], y[test_idx]

        # Fit the scaler on training rows only so that no test statistics
        # leak into the models.
        scaler = StandardScaler().fit(X[train_idx])
        x_train = scaler.transform(X[train_idx])
        x_test = scaler.transform(X[test_idx])

        # worker rank -> (training rows, feature columns) it is trained on
        shards = {
            1: (slice(0, 800), slice(0, 12)),
            2: (slice(800, 1600), slice(12, 24)),
            3: (slice(1600, 2200), slice(24, None)),
        }

        # root process sends data to all other processes
        for dest, (rows, cols) in shards.items():
            payload = {
                'x': x_train[rows, cols],
                'y': y_train[rows],
                'xt': x_test[:, cols],
                'yt': y_test,
            }
            comm.send(payload, dest=dest, tag=dest)
            print('Process {} sent data:'.format(rank), payload)

        probabilities = []
        for source, (_, _, reply_tag) in workers.items():
            probabilities.append(comm.recv(source=source, tag=reply_tag))
            print('Process rec data {}'.format(source))

        # Soft voting: average the positive-class probabilities, then round.
        mean_probability = sum(probabilities) / len(probabilities)
        print('Mean Result', classification_report(y_test, np.round(mean_probability)))

    # non-root processes
    elif rank in workers:
        name, clf, reply_tag = workers[rank]
        # each non-root process receives data from root process
        data = comm.recv(source=0, tag=rank)
        print('Process {} received data:'.format(rank), data)
        clf.fit(data['x'], data['y'])
        # Score against the labels shipped by root; they are the ones that
        # correspond row-for-row to data['xt'].
        print(name, classification_report(data['yt'], clf.predict(data['xt'])))
        comm.send(clf.predict_proba(data['xt'])[:, 1], dest=0, tag=reply_tag)
        print('done {}'.format(rank))


if __name__ == "__main__":
    main()

Overwriting comm.py


In [6]:
!mpirun --allow-run-as-root -np 4 python comm.py

4
4
4
4
Process 0 sent data: {'x': array([[ 0.75350917,  0.02882345, -1.4974563 , ..., -0.16406442,
         1.56752607, -0.25264558],
       [-0.22391057, -1.6862909 ,  1.66618377, ..., -1.52816275,
         0.5815288 , -0.25264558],
       [ 0.81867049, -1.6862909 ,  0.87527375, ..., -1.00712355,
        -1.01384828, -0.25264558],
       ...,
       [-0.68003979, -1.6862909 , -0.70654628, ...,  1.20003392,
         0.5815288 , -0.25264558],
       [ 0.55802523, -1.11458612, -1.4974563 , ..., -0.16406442,
         1.56752607, -0.25264558],
       [-0.02842662,  0.60052823,  0.08436374, ...,  0.67899472,
        -1.01384828, -0.25264558]]), 'y': array([1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0,
       0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0,
       0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0,
       0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0,
       0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0,