---
# Jan 23, 20 Dask Basic
* Name: Jikhan Jeong
* Ref: https://docs.dask.org/en/latest/install.html (conda install dask)
* Ref: https://datascienceschool.net/view-notebook/2282b75b2a63448087b77269885c27cb/ (Korean)
* Ref: https://towardsdatascience.com/trying-out-dask-dataframes-in-python-for-fast-data-analysis-in-parallel-aa960c18a915 (English)
---

In [1]:
import pandas as pd
import numpy as np

# Part 1: Basic
* Ref: https://datascienceschool.net/view-notebook/2282b75b2a63448087b77269885c27cb/ (Korean)

In [2]:
%%writefile data1.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Overwriting data1.csv


In [28]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()

In [4]:
ls

 Volume in drive C has no label.
 Volume Serial Number is 86A3-4B65

 Directory of C:\Users\jikhan.jeong\2020 dask

01/23/2020  10:02 PM    <DIR>          .
01/23/2020  10:02 PM    <DIR>          ..
01/23/2020  10:02 PM    <DIR>          .ipynb_checkpoints
01/23/2020  06:45 PM       264,273,991 crime.csv
01/24/2020  12:19 PM                81 data1.csv
01/23/2020  06:05 PM                81 data2.csv
01/23/2020  06:05 PM                81 data3.csv
01/23/2020  06:59 PM            45,677 Jan 23, 20 Dask Basic.ipynb
               5 File(s)    264,319,911 bytes
               3 Dir(s)  23,029,207,040 bytes free


In [5]:
df = dd.read_csv("data1.csv")
df

Unnamed: 0_level_0,time,temperature,humidity
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,int64,int64,int64
,...,...,...


In [6]:
type(df)

dask.dataframe.core.DataFrame

In [7]:
df.head(1)

Unnamed: 0,time,temperature,humidity
0,0,22,58


In [8]:
df.temperature.mean() # lazy: only builds a task graph, no result until .compute() is called

dd.Scalar<series-..., dtype=float64>

In [9]:
df.temperature.mean().compute()

23.166666666666668

In [10]:
df.temperature.compute()

0    22
1    21
2    25
3    26
4    22
5    23
Name: temperature, dtype: int64

In [11]:
(df.temperature*10+5).compute()

0    225
1    215
2    255
3    265
4    225
5    235
Name: temperature, dtype: int64

---
# assign: add or replace columns of a Dask DataFrame
---

In [12]:
df = df.assign(temperature = df.temperature*10)
df.head(1)

Unnamed: 0,time,temperature,humidity
0,0,220,58


In [13]:
df = df.assign(title = df.temperature.astype(str) + "jikhan is handsome")
df.head()

Unnamed: 0,time,temperature,humidity,title
0,0,220,58,220jikhan is handsome
1,1,210,57,210jikhan is handsome
2,2,250,57,250jikhan is handsome
3,3,260,55,260jikhan is handsome
4,4,220,53,220jikhan is handsome


---
# DataFrame for multiple datasets
* Read several files at once with a wildcard (*)
* E.g. df = dd.read_csv('data*.csv') # reads data1, data2, data3
---

In [14]:
%%writefile data2.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Overwriting data2.csv


In [15]:
%%writefile data3.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Overwriting data3.csv


In [16]:
ls *.csv

 Volume in drive C has no label.
 Volume Serial Number is 86A3-4B65

 Directory of C:\Users\jikhan.jeong\2020 dask

01/23/2020  06:45 PM       264,273,991 crime.csv
01/24/2020  12:19 PM                81 data1.csv
01/24/2020  12:19 PM                81 data2.csv
01/24/2020  12:19 PM                81 data3.csv
               4 File(s)    264,274,234 bytes
               0 Dir(s)  23,028,768,768 bytes free


In [17]:
df = dd.read_csv('data*.csv')
# head() reads only the first partition by default, so only the 6 rows of the first file come back
df.head(12)



Unnamed: 0,time,temperature,humidity
0,0,22,58
1,1,21,57
2,2,25,57
3,3,26,55
4,4,22,53
5,5,23,59


In [18]:
len(df)

18

In [19]:
df.compute()

Unnamed: 0,time,temperature,humidity
0,0,22,58
1,1,21,57
2,2,25,57
3,3,26,55
4,4,22,53
5,5,23,59
0,0,22,58
1,1,21,57
2,2,25,57
3,3,26,55


In [20]:
df.count()

Dask Series Structure:
npartitions=1
humidity    int64
time          ...
dtype: int64
Dask Name: dataframe-count-agg, 13 tasks

In [21]:
df.count().compute()

time           18
temperature    18
humidity       18
dtype: int64

In [22]:
df.temperature.describe().compute()

count    18.000000
mean     23.166667
std       1.823055
min      21.000000
25%      22.000000
50%      22.500000
75%      24.500000
max      26.000000
Name: temperature, dtype: float64

---
# Handling big data in parallel with Dask

* Dataset: https://catalog.data.gov/dataset/crimes-2001-to-present-398a4 (1.3GB)
* (Windows) use **urllib.request** instead of wget
* (Linux) !wget -O crime.csv https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
* Ref: https://stackoverflow.com/questions/57748687/downloading-files-in-jupyter-wget-on-windows
---

In [31]:
import urllib.request

In [35]:
url = 'https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD'
filename = 'crime.csv'
urllib.request.urlretrieve(url, filename)

('crime.csv', <http.client.HTTPMessage at 0x1385f81fba8>)

In [23]:
ls *.csv

 Volume in drive C has no label.
 Volume Serial Number is 86A3-4B65

 Directory of C:\Users\jikhan.jeong\2020 dask

01/23/2020  06:45 PM       264,273,991 crime.csv
01/24/2020  12:19 PM                81 data1.csv
01/24/2020  12:19 PM                81 data2.csv
01/24/2020  12:19 PM                81 data3.csv
               4 File(s)    264,274,234 bytes
               0 Dir(s)  23,028,736,000 bytes free


In [24]:
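# Note: pandas 1.3+ deprecates error_bad_lines/warn_bad_lines in favor of on_bad_lines="skip" (removed in pandas 2.0)
df = dd.read_csv("crime.csv", dtype=str, error_bad_lines=False, warn_bad_lines=False)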
df

Unnamed: 0_level_0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [25]:
df.tail()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
34695,11373380,JB340646,0{,,,,,,,,...,,,,,,,,,,
34696,"""error"" : true",,,,,,,,,,...,,,,,,,,,,
34697,"""message"" : ""Internal error""",,,,,,,,,,...,,,,,,,,,,
34698,"""status"" : 500",,,,,,,,,,...,,,,,,,,,,
34699,},,,,,,,,,,...,,,,,,,,,,
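
The last rows above contain a JSON error body (`"status" : 500`) rather than crime records, and the file on disk is about 264 MB against the stated 1.3GB, which suggests the download was cut off by a server error. A minimal sketch of a sanity check one could run before loading the file, using a hypothetical helper `tail_looks_like_error` that is not part of Dask or the notebook:

```python
import os
import tempfile

def tail_looks_like_error(path, n_bytes=256):
    """Return True if the end of the file contains JSON-style error markers."""
    size = os.path.getsize(path)
    with open(path, "rb") as f:
        f.seek(max(0, size - n_bytes))
        tail = f.read().decode("utf-8", errors="replace")
    return '"error"' in tail and '"status"' in tail

# Demonstration on a small synthetic file that imitates the tail shown above
demo = os.path.join(tempfile.mkdtemp(), "demo.csv")
demo_lines = ["ID,Case Number,Date", "11373380,JB340646,0{", ' "error" : true,',
              ' "message" : "Internal error",', ' "status" : 500', "}"]
with open(demo, "w") as f:
    f.write("\n".join(demo_lines) + "\n")

print(tail_looks_like_error(demo))
```

If the check fires, re-downloading the file is probably safer than skipping the bad lines at read time.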


---
# ProgressBar
* Reports how far a Dask computation has progressed
* Draws the progress as a text bar with the elapsed time
---

In [29]:
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()

In [30]:
%%time
df.count().compute()

[########################################] | 100% Completed | 11.5s
[########################################] | 100% Completed | 11.6s
[########################################] | 100% Completed | 11.7s
Wall time: 11.7 s


ID                      1127927
Case Number             1127922
Date                    1127923
Block                   1127922
IUCR                    1127922
Primary Type            1127922
Description             1127922
Location Description    1123245
Arrest                  1127922
Domestic                1127922
Beat                    1127922
District                1127921
Ward                    1118017
Community Area          1118079
FBI Code                1127922
X Coordinate            1060394
Y Coordinate            1060394
Year                    1127922
Updated On              1127922
Latitude                1060394
Longitude               1060394
Location                1060394
dtype: int64

---
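# Task Scheduler
* dask.get: single-threaded, synchronous (`scheduler='synchronous'`)
* dask.threaded.get: thread pool (`scheduler='threads'`)
* dask.multiprocessing.get: process pool (`scheduler='processes'`)
* distributed.Client.get: distributed scheduler that can span multiple machines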
---

* Demo with 4 worker processes
* About 1.1 s faster in wall time than the run above (10.6 s vs. 11.7 s)

In [31]:
%%time
df.count().compute(scheduler='processes', num_workers=4) # process pool with 4 workers

[########################################] | 100% Completed | 10.3s
[########################################] | 100% Completed | 10.3s
[########################################] | 100% Completed | 10.4s
Wall time: 10.6 s


ID                      1127927
Case Number             1127922
Date                    1127923
Block                   1127922
IUCR                    1127922
Primary Type            1127922
Description             1127922
Location Description    1123245
Arrest                  1127922
Domestic                1127922
Beat                    1127922
District                1127921
Ward                    1118017
Community Area          1118079
FBI Code                1127922
X Coordinate            1060394
Y Coordinate            1060394
Year                    1127922
Updated On              1127922
Latitude                1060394
Longitude               1060394
Location                1060394
dtype: int64

# Part 2: Basic 2
* Ref: https://towardsdatascience.com/trying-out-dask-dataframes-in-python-for-fast-data-analysis-in-parallel-aa960c18a915 (English)
* Ref: https://stackoverflow.com/questions/54028190/how-can-a-dask-worker-access-the-total-number-of-workers-currently-in-the-cluste (n_workers)

In [38]:
from dask import dataframe as dd

In [43]:
type(df)

dask.dataframe.core.DataFrame

In [42]:
df.head(10)

[########################################] | 100% Completed |  2.8s
[########################################] | 100% Completed |  2.9s
[########################################] | 100% Completed |  2.9s


Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11034701,JA366925,01/01/2001 11:00:00 AM,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,8,45,11,,,2001,08/05/2017 03:50:08 PM,,,
1,11227287,JB147188,10/08/2017 03:00:00 AM,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,21,73,2,,,2017,02/11/2018 03:57:41 PM,,,
2,11227583,JB147595,03/28/2017 02:00:00 PM,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,18,70,5,,,2017,02/11/2018 03:57:41 PM,,,
3,11227293,JB147230,09/09/2017 08:17:00 PM,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,20,42,6,,,2017,02/11/2018 03:57:41 PM,,,
4,11227634,JB147599,08/26/2017 10:00:00 AM,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,42,32,2,,,2017,02/11/2018 03:57:41 PM,,,
5,11227517,JB138481,02/10/2013 12:00:00 AM,071XX S LAFAYETTE AVE,266,CRIM SEXUAL ASSAULT,PREDATORY,RESIDENCE,False,False,...,6,69,2,,,2013,02/11/2018 03:57:41 PM,,,
6,11227503,JB146383,01/01/2015 12:01:00 AM,061XX S KILBOURN AVE,1751,OFFENSE INVOLVING CHILDREN,CRIM SEX ABUSE BY FAM MEMBER,RESIDENCE,False,True,...,13,65,17,,,2015,04/12/2019 04:00:15 PM,,,
7,11227508,JB146365,01/01/2017 12:01:00 AM,027XX S WHIPPLE ST,1754,OFFENSE INVOLVING CHILDREN,AGG SEX ASSLT OF CHILD FAM MBR,RESIDENCE,False,False,...,12,30,2,,,2017,02/11/2018 03:57:41 PM,,,
8,11022695,JA353568,07/17/2017 10:10:00 AM,021XX W MC LEAN AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,32,22,6,,,2017,07/24/2017 03:54:23 PM,,,
9,11227633,JB147500,12/28/2017 03:55:00 PM,011XX S MICHIGAN AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,,False,False,...,2,32,11,,,2017,02/11/2018 03:57:41 PM,,,


In [41]:
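# df is already a Dask DataFrame, but dd.from_pandas expects a pandas object, hence the error below.
# Passing a pandas DataFrame, e.g. dd.from_pandas(df.compute(), npartitions=1), avoids it.
sd = dd.from_pandas(df,npartitions=1)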

[########################################] | 100% Completed |  9.4s
[########################################] | 100% Completed |  9.5s
[########################################] | 100% Completed |  9.6s


AttributeError: 'Index' object has no attribute 'is_monotonic_increasing'