In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules are reloaded
# before each cell executes and edits to local packages are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [2]:
import json
import logging
import os
import shutil
import string
import textwrap
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import dask
import dask.dataframe
import dask.multiprocessing
import mmh3
import numpy as np
import pandas as pd
from sqlalchemy import create_engine

# Make sure you `pip install bt-ai[dev]` for these (or you can use the [prod] dependencies if you like)
from bt_ai.stable.data_input.dataframe import MultiDataFrameLoader, DataFrameTarget
from bt_ai.stable.data_input.redshift import UnloadQuery, UnloadTask, HourlyEventDumpQuery, RawEventDumpQuery, RawEventQueryWithSession
from bt_ai.stable.data_input.resources import ResourcesDb, ResourceDump

# Make sure you `pip install bt-notebook-utils` for these
from notebook_utils.logging import setup_logging
from notebook_utils.luigi import run_luigi_tasks
from notebook_utils.s3 import delete_s3_folder
from notebook_utils.sequences import daterange, hourrange, date_compressed_hourrange, pairwise


# Use this to get multi-processing with out-of-core processing
# NOTE(review): `dask.set_options` appears to have been superseded by `dask.config.set`
# in later dask releases — confirm against the installed dask version before upgrading.
dask.set_options(get=dask.multiprocessing.get)

<dask.context.set_options at 0x104907860>

In [3]:
# Redshift connection settings, supplied as a JSON object in the REDSHIFT_CREDENTIALS
# environment variable. This notebook reads the keys 'user', 'password', 'host', 'port'
# and 'dbname' from it, and also passes the whole mapping to the Luigi unload tasks.
_redshift_credentials_json = os.environ.get('REDSHIFT_CREDENTIALS')
if _redshift_credentials_json is None:
    # json.loads(None) would fail with an opaque TypeError; name the missing variable instead.
    raise RuntimeError(
        'The REDSHIFT_CREDENTIALS environment variable is not set; '
        'export it as a JSON object before starting the notebook.'
    )
REDSHIFT_CREDENTIALS = json.loads(_redshift_credentials_json)

In [4]:
# Route log records (including those emitted by Luigi jobs) into this notebook's output.
#
# `notebook_utils.logging.LOGGER_OVERRIDES` holds the default per-logger overrides;
# the entries passed here are additional and may replace those defaults.
logging_overrides = {'luigi-interface': logging.INFO}
setup_logging(overrides=logging_overrides, level=logging.DEBUG)

# Notebook-level logger; the name itself is arbitrary.
LOG = logging.getLogger('jupyter')

In [5]:
# Reporting window: the 7 days ending at midnight today. The queries in this notebook
# treat start_date as inclusive and end_date as exclusive.
# Change start_date / end_date here to report on a different period.
#site_id = 'ae6897195848feb20f96c5beac08e41b'
# Read the clock once so both ends of the window come from the same instant
# (two separate datetime.today() calls can disagree if the cell runs across midnight).
today = datetime.today()
_7ago = today - timedelta(days=7)

end_date = datetime(today.year, today.month, today.day)
start_date = end_date - timedelta(days=7)

# make sure you change this to your own sandbox bucket on S3
s3_data_bucket = 'vladm-sandbox'
s3_data_path = 's3://' + s3_data_bucket
local_data_path = 'output_data'
luigi_planner_uri = 'http://localhost:8082'

model_store_path = local_data_path

### Get the list of all site IDs seen in the last 7 days

In [None]:
# Create the SQLAlchemy engine used for ad-hoc Redshift queries in this notebook.
# The user name and password are URL-escaped so that characters such as '@', ':' or '/'
# in the credentials cannot corrupt the connection URL.
engine = create_engine(
    'postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}'.format(
        user=quote_plus(REDSHIFT_CREDENTIALS['user']),
        password=quote_plus(REDSHIFT_CREDENTIALS['password']),
        host=REDSHIFT_CREDENTIALS['host'],
        port=REDSHIFT_CREDENTIALS['port'],
        dbname=REDSHIFT_CREDENTIALS['dbname'],
    )
)

In [None]:
# SQL listing every site_id with requests in [start_date, end_date), together with its
# number of distinct recsets, ordered from most to fewest.
# The two calendar dates are substituted into the dedented template below.
QRY_SITE_REQUESTS = textwrap.dedent("""\
    SELECT DISTINCT
        site_id,
        count(distinct recset) as count_
    FROM 
        recs.requests
    WHERE
        date(event_time) < '{window_end}' AND 
        date(event_time) >= '{window_start}'
    GROUP BY
        site_id
    ORDER BY
        count(distinct recset) DESC
    """).format(window_end=end_date.date(), window_start=start_date.date())

In [None]:
# Run the site-list query through the SQLAlchemy engine and keep the result as a pandas DataFrame
# (columns per the query: site_id, count_).
TBL_SITE_REQUESTS = pd.read_sql_query(QRY_SITE_REQUESTS,engine)

## Extract data from requests table

In [14]:
def requests(site_id):
    """Unload one site's rows from ``recs.requests`` to S3 and open them as a single frame.

    One ``UnloadTask`` is built for every ``(start, end)`` pair produced by
    ``pairwise(daterange(start_date, end_date))``, using the notebook-level
    ``start_date`` / ``end_date``. The tasks are run through Luigi and their outputs
    are then opened together via ``MultiDataFrameLoader``.

    Side effects:
        * deletes the ``requests`` folder in ``s3_data_bucket`` before unloading;
        * assigns the loaded frame to the module-level ``requests_ddf`` (kept so that
          existing cells which read the global keep working).

    Args:
        site_id: Site identifier to filter on. It is interpolated directly into the
            SQL text, so it must come from a trusted source (here: the site list
            queried from Redshift).

    Returns:
        The object read from the multi-dataframe loader (the same object that is
        stored in ``requests_ddf``). NOTE(review): with ``compute=False`` this is
        presumably a lazy dask dataframe — confirm against ``bt_ai``.
    """
    global requests_ddf

    # Single source of truth for the S3 sub-folder: used both for the clean-up below
    # and for the unload path, so the two cannot drift apart.
    dump_sub_folder = 'requests'

    delete_s3_folder(s3_data_bucket, dump_sub_folder)

    query_template = textwrap.dedent('''\
                    SELECT 
                      date(event_time) AS request_day,
                      date_trunc('hour', event_time) AS request_time,
                      site_id,
                      medium,
                      campaign,
                      email,
                      recset 
                    FROM recs.requests 
                    WHERE event_time < '{end_date}' 
                      AND event_time >= '{start_date}'
                      AND site_id = '{site_id}'
                ''')

    s3_unload_path_template = 's3://{root}/{folder}/{site_id}/s{start_date}.e{end_date}'

    DATETIME_FORMAT = '%Y%m%dT%H%M%S'

    daily_requests = [
        UnloadTask(
            redshift_query=UnloadQuery(
                query=query_template.format(
                    site_id=site_id,
                    start_date=s,
                    end_date=e
                ),
                column_names=['request_day', 'request_time', 'site_id', 'medium', 'campaign', 'email', 'recset'],
                # Each window gets its own S3 prefix, named after that window's bounds.
                s3_unload_path=s3_unload_path_template.format(
                    root=s3_data_bucket,
                    folder=dump_sub_folder,
                    site_id=site_id,
                    start_date=s.strftime(DATETIME_FORMAT),
                    end_date=e.strftime(DATETIME_FORMAT)
                ),
                index_columns=['recset', 'site_id'],
                date_columns=['request_day']
            ),
            redshift_credentials=REDSHIFT_CREDENTIALS
        )
        for s, e in pairwise(daterange(start_date, end_date))
    ]

    run_luigi_tasks(daily_requests, scheduler_uri=luigi_planner_uri, multiprocess=True, num_processes=8)

    LOG.info('Loading dumped data')
    requests_loader = MultiDataFrameLoader.create_multi_dataframe_target(
        [task.output() for task in daily_requests],
        compute=False
    )

    with requests_loader.open('r') as infile:
        requests_ddf = infile.read()

    return requests_ddf

In [15]:
# Try the requests extraction on a single hard-coded site (the per-site loop further down runs it for every site).
requests('ae6897195848feb20f96c5beac08e41b')

2018-05-17 15:55:13 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_2327658c35   has status   PENDING
2018-05-17 15:55:13 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_8c111e1522   has status   PENDING
2018-05-17 15:55:14 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_baed2b73e0   has status   PENDING
2018-05-17 15:55:15 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_889a7b13dd   has status   PENDING
2018-05-17 15:55:15 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_de37a3a18a   has status   PENDING
2018-05-17 15:55:16 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_ee8dcc70b4   has status   PENDING
2018-05-17 15:55:17 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28e

INFO    INFO    2018-05-17 15:55:17  luigi-interface: DEBUG   [pid 24997] Worker Worker(salt=232959324, workers=8, host=Vladimirs-MacBook-Pro.local, username=vladmalabanan, pid=24652) running   UnloadTask(job_name=NOT_SET, parent_flow=, redshift_query=SELECT+date%28event_time%29+AS+request_day%2C+date_trunc%28%27hour%27%2C+event_time%29+AS+request_time%2C+site_id%2C+medium%2C+campaign%2C+email%2C+recset+FROM+recs.requests+WHERE+event_time+%3C+%272018-05-16+00%3A00%3A00%27+AND+event_time+%3E%3D+%272018-05-15+00%3A00%3A00%27+AND+site_id+%3D+%27ae6897195848feb20f96c5beac08e41b%27%0A::s3%3A%2F%2Fvladm-sandbox%2Frequests%2Fae6897195848feb20f96c5beac08e41b%2Fs20180515T000000.e20180516T000000::%09::%5B%22request_day%22%2C+%22request_time%22%2C+%22site_id%22%2C+%22medium%22%2C+%22campaign%22%2C+%22email%22%2C+%22recset%22%5D) bt_ai.stable.workflow.task:  bt_ai.stable.data_input.redshift: 
Starting UnloadTaskUNLOAD ( 'SELECT 
  date(event_time) AS request_day,
  date_trunc(\'hour\', event_time)

 luigi-interface: [pid 24995] Worker Worker(salt=232959324, workers=8, host=Vladimirs-MacBook-Pro.local, username=vladmalabanan, pid=24652) done      UnloadTask(job_name=NOT_SET, parent_flow=, redshift_query=SELECT+date%28event_time%29+AS+request_day%2C+date_trunc%28%27hour%27%2C+event_time%29+AS+request_time%2C+site_id%2C+medium%2C+campaign%2C+email%2C+recset+FROM+recs.requests+WHERE+event_time+%3C+%272018-05-14+00%3A00%3A00%27+AND+event_time+%3E%3D+%272018-05-13+00%3A00%3A00%27+AND+site_id+%3D+%27ae6897195848feb20f96c5beac08e41b%27%0A::s3%3A%2F%2Fvladm-sandbox%2Frequests%2Fae6897195848feb20f96c5beac08e41b%2Fs20180513T000000.e20180514T000000::%09::%5B%22request_day%22%2C+%22request_time%22%2C+%22site_id%22%2C+%22medium%22%2C+%22campaign%22%2C+%22email%22%2C+%22recset%22%5D)
2018-05-17 15:56:27 2018-05-17 15:56:27 INFO     luigi-interface: INFO    Informed scheduler that task   UnloadTask_NOT_SET__SELECT_date_28ev_de37a3a18a   has status   DONE
 bt_ai.stable.workflow.task: UnloadTask c

## Extract data from interactions table 

In [17]:
def interactions(site_id):
    """Unload one site's aggregated rows from ``recs.interactions`` to S3 and open them as a single frame.

    For every ``(start, end)`` pair produced by
    ``pairwise(daterange(start_date, end_date))`` an ``UnloadTask`` is built whose
    query returns, per ``(site_id, recset, event_type)``, the earliest day and the
    earliest hour of interaction inside that window. The tasks are run through
    Luigi and their outputs are then opened together via ``MultiDataFrameLoader``.

    Side effects:
        * deletes the ``interactions`` folder in ``s3_data_bucket`` before unloading;
        * assigns the loaded frame to the module-level ``interactions_ddf`` (kept so
          that existing cells which read the global keep working).

    Args:
        site_id: Site identifier to filter on. It is interpolated directly into the
            SQL text, so it must come from a trusted source (here: the site list
            queried from Redshift).

    Returns:
        The object read from the multi-dataframe loader (the same object that is
        stored in ``interactions_ddf``). NOTE(review): with ``compute=False`` this is
        presumably a lazy dask dataframe — confirm against ``bt_ai``.
    """
    global interactions_ddf

    # Single source of truth for the S3 sub-folder: used both for the clean-up below
    # and for the unload path, so the two cannot drift apart.
    dump_sub_folder = 'interactions'

    delete_s3_folder(s3_data_bucket, dump_sub_folder)

    query_template = textwrap.dedent('''\
                    SELECT DISTINCT 
                      site_id,
                      recset,
                      event_type,
                      min(date(event_time)) AS action_day,
                      min(date_trunc('hour', event_time)) AS action_time
                    FROM recs.interactions
                    WHERE event_time < '{end_date}' 
                      AND event_time >= '{start_date}' 
                      AND site_id='{site_id}'
                    GROUP BY site_id,
                      recset,
                      event_type
                ''')

    s3_unload_path_template = 's3://{root}/{folder}/{site_id}/s{start_date}.e{end_date}'

    DATETIME_FORMAT = '%Y%m%dT%H%M%S'

    daily_interactions = [
        UnloadTask(
            redshift_query=UnloadQuery(
                query=query_template.format(
                    site_id=site_id,
                    start_date=s,
                    end_date=e
                ),
                column_names=['site_id', 'recset', 'event_type', 'action_day', 'action_time'],
                # Each window gets its own S3 prefix, named after that window's bounds.
                s3_unload_path=s3_unload_path_template.format(
                    root=s3_data_bucket,
                    folder=dump_sub_folder,
                    site_id=site_id,
                    start_date=s.strftime(DATETIME_FORMAT),
                    end_date=e.strftime(DATETIME_FORMAT)
                ),
                index_columns=['recset', 'site_id'],
                date_columns=['action_day']
            ),
            redshift_credentials=REDSHIFT_CREDENTIALS
        )
        for s, e in pairwise(daterange(start_date, end_date))
    ]

    run_luigi_tasks(daily_interactions, scheduler_uri=luigi_planner_uri, multiprocess=True, num_processes=8)

    LOG.info('Loading dumped data')
    interactions_loader = MultiDataFrameLoader.create_multi_dataframe_target(
        [task.output() for task in daily_interactions],
        compute=False
    )

    with interactions_loader.open('r') as infile:
        interactions_ddf = infile.read()

    return interactions_ddf

In [18]:
# Try the interactions extraction on the same hard-coded site used for the requests extraction.
interactions('ae6897195848feb20f96c5beac08e41b')

2018-05-17 15:58:02 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT__fe53387963   has status   PENDING
2018-05-17 15:58:03 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT__7a18375f48   has status   PENDING
2018-05-17 15:58:04 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT__d03daf8481   has status   PENDING
2018-05-17 15:58:04 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT__f5faf8bd27   has status   PENDING
2018-05-17 15:58:05 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT__e88f70e96a   has status   PENDING
2018-05-17 15:58:06 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT__75038af0ae   has status   PENDING
2018-05-17 15:58:06 INFO     luigi-interface: Informed scheduler that task   UnloadTask_NOT_SET__SELECT_DISTINCT

Starting UnloadTask

2018-05-17 15:58:06 INFO    2018-05-17 15:58:06  luigi-interface: DEBUG   [pid 25088] Worker Worker(salt=227375716, workers=8, host=Vladimirs-MacBook-Pro.local, username=vladmalabanan, pid=24652) running   UnloadTask(job_name=NOT_SET, parent_flow=, redshift_query=SELECT+DISTINCT+site_id%2C+recset%2C+event_type%2C+min%28date%28event_time%29%29+AS+action_day%2C+min%28date_trunc%28%27hour%27%2C+event_time%29%29+AS+action_time%0AFROM+recs.interactions%0AWHERE+event_time+%3C+%272018-05-16+00%3A00%3A00%27+AND+event_time+%3E%3D+%272018-05-15+00%3A00%3A00%27+AND+site_id%3D%27ae6897195848feb20f96c5beac08e41b%27%0AGROUP+BY+site_id%2C+recset%2C+event_type%0A::s3%3A%2F%2Fvladm-sandbox%2Finteractions%2Fae6897195848feb20f96c5beac08e41b%2Fs20180515T000000.e20180516T000000::%09::%5B%22site_id%22%2C+%22recset%22%2C+%22event_type%22%2C+%22action_day%22%2C+%22action_time%22%5D) bt_ai.stable.data_input.redshift: 
UNLOAD ( 'SELECT DISTINCT 
  site_id,
  recset,
  event_type,
  min(date

2018-05-17 15:59:20 INFO     luigi-interface: [pid 25087] Worker Worker(salt=227375716, workers=8, host=Vladimirs-MacBook-Pro.local, username=vladmalabanan, pid=24652) done      UnloadTask(job_name=NOT_SET, parent_flow=, redshift_query=SELECT+DISTINCT+site_id%2C+recset%2C+event_type%2C+min%28date%28event_time%29%29+AS+action_day%2C+min%28date_trunc%28%27hour%27%2C+event_time%29%29+AS+action_time%0AFROM+recs.interactions%0AWHERE+event_time+%3C+%272018-05-15+00%3A00%3A00%27+AND+event_time+%3E%3D+%272018-05-14+00%3A00%3A00%27+AND+site_id%3D%27ae6897195848feb20f96c5beac08e41b%27%0AGROUP+BY+site_id%2C+recset%2C+event_type%0A::s3%3A%2F%2Fvladm-sandbox%2Finteractions%2Fae6897195848feb20f96c5beac08e41b%2Fs20180514T000000.e20180515T000000::%09::%5B%22site_id%22%2C+%22recset%22%2C+%22event_type%22%2C+%22action_day%22%2C+%22action_time%22%5D)
2018-05-17 15:59:20 INFO     bt_ai.stable.workflow.task: UnloadTask completed successfully
2018-05-17 15:59:20 INFO     luigi-interface: Informed scheduler 

## Merge requests and interactions table 

In [90]:
def merge_req_int(site_id):
    """Left-join the loaded requests with the interactions and count distinct recsets per group.

    Reads the module-level ``requests_ddf`` and ``interactions_ddf`` (populated by
    ``requests()`` and ``interactions()``), joins them on ``recset`` and ``site_id``,
    and counts distinct ``recset`` values per site, request day/hour, action
    day/hour, medium, campaign and event type.

    Side effects:
        Assigns the result to the module-level ``final_df_site`` (kept so that
        existing cells which read the global keep working) and prints two progress
        lines.

    Args:
        site_id: Currently unused; the frames held in the globals already contain a
            single site's data. Kept for interface compatibility.

    Returns:
        pandas.DataFrame with one row per group; the distinct-recset count is in the
        ``recset`` column. This is the same object stored in ``final_df_site``.
    """
    global final_df_site

    merge_ddf = dask.dataframe.merge(requests_ddf, interactions_ddf, on=['recset', 'site_id'], how='left')
    # Requests without a matching interaction have missing values after the left join.
    # Replace every missing value with a placeholder so those rows still form a group
    # (presumably they would otherwise be dropped by the groupby — verify).
    merge_ddf = merge_ddf.fillna("No Action")

    group_columns = [
        'site_id', 'request_day', 'request_time',
        'action_day', 'action_time',
        'medium', 'campaign', 'event_type',
    ]
    aggregate_ddf = merge_ddf.groupby(group_columns).recset.nunique()

    print("     Merging requests and interactions table")

    # Everything above only builds the dask graph; the work happens in this compute call.
    final_df_site = pd.DataFrame(dask.compute(aggregate_ddf)[0]).reset_index()

    print("     Processing complete.")

    return final_df_site


In [91]:
# Merge and aggregate the frames loaded by the two single-site extraction calls.
merge_req_int('ae6897195848feb20f96c5beac08e41b')

     Merging requests and interactions table
     Processing complete.


In [89]:
# NOTE(review): this cell's execution count (89) is lower than that of the current
# merge_req_int definition (90), so the recorded output presumably comes from an
# earlier version of the aggregation — re-run or delete this cell.
pd.DataFrame(final_df_site).shape

(11098, 10)

In [92]:
# Row/column count of the aggregated single-site result produced by merge_req_int.
final_df_site.shape

(447, 9)

In [72]:
# NOTE(review): `final_df_site_` is only assigned in commented-out code inside merge_req_int,
# so this cell raises NameError on a fresh kernel. The recorded output also lists e-mail
# domains taken from request data — consider clearing it before sharing the notebook.
pd.DataFrame(final_df_site_['domain'].values.tolist())[1]

0                gmail.com
1              hotmail.com
2                   me.com
3                gmail.com
4                  shaw.ca
5                gmail.com
6                gmail.com
7                gmail.com
8                mymts.net
9                mymts.net
10               gmail.com
11               mymts.net
12               mymts.net
13       thestorysource.ca
14                 mts.net
15               gmail.com
16               gmail.com
17                 mts.net
18               mymts.net
19             hotmail.com
20                 shaw.ca
21             hotmail.com
22               yahoo.com
23               gmail.com
24               gmail.com
25              veolia.com
26              veolia.com
27            ibexherd.com
28            ibexherd.com
29               gmail.com
               ...        
88353          hotmail.com
88354          hotmail.com
88355          hotmail.com
88356          hotmail.com
88357              shaw.ca
88358               me.com
8

## Run the extraction and merge in a loop, once per site

In [None]:
%%time

TBL_DAILY_REQUESTS=pd.DataFrame()

for index, row in TBL_SITE_REQUESTS.iterrows():

    print("Processing site: " + row.site_id)

    requests(row.site_id)
    interactions(row.site_id)
    merge_req_int(row.site_id)
    
    %%time
    TBL_DAILY_REQUESTS=TBL_DAILY_REQUESTS.append(final_df_site)

## Preprocess for reporting

#### For performance report

In [None]:

# Write the stacked per-site aggregates to a CSV file in the current working directory.
TBL_DAILY_REQUESTS.to_csv('TBL_DAILY_REQUESTS.csv')

#### For requests vs actions reports 

In [None]:
# Pivot to one row per (request_day, site_id, medium, action_day) with one column per
# event_type, summing the distinct-recset counts.
# The counts produced by merge_req_int live in the `recset` column (the result of
# `.recset.nunique().reset_index()`); that frame has no column named 'count'.
TBL_DAILY_REQUESTS_V2 = pd.pivot_table(
    TBL_DAILY_REQUESTS,
    values='recset',
    index=['request_day', 'site_id', 'medium', 'action_day'],
    columns=['event_type'],
    aggfunc=np.sum,
)

In [None]:
# NOTE(review): `final_df` is not assigned anywhere in the visible notebook, so this cell
# presumably relies on leftover kernel state — confirm which frame was meant to be shown.
final_df

### Appendix

In [None]:
# Positional slice of the site list: the rows at positions 30 and 31.
TBL_SITE_REQUESTS2 = TBL_SITE_REQUESTS.iloc[30:32]

In [None]:
# Display the two-row subset of the site list.
TBL_SITE_REQUESTS2