In [None]:
import dask
import dask.array as da
import dask.dataframe as dd
import dask.delayed as delayed
import dask.bag as bag

import time, sys
import timeit
# High-resolution timer. This notebook uses f-strings, which require
# Python >= 3.6, so time.perf_counter (added in 3.3) is always available.
# The previous check compared major and minor versions separately
# (`[0] >= 3 and [1] >= 3`), which would wrongly fall back on e.g. Python 4.0-4.2.
timer = time.perf_counter

# Display label -> value accepted by dask's `compute(scheduler=...)` for the
# local schedulers. Dict order is the order in which they are iterated.
scheduler_types = {
    'PROCESSES': 'processes',
    'SINGLE-THREADED': 'single-threaded',
    'THREADS': 'threads'
}

In [None]:
# Lazily describe a 10000 x 10000 array of random floats. No chunks are
# given, so dask's default chunking is used. Nothing is computed yet.
x = da.random.random((10000, 10000))
# Mean over every other row, restricted to columns 5000 onward; still lazy.
z = da.mean(x[::2, 5000:], axis=1)
# Display the lazy array's repr.
z

In [None]:
# Render the task graph of the lazy array `z` via dask's `visualize`
# (presumably needs graphviz installed - confirm in the environment).
z.visualize()

In [None]:
for key, value in scheduler_types.items():
    print(f'{key} run:')
    %time z.compute(scheduler=value)
    print('----------------------------')

In [None]:
import numpy as np
xp = np.random.rand(10000, 10000)
%time zp = xp[::2, 5000:].mean(axis=1)

In [None]:
# Rebuild the array with explicit 1000 x 1000 chunks (an int chunk size
# applies to every axis), then the same lazy reduction.
x = da.random.random((10000, 10000), chunks=1000)
z = da.mean(x[::2, 5000:], axis=1)
# Display x (not z) to show its chunk layout.
x

In [None]:
for key, value in scheduler_types.items():
    print(f'{key} run:')
    %time z.compute(scheduler=value)
    print('----------------------------')

In [None]:
from dask.distributed import Client
# Distribute work: with no address argument, Client() starts a local
# dask.distributed cluster and connects to it.
client = Client()
# Display the client summary (scheduler/dashboard info).
client

In [None]:
import dask.array as da
import numpy as np

# in cazul tabloului numpy genereaza eroare de memorie
#xp = np.random.rand(100000, 100000)

#x = da.random.random((100000, 100000), chunks=(1000, 1000))
x = da.random.random((100000, 100000))
z = x[::2, 5000:].mean(axis=1)
%time z.compute()

In [None]:
from dask import delayed
from dask.diagnostics import ProgressBar

def inc(x):
    """Return x plus one."""
    return x + 1


def add(x, y):
    """Return the sum of x and y."""
    return x + y


# Wrap both functions so calls build lazy Delayed tasks instead of running.
inc = delayed(inc)
add = delayed(add)

x = inc(15)
y = inc(30)
total = add(x, y)
# Render the graph: two `inc` tasks feeding one `add` task.
total.visualize()


In [None]:
from dask import delayed
data = [1, 2, 3]
d = 0


def f1(x):
    """Return x plus one."""
    return x + 1


def f2(x):
    """Return x doubled."""
    return 2*x


f1 = delayed(f1)
f2 = delayed(f2)

# Build a lazy running total: for each element add f1(x), then f2(x).
for x in data:
    a, b = f1(x), f2(x)
    d = d + a + b
d.visualize(rankdir='LR')



In [None]:
import dask.bag as bag
from operator import add
data = bag.from_sequence([1, 2, 3])


def f1(x):
    """Return x plus one."""
    return x + 1


def f2(x):
    """Return x doubled."""
    return 2*x


a = data.map(f1)
b = data.map(f2)
# Concatenate both mapped bags and reduce them to one lazy total.
d = bag.concat([a, b]).sum()
d.visualize(rankdir='LR')

In [None]:
import dask.bag as bag
from operator import add
data = bag.from_sequence([1, 2, 3])


def f1(x):
    """Return x plus one."""
    return x + 1


def f2(x):
    """Return x doubled."""
    return 2*x


b = data.map(f2)  # NOTE(review): b is built but not used in this cell
a = data.map(f1)
# Running (cumulative) results of operator.add over the mapped values.
d = a.accumulate(add)
d.visualize(rankdir='LR')

In [None]:
import dask.array as da
from dask.distributed import as_completed

x = da.random.random((1000, 1000))
futures = [client.submit(sum,x) for i in range(5)]
for future in as_completed(futures):  
    %time future.result() 
    print(future.result())


In [None]:
import dask.array as da
from dask.distributed import as_completed

x = da.random.random((100,100))
futures = [client.submit(min,x[i,:]) for i in range(5)]
seq = as_completed(futures)
i = 6
for future in seq:  
    %time future.result() 
    print(future.result().mean().compute())
    if (future.result().mean() > 0.01 and i < 10):
        futures_new = [client.submit(min,x[i,:])]
        i = i+1
        seq.update(futures_new)

In [None]:
import dask.dataframe as dd
births = dd.read_csv('births.csv', assume_missing=True)
births['decade'] = 10 * (births['year'] // 10)
#births
births.drop(['year', 'month','day'], axis=1)
bornd = births.groupby('decade').births.sum()
bornd
bornd.visualize(rankdir='LR',filename='birthsd.png')
%time bornd.compute()


In [None]:
# Shut down the scheduler and workers that `client` is connected to.
client.shutdown()