## Pipeline 

Main goal: a pipeline skeleton to:
1. Classify households according to some grouping
2. Compute TV metrics
3. Test sample size methodologies


Pipeline structure:
1. Send relevant info to filter crosswalk-eventlog data (client, week for crosswalk, campaign dates, etc.)
2. Create a base data frame with control-group and visited flags
3. Calculate metrics and choose variables to group the data set
4. Sample size analysis (100 samples of the total data)

In [None]:
import os
# move to the directory that contains tv2ds/ so that `tv2ds` can be imported as a package
currentdir = os.getcwd()
correctdir = currentdir.rsplit('tv2ds/',1)[0]
os.chdir(correctdir)

from tv2ds.ds_lib import notebook_prodrun
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import tvsquared.settings
from tvsquared.lib.request import Request
from tvsquared.lib.athena import AthenaDatabase
pd.options.display.float_format = '{:.6f}'.format
pd.set_option('display.max_colwidth', None)

In [None]:
notebook_prodrun.set_env('US', prodrunenv='PROD')
crosswalk_suffix = os.environ.get('TV2PRODRUNENV').lower()
crosswalk_suffix

'produsa'

In [None]:
def make_request(clientid, brandid=1, datefrom=None, dateto=None):
    # Helper function to build a Request from client, brand and (optional) date range arguments.
    # datefrom/dateto are 'YYYY-MM-DD' strings; if either is missing, no date range is set.
    # Returns the initialised request object.
    request = Request(clientarg=False, brandarg=False, datesarg=False)
    if datefrom and dateto:
        # `datetime` is the class here (from datetime import datetime), so strptime is called on it directly
        datefrom = datetime.strptime(datefrom, '%Y-%m-%d')
        dateto = datetime.strptime(dateto, '%Y-%m-%d')
    else:
        datefrom, dateto = None, None
    request = request.init(partnerid=None, clientid=clientid, brandid=brandid, datefrom=datefrom, dateto=dateto,
                           loglevel=-1, extargs=None, request=None, usespark=None, readPreference=None, prodrun=False)
    return request


def query_athena(request, query, copy_to_local=False):
    # Run `query` against the client's Athena database and return the results as a DataFrame
    athena_db = AthenaDatabase.get_client_database(request)
    query_results = AthenaDatabase.execute_query(athena_db, query, copy_to_local=copy_to_local)
    return pd.DataFrame(query_results)

In [None]:
client = {'clientid': 9306}
request = make_request(client['clientid'])

## ✅️ Step1. Send relevant info to filter crosswalk-eventlog data

### ➡️ Client and information used to extract sample data:
**Drizly**<br>
vendor_name='inscape', yy='2022', mm='03',dd='28',crosswalk_suffix='produsa',dateto='2022-03-31', datefrom='2022-03-01', clientid='c9306_drizly'<br>

**Therealreal**<br>
vendor_name='inscape', yy='2022', mm='01', dd='24', crosswalk_suffix='produsa', dateto='2022-01-30', datefrom='2022-01-01', clientid='c16319_the_realreal'<br>

**Uti**<br>
vendor_name='inscape', yy='2022', mm='01', dd='24', crosswalk_suffix='produsa', dateto='2022-01-30', datefrom='2022-01-01', clientid='c9534_uti'<br>

In [None]:
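# Inputs needed for the Athena queries

# crosswalk date partition (yy/mm/dd)
yy_input='2022' 
mm_input='03'
dd_input='28'
# campaign date range, 'YYYY-MM-DD'
dateto_input='2022-04-03' 
# dateto_input='2022-03-31' 
datefrom_input='2022-03-01'  
# days to look back when excluding impressed households from the control group
lookback_window_input=30
clientid_input='c9306_drizly'
# clientid_input='c9534_uti'
# clientid_input='c16319_the_realreal'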

### ➡️ Control data set

#### ➜ All control hhs per day/ per week - my function

Tables created: 
- df_control_pday
- control_n_total

This query brings back 8,040,334 unique household IDs, as expected, but only 106,355 of them have an event in the eventlog, so only those have a date attached.<br/>
The rest, which are the majority (about 7.9M), have no date attached; they are just valid and active households. So how can this be turned into daily control groups? Is there a date variable in the campaign universe that can be used?<br/>

**note 6th October**: <br/>
I am still working on this section; it is not a straightforward task. It is under investigation in RAD-570 (original query exploration).
I am using a function that Michael shared with me.<br/>

**note 11th October**:<br/>
I ran and modified the function Michael shared with me, and I now have control groups at daily and weekly levels. However, for the control visited counts, I modified the function I had at campaign level to add just a date variable as new information, keeping in mind that the high-level figures I already had should still add up (I reckoned it was just a matter of adding new information, so the totals should stay the same). The problem is that these numbers are very different from the visited numbers in Michael's function, so I am not sure this is right anymore.
Maybe I need to run Michael's function for the exposed group and see how those numbers compare to the ones I have.

In [None]:
def query_control_perday(subquery_name: str, yy_value:str, mm_value:str, dd_value:str, dateto_value:str, datefrom_value:str, clientid_value:str): 
   return '''
   with filtered_hh as (
      select 
         key_value as mapped_tv2_hhid
      from {crosswalk_suffix}_modeldata.crosswalk
      where 
         yy='{yy}' 
         and mm='{mm}' 
         and dd='{dd}'
         and key_name ='tv2_hhid' 
         and vendor_name = '{vendor_name}'
         and excluded_stamp is null
      ),
      campaign_universe as (
         select 
            complex_ranges.mapped_tv2_hhid 
         FROM {crosswalk_suffix}_modeldata.crosswalk cw
         CROSS JOIN UNNEST(complex_range) AS t (complex_ranges)
         join filtered_hh fh on fh.mapped_tv2_hhid = complex_ranges.mapped_tv2_hhid
         where 
            vendor_name = '{vendor_name}'
            and complex_ranges.first_seen <= timestamp '{dateto}' + interval '1' day
            and complex_ranges.last_seen >= timestamp '{datefrom}' 
            and key_name = 'tv2_hhid'
            and yy='{yy}'
            and mm='{mm}' 
            and dd='{dd}'
      ),
      client_eventlog as (
         select 
            date_trunc('day', datadatetime) as day,
            crosswalk_link_id,
            event_class,
            in_scope,
            datadatetime
         from {clientid}_{crosswalk_suffix}.eventlog
         where 
            datadatetime between timestamp '{datefrom}' 
            and timestamp '{dateto}' + interval '7' day
      ),
      hh_impressed_30days as (
         select
            distinct crosswalk_link_id as mapped_tv2_hhid
         from campaign_universe ex
         join client_eventlog ev on ev.crosswalk_link_id = ex.mapped_tv2_hhid
         where
            event_class='impression'
            and datadatetime between timestamp '{datefrom}' - interval '30' day    
            and timestamp '{dateto}' + interval '1' day + interval '6' day
            and in_scope
      ),
      hh_control as (
         select
            distinct cu.mapped_tv2_hhid
         from campaign_universe cu
         left join hh_impressed_30days hh on hh.mapped_tv2_hhid  = cu.mapped_tv2_hhid
         where hh.mapped_tv2_hhid is null
      ),
      hh_control_perday as (
         select
            day,
            mapped_tv2_hhid
         from hh_control
         left join client_eventlog on crosswalk_link_id = mapped_tv2_hhid
      )
      select
         *
      from {result}


'''.format(
      vendor_name='inscape', 
      yy=yy_value, mm=mm_value,dd=dd_value,
      crosswalk_suffix='produsa',
      dateto=dateto_value, 
      datefrom=datefrom_value,  
      clientid=clientid_value,
      result=subquery_name
      )   
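The `hh_control` CTE above keeps the households in `campaign_universe` that have no match in `hh_impressed_30days`: a left join filtered on `is null`, i.e. an anti-join. A minimal pandas sketch of the same set logic on made-up IDs, handy for sanity-checking small extracts locally; `anti_join` is a hypothetical helper, not part of the pipeline:

```python
import pandas as pd


def anti_join(universe: pd.DataFrame, impressed: pd.DataFrame, key: str = "mapped_tv2_hhid") -> pd.DataFrame:
    """Rows of `universe` whose key never appears in `impressed`
    (SQL equivalent: left join ... where right.key is null)."""
    merged = universe.merge(
        impressed[[key]].drop_duplicates(),  # dedupe so the join cannot multiply rows
        on=key,
        how="left",
        indicator=True,  # adds a '_merge' column: 'left_only', 'right_only' or 'both'
    )
    # 'left_only' marks universe rows with no impressed match
    return merged.loc[merged["_merge"] == "left_only", [key]].reset_index(drop=True)


# Toy data (hypothetical household IDs)
universe = pd.DataFrame({"mapped_tv2_hhid": ["a", "b", "c", "d"]})
impressed = pd.DataFrame({"mapped_tv2_hhid": ["b", "d", "d"]})

control = anti_join(universe, impressed)
print(control.mapped_tv2_hhid.tolist())  # ['a', 'c']
```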

In [None]:
df_control_pday= query_athena(
    request, 
    query_control_perday(
        subquery_name='hh_control_perday',
        yy_value=yy_input, mm_value=mm_input,dd_value=dd_input,
        dateto_value=dateto_input, 
        datefrom_value=datefrom_input,  
        clientid_value=clientid_input,
        ))  # .astype({"ctrl_hhs": "int"}) --> needed this in the previous code, when I tried bringing flags directly from Athena; this cast was the problem

df_control_pday.day = pd.to_datetime(df_control_pday.day)

In [None]:
df_control_pday.shape

(8228310, 2)

In [None]:
df_control_pday.head()

In [None]:
# df_control_pday.ctrl_hhs.sum()
df_control_pday.mapped_tv2_hhid.nunique()

8040334

In [None]:
control_n_total = df_control_pday.groupby(['day'], dropna=False).agg(n_total=("mapped_tv2_hhid", "nunique"))  # dropna=False keeps households with no event (day is NaT) as their own group
control_n_total

| day | n_total |
|---|---|
| 2022-03-01 | 2902 |
| 2022-03-02 | 2837 |
| 2022-03-03 | 2971 |
| 2022-03-04 | 3292 |
| 2022-03-05 | 3518 |
| 2022-03-06 | 3117 |
| 2022-03-07 | 1756 |
| 2022-03-08 | 2764 |
| 2022-03-09 | 2782 |
| 2022-03-10 | 2743 |


In [None]:
control_n_total.n_total.sum()

8081940
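The daily counts add up to more than the number of unique households (8,081,940 vs 8,040,334) because a household with events on several days is counted once on each of those days, and, since `dropna=False` is used, households with no event at all land in a single `NaT` group instead of being dropped. A toy sketch of both effects (made-up data):

```python
import pandas as pd

# Toy per-day control table: 'a' has events on two days, 'c' has no event (NaT day)
df = pd.DataFrame({
    "mapped_tv2_hhid": ["a", "a", "b", "c"],
    "day": pd.to_datetime(["2022-03-01", "2022-03-02", "2022-03-01", None]),
})

per_day = df.groupby("day", dropna=False).agg(n_total=("mapped_tv2_hhid", "nunique"))
print(per_day)
print(per_day.n_total.sum())         # 4: 'a' counted on both days, 'c' in the NaT group
print(df.mapped_tv2_hhid.nunique())  # 3 unique households
```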

#### ➜ All control hhs per day/ per week - Michael function
Tables created:
- df_control_daily
- df_control_weekly

In [None]:
# pd.DataFrame({"lala": ["1.0", "2.3"]}).assign(lala=lambda df: np.floor(pd.to_numeric(df.lala, errors="coerce")).astype("Int64"))
request_ctrl_fun = make_request(9306,1)

In [None]:
def get_control_group(
    request, datefrom_value:str, dateto_value:str, yy_value:str, mm_value:str, dd_value:str,clientid_value:str, 
    granularity:str, lookback_window:int, filter_linear=True,):
    """
    Slow version to get us off the ground. Takes a date range and runs the Athena query once per period
    (day or week, see `granularity`) to get the aggregate totals of eligible control households and
    the number of them that visited.

    Parameters
    ----------
    request : Request
        request object (used to get the client's Athena database and for logging)
    datefrom_value : str
        start date of the date range, 'YYYY-MM-DD'
    dateto_value : str
        end date of the date range, 'YYYY-MM-DD'
    yy_value : str
        crosswalk year
    mm_value : str
        crosswalk month
    dd_value : str
        crosswalk day
    clientid_value : str
        client prefix of the eventlog table, e.g. 'c9306_drizly'
    granularity : str
        pandas frequency for the date range: 'D' (daily) or 'W' (weekly). With 'W' pandas anchors
        dates on Sundays ('W-SUN') and each period starts 6 days earlier, so weeks run Monday to Sunday.
        Other frequencies (e.g. 'W-MON') are not handled by the 6-day offset.
    lookback_window : int
        how far to look back to find eligible households, e.g. not exposed in the last 30 days
    filter_linear : bool
        if True (default), both OTT and linear impressions exclude a household from the control group;
        if False, only 'vod' impressions do

    Notes
    -----
    vendor_name ('inscape') and crosswalk_suffix ('produsa') are currently hard-coded in the query.

    Returns
    -------
    DataFrame
        control households and visits per period, with the visit rate calculated

    Version control
    -------
    Created by: Michael Comerford
    Last Modified by: Chio Martinez 10th Oct 
        - introduced change on concatenated df to use concat instead of append in the final df
        - introduced granularity functionality: 'W' and 'D'
        - changes to function call and parameters 

    """
    # get db from request object
    athena_db = AthenaDatabase.get_client_database(request)

    # create list of dates from specified range
    datelist = pd.date_range(start=datefrom_value, end=dateto_value, freq=granularity).to_list()

    # if filter_linear is true we need to remove the event filter
    overlap = ''
    if not filter_linear:
        overlap = "and event = 'vod'"

    # collect one aggregated DataFrame per period, to be concatenated into the final result
    dfs=[]


    for date in datelist:

        # daily: the period is the single day `date`
        # weekly: freq='W' yields Sundays, so the period starts 6 days earlier (Monday to Sunday)
        datefrom_granularity = date
        if granularity == 'W':
            datefrom_granularity = date - timedelta(days=6)

        print(
            '''Getting Data for dates between {datefrom} until {dateto}...'''
            .format(
                dateto=date, 
                datefrom=datefrom_granularity)
            )

        query = """
        -- select universe of unfiltered people for time range
        

        with filtered_hh as (
        select 
            key_value as mapped_tv2_hhid
        from {crosswalk_suffix}_modeldata.crosswalk
        where 
            yy='{yy}' 
            and mm='{mm}' 
            and dd='{dd}'
            and key_name ='tv2_hhid' 
            and vendor_name = '{vendor_name}'
            and excluded_stamp is null
        ),
        campaign_universe as (
            select 
                complex_ranges.mapped_tv2_hhid 
            FROM {crosswalk_suffix}_modeldata.crosswalk cw
            CROSS JOIN UNNEST(complex_range) AS t (complex_ranges)
            join filtered_hh fh on fh.mapped_tv2_hhid = complex_ranges.mapped_tv2_hhid
            where 
                vendor_name = '{vendor_name}'
                and complex_ranges.first_seen <= timestamp '{dateto}' + interval '1' day
                and complex_ranges.last_seen >= timestamp '{datefrom}' 
                and key_name = 'tv2_hhid'
                and yy='{yy}'
                and mm='{mm}' 
                and dd='{dd}'
        ),
        client_eventlog as (
            select 
            *
            from {clientid}_{crosswalk_suffix}.eventlog
            where 
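                -- lower bound includes the lookback window so hh_impressed_30days can see earlier impressions
                datadatetime between timestamp '{datefrom}' - interval '{lookback_window}' day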
                and timestamp '{dateto}' + interval '7' day
        ),
        hh_impressed_30days as (
            select
                distinct crosswalk_link_id as mapped_tv2_hhid
            from campaign_universe ex
            join client_eventlog ev on ev.crosswalk_link_id = ex.mapped_tv2_hhid
            where
                event_class='impression'
                {overlap}
                and datadatetime between timestamp '{datefrom}' - interval '{lookback_window}' day    
                and timestamp '{dateto}' + interval '7' day
                and in_scope
        ),
        hh_control as (
            select
                distinct cu.mapped_tv2_hhid
            from campaign_universe cu
            left join hh_impressed_30days hh on hh.mapped_tv2_hhid  = cu.mapped_tv2_hhid
            where hh.mapped_tv2_hhid is null
        ),
        n_hh_control_visited as (
            select 
                count(distinct mapped_tv2_hhid) as ctrl_visited
            from client_eventlog
            join hh_control on crosswalk_link_id = mapped_tv2_hhid
            where
                event_class ='response' and event= 'all response'
                and datadatetime between timestamp '{datefrom}' 
                and timestamp '{dateto}' + interval '7' day
                and coalesce(in_scope, TRUE)  -- in_scope TRUE or NULL
        ),
        n_hh_control as (
            select CAST(count(distinct mapped_tv2_hhid) AS double) as ctrl_hh
            from hh_control
        ),
        final_results as (
            select *
            from n_hh_control_visited
            cross join n_hh_control
        )
        select
            '{datefrom}' as day, 
            ctrl_hh,
            ctrl_visited,
            ctrl_visited/ctrl_hh as ctrl_vr
        from final_results

        """.format(
            yy=yy_value, mm=mm_value, dd=dd_value,
            dateto=date, 
            datefrom=datefrom_granularity,  
            clientid=clientid_value,
            lookback_window=lookback_window, 
            overlap=overlap,
            vendor_name='inscape',
            crosswalk_suffix='produsa',
            )   

        query_results = AthenaDatabase.execute_query(athena_db, query)
        dfs.append(pd.DataFrame(query_results))
        request.log.info(query)

    # concatenate once, after every period has been queried
    results = pd.concat(dfs, ignore_index=True)
    return results
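The per-period windows that the loop produces can be checked without touching Athena. A minimal sketch that mirrors the date logic of `get_control_group`: with `freq='W'` pandas anchors dates on Sundays (`'W'` is an alias for `'W-SUN'`), and subtracting 6 days gives Monday to Sunday weeks, which is why the first week starts before `datefrom`. `period_windows` is a hypothetical helper:

```python
from datetime import timedelta

import pandas as pd


def period_windows(datefrom: str, dateto: str, granularity: str):
    """(start, end) date pairs per period, using the same rule as get_control_group."""
    windows = []
    for date in pd.date_range(start=datefrom, end=dateto, freq=granularity):
        # weekly periods end on the Sunday produced by freq='W' and start on the Monday before
        start = date - timedelta(days=6) if granularity == "W" else date
        windows.append((start.date().isoformat(), date.date().isoformat()))
    return windows


weekly = period_windows("2022-03-01", "2022-04-03", "W")
for window in weekly:
    print(window)  # ('2022-02-28', '2022-03-06') ... ('2022-03-28', '2022-04-03')

daily = period_windows("2022-03-01", "2022-03-03", "D")
print(daily)
```

If weeks should start on a different day, the anchor (e.g. `'W-SAT'`) and the offset would have to change together; note that the `granularity == 'W'` check as written would not match `'W-SAT'`.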

In [None]:
granularity_input='D'

def to_int(col: pd.Series):
    return np.floor(pd.to_numeric(col, errors="coerce")).astype("Int64")


df_control_daily= get_control_group(
    request=request_ctrl_fun, 
    yy_value=yy_input, mm_value=mm_input,dd_value=dd_input,
    granularity=granularity_input,
    lookback_window=lookback_window_input,
    dateto_value=dateto_input, 
    datefrom_value=datefrom_input,  
    clientid_value=clientid_input,
    ).assign(
        ctrl_hh=lambda df: to_int(df.ctrl_hh),
        ctrl_visited=lambda df: to_int(df.ctrl_visited),
    )

df_control_daily.day = pd.to_datetime(df_control_daily.day)
df_control_daily.dtypes


Getting Data for dates between 2022-03-01 00:00:00 until 2022-03-01 00:00:00...
Getting Data for dates between 2022-03-02 00:00:00 until 2022-03-02 00:00:00...
Getting Data for dates between 2022-03-03 00:00:00 until 2022-03-03 00:00:00...
Getting Data for dates between 2022-03-04 00:00:00 until 2022-03-04 00:00:00...
Getting Data for dates between 2022-03-05 00:00:00 until 2022-03-05 00:00:00...
Getting Data for dates between 2022-03-06 00:00:00 until 2022-03-06 00:00:00...
Getting Data for dates between 2022-03-07 00:00:00 until 2022-03-07 00:00:00...
Getting Data for dates between 2022-03-08 00:00:00 until 2022-03-08 00:00:00...
Getting Data for dates between 2022-03-09 00:00:00 until 2022-03-09 00:00:00...
Getting Data for dates between 2022-03-10 00:00:00 until 2022-03-10 00:00:00...
Getting Data for dates between 2022-03-11 00:00:00 until 2022-03-11 00:00:00...
Getting Data for dates between 2022-03-12 00:00:00 until 2022-03-12 00:00:00...
Getting Data for dates between 2022-03-1

day             datetime64[ns]
ctrl_hh                  Int64
ctrl_visited             Int64
ctrl_vr                 object
dtype: object
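The `ctrl_vr` column above is still of dtype `object`, which suggests the query results come back as strings (an assumption about what `AthenaDatabase.execute_query` returns). A small sketch of what `to_int` does with such values, on made-up inputs, plus the equivalent float conversion for the rate column:

```python
import numpy as np
import pandas as pd


def to_int(col: pd.Series) -> pd.Series:
    # strings -> floats (unparseable values become NaN) -> floor -> nullable Int64 (NaN becomes <NA>)
    return np.floor(pd.to_numeric(col, errors="coerce")).astype("Int64")


raw = pd.Series(["8637847.0", "17999", None, "n/a"])
converted = to_int(raw)
print(converted)
print(converted.dtype)  # Int64

# the rate column can be made numeric the same way, without the floor/cast
vr = pd.to_numeric(pd.Series(["0.0020837368385895"]), errors="coerce")
print(vr.dtype)  # float64
```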

In [None]:
df_control_daily.head()

|   | day | ctrl_hh | ctrl_visited | ctrl_vr |
|---|---|---|---|---|
| 0 | 2022-03-01 | 8637847 | 17999 | 0.0020837368385895 |
| 1 | 2022-03-02 | 8651376 | 18100 | 0.0020921527396335 |
| 2 | 2022-03-03 | 8651396 | 18248 | 0.0021092549687934 |
| 3 | 2022-03-04 | 8629766 | 18041 | 0.002090554946681 |
| 4 | 2022-03-05 | 8593316 | 17899 | 0.0020828979174046 |


In [None]:
granularity_input='W'

df_control_weekly= get_control_group(
    request=request_ctrl_fun, 
    yy_value=yy_input, mm_value=mm_input,dd_value=dd_input,
    granularity=granularity_input,
    lookback_window=lookback_window_input,
    dateto_value=dateto_input, 
    datefrom_value=datefrom_input,  
    clientid_value=clientid_input,
    )

df_control_weekly.day = pd.to_datetime(df_control_weekly.day).dt.date

Getting Data for dates between 2022-02-28 00:00:00 until 2022-03-06 00:00:00...
Getting Data for dates between 2022-03-07 00:00:00 until 2022-03-13 00:00:00...
Getting Data for dates between 2022-03-14 00:00:00 until 2022-03-20 00:00:00...
Getting Data for dates between 2022-03-21 00:00:00 until 2022-03-27 00:00:00...
Getting Data for dates between 2022-03-28 00:00:00 until 2022-04-03 00:00:00...


In [None]:
df_control_weekly

|   | day | ctrl_hh | ctrl_visited | ctrl_vr |
|---|---|---|---|---|
| 0 | 2022-02-28 | 8389057.0 | 28908 | 0.003445917699689 |
| 1 | 2022-03-07 | 8324378.0 | 29325 | 0.003522785726453 |
| 2 | 2022-03-14 | 8262491.0 | 29041 | 0.0035147995925199 |
| 3 | 2022-03-21 | 8297656.0 | 28593 | 0.0034459129180578 |
| 4 | 2022-03-28 | 8308905.0 | 28947 | 0.0034838525654102 |


#### ➜ Control visited per day

Tables created: 
- df_control_visited_pday
- df_control_visited_pday_agg

**note 11th Oct:** <br/>
The function above also produces the visited counts per day, but I created this one to follow the same logic I used at campaign level, just adding the date as new information. The problem is that these numbers are very different from the visited counts from the function above, so I am not sure this is right anymore.

In [None]:
def query_control_visited_perday(subquery_name: str, yy_value:str, mm_value:str, dd_value:str, dateto_value:str, datefrom_value:str, clientid_value:str): 
   return '''
   with filtered_hh as (
      select 
         key_value as mapped_tv2_hhid
      from {crosswalk_suffix}_modeldata.crosswalk
      where 
         yy='{yy}' 
         and mm='{mm}' 
         and dd='{dd}'
         and key_name ='tv2_hhid' 
         and vendor_name = '{vendor_name}'
         and excluded_stamp is null
      ),
      campaign_universe as (
         select 
            complex_ranges.mapped_tv2_hhid 
         FROM {crosswalk_suffix}_modeldata.crosswalk cw
         CROSS JOIN UNNEST(complex_range) AS t (complex_ranges)
         join filtered_hh fh on fh.mapped_tv2_hhid = complex_ranges.mapped_tv2_hhid
         where 
            vendor_name = '{vendor_name}'
            and complex_ranges.first_seen <= timestamp '{dateto}' + interval '1' day
            and complex_ranges.last_seen >= timestamp '{datefrom}' 
            and key_name = 'tv2_hhid'
            and yy='{yy}'
            and mm='{mm}' 
            and dd='{dd}'
      ),
      client_eventlog as (
         select 
         *
         from {clientid}_{crosswalk_suffix}.eventlog
         where 
            datadatetime between timestamp '{datefrom}' 
            and timestamp '{dateto}' + interval '7' day
      ),
      hh_impressed_30days as (
         select
            distinct crosswalk_link_id as mapped_tv2_hhid
         from campaign_universe ex
         join client_eventlog ev on ev.crosswalk_link_id = ex.mapped_tv2_hhid
         where
            event_class='impression'
            and datadatetime between timestamp '{datefrom}' - interval '30' day    
            and timestamp '{dateto}' + interval '1' day + interval '6' day
            and in_scope
      ),
      hh_control as (
         select
            distinct cu.mapped_tv2_hhid
         from campaign_universe cu
         left join hh_impressed_30days hh on hh.mapped_tv2_hhid  = cu.mapped_tv2_hhid
         where hh.mapped_tv2_hhid is null
      ),
      n_hh_control_visited_per_day as (
         select
            mapped_tv2_hhid, 
            date_trunc('day', datadatetime) as day
         from client_eventlog
         join hh_control on crosswalk_link_id = mapped_tv2_hhid
         where
            event_class ='response' and event= 'all response'
            and datadatetime between timestamp '{datefrom}' 
            and timestamp '{dateto}' + interval '6' day + interval '1' day
            and coalesce(in_scope, TRUE)
      )
      select
         *
      from {result}

'''.format(
      vendor_name='inscape', 
      yy=yy_value, mm=mm_value,dd=dd_value,
      crosswalk_suffix='produsa',
      dateto=dateto_value, 
      datefrom=datefrom_value,  
      clientid=clientid_value,
      result=subquery_name
      )   

In [None]:
df_control_visited_pday= query_athena(
    request, 
    query_control_visited_perday(
        subquery_name='n_hh_control_visited_per_day',
        yy_value=yy_input, mm_value=mm_input,dd_value=dd_input,
        dateto_value=dateto_input, 
        datefrom_value=datefrom_input,  
        clientid_value=clientid_input,
        ))

df_control_visited_pday.day = pd.to_datetime(df_control_visited_pday.day)
# df_control_visited_pday.dtypes

In [None]:
# total rows extracted from source: 175,337 (all visits; no impressions because these are control households)
print(df_control_visited_pday.shape)

# number of unique hhs in control visited group = 64,506
print(df_control_visited_pday.mapped_tv2_hhid.nunique())

(175337, 2)
64506


In [None]:
df_control_visited_pday.head()

In [None]:
# aggregating visits by hhid and date
df_control_visited_pday_agg = pd.DataFrame(
    df_control_visited_pday.groupby(
        ['mapped_tv2_hhid', 'day'], 
        as_index=True
        ).mapped_tv2_hhid.count()
    ).rename(columns={'mapped_tv2_hhid':'num_visits'}).reset_index(drop=False)
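The same per-household, per-day visit counts can also be written with `groupby(...).size()`, which counts rows per group and names the result column in one step (it matches `.count()` here because the grouping keys are never null inside a group). A sketch on toy data:

```python
import pandas as pd

# Toy visit rows: one row per visit event
visits = pd.DataFrame({
    "mapped_tv2_hhid": ["a", "a", "a", "b"],
    "day": pd.to_datetime(["2022-03-01", "2022-03-01", "2022-03-02", "2022-03-01"]),
})

visits_agg = (
    visits.groupby(["mapped_tv2_hhid", "day"])
    .size()                           # number of rows (visits) per household and day
    .reset_index(name="num_visits")
    .assign(visited=1)                # every row here is a visit, so the flag is constant
)
print(visits_agg)
```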

In [None]:
# adding visited flag
df_control_visited_pday=df_control_visited_pday_agg.assign(visited = 1)

In [None]:
df_control_visited_pday.head()

In [None]:
print(df_control_visited_pday.num_visits.sum()) #number of visits by control: 175,337 
print(df_control_visited_pday.mapped_tv2_hhid.nunique()) # number of unique hhs in control visited group = 64,506

175337
64506


### ➡️ Exposed data set

#### ➜ All exposed hhs per day and exposed visited per day

Tables created:
- df_exposed_visited_pday
- df_exposed_visited_pday_agg
- df_exposed_pday 

In [None]:
def query_exposed_visited_perday(subquery_name: str, yy_value:str, mm_value:str, dd_value:str, dateto_value:str, datefrom_value:str, clientid_value:str):
   return '''
   with filtered_hh as (
   select 
        key_value as mapped_tv2_hhid
   from {crosswalk_suffix}_modeldata.crosswalk
   where 
        yy='{yy}' 
        and mm='{mm}' 
        and dd='{dd}'
        and key_name ='tv2_hhid' 
        and vendor_name = '{vendor_name}'
        and excluded_stamp is null
   ),
   campaign_universe as (
      select 
         complex_ranges.mapped_tv2_hhid 
      FROM {crosswalk_suffix}_modeldata.crosswalk cw
      CROSS JOIN UNNEST(complex_range) AS t (complex_ranges)
      join filtered_hh fh on fh.mapped_tv2_hhid = complex_ranges.mapped_tv2_hhid
      where 
         vendor_name = '{vendor_name}'
         and complex_ranges.first_seen <= timestamp '{dateto}' + interval '1' day
         and complex_ranges.last_seen >= timestamp '{datefrom}' 
         and key_name = 'tv2_hhid'
         and yy='{yy}'
         and mm='{mm}' 
         and dd='{dd}'
   ),
   client_eventlog as (
      select 
      *
      from {clientid}_{crosswalk_suffix}.eventlog
      where 
         datadatetime between timestamp '{datefrom}' 
         and timestamp '{dateto}' + interval '7' day
   ),
   hh_impressed_in_campaign as (
      select
         distinct crosswalk_link_id as mapped_tv2_hhid
      from campaign_universe ex
      join client_eventlog ev on ev.crosswalk_link_id = ex.mapped_tv2_hhid
      where
         event_class='impression'
         and event = 'linear'
         and datadatetime between timestamp '{datefrom}' 
         and timestamp '{dateto}' + interval '1' day
         and in_scope
   ),
   n_hh_impressed_visited_per_day as (
      select 
         mapped_tv2_hhid,
         date_trunc('day', datadatetime) as day,
         case when (event_class ='response' and event= 'all response') then 1 else 0 end as visits
      from client_eventlog
      join hh_impressed_in_campaign on crosswalk_link_id = mapped_tv2_hhid
      where
         ((event_class ='response' and event= 'all response') or (event_class='impression' and event = 'linear'))
         and datadatetime between timestamp '{datefrom}' 
         and timestamp '{dateto}' + interval '6' day + interval '1' day
         and coalesce(in_scope, TRUE)
   )
   select
      *
   from {result}
   
'''.format(
   vendor_name='inscape', 
   yy=yy_value, mm=mm_value,dd=dd_value,
   crosswalk_suffix='produsa',
   dateto=dateto_value, 
   datefrom=datefrom_value,  
   clientid=clientid_value,
   result=subquery_name
   )   

In [None]:
df_exposed_visited_pday= query_athena(
    request,
    query_exposed_visited_perday(
        subquery_name='n_hh_impressed_visited_per_day',
        yy_value=yy_input, mm_value=mm_input,dd_value=dd_input,
        dateto_value=dateto_input, 
        datefrom_value=datefrom_input,  
        clientid_value=clientid_input,
    )).astype({"visits": "int"})

df_exposed_visited_pday.day = pd.to_datetime(df_exposed_visited_pday.day)
df_exposed_visited_pday.dtypes

mapped_tv2_hhid            object
day                datetime64[ns]
visits                      int64
dtype: object

In [None]:
#total rows extracted from source: 4,961,585 (all visits and impressions)
print(df_exposed_visited_pday.shape)

# number of total visits exposed group = 40,529 (extra information)
print(df_exposed_visited_pday.visits.sum())

# number of hhs that visited in exposed = 16,069
print(df_exposed_visited_pday.loc[df_exposed_visited_pday['visits']>0,'mapped_tv2_hhid'].nunique())

# number of hhs in exposed = 1,007,152
print(df_exposed_visited_pday.mapped_tv2_hhid.nunique())

(4961585, 3)
40529
16069
1007152


In [None]:
df_exposed_visited_pday.head()

In [None]:
# aggregating table to count num of visits per hhid and day
df_exposed_visited_pday_agg = pd.DataFrame(
    df_exposed_visited_pday.groupby(
        ['mapped_tv2_hhid', 'day'], 
        as_index=True
        ).visits.sum()
    ).rename(columns={'visits':'num_visits'}).reset_index(drop=False)

In [None]:
df_exposed_visited_pday_agg[df_exposed_visited_pday_agg["num_visits"]>1].head()

In [None]:
print(df_exposed_visited_pday_agg.num_visits.sum())
print(df_exposed_visited_pday_agg.mapped_tv2_hhid.nunique())
print(df_exposed_visited_pday_agg.shape)

40529
1007152
(2580817, 3)


Creating visited flag for exposed df - final exposed table:

In [None]:
df_exposed_pday=df_exposed_visited_pday_agg.assign(visited = np.where(df_exposed_visited_pday_agg.num_visits>0, 1, 0))

In [None]:
df_exposed_pday.head()

In [None]:
#total rows of exposed df: 2,580,817 (aggregated visits per hhid per day and hhids that were impressed)
print(df_exposed_pday.shape)

# number of total visits exposed group = 40,529 (extra information)
print(df_exposed_pday.num_visits.sum())

# number of hhs that visited in exposed = 16,069
print(df_exposed_pday.loc[df_exposed_pday['num_visits']>0,'mapped_tv2_hhid'].nunique())

# number of hhs in exposed group = 1,007,152
print(df_exposed_pday.mapped_tv2_hhid.nunique())

(2580817, 4)
40529
16069
1007152


## ✅️ Step2. Creating base data frame with ctrl group and visited flags

### Creating flags and merging the 2 data sets (ctrl and exposed)
- control group flag (`ctrl_flag`): indicates whether a hh belongs to the control group (1) or to the exposed group (0)
- visited flag (`visited`): indicates whether a hh visited the client's website in the specified time frame. From the SQL query: <br> 
  `event_class = 'response' and event = 'all response'` <br> 
  `and datadatetime between timestamp '{datefrom}' and timestamp '{dateto}' + interval '6' day + interval '1' day` <br> 
  `and in_scope`
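A note on the `in_scope` condition: under SQL three-valued logic, `x IN (TRUE, NULL)` evaluates to NULL, not TRUE, whenever `x` is NULL (and also when `x` is FALSE), so a filter written as `in_scope in (TRUE, null)` keeps only the rows where `in_scope` is TRUE. If rows with an unknown scope are meant to count, `coalesce(in_scope, TRUE)` says that explicitly. The sketch below demonstrates this with SQLite from the Python standard library, using 1/0/NULL in place of Athena booleans; SQLite standing in for Athena is an assumption, but both follow the standard NULL semantics for `IN`:

```python
import sqlite3


def kept_ids(where_clause: str) -> list:
    """Apply a WHERE clause to a tiny in-memory table and return the ids that survive."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE ev (id INTEGER, in_scope INTEGER)")
    # 1 = TRUE, 0 = FALSE, NULL = scope unknown
    con.executemany("INSERT INTO ev VALUES (?, ?)", [(1, 1), (2, 0), (3, None)])
    ids = [row[0] for row in con.execute(f"SELECT id FROM ev WHERE {where_clause} ORDER BY id")]
    con.close()
    return ids


print(kept_ids("in_scope IN (1, NULL)"))      # [1]: the NULL row is dropped
print(kept_ids("coalesce(in_scope, 1) = 1"))  # [1, 3]: NULL treated as in scope
```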

#### ➜ Per day data sets

In [None]:
df_control_visited_pday=df_control_visited_pday.assign(ctrl_flag = 1)
df_exposed_pday=df_exposed_pday.assign(ctrl_flag = 0)
base_df_pday=pd.concat([df_control_visited_pday, df_exposed_pday], axis=0)


In [None]:
base_df_pday.head()

In [None]:
base_df_pday.shape

(2686892, 5)

## ✅️ Step3. Calculate metrics and choose variables to group data set 

### Metrics

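Metrics include:
- exp_visited: exposed hhs that visited
- exp_hh: exposed hhs
- ctrl_visited: control hhs that visited
- ctrl_hh: control hhs
- exp_vr = exp_visited/exp_hh
- ctrl_vr = ctrl_visited/ctrl_hh
- diff_vr = exp_vr - ctrl_vr
- uplift = (exp_vr - ctrl_vr)/ctrl_vr = (exp_visited/exp_hh - ctrl_visited/ctrl_hh)/(ctrl_visited/ctrl_hh)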
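A worked example of these formulas, using the 2022-03-01 row of the daily metrics table in Step 3 (672 visits out of 63,260 exposed hhs; 17,999 out of 8,637,847 control hhs). `visit_metrics` is a hypothetical helper; uplift is the relative lift of the exposed visit rate over the control visit rate, i.e. `exp_vr / ctrl_vr - 1`:

```python
def visit_metrics(exp_visited: int, exp_hh: int, ctrl_visited: int, ctrl_hh: int) -> dict:
    """Visit rates, absolute difference and relative uplift of exposed vs control."""
    exp_vr = exp_visited / exp_hh
    ctrl_vr = ctrl_visited / ctrl_hh
    return {
        "exp_vr": exp_vr,
        "ctrl_vr": ctrl_vr,
        "diff_vr": exp_vr - ctrl_vr,
        "uplift": (exp_vr - ctrl_vr) / ctrl_vr,  # same as exp_vr / ctrl_vr - 1
    }


m = visit_metrics(exp_visited=672, exp_hh=63260, ctrl_visited=17999, ctrl_hh=8637847)
print({k: round(v, 6) for k, v in m.items()})
# exp_vr ≈ 0.010623, ctrl_vr ≈ 0.002084, uplift ≈ 4.097969 (as in the daily table)
```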

### ➜ Metric and pivot table Functions

In [None]:
def ctrl_hh_column(df: pd.DataFrame):
    return np.where(df.ctrl_flag==1,1,0)

def ctrl_visited_column(df: pd.DataFrame):
    return np.where((df.ctrl_flag==1) & (df.visited==1),1,0)

def exp_hh_column(df: pd.DataFrame):
    return np.where(df.ctrl_flag==0,1,0)

def exp_visited_column(df: pd.DataFrame):
    return np.where((df.ctrl_flag==0) & (df.visited==1),1,0)

def add_vars_for_metrics(df: pd.DataFrame):
    return df.assign(
        ctrl_hh=ctrl_hh_column,
        ctrl_visited=ctrl_visited_column,
        exp_hh=exp_hh_column,
        exp_visited=exp_visited_column,
    )

def add_vars_for_exposed_metrics(df: pd.DataFrame):
    return df.assign(
        exp_hh=exp_hh_column,
        exp_visited=exp_visited_column,
    )

In [None]:
def pivot_table(df: pd.DataFrame, source_column_name: str, margins_value: bool):
    # sum the 0/1 metric flags per value of `source_column_name`; margins=True adds an 'All' totals row
    table = pd.pivot_table(
        df,
        values=['ctrl_hh', 'ctrl_visited','exp_hh','exp_visited'], 
        index=[source_column_name],
        aggfunc='sum',
        margins=margins_value)

    table['ctrl_vr']=table.ctrl_visited/table.ctrl_hh
    table['exp_vr']=table.exp_visited/table.exp_hh
    # relative uplift of the exposed visit rate over the control visit rate
    table['uplift']=(table.exp_vr - table.ctrl_vr)/table.ctrl_vr
    table['diff_vr']=table.exp_vr - table.ctrl_vr 

    return table

# Way to fix decimal places - need to reset the decimals options for the cell as the global ones are set at the start of the notebook
# decimals = pd.Series([0, 0, 0, 0, 4], index=['ctrl_hh', 'ctrl_visited', 'exp_hh', 'exp_visited', 'ctrl_vr']) 
# table.round(decimals)

In [None]:
def pivot_table_exposed_vars(df: pd.DataFrame, source_column_name: str, margins_value: bool):
    # sum the exposed 0/1 flags per value of `source_column_name`; margins=True adds an 'All' totals row
    table = pd.pivot_table(
        df,
        values=['exp_hh','exp_visited'], 
        index=[source_column_name],
        aggfunc='sum',
        margins=margins_value)

    return table

### ➜ High level metrics

In [None]:
overall_metrics_table = pd.DataFrame(
    base_df_pday
    .pipe(add_vars_for_metrics)
    .pipe(pivot_table, source_column_name='day', margins_value=True)
    .loc['All']
    ).T
overall_metrics_table[['exp_hh','exp_visited','exp_vr']]

|     | exp_hh    | exp_visited | exp_vr   |
|-----|-----------|-------------|----------|
| All | 2580817.0 | 26446.0     | 0.010247 |


### ➜ Daily metrics

In [None]:
base_df_pday.head()

In [None]:
df_exposed_pday.head()

In [None]:
metrics_df_pday = pd.DataFrame(
    base_df_pday
    .pipe(add_vars_for_exposed_metrics)
    .pipe(pivot_table_exposed_vars, source_column_name='day', margins_value=False)
    .merge(df_control_daily.set_index('day'), right_index=True, left_index=True)
    .assign(
        exp_vr=lambda df: df.exp_visited/df.exp_hh,
        ctrl_vr=lambda df: df.ctrl_visited/df.ctrl_hh,
        uplift=lambda df: (df.exp_visited/df.exp_hh - df.ctrl_visited/df.ctrl_hh)/ (df.ctrl_visited/df.ctrl_hh),
    ))

metrics_df_pday

| day        | exp_hh | exp_visited | ctrl_hh | ctrl_visited | ctrl_vr  | exp_vr   | uplift   |
|------------|--------|-------------|---------|--------------|----------|----------|----------|
| 2022-03-01 | 63260  | 672         | 8637847 | 17999        | 0.002084 | 0.010623 | 4.097969 |
| 2022-03-02 | 58923  | 676         | 8651376 | 18100        | 0.002092 | 0.011473 | 4.483634 |
| 2022-03-03 | 49547  | 739         | 8651396 | 18248        | 0.002109 | 0.014915 | 6.071279 |
| 2022-03-04 | 55573  | 779         | 8629766 | 18041        | 0.002091 | 0.014018 | 5.705205 |
| 2022-03-05 | 46612  | 874         | 8593316 | 17899        | 0.002083 | 0.018751 | 8.002139 |
| 2022-03-06 | 40496  | 731         | 8571266 | 17811        | 0.002078 | 0.018051 | 7.686842 |
| 2022-03-07 | 29937  | 455         | 8527104 | 17584        | 0.002062 | 0.015199 | 6.370331 |
| 2022-03-08 | 44473  | 681         | 8511329 | 17837        | 0.002096 | 0.015313 | 6.306784 |
| 2022-03-09 | 70183  | 697         | 8499405 | 17760        | 0.00209  | 0.009931 | 3.752766 |
| 2022-03-10 | 78049  | 676         | 8481764 | 17886        | 0.002109 | 0.008661 | 3.107261 |
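As a sanity check on how the columns relate, the 2022-03-01 row can be recomputed by hand from its counts. `uplift` is a relative lift, (exp_vr - ctrl_vr) / ctrl_vr, equivalently exp_vr / ctrl_vr - 1. The absolute difference exp_vr - ctrl_vr is what `diff_vr` holds in `pivot_table`.

```python
# Counts copied from the 2022-03-01 row of metrics_df_pday
exp_hh, exp_visited = 63260, 672
ctrl_hh, ctrl_visited = 8637847, 17999

exp_vr = exp_visited / exp_hh            # visiting rate among exposed households
ctrl_vr = ctrl_visited / ctrl_hh         # visiting rate among control households
uplift = (exp_vr - ctrl_vr) / ctrl_vr    # relative lift, same as exp_vr / ctrl_vr - 1
diff_vr = exp_vr - ctrl_vr               # absolute difference in visiting rate

print(f"exp_vr={exp_vr:.6f} ctrl_vr={ctrl_vr:.6f} uplift={uplift:.6f} diff_vr={diff_vr:.6f}")
```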


In [None]:
# os.getcwd()
# metrics_df_pday.to_csv('daily_drizly_Mar2022.csv',index=True)

## ✅️ Step 4. Grouping sample size analysis


In [None]:
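# Random sampling

def random_sampling(df, percentage_from_df, random_state_value):
    """Draw a random fraction of the rows of df without replacement; random_state_value=None gives a different sample on each call."""
    random_sample = df.sample(frac=percentage_from_df, replace=False, random_state=random_state_value)
    return random_sample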

In [None]:
# For reproducibility - random_state=1
randomSample = random_sampling(base_df_pday, 0.20, random_state_value=1)
randomSample.head()
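A small sketch of why `random_state` matters here, using a hypothetical 10-row stand-in for `base_df_pday`. Passing the same integer seed to `DataFrame.sample` returns exactly the same rows. With `random_state=None` a fresh sample is drawn each time.

```python
import pandas as pd

# Hypothetical stand-in for base_df_pday
toy = pd.DataFrame({"hh_id": range(10), "visited": [0, 1] * 5})

s1 = toy.sample(frac=0.20, replace=False, random_state=1)
s2 = toy.sample(frac=0.20, replace=False, random_state=1)
s3 = toy.sample(frac=0.20, replace=False, random_state=2)

# Same seed gives the same rows; a different seed usually gives different rows
print(s1.index.tolist(), s2.index.tolist(), s3.index.tolist())
```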

In [None]:
# At the moment this builds the whole pivot table for every sample and keeps only the 'All' row;
# there is probably a more efficient way of calculating the metrics of the overall sample without creating a pivot table
dfs = []
for i in range(100):
    sampled_df = pd.DataFrame(
        random_sampling(base_df_pday, 0.20, random_state_value=i)   # seed with the loop index so the 100 samples are reproducible
        .pipe(add_vars_for_metrics)
        .pipe(pivot_table, source_column_name='hh_detail_life', margins_value=True)
        .loc['All']
        ).T
    dfs.append(sampled_df)
final_sampled_df = pd.concat(dfs, ignore_index=True)
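As the comment in the sampling cell notes, building a full pivot table only to keep its `All` row is more work than needed, since that row is just the column totals. Below is a minimal sketch of the cheaper route. It assumes the frame already carries the four 0/1 flag columns from `add_vars_for_metrics`. The helper name `overall_metrics` and the toy data are hypothetical.

```python
import numpy as np
import pandas as pd

def overall_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Totals and visiting rates over the whole frame, equivalent to the pivot table's 'All' row."""
    totals = df[["ctrl_hh", "ctrl_visited", "exp_hh", "exp_visited"]].sum()
    ctrl_vr = totals.ctrl_visited / totals.ctrl_hh
    exp_vr = totals.exp_visited / totals.exp_hh
    return pd.DataFrame([{
        **totals.to_dict(),
        "ctrl_vr": ctrl_vr,
        "exp_vr": exp_vr,
        "uplift": (exp_vr - ctrl_vr) / ctrl_vr,
        "diff_vr": exp_vr - ctrl_vr,
    }])

# Hypothetical flags for 1,000 households: first half control, second half exposed
rng = np.random.default_rng(0)
ctrl = np.repeat([1, 0], 500)
visited = (rng.random(1000) < np.where(ctrl == 1, 0.20, 0.30)).astype(int)
flags = pd.DataFrame({
    "ctrl_hh": ctrl,
    "ctrl_visited": ctrl * visited,
    "exp_hh": 1 - ctrl,
    "exp_visited": (1 - ctrl) * visited,
})

# 100 reproducible 20% samples, one row of overall metrics per sample
final_sampled_df = pd.concat(
    [overall_metrics(flags.sample(frac=0.20, replace=False, random_state=i)) for i in range(100)],
    ignore_index=True,
)
print(final_sampled_df.describe())
```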

In [None]:
from scipy import stats
import math


## What is the minimum exposed sample size N for a given effect size?


alpha = 0.05                    # Type I error: probability of detecting a difference when in fact there is none
beta  = 0.2                     # Type II error: probability of NOT detecting the minimum effect size, assuming it exists (power = 1 - beta)
Qalpha = stats.norm.ppf(1-alpha/2)      # two-sided critical value (~1.96)
Qbeta  = stats.norm.ppf(beta)           # lower-tail quantile (~-0.84), hence the minus sign in the formula below
Pc   = 0.0035                   # baseline visiting rate for control
Dmin = 0.005                    # minimum absolute difference between the exposed and control visiting rates worth detecting (practical significance: the smallest difference that makes business sense from the client's perspective)
Pt   = Pc + Dmin                # visiting rate for the exposed (treatment) group under the alternative hypothesis, used for the power (beta) term

sd1 = math.sqrt(2*Pc*(1-Pc))              # under the null: both groups visit at Pc
sd2 = math.sqrt(Pc*(1-Pc) + Pt*(1-Pt))    # under the alternative: control at Pc, exposed at Pt
N   = math.ceil(((Qalpha*sd1 - Qbeta*sd2)/Dmin)**2)   # min exposed sample size (visited and not visited); assumes equal-sized groups

print(f"{N} is the minimum exposed sample size needed to detect an increase of at least +{Dmin*100} percentage points in the exposed visiting rate (alpha = {alpha}, power = {1-beta})")
## ----------------------------------------- ----------------------------------------- ----------------------------------------- -----------------------------------------##

## Given a fixed N, what is the minimum visiting rate that would be significant?

# alpha, beta, Qalpha, Qbeta and Pc are the same as above.
# Note: sd2 still uses Pt = Pc + 0.005 from the first calculation, so this is an approximation
# rather than an exact inversion of the formula above.
N = 110000                      # exposed households

sd1 = math.sqrt(2*Pc*(1-Pc))
sd2 = math.sqrt(Pc*(1-Pc) + Pt*(1-Pt))
Dmin = (Qalpha*sd1 - Qbeta*sd2)/math.sqrt(N)

print(f"{(Pc + Dmin)*100}% is the minimum exposed visiting rate detectable with N = {N} exposed households (alpha = {alpha}, power = {1-beta})")

2613 is the minimum exposed sample size needed to detect an increase of at least +0.5 percentage points in the exposed visiting rate (alpha = 0.05, power = 0.8)
0.42705572914969225% is the minimum exposed visiting rate detectable with N = 110000 exposed households (alpha = 0.05, power = 0.8)
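The two calculations above can be wrapped as functions. The sketch below keeps the same normal-approximation formula (two-sided test, equal-sized groups). It also adds an exact inversion for the fixed-N case: instead of reusing `Pt` from the first calculation, it solves numerically with `scipy.optimize.brentq` for the effect size at which the required sample size equals N. The function names are hypothetical. The exact version is an alternative to the notebook's calculation, not a reproduction of it.

```python
import math
from scipy import stats, optimize

def required_n(pc: float, dmin: float, alpha: float = 0.05, beta: float = 0.2) -> int:
    """Minimum households per group to detect pt = pc + dmin (two-sided, normal approximation)."""
    q_alpha = stats.norm.ppf(1 - alpha / 2)
    q_beta = stats.norm.ppf(beta)
    pt = pc + dmin
    sd1 = math.sqrt(2 * pc * (1 - pc))
    sd2 = math.sqrt(pc * (1 - pc) + pt * (1 - pt))
    return math.ceil(((q_alpha * sd1 - q_beta * sd2) / dmin) ** 2)

def min_detectable_diff(pc: float, n: int, alpha: float = 0.05, beta: float = 0.2) -> float:
    """Smallest dmin whose unrounded required sample size equals n, updating pt = pc + dmin."""
    q_alpha = stats.norm.ppf(1 - alpha / 2)
    q_beta = stats.norm.ppf(beta)
    sd1 = math.sqrt(2 * pc * (1 - pc))

    def gap(dmin: float) -> float:
        pt = pc + dmin
        sd2 = math.sqrt(pc * (1 - pc) + pt * (1 - pt))
        return (q_alpha * sd1 - q_beta * sd2) / math.sqrt(n) - dmin

    # gap is positive for a tiny dmin and negative for a large one, so brentq can bracket the root
    return optimize.brentq(gap, 1e-12, 1 - pc)

print(required_n(0.0035, 0.005))
d = min_detectable_diff(0.0035, 110_000)
print(f"{(0.0035 + d) * 100:.4f}% minimum detectable exposed visiting rate")
```

Because the exact version lets the exposed-group variance shrink along with the smaller effect, it gives a slightly lower minimum detectable rate than the approximation that reuses `Pt`.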


In [None]:
print(Qalpha)
print(Qbeta)

1.959963984540054
-0.8416212335729142
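These quantiles also give a quick check of the sample-size result. Under the same normal approximation, the power achieved with N households per group is Phi((Dmin*sqrt(N) - Qalpha*sd1) / sd2), where Phi is the standard normal CDF. For N = 2613 this should come out at or just above 1 - beta = 0.8. Below is a sketch of that check with the notebook's inputs. The `achieved_power` name is hypothetical.

```python
import math
from scipy import stats

def achieved_power(pc: float, dmin: float, n: int, alpha: float = 0.05) -> float:
    """Power of a two-sided test with n households per group (normal approximation, as in the cell above)."""
    q_alpha = stats.norm.ppf(1 - alpha / 2)
    pt = pc + dmin
    sd1 = math.sqrt(2 * pc * (1 - pc))
    sd2 = math.sqrt(pc * (1 - pc) + pt * (1 - pt))
    return stats.norm.cdf((dmin * math.sqrt(n) - q_alpha * sd1) / sd2)

print(achieved_power(0.0035, 0.005, 2613))   # at least 0.8, because N was rounded up
print(achieved_power(0.0035, 0.005, 1000))   # a smaller sample gives lower power
```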


In [None]:
def get_control_group_sampled(
    request, datefrom_value: str, dateto_value: str, yy_value: str, mm_value: str, dd_value: str, clientid_value: str,
    granularity: str, lookback_window: int, filter_linear: bool = True):
    """
    Slow version to get us off the ground. Takes a date range and runs one Athena query per period
    (at the specified granularity) to get the aggregate totals of eligible control households and
    the number of those households that visited.

    Parameters
    ----------
    request : Request
        request object
    datefrom_value : str
        start date of the date range, 'YYYY-MM-DD'
    dateto_value : str
        end date of the date range, 'YYYY-MM-DD'
    yy_value : str
        crosswalk partition year
    mm_value : str
        crosswalk partition month
    dd_value : str
        crosswalk partition day
    clientid_value : str
        client id used to build the eventlog table name, e.g. 'c9306_drizly'
    granularity : str
        pandas frequency for the date range: 'W' (weekly; pandas anchors 'W' on Sunday) or 'D' (daily)
    lookback_window : int
        how far back to look when finding eligible households, e.g. not exposed in the last 30 days
    filter_linear : bool
        if True (default), both linear and OTT impressions count when building the control group;
        if False, only 'vod' impressions are considered

    Returns
    -------
    DataFrame
        control households and visits per period, with the visiting rate calculated

    Notes
    -----
    The crosswalk vendor ('inscape') and the crosswalk suffix ('produsa') are currently hard-coded
    in the query formatting.

    Version control
    -------
    Created by: Michael Comerford
    Last Modified by: Chio Martinez 10th Oct
        - introduced change on concatenated df to use concat instead of append in the final df
        - introduced granularity functionality: 'W' and 'D'
        - changes to function call and parameters

    """
    # get db from request object
    athena_db = AthenaDatabase.get_client_database(request)

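    # create the list of period end dates from the specified range
    # (uses the function arguments, not the notebook-level datefrom_input/dateto_input)
    datelist = pd.date_range(start=datefrom_value, end=dateto_value, freq=granularity).to_list()

    # filter_linear=True keeps all impressions (linear and OTT), so no event filter is added;
    # otherwise only 'vod' impressions are considered
    overlap = ''
    if not filter_linear:
        overlap = "and event = 'vod'"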

    # collect the aggregated results of each period in a list of DataFrames
    dfs = []

    for date in datelist:

        # weekly granularity: the period runs from date - 6 days to date (Monday to Sunday for 'W');
        # daily granularity: the period is the single day
        datefrom_granularity = date
        if granularity.startswith('W'):
            datefrom_granularity = date - timedelta(days=6)

        print(
            '''Getting Data for dates between {datefrom} until {dateto}...'''
            .format(
                dateto=date, 
                datefrom=datefrom_granularity)
            )

        query = """
        -- select universe of unfiltered people for time range
        

        with filtered_hh as (
        select 
            key_value as mapped_tv2_hhid
        from {crosswalk_suffix}_modeldata.crosswalk 
        where 
            yy='{yy}' 
            and mm='{mm}' 
            and dd='{dd}'
            and key_name ='tv2_hhid' 
            and vendor_name = '{vendor_name}'
            and excluded_stamp is null
        ),
        campaign_universe as (
            select 
                complex_ranges.mapped_tv2_hhid 
            FROM {crosswalk_suffix}_modeldata.crosswalk cw 
            CROSS JOIN UNNEST(complex_range) AS t (complex_ranges)
            join filtered_hh fh on fh.mapped_tv2_hhid = complex_ranges.mapped_tv2_hhid
            where 
                vendor_name = '{vendor_name}'
                and complex_ranges.first_seen <= timestamp '{dateto}' + interval '1' day
                and complex_ranges.last_seen >= timestamp '{datefrom}' 
                and key_name = 'tv2_hhid'
                and yy='{yy}'
                and mm='{mm}' 
                and dd='{dd}'
        ),
        client_eventlog as (
            select 
            *
            from {clientid}_{crosswalk_suffix}.eventlog
            where 
                datadatetime between timestamp '{datefrom}' 
                and timestamp '{dateto}' + interval '7' day
        ),
        hh_impressed_30days as (
            select
                distinct crosswalk_link_id as mapped_tv2_hhid
            from campaign_universe ex
            join client_eventlog ev on ev.crosswalk_link_id = ex.mapped_tv2_hhid
            where
                event_class='impression'
                {overlap}
                and datadatetime between timestamp '{datefrom}' - interval '{lookback_window}' day    
                and timestamp '{dateto}' + interval '7' day
                and in_scope
        ),
        hh_control as (
            select
                distinct cu.mapped_tv2_hhid
            from campaign_universe cu
            left join hh_impressed_30days hh on hh.mapped_tv2_hhid  = cu.mapped_tv2_hhid
            where hh.mapped_tv2_hhid is null
        ),
        n_hh_control_visited as (
            select 
                count(distinct mapped_tv2_hhid) as ctrl_visited
            from client_eventlog
            join hh_control on crosswalk_link_id = mapped_tv2_hhid 
            where
                event_class ='response' and event= 'all response'
                and datadatetime between timestamp '{datefrom}' 
                and timestamp '{dateto}' + interval '7' day
                and coalesce(in_scope, TRUE)   -- keep in-scope and null rows; `in (TRUE, null)` never matches nulls
        ),
        n_hh_control as (
            select CAST(count(distinct mapped_tv2_hhid) AS double) as ctrl_hh
            from hh_control
        ),
        final_results as (
            select *
            from n_hh_control_visited
            cross join n_hh_control
        )
        select
            '{datefrom}' as day, 
            ctrl_hh,
            ctrl_visited,
            ctrl_visited/ctrl_hh as ctrl_vr
        from final_results

        """.format(
            yy=yy_value, mm=mm_value, dd=dd_value,
            dateto=date, 
            datefrom=datefrom_granularity,  
            clientid=clientid_value,
            lookback_window=lookback_window, 
            overlap=overlap,
            vendor_name='inscape',
            crosswalk_suffix='produsa',
            )   

        query_results = AthenaDatabase.execute_query(athena_db, query)
        dfs.append(pd.DataFrame(query_results))
        request.log.info(query)

    # concatenate once, after every period has been queried
    results = pd.concat(dfs, ignore_index=True)
    return results
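The query's control-group logic is an anti-join. It starts from the campaign universe and drops every household with an in-scope impression in the lookback window (`left join ... where hh.mapped_tv2_hhid is null`). It then counts how many of the remaining households responded. Below is a minimal pandas sketch of those steps on hypothetical toy tables. The frames and their contents are made up; only the column names mirror the query.

```python
import pandas as pd

# Hypothetical toy inputs mirroring the CTEs in the query
campaign_universe = pd.DataFrame({"mapped_tv2_hhid": ["h1", "h2", "h3", "h4", "h5"]})
impressed = pd.DataFrame({"mapped_tv2_hhid": ["h2", "h4"]})            # like hh_impressed_30days
responses = pd.DataFrame({"crosswalk_link_id": ["h1", "h2", "h1"]})    # 'all response' events

# Anti-join: keep universe households with no impression in the lookback window (like hh_control)
merged = campaign_universe.merge(impressed, on="mapped_tv2_hhid", how="left", indicator=True)
hh_control = merged.loc[merged["_merge"] == "left_only", "mapped_tv2_hhid"].drop_duplicates()

# The query casts ctrl_hh to double because integer division truncates in Athena/Presto;
# Python's / already returns a float, so the cast here is only for symmetry
ctrl_hh = float(hh_control.nunique())
ctrl_visited = responses.loc[responses.crosswalk_link_id.isin(hh_control), "crosswalk_link_id"].nunique()
ctrl_vr = ctrl_visited / ctrl_hh
print(ctrl_hh, ctrl_visited, ctrl_vr)
```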

In [None]:
yy_input='2022' 
mm_input='03'
dd_input='28'
dateto_input='2022-04-03' 
datefrom_input='2022-03-01'  
lookback_window_input=30
clientid_input='c9306_drizly'
# clientid_input='c9534_uti'
# clientid_input='c16319_the_realreal'

In [None]:
granularity_input='W'

df_control_weekly= get_control_group(
    request=request_ctrl_fun, 
    yy_value=yy_input, mm_value=mm_input,dd_value=dd_input,
    granularity=granularity_input,
    lookback_window=lookback_window_input,
    dateto_value=dateto_input, 
    datefrom_value=datefrom_input,  
    clientid_value=clientid_input,
    )

df_control_weekly.day = pd.to_datetime(df_control_weekly.day).dt.date

Getting Data for dates between 2022-02-28 00:00:00 until 2022-03-06 00:00:00...
Getting Data for dates between 2022-03-07 00:00:00 until 2022-03-13 00:00:00...
Getting Data for dates between 2022-03-14 00:00:00 until 2022-03-20 00:00:00...
Getting Data for dates between 2022-03-21 00:00:00 until 2022-03-27 00:00:00...
Getting Data for dates between 2022-03-28 00:00:00 until 2022-04-03 00:00:00...


In [None]:
datelist = pd.date_range(start=datefrom_input, end=dateto_input, freq="W").to_list()
datelist

[Timestamp('2022-03-06 00:00:00', freq='W-SUN'),
 Timestamp('2022-03-13 00:00:00', freq='W-SUN'),
 Timestamp('2022-03-20 00:00:00', freq='W-SUN'),
 Timestamp('2022-03-27 00:00:00', freq='W-SUN'),
 Timestamp('2022-04-03 00:00:00', freq='W-SUN')]
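The `datelist` above explains the printed ranges. `freq='W'` is an alias for `W-SUN`, so each period ends on a Sunday, and the function starts each weekly window 6 days earlier, on the Monday. As a result, the first window (2022-02-28 to 2022-03-06) begins before `datefrom_input`. Below is a small sketch that reproduces the windows. The `period_windows` helper is hypothetical and only mirrors the weekly/daily date logic.

```python
from datetime import timedelta
import pandas as pd

def period_windows(datefrom: str, dateto: str, granularity: str):
    """(start, end) date pairs: 7-day windows ending on each weekly anchor, or single days for 'D'."""
    windows = []
    for end in pd.date_range(start=datefrom, end=dateto, freq=granularity):
        start = end - timedelta(days=6) if granularity.startswith("W") else end
        windows.append((start.date(), end.date()))
    return windows

for start, end in period_windows("2022-03-01", "2022-04-03", "W"):
    print(start, "->", end)
```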