In [1]:
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
	split_ind, df_split, df_f_name = tup_arg
	return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
	if njobs == -1:
		njobs = multiprocessing.cpu_count()
	pool = multiprocessing.Pool(processes=njobs)

	try:
		splits = np.array_split(df[subset], njobs)
	except ValueError:
		splits = np.array_split(df, njobs)

	pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
	results = pool.map(partial(_df_split, **kwargs), pool_data)
	pool.close()
	pool.join()
	results = sorted(results, key=lambda x:x[0])
	results = pd.concat([split[1] for split in results])
	return results


# testing on "apply" and "isin" pandas functions
# you can also use tqdm's progress_apply (a pandas plugin function), to get nice progrees bars per each core (but slower..)
from time import time
from tqdm import tqdm

In [2]:
def sample_test()
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

    def apply_f(row):
        return row['c1'] + 0.1
    # apply / tqdm apply test
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    # res = df.progress_apply(apply_f, axis=1)
    # tqdm.pandas()
    res = df.apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    951375
c2    951533
dtype: int64
time for native implementation 4.7
--------------------------------------------------
result
c1    951375
dtype: int64
time for multi core implementation 1.15
--------------------------------------------------
testing pandas apply on (1000000, 2)
--------------------------------------------------
result random sample
157105    157105.1
374554    374554.1
688694    688694.1
dtype: float64
time for native implementation 15.79
--------------------------------------------------
result random sample
157105    157105.1
374554    374554.1
688694    688694.1
dtype: float64
time for multi core implementation 3.22
--------------------------------------------------


In [2]:
import pandas as pd
df = pd.read_csv('/Users/ankur.kumar/Desktop/Work/projects/client/pantaloon/Pantaloon-MH-processed.csv')[['UID', 'Cleaned_Verbatim']]
df = df.drop_duplicates(['UID'])
df = df[0:10000]
df.shape

(10000, 2)

In [3]:
import spacy
nlp = spacy.load('en_core_web_sm')

In [6]:
def tokenize(row):
    doc = nlp(row['Cleaned_Verbatim'])
    return [d.text for d in doc]

In [8]:
%%time
res = df_multi_core(df=df, df_f_name='apply', subset=['Cleaned_Verbatim'], njobs=6, func=tokenize, axis=1)

CPU times: user 122 ms, sys: 56.3 ms, total: 178 ms
Wall time: 20.4 s


In [10]:
res.values

array([list(['1', '.', 'the', 'fit', '&', 'price', 'was', 'good', '.', '2', '.', 'i', 'was', 'not', 'looking', 'for', 'any', 'particular', 'brand', 'or', 'product', '.', '3', '.', 'this', 'is', 'was', 'just', 'a', 'casual', 'shopping', '.', '4', '.', 'the', 'staff', 'did', 'assist', 'me', 'well', '.', '5', '.', 'apart', 'form', 'pantaloons', 'i', 'also', 'shop', 'west', '-', 'side6', '.', 'recommend', 'pantaloons', 'to', 'friends', '&', 'family', '-', 'yes']),
       list(['1', '.', 'i', 'am', 'satisfied', 'with', 'the', 'staff', 'service', '.', '2', '.', 'the', 'staff', 'was', 'very', 'helpful', 'to', 'me', '.', '3', '.', 'they', 'explained', 'me', 'about', 'the', 'products', '.', '4', '.', 'recommend', 'pantaloons', 'to', 'friends', '&', 'family', '-', 'yes']),
       list(['1', '.', 'i', 'am', 'fine', 'with', 'the', 'range', 'and', 'variety', '.', '2', '.', 'the', 'fit', 'and', 'price', 'is', 'also', 'good', '.', '3', '.', 'i', 'got', 'all', 'the', 'products', 'whatever', 'i', 'was'

In [12]:
from fastprogress import *
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import functools, operator

#function
def parallel(func, items, chunksize=None, max_workers=1):
    
    if chunksize:
        arr = [np.array(items[i: i+chunksize]) for i in (range(0, len(items), chunksize))]
    else:
        arr = np.array_split(items, max_workers)

    if max_workers<2: results = list(progress_bar(map(func, enumerate(arr)), total=len(arr)))
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as ex:
            return list(progress_bar(ex.map(func, enumerate(arr)), total=len(arr)))
    if any([o is not None for o in results]): return results


In [21]:
def tokenize(chunk):
    docs = [[d.text for d in doc] for doc in nlp.pipe(chunk[1])]
    return docs

items = df['Cleaned_Verbatim'].values
results = parallel(tokenize, items, max_workers=6)

In [25]:
type(results)