In [1]:
import os

import pandas as pd

In [None]:
from tqdm import tqdm
tqdm.pandas(desc="my bar!")

In [None]:
from joblib import Parallel, delayed
import multiprocessing

def applyParallel(dfGrouped, func):
#     series = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(dfGrouped))
    series = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return series

In [2]:
def parse_order(x):
    series = pd.Series()

    series['products'] = '_'.join(x['product_id'].values.astype(str).tolist())
    series['reorders'] = '_'.join(x['reordered'].values.astype(str).tolist())
    series['aisles'] = '_'.join(x['aisle_id'].values.astype(str).tolist())
    series['departments'] = '_'.join(x['department_id'].values.astype(str).tolist())

    series['order_number'] = x['order_number'].iloc[0]
    series['order_dow'] = x['order_dow'].iloc[0]
    series['order_hour'] = x['order_hour_of_day'].iloc[0]
    series['days_since_prior_order'] = x['days_since_prior_order'].iloc[0]

    return series

In [79]:
def parse_user(x):
    parsed_orders = x.groupby('order_id', sort=False).apply(parse_order)

    series = pd.Series()

    series['order_ids'] = ' '.join(parsed_orders.index.map(str).tolist())
    series['order_numbers'] = ' '.join(parsed_orders['order_number'].map(str).tolist())
    series['order_dows'] = ' '.join(parsed_orders['order_dow'].map(str).tolist())
    series['order_hours'] = ' '.join(parsed_orders['order_hour'].map(str).tolist())
    series['days_since_prior_orders'] = ' '.join(parsed_orders['days_since_prior_order'].map(str).tolist())

    series['product_ids'] = ' '.join(parsed_orders['products'].values.astype(str).tolist())
    series['aisle_ids'] = ' '.join(parsed_orders['aisles'].values.astype(str).tolist())
    series['department_ids'] = ' '.join(parsed_orders['departments'].values.astype(str).tolist())
    series['reorders'] = ' '.join(parsed_orders['reorders'].values.astype(str).tolist())

    series['eval_set'] = x['eval_set'].values[-1]

    return series

In [4]:
orders = pd.read_csv('../data/raw/orders_small.csv')
prior_products = pd.read_csv('../data/raw/order_products__prior_small.csv')
train_products = pd.read_csv('../data/raw/order_products__train_small.csv')
order_products = pd.concat([prior_products, train_products], axis=0)
products = pd.read_csv('../data/raw/products.csv')

In [5]:
orders.head()

Unnamed: 0,order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
0,2497897,52,prior,1,1,9,
1,1318871,52,prior,2,1,10,7.0
2,1261384,52,prior,3,1,10,7.0
3,2100631,52,prior,4,1,11,7.0
4,580568,52,prior,5,2,10,8.0


In [6]:
order_products.head()

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered
0,45,16349,1,1
1,145,31651,1,0
2,145,35898,2,0
3,145,26348,3,0
4,222,24852,1,1


In [7]:
df = orders.merge(order_products, how='left', on='order_id')

In [8]:
df = df.merge(products, how='left', on='product_id')

In [9]:
df['days_since_prior_order'] = df['days_since_prior_order'].fillna(0).astype(int)

In [10]:
df.isnull().sum()

order_id                    0
user_id                     0
eval_set                    0
order_number                0
order_dow                   0
order_hour_of_day           0
days_since_prior_order      0
product_id                750
add_to_cart_order         750
reordered                 750
product_name              750
aisle_id                  750
department_id             750
dtype: int64

In [11]:
null_cols = ['product_id', 'aisle_id', 'department_id', 'add_to_cart_order', 'reordered']

In [12]:
df[null_cols] = df[null_cols].fillna(0).astype(int)

In [13]:
if not os.path.isdir('../data/processed'):
        os.makedirs('../data/processed')

In [14]:
from tqdm import tqdm

In [15]:
tqdm.pandas(desc="my bar!")

In [67]:
df.head()

Unnamed: 0,order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order,product_id,add_to_cart_order,reordered,product_name,aisle_id,department_id
0,2497897,52,prior,1,1,9,0,12341,1,0,Hass Avocados,32,4
1,2497897,52,prior,1,1,9,0,14444,2,0,Smokehouse Almonds,117,19
2,2497897,52,prior,1,1,9,0,10441,3,0,Dry Roasted Almonds,117,19
3,2497897,52,prior,1,1,9,0,196,4,0,Soda,77,7
4,2497897,52,prior,1,1,9,0,46149,5,0,Zero Calorie Cola,77,7


In [78]:
for name, group in tqdm(df.groupby('user_id', sort=False)):
        print(name)
        print(group.head())
        print(type(group))


  0%|          | 0/2062 [00:00<?, ?it/s][A
  0%|          | 4/2062 [00:00<01:05, 31.30it/s][A
  0%|          | 8/2062 [00:00<01:10, 29.28it/s]

52
   order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
0   2497897       52    prior             1          1                  9   
1   2497897       52    prior             1          1                  9   
2   2497897       52    prior             1          1                  9   
3   2497897       52    prior             1          1                  9   
4   2497897       52    prior             1          1                  9   

   days_since_prior_order  product_id  add_to_cart_order  reordered  \
0                       0       12341                  1          0   
1                       0       14444                  2          0   
2                       0       10441                  3          0   
3                       0         196                  4          0   
4                       0       46149                  5          0   

          product_name  aisle_id  department_id  
0        Hass Avocados        32              4  
1   Smo

[A
  0%|          | 10/2062 [00:00<01:21, 25.14it/s][A
  1%|          | 14/2062 [00:00<01:16, 26.80it/s]

     order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
525   2857517      542    prior             1          1                 16   
526   2857517      542    prior             1          1                 16   
527   2857517      542    prior             1          1                 16   
528   2857517      542    prior             1          1                 16   
529   1611651      542    prior             2          3                 12   

     days_since_prior_order  product_id  add_to_cart_order  reordered  \
525                       0       35886                  1          0   
526                       0       43352                  2          0   
527                       0       21288                  3          0   
528                       0       15361                  4          0   
529                       9       43352                  1          1   

                      product_name  aisle_id  department_id  
525  Organic Blueberries

[A
  1%|          | 17/2062 [00:00<01:14, 27.56it/s][A
  1%|          | 24/2062 [00:00<01:01, 33.22it/s][A


      order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
1369   1131526      878    prior             1          0                 17   
1370   1131526      878    prior             1          0                 17   
1371    203835      878    prior             2          5                 19   
1372    203835      878    prior             2          5                 19   
1373    203835      878    prior             2          5                 19   

      days_since_prior_order  product_id  add_to_cart_order  reordered  \
1369                       0       13966                  1          0   
1370                       0        1160                  2          0   
1371                       5       27624                  1          0   
1372                       5       33493                  2          0   
1373                       5       40313                  3          0   

             product_name  aisle_id  department_id  
1369      Chicken Po


  1%|▏         | 30/2062 [00:00<00:54, 37.35it/s][A
  2%|▏         | 35/2062 [00:00<00:53, 38.00it/s]


<class 'pandas.core.frame.DataFrame'>
2847
      order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
3044   3357024     2847    prior             1          0                  6   
3045   3357024     2847    prior             1          0                  6   
3046   3357024     2847    prior             1          0                  6   
3047   3357024     2847    prior             1          0                  6   
3048   3357024     2847    prior             1          0                  6   

      days_since_prior_order  product_id  add_to_cart_order  reordered  \
3044                       0       24852                  1          0   
3045                       0       21137                  2          0   
3046                       0       49235                  3          0   
3047                       0       17316                  4          0   
3048                       0       17600                  5          0   

                              

[A
  2%|▏         | 41/2062 [00:01<00:50, 40.24it/s][A
  2%|▏         | 49/2062 [00:01<00:43, 46.03it/s]


<class 'pandas.core.frame.DataFrame'>
3842
      order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
6107   3116635     3842    prior             1          0                 17   
6108   3116635     3842    prior             1          0                 17   
6109   3116635     3842    prior             1          0                 17   
6110   3116635     3842    prior             1          0                 17   
6111   3116635     3842    prior             1          0                 17   

      days_since_prior_order  product_id  add_to_cart_order  reordered  \
6107                       0       36011                  1          0   
6108                       0       13870                  2          0   
6109                       0       14778                  3          0   
6110                       0        6343                  4          0   
6111                       0       30949                  5          0   

                              

[A
  3%|▎         | 59/2062 [00:01<00:37, 53.92it/s][A
  3%|▎         | 68/2062 [00:01<00:33, 60.24it/s]


<class 'pandas.core.frame.DataFrame'>
5163
      order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
8635   3303523     5163    prior             1          3                 16   
8636   3303523     5163    prior             1          3                 16   
8637   3303523     5163    prior             1          3                 16   
8638   3303523     5163    prior             1          3                 16   
8639   3303523     5163    prior             1          3                 16   

      days_since_prior_order  product_id  add_to_cart_order  reordered  \
8635                       0       24852                  1          0   
8636                       0        1463                  2          0   
8637                       0       28476                  3          0   
8638                       0       11777                  4          0   
8639                       0       22035                  5          0   

                           pro

[A
  4%|▎         | 76/2062 [00:01<00:30, 64.91it/s][A
  4%|▍         | 84/2062 [00:01<00:28, 68.67it/s][A

<class 'pandas.core.frame.DataFrame'>
6879
       order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
11532   1747250     6879    prior             1          2                  2   
11533   1747250     6879    prior             1          2                  2   
11534   1747250     6879    prior             1          2                  2   
11535   1747250     6879    prior             1          2                  2   
11536    140880     6879    prior             2          1                 11   

       days_since_prior_order  product_id  add_to_cart_order  reordered  \
11532                       0       35935                  1          0   
11533                       0       22935                  2          0   
11534                       0       21288                  3          0   
11535                       0       24830                  4          0   
11536                      30       47740                  1          0   

                   


  5%|▍         | 93/2062 [00:01<00:27, 71.32it/s][A
  5%|▍         | 101/2062 [00:01<00:30, 64.79it/s]


       order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
13720    679031     7815    prior             1          1                 18   
13721    679031     7815    prior             1          1                 18   
13722    679031     7815    prior             1          1                 18   
13723    679031     7815    prior             1          1                 18   
13724    679031     7815    prior             1          1                 18   

       days_since_prior_order  product_id  add_to_cart_order  reordered  \
13720                       0        2120                  1          0   
13721                       0       10613                  2          0   
13722                       0       15848                  3          0   
13723                       0       42620                  4          0   
13724                       0        3619                  5          0   

                                            product_name  ais

[A
[A

KeyboardInterrupt: 

In [80]:
user_data = df.groupby('user_id', sort=False).apply(parse_user).reset_index()

   order_id  user_id eval_set  order_number  order_dow  order_hour_of_day  \
0   2497897       52    prior             1          1                  9   
1   2497897       52    prior             1          1                  9   
2   2497897       52    prior             1          1                  9   
3   2497897       52    prior             1          1                  9   
4   2497897       52    prior             1          1                  9   

   days_since_prior_order  product_id  add_to_cart_order  reordered  \
0                       0       12341                  1          0   
1                       0       14444                  2          0   
2                       0       10441                  3          0   
3                       0         196                  4          0   
4                       0       46149                  5          0   

          product_name  aisle_id  department_id  
0        Hass Avocados        32              4  
1   Smokeh

KeyboardInterrupt: 

In [34]:
from joblib import Parallel, delayed
import multiprocessing

In [49]:
# def applyParallel(dfGrouped, func):
#     result = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in tqdm(dfGrouped))
#     return pd.concat(result)

In [55]:
# user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()

JoblibTypeError: JoblibTypeError
___________________________________________________________________________
Multiprocessing exception:
...........................................................................
/opt/conda/lib/python3.5/runpy.py in _run_module_as_main(mod_name='ipykernel.__main__', alter_argv=1)
    179         sys.exit(msg)
    180     main_globals = sys.modules["__main__"].__dict__
    181     if alter_argv:
    182         sys.argv[0] = mod_spec.origin
    183     return _run_code(code, main_globals, None,
--> 184                      "__main__", mod_spec)
        mod_spec = ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py')
    185 
    186 def run_module(mod_name, init_globals=None,
    187                run_name=None, alter_sys=False):
    188     """Execute a module's code without importing it

...........................................................................
/opt/conda/lib/python3.5/runpy.py in _run_code(code=<code object <module> at 0x7f8636b9a0c0, file "/...3.5/site-packages/ipykernel/__main__.py", line 1>, run_globals={'__builtins__': <module 'builtins' (built-in)>, '__cached__': '/opt/conda/lib/python3.5/site-packages/ipykernel/__pycache__/__main__.cpython-35.pyc', '__doc__': None, '__file__': '/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py', '__loader__': <_frozen_importlib_external.SourceFileLoader object>, '__name__': '__main__', '__package__': 'ipykernel', '__spec__': ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py'), 'app': <module 'ipykernel.kernelapp' from '/opt/conda/lib/python3.5/site-packages/ipykernel/kernelapp.py'>}, init_globals=None, mod_name='__main__', mod_spec=ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py'), pkg_name='ipykernel', script_name=None)
     80                        __cached__ = cached,
     81                        __doc__ = None,
     82                        __loader__ = loader,
     83                        __package__ = pkg_name,
     84                        __spec__ = mod_spec)
---> 85     exec(code, run_globals)
        code = <code object <module> at 0x7f8636b9a0c0, file "/...3.5/site-packages/ipykernel/__main__.py", line 1>
        run_globals = {'__builtins__': <module 'builtins' (built-in)>, '__cached__': '/opt/conda/lib/python3.5/site-packages/ipykernel/__pycache__/__main__.cpython-35.pyc', '__doc__': None, '__file__': '/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py', '__loader__': <_frozen_importlib_external.SourceFileLoader object>, '__name__': '__main__', '__package__': 'ipykernel', '__spec__': ModuleSpec(name='ipykernel.__main__', loader=<_f...b/python3.5/site-packages/ipykernel/__main__.py'), 'app': <module 'ipykernel.kernelapp' from '/opt/conda/lib/python3.5/site-packages/ipykernel/kernelapp.py'>}
     86     return run_globals
     87 
     88 def _run_module_code(code, init_globals=None,
     89                     mod_name=None, mod_spec=None,

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py in <module>()
      1 if __name__ == '__main__':
      2     from ipykernel import kernelapp as app
----> 3     app.launch_new_instance()

...........................................................................
/opt/conda/lib/python3.5/site-packages/traitlets/config/application.py in launch_instance(cls=<class 'ipykernel.kernelapp.IPKernelApp'>, argv=None, **kwargs={})
    648 
    649         If a global instance already exists, this reinitializes and starts it
    650         """
    651         app = cls.instance(**kwargs)
    652         app.initialize(argv)
--> 653         app.start()
        app.start = <bound method IPKernelApp.start of <ipykernel.kernelapp.IPKernelApp object>>
    654 
    655 #-----------------------------------------------------------------------------
    656 # utility functions, for convenience
    657 #-----------------------------------------------------------------------------

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/kernelapp.py in start(self=<ipykernel.kernelapp.IPKernelApp object>)
    469             return self.subapp.start()
    470         if self.poller is not None:
    471             self.poller.start()
    472         self.kernel.start()
    473         try:
--> 474             ioloop.IOLoop.instance().start()
    475         except KeyboardInterrupt:
    476             pass
    477 
    478 launch_new_instance = IPKernelApp.launch_instance

...........................................................................
/opt/conda/lib/python3.5/site-packages/zmq/eventloop/ioloop.py in start(self=<zmq.eventloop.ioloop.ZMQIOLoop object>)
    172             )
    173         return loop
    174     
    175     def start(self):
    176         try:
--> 177             super(ZMQIOLoop, self).start()
        self.start = <bound method ZMQIOLoop.start of <zmq.eventloop.ioloop.ZMQIOLoop object>>
    178         except ZMQError as e:
    179             if e.errno == ETERM:
    180                 # quietly return on ETERM
    181                 pass

...........................................................................
/opt/conda/lib/python3.5/site-packages/tornado/ioloop.py in start(self=<zmq.eventloop.ioloop.ZMQIOLoop object>)
    882                 self._events.update(event_pairs)
    883                 while self._events:
    884                     fd, events = self._events.popitem()
    885                     try:
    886                         fd_obj, handler_func = self._handlers[fd]
--> 887                         handler_func(fd_obj, events)
        handler_func = <function wrap.<locals>.null_wrapper>
        fd_obj = <zmq.sugar.socket.Socket object>
        events = 1
    888                     except (OSError, IOError) as e:
    889                         if errno_from_exception(e) == errno.EPIPE:
    890                             # Happens when the client closes the connection
    891                             pass

...........................................................................
/opt/conda/lib/python3.5/site-packages/tornado/stack_context.py in null_wrapper(*args=(<zmq.sugar.socket.Socket object>, 1), **kwargs={})
    270         # Fast path when there are no active contexts.
    271         def null_wrapper(*args, **kwargs):
    272             try:
    273                 current_state = _state.contexts
    274                 _state.contexts = cap_contexts[0]
--> 275                 return fn(*args, **kwargs)
        args = (<zmq.sugar.socket.Socket object>, 1)
        kwargs = {}
    276             finally:
    277                 _state.contexts = current_state
    278         null_wrapper._wrapped = True
    279         return null_wrapper

...........................................................................
/opt/conda/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py in _handle_events(self=<zmq.eventloop.zmqstream.ZMQStream object>, fd=<zmq.sugar.socket.Socket object>, events=1)
    435             # dispatch events:
    436             if events & IOLoop.ERROR:
    437                 gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense")
    438                 return
    439             if events & IOLoop.READ:
--> 440                 self._handle_recv()
        self._handle_recv = <bound method ZMQStream._handle_recv of <zmq.eventloop.zmqstream.ZMQStream object>>
    441                 if not self.socket:
    442                     return
    443             if events & IOLoop.WRITE:
    444                 self._handle_send()

...........................................................................
/opt/conda/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py in _handle_recv(self=<zmq.eventloop.zmqstream.ZMQStream object>)
    467                 gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
    468         else:
    469             if self._recv_callback:
    470                 callback = self._recv_callback
    471                 # self._recv_callback = None
--> 472                 self._run_callback(callback, msg)
        self._run_callback = <bound method ZMQStream._run_callback of <zmq.eventloop.zmqstream.ZMQStream object>>
        callback = <function wrap.<locals>.null_wrapper>
        msg = [<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>]
    473                 
    474         # self.update_state()
    475         
    476 

...........................................................................
/opt/conda/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py in _run_callback(self=<zmq.eventloop.zmqstream.ZMQStream object>, callback=<function wrap.<locals>.null_wrapper>, *args=([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],), **kwargs={})
    409         close our socket."""
    410         try:
    411             # Use a NullContext to ensure that all StackContexts are run
    412             # inside our blanket exception handler rather than outside.
    413             with stack_context.NullContext():
--> 414                 callback(*args, **kwargs)
        callback = <function wrap.<locals>.null_wrapper>
        args = ([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],)
        kwargs = {}
    415         except:
    416             gen_log.error("Uncaught exception, closing connection.",
    417                           exc_info=True)
    418             # Close the socket on an uncaught exception from a user callback

...........................................................................
/opt/conda/lib/python3.5/site-packages/tornado/stack_context.py in null_wrapper(*args=([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],), **kwargs={})
    270         # Fast path when there are no active contexts.
    271         def null_wrapper(*args, **kwargs):
    272             try:
    273                 current_state = _state.contexts
    274                 _state.contexts = cap_contexts[0]
--> 275                 return fn(*args, **kwargs)
        args = ([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],)
        kwargs = {}
    276             finally:
    277                 _state.contexts = current_state
    278         null_wrapper._wrapped = True
    279         return null_wrapper

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/kernelbase.py in dispatcher(msg=[<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>])
    271         if self.control_stream:
    272             self.control_stream.on_recv(self.dispatch_control, copy=False)
    273 
    274         def make_dispatcher(stream):
    275             def dispatcher(msg):
--> 276                 return self.dispatch_shell(stream, msg)
        msg = [<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>]
    277             return dispatcher
    278 
    279         for s in self.shell_streams:
    280             s.on_recv(make_dispatcher(s), copy=False)

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/kernelbase.py in dispatch_shell(self=<ipykernel.ipkernel.IPythonKernel object>, stream=<zmq.eventloop.zmqstream.ZMQStream object>, msg={'buffers': [], 'content': {'allow_stdin': True, 'code': "user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", 'silent': False, 'stop_on_error': True, 'store_history': True, 'user_expressions': {}}, 'header': {'date': '2017-10-01T00:04:59.415816', 'msg_id': '067C62CA6C434745839D9F6A029D9E64', 'msg_type': 'execute_request', 'session': '7CE34AA35BF445A7829595957BFA0E9B', 'username': 'username', 'version': '5.0'}, 'metadata': {}, 'msg_id': '067C62CA6C434745839D9F6A029D9E64', 'msg_type': 'execute_request', 'parent_header': {}})
    223             self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
    224         else:
    225             self.log.debug("%s: %s", msg_type, msg)
    226             self.pre_handler_hook()
    227             try:
--> 228                 handler(stream, idents, msg)
        handler = <bound method Kernel.execute_request of <ipykernel.ipkernel.IPythonKernel object>>
        stream = <zmq.eventloop.zmqstream.ZMQStream object>
        idents = [b'7CE34AA35BF445A7829595957BFA0E9B']
        msg = {'buffers': [], 'content': {'allow_stdin': True, 'code': "user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", 'silent': False, 'stop_on_error': True, 'store_history': True, 'user_expressions': {}}, 'header': {'date': '2017-10-01T00:04:59.415816', 'msg_id': '067C62CA6C434745839D9F6A029D9E64', 'msg_type': 'execute_request', 'session': '7CE34AA35BF445A7829595957BFA0E9B', 'username': 'username', 'version': '5.0'}, 'metadata': {}, 'msg_id': '067C62CA6C434745839D9F6A029D9E64', 'msg_type': 'execute_request', 'parent_header': {}}
    229             except Exception:
    230                 self.log.error("Exception in message handler:", exc_info=True)
    231             finally:
    232                 self.post_handler_hook()

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/kernelbase.py in execute_request(self=<ipykernel.ipkernel.IPythonKernel object>, stream=<zmq.eventloop.zmqstream.ZMQStream object>, ident=[b'7CE34AA35BF445A7829595957BFA0E9B'], parent={'buffers': [], 'content': {'allow_stdin': True, 'code': "user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", 'silent': False, 'stop_on_error': True, 'store_history': True, 'user_expressions': {}}, 'header': {'date': '2017-10-01T00:04:59.415816', 'msg_id': '067C62CA6C434745839D9F6A029D9E64', 'msg_type': 'execute_request', 'session': '7CE34AA35BF445A7829595957BFA0E9B', 'username': 'username', 'version': '5.0'}, 'metadata': {}, 'msg_id': '067C62CA6C434745839D9F6A029D9E64', 'msg_type': 'execute_request', 'parent_header': {}})
    385         if not silent:
    386             self.execution_count += 1
    387             self._publish_execute_input(code, parent, self.execution_count)
    388 
    389         reply_content = self.do_execute(code, silent, store_history,
--> 390                                         user_expressions, allow_stdin)
        user_expressions = {}
        allow_stdin = True
    391 
    392         # Flush output before sending the reply.
    393         sys.stdout.flush()
    394         sys.stderr.flush()

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/ipkernel.py in do_execute(self=<ipykernel.ipkernel.IPythonKernel object>, code="user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", silent=False, store_history=True, user_expressions={}, allow_stdin=True)
    191 
    192         self._forward_input(allow_stdin)
    193 
    194         reply_content = {}
    195         try:
--> 196             res = shell.run_cell(code, store_history=store_history, silent=silent)
        res = undefined
        shell.run_cell = <bound method ZMQInteractiveShell.run_cell of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        code = "user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()"
        store_history = True
        silent = False
    197         finally:
    198             self._restore_input()
    199 
    200         if res.error_before_exec is not None:

...........................................................................
/opt/conda/lib/python3.5/site-packages/ipykernel/zmqshell.py in run_cell(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, *args=("user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()",), **kwargs={'silent': False, 'store_history': True})
    496             )
    497         self.payload_manager.write_payload(payload)
    498 
    499     def run_cell(self, *args, **kwargs):
    500         self._last_traceback = None
--> 501         return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
        self.run_cell = <bound method ZMQInteractiveShell.run_cell of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        args = ("user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()",)
        kwargs = {'silent': False, 'store_history': True}
    502 
    503     def _showtraceback(self, etype, evalue, stb):
    504         # try to preserve ordering of tracebacks and print statements
    505         sys.stdout.flush()

...........................................................................
/opt/conda/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_cell(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, raw_cell="user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", store_history=True, silent=False, shell_futures=True)
   2712                 self.displayhook.exec_result = result
   2713 
   2714                 # Execute the user code
   2715                 interactivity = "none" if silent else self.ast_node_interactivity
   2716                 has_raised = self.run_ast_nodes(code_ast.body, cell_name,
-> 2717                    interactivity=interactivity, compiler=compiler, result=result)
        interactivity = 'last_expr'
        compiler = <IPython.core.compilerop.CachingCompiler object>
   2718                 
   2719                 self.last_execution_succeeded = not has_raised
   2720 
   2721                 # Reset this so later displayed values do not modify the

...........................................................................
/opt/conda/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_ast_nodes(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, nodelist=[<_ast.Assign object>], cell_name='<ipython-input-55-9ff99ce2f186>', interactivity='none', compiler=<IPython.core.compilerop.CachingCompiler object>, result=<ExecutionResult object at 7f85fdec8fd0, executi..._before_exec=None error_in_exec=None result=None>)
   2816 
   2817         try:
   2818             for i, node in enumerate(to_run_exec):
   2819                 mod = ast.Module([node])
   2820                 code = compiler(mod, cell_name, "exec")
-> 2821                 if self.run_code(code, result):
        self.run_code = <bound method InteractiveShell.run_code of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        code = <code object <module> at 0x7f85fe279810, file "<ipython-input-55-9ff99ce2f186>", line 1>
        result = <ExecutionResult object at 7f85fdec8fd0, executi..._before_exec=None error_in_exec=None result=None>
   2822                     return True
   2823 
   2824             for i, node in enumerate(to_run_interactive):
   2825                 mod = ast.Interactive([node])

...........................................................................
/opt/conda/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_code(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, code_obj=<code object <module> at 0x7f85fe279810, file "<ipython-input-55-9ff99ce2f186>", line 1>, result=<ExecutionResult object at 7f85fdec8fd0, executi..._before_exec=None error_in_exec=None result=None>)
   2876         outflag = 1  # happens in more places, so it's easier as default
   2877         try:
   2878             try:
   2879                 self.hooks.pre_run_code_hook()
   2880                 #rprint('Running code', repr(code_obj)) # dbg
-> 2881                 exec(code_obj, self.user_global_ns, self.user_ns)
        code_obj = <code object <module> at 0x7f85fe279810, file "<ipython-input-55-9ff99ce2f186>", line 1>
        self.user_global_ns = {'In': ['', 'import os\n\nimport pandas as pd', "def parse_order(x):\n    series = pd.Series()\n\n  ...ys_since_prior_order'].iloc[0]\n\n    return series", "def parse_user(x):\n    parsed_orders = x.groupby...t'] = x['eval_set'].values[-1]\n\n    return series", "orders = pd.read_csv('../data/raw/orders_small.c...roducts = pd.read_csv('../data/raw/products.csv')", 'orders.head()', 'order_products.head()', "df = orders.merge(order_products, how='left', on='order_id')", "df = df.merge(products, how='left', on='product_id')", "df['days_since_prior_order'] = df['days_since_prior_order'].fillna(0).astype(int)", 'df.isnull().sum()', "null_cols = ['product_id', 'aisle_id', 'department_id', 'add_to_cart_order', 'reordered']", 'df[null_cols] = df[null_cols].fillna(0).astype(int)', "if not os.path.isdir('../data/processed'):\n        os.makedirs('../data/processed')", 'from tqdm import tqdm', 'tqdm.pandas(desc="my bar!")', "user_data = df.groupby('user_id', sort=False).progress_apply(parse_user).reset_index()", 'def applyParallel(dfGrouped, func):\n    result =...) for name, group in dfGrouped)\n    return result', "user_data = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", 'from joblib import Parallel, delayed', ...], 'MatplotlibFinder': <class '__main__.MatplotlibFinder'>, 'NamespaceMagics': <class 'IPython.core.magics.namespace.NamespaceMagics'>, 'Out': {5:    order_id  user_id eval_set  order_number  ord...                7.0  
4                     8.0  , 6:    order_id  product_id  add_to_cart_order  reor...    222       24852                  1          1, 10: order_id                    0
user_id           ...   750
department_id             750
dtype: int64, 25: 2, 33: (2062, 11), 39: <class 'list'>, 40: order_ids                  2497897 1318871 12613...               train
Length: 20620, dtype: object, 41: <class 'pandas.core.series.Series'>, 42: (20620,), 43: (2062, 11), ...}, 'Parallel': <class 'joblib.parallel.Parallel'>, '_':                      index                      ...0 7 7 7 8 6 8 6 7 8 7 6 7 7 8 8 6 14 8 5 30 10..., '_10': order_id                    0
user_id           ...   750
department_id             750
dtype: int64, '_25': 2, '_33': (2062, 11), '_39': <class 'list'>, ...}
        self.user_ns = {'In': ['', 'import os\n\nimport pandas as pd', "def parse_order(x):\n    series = pd.Series()\n\n  ...ys_since_prior_order'].iloc[0]\n\n    return series", "def parse_user(x):\n    parsed_orders = x.groupby...t'] = x['eval_set'].values[-1]\n\n    return series", "orders = pd.read_csv('../data/raw/orders_small.c...roducts = pd.read_csv('../data/raw/products.csv')", 'orders.head()', 'order_products.head()', "df = orders.merge(order_products, how='left', on='order_id')", "df = df.merge(products, how='left', on='product_id')", "df['days_since_prior_order'] = df['days_since_prior_order'].fillna(0).astype(int)", 'df.isnull().sum()', "null_cols = ['product_id', 'aisle_id', 'department_id', 'add_to_cart_order', 'reordered']", 'df[null_cols] = df[null_cols].fillna(0).astype(int)', "if not os.path.isdir('../data/processed'):\n        os.makedirs('../data/processed')", 'from tqdm import tqdm', 'tqdm.pandas(desc="my bar!")', "user_data = df.groupby('user_id', sort=False).progress_apply(parse_user).reset_index()", 'def applyParallel(dfGrouped, func):\n    result =...) for name, group in dfGrouped)\n    return result', "user_data = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()", 'from joblib import Parallel, delayed', ...], 'MatplotlibFinder': <class '__main__.MatplotlibFinder'>, 'NamespaceMagics': <class 'IPython.core.magics.namespace.NamespaceMagics'>, 'Out': {5:    order_id  user_id eval_set  order_number  ord...                7.0  
4                     8.0  , 6:    order_id  product_id  add_to_cart_order  reor...    222       24852                  1          1, 10: order_id                    0
user_id           ...   750
department_id             750
dtype: int64, 25: 2, 33: (2062, 11), 39: <class 'list'>, 40: order_ids                  2497897 1318871 12613...               train
Length: 20620, dtype: object, 41: <class 'pandas.core.series.Series'>, 42: (20620,), 43: (2062, 11), ...}, 'Parallel': <class 'joblib.parallel.Parallel'>, '_':                      index                      ...0 7 7 7 8 6 8 6 7 8 7 6 7 7 8 8 6 14 8 5 30 10..., '_10': order_id                    0
user_id           ...   750
department_id             750
dtype: int64, '_25': 2, '_33': (2062, 11), '_39': <class 'list'>, ...}
   2882             finally:
   2883                 # Reset our crash handler in place
   2884                 sys.excepthook = old_excepthook
   2885         except SystemExit as e:

...........................................................................
/home/jovyan/work/preprocessing/<ipython-input-55-9ff99ce2f186> in <module>()
----> 1 user_data2 = applyParallel(df.groupby('user_id', sort=False), parse_user).reset_index()

...........................................................................
/home/jovyan/work/preprocessing/<ipython-input-54-eba9c584b524> in applyParallel(dfGrouped=<pandas.core.groupby.DataFrameGroupBy object>, func=<function parse_user>)
      1 def applyParallel(dfGrouped, func):
----> 2     result = Parallel(n_jobs=-1)(delayed(func)(dfGrouped))
      3     return pd.concat(result)

...........................................................................
/opt/conda/lib/python3.5/site-packages/joblib/parallel.py in __call__(self=Parallel(n_jobs=-1), iterable=(<function parse_user>, (<pandas.core.groupby.DataFrameGroupBy object>,), {}))
    784             if pre_dispatch == "all" or n_jobs == 1:
    785                 # The iterable was consumed all at once by the above for loop.
    786                 # No need to wait for async callbacks to trigger to
    787                 # consumption.
    788                 self._iterating = False
--> 789             self.retrieve()
        self.retrieve = <bound method Parallel.retrieve of Parallel(n_jobs=-1)>
    790             # Make sure that we get a last message telling us we are done
    791             elapsed_time = time.time() - self._start_time
    792             self._print('Done %3i out of %3i | elapsed: %s finished',
    793                         (len(self._output), len(self._output),

---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
TypeError                                          Sun Oct  1 00:04:59 2017
PID: 126                                Python 3.5.2: /opt/conda/bin/python
...........................................................................
/opt/conda/lib/python3.5/site-packages/joblib/parallel.py in __call__(self=<joblib.parallel.BatchedCalls object>)
    126     def __init__(self, iterator_slice):
    127         self.items = list(iterator_slice)
    128         self._size = len(self.items)
    129 
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
        self.items = [<function parse_user>]
    132 
    133     def __len__(self):
    134         return self._size
    135 

...........................................................................
/opt/conda/lib/python3.5/site-packages/joblib/parallel.py in <listcomp>(.0=<list_iterator object>)
    126     def __init__(self, iterator_slice):
    127         self.items = list(iterator_slice)
    128         self._size = len(self.items)
    129 
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
        func = undefined
        args = undefined
        kwargs = undefined
    132 
    133     def __len__(self):
    134         return self._size
    135 

TypeError: 'function' object is not iterable
___________________________________________________________________________

In [68]:
def applyParallel(df, func):
    result = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in tqdm(df.groupby('user_id', sort=False)))
    return result

In [69]:
user_data2 = applyParallel(df, parse_user)

100%|██████████| 2062/2062 [02:56<00:00, 10.08it/s]


In [71]:
user_data2[0]

order_ids                  2497897 1318871 1261384 2100631 580568 1680554...
order_numbers              1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 1...
order_dows                 1 1 1 1 2 1 2 1 1 2 2 1 1 1 2 3 2 2 3 1 5 1 1 ...
order_hours                9 10 10 11 10 13 7 10 11 10 9 16 10 13 11 11 1...
days_since_prior_orders    0 7 7 7 8 6 8 6 7 8 7 6 7 7 8 8 6 14 8 5 30 10...
product_ids                12341_14444_10441_196_46149_35561 12341_6184_1...
aisle_ids                  32_117_117_77_77_107 32_32_117_77_77_117 77_11...
department_ids             4_19_19_7_7_19 4_4_19_7_7_19 7_19_7_4_9 4_19_1...
reorders                   0_0_0_0_0_0 1_0_1_1_1_0 1_1_1_0_0 1_1_1_1_0_1_...
eval_set                                                               train
dtype: object

In [63]:
user_data.shape

(2062, 11)

In [59]:
user_data.head()

Unnamed: 0,user_id,order_ids,order_numbers,order_dows,order_hours,days_since_prior_orders,product_ids,aisle_ids,department_ids,reorders,eval_set
0,52,2497897 1318871 1261384 2100631 580568 1680554...,1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 1...,1 1 1 1 2 1 2 1 1 2 2 1 1 1 2 3 2 2 3 1 5 1 1 ...,9 10 10 11 10 13 7 10 11 10 9 16 10 13 11 11 1...,0 7 7 7 8 6 8 6 7 8 7 6 7 7 8 8 6 14 8 5 30 10...,12341_14444_10441_196_46149_35561 12341_6184_1...,32_117_117_77_77_107 32_32_117_77_77_117 77_11...,4_19_19_7_7_19 4_4_19_7_7_19 7_19_7_4_9 4_19_1...,0_0_0_0_0_0 1_0_1_1_1_0 1_1_1_0_0 1_1_1_1_0_1_...,train
1,108,2313085 2393725 3074448 2137209 1503688 278828...,1 2 3 4 5 6 7 8,0 0 1 3 0 0 0 6,13 13 18 11 17 15 10 15,0 0 15 30 30 28 30 6,4658_21137_41960_30489_38289_44359_21249_17606...,115_24_83_67_98_83_66_88_67 115_67_88_83_24_98...,7_4_4_20_7_4_6_13_20 7_20_13_4_4_7_6_20_4 12_1...,0_0_0_0_0_0_0_0_0 1_1_1_1_1_1_1_1_1 0_0_0_1_0_...,train
2,116,3370391 1115121 1221109 1765618 3307561 222423...,1 2 3 4 5 6 7,0 0 4 6 5 1 5,5 17 16 6 17 4 12,0 14 25 9 13 10 4,9605_22667_46049_25843_38906_23233 24852_13627...,121_43_107_107_37_112 24_94_94_94_54 24_43_134...,14_3_19_19_1_3 4_7_7_7_17 4_3_5 17_17_7_17_5 6...,0_0_0_0_0_0 0_0_0_0_0 1_1_0 1_0_0_0_0 0_1_0_0_...,train
3,124,1318740 490161 2638175 1660930 1923886 3051829...,1 2 3 4 5 6 7 8 9,0 2 1 1 1 1 1 1 1,22 10 10 10 8 9 8 8 8,0 30 6 7 7 21 21 14 21,16797_24184_42625_9387_39428_19478_19068_28985...,24_83_16_24_24_9_35_83_83 83_83_16_83_35_24_83...,4_4_4_4_4_9_12_4_4 4_4_4_4_12_4_4_4_4_4_4_4_4_...,0_0_0_0_0_0_0_0_0 1_1_1_1_1_1_0_0_0_0_0_0_0_0_...,train
4,170,3126026 2066764 2772634 2587424,1 2 3 4,1 1 2 4,16 9 9 18,0 30 30 30,25466_30850_36866 13060_35730 13060_33452_25783 0,115_26_45 70_55 70_64_64 0,7_7_19 11_11 11_7_7 0,0_0_0 0_0 1_0_0 0,test


In [64]:
user_data2.shape

(20620, 2)

In [66]:
user_data2.head(20)

Unnamed: 0,index,0
0,order_ids,2497897 1318871 1261384 2100631 580568 1680554...
1,order_numbers,1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 1...
2,order_dows,1 1 1 1 2 1 2 1 1 2 2 1 1 1 2 3 2 2 3 1 5 1 1 ...
3,order_hours,9 10 10 11 10 13 7 10 11 10 9 16 10 13 11 11 1...
4,days_since_prior_orders,0 7 7 7 8 6 8 6 7 8 7 6 7 7 8 8 6 14 8 5 30 10...
5,product_ids,12341_14444_10441_196_46149_35561 12341_6184_1...
6,aisle_ids,32_117_117_77_77_107 32_32_117_77_77_117 77_11...
7,department_ids,4_19_19_7_7_19 4_4_19_7_7_19 7_19_7_4_9 4_19_1...
8,reorders,0_0_0_0_0_0 1_0_1_1_1_0 1_1_1_0_0 1_1_1_1_0_1_...
9,eval_set,train


In [27]:
user_data.to_csv('../data/processed/user_data.csv', index=False)