# Dask tutorial

In [36]:
# PyArrow is a dependency of Dask
# !sudo apt-get update
# !sudo apt-get install python3-dask
# !pip3 install pyarrow

import dask
import dask.dataframe as dd

# When dask uses pandas 2.0.0+ it casts object columns to string automatically ([("i", 0.48)] -> '[("i", 0.48)]')
_ = dask.config.set({"dataframe.convert-string": False})

Dask Dataframes are just collections of Pandas-like Dataframes.

In [3]:
dataset_path = "smadex-challenge-predict-the-revenue/train/train"
filters = [("datetime", ">=", "2025-10-01-00-00"), ("datetime", "<", "2025-10-13-00-00")]

ddf = dd.read_parquet(
    dataset_path,
    filters = filters
)

We can operate with Dask Dataframes with the same API as the Pandas DataFrames...

In [13]:
ddf.head()

Unnamed: 0,buyer_d1,buyer_d7,buyer_d14,buyer_d28,buy_d7,buy_d14,buy_d28,iap_revenue_d7,iap_revenue_d14,iap_revenue_d28,...,user_bundles_l28d,weekend_ratio,weeks_since_first_seen,wifi_ratio,whale_users_bundle_num_buys_prank,whale_users_bundle_revenue_prank,whale_users_bundle_total_num_buys,whale_users_bundle_total_revenue,row_id,datetime
0,0,1,1,1,1,1,1,2.147718,2.147718,2.147718,...,"[88981729bd5c1e5aea9ada4bce00a2531e9e98f7, 25c...",0.019802,6.0,0.913366,,,,,819ecc0e-1a97-43ed-83f6-b9ede4f7fc48,2025-10-01-00-00
1,0,0,0,0,0,0,0,0.0,0.0,0.0,...,,,,,,,,,0a7fbf18-5041-42af-bd0a-0cb6586b8598,2025-10-01-00-00
2,0,0,0,0,0,0,0,0.0,0.0,0.0,...,"[6506b7e0a24666debd08f74266800f2eb154df5a, 150...",0.399021,6.0,0.999388,,,,,fc1a2689-b136-4ffa-b23b-9d8215bd720f,2025-10-01-00-00
3,0,0,0,0,0,0,0,0.0,0.0,0.0,...,"[2b472e3dc96f1847490d7411b25e12ed417b9714, 3ba...",0.121547,6.0,1.0,,,,,0340fcc6-50bd-42ab-b9f4-4c1184b640cb,2025-10-01-00-00
4,0,0,0,0,0,0,0,0.0,0.0,0.0,...,"[1031535cf2a1315422fd05d321349bcd3c3ffc04, 478...",0.293285,6.0,0.160243,,,,,219d253f-bef4-4039-84b2-ed55f009cc43,2025-10-01-00-00


...but we cannot actually use them like this. Note that getting the header of the dataframe had a lot of computational cost because we had to load the data into memory.

In [14]:
ddf

Unnamed: 0_level_0,buyer_d1,buyer_d7,buyer_d14,buyer_d28,buy_d7,buy_d14,buy_d28,iap_revenue_d7,iap_revenue_d14,iap_revenue_d28,registration,retention_d1_to_d7,retention_d3_to_d7,retention_d7_to_d14,retention_d1,retention_d3,retentiond7,advertiser_bundle,advertiser_category,advertiser_subcategory,advertiser_bottom_taxonomy_level,carrier,country,region,dev_make,dev_model,dev_os,dev_osv,hour,release_date,release_msrp,weekday,avg_act_days,avg_daily_sessions,avg_days_ins,avg_duration,bcat,bcat_bottom_taxonomy,bundles_cat,bundles_cat_bottom_taxonomy,bundles_ins,city_hist,country_hist,cpm,cpm_pct_rk,ctr,ctr_pct_rk,dev_language_hist,dev_osv_hist,first_request_ts,first_request_ts_bundle,first_request_ts_category_bottom_taxonomy,hour_ratio,iap_revenue_usd_bundle,iap_revenue_usd_category,iap_revenue_usd_category_bottom_taxonomy,last_buy,last_buy_ts_bundle,last_buy_ts_category,last_ins,last_install_ts_bundle,last_install_ts_category,advertiser_actions_action_count,advertiser_actions_action_last_timestamp,user_actions_bundles_action_count,user_actions_bundles_action_last_timestamp,last_advertiser_action,new_bundles,num_buys_bundle,num_buys_category,num_buys_category_bottom_taxonomy,region_hist,rev_by_adv,rwd_prank,user_bundles,user_bundles_l28d,weekend_ratio,weeks_since_first_seen,wifi_ratio,whale_users_bundle_num_buys_prank,whale_users_bundle_revenue_prank,whale_users_bundle_total_num_buys,whale_users_bundle_total_revenue,row_id,datetime
npartitions=144,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1,Unnamed: 56_level_1,Unnamed: 57_level_1,Unnamed: 58_level_1,Unnamed: 59_level_1,Unnamed: 60_level_1,Unnamed: 61_level_1,Unnamed: 62_level_1,Unnamed: 63_level_1,Unnamed: 64_level_1,Unnamed: 65_level_1,Unnamed: 66_level_1,Unnamed: 67_level_1,Unnamed: 68_level_1,Unnamed: 69_level_1,Unnamed: 70_level_1,Unnamed: 71_level_1,Unnamed: 72_level_1,Unnamed: 73_level_1,Unnamed: 74_level_1,Unnamed: 75_level_1,Unnamed: 76_level_1,Unnamed: 77_level_1,Unnamed: 78_level_1,Unnamed: 79_level_1,Unnamed: 80_level_1,Unnamed: 81_level_1,Unnamed: 82_level_1,Unnamed: 83_level_1,Unnamed: 84_level_1,Unnamed: 85_level_1
,int32,int32,int32,int32,int64,int64,int64,float64,float64,float64,int32,int32,int32,int32,int32,int32,int32,object,object,object,object,object,object,object,object,object,object,object,object,object,int64,int32,float64,object,float64,object,object,object,object,object,object,object,object,object,object,object,object,object,object,int64,object,object,object,object,object,object,int64,object,object,int64,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,float64,int32,float64,object,object,object,object,object,category[known]
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


We need to manually get hold of the partitions individually. We can do this easily by converting the DataFrame into a list of Dask Delayed objects.

In [7]:
parts = [part for part in ddf.to_delayed()]

In [32]:
type(parts[0])

dask.delayed.Delayed

Then load each part separately. Just loading one partition is not _too_ slow.

In [21]:
part_0 = parts[0].compute()
part_0.head()

Unnamed: 0,buyer_d1,buyer_d7,buyer_d14,buyer_d28,buy_d7,buy_d14,buy_d28,iap_revenue_d7,iap_revenue_d14,iap_revenue_d28,...,user_bundles_l28d,weekend_ratio,weeks_since_first_seen,wifi_ratio,whale_users_bundle_num_buys_prank,whale_users_bundle_revenue_prank,whale_users_bundle_total_num_buys,whale_users_bundle_total_revenue,row_id,datetime
0,0,1,1,1,1,1,1,2.147718,2.147718,2.147718,...,"[88981729bd5c1e5aea9ada4bce00a2531e9e98f7, 25c...",0.019802,6.0,0.913366,,,,,819ecc0e-1a97-43ed-83f6-b9ede4f7fc48,2025-10-01-00-00
1,0,0,0,0,0,0,0,0.0,0.0,0.0,...,,,,,,,,,0a7fbf18-5041-42af-bd0a-0cb6586b8598,2025-10-01-00-00
2,0,0,0,0,0,0,0,0.0,0.0,0.0,...,"[6506b7e0a24666debd08f74266800f2eb154df5a, 150...",0.399021,6.0,0.999388,,,,,fc1a2689-b136-4ffa-b23b-9d8215bd720f,2025-10-01-00-00
3,0,0,0,0,0,0,0,0.0,0.0,0.0,...,"[2b472e3dc96f1847490d7411b25e12ed417b9714, 3ba...",0.121547,6.0,1.0,,,,,0340fcc6-50bd-42ab-b9f4-4c1184b640cb,2025-10-01-00-00
4,0,0,0,0,0,0,0,0.0,0.0,0.0,...,"[1031535cf2a1315422fd05d321349bcd3c3ffc04, 478...",0.293285,6.0,0.160243,,,,,219d253f-bef4-4039-84b2-ed55f009cc43,2025-10-01-00-00


The loaded partition is a Pandas DataFrame.

In [20]:
type(part_0)

pandas.core.frame.DataFrame

In [19]:
part_0.describe()

Unnamed: 0,buyer_d1,buyer_d7,buyer_d14,buyer_d28,buy_d7,buy_d14,buy_d28,iap_revenue_d7,iap_revenue_d14,iap_revenue_d28,...,release_msrp,weekday,avg_act_days,avg_days_ins,first_request_ts,last_buy,last_ins,weekend_ratio,weeks_since_first_seen,wifi_ratio
count,121887.0,121887.0,121887.0,121887.0,121887.0,121887.0,121887.0,121887.0,121887.0,121887.0,...,109905.0,121887.0,61107.0,7183.0,53907.0,2374.0,20522.0,63305.0,67489.0,63304.0
mean,0.031217,0.042301,0.044615,0.046601,0.095096,0.124853,0.174161,8.001834,8.308581,9.071367,...,564.108494,3.0,3.998519,5.228739,1758773000.0,1758279000.0,1758424000.0,0.329158,4.877195,0.641166
std,0.173906,0.201277,0.206458,0.210783,0.996749,1.397024,2.388029,2048.960358,2049.186514,2050.423635,...,514.951762,0.0,2.115821,5.584633,375764.1,712223.3,740638.6,0.270035,1.947677,0.39861
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,30.0,3.0,1.0,0.0,1758067000.0,1756600000.0,1756771000.0,0.0,0.0,0.0
25%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,180.0,3.0,2.0,1.0,1758499000.0,1757735000.0,1757857000.0,0.12987,4.0,0.2212
50%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,299.0,3.0,4.0,3.333333,1758499000.0,1758478000.0,1758611000.0,0.27981,6.0,0.842471
75%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,899.0,3.0,6.0,7.0,1759190000.0,1758888000.0,1759091000.0,0.458333,6.0,1.0
max,1.0,1.0,1.0,1.0,136.0,150.0,368.0,696622.14,696622.14,696622.14,...,5160.0,3.0,7.0,28.0,1759190000.0,1759104000.0,1759277000.0,1.0,6.0,1.0


In [24]:
part_0.isna()

Unnamed: 0,buyer_d1,buyer_d7,buyer_d14,buyer_d28,buy_d7,buy_d14,buy_d28,iap_revenue_d7,iap_revenue_d14,iap_revenue_d28,...,user_bundles_l28d,weekend_ratio,weeks_since_first_seen,wifi_ratio,whale_users_bundle_num_buys_prank,whale_users_bundle_revenue_prank,whale_users_bundle_total_num_buys,whale_users_bundle_total_revenue,row_id,datetime
0,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,True,True,True,True,False,False
1,False,False,False,False,False,False,False,False,False,False,...,True,True,True,True,True,True,True,True,False,False
2,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,True,True,True,True,False,False
3,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,True,True,True,True,False,False
4,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,True,True,True,True,False,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
121882,False,False,False,False,False,False,False,False,False,False,...,True,True,True,True,True,True,True,True,False,False
121883,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,True,True,True,True,False,False
121884,False,False,False,False,False,False,False,False,False,False,...,True,True,True,True,True,True,True,True,False,False
121885,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,True,True,True,True,False,False


## Using Dask with PyTorch

The main objective is to be able to use PyTorch's DataLoaders with our Dask DataFrame. This is difficult because PyTorch does not accept Dask DataFrames.

In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils as utils

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

The _state-of-the-art_ solution (ChatGPT's suggestion) is to create a subclass of the DataLoader.

In [26]:
class DaskIterableDataset(utils.data.IterableDataset):
    """A PyTorch IterableDataset fitted to accept Dask DataFrames as input."""
    
    def __init__ (self, ddf: dd.DataFrame):
        self.ddf = ddf

    def __iter__ (self):
        for part in self.ddf.to_delayed():
            pdf = part.compute()
            for _, row in pdf.iterrows():
                yield row
            del pdf   # optional but safe

Then iterate over it. This takes a LOT of time, so we need to partition over the data somehow.

## Second version of the DataLoader

I did not like the previous version of the DataLoader so I have created another one which instantiates a new Pytorch DataLoader object for each partition. Partitions are shuffled at each call of the iteration method and the DataLoader will internally shuffle the data of each partition. I can also set a batch size, etc... Much cleaner!

In [38]:
class DaskIterableDataset(utils.data.IterableDataset):
    """A class to iterate over a Dask DataFrame in orderly manner."""
    
    partition: list[dd.DataFrame]
    batch_size: int
    shuffle: bool
    num_workers: int
    
    def __init__ (self, ddf: dd.DataFrame,
                  batch_size: int = 1024,
                  shuffle: bool = True,
                  num_workers: int = 2):
        self.partition = [part for part in ddf.to_delayed()]
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_workers = num_workersrkers

    def __iter__ (self):
        for part in self.ddf.to_delayed():
            dataloader = utils.data.DataLoader(part.compute(),
                                               batch_size = self.batch_size,
                                               shuffle = self.shuffle,
                                               num_workers = self.num_workers)
            for _, row in pdf.iterrows():
                yield row
            del pdf   # optional but safe