# Dask 사용법 기초
### Dask 패키지 : Pandas 데이터 프레임 형식으로 빅데이터룰 처리하기 위한 파이썬 패키지
#### 기능1 : 가상 데이터프레임
#### 기능2 : 병렬처리용 작업 스케쥴러

# 가상 데이터 프레임
### 메모리 크기에 관계없이 엄청나게 큰 CSV 파일을 가상 데이터프레임으로 로드하거나 같은 형식의 데이터를 가진 여러개의 csv 파일을 하나의 가상 데이터프레임에 로드 할 수 있다.

In [1]:
!mkdir -p data
%cd data

C:\pythondev\data


하위 디렉터리 또는 파일 -p이(가) 이미 있습니다.
다음 내용 진행 중 오류 발생: -p.
하위 디렉터리 또는 파일 data이(가) 이미 있습니다.
다음 내용 진행 중 오류 발생: data.


In [2]:
%%writefile data1.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Writing data1.csv


In [3]:
import dask.dataframe as dd

In [4]:
df = dd.read_csv("data1.csv")
df

dd.DataFrame<from-de..., npartitions=1>

### df는 데이터 프레임과 유사하지만 실제로 데이터를 메모리에 읽지 않았기에 값은 표시되지 않음
### head, tail명령을 내리면 일부 읽어서 표시

In [5]:
df.head()

Unnamed: 0,time,temperature,humidity
0,0,22,58
1,1,21,57
2,2,25,57
3,3,26,55
4,4,22,53


In [6]:
df.temperature.mean()

dd.Scalar<series-..., dtype=float64>

In [7]:
df.temperature.mean().compute()

23.166666666666668

In [8]:
(df.temperature * 9 / 5 + 32).compute()

0    71.6
1    69.8
2    77.0
3    78.8
4    71.6
5    73.4
Name: temperature, dtype: float64

In [9]:
df = df.assign(temperature=df.temperature * 9 / 5 + 32)

In [10]:
df.head()

Unnamed: 0,time,temperature,humidity
0,0,71.6,58
1,1,69.8,57
2,2,77.0,57
3,3,78.8,55
4,4,71.6,53


In [11]:
df = df.assign(title=df.temperature.astype(str) + " degree")

In [12]:
df.head()

Unnamed: 0,time,temperature,humidity,title
0,0,71.6,58,71.6 degree
1,1,69.8,57,69.8 degree
2,2,77.0,57,77.0 degree
3,3,78.8,55,78.8 degree
4,4,71.6,53,71.6 degree


# 복수 데이터에 대한 가상 데이터프레임

In [13]:
%%writefile data2.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Writing data2.csv


In [14]:
%%writefile data3.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59

Writing data3.csv


In [15]:
df = dd.read_csv('data*.csv')

In [16]:
df.count().compute()

time           18
temperature    18
humidity       18
dtype: int64

In [17]:
df.temperature.describe().compute()

count    18.000000
mean     23.166667
std       1.823055
min      21.000000
25%      22.000000
50%      22.500000
75%      24.500000
max      26.000000
dtype: float64

# 대량 데이터의 병렬 처리

In [18]:
!wget -O crime.csv https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

'wget'은(는) 내부 또는 외부 명령, 실행할 수 있는 프로그램, 또는
배치 파일이 아닙니다.


In [19]:
df = dd.read_csv("crime.csv", dtype=str, error_bad_lines=False, warn_bad_lines=False)
df

FileNotFoundError: [WinError 2] 지정된 파일을 찾을 수 없습니다: 'crime.csv'

In [20]:
df.tail()

Unnamed: 0,time,temperature,humidity
1,1,21,57
2,2,25,57
3,3,26,55
4,4,22,53
5,5,23,59


In [21]:
from dask.diagnostics import ProgressBar

pbar = ProgressBar()
pbar.register()

In [22]:
%%time
df.count().compute()

[########################################] | 100% Completed |  0.1s
Wall time: 125 ms


time           18
temperature    18
humidity       18
dtype: int64

##### Dask는 이러한 대량 데이터의 분석 작업을 돕기 위한 작업 스케줄러(task scheduler)라는 것을 제공한다. 작업 스케줄러는 하나의 작업을 여러개의 쓰레드, 프로세스, 노드 등이 나누어 분담하도록 한다.

### 스케쥴러의 종류

##### dask.get: 단일 쓰레드
##### dask.threaded.get: 멀티쓰레드 풀(pool)
##### dask.multiprocessing.get: 멀티프로세스 풀
##### distributed.Client.get: 여러대의 컴퓨터에서 분산 처리
##### 병렬처리를 위해서는 어떠한 병렬 처리 방식을 사용할지, 작업 프로세스의 갯수는 어떻게 할지 등은 compute 명령에서 인수로 설정해야 한다. 다음 코드는 멀티프로세싱을 하고 12개의 CPU 코어를 동시에 사용하도록 설정한 예이다. (물론 이 코드가 실행되는 컴퓨터가 실제로 12개 이상의 코어를 가지고 있어야 성능이 개선된다.) 

In [23]:
import dask

In [24]:
%%time
df.count().compute(get=dask.multiprocessing.get, num_workers=12)

[########################################] | 100% Completed |  1.7s
Wall time: 1.74 s


time           18
temperature    18
humidity       18
dtype: int64