# Analyzing User Behavior Data

## Overview


## Initialize the AWS SDK and import libraries

In [None]:
import boto3
import time
import json
import os
from datetime import datetime

# print('AWS_ACCESS_KEY_ID', os.environ['AWS_ACCESS_KEY_ID'])
# print('AWS_SECRET_ACCESS_KEY', os.environ['AWS_SECRET_ACCESS_KEY'])
# print('AWS_SESSION_TOKEN', os.environ['AWS_SESSION_TOKEN'])

# One S3 resource is created and its low-level client (`s3`) is what the
# S3 helper functions in this notebook call.
s3_resource = boto3.resource('s3')
s3 = s3_resource.meta.client
# Athena client used to submit every SQL statement in this notebook.
athena_client = boto3.client('athena')
# The values below are placeholders; replace the <...> text before running.
# BUCKET: used as the location of the raw event table and of the CTAS output
# (`rudder_athena/<table>/`).
BUCKET = '<Staging-Bucket>'
# DL_BUCKET: used as the LOCATION of the views / purchases / startcheckout tables.
DL_BUCKET = '<Data-Lake-Bucket>'
# GLUE_DB: database passed as the Athena query execution context and used to
# qualify table names in the DDL/INSERT statements.
GLUE_DB = '<GLUE_DB_NAME>'
# START_DATE (inclusive) and END_DATE (exclusive) bound `received_at_utc_date`
# in the INSERT statements; START_DATE is also a lower bound in dwd_user_acti.
START_DATE = '<yyyy-mm-dd>'
END_DATE = '<yyyy-mm-dd>'
# NOTE(review): STAT_DATE is not referenced in the part of the notebook shown here.
STAT_DATE = '<yyyy-mm-dd>'

## Define helper functions

In [None]:
def execute_athena_query(query):
    """
    Submit a SQL statement to Athena and block until its results can be read.

    The statement runs in the ``primary`` workgroup against the ``GLUE_DB``
    database of the ``AwsDataCatalog`` catalog. Completion is detected by
    calling ``get_query_results`` every 3 seconds until it stops reporting
    that the query has not yet finished.

    :param query: the SQL text to execute
    :type query: str
    :return: None; when the result carries an ``UpdateCount`` it is printed
        together with the current timestamp
    :raises Exception: any error raised by ``get_query_results`` other than
        the "not yet finished" one is re-raised unchanged
    """
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': GLUE_DB,
            'Catalog': 'AwsDataCatalog'
        },
        WorkGroup='primary'
    )
    query_execution_id = response['QueryExecutionId']

    while True:
        try:
            query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
            break
        except Exception as err:
            # Only service errors carry a parsed ``response`` dict. Anything
            # else (network failure, programming error, ...) must surface
            # as-is instead of being masked by an AttributeError/KeyError
            # raised while inspecting it.
            error_response = getattr(err, 'response', None)
            if not isinstance(error_response, dict):
                raise
            error_detail = error_response.get('Error')
            if not isinstance(error_detail, dict):
                error_detail = {}
            # The message may be exposed under ``Error.Message`` and/or as a
            # top-level ``Message`` field; accept either.
            message = str(error_detail.get('Message') or error_response.get('Message') or '')
            if 'Query has not yet finished' not in message:
                raise
            time.sleep(3)
    if 'UpdateCount' in query_results:
        print(str(datetime.now()) + ':Updated ' + str(query_results['UpdateCount']) + ' Rows')


def list_keys(
    bucket_name,
    prefix
) -> list:
    """
    Collect the key of every object stored in a bucket beneath a prefix.

    The listing is paginated with ``list_objects_v2`` using an empty
    delimiter and no page-size or item limit.

    :param bucket_name: the name of the bucket
    :type bucket_name: str
    :param prefix: a key prefix; a falsy value is treated as ``''``
    :type prefix: str
    :return: the keys found, in the order the pages return them
    :rtype: list
    """
    pages = s3.get_paginator('list_objects_v2').paginate(
        Bucket=bucket_name,
        Prefix=prefix or '',
        Delimiter='',
        PaginationConfig={'PageSize': None, 'MaxItems': None},
    )
    # Pages without a 'Contents' entry contribute nothing.
    return [entry['Key'] for page in pages for entry in page.get('Contents', [])]

def chunks(items, chunk_size):
    """
    Lazily split ``items`` into consecutive slices of at most ``chunk_size``.

    This is a generator: the size check runs when iteration starts, and the
    last slice may be shorter than ``chunk_size``.

    :raises ValueError: if ``chunk_size`` is zero or negative
    """
    if chunk_size <= 0:
        raise ValueError('Chunk size must be a positive integer')
    starts = range(0, len(items), chunk_size)
    yield from (items[start:start + chunk_size] for start in starts)


def delete_objects(bucket: str, keys):
    """
    Delete keys from the bucket.

    :param bucket: Name of the bucket in which you are going to delete object(s)
    :type bucket: str
    :param keys: The key(s) to delete from S3 bucket.

        When ``keys`` is a string, it's supposed to be the key name of
        the single object to delete.

        When ``keys`` is a list, it's supposed to be the list of the
        keys to delete.
    :type keys: str or list
    :raises Exception: when a delete request reports per-key errors; the
        message lists the keys that failed. Chunks after the failing one
        are not attempted.
    """
    if isinstance(keys, str):
        keys = [keys]

    # We can only send a maximum of 1000 keys per request.
    # For details see:
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_objects
    for chunk in chunks(keys, chunk_size=1000):
        response = s3.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in chunk]},
        )
        # An absent or empty "Errors" list means every key in the chunk was
        # deleted; only a non-empty list is a failure.
        errors = response.get("Errors")
        if errors:
            # .get() so an error entry without a 'Key' cannot hide the
            # failure behind a KeyError.
            errors_keys = [error.get('Key') for error in errors]
            raise Exception("Errors when deleting: {}".format(errors_keys))


def delete_s3_folder(bucket, prefix):
    """
    Remove every object stored under ``prefix`` in ``bucket``.

    Nothing is sent to S3 when no key matches the prefix.
    """
    found = list_keys(bucket_name=bucket, prefix=prefix)
    if not found:
        return
    delete_objects(bucket=bucket, keys=found)

def execute_athena_ctas_query(table_name, sql_query):
    """
    (Re)build an Athena table from a SELECT statement using CTAS.

    Steps, in order:

    1. drop the table if it already exists,
    2. delete every object under ``rudder_athena/<table_name>/`` in ``BUCKET``
       so the CTAS writes into an empty location,
    3. run ``CREATE TABLE ... AS <sql_query>`` storing SNAPPY-compressed
       Parquet at that location.

    :param table_name: name of the table to rebuild
    :type table_name: str
    :param sql_query: the SELECT statement whose result populates the table
    :type sql_query: str
    """
    # IF EXISTS lets the very first run succeed, when there is no table to
    # drop yet; a plain DROP TABLE would fail and abort the rebuild.
    execute_athena_query(f'drop table if exists {table_name}')
    delete_s3_folder(BUCKET, f'rudder_athena/{table_name}/')
    execute_athena_query(
    f'''
CREATE table {table_name}
WITH (
    format='PARQUET',
    parquet_compression='SNAPPY',
    external_location = 's3://{BUCKET}/rudder_athena/{table_name}/'
)
AS
{sql_query}
    ''')

## Update Page Table

In [None]:
# Recreate the external table over the raw JSON event files.
# IF EXISTS keeps the first run from failing when the table does not exist yet.
execute_athena_query(f'drop table if exists {GLUE_DB}.rudderstack_rawevents_part')

# JSON-serde table over s3://<BUCKET>/rudderstack_firehose, partitioned by
# received_at_utc_date.
execute_athena_query(f'''
CREATE EXTERNAL TABLE {GLUE_DB}.`rudderstack_rawevents_part`(
  `anonymousid` string COMMENT 'from deserializer', 
  `channel` string COMMENT 'from deserializer', 
  `context` map<string,string> COMMENT 'from deserializer', 
  `integrations` struct<all:boolean> COMMENT 'from deserializer', 
  `messageid` string COMMENT 'from deserializer', 
  `originaltimestamp` string COMMENT 'from deserializer', 
  `properties` map<string,string> COMMENT 'from deserializer', 
  `receivedat` string COMMENT 'from deserializer', 
  `request_ip` string COMMENT 'from deserializer', 
  `rudderid` string COMMENT 'from deserializer', 
  `sentat` string COMMENT 'from deserializer', 
  `timestamp` string COMMENT 'from deserializer', 
  `type` string COMMENT 'from deserializer', 
  `userid` string COMMENT 'from deserializer',
  `event` string)
PARTITIONED BY ( 
  `received_at_utc_date` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='anonymousId,channel,context,integrations,messageId,originalTimestamp,properties,receivedAt,request_ip,rudderId,sentAt,timestamp,type,userId,event') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://{BUCKET}/rudderstack_firehose'
''')

# Register the partitions that already exist under the table location.
execute_athena_query(f'MSCK REPAIR TABLE {GLUE_DB}.`rudderstack_rawevents_part`')

In [None]:
# Parquet table for product 'View' events, partitioned by received_at_utc_date.
# IF NOT EXISTS makes the cell safe to re-run; `float` is the DDL spelling of
# the type that DML statements call `real`.
execute_athena_query(f'''
create external table if not exists {GLUE_DB}.views
(
id string,
user_id string,
anonymous_id string,
`timestamp` string,
context_ip string,
session_id string,
is_session_start string,
image string,
feature string, 
productid string, 
price float, 
name string, 
category string
)
partitioned by (
    received_at_utc_date string
)
stored as parquet
LOCATION
  's3://{DL_BUCKET}/datalake/uba/views'
''')

In [None]:
# Parquet table for 'Purchase' events, partitioned by received_at_utc_date.
# IF NOT EXISTS makes the cell safe to re-run once the table has been created.
execute_athena_query(f'''

create external table if not exists {GLUE_DB}.purchases
(
id string,
user_id string,
anonymous_id string,
`timestamp` string,
context_ip string,
session_id string,
is_session_start string,
orderid string, 
cartid string, 
ordertotal double
)
partitioned by (
    received_at_utc_date string
)
stored as parquet
LOCATION
  's3://{DL_BUCKET}/datalake/uba/purchases'
''')

In [None]:
# Parquet table for 'StartCheckout' events, partitioned by received_at_utc_date.
# IF NOT EXISTS makes the cell safe to re-run; `int` is the DDL spelling of
# the type that DML statements call `integer`.
execute_athena_query(
f'''
create external table if not exists {GLUE_DB}.startcheckout
(
id string,
user_id string,
anonymous_id string,
`timestamp` string,
context_ip string,
session_id string,
is_session_start string,
cartquantity int, 
cartid string, 
carttotal double
)
partitioned by (
    received_at_utc_date string
)
stored as parquet
LOCATION
  's3://{DL_BUCKET}/datalake/uba/startcheckout'
'''
)

In [None]:
# NOTE(review): this statement repeats the `purchases` DDL that an earlier
# cell already runs, so without IF NOT EXISTS it fails with "table already
# exists". The `pages` and `identifies` tables that later cells insert into
# are not created anywhere in the visible notebook - confirm whether this
# cell was meant to create one of them.
execute_athena_query(
f'''
create external table if not exists {GLUE_DB}.purchases
(
id string,
user_id string,
anonymous_id string,
`timestamp` string,
context_ip string,
session_id string,
is_session_start string,
orderid string, 
cartid string, 
ordertotal double
)
partitioned by (
    received_at_utc_date string
)
stored as parquet
LOCATION
  's3://{DL_BUCKET}/datalake/uba/purchases'
''')

In [None]:
# Copy 'page' events with START_DATE <= received_at_utc_date < END_DATE from
# the raw table into `pages`. The raw table is read from GLUE_DB (the database
# it is created in) rather than from a hard-coded database name.
execute_athena_query(
f'''
insert into {GLUE_DB}.pages
select 
  messageid id
 ,userid user_id
 ,anonymousid
 ,"timestamp"
 ,context['useragent'] context_user_agent
 ,request_ip context_ip
 ,properties['url'] context_page_url
 ,properties['referrer'] context_page_referrer
 ,received_at_utc_date
from {GLUE_DB}.rudderstack_rawevents_part
where type = 'page'
  and received_at_utc_date < '{END_DATE}'
  and received_at_utc_date >= '{START_DATE}'
''')

In [None]:
# Copy 'identify' events with START_DATE <= received_at_utc_date < END_DATE
# from the raw table into `identifies`. The raw table is read from GLUE_DB
# (the database it is created in) rather than from a hard-coded database name.
execute_athena_query(
f'''
insert into {GLUE_DB}.identifies
select 
  messageid id
 ,userid user_id
 ,anonymousid
 ,"timestamp"
 ,request_ip context_ip
 ,received_at_utc_date
from {GLUE_DB}.rudderstack_rawevents_part
where type = 'identify'
  and received_at_utc_date < '{END_DATE}'
  and received_at_utc_date >= '{START_DATE}'
''')

In [None]:
# Copy 'View' track events with START_DATE <= received_at_utc_date < END_DATE
# from the raw table into `views`. Session fields come from the `context` map
# and product fields from the `properties` map. The raw table is read from
# GLUE_DB (the database it is created in) rather than a hard-coded database.
execute_athena_query(
f'''
insert into {GLUE_DB}.views
select 
  messageid id
 ,userid user_id
 ,anonymousid
 ,"timestamp"
 ,request_ip context_ip
 ,context['sessionid'] session_id
 ,case when context['sessionstart'] = 'true' then 'Y' else 'N' end is_session_start
 ,properties['image'] image
 ,properties['feature'] feature
 ,properties['productid'] productid
 ,cast(properties['price'] as real) price
 ,properties['name'] name
 ,properties['category'] category
 ,received_at_utc_date
from {GLUE_DB}.rudderstack_rawevents_part
where type = 'track'
  and event = 'View'
  and received_at_utc_date < '{END_DATE}'
  and received_at_utc_date >= '{START_DATE}'
''')

In [None]:
# Copy 'StartCheckout' track events with START_DATE <= received_at_utc_date <
# END_DATE from the raw table into `startcheckout`. The raw table is read from
# GLUE_DB (the database it is created in) rather than a hard-coded database.
execute_athena_query(
f'''
insert into {GLUE_DB}.startcheckout
select 
  messageid id
 ,userid user_id
 ,anonymousid
 ,"timestamp"
 ,request_ip context_ip
 ,context['sessionid'] session_id
 ,case when context['sessionstart'] = 'true' then 'Y' else 'N' end is_session_start
 ,cast(properties['cartquantity'] as integer) cartquantity
 ,properties['cartid'] cartid
 ,cast(properties['carttotal'] as double) carttotal
 ,received_at_utc_date
from {GLUE_DB}.rudderstack_rawevents_part
where type = 'track'
  and event = 'StartCheckout'
  and received_at_utc_date < '{END_DATE}'
  and received_at_utc_date >= '{START_DATE}'
''')

In [None]:
# Copy 'Purchase' track events with START_DATE <= received_at_utc_date <
# END_DATE from the raw table into `purchases`. The raw table is read from
# GLUE_DB (the database it is created in) rather than a hard-coded database.
execute_athena_query(
f'''
insert into {GLUE_DB}.purchases
select 
  messageid id
 ,userid user_id
 ,anonymousid
 ,"timestamp"
 ,request_ip context_ip
 ,context['sessionid'] session_id
 ,case when context['sessionstart'] = 'true' then 'Y' else 'N' end is_session_start
 ,properties['orderid'] orderid
 ,properties['cartid'] cartid
 ,cast(properties['ordertotal'] as double) ordertotal
 ,received_at_utc_date
from {GLUE_DB}.rudderstack_rawevents_part
where type = 'track'
  and event = 'Purchase'
  and received_at_utc_date < '{END_DATE}'
  and received_at_utc_date >= '{START_DATE}'
''')

## Create Summary Tables

In [None]:
# Create the daily business summary table `dws_biz_daily`
# (the original note referred to it as a "dwd" table).
#
# One row per `received_at_utc_date` present in `views`:
#   - cnt_product_views: distinct view ids
#   - cnt_user_views: distinct users, where a user is `user_id` when it is a
#     non-empty string and `anonymous_id` otherwise
#   - cnt_startcheckout / sum_startcheckout_carttotal: count and cart total of
#     the distinct (user_id, id, carttotal, date) rows in `startcheckout`
#   - cnt_purchase / sum_purchase_ordertotal: the same for `purchases`
# Checkout and purchase figures are LEFT JOINed on the date, and COALESCE
# turns days without any such rows into 0.
# Table names are unqualified, so they resolve against the database set in
# the Athena query execution context (GLUE_DB).
execute_athena_ctas_query('dws_biz_daily', '''
select A.received_at_utc_date stat_date
,cnt_product_views
,cnt_user_views
,COALESCE(cnt_startcheckout, 0) cnt_startcheckout
,COALESCE(sum_startcheckout_carttotal, 0) sum_startcheckout_carttotal
,COALESCE(cnt_purchase, 0) cnt_purchase
,COALESCE(sum_purchase_ordertotal, 0) sum_purchase_ordertotal
from (
select received_at_utc_date, 
       count(distinct view_id) cnt_product_views,
       count(distinct unique_user_id) cnt_user_views
from (
select case when user_id != '' then user_id else anonymous_id end unique_user_id, id as view_id, received_at_utc_date from views
) group by received_at_utc_date
) A
left join (
select received_at_utc_date, 
       count(startcheckout_id) cnt_startcheckout,
       sum(carttotal) sum_startcheckout_carttotal
from (
select distinct user_id, id as startcheckout_id, carttotal, received_at_utc_date from startcheckout
) group by received_at_utc_date
) B on (A.received_at_utc_date = B.received_at_utc_date)
left join (
select received_at_utc_date, 
       count(purchase_id) cnt_purchase,
       sum(ordertotal) sum_purchase_ordertotal
from (
select distinct user_id, id as purchase_id, ordertotal, received_at_utc_date from purchases
) group by received_at_utc_date
) C on (A.received_at_utc_date = C.received_at_utc_date)
'''
)

In [None]:
# Build `dwd_user_acti`: per-session user activity from START_DATE onwards.
#
#   A: distinct (date, user_id, anonymous_id, session_id, name, category)
#      rows from `views` that have a session_id
#   F: distinct (user_id, anonymous_id) pairs from `identifies`, joined on
#      anonymous_id; F.user_id is used when A.user_id is not a non-empty string
#   B: `startcheckout` rows, joined on F.user_id and the view's session_id
#   C: `purchases` rows, joined to the checkout through cartid
# All joins are LEFT JOINs, so views without a checkout or purchase are kept
# with NULLs in the B/C columns.
# NOTE(review): the B join uses F.user_id, so a view whose user never appears
# in `identifies` for this date range gets no checkout/purchase columns even
# if A.user_id is set - confirm this is intended.
execute_athena_ctas_query('dwd_user_acti', f'''
select 
  A.stat_date
  ,case when A.user_id != '' then A.user_id else F.user_id end user_id,
  A.anonymous_id,
  A.name,
  A.category,
  A.session_id,
  B.cartquantity,
  B.carttotal,
  B.cartid,
  C.orderid,
  C.ordertotal
from (
select distinct received_at_utc_date as stat_date, user_id, anonymous_id, session_id, name, category 
from views
where received_at_utc_date >= '{START_DATE}' and session_id is not null
) A
left join (
  select distinct user_id, anonymous_id from identifies where received_at_utc_date >= '{START_DATE}'
) F on (A.anonymous_id = F.anonymous_id)
left join (
  select user_id, cartquantity, carttotal, session_id, cartid from startcheckout where received_at_utc_date >= '{START_DATE}'
) B on (F.user_id = B.user_id and A.session_id = B.session_id)
left join (
  select user_id, orderid, ordertotal, cartid from purchases where received_at_utc_date >= '{START_DATE}'
) C on (B.cartid = C.cartid)
order by stat_date, session_id
''')

In [None]:
# Build `dim_user_info`: one row per user_id in `identifies`, with
#   reg_date    = earliest received_at_utc_date of the user's identify events
#   signup_date = latest received_at_utc_date of the user's identify events
# NOTE(review): `signup_date` holds the most recent identify date, which the
# name does not suggest - confirm the intended meaning.
execute_athena_ctas_query('dim_user_info', f'''
select 
  user_id, 
  min(received_at_utc_date) reg_date,
  max(received_at_utc_date) signup_date
from identifies
group by user_id
''')

In [None]:
# Build `dws_user_retention`: retention per registration month.
#
# Innermost query: distinct (user_id, visit_date, reg_date) combinations from
# `dwd_user_acti` inner-joined with `dim_user_info`, so only users present in
# both tables are counted.
# Next level: reg_month is the first 7 characters of reg_date, and
# days_since_reg is the day difference between reg_date and visit_date.
# Per user: flags for having at least one visit 1-7, 8-14, 15-21, 22-28 and
# more than 28 days after reg_date.
# Outer query: per reg_month, the number of users (reg_count), the number of
# users with each flag set, and those numbers as a percentage of reg_count.
# The `stat_date > reg_date` filter is commented out inside the SQL, so
# visits on the registration day itself are kept; they fall in none of the
# buckets (days_since_reg = 0).
execute_athena_ctas_query('dws_user_retention', f'''
select 
reg_month,
count(*) reg_count,
sum(active_w1) ret_count_w1,
sum(active_w2) ret_count_w2,
sum(active_w3) ret_count_w3,
sum(active_w4) ret_count_w4,
sum(active_w4more) ret_count_w4m,
sum(active_w1) * 100.0 / count(*) w1_ret_rate,
sum(active_w2) * 100.0 / count(*) w2_ret_rate,
sum(active_w3) * 100.0 / count(*) w3_ret_rate,
sum(active_w4) * 100.0 / count(*) w4_ret_rate,
sum(active_w4more) * 100.0 / count(*) w4m_ret_rate
from (
    select
      reg_month,
      user_id,
      max(case when days_since_reg between 1 and 7 then 1 else 0 end) active_w1,
      max(case when days_since_reg between 8 and 14 then 1 else 0 end) active_w2,
      max(case when days_since_reg between 15 and 21 then 1 else 0 end) active_w3,
      max(case when days_since_reg between 22 and 28 then 1 else 0 end) active_w4,
      max(case when days_since_reg > 28 then 1 else 0 end) active_w4more
    from (
        select
          substr(reg_date, 1, 7) reg_month,
          user_id, 
          reg_date,
          visit_date, 
          DATE_DIFF('day', date(reg_date), date(visit_date)) AS days_since_reg
        from (
            select A.user_id, stat_date as visit_date, reg_date
            from dwd_user_acti A
            join dim_user_info B on (A.user_id = B.user_id)
            --where stat_date > reg_date
            group by A.user_id, stat_date, reg_date
        )
    )
    group by reg_month, user_id
)
group by reg_month
''')