Skip to content

Commit

Permalink
Merge pull request #6 from BenoitLebreton/transformer
Browse files Browse the repository at this point in the history
Transformer
  • Loading branch information
sachasamama committed May 9, 2019
2 parents d2c7dda + 720effb commit bd5d50d
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 30 deletions.
2 changes: 1 addition & 1 deletion melusine/nlp_tools/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self,
self.logger.addHandler(ch)
self.logger.debug('Create an Embedding instance.')
self.input_column = input_column
self.streamer = Streamer(columns=self.input_column)
self.streamer = Streamer(column=self.input_column)
self.workers = workers
self.seed = seed
self.iter = iter
Expand Down
2 changes: 1 addition & 1 deletion melusine/nlp_tools/phraser.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(self,
self.threshold = threshold
self.min_count = min_count
self.input_column = input_column
self.streamer = Streamer(columns=self.input_column)
self.streamer = Streamer(column=self.input_column)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
Expand Down
38 changes: 26 additions & 12 deletions melusine/utils/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import numpy as np
import pandas as pd
from sklearn.externals import joblib
from tqdm import tqdm
from sklearn.externals.joblib import Parallel, delayed

def apply_df(input_args):
df, func, kwargs = input_args
if "progress_bar" in kwargs:
progress_bar = kwargs.pop('progress_bar')
else:
progress_bar = False
if "args" in kwargs:
args_ = kwargs.pop('args')
else:
args_ = None
if progress_bar:
tqdm.pandas(leave=False, desc=func.__name__, unit='emails', dynamic_ncols=True, mininterval=2.0)
df = df.progress_apply(func, axis=1, args=args_)
else:
df = df.apply(func, axis=1, args=args_)
return df

def _apply_df(args):
"""Apply a function along an axis of the DataFrame"""
df, func, kwargs = args
return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, n_jobs=1, **kwargs):
def apply_by_multiprocessing(df, func, **kwargs):
"""Apply a function along an axis of the DataFrame using multiprocessing.
A maximum of half of the core available in the system will be used.
Expand All @@ -25,7 +36,10 @@ def apply_by_multiprocessing(df, func, n_jobs=1, **kwargs):
pd.DataFrame
Returns the DataFrame with the function applied.
"""
result = joblib.Parallel(n_jobs=n_jobs, prefer='processes')(
joblib.delayed(d.apply)(func, **kwargs) for d in np.array_split(df, n_jobs)
)
return pd.concat(list(result))
workers = kwargs.pop('workers')
workers = min(workers, int(df.shape[0]/2))
workers = max(workers, 1)
if (df.shape[0]==1) or (workers==1):
return apply_df((df, func, kwargs))
retLst = Parallel(n_jobs=workers)(delayed(apply_df)(input_args=(d, func, kwargs)) for d in np.array_split(df, workers))
return pd.concat(retLst)
16 changes: 10 additions & 6 deletions melusine/utils/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Streamer():
Attributes
----------
columns : str or list of str,
column : str,
Input text column(s) to consider for the streamer.
stream : MailIterator object,
Expand All @@ -25,8 +25,8 @@ class Streamer():
"""

def __init__(self, columns='clean_body', n_jobs=40):
self.columns_ = columns
def __init__(self, column='clean_body', n_jobs=2):
self.column_ = column
self.n_jobs = n_jobs

def to_stream(self, X):
Expand Down Expand Up @@ -60,9 +60,12 @@ def to_flattoks(self, X):
-------
list of lists of strings
"""
tokenized_sentences_list = apply_by_multiprocessing(X[self.columns_],
self.to_list_of_tokenized_sentences,
workers=self.n_jobs
tokenized_sentences_list = apply_by_multiprocessing(df=X[[self.column_]],
func=lambda x: self.to_list_of_tokenized_sentences(x[self.column_]),
#func=self.to_list_of_tokenized_sentences,
args=None,
workers=self.n_jobs,
progress_bar=False
)
flattoks = [item for sublist in tokenized_sentences_list
for item in sublist]
Expand All @@ -80,6 +83,7 @@ def to_list_of_tokenized_sentences(self, text):
-------
list of list of strings
"""
#text = row[self.column_]
sentences_list = split_message_to_sentences(text)
tokenized_sentences_list = [nltk.regexp_tokenize(sentence,
pattern="\w+(?:[\?\-\'\"_]\w+)*")
Expand Down
43 changes: 34 additions & 9 deletions melusine/utils/transformer_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class TransformerScheduler(BaseEstimator, TransformerMixin):
n_jobs : int, optional
Number of cores used for computation. Default value, 1.
progress_bar : boolean, optional
Whether to print a progress bar from tqdm package. Default value, True.
Works only when mode is set to 'apply_by_multiprocessing'.
copy : boolean, optional
Make a copy of DataFrame. Default value, True.
Expand Down Expand Up @@ -107,11 +111,13 @@ def __init__(self,
functions_scheduler,
mode='apply',
n_jobs=1,
progress_bar=True,
copy=True,
verbose=0):
self.functions_scheduler = functions_scheduler
self.mode = mode
self.n_jobs = n_jobs
self.progress_bar=True
self.copy = copy
self.verbose = verbose

Expand Down Expand Up @@ -143,16 +149,35 @@ def transform(self, X):

for tuple_ in self.functions_scheduler:
func_, args_, cols_ = _check_tuple(*tuple_)

cols_ = cols_ or X_.columns
# cols_ = cols_ or X_.columns

if self.mode == 'apply':
X_[cols_] = X_.apply(func_, args=args_, axis=1).apply(pd.Series)
if cols_ is None:
X_ = X_.apply(func_, args=args_, axis=1)
elif len(cols_) == 1:
X_[cols_[0]] = X_.apply(func_, args=args_, axis=1)
else:
X_[cols_] = X_.apply(func_, args=args_, axis=1).apply(pd.Series)
else: # 'apply_by_multiprocessing'
X_[cols_] = apply_by_multiprocessing(df=X_,
func=func_,
args=args_,
axis=1,
workers=self.n_jobs).apply(pd.Series)

if cols_ is None:
X_ = apply_by_multiprocessing(df=X_,
func=func_,
args=args_,
axis=1,
workers=self.n_jobs,
progress_bar=self.progress_bar)
elif len(cols_) == 1:
X_[cols_[0]] = apply_by_multiprocessing(df=X_,
func=func_,
args=args_,
axis=1,
workers=self.n_jobs,
progress_bar=self.progress_bar)
else:
X_[cols_] = apply_by_multiprocessing(df=X_,
func=func_,
args=args_,
axis=1,
workers=self.n_jobs,
progress_bar=self.progress_bar).apply(pd.Series)
return X_
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
'nltk>=3.3',
'keras>=2.2.0',
'tqdm>=4.14',
'tensorflow>=1.10.0',
'tensorflow>=1.10.0,<=1.13.1',
'unidecode'
]

Expand Down

0 comments on commit bd5d50d

Please sign in to comment.