In [1]:
import numpy as np
import logging
import os
import datetime
import pandas as pd
import time
from sklearn import preprocessing
import config
import sys

from lib.parallel_a import Parallel_a
from lib.parallel_b import Parallel_b

### Class that converts the dataset's columns into a structured, numeric format

In [2]:
class Numeric():
    """Turn a mixed-type DataFrame into an all-numeric one.

    The conversion imputes missing values, min-max scales the continuous
    feature columns, label-encodes the categorical columns and drops
    categorical features whose cardinality is too high.
    """

    def __init__(self, latency):
        # Dictionary supplied by the caller; this class only stores it.
        self.latency = latency

    def convert_to_numeric(self, df, target, classification=True, max_categories=5):
        """Return a numeric version of ``df`` plus the label-encoding maps.

        Parameters
        ----------
        df : pandas.DataFrame
            Input data. It is copied first, so the caller's frame is not
            modified and the call can be repeated safely.
        target : str
            Name of the target column. It is never scaled and never dropped.
        classification : bool, optional
            Accepted for interface compatibility; not used by this method.
        max_categories : int, optional
            Categorical feature columns with more distinct values than this
            are dropped (default 5).

        Returns
        -------
        (pandas.DataFrame, dict)
            The converted frame, and a dict mapping each label-encoded
            column name to ``{code: original_value}``.
        """
        # Work on a copy so the caller's DataFrame is left untouched.
        df = df.copy()

        # Partition the columns by dtype: object -> categorical, bool, and
        # everything else that is neither -> continuous.
        categorical_columns = df.select_dtypes(include=['object']).columns.to_list()
        bool_columns = df.select_dtypes(include=['bool']).columns.to_list()
        non_object_columns = df.select_dtypes(exclude=['object']).columns.to_list()
        continous_columns = [x for x in non_object_columns if x not in bool_columns]

        # Booleans become 0/1. A bool-dtype column cannot hold missing
        # values, so no imputation is needed for these columns.
        for col in bool_columns:
            df[col] = df[col].astype(int)

        print("Continous",continous_columns)

        # Impute missing values: most frequent value for categoricals, mean
        # for continuous columns. mode() returns a Series, so its first
        # entry is used; passing the Series itself to fillna would align on
        # the index and only fill a missing value at index label 0.
        for col in categorical_columns:
            modes = df[col].mode()
            if not modes.empty:  # empty when the whole column is missing
                df[col] = df[col].fillna(modes.iloc[0])
        for col in continous_columns:
            df[col] = df[col].fillna(df[col].mean())

        # The target is never scaled.
        if target in continous_columns: continous_columns.remove(target)

        # Two-valued numeric columns are treated as flags and not scaled.
        # A new list is built because removing items from a list while
        # iterating over it skips the element that follows each removal.
        continous_columns = [
            col for col in continous_columns
            if df[col].nunique(dropna=False) != 2
        ]

        if continous_columns:
            # Min-max scale the remaining continuous columns.
            x = df[continous_columns].values.astype(float)
            min_max_scaler = preprocessing.MinMaxScaler()
            x_scaled = min_max_scaler.fit_transform(x)
            # Assign the array positionally; wrapping it in a DataFrame
            # would align on a fresh RangeIndex and produce NaN whenever
            # df carries any other index.
            df[continous_columns] = x_scaled

        # Maps column name -> {integer code: original category value}.
        df_dict = {}

        print("before for", categorical_columns)

        # Label-encode every categorical column (target included) that has
        # at least two distinct values.
        for col in categorical_columns:
            n_unique = df[col].nunique(dropna=False)
            if n_unique >= 2:
                print(col ,' - ', n_unique)
                df[col] = df[col].astype("category")
                df_dict[col] = dict(enumerate(df[col].cat.categories))
                df[col] = df[col].cat.codes

        # From here on only feature columns are considered.
        if target in categorical_columns: categorical_columns.remove(target)

        # Drop categorical features with too many distinct values to keep
        # the feature space small.
        unwated_cols = [
            col for col in categorical_columns
            if df[col].nunique(dropna=False) > max_categories
        ]
        df = df.drop(unwated_cols, axis=1)
        updated_cat_columns = [x for x in categorical_columns if x not in unwated_cols]

        # Generate dummy variables for the remaining categorical columns and
        # move them to the end of the frame.
        # NOTE(review): columns label-encoded above are already integer
        # codes, so get_dummies appears to pass them through unchanged and
        # only columns still of object dtype get expanded -- confirm that
        # this is the intended encoding.
        if updated_cat_columns:
            data = pd.get_dummies(df[updated_cat_columns]).astype(np.int64)
            df = df.drop(updated_cat_columns, axis=1)
            df = df.join(data)

        return df,df_dict

### Load the parameters for the run from the config file

In [3]:
# Folder that holds the raw CSV datasets.
path = './data/'

# Read every run parameter from config.parameters up front; the first
# dataset name and the first target are the ones used for this run.
params = config.parameters
keys = params['keys']
key = keys[0]
num = params['num']                        # number of threads
classification = params['classification']
targets = params['targets']
target = targets[0]

# Execution mode is fixed here rather than read from the config.
type_key = '2'

# Echo the chosen settings.
print('All available datasets \n', keys)
print('Which Data to train? \n', key)
print("enter num of threads\n", num)
print('Classification: \n1 - True or 2 - False \n', classification)
print("Want to run parallel or serial?\n", type_key)
print('Targets for the Datasets')
print(target)

All available datasets 
 ['Churn', 'clv', 'diabetes', 'insurance', 'sales']
Which Data to train? 
 Churn
enter num of threads
 1
Classification: 
1 - True or 2 - False 
 1
Want to run parallel or serial?
 2
Targets for the Datasets
Churn


### Load the data into a DataFrame

In [4]:
# Timestamp of this run and a tag for the execution mode chosen above.
current_time = datetime.datetime.now().strftime("%Y-%m-%d__%H.%M")
type_key_str = '_Serial_' if type_key == "1" else '_Parallel_'

# Shared dictionaries that are handed to the helper classes below:
# latency holds execution times of individual functions, metrics holds
# the evaluation metrics.
latency = dict()
metrics = dict()

# Read the selected dataset from the data folder, skipping malformed rows.
# `on_bad_lines` replaces `error_bad_lines`, which was deprecated in
# pandas 1.3 and removed in 2.0; the fallback keeps older pandas working.
csv_path = os.path.join(path, key + ".csv")
try:
    data = pd.read_csv(csv_path, encoding='unicode_escape', on_bad_lines='skip')
except TypeError:
    # pandas < 1.3 does not know `on_bad_lines`.
    data = pd.read_csv(csv_path, encoding='unicode_escape', error_bad_lines=False)
data.head()

Unnamed: 0,CustomerID,Gender,Senior Citizen,Partner,Dependents,Tenure,Phone Service,Multiple Lines,Internet Service,Online Security,...,Device Protection,Tech Support,Streaming TV,Streaming Movies,Contract,Paperless Billing,Payment Method,Monthly Charges,Total Charges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [5]:
# Show the available columns, then replace missing values in the target
# column (and only that column) with 0.

print("\n The columns present are\n\n", data.columns)
list_cols = data.columns.to_list()
target_has_nulls = target in list_cols and data[target].isnull().sum() != 0
if target_has_nulls:
    data[target] = data[target].fillna(0)


 The columns present are

 Index(['CustomerID', 'Gender', 'Senior Citizen', 'Partner', 'Dependents',
       'Tenure', 'Phone Service', 'Multiple Lines', 'Internet Service',
       'Online Security', 'Online Backup', 'Device Protection', 'Tech Support',
       'Streaming TV', 'Streaming Movies', 'Contract', 'Paperless Billing',
       'Payment Method', 'Monthly Charges', 'Total Charges', 'Churn'],
      dtype='object')


In [6]:
def data_split(data, n_parts=3, out_dir='./dist_data/', prefix=None):
    '''
    Write ``data`` to disk as ``n_parts`` consecutive CSV chunks, the
    input files required for multiprocessing.

    The chunks are named ``<prefix>_1.csv`` ... ``<prefix>_<n_parts>.csv``
    inside ``out_dir``. Every row ends up in exactly one chunk; when the
    row count is not divisible by ``n_parts`` the leading chunks get one
    extra row each. (Sizing the chunks with ``round(rows / 3)`` lost the
    trailing rows whenever three rounded chunks did not reach the end,
    e.g. 10 rows -> 3 + 3 + 3.)

    Parameters
    ----------
    data : pandas.DataFrame
        Frame to split row-wise.
    n_parts : int, optional
        Number of chunk files to write (default 3).
    out_dir : str, optional
        Destination folder; created if it does not exist.
    prefix : str, optional
        File-name prefix. Defaults to the notebook-level ``key``.

    Raises
    ------
    ValueError
        If ``n_parts`` is smaller than 1.
    '''
    if n_parts < 1:
        raise ValueError("n_parts must be at least 1")
    if prefix is None:
        # Fall back to the dataset name selected at notebook level.
        prefix = key

    os.makedirs(out_dir, exist_ok=True)

    base, extra = divmod(len(data), n_parts)
    start = 0
    for i in range(n_parts):
        # The first `extra` chunks absorb the remainder, one row each.
        stop = start + base + (1 if i < extra else 0)
        filename = os.path.join(out_dir, prefix + '_' + str(i + 1) + '.csv')
        data.iloc[start:stop].to_csv(filename, index=False)
        start = stop

In [7]:
# Thread count requested in the config.
numthreads = int(num)

# Converter object. It gets its own name: rebinding `num` to the converter
# made `int(num)` above fail as soon as this cell was run a second time.
numeric_converter = Numeric(latency)

# Accept the flag whether the config stores it as '1' or as the integer 1.
flag = str(classification) == '1'

df, dict_df = numeric_converter.convert_to_numeric(data, target, flag)
print(df.shape)

# Create the data for distributed processing: a random ~80/20 train/test
# split, seeded so that a re-run produces the same files.
SPLIT_SEED = 42
rng = np.random.default_rng(SPLIT_SEED)
msk = rng.random(len(df)) < 0.8
train = df[msk]
test = df[~msk]

# Make sure the output folder exists before anything is written to it.
os.makedirs('./dist_data/', exist_ok=True)
filename = './dist_data/' + key + '_test'+'.csv'
test.to_csv(filename, index=False)
data_split(train)

# Feature names are all columns except the target.
feature = df.columns.tolist()
feature.remove(target)

Continous ['Senior Citizen', 'Tenure', 'Monthly Charges', 'Total Charges']
before for ['CustomerID', 'Gender', 'Partner', 'Dependents', 'Phone Service', 'Multiple Lines', 'Internet Service', 'Online Security', 'Online Backup', 'Device Protection', 'Tech Support', 'Streaming TV', 'Streaming Movies', 'Contract', 'Paperless Billing', 'Payment Method', 'Churn']
CustomerID  -  6499
Gender  -  2
Partner  -  2
Dependents  -  2
Phone Service  -  2
Multiple Lines  -  3
Internet Service  -  3
Online Security  -  3
Online Backup  -  3
Device Protection  -  3
Tech Support  -  3
Streaming TV  -  3
Streaming Movies  -  3
Contract  -  3
Paperless Billing  -  2
Payment Method  -  4
Churn  -  2
(6499, 20)


In [8]:
!pip install openpyxl

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Defaulting to user installation because normal site-packages is not writeable


In [9]:
from openpyxl import Workbook
from openpyxl import load_workbook

# Machine settings recorded with every result row.
CPU = '1'
Memory = '100'

# Workbook that collects one row per benchmark run.
target_file = './ParallelRun-Results.xlsx'

# Header row: machine settings, thread count, one timing column per
# algorithm, then the regression quality metrics.
columns = ['CPU',
 'Memory',
 'NumThreads',
 'LinearRegression',
 'RidgeRegression',
 'NaiveBayes',
 'PCA',
 'SVD',
 'MSE_LinearRegression',
 'r2score_LinearRegression',
 'MSE_RidgeRegression',
 'r2score_RidgeRegression']

# Create the workbook with its header only when it does not exist yet.
# The check has to be a *file* test: os.path.isdir() on a file path is
# always False, which silently recreated the workbook (discarding earlier
# results) on every execution of this cell. Delete the file to start over.
results_exist = os.path.isfile(target_file)
if not results_exist:
    wb = Workbook()
    ws = wb.active
    ws.append(columns)
    wb.save(target_file)

def one_loop(numthreads=1):
    """Run every parallel algorithm once and append one result row to the workbook.

    Two runner objects are created from the notebook-level ``latency`` and
    ``metrics`` dictionaries. Linear regression, ridge regression, naive
    Bayes, PCA and SVD are then executed, in that order, on the files under
    ``./dist_data/``. Finally a row made of ``CPU``, ``Memory``,
    ``numthreads`` and the values of the first runner's ``latency`` and
    ``metrics`` attributes is appended to ``target_file``.

    NOTE(review): the row relies on the order of those two value
    collections matching the header row of the workbook -- confirm
    against the runner classes.
    """
    runner_a = Parallel_a(latency, metrics)
    runner_b = Parallel_b(latency, metrics)

    # Locations of the distributed training chunks (prefix) and the test set.
    test_data_path = './dist_data/' + key + '_test'+'.csv'
    dist_data_path = './dist_data/' + key + '_'

    # Supervised algorithms need the test set as well.
    runner_a.linearRegression(dist_data_path, test_data_path, target, numthreads)
    runner_b.ridgeRegression(dist_data_path, test_data_path, target, numthreads)
    runner_a.naiveBayes(dist_data_path, test_data_path, target, numthreads)

    # parallel kmeans (currently disabled)
    #parallel_b.kMeans(dist_data_path, numthreads)

    # Decompositions only take the training chunks.
    runner_a.pca(dist_data_path, target, numthreads)
    runner_b.svd(dist_data_path, target, numthreads)

    # Assemble the result row and persist it.
    row = [CPU, Memory, numthreads]
    row.extend(runner_a.latency.values())
    row.extend(runner_a.metrics.values())

    workbook = load_workbook(target_file)
    sheet = workbook.active
    sheet.append(row)
    workbook.save(target_file)

In [10]:
# Repeat the benchmark to collect a sample of timings. The thread count
# read from the config is passed through; calling one_loop() without it
# always ran with the default of a single thread.
N_RUNS = 100
for _ in range(N_RUNS):
    one_loop(numthreads)

In [11]:
# Read the results workbook back into a DataFrame (via the openpyxl engine)
# and preview the first rows.
hundred_runs = pd.read_excel(target_file, engine='openpyxl')
hundred_runs.head()

Unnamed: 0,CPU,Memory,NumThreads,LinearRegression,RidgeRegression,NaiveBayes,PCA,SVD,MSE_LinearRegression,r2score_LinearRegression,MSE_RidgeRegression,r2score_RidgeRegression
0,1,100,1,0.386427,0.00148,0.001133,0.00137,0.00139,0.143868,0.265822,0.140583,0.282585
1,1,100,1,0.001412,0.000943,0.000998,0.001344,0.001486,0.143868,0.265822,0.140583,0.282585
2,1,100,1,0.001426,0.000934,0.000891,0.001165,0.00116,0.143868,0.265822,0.140583,0.282585
3,1,100,1,0.001463,0.000944,0.000862,0.001216,0.001244,0.143868,0.265822,0.140583,0.282585
4,1,100,1,0.00137,0.00095,0.000834,0.001119,0.001203,0.143868,0.265822,0.140583,0.282585


In [12]:
# (rows, columns) of the results sheet: one row per recorded one_loop() call.
hundred_runs.shape

(100, 12)