In [96]:
import pandas as pd
import multiprocessing as mp
import dask.dataframe as dd
import dask.array as da
import numpy as np
from multiprocessing import Pool

### Multiprocessing vs. Dask

In [2]:
#--Simple arithmetic
def simple_sum(x, y):
    """Return ``x + y`` using whatever ``+`` means for the operand types.

    Numbers are added and lists are concatenated; other operands follow
    their own ``+`` implementation.
    """
    return x + y
# Size of the worker pool used by the multiprocessing timing cells.
NUM_PROCESSES = 4
# Module-level pool reused by later cells through pool.apply(...).
# NOTE(review): no pool.close()/pool.join() for this pool is visible in this part of the notebook.
pool = mp.Pool(processes=NUM_PROCESSES)

In [118]:
# Time a single simple_sum(3, 4) call dispatched through the worker pool.
%time add_result = pool.apply(simple_sum, args=(3,4))

CPU times: user 1.65 ms, sys: 4.94 ms, total: 6.59 ms
Wall time: 15.4 ms


In [4]:
# Baseline: time the same call made directly in the notebook process.
%time add_result = simple_sum(3, 4)

CPU times: user 4 µs, sys: 1 µs, total: 5 µs
Wall time: 7.15 µs


In [5]:
# Two equal lists; simple_sum's `+` concatenates them rather than adding element-wise.
a = [1, 2, 3]
b = [1, 2, 3]

In [6]:
# Time the list "addition" when routed through the worker pool, then show the result.
%time results = pool.apply(simple_sum, args=(a,b))
print(results)

CPU times: user 579 µs, sys: 3.34 ms, total: 3.92 ms
Wall time: 2.57 ms
[1, 2, 3, 1, 2, 3]


In [7]:
%time results = simple_sum(a, b)
print results

CPU times: user 6 µs, sys: 1e+03 ns, total: 7 µs
Wall time: 11.9 µs
[1, 2, 3, 1, 2, 3]


In [8]:
#---dask
# Two identical 0..999 NumPy ranges; these rebind the names `a` and `b`.
a = np.arange(1000)
b = np.arange(1000)
# Dask arrays over the same data with a chunk size of 100.
# Note the crossover: `y` wraps `a` and `x` wraps `b`.
y = da.from_array(a, chunks=100)
x = da.from_array(b, chunks=100)

In [9]:
# Time simple_sum on the two dask arrays.
# NOTE(review): nothing calls .compute() here, so this presumably times only building the lazy
# dask expression, not the addition itself — confirm before comparing with the neighbouring timings.
%time add_sum = simple_sum(y,x)

CPU times: user 1.34 ms, sys: 743 µs, total: 2.08 ms
Wall time: 2.21 ms


In [10]:
# Time simple_sum on `a` and `b` when the call is routed through the worker pool.
%time add_sum = pool.apply(simple_sum, args=(a,b))

CPU times: user 781 µs, sys: 1.2 ms, total: 1.98 ms
Wall time: 1.71 ms


In [120]:
# Baseline: simple_sum on `a` and `b` run directly in the notebook process.
%time add_sum =simple_sum(a,b)

CPU times: user 690 µs, sys: 2.41 ms, total: 3.1 ms
Wall time: 3.15 ms


### Working with Data

#### Dask

In [79]:
#---read data
# Load the whole debates CSV into a single in-memory pandas DataFrame.
df = pd.read_csv("un-general-debates.csv", encoding='utf-8')

In [26]:
#---visual check
df.head()

Unnamed: 0,session,year,country,text
0,44,1989,MDV,﻿It is indeed a pleasure for me and the member...
1,44,1989,FIN,"﻿\nMay I begin by congratulating you. Sir, on ..."
2,44,1989,NER,"﻿\nMr. President, it is a particular pleasure ..."
3,44,1989,URY,﻿\nDuring the debate at the fortieth session o...
4,44,1989,ZWE,﻿I should like at the outset to express my del...


In [32]:
def clean_text(row):
    """Return ``row`` with every digit character removed.

    ``row`` is walked item by item; each item whose ``isdigit()`` is true
    is dropped and the remaining items are joined back together in order.
    """
    kept = [ch for ch in row if not ch.isdigit()]
    return ''.join(kept)

In [43]:
# Run clean_text over every value of the 'text' column with pandas .apply and time it.
%time df['cleaned'] = df['text'].apply(clean_text)

CPU times: user 35.3 s, sys: 3.85 s, total: 39.1 s
Wall time: 36.6 s


In [123]:
chunks=pd.read_csv('un-general-debates.csv',chunksize=100, encoding='utf-8') #read df in chunks
df=pd.DataFrame()
%time df=pd.concat(chunks)

CPU times: user 1.36 s, sys: 265 ms, total: 1.62 s
Wall time: 1.64 s


In [124]:
# Wrap the pandas frame as a dask DataFrame built with chunksize=25, and time the conversion.
%time df2 = dd.from_pandas(df, chunksize=25)


CPU times: user 485 ms, sys: 251 ms, total: 736 ms
Wall time: 1.03 s


In [64]:
%time df2['cleaned'] = df2['text'].apply(clean_text)

CPU times: user 9.54 ms, sys: 4.11 ms, total: 13.7 ms
Wall time: 13.1 ms


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


#### Multiprocessing

In [103]:
chunks=pd.read_csv('un-general-debates.csv',chunksize=100) #read df in chunks
df3=pd.DataFrame()
%time df3=pd.concat(chunks)

CPU times: user 1.1 s, sys: 144 ms, total: 1.25 s
Wall time: 1.25 s


In [115]:
#---set up workers
NUM_PARTITIONS = 10  # number of pieces parallelize_dataframe splits the frame into
NUM_PROCESSES = 4  # worker processes in the Pool that parallelize_dataframe creates

In [106]:
def parallelize_dataframe(df, func):
    """Apply ``func`` to slices of ``df`` in worker processes and concatenate the results.

    ``df`` is cut into ``NUM_PARTITIONS`` pieces with ``np.array_split``; the
    pieces are mapped over ``func`` by a fresh ``Pool`` of ``NUM_PROCESSES``
    workers, and the per-piece results are joined with ``pd.concat``.

    Parameters
    ----------
    df : the frame to process (anything ``np.array_split`` accepts).
    func : callable taking one piece and returning an object ``pd.concat``
        accepts; it is executed in the worker processes via ``pool.map``.

    Returns
    -------
    The ``pd.concat`` of ``func(piece)`` over all pieces.
    """
    df_split = np.array_split(df, NUM_PARTITIONS)
    pool = Pool(NUM_PROCESSES)
    try:
        combined = pd.concat(pool.map(func, df_split))
    finally:
        # Shut the workers down even when func or the concat fails, so a
        # failed run does not leave worker processes behind.
        pool.close()
        pool.join()
    return combined

def clean_textv2(data):
    """Return the ``'text'`` column of ``data`` with every run of digits removed.

    Parameters
    ----------
    data : object whose ``data['text']`` supports the pandas ``.str`` accessor
        (a DataFrame slice when called from ``parallelize_dataframe``).

    Returns
    -------
    The cleaned ``'text'`` column; other columns of ``data`` are not returned.
    """
    # The raw string avoids the invalid "\d" escape sequence. regex=True is
    # spelled out because str.replace matches the pattern literally when
    # regex is False, which would leave the digits in place.
    return data['text'].str.replace(r'\d+', '', regex=True)

In [104]:
# Clean the 'text' column through parallelize_dataframe, time it, and store the result as df3['cleaned'].
%time df3['cleaned'] = parallelize_dataframe(df3, clean_textv2)


CPU times: user 251 ms, sys: 334 ms, total: 586 ms
Wall time: 2.48 s


In [126]:
# Read the CSV with pandas, then wrap it as a dask DataFrame with chunksize=25.
# NOTE(review): this rebinds `df`, which is bound to a pandas DataFrame by earlier cells.
df = dd.from_pandas(pd.read_csv("un-general-debates.csv", encoding='utf-8'), chunksize=25)


In [127]:
df.head()

Unnamed: 0,session,year,country,text
0,44,1989,MDV,﻿It is indeed a pleasure for me and the member...
1,44,1989,FIN,"﻿\nMay I begin by congratulating you. Sir, on ..."
2,44,1989,NER,"﻿\nMr. President, it is a particular pleasure ..."
3,44,1989,URY,﻿\nDuring the debate at the fortieth session o...
4,44,1989,ZWE,﻿I should like at the outset to express my del...
