# Delayed

In [1]:
from math import sqrt

In [2]:
def f(z):
    """Return the square root of ``z + 4``."""
    shifted = z + 4
    return sqrt(shifted)
def g(y):
    """Return ``y`` decreased by 3."""
    offset = 3
    return y - offset
def h(x):
    """Return ``x`` raised to the power 2."""
    return pow(x, 2)
# Eager evaluation: each call runs immediately and its result feeds the next one.
x = 4
y = h(x)
z = g(y)
w = f(z)
print(w)
# Same pipeline written as a single nested call; it prints the same value as w.
print(f(g(h(x))))

4.123105625617661
4.123105625617661


In [8]:
from dask import delayed
import dask
# Same h -> g -> f pipeline, but each function is wrapped with delayed():
# w ends up as a Delayed object rather than a number, and its value is
# obtained later by calling .compute() on it.
y = delayed(h)(x)
z = delayed(g)(y)
w = delayed(f)(z)
print(w)
type(w)

Delayed('f-145a57ec-fe29-4faf-939d-071f4e640acc')


dask.delayed.Delayed

In [9]:
# Evaluate the delayed h -> g -> f chain and return the numeric result.
w.compute()

4.123105625617661

# Chunking arrays

In [11]:
import numpy as np
# Draw the sample from a seeded generator so that Restart & Run All reproduces
# the same array (and therefore the same sum and mean) on every run.
rng = np.random.default_rng(42)
a = rng.random(10000)  # 10000 uniform float64 values in [0, 1)
print(a.shape, a.dtype)
print(a.sum())
print(a.mean())

(10000,) float64
5008.629969044578
0.5008629969044578


In [13]:
import dask.array as da
# Wrap the NumPy array as a Dask array split into chunks of len(a) // 4
# elements; .chunks shows the resulting chunk sizes along each axis.
a_dask = da.from_array(a, chunks=len(a) // 4)
a_dask.chunks

((2500, 2500, 2500, 2500),)

In [23]:
# Sum the array chunk by chunk, the way a chunked computation would.
n_chunks = 4
# Ceiling division: when len(a) is not a multiple of n_chunks each chunk must be
# one element larger, otherwise the trailing len(a) % n_chunks elements would be
# silently left out of the total.
chunk_size = (len(a) + n_chunks - 1) // n_chunks
result = 0

for k in range(n_chunks):
    offset = k*chunk_size
    # Slicing past the end of the array just yields a shorter (possibly empty) chunk.
    a_chunk = a[offset:offset+chunk_size]
    result += a_chunk.sum()
print(result)

5008.629969044578


In [26]:
# a_dask.sum() returns a Dask object describing the reduction; .compute()
# evaluates it and yields the number that is printed.
result = a_dask.sum()
print(result.compute())

5008.629969044578


# Computing with multidimensional arrays

In [31]:
# 21 consecutive readings, laid out here seven per line for readability.
time_series = np.array([
    49, 51, 60, 54, 47, 50, 64,
    58, 47, 43, 50, 63, 67, 68,
    64, 48, 55, 46, 66, 51, 52,
])

In [36]:
# Inspect the array's metadata: element type, shape, number of dimensions, contents.
print(time_series.dtype)
print(time_series.shape)
print(time_series.ndim)
print(time_series)

int32
(21,)
1
[49 51 60 54 47 50 64 58 47 43 50 63 67 68 64 48 55 46 66 51 52]


In [37]:
# Arrange the 21 values as a 3 x 7 table; the indexing cells further down
# treat rows as weeks and columns as days.
table = time_series.reshape((3,7))
print(table)

[[49 51 60 54 47 50 64]
 [58 47 43 50 63 67 68]
 [64 48 55 46 66 51 52]]


In [38]:
# time_series itself still prints as the flat 21-element array after the reshape.
print(time_series)

[49 51 60 54 47 50 64 58 47 43 50 63 67 68 64 48 55 46 66 51 52]


In [39]:
# Default ordering: consecutive values fill one row before moving to the next.
time_series.reshape((7,3))

array([[49, 51, 60],
       [54, 47, 50],
       [64, 58, 47],
       [43, 50, 63],
       [67, 68, 64],
       [48, 55, 46],
       [66, 51, 52]])

In [41]:
time_series.reshape((7,3), order='F') # order='F' is Fortran-style (column by column); the default 'C' fills row by row

array([[49, 58, 64],
       [51, 47, 48],
       [60, 43, 55],
       [54, 50, 46],
       [47, 63, 66],
       [50, 67, 51],
       [64, 68, 52]])

In [42]:
# Show the 3 x 7 table again as a reference for the indexing examples.
print(table)

[[49 51 60 54 47 50 64]
 [58 47 43 50 63 67 68]
 [64 48 55 46 66 51 52]]


In [46]:
table[0,4] # Week 0 (row 0), Day 4 (column 4)

47

In [45]:
table[1,2:5] # Week 1, Days 2, 3, 4 (the stop index 5 is excluded)

array([43, 50, 63])

In [48]:
table[0::2, ::3] # Weeks 0 and 2, Days 0, 3, 6 (the last slice field is the step)

array([[49, 54, 64],
       [64, 46, 52]])

In [50]:
table[0] # Week 0, all days (a single index selects a whole row)

array([49, 51, 60, 54, 47, 50, 64])

In [52]:
# With no axis argument the mean is taken over every element of the table.
table.mean()

54.904761904761905

In [56]:
# axis=0 averages across the rows (weeks), leaving one mean per column (day).
daily_means = table.mean(axis=0)
daily_means # mean for each day

array([57.        , 48.66666667, 52.66666667, 50.        , 58.66666667,
       56.        , 61.33333333])

In [57]:
# One mean per day: a 1-D array of length 7.
daily_means.shape

(7,)

In [59]:
# axis=1 averages along each row, leaving one mean per week.
weekly_means = table.mean(axis=1)
weekly_means

array([53.57142857, 56.57142857, 54.57142857])

In [60]:
# Broadcasting operation: the length-7 daily_means vector is subtracted from
# each of the three rows of the 3 x 7 table.
table - daily_means

array([[ -8.        ,   2.33333333,   7.33333333,   4.        ,
        -11.66666667,  -6.        ,   2.66666667],
       [  1.        ,  -1.66666667,  -9.66666667,   0.        ,
          4.33333333,  11.        ,   6.66666667],
       [  7.        ,  -0.66666667,   2.33333333,  -4.        ,
          7.33333333,  -5.        ,  -9.33333333]])

In [62]:
# With Dask, wrap the array with da.from_array(..., chunks=...) and the same operations can then be applied

# Using Dask DataFrames

In [64]:
import dask.dataframe as dd

## Reading CSV

In [None]:
# quarter1.csv quarter2.csv quarter3.csv quarter4.csv
# transactions = dd.read_csv('*.csv')

# Reads all the quarter#.csv files and puts them into a single DataFrame

## Building Delayed Pipelines

In [None]:
# is_wendy = (transactions['names'] == 'Wendy')
# wendy_amounts = transactions.loc[is_wendy, 'amount']

# wendy_diff = wendy_amounts.sum()
# wendy_diff.compute()
# wendy_diff.visualize()

## Computations : pandas

In [None]:
# df = dd.read_csv('yellow_tripdata_2015-*.csv')
# m = df['trip_distance'].mean()
# result = m.compute()

## Building Dask Bags & Globbing

In [65]:
# A mix of containers: some hold items, while the empty dict and the empty
# string hold none.
nested_containers = [
    list(range(4)),
    dict(),
    [6.5, 3.14],
    'Python',
    dict(version=3),
    str(),
]

In [67]:
import dask.bag as db

In [72]:
# Build a Dask bag from the Python list and count its elements.
the_bag = db.from_sequence(nested_containers)
the_bag.count().compute()

6

In [74]:
# any(): is at least one element truthy?  all(): is every element truthy?
# The bag contains an empty dict and an empty string, so all() is False.
the_bag.any().compute(), the_bag.all().compute()

(True, False)

## Functional approaches using Dask Bags

### Using map

In [75]:
def squared(x):
    """Return ``x`` raised to the power 2."""
    return pow(x, 2)
# The built-in map() returns a map object (an iterator), not the squared values.
squares = map(squared, [1,2,3,4,5,6])

squares

<map at 0x203e57e9400>

In [77]:
# Consuming the map object with list() materializes the squared values.
squares = list(squares)
squares

[1, 4, 9, 16, 25, 36]

### Using filter

In [78]:
def is_even(x):
    """Return whether ``x`` leaves no remainder when divided by 2."""
    remainder = x % 2
    return remainder == 0

# The built-in filter() keeps only the items for which is_even returns True;
# list() consumes the resulting iterator.
evens = filter(is_even, [1,2,3,4,5,6])
list(evens)

[2, 4, 6]

In [79]:
# Keep only the even values among the squares computed earlier.
even_squares = filter(is_even, squares)
list(even_squares)

[4, 16, 36]

### Using dask.bag.map

In [80]:
# Bag.map applies squared to each element; the result is another Dask bag,
# not a list of values.
numbers = db.from_sequence([1,2,3,4,5,6])
squares = numbers.map(squared)

squares

dask.bag<map-squ..., npartitions=6>

In [81]:
# compute() evaluates the bag and returns the squared values as a list.
result = squares.compute()
result

[1, 4, 9, 16, 25, 36]

### Using dask.bag.filter

In [82]:
# Bag.filter keeps the elements for which is_even returns True.
numbers = db.from_sequence([1,2,3,4,5,6])
evens = numbers.filter(is_even)
evens.compute()

[2, 4, 6]

In [83]:
# Chain the two steps on the bag: square every element, then keep the even results.
even_squares = numbers.map(squared).filter(is_even)

even_squares.compute()

[4, 16, 36]