# Distributed Computing with Dask

If ... but ...

![](dask_def.svg)

In this talk we will discuss distributed computing.

-------------------------------------------

## Dask's Architecture

![](https://docs.dask.org/en/latest/_images/collections-schedulers.png)

------------------------------------------

## `dask.distributed`

### The `dask.distributed` Model

![](https://blog.dask.org/images/distributed-layout.png)

---------------------------------------------------------

### How to Set Up a Local Cluster

In [1]:
from dask.distributed import Client

In [2]:
client = Client()
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:42525 | Workers: 8 |
| Dashboard: http://127.0.0.1:34752/status | Cores: 40 |
| | Memory: 270.18 GB |


Now we can run computations:

In [3]:
def squared(n):
    return n**2

client.submit(squared, 3).result()

9
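
As a minimal sketch of submitting more than one task, the snippet below maps `squared` over several inputs and gathers the results. It assumes an in-process cluster (`processes=False`) with the dashboard disabled via `dashboard_address=None`, so it does not compete for port 8787; the worker and thread counts are illustrative choices, not values from the talk.

```python
from dask.distributed import Client


def squared(n):
    return n ** 2


# Assumption: a small in-process cluster (threads only) with no dashboard.
client = Client(
    processes=False,
    n_workers=1,
    threads_per_worker=2,
    dashboard_address=None,
)

# map() returns one future per input; gather() blocks until all are done
# and returns the results in input order.
futures = client.map(squared, range(5))
results = client.gather(futures)
print(results)

client.close()
```

`client.submit` handles a single call, while `client.map` is the same idea applied to every element of an iterable.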

--------------------------------------------

### Bags

In [4]:
from dask import bag

In [5]:
bible_bag = bag.read_text('bible.txt', blocksize='100KB')
bible_bag.take(3)

('In the beginning God created the heaven and the earth. And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters. \n',
 'And God said, Let there be light: and there was light. \n',
 'And God saw the light, that it was good: and God divided the light from the darkness. \n')

In [6]:
total_lines = bible_bag.count()
client.compute(total_lines).result()

30383

In [9]:
god_lines = bible_bag.filter(lambda l: 'God' in l).count()
client.compute(god_lines).result()

3513
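
The same filter-and-count pattern can be tried without the text file. This is a small sketch that assumes an in-memory list of lines (two taken from the output above, two invented as toy data) and the local threaded scheduler instead of the distributed client.

```python
from dask import bag

# Toy stand-in for 'bible.txt': two lines from the output above plus two
# made-up lines that do not contain the keyword.
lines = [
    "In the beginning God created the heaven and the earth.",
    "a toy line without the keyword",
    "And God said, Let there be light: and there was light.",
    "another toy line",
]

toy_bag = bag.from_sequence(lines, npartitions=2)

# Both expressions are lazy; compute() runs them on local threads.
total = toy_bag.count().compute(scheduler="threads")
matches = toy_bag.filter(lambda l: "God" in l).count().compute(scheduler="threads")

print(total, matches)
```

Nothing is read or counted until `compute` is called; `filter` and `count` only extend the task graph.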

---------------------------------

### Distributed Clusters

In [11]:
client = Client('tcp://10.65.8.34:8786')
client.restart()
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://10.65.8.34:8786 | Workers: 4 |
| Dashboard: http://10.65.8.34:8787/status | Cores: 160 |
| | Memory: 886.64 GB |


In [13]:
client.submit(squared, 4).result()

16

--------------------------------------------

### Example: Generating Data on a Distributed Cluster

In [14]:
from dask.delayed import delayed
from random import gauss

In [17]:
numbers = 10**8
partitions = 160
numbers_per_partition = numbers // partitions

In [18]:
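@delayed
def get_partition_numbers():
    # Return a list rather than a generator: a generator cannot be
    # pickled if the result has to move between workers.
    return [
        gauss(4, 2) for _ in range(numbers_per_partition)
    ]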

In [19]:
gauss_bag = bag.from_delayed(
    get_partition_numbers() for i in range(partitions)
)

gauss_bag.take(3)

(3.8324762002123967, 4.593674992130431, 5.864875979169428)
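
A scaled-down version of this pattern can be run locally to check that the bag has one partition per delayed call. The sketch below assumes much smaller sizes than the talk (4 partitions of 1,000 numbers) and the threaded scheduler; both are illustrative choices.

```python
from random import gauss

import dask
from dask import bag, delayed

# Assumption: tiny sizes so the sketch runs in well under a second.
partitions = 4
numbers_per_partition = 1_000


@delayed
def get_partition_numbers():
    # Each call becomes one task that produces one partition of the bag.
    return [gauss(4, 2) for _ in range(numbers_per_partition)]


small_bag = bag.from_delayed(
    [get_partition_numbers() for _ in range(partitions)]
)

# Computing both reductions in one call shares the generated partitions.
count, mean = dask.compute(
    small_bag.count(), small_bag.mean(), scheduler="threads"
)
print(small_bag.npartitions, count, mean)
```

The mean should land close to 4, the first argument passed to `gauss`.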

In [20]:
gauss_dataframe = gauss_bag.to_dataframe([('a', float)])
gauss_dataframe.head()

|   | a |
|---|---|
| 0 | 3.832476 |
| 1 | 4.593675 |
| 2 | 5.864876 |
| 3 | 6.540659 |
| 4 | 3.502452 |


In [21]:
gauss_dataframe['b'] = 4*gauss_dataframe['a'] + 3
gauss_dataframe

| npartitions=160 | a | b |
|---|---|---|
| | float64 | float64 |
| | ... | ... |
| ... | ... | ... |
| | ... | ... |
| | ... | ... |


In [29]:
gauss_dataframe.to_csv('gen-*.csv')

['/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-000.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-001.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-002.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-003.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-004.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-005.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-006.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-007.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-008.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-009.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-010.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-011.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-012.csv',
 '/remote/tcadprj/cganterh/services/jupyterlab/dask_talk/gen-013.csv',
 '/rem

-------------------------------------------

### Reading the Data

In [23]:
from dask import dataframe
from dask_ml.linear_model import LinearRegression

In [24]:
df = dataframe.read_csv('gen-*.csv')
df = client.persist(df)  # rebind so later cells use the in-memory copy
df

| npartitions=160 | Unnamed: 0 | a | b |
|---|---|---|---|
| | int64 | float64 | float64 |
| | ... | ... | ... |
| ... | ... | ... | ... |
| | ... | ... | ... |
| | ... | ... | ... |


-------------------------------------------

### Distributed Linear Regression

In [25]:
array = df.to_dask_array(lengths=True)
array

| | Array | Chunk |
|---|---|---|
| Bytes | 2.40 GB | 15.00 MB |
| Shape | (100000000, 3) | (625000, 3) |
| Count | 640 Tasks | 160 Chunks |
| Type | float64 | numpy.ndarray |
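
The sizes reported for the array follow directly from the generation parameters. As a quick check, this sketch recomputes them, assuming 8 bytes per `float64` value and decimal units (1 MB = 10^6 bytes, 1 GB = 10^9 bytes).

```python
numbers = 10**8        # total rows generated earlier
partitions = 160       # one CSV file, and one chunk, per partition
columns = 3            # 'Unnamed: 0', 'a' and 'b'
bytes_per_value = 8    # float64

rows_per_chunk = numbers // partitions
chunk_bytes = rows_per_chunk * columns * bytes_per_value
total_bytes = numbers * columns * bytes_per_value

print(rows_per_chunk)        # rows in each chunk
print(chunk_bytes / 10**6)   # chunk size in MB
print(total_bytes / 10**9)   # array size in GB
```

The results, 625000 rows, 15 MB per chunk and 2.4 GB in total, match the chunk shape and byte counts shown for the array.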


In [26]:
lr = LinearRegression()
lr.fit(array[:, 1:2], array[:, 2:3])

LinearRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                 intercept_scaling=1.0, max_iter=100, multi_class='ovr',
                 n_jobs=1, penalty='l2', random_state=None, solver='admm',
                 solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)

In [27]:
lr.coef_

array([3.99999902])

In [28]:
lr.intercept_

2.9999992658668226
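
Because `b` was defined as `4*a + 3`, the fitted coefficient and intercept should be close to 4 and 3, which is what the cells above show. The small gap is consistent with the iterative, L2-penalized fit reported in the `LinearRegression` repr (`solver='admm'`, `penalty='l2'`, `tol=0.0001`). For comparison, here is a standard-library sketch of the closed-form least-squares fit on a small sample; it is not what `dask_ml` does internally, and the sample size and seed are arbitrary.

```python
from random import gauss, seed

seed(0)  # fixed seed so the sketch is reproducible

# Small in-memory stand-in for the generated data: b = 4*a + 3, no noise.
a = [gauss(4, 2) for _ in range(1000)]
b = [4 * x + 3 for x in a]

mean_a = sum(a) / len(a)
mean_b = sum(b) / len(b)

# Closed-form ordinary least squares for a single feature:
# slope = cov(a, b) / var(a), intercept = mean(b) - slope * mean(a).
slope = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b)) / sum(
    (x - mean_a) ** 2 for x in a
)
intercept = mean_b - slope * mean_a

print(slope, intercept)
```

On noise-free data the closed form recovers 4 and 3 up to floating-point rounding.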