In [1]:
import gc

import joblib
import pandas as pd
from tqdm.notebook import tqdm

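# Repartitioning (needs to be run only once)

Splits each original parquet partition into `SUBPARTITIONS_COUNT` client-disjoint subpartitions and caches them under `data/`. The constants cell is still required by the feature-generation section, so run it even when the repartitioning loop is skipped.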

In [2]:
# Number of original parquet partitions read by the repartitioning loop.
ORIGINAL_PARTITIONS_COUNT = 10
# Number of client-disjoint pieces each original partition is split into.
SUBPARTITIONS_COUNT = 8

def client_sample(data, clients, subpartition_index, subpartitions_count=SUBPARTITIONS_COUNT, column='client'):
    """Select the rows of ``data`` that belong to one round-robin bucket of clients.

    ``clients`` is split into ``subpartitions_count`` interleaved buckets:
    bucket ``k`` holds ``clients[k]``, ``clients[k + subpartitions_count]``, ...
    The returned frame keeps only the rows whose ``column`` value is in bucket
    ``subpartition_index``; the original index is preserved.
    """
    every_nth = slice(subpartition_index, None, subpartitions_count)
    bucket = clients[every_nth]
    in_bucket = data[column].isin(bucket)
    return data[in_bucket]

In [3]:
from tqdm.notebook import tqdm
import gc
import joblib

# Split every original parquet partition into SUBPARTITIONS_COUNT client-disjoint
# pieces (via client_sample) and cache each piece on disk with joblib.
for partition_i in tqdm(range(ORIGINAL_PARTITIONS_COUNT)):
    # Zero-pad to 5 digits: identical to the old 'part-0000{i}' names for i < 10,
    # and still correct if ORIGINAL_PARTITIONS_COUNT grows past 10.
    data = pd.read_parquet(f'data/part-{partition_i:05d}.parquet')
    # Sorting makes the client -> subpartition assignment deterministic across runs.
    clients = sorted(data['client'].unique())

    # leave=False: the inner bar is cleared after each partition instead of piling up.
    for subpartition_j in tqdm(range(SUBPARTITIONS_COUNT), leave=False):
        data_subpartition = client_sample(data, clients, subpartition_j)
        joblib.dump(data_subpartition, f'data/subpartition_{partition_i}_{subpartition_j}.dump')
        del data_subpartition

    # Release the full partition before loading the next one.
    del data
    gc.collect()

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))





# Feature generation (rerun whenever features need to be regenerated from scratch)

In [7]:
def parquet_processing(partition_i, subpartition_j):
    """Compute per-client features for one cached subpartition.

    Loads ``data/subpartition_{partition_i}_{subpartition_j}.dump`` (written by
    the repartitioning step) and returns a DataFrame indexed by ``client`` whose
    single ``client`` column holds the row count for each client.
    """
    # NOTE(review): joblib.load unpickles the file — only load dumps this notebook produced.
    data = joblib.load(f'data/subpartition_{partition_i}_{subpartition_j}.dump')
    return data.groupby('client').agg({'client': 'count'})


def parallel_parquet_processing(n_jobs=4, processing_fn=None, partitions_count=None, subpartitions_count=None):
    """Run a per-subpartition feature function over every cached subpartition in parallel.

    Parameters
    ----------
    n_jobs : int
        Number of joblib workers.
    processing_fn : callable, optional
        ``processing_fn(partition_i, subpartition_j) -> DataFrame``; defaults to
        ``parquet_processing``.
    partitions_count, subpartitions_count : int, optional
        Size of the (partition, subpartition) grid to process; default to
        ``ORIGINAL_PARTITIONS_COUNT`` and ``SUBPARTITIONS_COUNT``, read at call time.

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation of the per-subpartition results, in
        (partition, subpartition) order.

    Raises
    ------
    ValueError
        If the grid is empty (nothing to concatenate).
    """
    if processing_fn is None:
        processing_fn = parquet_processing
    if partitions_count is None:
        partitions_count = ORIGINAL_PARTITIONS_COUNT
    if subpartitions_count is None:
        subpartitions_count = SUBPARTITIONS_COUNT

    subpartitions = [
        (partition_i, subpartition_j)
        for partition_i in range(partitions_count)
        for subpartition_j in range(subpartitions_count)
    ]
    if not subpartitions:
        # Fail before starting workers; pd.concat([]) would raise ValueError anyway.
        raise ValueError('no subpartitions to process: partitions_count and subpartitions_count must be positive')

    # tqdm wraps the iterable Parallel consumes, so the bar tracks task dispatch,
    # which can run ahead of actual completion.
    results = joblib.Parallel(n_jobs=n_jobs)(
        joblib.delayed(processing_fn)(partition_i, subpartition_j)
        for partition_i, subpartition_j in tqdm(subpartitions)
    )

    # NOTE(review): if one client can appear in several original partitions, its
    # rows are not merged here and the index will contain duplicates — confirm.
    return pd.concat(results, axis=0)

In [8]:
%%time

# Time a single subpartition before launching the full parallel run.
parquet_processing(0, 0)

CPU times: user 1.24 s, sys: 548 ms, total: 1.78 s
Wall time: 1.78 s


Unnamed: 0_level_0,client
client,Unnamed: 1_level_1
0014a49ec89e3a43098375b107f8ff2e,245
006478dcc105b76e2575d292d77d3d36,2101
008d70f711b6874b7221393e8e0e9586,555
00c4502a17b487c52777382b5c14695a,718
00fa0a7fd72e913b507ae82d2179b57f,1588
...,...
febede1840d8ea9661f7aa79423bdd15,427
fed749b35ffed283cba07bd442899f72,93
ff20798f91df6388f2a9b34d8079522d,2948
ff760c65cebf0199d5e5ae44a179f252,364


In [9]:
# Bind the combined per-client result to a name so later cells don't depend on `_` / `Out[...]`.
client_features = parallel_parquet_processing(n_jobs=16)
client_features

HBox(children=(FloatProgress(value=0.0, max=80.0), HTML(value='')))




Unnamed: 0_level_0,client
client,Unnamed: 1_level_1
0014a49ec89e3a43098375b107f8ff2e,245
006478dcc105b76e2575d292d77d3d36,2101
008d70f711b6874b7221393e8e0e9586,555
00c4502a17b487c52777382b5c14695a,718
00fa0a7fd72e913b507ae82d2179b57f,1588
...,...
fee9b07280d6b7f1ea4262eb8a26d2a0,522
ff34da49e2b9eb59c7b4664ba17094ff,527
ff908f0720bf22060753cbe197ecfdf7,633
ffbe44e0ae513b0a49c8c55ff5fd4076,1374
