# Parallel Training with TPOT and Dask

In [1]:
import time

def sleep_func():
  print("Sleeping for a 1 sec")
  time.sleep(1)
  print("Done")


time_start = time.time()
sleep_func()
sleep_func()
sleep_func()
sleep_func()
sleep_func()
time_stop = time.time()
print(f"Took {round(time_stop - time_start,2)} seconds to execute!")

Sleeping for a 1 sec
Done
Sleeping for a 1 sec
Done
Sleeping for a 1 sec
Done
Sleeping for a 1 sec
Done
Sleeping for a 1 sec
Done
Took 5.01 seconds to execute!


In [2]:
#Running the same func 3 times in parallel

from multiprocessing import Process
import time

def sleep_func():
  print("Sleeping for a 1 sec")
  time.sleep(1)
  print("Done")


time_start = time.time()
process1 = Process(target=sleep_func)
process2 = Process(target=sleep_func)
process3 = Process(target=sleep_func)
process1.start()
process2.start()
process3.start()
process1.join()
process2.join()
process3.join()

time_stop = time.time()
print(f"Took {round(time_stop - time_start,2)} seconds to execute!")

Sleeping for a 1 sec
Sleeping for a 1 sec
Sleeping for a 1 sec
Done
Done
Done
Took 1.03 seconds to execute!


In [3]:
#Another way to run a process in parallel

import time
import concurrent.futures
def sleep_func():
  print("Sleeping for a 1 sec")
  time.sleep(1)
  return "Done."


time_start = time.time()
with concurrent.futures.ProcessPoolExecutor() as ppe:
  out = []
  for _ in range(5):
    out.append(ppe.submit(sleep_func))
  for curr in concurrent.futures.as_completed(out):
    print(curr.result())
time_stop = time.time()
print(f"Took {round(time_stop - time_start,2)} seconds to execute!")

Sleeping for a 1 sec
Sleeping for a 1 sec
Sleeping for a 1 sec
Sleeping for a 1 sec
Done.
Done.
Sleeping for a 1 sec
Done.
Done.
Done.
Took 3.04 seconds to execute!


In [4]:
#running a process in parrallel with a function that take a parameter

import time
import concurrent.futures
def sleep_func(how_long: int):
  print(f"sleeping for {how_long} seconds")
  time.sleep(how_long)
  return f"finished sleeping for {how_long} secs."

time_start = time.time()
sleep_seconds = [1,2,3,1,2,3]
with concurrent.futures.ProcessPoolExecutor() as ppe:
  out = []
  for sleep_second in sleep_seconds:
    out.append(ppe.submit(sleep_func,sleep_second))
  for curr in concurrent.futures.as_completed(out):
    print(curr.result())
time_stop = time.time()
print(f"Took {round(time_stop - time_start,2)} seconds to execute!")

sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 3 seconds
finished sleeping for 1 secs.
sleeping for 1 seconds
finished sleeping for 2 secs.
sleeping for 2 seconds
finished sleeping for 1 secs.
sleeping for 3 seconds
finished sleeping for 3 secs.
finished sleeping for 2 secs.
finished sleeping for 3 secs.
Took 7.04 seconds to execute!


# Introduction to the Dask library

In [5]:
#comparing dask with numpy
%%time
import numpy as np
np_ones = np.ones((1000,1000,1000))

CPU times: user 208 ms, sys: 776 ms, total: 984 ms
Wall time: 981 ms


In [6]:
#trying the same thing with dask

%%time
import dask.array as da
da_ones = da.ones((1000,1000,1000))

CPU times: user 138 ms, sys: 16.4 ms, total: 154 ms
Wall time: 159 ms


In [7]:
#comparing dask multiprocessing


import time
import dask
import math
from dask import delayed, compute

In [8]:
#first sequential
%%time
def cube(number: int) -> int:
    print(f'cube({number}) called!')
    time.sleep(1)
    return number ** 3

def multiply(items: list) -> int:
    print(f'multiply([{items}]) called!')
    ans = 1
    for x in items:
      ans *= x
    return ans

numbers = [1, 2, 3, 4, 5]
graph = multiply([cube(num) for num in numbers])
print(f'Total = {graph}')

cube(1) called!
cube(2) called!
cube(3) called!
cube(4) called!
cube(5) called!
multiply([[1, 8, 27, 64, 125]]) called!
Total = 1728000
CPU times: user 32.3 ms, sys: 5.15 ms, total: 37.4 ms
Wall time: 5.01 s


In [9]:
# dask parallelize version

%%time
@delayed
def cube(number: int) -> int:
    print(f'cube({number}) called!')
    time.sleep(1)
    return number ** 3
@delayed
def multiply(items: list) -> int:
    print(f'multiply([{items}]) called!')
    ans = 1
    for x in items:
      ans *= x
    return ans

numbers = [1, 2, 3, 4, 5]
graph = multiply([cube(num) for num in numbers])
print(f'Total = {graph.compute()}')

cube(2) called!
cube(5) called!
cube(3) called!
cube(4) called!
cube(1) called!
multiply([[1, 8, 27, 64, 125]]) called!
Total = 1728000
CPU times: user 23.9 ms, sys: 10.6 ms, total: 34.5 ms
Wall time: 3.01 s


# Training machine learning models with TPOT and Dask

In [10]:
!pip install dask-ml
!pip install tpot



In [11]:
#importing libs also using a built in dataset from sklearn designed to fetch many 8x8 pixel digit images for classification

import tpot
from tpot import TPOTClassifier
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from dask.distributed import Client
import warnings
warnings.filterwarnings("ignore")

In [12]:
#Creating an instance of the client which will immediately start the Dask cluster ans use all the CPU cores you have available

client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:44323  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 13.62 GB


In [13]:
#loading the dataset from load_digits() and splitting the data into training and testing

digits = load_digits()

X_train, X_test, y_train, y_test = train_test_split(
    digits.data,
    digits.target,
    test_size = 0.5,
)

X_train.shape, X_test.shape

((898, 64), (899, 64))

In [16]:
#Setting up the Training pipeline with TPOT and Dask

estimator = TPOTClassifier( 
    n_jobs = -1,
    random_state = 42,
    use_dask=True,
    verbosity=2,
    max_time_mins=10
)

In [17]:
#training the model

estimator.fit(X_train,y_train)

Optimization Progress:   0%|          | 0/100 [00:00<?, ?pipeline/s]


Generation 1 - Current best internal CV score: 0.9844072004965859

10.06 minutes have elapsed. TPOT will close down.
TPOT closed during evaluation in one generation.


TPOT closed prematurely. Will use the current best pipeline.

Best pipeline: KNeighborsClassifier(input_matrix, n_neighbors=1, p=2, weights=uniform)


TPOTClassifier(max_time_mins=10, n_jobs=-1, random_state=42, use_dask=True,
               verbosity=2)