
CSYE7105 High Performance Parallel Machine Learning and AI

Instructor: Dr. Handan Liu

Lecture 12: Dask DataFrame in Parallel and Diagnostics


In [1]:
import dask

## Dask DataFrame

In [2]:
df = dask.datasets.timeseries()

In [3]:
df

Unnamed: 0_level_0,name,id,x,y
npartitions=30,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2000-01-01,string,int64,float64,float64
2000-01-02,...,...,...,...
...,...,...,...,...
2000-01-30,...,...,...,...
2000-01-31,...,...,...,...


In [4]:
df.dtypes

name    string[pyarrow]
id                int64
x               float64
y               float64
dtype: object

In [6]:
df = dask.datasets.timeseries(start="2021-01-01", end="2022-01-01")
df

Unnamed: 0_level_0,name,id,x,y
npartitions=365,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2021-01-01,string,int64,float64,float64
2021-01-02,...,...,...,...
...,...,...,...,...
2021-12-31,...,...,...,...
2022-01-01,...,...,...,...


In [7]:
df.compute()

Unnamed: 0_level_0,name,id,x,y
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2021-01-01 00:00:00,Norbert,962,-0.580968,0.035411
2021-01-01 00:00:01,Laura,976,0.501258,-0.971124
2021-01-01 00:00:02,Yvonne,967,-0.791158,-0.199473
2021-01-01 00:00:03,Jerry,1042,0.306435,0.349104
2021-01-01 00:00:04,Wendy,1044,0.541958,0.936901
...,...,...,...,...
2021-12-31 23:59:55,Ingrid,968,-0.526083,0.900046
2021-12-31 23:59:56,Hannah,972,-0.640473,0.628574
2021-12-31 23:59:57,Charlie,989,0.836480,0.513972
2021-12-31 23:59:58,Alice,998,-0.782194,-0.158081


### Use Standard Pandas Operations.

In [8]:
df2 = df[df.y > 0]
df2

Unnamed: 0_level_0,name,id,x,y
npartitions=365,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2021-01-01,string,int64,float64,float64
2021-01-02,...,...,...,...
...,...,...,...,...
2021-12-31,...,...,...,...
2022-01-01,...,...,...,...


In [21]:
df3 = df2.groupby("name").x.std()
df3

Dask Series Structure:
npartitions=1
    float64
        ...
Dask Name: getitem, 8 expressions
Expr=(((Filter(frame=ArrowStringConversion(frame=Timeseries(bd68993)), predicate=ArrowStringConversion(frame=Timeseries(bd68993))['y'] > 0))[['name', 'x']]).std(ddof=1, numeric_only=False, split_out=None, observed=False))['x']

In [22]:
computed_df = df3.compute()

FutureCancelledError: ('repartitiontofewer-2066cfe2a2aa1554008e5f6eb7a48f51', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

In [11]:
type(computed_df)

pandas.core.series.Series

In [12]:
computed_df

name
Edith       0.576799
Alice       0.577108
Kevin       0.577916
Michael     0.576494
Patricia    0.577521
Norbert     0.577804
Zelda       0.577754
Frank       0.577056
Ray         0.577298
Dan         0.577204
Laura       0.578006
Oliver      0.577421
Sarah       0.577208
Quinn       0.577273
Ursula      0.577681
Hannah      0.577062
Jerry       0.576861
Tim         0.576946
Xavier      0.577405
Yvonne      0.577425
Victor      0.577647
George      0.577448
Wendy       0.577321
Bob         0.577176
Ingrid      0.577411
Charlie     0.577878
Name: x, dtype: float64

In [18]:
df4 = df.groupby("name").aggregate({"x": "sum", "y": "max"})

In [19]:
df4.compute()

FutureCancelledError: ('repartitiontofewer-560a600b0d91c31200d69700cee146bc', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

In [None]:
df4 = df4.repartition(npartitions=1)

joined = df.merge(
    df4, left_on="name", right_index=True, suffixes=("_original", "_aggregated") )

joined.head()


## Dask Performance Reports


In [13]:
from dask.distributed import Client

In [14]:
client = Client(n_workers=2, threads_per_worker=2, memory_limit='8GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 4,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:36153,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:36085,Total threads: 2
Dashboard: http://127.0.0.1:44577/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:41823,
Local directory: /tmp/dask-scratch-space/worker-etj7ueez,Local directory: /tmp/dask-scratch-space/worker-etj7ueez

0,1
Comm: tcp://127.0.0.1:33497,Total threads: 2
Dashboard: http://127.0.0.1:36305/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:42927,
Local directory: /tmp/dask-scratch-space/worker-h4hus339,Local directory: /tmp/dask-scratch-space/worker-h4hus339


In [15]:
from dask_jobqueue import SLURMCluster

In [16]:
cluster = SLURMCluster(cores=4, memory='25GB')
cluster.scale(jobs=1)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 46461 instead
2025-10-14 14:10:24,398 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/h.liu/.conda/envs/py2025/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    ret = unpacker._unpack()
          ^^^^^^^^^^^^^^^^^^
  File "/home/h.liu/.conda/envs/py2025/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/h.liu/.conda/envs/py2025/lib/python3.11/site-packages/msgpack/fallback.py", line 585, in _unpack
    key = self._unpack(EX_CONSTRUCT)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/h.liu/.conda/envs/py2025/lib/python3.11/site-packages/msgpack/fallback.py", line 546, in _unpack
    typ, n, obj = self._read_header()
                  ^^^^^^^^^^^^^^^^^^^
  File "/home/h.liu/.conda/envs/py2025/lib/python3.

In [17]:
client=Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.99.249.61:46461/status,

0,1
Dashboard: http://10.99.249.61:46461/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.99.249.61:42635,Workers: 0
Dashboard: http://10.99.249.61:46461/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [None]:
# benchmark
import dask
from dask.distributed import performance_report

with performance_report(filename="dask_report_2.html"):
    df = dask.datasets.timeseries(start="2021-01-01", end="2022-01-01")
    df = df.persist()
    df.groupby(df.name).x.mean().compute()