In [1]:
import time
from time import sleep
import pdb
from dask.distributed import Client

## Define a class that encapsulates the per-task logic

In [2]:
class TestClass:
    """A unit of work, tagged with a group id, that sleeps for a configurable time.

    Attributes:
        group_id: Identifier echoed back in the string returned by ``sleep``.
        sleep_time: Seconds to sleep in ``sleep``. Must be set, either through the
            constructor or by assigning the attribute, before ``sleep`` is called.
    """

    def __init__(self, group_id, sleep_time=None):
        self.group_id = group_id
        # Optional so callers that assign ``sleep_time`` after construction keep working.
        self.sleep_time = sleep_time

    def sleep(self):
        """Sleep for ``self.sleep_time`` seconds and return a summary string.

        Returns:
            str: ``'group_id <group_id>; slept for <sleep_time> seconds'``.

        Raises:
            TypeError: if ``sleep_time`` was never set. This is the same exception type
                ``time.sleep(None)`` would raise, but with an actionable message.
        """
        if self.sleep_time is None:
            raise TypeError('sleep_time must be set before calling sleep()')
        sleep(self.sleep_time)
        return 'group_id {}; slept for {} seconds'.format(self.group_id, self.sleep_time)

## Implement heterogeneous class instantiations in serial

In [11]:
def serial_test(group_ids=(1, 2), sleep_times=(1, 2, 3, 4, 5)):
    """Run every (group_id, sleep_time) combination one after another and report wall time.

    This is the serial baseline for the Dask comparison. Each ``TestClass.sleep`` call
    blocks until it finishes, so the total time is roughly
    ``len(group_ids) * sum(sleep_times)`` seconds.

    Args:
        group_ids: Group ids to instantiate ``TestClass`` with. Defaults to ``(1, 2)``.
        sleep_times: Seconds each instance sleeps. Defaults to ``(1, 2, 3, 4, 5)``.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump if the
    # system clock is adjusted, which would corrupt the measured duration.
    start_time = time.perf_counter()

    for group_id in group_ids:
        for sleep_time in sleep_times:
            test_class = TestClass(group_id)
            test_class.sleep_time = sleep_time
            print('submitting test_class group_id = {}, sleep_time = {}'.format(test_class.group_id, test_class.sleep_time))
            print(test_class.sleep())

    print('Whole process took {} seconds'.format(time.perf_counter() - start_time))

In [15]:
# Serial baseline: tasks run one after another, so wall time is roughly the sum of all sleeps.
serial_test()

submitting test_class group_id = 1, sleep_time = 1
group_id 1; slept for 1 seconds
submitting test_class group_id = 1, sleep_time = 2
group_id 1; slept for 2 seconds
submitting test_class group_id = 1, sleep_time = 3
group_id 1; slept for 3 seconds
submitting test_class group_id = 1, sleep_time = 4
group_id 1; slept for 4 seconds
submitting test_class group_id = 1, sleep_time = 5
group_id 1; slept for 5 seconds
submitting test_class group_id = 2, sleep_time = 1
group_id 2; slept for 1 seconds
submitting test_class group_id = 2, sleep_time = 2
group_id 2; slept for 2 seconds
submitting test_class group_id = 2, sleep_time = 3
group_id 2; slept for 3 seconds
submitting test_class group_id = 2, sleep_time = 4
group_id 2; slept for 4 seconds
submitting test_class group_id = 2, sleep_time = 5
group_id 2; slept for 5 seconds
Whole process took 30.041079998016357 seconds


## Implement the same heterogeneous class instantiations in parallel using Dask

In [None]:
def dask_test(chtc_hostname='submit3.chtc.wisc.edu'):
    """Create a Dask client suited to the machine this notebook is running on.

    On the CHTC submit node, this starts an adaptive ``CHTCCluster`` and connects to it.
    Everywhere else, it starts a local ``Client()``.

    NOTE(review): a later cell also defines ``dask_test``, which shadows this one when
    both are run; consider renaming this helper (e.g. ``make_client``).

    Args:
        chtc_hostname: Hostname that identifies the CHTC submit node.

    Returns:
        The connected ``dask.distributed.Client``.
    """
    import socket

    if socket.gethostname() == chtc_hostname:
        # CHTC execution
        # NOTE(review): CHTCCluster is not imported anywhere in this notebook; presumably
        # it comes from the dask-chtc package (``from dask_chtc import CHTCCluster``).
        # Confirm and add that import.
        cluster = CHTCCluster(job_extra={"accounting_group": "COVID19_AFIDSI"}, log_directory='./test_logs')
        cluster.adapt(minimum=10, maximum=20)
        client = Client(cluster)
    else:
        # local execution
        client = Client()
    # TODO: Google Cloud Platform execution. The earlier placeholder
    # Client('google_cloud_platform') was not a valid scheduler address and
    # unconditionally replaced the client chosen above.
    return client

In [13]:
def dask_test(group_ids=(1, 2), sleep_times=(1, 2, 3, 4, 5), client=None):
    """Run every (group_id, sleep_time) combination as Dask tasks and report wall time.

    All tasks are submitted before any result is awaited, so they can run concurrently
    on the cluster's workers. Results are then printed in submission order.

    Args:
        group_ids: Group ids to instantiate ``TestClass`` with. Defaults to ``(1, 2)``.
        sleep_times: Seconds each instance sleeps. Defaults to ``(1, 2, 3, 4, 5)``.
        client: An existing ``dask.distributed.Client`` to submit to. If omitted, a
            local ``Client()`` is started for this call and closed before returning.
    """
    # Previously a new Client (and its local cluster) was started on every call and
    # never closed; track ownership so we only tear down what we created.
    owns_client = client is None
    if owns_client:
        client = Client()

    try:
        futures = []

        # perf_counter is monotonic; time.time() can jump if the wall clock is adjusted.
        start_time = time.perf_counter()

        for group_id in group_ids:
            for sleep_time in sleep_times:
                test_class = TestClass(group_id)
                test_class.sleep_time = sleep_time
                print('submitting test_class group_id = {}, sleep_time = {}'.format(test_class.group_id, test_class.sleep_time))
                # The bound method carries its instance, so each task sees its own
                # group_id / sleep_time.
                futures.append(client.submit(test_class.sleep))

        # gather blocks until every future is done and returns results in list order.
        for result in client.gather(futures):
            print('result = ' + result)

        print('Whole process took {} seconds'.format(time.perf_counter() - start_time))
    finally:
        # Only shut down a client this call created; a caller-supplied one stays usable.
        if owns_client:
            client.close()

In [14]:
# Parallel run: every task is submitted before any result is awaited, so the sleeps can
# overlap on the Dask workers (compare the recorded wall time below with the serial run).
dask_test()

submitting test_class instance_id = 1, sleep_time = 1
submitting test_class instance_id = 1, sleep_time = 2
submitting test_class instance_id = 1, sleep_time = 3
submitting test_class instance_id = 1, sleep_time = 4
submitting test_class instance_id = 1, sleep_time = 5
submitting test_class instance_id = 2, sleep_time = 1
submitting test_class instance_id = 2, sleep_time = 2
submitting test_class instance_id = 2, sleep_time = 3
submitting test_class instance_id = 2, sleep_time = 4
submitting test_class instance_id = 2, sleep_time = 5
result = group_id 1; slept for 1 seconds
result = group_id 1; slept for 2 seconds
result = group_id 1; slept for 3 seconds
result = group_id 1; slept for 4 seconds
result = group_id 1; slept for 5 seconds
result = group_id 2; slept for 1 seconds
result = group_id 2; slept for 2 seconds
result = group_id 2; slept for 3 seconds
result = group_id 2; slept for 4 seconds
result = group_id 2; slept for 5 seconds
Whole process took 6.041740894317627 seconds


## Pickle example for saving the results for later access

In [18]:
import pickle

# Create a dictionary. You can pickle any Python object (model, array, string, whatever).
favorite_color = {"lion": "yellow", "kitty": "red"}

# The '.pkl' extension is the convention, but it can be anything.
pickle_path = "favorite_color.pkl"

# Write the dict to the pickle file. The `with` block guarantees the file is flushed and
# closed before it is read back below; a bare open() relies on CPython's reference
# counting to do that, which other interpreters do not guarantee.
with open(pickle_path, "wb") as f:
    pickle.dump(favorite_color, f)

# Load the dictionary back from the pickle file.
# NOTE: pickle.load can execute arbitrary code; only load files you created or trust.
with open(pickle_path, "rb") as f:
    favorite_color_reloaded = pickle.load(f)

print(favorite_color_reloaded)

{'lion': 'yellow', 'kitty': 'red'}
