In [128]:
import pandas as pd
import datetime
import numpy as np

pd.set_option('display.max_columns',None)


"""
Your first task is to create a stacked chart of **unique online devices per
day**, segregated by fleet size. The fleet size is an attribute of each user
and is defined as the number of online devices that this user had at a
particular day. You can split the dataset in the following fleet sizes:

* 1-2 devices
* 3-9 devices
* 10-99 devices
* 100-999 devices

A device should be counted as online for a particular day if it was online for
any amount of time during that day. For example, a device that appear online
for only a second should still be counted for that day.

The specific rules we have selected to deal with problematic sections
of a device's timeline can be summarised in the following table:

| current event | current server | next event | next server | rule        |
|---------------|----------------|------------|-------------|-------------|
| online        | X              | online     | X           | Assume device was online from current event's timestamp until next event's timestamp
| online        | X              | online     | Y           | Assume device was online from current event's timestamp until X's destruction time or next event's timestamp, whichever is smaller
| online        | X              | offline    | X           | Normal case
| online        | X              | offline    | Y           | Assume device was online from current event's timestamp until X's destruction time. Ignore next event.
| offline       | X              | online     | X           | Normal case
| offline       | X              | online     | Y           | Normal case
| offline       | X              | offline    | X           | Ignore next event
| offline       | X              | offline    | Y           | Ignore next event
"""

ModuleNotFoundError: No module named 'pandarallel'

In [129]:
import sys
!{sys.executable} -m pip install pandarallel

Collecting pandarallel
  Downloading https://files.pythonhosted.org/packages/99/06/bd582106766c483d6da51c05b0cdd7cb61894bb843c7ecc4789032232327/pandarallel-1.4.8.tar.gz
Collecting dill (from pandarallel)
[?25l  Downloading https://files.pythonhosted.org/packages/e2/96/518a8ea959a734b70d2e95fef98bcbfdc7adad1c1e5f5dd9148c835205a5/dill-0.3.2.zip (177kB)
[K     |████████████████████████████████| 184kB 824kB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pandarallel, dill
  Building wheel for pandarallel (setup.py) ... [?25ldone
[?25h  Created wheel for pandarallel: filename=pandarallel-1.4.8-cp37-none-any.whl size=16113 sha256=bef7505442209ccde5b195005052d99f4a0bb77f4ca97edbb6da36e5ae133b18
  Stored in directory: /Users/jakkie/Library/Caches/pip/wheels/75/a2/85/b45be2e86d86e9ec5da6d05c4b994d18c81abe76e3f39415aa
  Building wheel for dill (setup.py) ... [?25ldone
[?25h  Created wheel for dill: filename=dill-0.3.2-cp37-none-any.whl size=78913 sha256=62858330ff22cba8daf8be25

In [130]:
from pandarallel import pandarallel

In [124]:
def mark_ignored_rows(row):
    #df.loc[df['prev_server_id']=='None', 'ignored'] = 0
    if(row['connected']==False and row['prev_connected']==False):
        return True
#     if(df['prev_connected'] & df['connected'] == False):
#         return True
    else:
        return False
    
def process_dataframe(df):
    df['ignored'] = df.apply(lambda row: prepare_final(row), axis=1)

def parallelize_dataframe(df, func, n_cores=4):
    pool = Pool(n_cores)
    df_split = np.array_split(df, n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [3]:
names=['id','created_at','destroyed_at']
servers = pd.read_csv('data/servers.csv',names=names)

In [4]:
names = ['timestamp','device_id', 'user_id','server_id','connected']
events = pd.read_csv('data/connectivity_events.csv',names=names)#,nrows=1000)

In [67]:
events_merged=events.merge(servers,left_on='server_id',right_on='id')

events_merged['prev_server_id']=(events_merged.sort_values(by=['timestamp'], ascending=True)
                               .groupby(['device_id'])['server_id'].shift(1))
events_merged['prev_connected']=(events_merged.sort_values(by=['timestamp'], ascending=True)
                         .groupby(['device_id'])['connected'].shift(1)).astype(bool)
events_merged['prev_timestamp']=(events_merged.sort_values(by=['timestamp'], ascending=True)
                                         .groupby(['device_id'])['timestamp'].shift(1))
events_merged['ignored']=False
events_merged.shape[0]

9308207

In [131]:
test = events_merged.sample(100000)
test.head()

Unnamed: 0,timestamp,device_id,user_id,server_id,connected,id,created_at,destroyed_at,prev_server_id,prev_connected,prev_timestamp,ignored
1285584,2017-08-30 18:22:56.613,21383,540,8,False,8,2017-08-29 20:12:51.954571,2017-09-02 12:32:37.348,8.0,True,2017-08-30 17:55:55.747,False
4234268,2017-11-15 21:59:32.016,9529,168,19,True,19,2017-11-09 20:01:36.97067,2017-11-22 16:15:45.274,19.0,False,2017-11-15 21:59:31.918,False
4870566,2017-11-29 18:04:07.287,9529,168,20,True,20,2017-11-22 16:15:57.285482,2017-12-08 23:35:12.164,20.0,False,2017-11-29 18:04:07.156,False
5664668,2017-12-12 04:06:26.383,20685,1325,26,True,26,2017-12-11 22:13:30.831468,2017-12-13 17:03:45.565,26.0,False,2017-12-12 04:06:12.969,False
7334910,2018-01-10 23:44:45.512,28117,1792,28,True,28,2018-01-08 13:19:01.744274,2018-01-12 17:05:49.192,28.0,False,2018-01-10 23:24:19.626,False


In [69]:
test.dtypes

timestamp          object
device_id           int64
user_id             int64
server_id           int64
connected            bool
id                  int64
created_at         object
destroyed_at       object
prev_server_id    float64
prev_connected       bool
prev_timestamp     object
ignored              bool
dtype: object

In [97]:
test['ignored'].value_counts()

False    10000
Name: ignored, dtype: int64

In [103]:
# test['ignored'] = test.apply (lambda row: prepare_final(row), axis=1)
process_dataframe(test)

In [104]:
test['ignored'].value_counts()

False    9621
True      379
Name: ignored, dtype: int64

In [109]:
test[(test['connected']==False) & (test['prev_connected']==False)].shape[0]

4092

In [113]:
%%timeit
process_dataframe(events_merged)

2min 7s ± 3.16 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [134]:
%%timeit
pandarallel.initialize()
events_merged['ignored'] = events_merged.parallel_apply(lambda row: prepare_final(row), axis=1)

INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process

In [135]:
print(events_merged[(events_merged['connected']==False) & (events_merged['prev_connected']==False)].shape[0])
print(events_merged[events_merged['ignored']==True].shape[0])

379931
379931


In [123]:
df_split

[                       timestamp  device_id  user_id  server_id  connected  \
 1044783   2017-08-26 05:18:44.39      28100     1792          5       True   
 2906679   2017-10-17 00:13:52.05      24529     2233         15      False   
 9011778  2018-02-20 12:20:14.358      32164      119         32      False   
 327703   2017-08-03 07:53:05.195       9529      168          2      False   
 1588320  2017-09-10 00:45:53.564       9529      168         12       True   
 ...                          ...        ...      ...        ...        ...   
 2475145  2017-10-06 17:24:33.557       9529      168         13      False   
 604876   2017-08-09 23:29:57.733      14786      119          5      False   
 4344440  2017-11-18 11:10:05.964      11327      199         19       True   
 1027996  2017-08-25 16:34:03.722      24373     2422          5       True   
 5520827  2017-12-11 13:04:15.986      20685     1325         25       True   
 
          id                  created_at          

In [120]:
with Pool(4) as pool:
    results = pool.map_async(process_dataframe, test)
    l = results.get()

Process ForkPoolWorker-21:
Process ForkPoolWorker-22:
Process ForkPoolWorker-24:
Process ForkPoolWorker-23:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):


KeyboardInterrupt: 

In [None]:


pandarallel.initialize()
test.parallel_apply(func, axis=1)

In [None]:
events_merged['ignored']=events_merged.apply(prepare_final,axis=1)
print(events_merged.head())
events_merged['day'] = events_merged['timestamp'].dt.date
events_grouped = events_merged.groupby('day').count()
print(events_grouped)

In [None]:
print("end:{}".format(datetime.datetime.now()))