### Dask Distributed와 대시보드

In [None]:
import dask
import dask.dataframe as dd
import pandas as pd
import os
import glob

# Dask의 분산 스케줄러를 로컬 머신에 띄움.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

# 아래 출력되는 "Dashboard" 링크를 복사해서 브라우저에 열어두세요.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 7.72 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43571,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:45883,Total threads: 2
Dashboard: http://127.0.0.1:36083/status,Memory: 1.93 GiB
Nanny: tcp://127.0.0.1:39507,
Local directory: /tmp/dask-scratch-space/worker-u349dmpd,Local directory: /tmp/dask-scratch-space/worker-u349dmpd

0,1
Comm: tcp://127.0.0.1:33601,Total threads: 2
Dashboard: http://127.0.0.1:35689/status,Memory: 1.93 GiB
Nanny: tcp://127.0.0.1:33887,
Local directory: /tmp/dask-scratch-space/worker-cmsedkuz,Local directory: /tmp/dask-scratch-space/worker-cmsedkuz

0,1
Comm: tcp://127.0.0.1:39437,Total threads: 2
Dashboard: http://127.0.0.1:45889/status,Memory: 1.93 GiB
Nanny: tcp://127.0.0.1:41845,
Local directory: /tmp/dask-scratch-space/worker-dxktmfpt,Local directory: /tmp/dask-scratch-space/worker-dxktmfpt

0,1
Comm: tcp://127.0.0.1:46669,Total threads: 2
Dashboard: http://127.0.0.1:35763/status,Memory: 1.93 GiB
Nanny: tcp://127.0.0.1:39951,
Local directory: /tmp/dask-scratch-space/worker-m_mtdaxg,Local directory: /tmp/dask-scratch-space/worker-m_mtdaxg


2025-10-31 16:22:28,465 - tornado.application - ERROR - Uncaught exception GET /status/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='127.0.0.1:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/home/anseh/.pyenv/versions/dask/lib/python3.11/site-packages/tornado/websocket.py", line 965, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/anseh/.pyenv/versions/dask/lib/python3.11/site-packages/tornado/web.py", line 3375, in wrapper
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/anseh/.pyenv/versions/dask/lib/python3.11/site-packages/bokeh/server/views/ws.py", line 149, in open
    raise ProtocolError("Token is expired. Configure the app with a larger value for --session-token-expiration if necessary")
bokeh.protocol.exception

### 1. 테스트용 대용량 데이터 준비

In [3]:
%%time

data_dir = "dask_data"
os.makedirs(data_dir, exist_ok=True)

# 샘플 데이터 프레임 생성
N_ROWS = 5_000_000 # 5백만 건
N_FILES = 3
rows_per_file = N_ROWS // N_FILES

print(f"총 {N_ROWS} 건의 데이터를 {N_FILES}개의 파일에 나누어 생성합니다...")

for i in range(N_FILES):
    filepath = os.path.join(data_dir, f"data_part_{i}.csv")
    if os.path.exists(filepath):
        print(f"{filepath} 파일이 이미 존재합니다. (생성 건너뛰기)")
        continue
        
    print(f"{filepath} 생성 중...")
    
    # 5백만 건 / 3 = 약 166만 건씩
    df = pd.DataFrame({
        'id': range(i * rows_per_file, (i + 1) * rows_per_file),
        'value': pd.Series(range(rows_per_file)) * (i + 1) * 0.1,
        'category': pd.Series(['A', 'B', 'C', 'D', 'E'] * (rows_per_file // 5))
    })
    df.to_csv(filepath, index=False)

print("데이터 생성 완료.")

총 5000000 건의 데이터를 3개의 파일에 나누어 생성합니다...
dask_data/data_part_0.csv 파일이 이미 존재합니다. (생성 건너뛰기)
dask_data/data_part_1.csv 파일이 이미 존재합니다. (생성 건너뛰기)
dask_data/data_part_2.csv 파일이 이미 존재합니다. (생성 건너뛰기)
데이터 생성 완료.
CPU times: user 0 ns, sys: 7.61 ms, total: 7.61 ms
Wall time: 2.19 s


### 2. Pandas의 한계

In [4]:
%%time

print("Pandas로 파일 3개 읽기 및 병합 시작...")

# 파일 3개를 순회하며 읽어오기
file_paths = glob.glob(os.path.join(data_dir, "*.csv"))
pandas_dfs = []
for f in file_paths:
    pandas_dfs.append(pd.read_csv(f))

# 읽어온 DF 리스트를 하나로 합치기 (메모리 사용량 급증!)
total_df_pandas = pd.concat(pandas_dfs)

print("Pandas 병합 완료.")
print(f"총 행 개수: {len(total_df_pandas)}")

# 평균 계산
mean_val = total_df_pandas['value'].mean()
print(f"Pandas value 평균: {mean_val}")

Pandas로 파일 3개 읽기 및 병합 시작...
Pandas 병합 완료.
총 행 개수: 4999998
Pandas value 평균: 166666.5
CPU times: user 1.18 s, sys: 746 ms, total: 1.93 s
Wall time: 2.98 s


### 3. Dask DataFrame의 접근

In [5]:
%%time

print("Dask로 파일 3개 읽기 시작...")

# "dask_data/" 폴더 밑의 모든 .csv 파일을 읽으라고 '계획'
df_dask = dd.read_csv(os.path.join(data_dir, "*.csv"))

print("Dask '읽기 계획' 완료.")

Dask로 파일 3개 읽기 시작...
Dask '읽기 계획' 완료.
CPU times: user 21.9 ms, sys: 10.3 ms, total: 32.2 ms
Wall time: 54.3 ms


### 4. Dask 연산: compute()

In [6]:
%%time

# 1. '평균' 계산 '계획' 수립
# (아직 계산 안 함. 0.01초 미만 소요)
mean_val_dask_plan = df_dask['value'].mean()

print(f"Dask 계획 객체: {mean_val_dask_plan}")

# 2. '.compute()'로 실제 계산 실행
# 'Task Stream'에 read_csv, mean 등 작업이 여러 코어에서 병렬로 실행됩니다.
print("\n--- Dask .compute() 시작 ---")
mean_val_dask = mean_val_dask_plan.compute()
print("--- Dask .compute() 완료 ---")

print(f"Dask value 평균: {mean_val_dask}")

Dask 계획 객체: <dask_expr.expr.Scalar: expr=ArrowStringConversion(frame=FromMapProjectable(a5109f5))['value'].mean(), dtype=float64>

--- Dask .compute() 시작 ---
--- Dask .compute() 완료 ---
Dask value 평균: 166666.5
CPU times: user 324 ms, sys: 60.3 ms, total: 384 ms
Wall time: 2.18 s


### 5. Dask의 핵심: compute() vs persist()

In [7]:
# 1. 원본 Dask DF로 두 개의 연산을 .compute() (비효율적)
#    (대시보드를 보면 read_csv를 2번 수행합니다)
print("--- 1. compute() 2번 실행 (비효율) ---")

%time mean_val = df_dask['value'].mean().compute()
%time std_val = df_dask['value'].std().compute()

# 2. 데이터를 메모리에 '고정(persist)' 시킴
#    (대시보드를 보세요! read_csv가 실행되고, 
#     워커 메모리(Bytes Stored)가 1GB+로 올라갑니다)
print("\n--- 2. persist() 실행 (데이터 로딩) ---")

%time df_persisted = df_dask.persist()

# 3. 메모리에 올라간 데이터로 연산 (효율적)
#    (대시보드를 보면 read_csv 없이, 바로 연산 시작!)
print("\n--- 3. persist()된 데이터로 2번 실행 (효율) ---")

%time mean_val_p = df_persisted['value'].mean().compute()
%time std_val_p = df_persisted['value'].std().compute()

--- 1. compute() 2번 실행 (비효율) ---
CPU times: user 201 ms, sys: 49.7 ms, total: 251 ms
Wall time: 1.21 s
CPU times: user 183 ms, sys: 35.7 ms, total: 218 ms
Wall time: 979 ms

--- 2. persist() 실행 (데이터 로딩) ---
CPU times: user 6.12 ms, sys: 445 μs, total: 6.57 ms
Wall time: 5.58 ms

--- 3. persist()된 데이터로 2번 실행 (효율) ---
CPU times: user 187 ms, sys: 30.3 ms, total: 217 ms
Wall time: 1.09 s
CPU times: user 35.4 ms, sys: 3.95 ms, total: 39.4 ms
Wall time: 92.2 ms
