# Refactorisation

In [1]:
import numpy as np
import random
import time
from datetime import datetime
from datetime import timedelta
import pandas as pd
from statistics import mean
from loguru import logger
import uuid
import json
import pytz
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import itertools
from statsmodels.formula.api import ols
plt.style.use('seaborn')

### Read Data

In [2]:
# global constants
filename = '20200715-E7S2WK4HFP-br.jsonl'

def parse_file(n_lines=None):
    with open(filename) as infile:
        if n_lines is not None: # only read at most n_lines
            file_iterator = itertools.islice(infile,n_lines,n_lines+500000)
        else:
            file_iterator = infile
        all_dict = list(map(json.loads,file_iterator))
        logger.info("Loaded {} rows",len(all_dict))
        return all_dict
    
def create_frame(br_object):
    deals = br_object['Bidrequest']['imp'][0]['pmp']['deals']
    for deal in deals:
        if deal['id'] == "E7S2WK4HFP":
            price = deal['bidfloor'] 
    ts = br_object['Timestamp_status_receive_ms']/1000
    tz = pytz.timezone(br_object['Timezone'])
    record={
        'UTC_date': datetime.utcfromtimestamp(ts),
        'local_date': datetime.fromtimestamp(ts,tz=tz),
        'local_hour': datetime.fromtimestamp(ts,tz=tz).hour,
        'ts': ts,
        'TZ': br_object['Timezone'],
        'imps': br_object['Imps'],
        'CPM': price
        }
    record['price'] = (record['imps'] * record['CPM']) / 1000
    return record

In [3]:
%%time

# We have to load 6 000 000 lines
first = True
N_LINES = 0
while N_LINES < 5500000:
    if first:
        first = False
        all_dicts = parse_file(n_lines=N_LINES)
        records = list(map(create_frame, all_dicts))
        df = pd.DataFrame.from_records(records).sort_values("ts",ascending=True)
    else:
        N_LINES += 500000
        all_dicts = parse_file(n_lines=N_LINES)
        records = list(map(create_frame, all_dicts))
        df = df.append(pd.DataFrame.from_records(records).sort_values("ts",ascending=True))  

2020-07-31 10:00:13.179 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:00:42.315 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:01:09.918 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:01:37.653 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:02:07.844 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:02:39.244 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:03:08.042 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:03:39.420 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:04:11.250 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:04:45.445 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:05:33.041 | INFO     | __main__:parse_file:11 - Loaded 500000 rows
2020-07-31 10:06:39.423 | INFO     | __main__:parse_file:11 - Loaded 500000 rows


CPU times: user 4min 45s, sys: 11.4 s, total: 4min 57s
Wall time: 6min 54s


In [4]:
def win_proba(probability):
    return random.random() < probability
def delay_notif():
    if random.random() < 0.90:
        delay = random.randint(2,60)
    else:
        delay = random.randint(60,900)
    return delay

In [5]:
df['win'] = df.apply(lambda x: win_proba(0.95), axis = 1)
df['seconds_notif'] = df.apply(lambda x: delay_notif(), axis = 1)

In [6]:
data = df[df.TZ == 'America/New_York']
data.set_index('local_date', inplace=True)
data.sort_index(inplace = True)

In [7]:
data.head()

Unnamed: 0_level_0,UTC_date,local_hour,ts,TZ,imps,CPM,price,win,seconds_notif
local_date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2020-07-07 20:00:00.042000-04:00,2020-07-08 00:00:00.042,20,1594166000.0,America/New_York,25.037222,9,0.225335,True,182
2020-07-07 20:00:00.070000-04:00,2020-07-08 00:00:00.070,20,1594166000.0,America/New_York,27.289524,9,0.245606,True,17
2020-07-07 20:00:01.305000-04:00,2020-07-08 00:00:01.305,20,1594166000.0,America/New_York,20.371296,9,0.183342,True,17
2020-07-07 20:00:01.706000-04:00,2020-07-08 00:00:01.706,20,1594166000.0,America/New_York,1.015774,9,0.009142,True,3
2020-07-07 20:00:01.729000-04:00,2020-07-08 00:00:01.729,20,1594166000.0,America/New_York,2.353671,9,0.021183,True,59


### Pacing

In [11]:
databis = data['2020-07-07 20:00:00':'2020-07-09 23:59:59']

In [8]:
def gen_prop_lr(br_object):
    aggr = br_object.imps.groupby([br_object.index.date,br_object.index.weekday,br_object.index.hour]).sum()
    aggr.index.names = ['date','weekday','hour']
    aggr = aggr.reset_index()
    model = ols('imps ~ C(weekday) + C(hour)', data=aggr).fit()
    weekday_list = range(7)
    weekday_list = list(itertools.chain.from_iterable(itertools.repeat(x, 24) for x in weekday_list))
    hour_list = list()
    for i in range(7):
        for z in range(24):
            hour_list.append(z)
    df_fitting = pd.DataFrame({'weekday':weekday_list,'hour':hour_list})
    prediction = model.predict(df_fitting)
    df_fitting['fitted'] = prediction
    pattern = df_fitting.pivot_table('fitted', index = df_fitting.hour, columns=df_fitting.weekday)
    line, col = pattern.shape
    for i in range(col):
        pattern.iloc[:,i] = pattern.iloc[:,i]*100/pattern.iloc[:,i].sum()
    return pattern

In [9]:
def gen_prop_lr_hour(br_object):
    aggr = br_object.imps.groupby([br_object.index.date,br_object.index.hour]).sum().reset_index()
    aggr.columns = ['date','hour','imps']
    model = ols('imps ~ C(hour)', data=aggr).fit()
    hour_list = list()
    for z in range(24):
        hour_list.append(z)
    df_fitting = pd.DataFrame({'hour':hour_list})
    prediction = model.predict(df_fitting)
    df_fitting['fitted'] = prediction
    df_fitting.index = df_fitting.hour
    del df_fitting['hour']
    df_fitting.iloc[:,0] = df_fitting.iloc[:,0]*100/df_fitting.iloc[:,0].sum()
    return df_fitting

In [10]:
def meta_prop(data):
    if data.empty:
        unif = True
        without_weekday = False
        prop = 1/24
    elif set(data.index.hour.unique()) != set(range(24)):
        unif = True
        without_weekday = False
        prop = 1/24
    else:
        diff = data.tail(1).index - data.head(1).index
        if diff.days < 7:
            unif = False
            without_weekday = True
            prop = gen_prop_lr_hour(data)
        elif diff.days == 7:
            unif = False
            without_weekday = False
            prop = gen_prop_lr(data)
    return (prop, unif, without_weekday)

In [38]:
def send_pending_notifications(instance_obj, current_ts = None):
        """ Send notifications 
        
        :param current_ts: if None: will send all notifications, else send before current_ts
        :return:
        """
        while len(pending_notifications) > 0 and (pending_notifications[0]['timestamp'] <= current_ts if current_ts else True):
            ev = pending_notifications.pop(0)
            instance_obj.receive_notification(ev['status'], ev['br_price'])

In [39]:
class Algo:
    def __init__(self, daily_budget, nb_hours_day, timezone):
        """Class constructor"""
        # Fixed attributes
        self.daily_budget = daily_budget
        self.building = list()
        # Impossible day and hour to initialize the setup
        self.day = 0
        self.engaged_budget = 0
        self.tz = pytz.timezone(timezone)
        
    def day_reset(self, day, month, year):
        """ Reset variables when there is a new day
        """
        if not self.building:
            self.building_data = pd.DataFrame.from_records(self.building)
        else:
            self.building_data = pd.DataFrame.from_records(self.building, index='Date')
        self.current_hour = -1
        self.remaining_budget_hour = 0
        self.remaining_budget = self.daily_budget
        self.spent_budget = 0
        self.surplus_hour = 0
        self.BT = [0]
        self.acceleration = [{'ts':self.tz.localize(datetime(year,month,day,0,0,0)),
                              'A':0}]
        self.speed = [{'ts':self.tz.localize(datetime(year,month,day,0,0,0)),
                              'S':0}]
        self.size_acceleration = 1
        self.sum_acceleration = 0
        self.size_speed = 1
        self.sum_speed = 0
        self.prop_table, self.unif, self.without_weekday = meta_prop(self.building_data)
        
    def hour_reset(self, weekday):
        """ Reset budget for the following hour
        """
        self.current_hour += 1 
        self.remaining_hours = 24 - self.current_hour
        # Evolutive target
        self.surplus_hour += self.remaining_budget_hour / self.remaining_hours
        if self.unif:
            self.budget_hour = self.prop_table * self.daily_budget + self.surplus_hour
        elif self.without_weekday:
            self.budget_hour = (self.prop_table.iloc[self.current_hour, 0]/100)*self.daily_budget + self.surplus_hour
        else:
            self.budget_hour = (self.prop_table.iloc[self.current_hour, weekday]/100)*self.daily_budget + self.surplus_hour
        self.target = self.budget_hour/3600
        self.spent_hour = 0
        self.remaining_budget_hour = self.budget_hour - self.spent_hour
    
    def gen_mean(self, mean_type):
        """ Calculate the average variation and speed of variation for bt
        """
        created_time = self.acceleration[-1]['ts'] - timedelta(minutes=30)
        if mean_type == 'acceleration':
            while self.acceleration[0]['ts'] < created_time:
                self.size_acceleration -= 1 
                self.sum_acceleration += self.acceleration[0]['A']
                del self.acceleration[0]
            try:
                average = self.sum_acceleration / self.size_acceleration
            except ZeroDivisionError:
                average = 0
        elif mean_type == 'speed':
            while self.speed[0]['ts'] < created_time:
                self.size_speed -= 1 
                self.sum_speed += self.speed[0]['S']
                del self.speed[0]
            try:
                average = self.sum_speed / self.size_speed
            except ZeroDivisionError:
                average = 0
        return average
    
    def bt_calculation(self, average_acceleration, average_speed, remaining_time, coef=1):
        """ Calculate the per second budget
        """
        self.remaining_budget = self.daily_budget - (self.engaged_budget + self.spent_budget)
        alpha = average_acceleration * coef
        try:
            bt = self.remaining_budget_hour * ((1 + alpha * average_speed) / remaining_time) 
        except ZeroDivisionError:
            bt = 1
        return bt
    
    def buying_decision(self, ts, price):
        """From a BR, decide whether to buy or not
        
        Arguments:
        :ts: timestamp of the BR
        :price: price of the BR
        """
        # TS de la BR
        weekday = ts.weekday()
        day = ts.day
        month = ts.month
        year = ts.year
        hour = ts.hour
        
        # If we begin a new day, we reset variables
        if self.day != day:
            self.day_reset(day, month, year)
        self.day = day
        
        # Changement of hour
        while hour != self.current_hour:
            self.hour_reset(weekday)
            
        # Remaining time before the end of the hour
        end_hour = self.tz.localize(datetime(year,month,day,hour,59,59,999999))
        remaining_time =  datetime.timestamp(end_hour) - datetime.timestamp(ts)
        
        # Calculation of bt
        average_acceleration = self.gen_mean('acceleration')
        average_speed = self.gen_mean('speed')
        bt = self.bt_calculation(average_acceleration, average_speed, remaining_time)
        
        # Calculation of vt and at
        self.BT.append(bt)
        vt = self.BT[-1] - self.BT[-2] 
        self.speed.append({'ts':ts,
                          'S': vt})
        at = self.speed[-1]['S'] - self.speed[-2]['S']
        self.acceleration.append({'ts':ts,
                          'A': at})
        
        # Buying decision
        if (bt >= self.target) and (self.remaining_budget - price) >= 0:
            buying = True
            self.engaged_budget += price
            self.spent_hour += price
        else:
            buying = False
        self.remaining_budget_hour = self.budget_hour - self.spent_hour

        return buying   
    
    def receive_notification(self, status, br_price):
        """ From a notification, take into account the status (win/lose)
        """
        if status == 'win':
            self.engaged_budget -= br_price
            self.spent_budget += br_price
        elif status == 'lose':
            self.engaged_budget -= br_price
            self.spent_hour -= br_price

In [40]:
pacing = Algo(daily_budget=3000, nb_hours_day=24, 'America/New_York')

In [41]:
%%time 
records = list()
pending_notifications = list()
day = 7
for current_ts, row in databis.iterrows():
    # Send current notifications
    send_pending_notifications(pacing, current_ts)
    if current_ts.day != day:
        day = current_ts.day
        # Send remaining notifications
        send_pending_notifications(pacing)
        records[-1]['engaged'] = pacing.engaged_budget
        pacing.remaining_budget = pacing.daily_budget - (pacing.engaged_budget + pacing.spent_budget)
        records[-1]['remaining'] = pacing.remaining_budget
        records[-1]['spent'] = pacing.spent_budget
        
    # Receive BR
    buying = pacing.buying_decision(current_ts, row['price'])
    
    # Making a decision
    if buying:
        next_notif_ts = current_ts + timedelta(seconds=row['seconds_notif'])
        status = "win" if row['win'] else "lose"
        notif_id = uuid.uuid4()
        pending_notifications.append({"timestamp": next_notif_ts, "status": status, 'br_price': row['price'], 'id': notif_id})
        pending_notifications.sort(key=lambda x: x['timestamp'])
    record = {
        'buying':buying,
        'remaining':pacing.remaining_budget,
        'spent':pacing.spent_budget,
        'engaged':pacing.engaged_budget
    }
    records.append(record)
# Send remaining notifications
send_pending_notifications(pacing)
# Update last row after sending last notifications
records[-1]['engaged'] = pacing.engaged_budget
pacing.remaining_budget = pacing.daily_budget - (pacing.engaged_budget + pacing.spent_budget)
records[-1]['remaining'] = pacing.remaining_budget
records[-1]['spent'] = pacing.spent_budget
pacing_df = pd.DataFrame.from_records(records)
new_df_et = pd.concat([databis.reset_index(),pacing_df], axis=1, ignore_index=True)
new_df_et.columns = ['local_date','UTC_date','hour','ts','TZ','imps','CPM','price','win','seconds_notif',
                 'buying','remaining','spent','engaged']
new_df_et.set_index('local_date', inplace=True)

CPU times: user 2min 23s, sys: 408 ms, total: 2min 23s
Wall time: 2min 23s


In [42]:
new_df_et.groupby(new_df_et.index.day).tail(1)

Unnamed: 0_level_0,UTC_date,hour,ts,TZ,imps,CPM,price,win,seconds_notif,buying,remaining,spent,engaged
local_date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2020-07-07 23:59:59.803000-04:00,2020-07-08 03:59:59.803,23,1594181000.0,America/New_York,8.834286,9,0.079509,True,30,True,0.976009,2999.023991,1.120597e-13
2020-07-08 23:59:59.259000-04:00,2020-07-09 03:59:59.259,23,1594267000.0,America/New_York,30.677381,9,0.276096,True,23,False,0.130134,2999.869866,6.706441e-14
2020-07-09 23:59:59.139000-04:00,2020-07-10 03:59:59.139,23,1594354000.0,America/New_York,4.214881,9,0.037934,True,112,False,0.139666,2999.860334,8.848478e-14
