# Dask Basics

* Virtual DataFrames
* Task schedulers for parallel processing

## Virtual DataFrames
Instead of loading all of the data into memory, Dask can process data while it stays in files or in a database.
Several CSV files can be loaded together as a single virtual DataFrame.

In [9]:
!mkdir -p data
%cd data

C:\Users\student\Documents\jhfolder\GitRepositories\StudyML\DataScienceSchool\data\data\data


In [19]:
%%writefile data1.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,22,58
6,21,57
7,25,57
8,26,55
9,22,53
10,22,58
11,21,57
12,25,57
13,26,55
14,22,53
15,22,58
16,21,57
17,25,57
18,26,55
19,22,53
20,22,58
21,21,57
22,25,57
23,26,55
24,22,53
25,22,58
26,21,57
27,25,57
28,26,55
29,22,53

Overwriting data1.csv


In [20]:
import dask.dataframe as dd
df = dd.read_csv("data1.csv")
df

| npartitions=1 | time | temperature | humidity |
|---|---|---|---|
| | int64 | int64 | int64 |
| | ... | ... | ... |


Because nothing has been read into memory yet, only the column names and dtypes are shown, not the values.

In [21]:
df.head()

| | time | temperature | humidity |
|---|---|---|---|
| 0 | 0 | 22 | 58 |
| 1 | 1 | 21 | 57 |
| 2 | 2 | 25 | 57 |
| 3 | 3 | 26 | 55 |
| 4 | 4 | 22 | 53 |


In [25]:
df.temperature.mean()

dd.Scalar<series-..., dtype=float64>

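The result is not computed right away: `mean()` returns a lazy Dask scalar.<br>
To see exactly which operations will run, use the visualize method, which draws the task graph.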


In [23]:
# compute() actually runs the computation
df.temperature.mean().compute()

23.2

In [27]:
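# Convert Celsius to Fahrenheit.
# .compute() evaluates the whole column before head() takes the first rows;
# calling .head() on the Dask Series directly would read only the first partition.
(df.temperature * 9 / 5 + 32).compute().head()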

0    71.6
1    69.8
2    77.0
3    78.8
4    71.6
Name: temperature, dtype: float64

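* Replacing the values of an existing column <br>
assign returns a new lazy DataFrame, so no explicit compute call is needed; head() computes the rows it displays.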

In [30]:
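# Convert to Fahrenheit. The result is reassigned to df, so running this
# cell a second time would convert the already-converted values again.
df = df.assign(temperature=df.temperature * 9 / 5 + 32)
df.head()

| | time | temperature | humidity |
|---|---|---|---|
| 0 | 0 | 71.6 | 58 |
| 1 | 1 | 69.8 | 57 |
| 2 | 2 | 77.0 | 57 |
| 3 | 3 | 78.8 | 55 |
| 4 | 4 | 71.6 | 53 |


In [31]:
# Add a new column
df = df.assign(title=df.temperature.astype(str) + " degree")
df.head()

| | time | temperature | humidity | title |
|---|---|---|---|---|
| 0 | 0 | 71.6 | 58 | 71.6 degree |
| 1 | 1 | 69.8 | 57 | 69.8 degree |
| 2 | 2 | 77.0 | 57 | 77.0 degree |
| 3 | 3 | 78.8 | 55 | 78.8 degree |
| 4 | 4 | 71.6 | 53 | 71.6 degree |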


### Virtual DataFrame over multiple files

In [32]:
%%writefile data2.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Writing data2.csv


In [33]:
%%writefile data3.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Writing data3.csv


In [34]:
# Files matching the glob pattern can be used like a single table
df = dd.read_csv('data*.csv')

In [35]:
df.count().compute()

time           42
temperature    42
humidity       42
dtype: int64

In [36]:
df.temperature.describe().compute()

count    42.000000
mean     23.190476
std       1.915764
min      21.000000
25%      22.000000
50%      22.500000
75%      25.000000
max      26.000000
dtype: float64

## Parallel processing of large data

Crime data for the city of Chicago:
https://catalog.data.gov/dataset/crimes-2001-to-present-398a4

In [4]:
import dask.dataframe as dd
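# dtype=str sidesteps dtype-inference errors; on_bad_lines="skip" drops malformed lines
# (older pandas used error_bad_lines=False, warn_bad_lines=False, removed in pandas 2.0)
df = dd.read_csv("crimes.csv", dtype=str, on_bad_lines="skip")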
df

| npartitions=24 | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | Beat | District | Ward | Community Area | FBI Code | X Coordinate | Y Coordinate | Year | Updated On | Latitude | Longitude | Location |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object | object |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


In [5]:
df.tail()

| | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Ward | Community Area | FBI Code | X Coordinate | Y Coordinate | Year | Updated On | Latitude | Longitude | Location |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 159620 | 1887319 | G737480 | 12/08/2001 09:52:00 PM | 031XX S ASHLAND AV | 1310 | CRIMINAL DAMAGE | TO PROPERTY | COIN OPERATED MACHINE | False | False | ... | | | 14 | 1166221 | 1883706 | 2001 | 08/17/2015 03:03:40 PM | 41.836466744 | -87.665570118 | (41.836466744, -87.665570118) |
| 159621 | 1887320 | G737430 | 12/09/2001 03:30:00 PM | 048XX W NORTH AV | 820 | THEFT | $500 AND UNDER | GROCERY FOOD STORE | False | False | ... | | | 06 | 1143859 | 1910159 | 2001 | 08/17/2015 03:03:40 PM | 41.90950483 | -87.746962481 | (41.90950483, -87.746962481) |
| 159622 | 1887321 | G737164 | 12/09/2001 12:35:00 PM | 045XX N MELVINA AV | 460 | BATTERY | SIMPLE | RESIDENCE | False | True | ... | | | 08B | 1134153 | 1929713 | 2001 | 08/17/2015 03:03:40 PM | 41.96333973 | -87.782156862 | (41.96333973, -87.782156862) |
| 159623 | 1887322 | G736767 | 12/09/2001 05:00:00 AM | 032XX W NORTH AV | 1310 | CRIMINAL DAMAGE | TO PROPERTY | RESIDENCE | False | False | ... | | | 14 | 1154385 | 1910409 | 2001 | 08/17/2015 03:03:40 PM | 41.909986882 | -87.708287568 | (41.909986882, -87.708287568) |
| 159624 | 1887323 | G737279 | 12/09/2001 02:05:57 PM | 001XX N STATE ST | 820 | THEFT | $500 AND UNDER | DEPARTMENT STORE | True | False | ... | | | 06 | 1176391 | 1900935 | 2001 | 08/17/2015 03:03:40 PM | 41.88352126 | -87.627733247 | (41.88352126, -87.627733247) |


Showing a progress bar

In [7]:
from dask.diagnostics import ProgressBar

pbar = ProgressBar()
pbar.register()

In [12]:
%%time
df.count().compute()

[########################################] | 100% Completed | 44.1s
[########################################] | 100% Completed | 44.2s
Wall time: 44.2 s


ID                      6409902
Case Number             6409898
Date                    6409902
Block                   6409902
IUCR                    6409902
Primary Type            6409902
Description             6409902
Location Description    6407271
Arrest                  6409902
Domestic                6409902
Beat                    6409902
District                6409853
Ward                    5795048
Community Area          5793872
FBI Code                6409902
X Coordinate            6327573
Y Coordinate            6327573
Year                    6409902
Updated On              6409902
Latitude                6327573
Longitude               6327573
Location                6327573
dtype: int64

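* Task schedulers
    - dask.get: single thread (scheduler="synchronous" in current Dask)
    - dask.threaded.get: multithreaded pool (scheduler="threads", the default for DataFrames)
    - dask.multiprocessing.get: multiprocess pool (scheduler="processes")
    - distributed.Client.get: distributed processing across several machines (a distributed.Client becomes the default scheduler once created)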
    
    
Run with the multiprocessing scheduler, using 12 CPU cores at the same time

In [11]:
%%time
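# Multiprocessing scheduler with 12 worker processes
# (older Dask wrote this as compute(get=dask.multiprocessing.get, num_workers=12))
df.count().compute(scheduler="processes", num_workers=12)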

[########################################] | 100% Completed | 59.4s
[########################################] | 100% Completed | 59.5s
Wall time: 59.6 s
