# Cours 05 : Données non structurées & programmation fonctionnelle

## Troisième partie : dask 

Dask se présente comme une surcouche à toolz/cytoolz et numpy.
Il n'offre pas de nouvelles fonctionnalité, mais vous permet de paralléliser les fonctions existantes.

In [1]:
import cytoolz as ct # import groupby, valmap, compose
import cytoolz.curried as ctc ## pipe, map, filter, get
import sqlite3
import pprint
try:
    import ujson as json
except:
    import json

conn_sqlite = sqlite3.connect("twitter_for_network_100000.db")
cursor_sqlite = conn_sqlite.cursor()
conn_sqlite_f = sqlite3.connect("twitter_for_network_full.db")
cursor_sqlite_f = conn_sqlite_f.cursor()

In [40]:
import os, psutil, gc
import resource

def memory_usage_psutil():
    gc.collect()
    process = psutil.Process(os.getpid())
    mem = process.memory_info()[0] / float(2 ** 20)

    print( "Memory used : %i MB" % mem )
    print( "Max memory usage : %i MB" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss//1024) )

On extrait les données de la base et on les écrit dans un fichiers plat.

In [2]:
cursor_sqlite_f.execute("select content FROM tw_users")

with open("tw_users_all.json", 'w') as f:
    ct.count( ct.map( f.write, ct.interpose( "\n", ct.pluck(0, cursor_sqlite_f ) ) ) )

On sépare ce fichier plat en 50 morceaux.

In [8]:
import subprocess
subprocess.check_call( ["split", "-d", "-n", "l/50", "--additional-suffix", ".json", "tw_users_all.json", "tw_users_split_" ] )

0

In [None]:
import os
import glob 
for it_file in glob.glob("tw_users_split_*.json"):
    subprocess.check_call( ["mv", it_file, "tw_users_splits" ] )

On crée un objet de type dask.bag pour gérer l'ensemble de ces fichiers.

In [10]:
import dask.bag as dbag
try:
    import ujson as json
except:
    import json
from operator import add

a = dbag.from_filenames('tw_users_splits/tw_users_split*.json')

On va ensuite pouvoir utiliser la syntaxe cytoolz sur l'objet dbag.  
dask va alors automatiquement gérer la parallélisation sur les différents fichiers.

In [17]:
b = a.map(json.loads).pluck("followers_count").fold(add).compute()
%timeit -n1 b = a.map(json.loads).pluck("followers_count").fold(add).compute()
b

1 loops, best of 3: 14 s per loop


1225448091

Attention à l'état de votre mémoire quand vous utilisez dask.

In [16]:
useless = [it**2 for it in range(25000000)]

In [39]:
#del useless

De la même façon, dask vous permet de paralléliser efficacement des opérations effectuées avec numpy.

In [12]:
import numpy
import dask.array as da

big_random = da.random.normal( 1000, 20000, size = (50000,50000), chunks = (1000,1000) )

In [13]:
big_random.mean().compute()

1000.312179118327

Si vous avez des données obtenues de façon classique sur numpy (ici générées de façon aléatoires)

In [38]:
import numpy as np
import h5py

for it in range(100):
    a = np.random.random(size=(5000,5000))
    h5f = h5py.File('data_{0:02d}.h5'.format(it), 'w')
    h5f.create_dataset('dataset_1', data=a)
    h5f.close()
    if it % 5 == 0:
        print(it)
    

0
5
10
15
20
25
30
35
40
45
50
55
60
65
70
75
80
85
90
95


Dask gère de façon transparente cet ensemble de matrice comme une seule matrice de dimension supérieure.

Ci-dessous x est une matrice de taille 100 X 5000 X 5000, ce qui ne tiendrait absolument pas en mémoire.

In [55]:
from glob import glob

dsets = [h5py.File(fn)['dataset_1'] for fn in sorted(glob('data_numpy/data_*.h5'))]
arrays = [da.from_array(dset, chunks=(1000, 1000)) for dset in dsets]
x = da.stack(arrays, axis=0)
print(x.shape)
print( x[:,0,0].sum().compute() )
print( x[:,:,:].sum().compute() )

(100, 5000, 5000)
15.2774704128
1250004675.56


In [56]:
memory_usage_psutil()

Memory used : 362 MB
Max memory usage : 1651 MB
