# Dask test

In [21]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from random import randint, uniform, choices
from string import ascii_lowercase
import multiprocessing as mp
from datetime import datetime, timedelta
import glob

In [38]:
def create_test_set(length, int_cols, float_cols, category_cols, text_cols):
    col_count = 0
    
    # ints
    data = {'col%003d' % col: [randint(0, 100000) for i in range(length)] for col in range(int_cols)}
    col_count += int_cols
    
    # floats
    data.update({'col%003d' % col: [uniform(0.0, 100000) for i in range(length)] for col in range(col_count, col_count + float_cols)})
    col_count += float_cols
    
    # categories
    data.update({'col%003d' % col: [['col%003d_cat%d' % (col, cat) for cat in range(10)][cat_] for cat_ in [randint(0,9) for i in range(length)]] for col in range(col_count, col_count + category_cols)})
    col_count += category_cols
    
    # text
    data.update({'col%003d' % col: [''.join(choices(ascii_lowercase, k=15)) for i in range(length)] for col in range(col_count, col_count + text_cols)})
    
    return pd.DataFrame(data)

In [17]:
for i in range(9):
    create_test_set(20000,20,20,60,20).to_csv('./Dataset_%d.csv' % i)

In [18]:
create_test_set(200000,20,20,60,20).to_csv('./Dataset_10.csv')

In [8]:
data = pd.read_csv('./Dataset_10.csv')

In [19]:
for i in range(30):
    date = datetime.strftime(datetime.today() + timedelta(days=i), '%Y%m%d')
    data['extraction_date'] = date
    data.to_parquet('./Dataset/%s.parq' % date) 

In [22]:
data.to_parquet('./Dataset_10.parquet')

In [24]:
data.to_parquet('./Dataset_10_brotli.parquet', compression='brotli')

In [25]:
data.to_parquet('./Dataset_10_gzip.parquet', compression='gzip')

In [5]:
data_cat = data.copy()

In [6]:
for col in range(40,100):
    data_cat['col%002d' % col] = data_cat['col%002d' % col].astype('category')

In [28]:
data.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200000 entries, 0 to 199999
Columns: 121 entries, Unnamed: 0 to col99
dtypes: float64(20), int64(21), object(80)
memory usage: 1.1 GB


In [29]:
data_cat.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200000 entries, 0 to 199999
Columns: 121 entries, Unnamed: 0 to col99
dtypes: category(60), float64(20), int64(21), object(20)
memory usage: 348.7 MB


In [30]:
data_cat.to_parquet('./Dataset_10_cat.parquet')

In [7]:
data_cat.to_parquet('./Dataset_10_cat_fastparquet.parquet', engine='fastparquet')

In [25]:
datasets = glob.glob('./Dataset/*.parq')

In [26]:
dd_df = dd.read_parquet(datasets)

In [28]:
dd_df.groupby(['extraction_date']).extraction_date.count().compute()

extraction_date
20180630    200000
20180714    200000
20180704    200000
20180719    200000
20180715    200000
20180721    200000
20180716    200000
20180708    200000
20180702    200000
20180706    200000
20180627    200000
20180701    200000
20180622    200000
20180713    200000
20180709    200000
20180718    200000
20180710    200000
20180720    200000
20180705    200000
20180625    200000
20180717    200000
20180623    200000
20180711    200000
20180712    200000
20180624    200000
20180707    200000
20180628    200000
20180703    200000
20180629    200000
20180626    200000
Name: extraction_date, dtype: int64

In [10]:
dd_df.col01.max().compute()

100000

In [2]:
from dask.distributed import Client, LocalCluster

In [3]:
cluster = LocalCluster()
client = Client(cluster)

In [4]:
print(cluster)

LocalCluster('tcp://127.0.0.1:35362', workers=8, ncores=8)
