In [68]:
import ast
import time
from datetime import datetime

import pandas as pd
import redis

from dec import subscriber, publisher, constants as C, statistics

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
# Open a client against the local Redis server and listen on the 'events' channel.
redis_settings = {'host': 'localhost', 'port': 6379, 'db': 0}
rc = redis.StrictRedis(**redis_settings)

pubsub = rc.pubsub()
pubsub.subscribe(['events'])

In [4]:
# The first message read after subscribing is the subscription confirmation
# (type 'subscribe'), not an event payload, so it carries no events.
message = pubsub.get_message()

message

{'type': 'subscribe', 'pattern': None, 'channel': b'events', 'data': 1}

In [5]:
# Read the next message from the 'events' subscription and display it;
# this one is of type 'message' and its 'data' field holds the event batch.
message = pubsub.get_message()

message

{'type': 'message',
 'pattern': None,
 'channel': b'events',
 'data': b"[{'clip': '7494', 'country': 'US', 'event_id': '6f024f8a-df90-4ac4-b738-bd422514df5e', 'publisher_id': '4', 'viewable_time': 18.3, 'timestamp': 1531767388.819867}, {'clip': '7886', 'country': 'CH', 'event_id': '9ede9ebd-44fc-42c9-af51-a257b96ebc8b', 'publisher_id': '9', 'viewable_time': 27.6, 'timestamp': 1531767388.819867}, {'clip': '5982', 'country': 'RU', 'event_id': '996d9b9a-df46-4703-9602-ff5a622499d2', 'publisher_id': '1', 'viewable_time': 11.0, 'timestamp': 1531767388.819867}, {'clip': '7997', 'country': 'JP', 'event_id': '9154e307-e8ad-4440-845f-c234fb448a12', 'publisher_id': '7', 'viewable_time': 5.0, 'timestamp': 1531767388.819867}, {'clip': '7806', 'country': 'EN', 'event_id': 'c1b12d05-a386-4314-8598-43fed3b0603f', 'publisher_id': '1', 'viewable_time': 8.1, 'timestamp': 1531767388.819867}, {'clip': '7828', 'country': 'CH', 'event_id': 'e32dbe14-946b-4ae1-b068-715cb1997f4b', 'publisher_id': '5', 'viewab

In [6]:
# The payload is the textual repr of a list of event dicts received from the
# Redis channel. Parse it as a Python literal instead of executing it with
# eval(), which would run any code a publisher put on the channel.
raw_events = message['data']
if isinstance(raw_events, bytes):
    # ast.literal_eval only parses text, so decode the raw bytes first.
    raw_events = raw_events.decode('utf-8')
events_to_process = ast.literal_eval(raw_events)

events_to_process

[{'clip': '7494',
  'country': 'US',
  'event_id': '6f024f8a-df90-4ac4-b738-bd422514df5e',
  'publisher_id': '4',
  'viewable_time': 18.3,
  'timestamp': 1531767388.819867},
 {'clip': '7886',
  'country': 'CH',
  'event_id': '9ede9ebd-44fc-42c9-af51-a257b96ebc8b',
  'publisher_id': '9',
  'viewable_time': 27.6,
  'timestamp': 1531767388.819867},
 {'clip': '5982',
  'country': 'RU',
  'event_id': '996d9b9a-df46-4703-9602-ff5a622499d2',
  'publisher_id': '1',
  'viewable_time': 11.0,
  'timestamp': 1531767388.819867},
 {'clip': '7997',
  'country': 'JP',
  'event_id': '9154e307-e8ad-4440-845f-c234fb448a12',
  'publisher_id': '7',
  'viewable_time': 5.0,
  'timestamp': 1531767388.819867},
 {'clip': '7806',
  'country': 'EN',
  'event_id': 'c1b12d05-a386-4314-8598-43fed3b0603f',
  'publisher_id': '1',
  'viewable_time': 8.1,
  'timestamp': 1531767388.819867},
 {'clip': '7828',
  'country': 'CH',
  'event_id': 'e32dbe14-946b-4ae1-b068-715cb1997f4b',
  'publisher_id': '5',
  'viewable_time':

## Statistics
We would like the following statistics to be saved and kept up to date in Redis:
1. total sum of viewable_time per publisher (viewable_time_sum_per_publisher)
2. the top 10 publishers by event count (top_n_publisher_by_count)
3. the number of unique clips per publisher (unique_clips_count_per_publisher)
4. total count of clips per country viewed by day and by night (clips_count_per_country_day_night)

In [28]:
# 1. Total viewable_time per publisher for the current batch of events
statistics.viewable_time_sum_per_publisher(events_to_process)

Unnamed: 0,publisher_id,viewable_time
0,0,1485.3
1,1,1386.3
2,2,1453.8
3,3,1345.9
4,4,1640.2
5,5,1806.3
6,6,1544.2
7,7,1370.2
8,8,1385.5
9,9,1690.0


In [29]:
# 2. Publishers ranked by number of events; only the top 3 are requested here
statistics.top_n_publisher_by_count(events_to_process, n = 3)

Unnamed: 0,publisher_id,count
5,5,118
9,9,110
4,4,103


In [30]:
# 3. Unique clips per publisher (the result shows one set of clip ids per publisher)
statistics.unique_clips_count_per_publisher(events_to_process)

Unnamed: 0,publisher_id,unique_clips
0,0,"{6726, 7407, 7400, 1791, 6066, 0670, 9946, 687..."
1,1,"{8297, 4970, 1019, 8732, 2480, 9835, 7609, 961..."
2,2,"{0382, 6743, 5612, 0969, 5576, 2304, 0545, 486..."
3,3,"{2726, 9997, 7000, 9617, 6193, 6573, 0276, 557..."
4,4,"{6740, 6614, 9734, 6286, 6017, 3360, 8832, 821..."
5,5,"{0441, 9846, 4049, 3294, 5047, 9270, 7489, 545..."
6,6,"{3514, 9610, 6455, 3078, 7706, 6858, 1373, 255..."
7,7,"{1114, 0918, 8375, 0753, 7796, 7972, 0969, 553..."
8,8,"{0441, 6614, 0118, 0274, 6095, 0643, 2468, 141..."
9,9,"{5376, 7073, 2396, 1940, 4674, 5551, 5752, 548..."


In [31]:
# 4. Clip counts per country, split into 'day' and 'night'
statistics.clips_count_per_country_day_night(events_to_process)

Unnamed: 0,country,daynight,count
0,CH,night,110
1,DE,night,117
2,EN,night,114
3,FR,night,103
4,IT,night,113
5,JP,night,100
6,NE,night,104
7,RU,night,129
8,US,night,110


## Single step job

These will be the main steps of a single run:

1. Get a new message from the publisher
2. If the message is not empty, continue; otherwise go back to step 1 and wait for the next message
3. Compute the above statistics for the new events
4. Read the previously computed statistics
5. Update all the statistics
6. Write the updated statistics

This will be the format of the persisted statistics:




In [32]:
# Template describing the intended layout of the persisted statistics
# (presumably the value stored under the Redis 'statistics' key).
# Angle-bracket strings and 'X'/'Y'/'N' are placeholders, not real data.
# NOTE(review): the `updated_stats` payload assembled later in this notebook
# stores lists of row records for these entries and has no 'counts' sub-key,
# so this template and the actual payload disagree — confirm which one is
# authoritative.
statistics_persisted = {
    'statistics': {
        'viewable_time_sum_per_publisher': {
            '<PUBID>': '<actual_total_sum>',
            # ...
        },
        'top_n_publisher_by_count': {
            'data': '<actual_data>',
            'publishers': '<PUBID_1>,...,<PUBID_i>,...,<PUBID_N>',
        },
        'unique_clips_count_per_publisher': {
            'data': {
                '<PUBID>': '<CLIP_1>,...,<CLIP_N>',
                # ...
            },
            'counts': {
                '<PUBID>': 'N',
                # ...
            },
        },
        'clips_count_per_country_day_night': {
            '<COUNTRY_1>': {
                'day': 'X',
                'night': 'Y'
            },
            # ...
        }
    },
    'last_update_timestamp': '<last_update_timestamp_value>'
}

In [33]:
# Load the statistics persisted by the previous run, if any.
last_stats_str = rc.get('statistics')

if last_stats_str is None:
    # Nothing stored under the key yet (first run): start from empty statistics.
    last_stats = {}
else:
    # Parse the stored repr as a literal rather than executing it with eval();
    # the value comes from an external store and must not be run as code.
    # A malformed payload now surfaces as ValueError/SyntaxError instead of
    # being hidden behind a broad `except TypeError`.
    stats_text = (
        last_stats_str.decode('utf-8')
        if isinstance(last_stats_str, bytes)
        else last_stats_str
    )
    last_stats = ast.literal_eval(stats_text)

In [34]:
# Compute the four statistics for the newly received batch only; the update
# step in a later cell combines them with the previously persisted values.
viewable_time = statistics.viewable_time_sum_per_publisher(events_to_process)
top_pub = statistics.top_n_publisher_by_count(events_to_process, n=10)  # top 10, per statistic #2
unique_clips_count = statistics.unique_clips_count_per_publisher(events_to_process)
clips_count = statistics.clips_count_per_country_day_night(events_to_process)

In [35]:
last_stats

{}

In [61]:
# Statistics persisted by the previous run; an empty dict when nothing was
# stored yet.
stats = last_stats.get('statistics', {})

# Each step merges the new batch into the stored values and also keeps a
# plain list-of-records copy for persistence. `to_dict(orient='records')`
# yields one dict per row directly; the former `.T.to_dict().values()`
# round-trip pushed the frame through a transpose, which can coerce column
# dtypes and collapses rows that share an index label.

# 1. Viewable time
updated_viewable_time = subscriber.update_viewable_time(stats, viewable_time)
updated_viewable_time_list = updated_viewable_time.to_dict(orient='records')

# 2. Top pub
updated_top_pub = subscriber.update_top_pub(stats, top_pub)
updated_top_pub_list = updated_top_pub.to_dict(orient='records')

# 3. Unique clips count
updated_unique_clips_count = subscriber.update_unique_clips_count(stats, unique_clips_count)
updated_unique_clips_count_list = updated_unique_clips_count.to_dict(orient='records')

# 4. Clips count
updated_clips_count = subscriber.update_clips_count(stats, clips_count)
updated_clips_count_list = updated_clips_count.to_dict(orient='records')

In [62]:
updated_viewable_time

Unnamed: 0,publisher_id,viewable_time
0,0,1485.3
1,1,1386.3
2,2,1453.8
3,3,1345.9
4,4,1640.2
5,5,1806.3
6,6,1544.2
7,7,1370.2
8,8,1385.5
9,9,1690.0


In [63]:
updated_top_pub

Unnamed: 0,publisher_id,count
0,5,118.0
1,9,110.0
2,4,103.0
3,0,102.0
4,3,100.0
5,1,96.0
6,8,96.0
7,6,95.0
8,7,92.0
9,2,88.0


In [64]:
# Comma-separated publisher ids taken from the first ten rows of the ranking.
top_ids = updated_top_pub[C.PUBLISHER_ID].values[:10]
','.join(top_ids)

'5,9,4,0,3,1,8,6,7,2'

In [65]:
updated_unique_clips_count

Unnamed: 0,publisher_id,unique_clips,unique_clips_count
0,0,"{6726, 7407, 5787, 2830, 7400, 1863, 6997, 349...",102
1,1,"{8297, 4970, 1019, 8732, 6547, 2480, 6220, 983...",96
2,2,"{0382, 6743, 5328, 5612, 4054, 8142, 0969, 366...",88
3,3,"{2726, 2819, 9997, 3404, 7000, 6811, 8833, 988...",99
4,4,"{2832, 6364, 8546, 6740, 6614, 0593, 9734, 349...",102
5,5,"{0441, 4868, 6575, 6140, 9846, 4049, 6990, 329...",117
6,6,"{8273, 9731, 3514, 9610, 2208, 7078, 6455, 408...",95
7,7,"{1114, 0918, 8375, 6675, 2830, 1186, 0753, 779...",92
8,8,"{4961, 0441, 6614, 0118, 8484, 4211, 1231, 027...",95
9,9,"{7886, 5130, 5376, 8862, 9106, 4478, 7345, 048...",108


In [66]:
updated_clips_count

Unnamed: 0,country,daynight,count
0,CH,night,110.0
1,DE,night,117.0
2,EN,night,114.0
3,FR,night,103.0
4,IT,night,113.0
5,JP,night,100.0
6,NE,night,104.0
7,RU,night,129.0
8,US,night,110.0


In [69]:
# Assemble the snapshot to persist: the refreshed statistics plus the time of
# this update.
top_publisher_ids = ','.join(updated_top_pub[C.PUBLISHER_ID].values[:10])

refreshed_statistics = {
    'viewable_time_sum_per_publisher': updated_viewable_time_list,
    'top_n_publisher_by_count': {
        'data': updated_top_pub_list,
        'publishers': top_publisher_ids,
    },
    'unique_clips_count_per_publisher': {
        'data': updated_unique_clips_count_list,
    },
    'clips_count_per_country_day_night': updated_clips_count_list,
}

updated_stats = {
    'statistics': refreshed_statistics,
    'last_update_timestamp': datetime.now().timestamp(),
}

In [70]:
updated_stats

{'statistics': {'viewable_time_sum_per_publisher': [{'publisher_id': '0',
    'viewable_time': 1485.3000000000004},
   {'publisher_id': '1', 'viewable_time': 1386.3},
   {'publisher_id': '2', 'viewable_time': 1453.8000000000002},
   {'publisher_id': '3', 'viewable_time': 1345.9},
   {'publisher_id': '4', 'viewable_time': 1640.2000000000005},
   {'publisher_id': '5', 'viewable_time': 1806.3000000000002},
   {'publisher_id': '6', 'viewable_time': 1544.2000000000003},
   {'publisher_id': '7', 'viewable_time': 1370.2000000000003},
   {'publisher_id': '8', 'viewable_time': 1385.4999999999998},
   {'publisher_id': '9', 'viewable_time': 1690.0000000000011}],
  'top_n_publisher_by_count': {'data': [{'publisher_id': '5', 'count': 118.0},
    {'publisher_id': '9', 'count': 110.0},
    {'publisher_id': '4', 'count': 103.0},
    {'publisher_id': '0', 'count': 102.0},
    {'publisher_id': '3', 'count': 100.0},
    {'publisher_id': '1', 'count': 96.0},
    {'publisher_id': '8', 'count': 96.0},
    {

# Test code

In [16]:
# Update viewable_time
stats = last_stats.get('statistics', {})

# The persisted value is a list of row records, which pd.merge cannot take
# directly, so wrap it in a DataFrame. Fall back to an empty frame with the
# expected columns when nothing has been stored yet.
viewable_time_records = stats.get('viewable_time_sum_per_publisher')
last_viewable_time = (
    pd.DataFrame(viewable_time_records)
    if viewable_time_records
    else pd.DataFrame(columns=[C.PUBLISHER_ID, C.VIEWABLE_TIME])
)

In [17]:
last_viewable_time

Unnamed: 0,publisher_id,viewable_time


In [18]:
# Outer-merge the stored totals with the new batch on publisher id, then add
# the value columns row-wise to get the running total per publisher.
updated_df = (pd.merge(last_viewable_time, pd.DataFrame(viewable_time), how='outer', on=[C.PUBLISHER_ID])
              .set_index([C.PUBLISHER_ID])
              .sum(axis=1)
              .reset_index()
              .rename(columns={0: C.VIEWABLE_TIME})
             )

# One dict per row, without the transpose round-trip of `.T.to_dict().values()`.
updated_df.to_dict(orient='records')

[{'publisher_id': '0', 'viewable_time': 1485.3000000000004},
 {'publisher_id': '1', 'viewable_time': 1386.3},
 {'publisher_id': '2', 'viewable_time': 1453.8000000000002},
 {'publisher_id': '3', 'viewable_time': 1345.9},
 {'publisher_id': '4', 'viewable_time': 1640.2000000000005},
 {'publisher_id': '5', 'viewable_time': 1806.3000000000002},
 {'publisher_id': '6', 'viewable_time': 1544.2000000000003},
 {'publisher_id': '7', 'viewable_time': 1370.2000000000003},
 {'publisher_id': '8', 'viewable_time': 1385.4999999999998},
 {'publisher_id': '9', 'viewable_time': 1690.0000000000011}]

In [19]:
# Update top_pub
stats = last_stats.get('statistics', {})

# The ranking is persisted under 'top_n_publisher_by_count'; looking it up as
# 'top_pub' never finds the stored counts, so they would never accumulate.
last_top_pub_dict = stats.get('top_n_publisher_by_count', {})

# 'data' holds a list of row records: wrap it in a DataFrame so it can be
# merged, or start from an empty frame when nothing was stored yet.
top_pub_records = last_top_pub_dict.get('data')
last_top_pub = (
    pd.DataFrame(top_pub_records)
    if top_pub_records
    else pd.DataFrame(columns=[C.PUBLISHER_ID, 'count'])
)

In [20]:
# Outer-merge the stored counts with the new batch on publisher id, then add
# the count columns row-wise to get the running event count per publisher.
updated_df = (pd.merge(last_top_pub, pd.DataFrame(top_pub), how='outer', on=[C.PUBLISHER_ID])
              .set_index([C.PUBLISHER_ID])
              .sum(axis=1)
              .reset_index()
              .rename(columns={0: 'count'})
             )

# One dict per row, without the transpose round-trip of `.T.to_dict().values()`.
updated_df.to_dict(orient='records')

[{'publisher_id': '5', 'count': 118.0},
 {'publisher_id': '9', 'count': 110.0},
 {'publisher_id': '4', 'count': 103.0},
 {'publisher_id': '0', 'count': 102.0},
 {'publisher_id': '3', 'count': 100.0},
 {'publisher_id': '1', 'count': 96.0},
 {'publisher_id': '8', 'count': 96.0},
 {'publisher_id': '6', 'count': 95.0},
 {'publisher_id': '7', 'count': 92.0},
 {'publisher_id': '2', 'count': 88.0}]

In [21]:
# Update unique_clips_count
stats = last_stats.get('statistics', {})

last_unique_clips_count_dict = stats.get('unique_clips_count_per_publisher', {})

# 'data' holds a list of row records: wrap it in a DataFrame so it can be
# merged, or start from an empty frame when nothing was stored yet.
unique_clips_records = last_unique_clips_count_dict.get('data')
last_unique_clips_count = (
    pd.DataFrame(unique_clips_records)
    if unique_clips_records
    else pd.DataFrame(columns=[C.PUBLISHER_ID, 'clips'])
)
last_unique_clips_count

Unnamed: 0,publisher_id,clips


In [22]:
unique_clips_count

[{'publisher_id': '0',
  'unique_clips': {'0016',
   '0095',
   '0214',
   '0220',
   '0491',
   '0670',
   '0878',
   '0942',
   '1080',
   '1311',
   '1312',
   '1428',
   '1464',
   '1502',
   '1791',
   '1863',
   '2021',
   '2055',
   '2069',
   '2164',
   '2297',
   '2334',
   '2444',
   '2580',
   '2636',
   '2768',
   '2830',
   '2924',
   '3024',
   '3133',
   '3174',
   '3236',
   '3284',
   '3365',
   '3493',
   '3529',
   '3569',
   '3583',
   '3635',
   '3679',
   '3923',
   '4376',
   '4511',
   '4700',
   '4870',
   '4923',
   '4951',
   '5133',
   '5198',
   '5252',
   '5386',
   '5439',
   '5522',
   '5578',
   '5690',
   '5721',
   '5779',
   '5787',
   '5855',
   '5981',
   '6066',
   '6102',
   '6111',
   '6127',
   '6382',
   '6407',
   '6420',
   '6511',
   '6590',
   '6726',
   '6875',
   '6997',
   '7084',
   '7117',
   '7211',
   '7265',
   '7318',
   '7400',
   '7407',
   '7436',
   '7437',
   '7723',
   '7913',
   '7915',
   '7941',
   '7953',
   '8328',
   '

In [23]:
def special_sum(lis):
    """Union every set found in ``lis`` into a single new set.

    Entries that are not ``set`` instances (for example missing-value
    placeholders) are skipped, which is equivalent to treating them as
    empty sets.

    Args:
        lis: Iterable of values. Only iteration is used, so a list or a
            pandas row both work.

    Returns:
        set: A new set with the elements of every set entry; empty when
        ``lis`` is empty or contains no sets.
    """
    # Iterative union: no IndexError on empty input, no recursion-depth limit,
    # no quadratic re-slicing, and no positional ``lis[0]`` lookup.
    return set().union(*(item for item in lis if isinstance(item, set)))

# Outer-merge the stored clip sets with the new batch on publisher id, then
# union the sets found in each row to get all unique clips per publisher.
updated_df = (pd.merge(last_unique_clips_count, pd.DataFrame(unique_clips_count), how='outer', on=[C.PUBLISHER_ID])
              .set_index([C.PUBLISHER_ID])
              .aggregate(special_sum, axis=1)
              .reset_index()
              .rename(columns={0: 'unique_clips'})
             )

# One dict per row, without the transpose round-trip of `.T.to_dict().values()`.
updated_df.to_dict(orient='records')

[{'publisher_id': '0',
  'unique_clips': {'0016',
   '0095',
   '0214',
   '0220',
   '0491',
   '0670',
   '0878',
   '0942',
   '1080',
   '1311',
   '1312',
   '1428',
   '1464',
   '1502',
   '1791',
   '1863',
   '2021',
   '2055',
   '2069',
   '2164',
   '2297',
   '2334',
   '2444',
   '2580',
   '2636',
   '2768',
   '2830',
   '2924',
   '3024',
   '3133',
   '3174',
   '3236',
   '3284',
   '3365',
   '3493',
   '3529',
   '3569',
   '3583',
   '3635',
   '3679',
   '3923',
   '4376',
   '4511',
   '4700',
   '4870',
   '4923',
   '4951',
   '5133',
   '5198',
   '5252',
   '5386',
   '5439',
   '5522',
   '5578',
   '5690',
   '5721',
   '5779',
   '5787',
   '5855',
   '5981',
   '6066',
   '6102',
   '6111',
   '6127',
   '6382',
   '6407',
   '6420',
   '6511',
   '6590',
   '6726',
   '6875',
   '6997',
   '7084',
   '7117',
   '7211',
   '7265',
   '7318',
   '7400',
   '7407',
   '7436',
   '7437',
   '7723',
   '7913',
   '7915',
   '7941',
   '7953',
   '8328',
   '

In [24]:
# Update clips_count
stats = last_stats.get('statistics', {})

# Rebuild the stored counts as a DataFrame, or start from an empty frame with
# the expected columns when nothing is stored.
data_str = stats.get('clips_count_per_country_day_night')
if data_str:
    last_clips_count = pd.DataFrame(data_str)
else:
    last_clips_count = pd.DataFrame(columns=[C.COUNTRY, 'daynight', 'count'])

last_clips_count

Unnamed: 0,country,daynight,count


In [25]:
# Outer-merge the stored counts with the new batch on (country, daynight),
# then add the count columns row-wise to get the running count per pair.
updated_df = (pd.merge(last_clips_count, pd.DataFrame(clips_count), how='outer', on=[C.COUNTRY, 'daynight'])
              .set_index([C.COUNTRY, 'daynight'])
              .sum(axis=1)
              .reset_index()
              .rename(columns={0: 'count'})
             )

# One dict per row, without the transpose round-trip of `.T.to_dict().values()`.
updated_df.to_dict(orient='records')

[{'country': 'CH', 'daynight': 'night', 'count': 110.0},
 {'country': 'DE', 'daynight': 'night', 'count': 117.0},
 {'country': 'EN', 'daynight': 'night', 'count': 114.0},
 {'country': 'FR', 'daynight': 'night', 'count': 103.0},
 {'country': 'IT', 'daynight': 'night', 'count': 113.0},
 {'country': 'JP', 'daynight': 'night', 'count': 100.0},
 {'country': 'NE', 'daynight': 'night', 'count': 104.0},
 {'country': 'RU', 'daynight': 'night', 'count': 129.0},
 {'country': 'US', 'daynight': 'night', 'count': 110.0}]