# Using Dask to split the dataset

## Converting chunks to include row groups

In [1]:
import dask.dataframe as dd
import pandas as pd

In [2]:
import os

# os.listdir() returns entries in arbitrary order, so the listing is sorted to
# make positional access (e.g. files[8] further down) deterministic across
# machines and runs.
files = sorted(
    f for f in os.listdir('data')
    # Keep only the parquet files of the form "chunk_xxx.parquet"
    if f.startswith('chunk_') and f.endswith('.parquet')
)
files


['chunk_114.parquet',
 'chunk_227.parquet',
 'chunk_327.parquet',
 'chunk_432.parquet',
 'chunk_558.parquet',
 'chunk_684.parquet',
 'chunk_784.parquet',
 'chunk_853.parquet',
 'chunk_last.parquet']

### 9th chunk

In [3]:
# Ninth chunk: index 8 of the file listing ('chunk_last.parquet' in the recorded listing)
df = pd.read_parquet(f"data/{files[8]}")
df

Unnamed: 0,id,content,type,title,authors,domain,url
0,9869341,"The colon cancer story began in 2011, when Dr....",reliable,Why Is This Bacterium Hiding in Human Tumors?,Gina Kolata,nytimes.com,https://www.nytimes.com/2017/11/23/health/bact...
1,9869342,This is highlighted in the grid by 17A’s FLIPP...,reliable,Snapping With Only Slight Provocation,Deb Amlen,nytimes.com,https://www.nytimes.com/2017/11/23/crosswords/...
2,9869343,"Born at the height of the Cold War, Pine Gap w...",reliable,An American Spy Base Hidden in Australia’s Out...,Jackie Dent,nytimes.com,https://www.nytimes.com/2017/11/23/world/austr...
3,9869344,OBITUARIES\n\nThe byline for an obituary on Th...,reliable,"Corrections: November 24, 2017",,nytimes.com,https://www.nytimes.com/2017/11/23/pageoneplus...
4,9869345,"“I want to be great, and I have confidence in ...",reliable,Vikings Top Lions and Tighten Grip on N.F.C. N...,The Associated Press,nytimes.com,https://www.nytimes.com/2017/11/23/sports/foot...
...,...,...,...,...,...,...,...
8951,9878292,"Photo\n\nMan does not live by bread alone, it ...",reliable,Corruption With a Chance of Meatballs,Clyde Haberman,nytimes.com,https://www.nytimes.com/2018/02/06/opinion/cor...
8952,9878293,"VIX, which measures investor expectations that...",reliable,"Volatility Rattles Stocks, and Investors Who B...",Landon Thomas Jr.,nytimes.com,https://www.nytimes.com/2018/02/06/business/st...
8953,9878294,Housing Authority residents in the audience we...,reliable,"As 4 of 5 in Public Housing Lost Heat, a Deman...",William Neuman,nytimes.com,https://www.nytimes.com/2018/02/06/nyregion/he...
8954,9878295,"He has a curious way of expressing it, though....",reliable,Who Said It: Pepé Le Pew or Donald Trump?,Jennifer Finney Boylan,nytimes.com,https://www.nytimes.com/2018/02/06/opinion/pep...


In [4]:
# Persist the chunk as a parquet file at data/c9.parquet.
# row_group_size=10000 is handed to the parquet writer
# (presumably the maximum number of rows per row group — confirm against the engine docs).
df.to_parquet('data/c9.parquet', row_group_size=10000)

In [5]:
# Open data/c9.parquet as a dask dataframe so that partitions can be loaded
# individually instead of reading the entire file into memory at once.
# NOTE(review): the recorded output shows npartitions=1 for this file; how row
# groups map to partitions depends on read_parquet's split_row_groups setting — confirm.
ddf = dd.read_parquet('data/c9.parquet')
ddf

Unnamed: 0_level_0,id,content,type,title,authors,domain,url
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,int64,object,object,object,object,object,object
,...,...,...,...,...,...,...


In [6]:
# Materialize only the first partition (index 0) rather than the whole dataframe
ddf.partitions[0].compute()

Unnamed: 0,id,content,type,title,authors,domain,url
0,9869341,"The colon cancer story began in 2011, when Dr....",reliable,Why Is This Bacterium Hiding in Human Tumors?,Gina Kolata,nytimes.com,https://www.nytimes.com/2017/11/23/health/bact...
1,9869342,This is highlighted in the grid by 17A’s FLIPP...,reliable,Snapping With Only Slight Provocation,Deb Amlen,nytimes.com,https://www.nytimes.com/2017/11/23/crosswords/...
2,9869343,"Born at the height of the Cold War, Pine Gap w...",reliable,An American Spy Base Hidden in Australia’s Out...,Jackie Dent,nytimes.com,https://www.nytimes.com/2017/11/23/world/austr...
3,9869344,OBITUARIES\n\nThe byline for an obituary on Th...,reliable,"Corrections: November 24, 2017",,nytimes.com,https://www.nytimes.com/2017/11/23/pageoneplus...
4,9869345,"“I want to be great, and I have confidence in ...",reliable,Vikings Top Lions and Tighten Grip on N.F.C. N...,The Associated Press,nytimes.com,https://www.nytimes.com/2017/11/23/sports/foot...
...,...,...,...,...,...,...,...
8951,9878292,"Photo\n\nMan does not live by bread alone, it ...",reliable,Corruption With a Chance of Meatballs,Clyde Haberman,nytimes.com,https://www.nytimes.com/2018/02/06/opinion/cor...
8952,9878293,"VIX, which measures investor expectations that...",reliable,"Volatility Rattles Stocks, and Investors Who B...",Landon Thomas Jr.,nytimes.com,https://www.nytimes.com/2018/02/06/business/st...
8953,9878294,Housing Authority residents in the audience we...,reliable,"As 4 of 5 in Public Housing Lost Heat, a Deman...",William Neuman,nytimes.com,https://www.nytimes.com/2018/02/06/nyregion/he...
8954,9878295,"He has a curious way of expressing it, though....",reliable,Who Said It: Pepé Le Pew or Donald Trump?,Jennifer Finney Boylan,nytimes.com,https://www.nytimes.com/2018/02/06/opinion/pep...


## Testing on cluster for fun

In [3]:
# Start a local dask cluster with 8 single-threaded workers, each created with
# memory_limit='2GB', and connect a client to it. Displaying the client shows
# the dashboard address and the per-worker details.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='2GB')
client = Client(cluster)
client

2023-03-08 12:14:12,164 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-ag8njkiy', purging
2023-03-08 12:14:12,164 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-_tbv1twq', purging
2023-03-08 12:14:12,164 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-jaun1nko', purging
2023-03-08 12:14:12,165 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-71j00owi', purging
2023-03-08 12:14:12,165 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-4byx7pxr', purging
2023-03-08 12:14:12,165 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-xg4nlxok', purging
2023-03-08 12:14:12,165 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-kbzte9hk', purging

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 8,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34349,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:43665,Total threads: 1
Dashboard: http://127.0.0.1:36615/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:46563,
Local directory: /tmp/dask-worker-space/worker-a9amv8k0,Local directory: /tmp/dask-worker-space/worker-a9amv8k0

0,1
Comm: tcp://127.0.0.1:36747,Total threads: 1
Dashboard: http://127.0.0.1:33037/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:42317,
Local directory: /tmp/dask-worker-space/worker-ozsg3lza,Local directory: /tmp/dask-worker-space/worker-ozsg3lza

0,1
Comm: tcp://127.0.0.1:34763,Total threads: 1
Dashboard: http://127.0.0.1:35493/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:33529,
Local directory: /tmp/dask-worker-space/worker-0p4b97vi,Local directory: /tmp/dask-worker-space/worker-0p4b97vi

0,1
Comm: tcp://127.0.0.1:43451,Total threads: 1
Dashboard: http://127.0.0.1:36035/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:37193,
Local directory: /tmp/dask-worker-space/worker-g7ruddht,Local directory: /tmp/dask-worker-space/worker-g7ruddht

0,1
Comm: tcp://127.0.0.1:39413,Total threads: 1
Dashboard: http://127.0.0.1:35605/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:44073,
Local directory: /tmp/dask-worker-space/worker-i1rc319a,Local directory: /tmp/dask-worker-space/worker-i1rc319a

0,1
Comm: tcp://127.0.0.1:37111,Total threads: 1
Dashboard: http://127.0.0.1:44481/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:45683,
Local directory: /tmp/dask-worker-space/worker-bgeytid5,Local directory: /tmp/dask-worker-space/worker-bgeytid5

0,1
Comm: tcp://127.0.0.1:37147,Total threads: 1
Dashboard: http://127.0.0.1:45539/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:41445,
Local directory: /tmp/dask-worker-space/worker-_5_072oj,Local directory: /tmp/dask-worker-space/worker-_5_072oj

0,1
Comm: tcp://127.0.0.1:42683,Total threads: 1
Dashboard: http://127.0.0.1:40855/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:34173,
Local directory: /tmp/dask-worker-space/worker-matqrhap,Local directory: /tmp/dask-worker-space/worker-matqrhap


In [4]:
# Copy ddf under the name that the train/test split works on.
# NOTE(review): the recorded output shows npartitions=31, while the c9.parquet
# frame displayed earlier in this notebook had npartitions=1, so ddf was
# evidently bound to a different (larger) dataset when this cell ran. That load
# is not shown in the notebook — confirm and add it so a fresh run reproduces this.
ddf_test = ddf.copy()
ddf_test

Unnamed: 0_level_0,id,content,type,title,authors,domain,url
npartitions=31,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,int64,object,object,object,object,object,object
,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...
,...,...,...,...,...,...,...


In [5]:
# Split the full dataframe into a test (20%) and a train (80%) set.
# The split is taken from `ddf` rather than from `ddf_test` itself, so that
# re-running this cell does not split the already-reduced test set a second
# time. random_state pins the split so it is reproducible.
ddf_test, ddf_train = ddf.random_split([0.2, 0.8], random_state=42)

In [6]:
# Number of rows in the test split (len() has to evaluate the lazy dataframe to count rows)
print(len(ddf_test))

225944


In [7]:
# Number of rows in the train split (len() has to evaluate the lazy dataframe to count rows)
print(len(ddf_train))



904056


In [8]:
# Write the test set to parquet files under data/test.parquet
ddf_test.to_parquet('data/test.parquet', row_group_size=10000)

In [9]:
# Write the train set to parquet files under data/train.parquet
# NOTE(review): the recorded output of this cell contains a
# "Worker stream died during communication" error from distributed.worker;
# confirm that the write actually completed before relying on the output files.
ddf_train.to_parquet('data/train.parquet', row_group_size=10000)

2023-03-08 12:16:54,261 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:42683
Traceback (most recent call last):
  File "/home/kristiandampedersen/.local/share/virtualenvs/fakenewsproject-Oj_5wi4E/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/kristiandampedersen/.local/share/virtualenvs/fakenewsproject-Oj_5wi4E/lib/python3.10/site-packages/distributed/worker.py", line 2081, in gather_dep
    response = await get_data_from_worker(
  File "/home/kristiandampedersen/.local/share/virtualenvs/fakenewsproject-Oj_5wi4E/lib/python3.10/site-packages/distributed/worker.py", line 2903, in get_data_from_worker
    response = await send_recv(
  File "/home/kristiandampedersen/.local/share/virtualenvs/fakenewsp

In [10]:
# Read the train set back from data/train.parquet as a dask dataframe
train = dd.read_parquet('data/train.parquet')
train

Unnamed: 0_level_0,id,content,type,title,authors,domain,url
npartitions=31,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,int64,object,object,object,object,object,object
,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...
,...,...,...,...,...,...,...


In [11]:
# Split the persisted train data into training (80%) and validation (20%) sets.
# The source frame is re-read from the parquet output instead of reusing the
# name `train`, so that re-running this cell always splits the full train data
# rather than the already-reduced `train` left over from a previous run.
# random_state pins the split so it is reproducible.
train, val = dd.read_parquet('data/train.parquet').random_split([0.8, 0.2], random_state=42)


In [15]:
# Number of rows per label in the type column of the training split
# (this yields a table of counts only; no bar plot is drawn here)
train['type'].value_counts().compute()



political     187035
unreliable    110191
bias           94252
conspiracy     79451
fake           79257
rumor          44269
unknown        30372
               29644
hate           26430
junksci        14718
clickbait      13933
satire          9233
reliable        4188
Name: type, dtype: int64



In [4]:
# Number of rows per label in the type column of df.
# NOTE(review): the recorded counts sum to 1,130,000 rows, whereas the df loaded
# earlier in this notebook has 8,955 rows, and the execution count (In [4]) is
# out of sequence. This cell was evidently run against a different df whose
# load is not shown in the notebook — confirm and define that frame explicitly.
df['type'].value_counts()

political     292180
unreliable    172478
bias          147189
conspiracy    124418
fake          123966
rumor          69018
unknown        47600
               45988
hate           41404
junksci        23048
clickbait      21761
satire         14326
reliable        6624
Name: type, dtype: int64