In [1]:
import dask
import time
from dask import delayed
import pandas as pd
import matplotlib.pyplot as plt

In [58]:
## import dependencies
from time import sleep
def calculate_square(x):
    """Return ``x`` squared after a deliberate 1-second pause that simulates slow work."""
    sleep(1)
    return x ** 2

def get_sum(a, b):
    """Return ``a + b`` after a deliberate 1-second pause that simulates slow work."""
    sleep(1)
    total = a + b
    return total

In [None]:
%%time
## Wrap each call with dask.delayed: this only records tasks in a graph, nothing executes yet
x, y = (delayed(calculate_square)(n) for n in (10, 20))
z = delayed(get_sum)(x, y)
## Printing shows the lazy Delayed object itself, not the computed value
print(z)

In [None]:
%%time
## Draw the task graph behind `z`: two independent calculate_square nodes feeding one get_sum node
z.visualize()

In [None]:
## Build five lazy chains, each computing square(i) + square(i + 10) for i = 0..4
output = [
    delayed(get_sum)(delayed(calculate_square)(i), delayed(calculate_square)(i + 10))
    for i in range(5)
]
## Reduce the five partial results with Python's built-in sum, still as a delayed task
total = dask.delayed(sum)(output)
## Draw the combined task graph
total.visualize()

In [None]:
%%time

Bootstraps = 12  # number of independent sample -> decode chains built in this cell

def generate_sample(i, j):
    """Return a one-row DataFrame with a single column named ``i`` holding ``j``."""
    return pd.DataFrame({i: [j]})

def _decode_data(x):
    x = x.apply(lambda x: x**3)
    return x

def concatenation(list_):
    """Join the given DataFrames side by side (column-wise) into a single frame."""
    return pd.concat(objs=list_, axis="columns")

## One delayed chain per bootstrap: generate_sample(i, i**2) -> _decode_data
output = [
    delayed(_decode_data)(delayed(generate_sample)(i, i**2))
    for i in range(Bootstraps)
]

## Lazily concatenate every chain's result column-wise, then draw the graph
total = dask.delayed(concatenation)(output)

total.visualize()

In [None]:
## Execute the graph built above: one column per bootstrap i holding (i**2)**3, joined column-wise
total.compute()

# Comparing serial, parallel (`dask.delayed`) and `Client`-based execution
## Without parallelization

In [None]:
%%time

Bootstraps = 12  # number of samples processed one after another in this cell

def generate_sample(i, j):
    """Sleep 1 s to simulate expensive work, then return a one-row DataFrame ``{i: [j]}``."""
    time.sleep(1)
    return pd.DataFrame({i: [j]})

def _decode_data(x):
    x = x.apply(lambda x: x**3)
    time.sleep(1)
    return x

def concatenation(list_):
    """Sleep 1 s to simulate expensive work, then join the frames column-wise."""
    time.sleep(1)
    return pd.concat(objs=list_, axis="columns")

## Eager baseline: each bootstrap's generate_sample -> _decode_data runs only after the previous one finishes
output = [_decode_data(generate_sample(i, i**2)) for i in range(Bootstraps)]

total = concatenation(output)


## With parallelization

In [None]:
%%time

## Same workload as the serial run above (Bootstraps = 12), so the two %%time results
## compare serial vs. delayed execution of identical work.
Bootstraps = 12


def generate_sample(i, j):
    """Sleep 1 s to simulate expensive work, then return a one-row DataFrame ``{i: [j]}``."""
    time.sleep(1)
    res_ = pd.DataFrame({i: [j]})
    return res_

def _decode_data(x):
    time.sleep(1)
    x = x.apply(lambda x: x**3)
    return x

def concatenation(list_):
    """Sleep 1 s to simulate expensive work, then join the frames column-wise."""
    time.sleep(1)
    return pd.concat(objs=list_, axis="columns")

## Same pipeline as the serial cell, but every call is wrapped in dask.delayed, so the
## scheduler is free to run the independent bootstrap chains concurrently.
output = [
    delayed(_decode_data)(delayed(generate_sample)(i, i**2))
    for i in range(Bootstraps)
]

total = dask.delayed(concatenation)(output)

## Trigger execution of the whole graph
result = total.compute()

## Plot how runtime grows with the number of bootstraps

In [81]:
## Time the delayed pipeline for 1..29 bootstraps.
## Uses generate_sample / _decode_data / concatenation as most recently defined
## (in a top-to-bottom run, the 1 s-sleep versions from the previous section).
times = []
trials = range(1, 30)

## The loop variable is deliberately not named `Bootstraps`, so the constant used by
## other cells is not overwritten as a side effect of this sweep.
for n_bootstraps in trials:
    ## perf_counter is a monotonic, high-resolution clock intended for elapsed-time
    ## measurement; time.time() follows the system clock and can jump if it is adjusted.
    t0 = time.perf_counter()

    output = [
        delayed(_decode_data)(delayed(generate_sample)(i, i**2))
        for i in range(n_bootstraps)
    ]
    total = dask.delayed(concatenation)(output)
    result = total.compute()

    times.append(time.perf_counter() - t0)

In [None]:
## Runtime of the delayed pipeline against the number of bootstraps (from the sweep cell)
fig, ax = plt.subplots()
ax.step(trials, times)
ax.set(
    title="Dask delayed pipeline: runtime vs. number of bootstraps",
    xlabel="Bootstraps",
    ylabel="Time [s]",
)
ax.grid(alpha=0.2)
plt.show()

## Distributed execution with `dask.distributed.Client`

In [2]:
from dask.distributed import Client

In [None]:
## Local dask.distributed cluster with 3 workers x 8 threads each (24 threads in total);
## presumably sized to match Bootstraps = 24 used with this client below.
client = Client(n_workers=3, threads_per_worker=8)

In [None]:
%%time

Bootstraps = 24  # one delayed task per bootstrap in the compute cell below


def generate_sample(i, j):
    """Sleep 1 s to simulate expensive work, then return a one-row DataFrame ``{i: [j]}``."""
    time.sleep(1)
    return pd.DataFrame({i: [j]})

def _decode_data(x):
    time.sleep(1)
    x = x.apply(lambda x: x**3)
    return x

def concatenation(list_):
    """Sleep 1 s to simulate expensive work, then join the frames column-wise."""
    time.sleep(1)
    return pd.concat(objs=list_, axis="columns")


@dask.delayed
def task(i):
    """One whole bootstrap chain (generate_sample(i, i**2) then _decode_data) as a single delayed task.

    Both steps run sequentially inside the task; concurrency comes only from running
    several ``task`` calls at once.
    """
    sample = generate_sample(i, i**2)
    return _decode_data(sample)


In [47]:
## Build one delayed task per bootstrap and run them all through dask.compute.
## The `%%time` on the previous cell only covers function definitions, so the
## actual computation is timed explicitly here.
output = [task(i) for i in range(Bootstraps)]

t0 = time.perf_counter()
## dask.compute returns a tuple with one entry per argument: result[0] is the list of DataFrames
result = dask.compute(output)
print(f"dask.compute of {len(output)} tasks took {time.perf_counter() - t0:.2f} s")


In [58]:
## Shut down the distributed client; `result` from the compute cell is used locally below
client.close()

In [None]:
## dask.compute(output) returned a 1-tuple; result[0] is the list of per-bootstrap DataFrames.
## Join them column-wise (this call also pays concatenation's 1 s sleep).
concatenation(result[0])