# Parallel Solutions

## Dask delayed

In [None]:
# Exercise one

from time import sleep
from dask import delayed

def inc(x):
    sleep(1)
    return x + 1

data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
# Parallelized code

results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

In [None]:
# Exercise two

def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:          # odd
        y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

In [None]:
%time total.compute()

In [None]:
total.visualize()

In [None]:
## Exercise Three

from dask import compute
import pandas as pd

from glob import glob
filenames = sorted(glob(os.path.join('data', 'nycflights', '*.csv')))

In [None]:
%%time

# This is just one possible solution, there are
# several ways to do this using `delayed`

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = delayed(pd.read_csv)(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Compute the intermediates
sums, counts = compute(sums, counts)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

## Dask Bag


## Dask Array

In [None]:
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']

sums = []
lengths = []
for i in range(0, 1000000000, 1000000):
    chunk = dset[i: i + 1000000]  # pull out numpy array
    sums.append(chunk.sum())
    lengths.append(len(chunk))

total = sum(sums)
length = sum(lengths)
print(total / length)

In [None]:
import dask.array as da
%matplotlib inline
import matplotlib.pyplot as plt
import h5py
from glob import glob
import os

filenames = sorted(glob(os.path.join('data', 'weather-big', '*.hdf5')))
dsets = [h5py.File(filename, mode='r')['/t2m'] for filename in filenames]
arrays = [da.from_array(dset, chunks=(500, 500)) for dset in dsets]
x = da.stack(arrays, axis=0)


result = x.mean(axis=0)
fig = plt.figure(figsize=(16, 8))
plt.imshow(result, cmap='RdBu_r');

In [None]:
result = x[0] - x.mean(axis=0)
fig = plt.figure(figsize=(16, 8))
plt.imshow(result, cmap='RdBu_r');

In [None]:
import h5py
from glob import glob
import os
import dask.array as da
import pandas as pd

filenames = sorted(glob(os.path.join('data', 'weather-big', '*.hdf5')))
dsets = [h5py.File(filename, mode='r')['/t2m'] for filename in filenames]

arrays = [da.from_array(dset, chunks=(500, 500)) for dset in dsets]

x = da.stack(arrays, axis=0)

result = x[:, ::2, ::2]

da.to_zarr(result, os.path.join('data', 'myfile.zarr'), overwrite=True)

## Dask Dataframes

In [None]:
import dask.dataframe as dd
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

#### 1.) How many rows are in our dataset?

In [None]:
len(df)

#### 2.) In total, how many non-canceled flights were taken?

In [None]:
len(df[~df.Cancelled])

#### 3.) In total, how many non-cancelled flights were taken from each airport?

In [None]:
df[~df.Cancelled].groupby('Origin').Origin.count().compute()

#### 4.) What was the average departure delay from each airport?

In [None]:
df.groupby("Origin").DepDelay.mean().compute()

#### 5.) What day of the week has the worst average departure delay?

In [None]:
df.groupby("DayOfWeek").DepDelay.mean().compute()

### Exercise: Rewrite code to use a single call to `map_partitions`

In [None]:
def compute_departure_timestamp(df):
    hours = df.CRSDepTime // 100
    hours_timedelta = pd.to_timedelta(hours, unit='h')

    minutes = df.CRSDepTime % 100
    minutes_timedelta = pd.to_timedelta(minutes, unit='m')

    return df.Date + hours_timedelta + minutes_timedelta

departure_timestamp = df.map_partitions(compute_departure_timestamp)
departure_timestamp.head()

## Dask Distributed, Advanced

In [None]:
x = c.submit(inc, 1)
y = c.submit(dec, 2)
total = c.submit(add, x, y)

print(total)     # This is still a future
c.gather(total)   # This blocks until the computation has finished