# Embarrasingly parallel date operations

each row is processed independently

I want to calculate the number of days to and after the next holiday. As I am new to python I am unsure how to perform such a calculation efficiently

In [1]:
%load_ext Cython

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.cm as cm
import datetime as DT

import matplotlib.pyplot as plt
plt.style.use('ggplot')
import time

load dates and holidays

In [4]:
datesFrame = pd.read_csv('myDates.csv')
datesFrame.myDates = pd.to_datetime(datesFrame.myDates)

holidays = pd.read_csv('holidays.csv')
holidays.day = pd.to_datetime(holidays.day)
holidays.type = holidays.type.astype("category")
holidays.name = holidays.name.astype("category")

holidays = holidays[holidays.apply(lambda x: (x.type == 'National holiday'), axis=1)]

In [5]:
def get_nearest_date(dates, pivot):
    nearest = min(dates, key=lambda x: abs(x - pivot))
    difference = abs(nearest - pivot)
    return difference / np.timedelta64(1, 'D')

## approach 1 - too slow

In [6]:
datesFrameSmall= datesFrame[: 10]

In [7]:
time1 = time.time()
datesFrameSmall['daysBeforeHoliday'] = datesFrameSmall.myDates.apply(lambda x: get_nearest_date(holidays.day[holidays.day < x], x))
datesFrameSmall['daysAfterHoliday']  =  datesFrameSmall.myDates.apply(lambda x: get_nearest_date(holidays.day[holidays.day > x], x))
time2 = time.time()
print ('function took %0.3f ms' % ((time2-time1)*1000.0))
datesFrameSmall

function took 114.090 ms


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  from ipykernel import kernelapp as app
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  app.launch_new_instance()


Unnamed: 0,myDates,daysBeforeHoliday,daysAfterHoliday
0,2014-09-01,17.0,55.0
1,2014-03-01,54.0,51.0
2,2014-03-01,54.0,51.0
3,2014-01-18,12.0,93.0
4,2014-01-14,8.0,97.0
5,2014-01-23,17.0,88.0
6,2014-12-01,30.0,7.0
7,2014-03-01,54.0,51.0
8,2014-03-01,54.0,51.0
9,2014-04-06,90.0,15.0


In [16]:
# disabled - takes too long
#time1 = time.time()
#datesFrame['daysBeforeHoliday'] = datesFrame.myDates.apply(lambda x: get_nearest_date(holidays.day[holidays.day < x], x))
#datesFrame['daysAfterHoliday']  =  datesFrame.myDates.apply(lambda x: get_nearest_date(holidays.day[holidays.day > x], x))
#time2 = time.time()
#print ('function took %0.3f ms' % ((time2-time1)*1000.0))
#datesFrame

## approach 2 - noReturnValues  -> parallel processing + cython

In [19]:
%%cython
################
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.cm as cm
import datetime as DT
import skutils

import matplotlib.pyplot as plt
plt.style.use('ggplot')
import time
## strange - > have to re import all the things for cython. 
datesFrame = pd.read_csv('myDates.csv')
datesFrame.myDates = pd.to_datetime(datesFrame.myDates)

holidays = pd.read_csv('holidays.csv')
holidays.day = pd.to_datetime(holidays.day)
holidays.type = holidays.type.astype("category")
holidays.name = holidays.name.astype("category")

holidays = holidays[holidays.apply(lambda x: (x.type == 'National holiday'), axis=1)]

def get_nearest_date(dates, pivot):
    nearest = min(dates, key=lambda x: abs(x - pivot))
    difference = abs(nearest - pivot)
    return difference / np.timedelta64(1, 'D')
################
import multiprocessing
num_cpus = multiprocessing.cpu_count()

time1 = time.time()
n_entries = datesFrame.shape[0]
n_entrie_perfold = round(n_entries / num_cpus)
folds = [datesFrame[start:start+n_entrie_perfold] for start in range(0, n_entries + 1, n_entrie_perfold)]

def process_fold(df_per_fold):
    df_per_fold['daysBeforeHoliday'] = df_per_fold.myDates.apply(lambda x: get_nearest_date(holidays.day[holidays.day < x], x))
    df_per_fold['daysAfterHoliday'] = df_per_fold.myDates.apply(lambda x: get_nearest_date(holidays.day[holidays.day > x], x))
    return df_per_fold
    
pool = multiprocessing.Pool(processes=num_cpus)  
pool.map(process_fold, folds)

time2 = time.time()
print ('function took %0.3f ms' % ((time2-time1)*1000.0))
datesFrame

Seems to parallelize well. Still it is not really "quick". Strange that cython does require everything in a single jupyter cell. 

However, there is no output in the datesframe. What am I doing wrong here?

## approach 3 - parallelApply

In [None]:
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)

def apply_row_foo(input_df):
    return input_df.apply((row_foo), axis=1)

n_chunks = 10

grouped = df.groupby(df.index // n_chunks)
applyParallel(grouped, apply_row_foo)

only for grouped data --> how to port to rows?

In [27]:
# from http://stackoverflow.com/questions/26187759/parallelize-apply-after-pandas-groupby/29281494#29281494
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
print ('parallel version: ')
print( applyParallel(df.groupby(df.index), tmpFunc))

print ('regular version: ')
print (df.groupby(df.index).apply(tmpFunc))

parallel version: 
    a  b   c
g1  6  4  10
g1  2  5   7
g2  2  6   8
regular version: 
    a  b   c
g1  6  4  10
g1  2  5   7
g2  2  6   8


here my try to port it. But so far I do not know how to pass both 2 parameters

In [28]:
from joblib import Parallel, delayed
import multiprocessing

def get_nearest_dateParallel(dates, pivot):
    nearest = min(dates, key=lambda x: abs(x - pivot))
    difference = abs(nearest - pivot)
    return difference / np.timedelta64(1, 'D')

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

print ('parallel version: ')
print( applyParallel(datesFrame.groupby(datesFrame.index), get_nearest_dateParallel))

parallel version: 


JoblibTypeError: JoblibTypeError
___________________________________________________________________________
Multiprocessing exception:
...........................................................................
/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py in _run_module_as_main(mod_name='ipykernel.__main__', alter_argv=1)
    179         sys.exit(msg)
    180     main_globals = sys.modules["__main__"].__dict__
    181     if alter_argv:
    182         sys.argv[0] = mod_spec.origin
    183     return _run_code(code, main_globals, None,
--> 184                      "__main__", mod_spec)
        mod_spec = ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py')
    185 
    186 def run_module(mod_name, init_globals=None,
    187                run_name=None, alter_sys=False):
    188     """Execute a module's code without importing it

...........................................................................
/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py in _run_code(code=<code object <module> at 0x105b74300, file "/usr...3.5/site-packages/ipykernel/__main__.py", line 1>, run_globals={'__builtins__': <module 'builtins' (built-in)>, '__cached__': '/usr/local/lib/python3.5/site-packages/ipykernel/__pycache__/__main__.cpython-35.pyc', '__doc__': None, '__file__': '/usr/local/lib/python3.5/site-packages/ipykernel/__main__.py', '__loader__': <_frozen_importlib_external.SourceFileLoader object>, '__name__': '__main__', '__package__': 'ipykernel', '__spec__': ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py'), 'app': <module 'ipykernel.kernelapp' from '/usr/local/lib/python3.5/site-packages/ipykernel/kernelapp.py'>}, init_globals=None, mod_name='__main__', mod_spec=ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py'), pkg_name='ipykernel', script_name=None)
     80                        __cached__ = cached,
     81                        __doc__ = None,
     82                        __loader__ = loader,
     83                        __package__ = pkg_name,
     84                        __spec__ = mod_spec)
---> 85     exec(code, run_globals)
        code = <code object <module> at 0x105b74300, file "/usr...3.5/site-packages/ipykernel/__main__.py", line 1>
        run_globals = {'__builtins__': <module 'builtins' (built-in)>, '__cached__': '/usr/local/lib/python3.5/site-packages/ipykernel/__pycache__/__main__.cpython-35.pyc', '__doc__': None, '__file__': '/usr/local/lib/python3.5/site-packages/ipykernel/__main__.py', '__loader__': <_frozen_importlib_external.SourceFileLoader object>, '__name__': '__main__', '__package__': 'ipykernel', '__spec__': ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py'), 'app': <module 'ipykernel.kernelapp' from '/usr/local/lib/python3.5/site-packages/ipykernel/kernelapp.py'>}
     86     return run_globals
     87 
     88 def _run_module_code(code, init_globals=None,
     89                     mod_name=None, mod_spec=None,

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/__main__.py in <module>()
      1 
      2 
----> 3 
      4 if __name__ == '__main__':
      5     from ipykernel import kernelapp as app
      6     app.launch_new_instance()
      7 
      8 
      9 
     10 

...........................................................................
/usr/local/lib/python3.5/site-packages/traitlets-4.2.2-py3.5.egg/traitlets/config/application.py in launch_instance(cls=<class 'ipykernel.kernelapp.IPKernelApp'>, argv=None, **kwargs={})
    591         
    592         If a global instance already exists, this reinitializes and starts it
    593         """
    594         app = cls.instance(**kwargs)
    595         app.initialize(argv)
--> 596         app.start()
        app.start = <bound method IPKernelApp.start of <ipykernel.kernelapp.IPKernelApp object>>
    597 
    598 #-----------------------------------------------------------------------------
    599 # utility functions, for convenience
    600 #-----------------------------------------------------------------------------

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/kernelapp.py in start(self=<ipykernel.kernelapp.IPKernelApp object>)
    469             return self.subapp.start()
    470         if self.poller is not None:
    471             self.poller.start()
    472         self.kernel.start()
    473         try:
--> 474             ioloop.IOLoop.instance().start()
    475         except KeyboardInterrupt:
    476             pass
    477 
    478 launch_new_instance = IPKernelApp.launch_instance

...........................................................................
/usr/local/lib/python3.5/site-packages/zmq/eventloop/ioloop.py in start(self=<zmq.eventloop.ioloop.ZMQIOLoop object>)
    157             PollIOLoop.configure(ZMQIOLoop)
    158         return PollIOLoop.current(*args, **kwargs)
    159     
    160     def start(self):
    161         try:
--> 162             super(ZMQIOLoop, self).start()
        self.start = <bound method ZMQIOLoop.start of <zmq.eventloop.ioloop.ZMQIOLoop object>>
    163         except ZMQError as e:
    164             if e.errno == ETERM:
    165                 # quietly return on ETERM
    166                 pass

...........................................................................
/usr/local/lib/python3.5/site-packages/tornado/ioloop.py in start(self=<zmq.eventloop.ioloop.ZMQIOLoop object>)
    882                 self._events.update(event_pairs)
    883                 while self._events:
    884                     fd, events = self._events.popitem()
    885                     try:
    886                         fd_obj, handler_func = self._handlers[fd]
--> 887                         handler_func(fd_obj, events)
        handler_func = <function wrap.<locals>.null_wrapper>
        fd_obj = <zmq.sugar.socket.Socket object>
        events = 1
    888                     except (OSError, IOError) as e:
    889                         if errno_from_exception(e) == errno.EPIPE:
    890                             # Happens when the client closes the connection
    891                             pass

...........................................................................
/usr/local/lib/python3.5/site-packages/tornado/stack_context.py in null_wrapper(*args=(<zmq.sugar.socket.Socket object>, 1), **kwargs={})
    270         # Fast path when there are no active contexts.
    271         def null_wrapper(*args, **kwargs):
    272             try:
    273                 current_state = _state.contexts
    274                 _state.contexts = cap_contexts[0]
--> 275                 return fn(*args, **kwargs)
        args = (<zmq.sugar.socket.Socket object>, 1)
        kwargs = {}
    276             finally:
    277                 _state.contexts = current_state
    278         null_wrapper._wrapped = True
    279         return null_wrapper

...........................................................................
/usr/local/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py in _handle_events(self=<zmq.eventloop.zmqstream.ZMQStream object>, fd=<zmq.sugar.socket.Socket object>, events=1)
    435             # dispatch events:
    436             if events & IOLoop.ERROR:
    437                 gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense")
    438                 return
    439             if events & IOLoop.READ:
--> 440                 self._handle_recv()
        self._handle_recv = <bound method ZMQStream._handle_recv of <zmq.eventloop.zmqstream.ZMQStream object>>
    441                 if not self.socket:
    442                     return
    443             if events & IOLoop.WRITE:
    444                 self._handle_send()

...........................................................................
/usr/local/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py in _handle_recv(self=<zmq.eventloop.zmqstream.ZMQStream object>)
    467                 gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
    468         else:
    469             if self._recv_callback:
    470                 callback = self._recv_callback
    471                 # self._recv_callback = None
--> 472                 self._run_callback(callback, msg)
        self._run_callback = <bound method ZMQStream._run_callback of <zmq.eventloop.zmqstream.ZMQStream object>>
        callback = <function wrap.<locals>.null_wrapper>
        msg = [<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>]
    473                 
    474         # self.update_state()
    475         
    476 

...........................................................................
/usr/local/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py in _run_callback(self=<zmq.eventloop.zmqstream.ZMQStream object>, callback=<function wrap.<locals>.null_wrapper>, *args=([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],), **kwargs={})
    409         close our socket."""
    410         try:
    411             # Use a NullContext to ensure that all StackContexts are run
    412             # inside our blanket exception handler rather than outside.
    413             with stack_context.NullContext():
--> 414                 callback(*args, **kwargs)
        callback = <function wrap.<locals>.null_wrapper>
        args = ([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],)
        kwargs = {}
    415         except:
    416             gen_log.error("Uncaught exception, closing connection.",
    417                           exc_info=True)
    418             # Close the socket on an uncaught exception from a user callback

...........................................................................
/usr/local/lib/python3.5/site-packages/tornado/stack_context.py in null_wrapper(*args=([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],), **kwargs={})
    270         # Fast path when there are no active contexts.
    271         def null_wrapper(*args, **kwargs):
    272             try:
    273                 current_state = _state.contexts
    274                 _state.contexts = cap_contexts[0]
--> 275                 return fn(*args, **kwargs)
        args = ([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],)
        kwargs = {}
    276             finally:
    277                 _state.contexts = current_state
    278         null_wrapper._wrapped = True
    279         return null_wrapper

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/kernelbase.py in dispatcher(msg=[<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>])
    271         if self.control_stream:
    272             self.control_stream.on_recv(self.dispatch_control, copy=False)
    273 
    274         def make_dispatcher(stream):
    275             def dispatcher(msg):
--> 276                 return self.dispatch_shell(stream, msg)
        msg = [<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>]
    277             return dispatcher
    278 
    279         for s in self.shell_streams:
    280             s.on_recv(make_dispatcher(s), copy=False)

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/kernelbase.py in dispatch_shell(self=<ipykernel.ipkernel.IPythonKernel object>, stream=<zmq.eventloop.zmqstream.ZMQStream object>, msg={'buffers': [], 'content': {'allow_stdin': True, 'code': 'from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))', 'silent': False, 'stop_on_error': True, 'store_history': True, 'user_expressions': {}}, 'header': {'date': '2016-09-03T17:17:40.373104', 'msg_id': '72DDB024E68D492BAB4CF4741C36F235', 'msg_type': 'execute_request', 'session': '220818B99B4449D589C6EEE455A037F3', 'username': 'username', 'version': '5.0'}, 'metadata': {}, 'msg_id': '72DDB024E68D492BAB4CF4741C36F235', 'msg_type': 'execute_request', 'parent_header': {}})
    223             self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
    224         else:
    225             self.log.debug("%s: %s", msg_type, msg)
    226             self.pre_handler_hook()
    227             try:
--> 228                 handler(stream, idents, msg)
        handler = <bound method Kernel.execute_request of <ipykernel.ipkernel.IPythonKernel object>>
        stream = <zmq.eventloop.zmqstream.ZMQStream object>
        idents = [b'220818B99B4449D589C6EEE455A037F3']
        msg = {'buffers': [], 'content': {'allow_stdin': True, 'code': 'from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))', 'silent': False, 'stop_on_error': True, 'store_history': True, 'user_expressions': {}}, 'header': {'date': '2016-09-03T17:17:40.373104', 'msg_id': '72DDB024E68D492BAB4CF4741C36F235', 'msg_type': 'execute_request', 'session': '220818B99B4449D589C6EEE455A037F3', 'username': 'username', 'version': '5.0'}, 'metadata': {}, 'msg_id': '72DDB024E68D492BAB4CF4741C36F235', 'msg_type': 'execute_request', 'parent_header': {}}
    229             except Exception:
    230                 self.log.error("Exception in message handler:", exc_info=True)
    231             finally:
    232                 self.post_handler_hook()

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/kernelbase.py in execute_request(self=<ipykernel.ipkernel.IPythonKernel object>, stream=<zmq.eventloop.zmqstream.ZMQStream object>, ident=[b'220818B99B4449D589C6EEE455A037F3'], parent={'buffers': [], 'content': {'allow_stdin': True, 'code': 'from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))', 'silent': False, 'stop_on_error': True, 'store_history': True, 'user_expressions': {}}, 'header': {'date': '2016-09-03T17:17:40.373104', 'msg_id': '72DDB024E68D492BAB4CF4741C36F235', 'msg_type': 'execute_request', 'session': '220818B99B4449D589C6EEE455A037F3', 'username': 'username', 'version': '5.0'}, 'metadata': {}, 'msg_id': '72DDB024E68D492BAB4CF4741C36F235', 'msg_type': 'execute_request', 'parent_header': {}})
    385         if not silent:
    386             self.execution_count += 1
    387             self._publish_execute_input(code, parent, self.execution_count)
    388 
    389         reply_content = self.do_execute(code, silent, store_history,
--> 390                                         user_expressions, allow_stdin)
        user_expressions = {}
        allow_stdin = True
    391 
    392         # Flush output before sending the reply.
    393         sys.stdout.flush()
    394         sys.stderr.flush()

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/ipkernel.py in do_execute(self=<ipykernel.ipkernel.IPythonKernel object>, code='from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))', silent=False, store_history=True, user_expressions={}, allow_stdin=True)
    191 
    192         self._forward_input(allow_stdin)
    193 
    194         reply_content = {}
    195         try:
--> 196             res = shell.run_cell(code, store_history=store_history, silent=silent)
        res = undefined
        shell.run_cell = <bound method ZMQInteractiveShell.run_cell of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        code = 'from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))'
        store_history = True
        silent = False
    197         finally:
    198             self._restore_input()
    199 
    200         if res.error_before_exec is not None:

...........................................................................
/usr/local/lib/python3.5/site-packages/ipykernel/zmqshell.py in run_cell(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, *args=('from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))',), **kwargs={'silent': False, 'store_history': True})
    493             )
    494         self.payload_manager.write_payload(payload)
    495 
    496     def run_cell(self, *args, **kwargs):
    497         self._last_traceback = None
--> 498         return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
        self.run_cell = <bound method ZMQInteractiveShell.run_cell of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        args = ('from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))',)
        kwargs = {'silent': False, 'store_history': True}
    499 
    500     def _showtraceback(self, etype, evalue, stb):
    501         # try to preserve ordering of tracebacks and print statements
    502         sys.stdout.flush()

...........................................................................
/usr/local/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_cell(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, raw_cell='from joblib import Parallel, delayed\nimport mult...pby(datesFrame.index), get_nearest_dateParallel))', store_history=True, silent=False, shell_futures=True)
   2712                 self.displayhook.exec_result = result
   2713 
   2714                 # Execute the user code
   2715                 interactivity = "none" if silent else self.ast_node_interactivity
   2716                 has_raised = self.run_ast_nodes(code_ast.body, cell_name,
-> 2717                    interactivity=interactivity, compiler=compiler, result=result)
        interactivity = 'last_expr'
        compiler = <IPython.core.compilerop.CachingCompiler object>
   2718                 
   2719                 self.last_execution_succeeded = not has_raised
   2720 
   2721                 # Reset this so later displayed values do not modify the

...........................................................................
/usr/local/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_ast_nodes(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, nodelist=[<_ast.ImportFrom object>, <_ast.Import object>, <_ast.FunctionDef object>, <_ast.FunctionDef object>, <_ast.Expr object>, <_ast.Expr object>], cell_name='<ipython-input-28-24c5f514da83>', interactivity='last', compiler=<IPython.core.compilerop.CachingCompiler object>, result=<ExecutionResult object at 10c5bb400, execution_..._before_exec=None error_in_exec=None result=None>)
   2822                     return True
   2823 
   2824             for i, node in enumerate(to_run_interactive):
   2825                 mod = ast.Interactive([node])
   2826                 code = compiler(mod, cell_name, "single")
-> 2827                 if self.run_code(code, result):
        self.run_code = <bound method InteractiveShell.run_code of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        code = <code object <module> at 0x107210780, file "<ipython-input-28-24c5f514da83>", line 14>
        result = <ExecutionResult object at 10c5bb400, execution_..._before_exec=None error_in_exec=None result=None>
   2828                     return True
   2829 
   2830             # Flush softspace
   2831             if softspace(sys.stdout, 0):

...........................................................................
/usr/local/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_code(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, code_obj=<code object <module> at 0x107210780, file "<ipython-input-28-24c5f514da83>", line 14>, result=<ExecutionResult object at 10c5bb400, execution_..._before_exec=None error_in_exec=None result=None>)
   2876         outflag = 1  # happens in more places, so it's easier as default
   2877         try:
   2878             try:
   2879                 self.hooks.pre_run_code_hook()
   2880                 #rprint('Running code', repr(code_obj)) # dbg
-> 2881                 exec(code_obj, self.user_global_ns, self.user_ns)
        code_obj = <code object <module> at 0x107210780, file "<ipython-input-28-24c5f514da83>", line 14>
        self.user_global_ns = {'DT': <module 'datetime' from '/usr/local/Cellar/pytho...ramework/Versions/3.5/lib/python3.5/datetime.py'>, 'In': ['', 'get_ipython().run_cell_magic(\'cython\', \'\', "%loa...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', 'get_ipython().run_cell_magic(\'Cython\', \'\', "%loa...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', "get_ipython().magic('load_ext Cython')", 'get_ipython().run_cell_magic(\'cython\', \'\', "%loa...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', 'get_ipython().run_cell_magic(\'cython\', \'\', "%mat...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', 'get_ipython().run_cell_magic(\'cython\', \'\', "impo...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', r"get_ipython().run_cell_magic('cython', '', 'date...a x: (x.type == \'National holiday\'), axis=1)]')", "datesFrame = pd.read_csv('myDates.csv')\ndatesFra...ambda x: (x.type == 'National holiday'), axis=1)]", 'get_ipython().run_cell_magic(\'cython\', \'\', "def ...n    return difference / np.timedelta64(1, \'D\')")', "get_ipython().magic('load_ext Cython')", "import pandas as pd\nimport numpy as np\nimport ma...pyplot as plt\nplt.style.use('ggplot')\nimport time", "datesFrame = pd.read_csv('myDates.csv')\ndatesFra...ambda x: (x.type == 'National holiday'), axis=1)]", "def get_nearest_date(dates, pivot):\n    nearest ...t)\n    return difference / np.timedelta64(1, 'D')", 'datesFrameSmall= datesFrame[: 10]', "time1 = time.time()\ndatesFrameSmall['daysBeforeH....3f ms' % ((time2-time1)*1000.0))\ndatesFrameSmall", "# disabled - takes too long\n#time1 = time.time()...k %0.3f ms' % ((time2-time1)*1000.0))\n#datesFrame", r"get_ipython().run_cell_magic('cython', '', '####...sses=num_cpus)  \npool.map(process_fold, folds)')", r"get_ipython().run_cell_magic('cython', '', '####...sses=num_cpus)  \npool.map(process_fold, folds)')", r"get_ipython().run_cell_magic('cython', '', '####...sses=num_cpus)  \npool.map(process_fold, folds)')", ...], 'Out': {15:      myDates  daysBeforeHoliday  daysAfterHolida...9 2014-04-06               90.0              15.0, 20:           myDates
0      2014-09-01
1      2014-...2-12
178663 2015-02-12

[178664 rows x 1 columns], 25: <pandas.core.groupby.DataFrameGroupBy object>}, 'Parallel': <class 'joblib.parallel.Parallel'>, '_': <pandas.core.groupby.DataFrameGroupBy object>, '_15':      myDates  daysBeforeHoliday  daysAfterHolida...9 2014-04-06               90.0              15.0, '_20':           myDates
0      2014-09-01
1      2014-...2-12
178663 2015-02-12

[178664 rows x 1 columns], '_25': <pandas.core.groupby.DataFrameGroupBy object>, '__':           myDates
0      2014-09-01
1      2014-...2-12
178663 2015-02-12

[178664 rows x 1 columns], '___':      myDates  daysBeforeHoliday  daysAfterHolida...9 2014-04-06               90.0              15.0, ...}
        self.user_ns = {'DT': <module 'datetime' from '/usr/local/Cellar/pytho...ramework/Versions/3.5/lib/python3.5/datetime.py'>, 'In': ['', 'get_ipython().run_cell_magic(\'cython\', \'\', "%loa...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', 'get_ipython().run_cell_magic(\'Cython\', \'\', "%loa...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', "get_ipython().magic('load_ext Cython')", 'get_ipython().run_cell_magic(\'cython\', \'\', "%loa...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', 'get_ipython().run_cell_magic(\'cython\', \'\', "%mat...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', 'get_ipython().run_cell_magic(\'cython\', \'\', "impo...ot as plt\\nplt.style.use(\'ggplot\')\\nimport time")', r"get_ipython().run_cell_magic('cython', '', 'date...a x: (x.type == \'National holiday\'), axis=1)]')", "datesFrame = pd.read_csv('myDates.csv')\ndatesFra...ambda x: (x.type == 'National holiday'), axis=1)]", 'get_ipython().run_cell_magic(\'cython\', \'\', "def ...n    return difference / np.timedelta64(1, \'D\')")', "get_ipython().magic('load_ext Cython')", "import pandas as pd\nimport numpy as np\nimport ma...pyplot as plt\nplt.style.use('ggplot')\nimport time", "datesFrame = pd.read_csv('myDates.csv')\ndatesFra...ambda x: (x.type == 'National holiday'), axis=1)]", "def get_nearest_date(dates, pivot):\n    nearest ...t)\n    return difference / np.timedelta64(1, 'D')", 'datesFrameSmall= datesFrame[: 10]', "time1 = time.time()\ndatesFrameSmall['daysBeforeH....3f ms' % ((time2-time1)*1000.0))\ndatesFrameSmall", "# disabled - takes too long\n#time1 = time.time()...k %0.3f ms' % ((time2-time1)*1000.0))\n#datesFrame", r"get_ipython().run_cell_magic('cython', '', '####...sses=num_cpus)  \npool.map(process_fold, folds)')", r"get_ipython().run_cell_magic('cython', '', '####...sses=num_cpus)  \npool.map(process_fold, folds)')", r"get_ipython().run_cell_magic('cython', '', '####...sses=num_cpus)  \npool.map(process_fold, folds)')", ...], 'Out': {15:      myDates  daysBeforeHoliday  daysAfterHolida...9 2014-04-06               90.0              15.0, 20:           myDates
0      2014-09-01
1      2014-...2-12
178663 2015-02-12

[178664 rows x 1 columns], 25: <pandas.core.groupby.DataFrameGroupBy object>}, 'Parallel': <class 'joblib.parallel.Parallel'>, '_': <pandas.core.groupby.DataFrameGroupBy object>, '_15':      myDates  daysBeforeHoliday  daysAfterHolida...9 2014-04-06               90.0              15.0, '_20':           myDates
0      2014-09-01
1      2014-...2-12
178663 2015-02-12

[178664 rows x 1 columns], '_25': <pandas.core.groupby.DataFrameGroupBy object>, '__':           myDates
0      2014-09-01
1      2014-...2-12
178663 2015-02-12

[178664 rows x 1 columns], '___':      myDates  daysBeforeHoliday  daysAfterHolida...9 2014-04-06               90.0              15.0, ...}
   2882             finally:
   2883                 # Reset our crash handler in place
   2884                 sys.excepthook = old_excepthook
   2885         except SystemExit as e:

...........................................................................
/Users/geoHeil/Dropbox/masterThesis/thesis/researchCode/python/TMA/<ipython-input-28-24c5f514da83> in <module>()
      9 def applyParallel(dfGrouped, func):
     10     retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
     11     return pd.concat(retLst)
     12 
     13 print ('parallel version: ')
---> 14 print( applyParallel(datesFrame.groupby(datesFrame.index), get_nearest_dateParallel))
     15 
     16 
     17 
     18 

...........................................................................
/Users/geoHeil/Dropbox/masterThesis/thesis/researchCode/python/TMA/<ipython-input-28-24c5f514da83> in applyParallel(dfGrouped=<pandas.core.groupby.DataFrameGroupBy object>, func=<function get_nearest_dateParallel>)
      5     nearest = min(dates, key=lambda x: abs(x - pivot))
      6     difference = abs(nearest - pivot)
      7     return difference / np.timedelta64(1, 'D')
      8 
      9 def applyParallel(dfGrouped, func):
---> 10     retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
     11     return pd.concat(retLst)
     12 
     13 print ('parallel version: ')
     14 print( applyParallel(datesFrame.groupby(datesFrame.index), get_nearest_dateParallel))

...........................................................................
/usr/local/lib/python3.5/site-packages/joblib/parallel.py in __call__(self=Parallel(n_jobs=8), iterable=<generator object applyParallel.<locals>.<genexpr>>)
    763             if pre_dispatch == "all" or n_jobs == 1:
    764                 # The iterable was consumed all at once by the above for loop.
    765                 # No need to wait for async callbacks to trigger to
    766                 # consumption.
    767                 self._iterating = False
--> 768             self.retrieve()
        self.retrieve = <bound method Parallel.retrieve of Parallel(n_jobs=8)>
    769             # Make sure that we get a last message telling us we are done
    770             elapsed_time = time.time() - self._start_time
    771             self._print('Done %3i out of %3i | elapsed: %s finished',
    772                         (len(self._output), len(self._output),

---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
TypeError                                          Sat Sep  3 17:17:40 2016
PID: 4494                Python 3.5.2: /usr/local/opt/python3/bin/python3.5
...........................................................................
/usr/local/lib/python3.5/site-packages/joblib/parallel.py in __call__(self=<joblib.parallel.BatchedCalls object>)
    126     def __init__(self, iterator_slice):
    127         self.items = list(iterator_slice)
    128         self._size = len(self.items)
    129 
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
        self.items = [(<function get_nearest_dateParallel>, (     myDates
0 2014-09-01,), {})]
    132 
    133     def __len__(self):
    134         return self._size
    135 

...........................................................................
/usr/local/lib/python3.5/site-packages/joblib/parallel.py in <listcomp>(.0=<list_iterator object>)
    126     def __init__(self, iterator_slice):
    127         self.items = list(iterator_slice)
    128         self._size = len(self.items)
    129 
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
        func = <function get_nearest_dateParallel>
        args = (     myDates
0 2014-09-01,)
        kwargs = {}
    132 
    133     def __len__(self):
    134         return self._size
    135 

TypeError: get_nearest_dateParallel() missing 1 required positional argument: 'pivot'
___________________________________________________________________________

## approach 4 - pool

did not have too much time to look into that option. But if approach 3 works that looks much cleaner

In [None]:
def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [None]:
def multiply_columns(data):
    data['length_of_word'] = data['species'].apply(lambda x: len(x))
    return data
    
iris = parallelize_dataframe(iris, multiply_columns)