In [None]:
def cache_users_on_disk(data, cache_dir):
    """Split ``data`` by ``installation_id`` and store each group on disk.

    One Feather file is written per distinct ``installation_id`` value. The
    file is placed directly in ``cache_dir`` and named after the group key
    (no extension is appended).

    Args:
        data: DataFrame with an ``installation_id`` column.
        cache_dir: Directory receiving the files; created if missing.

    Returns:
        ``cache_dir``, unchanged.
    """
    os.makedirs(cache_dir, exist_ok=True)
    for iid, group in tqdm(data.groupby('installation_id')):
        # Group keys are not guaranteed to be strings (e.g. integer ids) and
        # os.path.join raises TypeError on non-path components.
        filename = os.path.join(cache_dir, str(iid))
        # Feather is written with a fresh 0..n-1 index instead of the
        # row labels the group inherited from ``data``.
        group.reset_index(drop=True).to_feather(filename)
    return cache_dir

In [None]:
class CachedAlgorithm:
    """Apply a feature extractor to every file of an on-disk cache, in batches.

    Attributes are plain copies of the constructor arguments:
        extractor: Callable invoked as ``extractor(df, meta)`` for each file.
        meta: Passed unchanged as the second argument of ``extractor``.
        files_per_batch: Chunk size handed to ``U.chunks``.
        pbar: When truthy, the batch loop is wrapped in ``tqdm``.
        num_workers: Forwarded to ``U.parallel`` as ``num_workers``.
    """

    def __init__(self, extractor, meta, files_per_batch=128,
                 pbar=True, num_workers=cpu_count()):
        # NOTE: the ``num_workers`` default is evaluated once, when the class
        # body is executed, not on every instantiation.
        self.extractor, self.meta = extractor, meta
        self.files_per_batch, self.pbar = files_per_batch, pbar
        self.num_workers = num_workers

    def run(self, cache_dir, output_dir):
        """Extract features from each file in ``cache_dir`` and pickle them.

        Every directory entry of ``cache_dir`` is read with
        ``feather.read_dataframe``, passed through the extractor, converted to
        a ``pd.DataFrame`` and pickled into ``output_dir`` under the same base
        name as the input file.

        Args:
            cache_dir: Directory whose entries are the input files.
            output_dir: Destination directory; created if missing.

        Returns:
            List with the paths of the written pickle files.
        """
        os.makedirs(output_dir, exist_ok=True)
        input_paths = [os.path.join(cache_dir, entry)
                       for entry in os.listdir(cache_dir)]

        def load_and_extract(path):
            frame = feather.read_dataframe(path)
            return pd.DataFrame(self.extractor(frame, self.meta))

        def store(item):
            features, source_path = item
            target = os.path.join(output_dir, os.path.basename(source_path))
            features.reset_index(drop=True).to_pickle(target)
            return target

        # Presumably U.chunks yields consecutive groups of at most
        # ``files_per_batch`` paths -- confirm against the helper module.
        batches = list(U.chunks(input_paths, self.files_per_batch))
        progress = tqdm(batches) if self.pbar else batches
        workers = self.num_workers

        written = []
        for batch in progress:
            # Pairing results with ``batch`` via zip assumes U.parallel
            # returns results in input order -- TODO confirm.
            extracted = U.parallel(load_and_extract, batch, num_workers=workers)
            stored = U.parallel(store, zip(extracted, batch), num_workers=workers)
            written.extend(stored)
        return written