In [3]:
import pandas as pd
import numpy as np
from datetime import timedelta
from concurrent.futures import ProcessPoolExecutor

# Number of CSV rows per chunk; each chunk becomes one worker task / output file.
chunksize = 100_000
# Size of the process pool used to process chunks in parallel.
num_workers = 50

def clean(df):
    for col in ['submit_time', 'start_date', 'end_date']:
        df[col] = pd.to_datetime(df[col])

    for lab, c in df.items():
        if c.dtype == 'int64':
            df[lab]= c.astype(np.int32)

        if c.dtype == 'float64':
            df[lab]= c.astype(np.float32)
    
    return df
        
def get_dates_and_durations(start_date, end_date):
    if start_date.date() == end_date.date():
        dates = [start_date, end_date]
    else: 
        dates = pd.date_range(start_date.date() + timedelta(days=1), 
                             end_date.date()).tolist()
        dates = [start_date] + dates + [end_date]
        
    start_dates =[dates[i].date() for i in range(len(dates)-1)]
    dts = [dates[i+1] - dates[i] for i in range(len(dates)-1)]
        
    return pd.Series({'date': start_dates, 'seconds': dts})

def expand_dates(df):
    date_data = df.apply(lambda x: get_dates_and_durations(x.start_date, x.end_date), axis=1)
    df = df.join(date_data)
    df = df.explode(column=['date', 'seconds'])
    df = df.drop(['start_date', 'end_date'], axis=1)
    df.date = df.date.astype(str)
    df.seconds = df.seconds.dt.total_seconds().astype(np.int32)
    
    return df

def task(df, ix): 
    path = f"/project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_{ix}.parquet"
    print(ix, path)
    
    df = clean(df)
    df = expand_dates(df)
    df.to_parquet(path, index=None)
    
    return path

with ProcessPoolExecutor(num_workers) as executor:
    for ix, c in enumerate(pd.read_csv('/project/rcde/xdmod_analysis/jobs_with_hosts.csv', chunksize=chunksize)):
        executor.submit(task, c, ix)

0 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_0.parquet
1
 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_1.parquet2 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_2.parquet
3
 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_3.parquet4 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_4.parquet
5 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_5.parquet
6 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_6.parquet
7
 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_7.parquet8 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_8.parquet
9 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_9.parquet
10 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_10.parquet
11 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_11.parquet
12 /project/rcde/xdmod_analysis/jobs_with_hosts_dialy_time/chunk_12.parquet
13
 /project/rcde/xdmod_analysis