&emsp;이번 포스팅에서는 파이썬 **Dask Bag**를 활용한 간단한 예제를 다뤄보도록 하겠습니다.

# 목차

---

# Dask Bag

&emsp;Dask Bag은 `map`, `filter`, `groupby`, 그 외 집계 작업처럼 파이썬 컬렉션 객체에 대해 수행할 수 있는 여러 작업들을 구현해놓았습니다. 이 작업들을 수행하기 위해 Python 반복자(iterators)를 사용하여, 적은 메모리에서도 병렬 작업을 수행합니다. 이러한 점은 병렬 버전의 Itertools 또는 PySpark RDD와 유사합니다.  

&emsp;특히 로그 파일, JSON records, 사용자 정의 Python 객체에 대해 간단한 전처리를 수행하는 데에 이 Dask Bag이 사용됩니다.

---

# Dask 클라이언트

&emsp;Dask 클라이언트(client)는 필요에 따라 실행해도 되고, 실행하지 않아도 됩니다. Dask 클라이언트는 연산 작업 현황에 대한 정보를 얻는데 유용한 대시보드(dashboard)를 제공합니다.

&emsp;아래 코드와 같이 클라이언트를 생성하면 대시보드에 대한 링크가 표시됩니다. 이 링크를 클릭해, 작업을 실행하는 동안 다른 화면 한쪽에 대시보드를 열어 두는 것이 좋습니다. 대시보드를 작업 화면과 동시에 보는 것은 분석 및 학습을 수행할 때 매우 유용합니다.

In [1]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 64098 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:64098/status,

0,1
Dashboard: http://127.0.0.1:64098/status,Workers: 4
Total threads: 4,Total memory: 15.87 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:64101,Workers: 4
Dashboard: http://127.0.0.1:64098/status,Total threads: 4
Started: Just now,Total memory: 15.87 GiB

0,1
Comm: tcp://127.0.0.1:64127,Total threads: 1
Dashboard: http://127.0.0.1:64134/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:64104,
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-vgafxr4o,Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-vgafxr4o

0,1
Comm: tcp://127.0.0.1:64126,Total threads: 1
Dashboard: http://127.0.0.1:64132/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:64105,
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-qa6tkdnu,Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-qa6tkdnu

0,1
Comm: tcp://127.0.0.1:64125,Total threads: 1
Dashboard: http://127.0.0.1:64130/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:64106,
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-tmiaomdt,Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-tmiaomdt

0,1
Comm: tcp://127.0.0.1:64123,Total threads: 1
Dashboard: http://127.0.0.1:64128/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:64107,
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-5pw300no,Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-5pw300no


---

# 데이터셋 생성

&emsp;여기에서 사용할 데이터셋을 만들기 위해 아래 코드와 같이 임의의 레코드 데이터셋을 생성하고, 여러 개의 JSON 파일에 저장합니다.  

In [2]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # data 디렉토리 생성

b = dask.datasets.make_people()                 # dask.datasets에서 제공하는 make_people 데이터셋 사용
b.map(json.dumps).to_textfiles('data/*.json')   # JSON 파일로 저장

['C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/0.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/1.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/2.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/3.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/4.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/5.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/6.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/7.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/8.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/9.json']

---

# JSON 데이터 읽기

&emsp;이제 JSON 파일 형식의 데이터가 있으므로 Dask Bag과 Python JSON 모듈을 사용하여 데이터를 읽어오겠습니다.  

In [4]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b

dask.bag<loads, npartitions=10>

In [5]:
b.take(2)

({'age': 48,
  'name': ['Lashaunda', 'Valenzuela'],
  'occupation': 'Purchase Clerk',
  'telephone': '+1-706-722-4054',
  'address': {'address': '1049 Mesa Lane', 'city': 'Kissimmee'},
  'credit-card': {'number': '3746 504439 67088', 'expiration-date': '09/20'}},
 {'age': 24,
  'name': ['Antone', 'Guzman'],
  'occupation': 'Horse Trainer',
  'telephone': '+18644106724',
  'address': {'address': '698 Augusta Landing', 'city': 'Fredericksburg'},
  'credit-card': {'number': '2455 0521 7628 0268',
   'expiration-date': '12/22'}})

---

# Map, Filter, Aggregate

&emsp;특정 레코드만 필터링하고, 데이터를 처리하기 위해 함수를 매핑(mapping)하고, 그 결과를 집계하는 순서로 데이터를 처리해보겠습니다.

In [9]:
b.filter(lambda record: record['age'] > 30).take(2)  # 나이가 30보다 큰 사람만 추출

({'age': 48,
  'name': ['Lashaunda', 'Valenzuela'],
  'occupation': 'Purchase Clerk',
  'telephone': '+1-706-722-4054',
  'address': {'address': '1049 Mesa Lane', 'city': 'Kissimmee'},
  'credit-card': {'number': '3746 504439 67088', 'expiration-date': '09/20'}},
 {'age': 35,
  'name': ['Emelia', 'Jacobs'],
  'occupation': 'Tiler',
  'telephone': '+1-678-760-1845',
  'address': {'address': '428 Pine Shore', 'city': 'Atlantic City'},
  'credit-card': {'number': '5334 5278 4048 0824',
   'expiration-date': '09/22'}})

In [10]:
b.map(lambda record: record['occupation']).take(2)  # 직업만 추출

('Purchase Clerk', 'Horse Trainer')

In [11]:
b.count().compute()  # 총 레코드 개수

10000

&emsp;위처럼 하나의 파이프라인에서 단일 작업만 수행해도 되지만, 일반적으로 하나의 파이프라인에서 여러 작업을 수행합니다. 이 경우, 파이프라인을 구성한 후 마지막에 `compute` 또는 `take`를 호출하면 실행된 연산 결과를 얻을 수 있습니다.

In [12]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

dask.bag<topk-aggregate, npartitions=1>

In [13]:
result.compute()

[('Church Officer', 17),
 ('Typesetter', 15),
 ('Building Surveyor', 15),
 ('Plate Layer', 15),
 ('Accounts Assistant', 14),
 ('Locksmith', 14),
 ('Building Inspector', 14),
 ('Cafe Owner', 14),
 ('Works Manager', 13),
 ('Optical Advisor', 13)]

---

# 변환 및 저장

&emsp;위처럼 분석 결과를 바로 표출할 수도 있지만, 향후 추가적인 분석을 위해 전처리한 데이터를 디스크에 저장해야 할 경우도 있습니다. 이를 위해 `to_textfiles`와 `json.dumps`와 같은 메서드를 사용하거나, Dask Dataframes로 데이터 유형을 변환할 수도 있습니다.

In [14]:
(b.filter(lambda record: record['age'] > 30)  # 나이가 30보다 큰 레코드만 추출
  .map(json.dumps)                            # 파이썬 객체를 텍스트로 전환
  .to_textfiles('data/processed.*.json'))     # JSON 파일로 저장

['C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.0.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.1.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.2.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.3.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.4.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.5.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.6.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.7.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.8.json',
 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.9.json']

## Dask DataFrame으로 변환

&emsp;Dask Bag은 데이터를 읽고, 간단한 전처리를 수행하고, 이후 Dask Dataframes처럼 더 효율적인 데이터 형식으로 변환하는 데 적합합니다. Dask Dataframes는 Pandas 라이브러리의 기능들을 내부적으로 사용하므로, 숫자 데이터를 처리하는 데에 훨씬 빠르고 복잡한 알고리즘 작업을 수행할 수 있습니다.