In [1]:
import os
import gc
from glob import glob
from itertools import combinations
import pandas as pd
import dask
import dask.dataframe as dd

In [2]:
input_file = "../data/raw/train.hdf.table.compress"
output_dir = "../data/interim/partitioned2"

dtypes = {
    'ip': 'uint32',
    'app': 'uint16',
    'device': 'uint16',
    'os': 'uint16',
    'channel': 'uint16',
    'is_attributed': 'uint8'
}

def add_group_column2(df, column, t, npartitions):
    df[column] = df[t[0]].astype(str) + "_" + df[t[1]].astype(str)
    df['npartition'] = df[column].apply(hash) % npartitions
    return df

def add_group_column3(df, column, t, npartitions):
    df[column] = df[t[0]].astype(str) + "_" + df[t[1]].astype(str) + "_" + df[t[2]].astype(str)
    df['npartition'] = df[column].apply(hash) % npartitions
    return df

def partition_by(df, column, npartitions):
    print(f"Processing {column}")
    directory = f"{output_dir}/{column}"
    to_write=list(df)
    if not os.path.exists(directory):
        os.makedirs(directory)
    df['npartition'] = df[column] % npartitions
    df.groupby('npartition').apply(lambda x: x[to_write].to_hdf(f"{directory}/train_{x.name}.hdf.compress", 'train', mode='w', complib='blosc', fletcher32=True))
    del df['npartition']
    gc.collect()

def partition_by2(filename, t, npartitions):
    column = f"{t[0]}_{t[1]}"
    print(f"Processing {column}")
    directory = f"{output_dir}/{column}"
    if not os.path.exists(directory): os.makedirs(directory)

    df = dd.read_hdf(filename, 'train')
    to_write=list(df.columns)
    to_write.append(column)
    
    df = df.map_partitions(add_group_column2, column, t, npartitions)
    df = df.groupby('npartition')
    df = df.apply(lambda x: x[to_write].to_hdf(f"{directory}/train_{x.name}.hdf.compress", 'train', mode='w', complib='blosc', fletcher32=True))
    df = df.compute()
    del df
    gc.collect()

def partition_by3(df, t, npartitions):
    column = f"{t[0]}_{t[1]}_{t[2]}"
    print(f"Processing {column}")
    directory = f"{output_dir}/{column}"
    if not os.path.exists(directory): os.makedirs(directory)
    df = dd.read_hdf(filename, 'train')
    to_write=list(df.columns)
    to_write.append(column)
    
    df = df.map_partitions(add_group_column3, column, t, npartitions)
    df = df.groupby('npartition')
    df = df.apply(lambda x: x[to_write].to_hdf(f"{directory}/train_{x.name}.hdf.compress", 'train', mode='w', complib='blosc', fletcher32=True))
    df = df.compute()
    del df
    gc.collect()


In [None]:
npartitions = 64
target_entities = ['ip', 'channel', 'app', 'device', 'os']

# for t in combinations(target_entities, 1):
#     partition_by(df, t[0], npartitions)

for t in combinations(target_entities, 2):
    partition_by2(input_file, t, npartitions)

for t in combinations(target_entities, 3):
    partition_by3(input_file, t, npartitions)

Processing ip_channel


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
