In [None]:
!pip -q install pathos 
#Multi processing
!pip -q install tqdm 
# Progress bar
!pip -q install TextBlob 
#Sentiment analysis
!pip -q install "dask[complete]" 
# preprocessing and cross-validation
!pip -q install dask_ml 
# scikit-learn drop-in replacements

In [None]:
from pathos.multiprocessing import ProcessPool
from textblob import TextBlob
from tqdm import tqdm

# Create a process pool

Pools are a group of poccesses where you will send tasks. Inside you will define the number of processes to create. By defualt it will be number of CPU cores, however you **can** define more than that. 

Scheduling more processes than you have CPU cores can increase performance where the processes run into wait times or I/O

In [None]:
pool = ProcessPool(nodes=3)

# Functions

Map methods provided:


    map         - blocking and ordered worker pool        [returns: list]
    imap        - non-blocking and ordered worker pool    [returns: iterator]
    uimap       - non-blocking and unordered worker pool  [returns: iterator]
    amap        - asynchronous worker pool                [returns: object]

Blocking: handles jobs in batches rather than 1 by 1

Ordered: Batches must be completed in order

In [None]:
#pool.map(function to run, data to run it on, other arguments )

pool.map(pow, [1,2,3,4], [5,6,7,8])

In [None]:
#Iterate through the returned data using imap
for x in pool.imap(pow, [1,2,3,4], [5,6,7,8]):
    print(x)

In [None]:
# do an asynchronous map, then get the results
import time

results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
while not results.ready():
    time.sleep(5); print(".", end=' ')

# Build your function
First lets build a function that can take a line of text and produce the sentiment

In [None]:
def get_sentiment(text):
    from textblob import TextBlob
    blob = TextBlob(text)
    score = blob.sentiment.polarity
    return score

Then we will need a function that will download the poems for us

In [None]:
import urllib.request

def download_poem(url):
    poems = []
    with urllib.request.urlopen(url) as f: 
        for line in f:
            line = line.decode("utf-8") 
            line = line.strip()
            if line:
                poems.append(line)
    return poems

Let's check out what one of these poems look like

In [None]:
test_url = 'https://raw.githubusercontent.com/okfn/openmilton/master/miltondata/texts/poems.txt'
poem = download_poem(test_url)

print(len(poem))
print(poem[:10])

Finally we will build out main function that puts the whole process together

In [None]:
def process_poems(url):
    scores = []
    poems = []

    import urllib
    from textblob import TextBlob

    with urllib.request.urlopen(url) as f: 
        for line in f:
            line = line.decode("utf-8") 
            line = line.strip()
            if line:
                poems.append(line)

    for line in poems:
        blob = TextBlob(line)
        score = blob.sentiment.polarity
        scores.append(score)
    
    return scores

In [None]:
print(len(process_poems(test_url)))

Let's build a hard task, like having to download and process multiple poems

In [None]:
urls = ['https://raw.githubusercontent.com/okfn/openmilton/master/miltondata/texts/poems.txt']

#Duplicating the list to make it larger
for _ in range(0,3):
    urls += urls

print(len(urls))

Now let's test how long it takes to process the sentiment for each line of our poems dataset

We can use TQDM to show us the progress of any for-loop operation

In [None]:
#Serial Processing
scores = []

for url in tqdm(urls, position=0 ): #position=0 forces the bars into the same line when printing
    scores += process_poems(url)


In [None]:
scores = []

for score in tqdm(pool.uimap(process_poems, urls), total=len(urls), position=0):
    scores += score

### Other multiprocessing cells

In [None]:
import multiprocessing as mp
import numpy as np

mp.cpu_count()

In [None]:
def square(lst):
  arr = np.zeros_like(lst)
  for i in range(lst.shape[0]):
    for j in range(lst.shape[1]):
      arr[i][j] = lst[i][j] ** 2
  return arr

array = np.random.randint(1, 9, (2**10, 10000))
data = np.array_split(array, 2)

In [None]:
print(data)

In [None]:
%%time
with mp.Pool(2) as p:
  res = p.map(square, data)
  p.close()
  p.join()

In [None]:
%%time
processes = []
for i in range(2):
  p = mp.Process(target=square, args=(data[i],))
  processes.append(p)
  p.start()
  
for p in processes: p.join()

In [None]:
import multiprocessing
from multiprocessing import Pool


class test_cpu:
    def f(self, x):  # changed
        while True:
            x * x

    def load(self, cores):  # changed
        print("utilizing %d cores" % (cores / 2))
        pool = Pool(10)
        pool.map(self.f, range(6))  # changed


if __name__ == "__main__":
    print("There are %d CPUs in your PC" % multiprocessing.cpu_count())
    cores_count = multiprocessing.cpu_count()
    input_user = input("What do you want to test? type CPU, Memory or Both: ")
    input_user.lower()
    test_cpu_instance = test_cpu()  # changed
    if input_user == "cpu":
        test_cpu_instance.load(cores_count)  # changed

# Using dask

In [1]:
import os
import warnings
import math
warnings.filterwarnings('ignore')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('seaborn-colorblind')
from pylab import rcParams
rcParams['figure.figsize'] = 10, 6
from sklearn.metrics import mean_squared_error, mean_absolute_error
from imblearn.combine import SMOTEENN
from sklearn.model_selection import train_test_split 
from sklearn import linear_model
from sklearn.metrics import recall_score, classification_report, precision_score, confusion_matrix, auc, accuracy_score, precision_recall_curve, roc_curve
from sklearn.preprocessing import StandardScaler, normalize
from scipy import ndimage
import seaborn as sns

In [2]:
test_data = pd.read_csv('exoTest.csv').fillna(0)
train_data = pd.read_csv('exoTrain.csv').fillna(0)

In [3]:
categ = {2: 1,1: 0}
train_data.LABEL = [categ[item] for item in train_data.LABEL]
test_data.LABEL = [categ[item] for item in test_data.LABEL]

#Reduce memory
def reduce_memory(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.        
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    return df

test_data = reduce_memory(test_data)
train_data = reduce_memory(train_data)

Memory usage of dataframe is 13.91 MB
Memory usage after optimization is: 6.25 MB
Decreased by 55.1%
Memory usage of dataframe is 124.12 MB
Memory usage after optimization is: 62.04 MB
Decreased by 50.0%


In [4]:
#splitting the data
x_train = train_data.drop(["LABEL"],axis=1)
y_train = train_data["LABEL"]   
x_test = test_data.drop(["LABEL"],axis=1)
y_test = test_data["LABEL"]

#Normalizing the data
x_train = normalized = normalize(x_train)
x_test = normalize(x_test)

In [5]:
# Applying of gaussian filter
x_train = filtered = ndimage.filters.gaussian_filter(x_train, sigma=10)
x_test = ndimage.filters.gaussian_filter(x_test, sigma=10)

# Feature scaling -- after applying Gaussian filters, 
# make the Gaussian data have mean of 0 and variance of 1
std_scaler = StandardScaler()
x_train = scaled = std_scaler.fit_transform(x_train)
x_test = std_scaler.fit_transform(x_test)

In [6]:
# Apply PCA with n_componenets
from sklearn.decomposition import PCA
pca = PCA(n_components=53)
x_train = pca.fit_transform(x_train)
x_test = pca.transform(x_test)

#Resampling as the data is highly unbalanced.
print("Before OverSampling, counts of label '1': {}".format(sum(y_train==1)))
print("Before OverSampling, counts of label '0': {} \n".format(sum(y_train==0)))

sm = SMOTEENN(random_state=27, sampling_strategy = 1.0, n_jobs=-1)
x_train_res, y_train_res = sm.fit_resample(x_train, y_train) 

print("After OverSampling, counts of label '1': {}".format(sum(y_train_res==1)))
print("After OverSampling, counts of label '0': {}".format(sum(y_train_res==0)))

Before OverSampling, counts of label '1': 37
Before OverSampling, counts of label '0': 5050 

After OverSampling, counts of label '1': 5050
After OverSampling, counts of label '0': 5049


In [8]:
from tensorflow.random import set_seed
set_seed(42)

from sklearn.model_selection import cross_val_score
from scikeras.wrappers import KerasClassifier
from keras.models import Sequential # initialize neural network library
from keras.layers import Dense # build our layers library
from dask_ml.model_selection import GridSearchCV
from dask.dataframe import read_csv

In [10]:
from dask.distributed import Client
client = Client(processes=True)

In [11]:
import time

def create_model(optimizer='adam', init='uniform'):
    # create model
    model = Sequential()
    model.add(Dense(4, input_dim=x_train_res.shape[1], kernel_initializer=init, activation='relu'))
    model.add(Dense(4, kernel_initializer=init, activation='relu'))
    model.add(Dense(1, kernel_initializer=init, activation='sigmoid'))
    # Compile model
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

# create model
gridtrial = KerasClassifier(model=create_model, verbose=0)
print(gridtrial.get_params().keys())

# grid search epochs, batch size, kernel initializer, and optimizer
optimizers = ['rmsprop', 'adam']
init = ['glorot_uniform', 'normal', 'uniform']
epochs = [10, 20, 40]
batches = [5, 10, 20]

param_grid = dict(optimizer=optimizers, epochs=epochs, batch_size=batches, model__init=init)

tic = time.time()
grid = GridSearchCV(estimator=gridtrial, param_grid=param_grid, scheduler=client)
grid_result = grid.fit(x_train_res, y_train_res)

# summarize results
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))
    
toc = time.time()
print("Elapsed time:", toc-tic)

dict_keys(['model', 'build_fn', 'warm_start', 'random_state', 'optimizer', 'loss', 'metrics', 'batch_size', 'validation_batch_size', 'verbose', 'callbacks', 'validation_split', 'shuffle', 'run_eagerly', 'epochs', 'class_weight'])
Best: 0.980988 using {'batch_size': 10, 'epochs': 40, 'model__init': 'glorot_uniform', 'optimizer': 'adam'}
0.971581 (0.013084) with: {'batch_size': 5, 'epochs': 10, 'model__init': 'glorot_uniform', 'optimizer': 'rmsprop'}
0.881572 (0.066372) with: {'batch_size': 5, 'epochs': 10, 'model__init': 'glorot_uniform', 'optimizer': 'adam'}
0.838202 (0.022921) with: {'batch_size': 5, 'epochs': 10, 'model__init': 'normal', 'optimizer': 'rmsprop'}
0.823646 (0.109212) with: {'batch_size': 5, 'epochs': 10, 'model__init': 'normal', 'optimizer': 'adam'}
0.812853 (0.114895) with: {'batch_size': 5, 'epochs': 10, 'model__init': 'uniform', 'optimizer': 'rmsprop'}
0.879988 (0.118685) with: {'batch_size': 5, 'epochs': 10, 'model__init': 'uniform', 'optimizer': 'adam'}
0.863254 (0

In [None]:
# Best: 0.970195 using {'batch_size': 20, 'epochs': 10, 'model__init': 'glorot_uniform', 'optimizer': 'adam'}
# Elapsed time: 3707.247061729431