In [None]:
import os
import dask.dataframe as dd
import numpy as np
import more_itertools as mit
from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster
import pandas as pd


# operation to do over each group
def get_best_answer_and_timestamp(group):
    """
    Summarise the vote history of one question (one ``ParentId`` group).

    The "best" answer is the ``PostId`` of the first row that reaches the
    highest ``score_history`` value in the group. The vote timeline is then
    cut into bins whose edges are the distinct vote timestamps of that best
    answer (preceded by the question timestamp when it is earlier), and for
    every bin the row with the highest ``score_history`` is taken as the
    leader of that bin.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of a single question. Columns read: ``OwnerUserId_question``,
        ``AnswerCount``, ``Name``, ``Class``, ``timestamp``,
        ``timestamp_vote``, ``score_history``, ``PostId``, ``Id_answer`` and
        ``CreationDate_answer``.

    Returns
    -------
    pandas.Series
        One output row with the keys:

        * ``question_owner``, ``answer_count``, ``badge_name``,
          ``badge_class`` -- values of the first input row.
        * ``best_answer_id`` -- id of the best answer (see above).
        * ``time_to_emerge`` -- latest ``timestamp_vote`` among the bin
          leaders that are NOT the best answer; ``pd.NaT`` when the best
          answer leads every bin; the single bin edge when only one edge
          exists (all votes of the best answer share one timestamp that is
          not after the question timestamp).
        * ``tot_horizon`` -- latest ``timestamp_vote`` in the group.
        * ``question_time`` -- question timestamp as ``datetime.datetime``.
        * ``best_init_time`` -- ``CreationDate_answer`` of the best answer.
        * ``first_answer_time`` -- earliest ``CreationDate_answer``.
        * ``always_best`` -- whether the row with the earliest
          ``timestamp_vote`` belongs to the best answer.
    """
    # Question-level attributes are read from the first row
    # (presumably constant within a question group -- verify upstream merge).
    q_owner = group['OwnerUserId_question'].iloc[0]
    a_count = group['AnswerCount'].iloc[0]
    b_name = group['Name'].iloc[0]
    b_class = group['Class'].iloc[0]
    question_ts = group['timestamp'].iloc[0]
    q_time = question_ts.to_pydatetime()

    # Use max() rather than the last row: the incoming rows are not sorted by
    # vote time, so a positional lookup would depend on the row order.
    horizon = group['timestamp_vote'].max()

    max_score = group['score_history'].dropna().max()
    best_answer_id = group[group['score_history'] == max_score]['PostId'].iloc[0]

    best_init = group[group['Id_answer'] == best_answer_id]['CreationDate_answer'].iloc[0]
    first_init = group['CreationDate_answer'].min()

    group = group.sort_values(by='timestamp_vote')

    # Bin edges: distinct vote instants of the best answer ...
    vals_bin = group[group['PostId'] == best_answer_id]['timestamp_vote'].drop_duplicates()

    # ... preceded by the question instant when it comes first.
    # NOTE: votes do not have hh:mm:ss, the question does, so a vote cast on
    # the question's own day compares as earlier than the question.
    init_bin = pd.Series([question_ts])
    if init_bin.iloc[0] < vals_bin.iloc[0]:
        vals_bin = pd.concat([init_bin, vals_bin], ignore_index=True)

    if vals_bin.shape[0] < 2:
        # Only one timestamp: no interval can be built, the best answer
        # emerged the same day. Handled explicitly instead of relying on
        # pd.cut raising for a single edge.
        tte = vals_bin.iloc[0]
    else:
        try:
            binned = group.assign(bin=pd.cut(group['timestamp_vote'], bins=vals_bin))

            # Leader of each bin = highest score_history inside the bin.
            # NOTE(review): rows falling outside every bin get a NaN bin and
            # are treated by drop_duplicates as one additional bin -- confirm
            # this is intended.
            leaders = (
                binned
                .sort_values(by=['bin', 'score_history'], ascending=[True, False])
                .drop_duplicates(subset=['bin'], keep='first')
            )

            # Bins in which another answer was ahead of the final best one.
            overtaken = leaders[leaders['Id_answer'] != best_answer_id]

            # pd.NaT (not None) keeps the column datetime-typed when the best
            # answer is never overcome.
            tte = overtaken['timestamp_vote'].max() if not overtaken.empty else pd.NaT

        except TypeError:
            # Unexpected binning failure: keep the row, flag it with NaT and
            # report the offending edges (not the whole group).
            tte = pd.NaT
            print('\t', vals_bin, flush=True)

    return pd.Series({
        'question_owner':    q_owner,
        'answer_count':      a_count,
        'badge_name':        b_name,
        'badge_class':       b_class,
        'best_answer_id':    best_answer_id,
        'time_to_emerge':    tte,
        'tot_horizon':       horizon,
        'question_time':     q_time,
        'best_init_time':    best_init,
        'first_answer_time': first_init,
        'always_best':       group['PostId'].iloc[0] == best_answer_id
    })

In [2]:
# Show the working directory: the relative './data' and 'results' paths used
# further down are resolved against it.
!pwd

/home/fgalante/stackoverflow-analysis


In [2]:
# ---- Run configuration shared by every cell below ----
dataset = 'math'            # dataset to process: 'math' or 'cs'
file_format = 'csv'         # on-disk format of the merged input table
filter_by_viewcount = '0'   # view-count filter tag, used verbatim in input/output file names
mode = 'local_debug'        # 'local_debug' -> LocalCluster, 'hpc' -> SLURMCluster

# Cluster sizing
n_workers, cores_per_job = 128, 4

# Memory requested per SLURM job, in GB. Determined empirically: check the
# worker memory efficiency in the dashboard, or after the job is done with
#   seff <job_id>
memory_per_job_GB = 16


In [3]:
# Close the cluster/client left over from a previous run of this cell, if any.
# globals().get() covers the first run (names not defined yet) without relying
# on a NameError, and a failed teardown is reported instead of being silently
# swallowed; it stays best-effort so a new cluster is still created.
_stale_client = globals().get('client')
_stale_cluster = globals().get('cluster')
try:
    if _stale_client:
        _stale_client.shutdown()
    if _stale_cluster:
        _stale_cluster.close()
except Exception as err:
    print(f'WARNING: could not cleanly close the previous cluster: {err!r}', flush=True)

if mode == 'local_debug':
    n_workers = 24  # limit the number of workers for local debug
    threads_per_worker = 1
    memory_limit = "1GB"  # per worker
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit
    )
else:
    # per-job settings: every SLURM job requests these resources
    cluster = SLURMCluster(
        account="fgalante",
        cores=cores_per_job,               # number of cores that will be requested on a node (i.e., SLURM Job cores)
        memory=f"{memory_per_job_GB} GB",
        walltime='06:00:00',
        log_directory='./dask-jobqueue-logs'  # slurm logs
    )

# Connect a client to whichever cluster was created above.
client = Client(cluster)

2025-04-24 18:11:27,527 - distributed.scheduler - ERROR - Couldn't gather keys: {('repartitiontofewer-00fa06910bd6ebff900b08916458d315', 0): 'waiting'}


In [4]:
# Ask the cluster for n_workers workers; scale() can be called again later to
# adjust the number of workers.
# NOTE(review): the SLURM job script printed further down starts several
# workers per job, so this may not map to one job per worker -- confirm.
cluster.scale(n_workers)

In [6]:
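# TODO: wait for the workers to start before submitting work, e.g. client.wait_for_workers(n_workers); this cell currently does nothing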

In [5]:
# Display the client summary (cluster type, dashboard link, workers, memory)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 24
Total threads: 24,Total memory: 22.35 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:54661,Workers: 24
Dashboard: http://127.0.0.1:8787/status,Total threads: 24
Started: Just now,Total memory: 22.35 GiB

0,1
Comm: tcp://127.0.0.1:54825,Total threads: 1
Dashboard: http://127.0.0.1:54832/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54665,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-bitc7tkk,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-bitc7tkk

0,1
Comm: tcp://127.0.0.1:54782,Total threads: 1
Dashboard: http://127.0.0.1:54798/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54666,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-xfpseaju,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-xfpseaju

0,1
Comm: tcp://127.0.0.1:54777,Total threads: 1
Dashboard: http://127.0.0.1:54795/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54667,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-x0stioyq,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-x0stioyq

0,1
Comm: tcp://127.0.0.1:54764,Total threads: 1
Dashboard: http://127.0.0.1:54774/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54668,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-smxlsjp_,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-smxlsjp_

0,1
Comm: tcp://127.0.0.1:54773,Total threads: 1
Dashboard: http://127.0.0.1:54786/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54669,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-19qro8s6,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-19qro8s6

0,1
Comm: tcp://127.0.0.1:54820,Total threads: 1
Dashboard: http://127.0.0.1:54830/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54670,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-h03p62s_,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-h03p62s_

0,1
Comm: tcp://127.0.0.1:54768,Total threads: 1
Dashboard: http://127.0.0.1:54783/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54671,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-_r4ro2e8,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-_r4ro2e8

0,1
Comm: tcp://127.0.0.1:54762,Total threads: 1
Dashboard: http://127.0.0.1:54767/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54672,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-ram7vmtn,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-ram7vmtn

0,1
Comm: tcp://127.0.0.1:54814,Total threads: 1
Dashboard: http://127.0.0.1:54826/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54673,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-cvssxcky,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-cvssxcky

0,1
Comm: tcp://127.0.0.1:54776,Total threads: 1
Dashboard: http://127.0.0.1:54792/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54674,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-lj82hy56,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-lj82hy56

0,1
Comm: tcp://127.0.0.1:54801,Total threads: 1
Dashboard: http://127.0.0.1:54817/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54675,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-0_v7cujc,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-0_v7cujc

0,1
Comm: tcp://127.0.0.1:54763,Total threads: 1
Dashboard: http://127.0.0.1:54770/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54676,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-zeqzh__w,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-zeqzh__w

0,1
Comm: tcp://127.0.0.1:54789,Total threads: 1
Dashboard: http://127.0.0.1:54802/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54677,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-c3cs_t96,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-c3cs_t96

0,1
Comm: tcp://127.0.0.1:54793,Total threads: 1
Dashboard: http://127.0.0.1:54808/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54678,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-z7uao44i,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-z7uao44i

0,1
Comm: tcp://127.0.0.1:54819,Total threads: 1
Dashboard: http://127.0.0.1:54828/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54679,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-bwqj1ki7,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-bwqj1ki7

0,1
Comm: tcp://127.0.0.1:54797,Total threads: 1
Dashboard: http://127.0.0.1:54812/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54680,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-tddsjpqd,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-tddsjpqd

0,1
Comm: tcp://127.0.0.1:54809,Total threads: 1
Dashboard: http://127.0.0.1:54821/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54681,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-sd1vk40n,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-sd1vk40n

0,1
Comm: tcp://127.0.0.1:54811,Total threads: 1
Dashboard: http://127.0.0.1:54823/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54682,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-p_6_3rjz,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-p_6_3rjz

0,1
Comm: tcp://127.0.0.1:54766,Total threads: 1
Dashboard: http://127.0.0.1:54779/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54683,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-he5cw0_w,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-he5cw0_w

0,1
Comm: tcp://127.0.0.1:54800,Total threads: 1
Dashboard: http://127.0.0.1:54815/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54684,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-awnc286p,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-awnc286p

0,1
Comm: tcp://127.0.0.1:54790,Total threads: 1
Dashboard: http://127.0.0.1:54805/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54685,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-yy3_nawu,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-yy3_nawu

0,1
Comm: tcp://127.0.0.1:54772,Total threads: 1
Dashboard: http://127.0.0.1:54785/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54686,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-cfw6w_jc,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-cfw6w_jc

0,1
Comm: tcp://127.0.0.1:54765,Total threads: 1
Dashboard: http://127.0.0.1:54778/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54687,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-2ohgsgdp,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-2ohgsgdp

0,1
Comm: tcp://127.0.0.1:54791,Total threads: 1
Dashboard: http://127.0.0.1:54804/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:54688,
Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-5_nfzf2a,Local directory: C:\Users\LOCALA~1\AppData\Local\Temp\dask-scratch-space\worker-5_nfzf2a


In [7]:
# Show the batch script submitted for each job. Only job-queue clusters
# (SLURMCluster) provide job_script(); the LocalCluster used in 'local_debug'
# mode does not, so guard the call to keep "Run All" working in both modes.
if hasattr(cluster, 'job_script'):
    print(cluster.job_script())
else:
    print(f'{type(cluster).__name__} has no job script to show')

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e ./dask-jobqueue-logs/dask-worker-%J.err
#SBATCH -o ./dask-jobqueue-logs/dask-worker-%J.out
#SBATCH -p global
#SBATCH -A fgalante
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=15G
#SBATCH -t 06:00:00

/home/fgalante/miniconda3/envs/try-dask/bin/python -m distributed.cli.dask_worker tcp://192.168.2.254:35913 --name dummy-name --nthreads 1 --memory-limit 3.73GiB --nworkers 4 --nanny --death-timeout 60 --interface ib0



In [6]:
# Make sure the output directory for this dataset exists.
os.makedirs(os.path.join('results', dataset), exist_ok=True)

# Validate the requested format before touching the file system.
if file_format == 'pkl':
    raise NotImplementedError('ERROR: Dask does not support reading pickle files directly. Convert to CSV or Parquet.')
if file_format != 'csv':
    raise ValueError('ERROR: invalid file format')

# Read the file with binning after merge.
fn = f'./data/{file_format}/{dataset}/df_all_viewcount_{filter_by_viewcount}'

# Explicit dtypes for the CSV columns; the date columns are parsed separately
# through parse_dates below.
data_types = {
    'Name': 'object',
    'Class': 'object',
    'Id_question': 'int32',
    'AnswerCount': 'float',
    'PostId': 'int32',
    'VoteTypeId': 'int32',
    'Id_answer': 'int32',
    'PostTypeId': 'int32',
    'AcceptedAnswerId': 'float',
    'ViewCount': 'float',
    'Title': 'object',
    'ParentId': 'int32',
    'increment': 'int32',
    'score_history': 'int32',
}

try:
    df = dd.read_csv(
        f'{fn}.{file_format}',
        dtype=data_types,
        parse_dates=[
            'timestamp',
            'CreationDate_vote',
            'timestamp_vote',
            'CreationDate_answer',
            'timestamp_answer',
        ],
    )

    # df = df.repartition(npartitions=n_workers)
    # Index by question id before the per-question groupby
    # (seems this is important to add for performance).
    df = df.set_index('ParentId')

except FileNotFoundError as err:
    # Report the file that is actually missing and keep the original cause.
    raise FileNotFoundError(
        f'ERROR: input file {fn}.{file_format} not found, run first preprocessing.py'
    ) from err

In [7]:
# Columns available after indexing by ParentId
df.columns

Index(['Id_question', 'OwnerUserId_question', 'timestamp', 'AnswerCount',
       'PostId', 'VoteTypeId', 'CreationDate_vote', 'timestamp_vote',
       'Id_answer', 'PostTypeId', 'AcceptedAnswerId', 'CreationDate_answer',
       'Score', 'ViewCount', 'OwnerUserId_answer', 'Title', 'timestamp_answer',
       'Name', 'Class', 'increment', 'score_history', 'time_to_first_answer'],
      dtype='object')

In [8]:
# Preview the first 10 rows of the indexed frame
df.head(10)

Unnamed: 0_level_0,Id_question,OwnerUserId_question,timestamp,AnswerCount,PostId,VoteTypeId,CreationDate_vote,timestamp_vote,Id_answer,PostTypeId,...,Score,ViewCount,OwnerUserId_answer,Title,timestamp_answer,Name,Class,increment,score_history,time_to_first_answer
ParentId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,1,10.0,2010-07-20 19:09:27.200,9.0,2453,2,2011-06-20,2011-06-20,2453,2,...,23,,,,2010-08-14 10:40:09.497,Great Question,1.0,1,6,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2010-08-22,2010-08-22,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,25,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2010-07-22,2010-07-22,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,14,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2011-09-16,2011-09-16,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,42,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2010-07-22,2010-07-22,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,15,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2010-07-20,2010-07-20,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,6,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2010-07-20,2010-07-20,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,8,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,9,2,2010-07-20,2010-07-20,9,2,...,209,,8.0,,2010-07-20 19:22:20.193,Great Question,1.0,1,7,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,35,2,2010-07-20,2010-07-20,35,2,...,26,,49.0,,2010-07-20 19:29:54.777,Great Question,1.0,1,1,0 days 00:12:52.993000
1,1,10.0,2010-07-20 19:09:27.200,9.0,513,2,2010-07-22,2010-07-22,513,2,...,13,,99.0,,2010-07-22 20:45:36.930,Great Question,1.0,1,1,0 days 00:12:52.993000


In [9]:
# Partition layout of the indexed frame, followed by its column dtypes
print(f'Npartitions: {df.npartitions}')
print(f'divisions: {df.divisions}')
df.dtypes

Npartitions: 17
divisions: (np.int32(1), np.int32(52255), np.int32(105276), np.int32(239566), np.int32(431294), np.int32(523027), np.int32(711027), np.int32(1061357), np.int32(1338087), np.int32(1597863), np.int32(1784476), np.int32(2229344), np.int32(2560282), np.int32(2987691), np.int32(3301270), np.int32(3775723), np.int32(4393073), 4890904)


Id_question                       int32
OwnerUserId_question            float64
timestamp                datetime64[ns]
AnswerCount                     float64
PostId                            int32
VoteTypeId                        int32
CreationDate_vote        datetime64[ns]
timestamp_vote           datetime64[ns]
Id_answer                         int32
PostTypeId                        int32
AcceptedAnswerId                float64
CreationDate_answer      datetime64[ns]
Score                             int64
ViewCount                       float64
OwnerUserId_answer              float64
Title                   string[pyarrow]
timestamp_answer         datetime64[ns]
Name                    string[pyarrow]
Class                   string[pyarrow]
increment                         int32
score_history                     int32
time_to_first_answer    string[pyarrow]
dtype: object

In [None]:
print(f'STARTED PROCESSING! DATASET={dataset} init cluster'.center(80, '*'), flush=True)

# Output schema handed to dask for the per-question summary. The dtypes must
# match what get_best_answer_and_timestamp actually returns: the owner id and
# the answer count come from float64 source columns (see df.dtypes), so they
# are declared as float64 rather than int64.
meta = {
    'question_owner': 'float64',
    'answer_count': 'float64',
    'badge_name': 'object',
    'badge_class': 'object',
    'best_answer_id': 'int64',
    'time_to_emerge': 'datetime64[ns]',
    'tot_horizon': 'datetime64[ns]',
    'question_time': 'datetime64[ns]',
    'best_init_time': 'datetime64[ns]',
    'first_answer_time': 'datetime64[ns]',
    'always_best': 'bool'
}
# Empty frame carrying only the column names and dtypes above.
meta_df = pd.DataFrame({k: pd.Series(dtype=t) for k, t in meta.items()})

print(f'Dask partitions: {df.npartitions}', flush=True)

# Lazy step: one summary row per question (ParentId); nothing is computed yet.
df_out_lazy = df.groupby('ParentId').apply(get_best_answer_and_timestamp, meta=meta_df)

print(f'GROUPED DATASET={dataset}'.center(80, '*'), flush=True)

# trigger computation; df_out is the resulting in-memory pandas DataFrame
df_out = df_out_lazy.compute()


*****************STARTED PROCESSING! DATASET=math init cluster******************
Dask partitions: 17
******************************GROUPED DATASET=math******************************




In [None]:
# Persist the per-question summary as CSV in the dataset's results folder
out_csv = f'./results/{dataset}/time_to_emerge_{filter_by_viewcount}.csv'
df_out.to_csv(out_csv)

In [None]:
# Quick look at the first rows of the computed summary
df_out.head()

## Shut down the cluster

In [7]:
# Stop the Dask cluster attached to `client` now that the results are saved
client.shutdown()

In [None]:
# The last run took around 3 hours and 40 minutes with 128 workers and 16 GB of RAM allocated per SLURM job