In [1]:
import os
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# Announce the run on stdout, then capture the UTC time at which the notebook started.
print(''.join(['[ ', str(datetime.utcnow()), ' ] : Deactivation model 3.0 triggered']))
start = datetime.utcnow()

#import numpy as np
#import pandas as pd
#from pyhive import hive

#import xgboost as xgb
#from sklearn import metrics
#from sklearn.model_selection import train_test_split
#from xgboost.sklearn import XGBClassifier
#from sklearn.ensemble import RandomForestClassifier

#import matplotlib.pylab as plt

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
#from pyspark.sql.types import DoubleType, StringType

[ 2019-08-20 17:32:05.842286 ] : Deactivation model 3.0 triggered


In [2]:
# Build (or reuse) the Hive-enabled Spark session: YARN master, client deploy mode,
# 'public' queue, Hive dynamic partitioning switched on in non-strict mode.
spark = (
    SparkSession.builder
    .appName('deact-model')
    .master('yarn')
    .config('spark.submit.deployMode', 'client')
    .config('spark.yarn.queue', 'public')
    .config('hive.exec.dynamic.partition', 'true')
    .config('hive.exec.dynamic.partition.mode', 'nonstrict')
    .config('spark.yarn.dist.archives', 'hdfs:////user/grp_gdoop_admin/anaconda/anaconda2_env.zip#ANACONDA')
    .config('spark.sql.warehouse.dir', 'hdfs://cerebro-namenode-vip.snc1/user/grp_gdoop_clv/grp_gdoop_clv_hiveDB.db')
    .enableHiveSupport()
    .getOrCreate()
)

# The two Hive settings above (hive.exec.dynamic.partition and hive.exec.dynamic.partition.mode) enable dynamic partitioning, so new partitions can be written into tables using folder names

# SQL Pipeline - NA

##### Constants

In [3]:
# Storage location and naming conventions shared by every table built in this notebook.
hdfs_folder = 'hdfs://cerebro-namenode-vip.snc1/user/grp_gdoop_clv/deact-model/'
hive_db = 'grp_gdoop_clv_db'
hive_tbl_prefix = 'ce_keep_deact_'

# HDFS directory of each logical table, keyed by a short table label.
FILE_DICT = dict(
    (label, hdfs_folder + hive_tbl_prefix + label) for label in ('pop', 'response')
)

# Reference date per phase: 'train' is today (UTC), 'score' is 365 days after 'train'.
DATE_DICT = {'train': datetime.utcnow().strftime('%Y-%m-%d')}
DATE_DICT['score'] = (
    datetime.strptime(DATE_DICT['train'], '%Y-%m-%d') + timedelta(days=365)
).strftime('%Y-%m-%d')

##### Utilities

In [4]:
def write_df_to_orc(df, filename, description):
    """Write `df` to `filename` as ORC, replacing whatever is already there.

    The writer is configured with mode 'overwrite', format 'orc' and the option
    'orc.compress' = 'snappy'. A timestamped line is printed before and after.

    Args:
        df: object exposing the DataFrame writer API through `.write`.
        filename: destination path handed to the writer's `save`.
        description: label used only in the completion log line.
    """
    print(''.join(['[ ', str(datetime.utcnow()), ' ] : Saving results']))

    orc_writer = df.write.mode('overwrite').format('orc').option('orc.compress', 'snappy')
    orc_writer.save(filename)

    print(''.join(['[ ', str(datetime.utcnow()), ' ] : Saved ', description, ' to: ', filename]))

In [5]:
def create_hive_external_tbl(spark, file_loc, columns):
    """Drop and recreate the external ORC Hive table that sits on top of `file_loc`.

    The table is created in the module-level `hive_db` and is named after the last
    path component of `file_loc`. Every entry of `columns` is declared as `string`
    and the table is partitioned by (phase string, record_date string).

    Args:
        spark: session object whose `.sql(query)` executes the DDL statements.
        file_loc: directory holding the table data; a trailing '/' is tolerated.
        columns: iterable of column names for the table body.
    """
    # Strip trailing slashes so the last path component is never an empty table name.
    file_loc = file_loc.rstrip('/')
    tbl = file_loc.split('/')[-1]
    # The table location is registered without the '-vip.snc1' host suffix.
    # NOTE(review): presumably Hive must address the namenode by its short name - confirm.
    file_loc = file_loc.replace('-vip.snc1','')
    qualified_tbl = hive_db+'.'+tbl

    # Drop old table schema
    spark.sql('drop table if exists '+qualified_tbl)

    # Create new table schema. Each clause ends with a space so that adjacent
    # clauses never run together in the generated statement.
    column_ddl = ', '.join(name+' string' for name in columns)
    qry_create = 'create external table if not exists '+qualified_tbl+' ('+column_ddl+') '+\
        'partitioned by (phase string, record_date string) '+\
        'stored as orc '+\
        'location "'+file_loc+'"'
    spark.sql(qry_create)

    print('[ '+str(datetime.utcnow())+' ] : View results in Hive table: '+qualified_tbl)

In [6]:
def add_hive_partition(spark, filename, phase, record_date):
    """Register the (phase, record_date) partition on the table backing `filename`.

    The table name is the last path component of `filename`, qualified with the
    module-level `hive_db`. The statement uses `add if not exists`.

    Args:
        spark: session object whose `.sql(query)` executes the statement.
        filename: table directory path; only its last component is used.
        phase: value of the `phase` partition column.
        record_date: value of the `record_date` partition column.
    """
    qualified_tbl = hive_db+'.'+filename.split('/')[-1]
    partition_spec = 'phase="'+phase+'", record_date="'+record_date+'"'

    spark.sql(''.join(['alter table ', qualified_tbl, ' add if not exists partition(', partition_spec, ')']))
    print(''.join(['[ ', str(datetime.utcnow()), ' ] : Added partition (', partition_spec, ') to ', qualified_tbl]))

In [7]:
def drop_hive_partition(spark, filename, phase, record_date):
    """Remove the (phase, record_date) partition from Hive and delete its folder.

    First the partition is dropped from the table named after the last path
    component of `filename` (in the module-level `hive_db`), using
    `drop if exists`. Then `hadoop fs -rm -R -skipTrash` is run on the partition
    directory. The removal is best-effort: a non-zero exit status is reported on
    stdout but does not raise.

    Args:
        spark: session object whose `.sql(query)` executes the statement.
        filename: table directory path.
        phase: value of the `phase` partition column.
        record_date: value of the `record_date` partition column.
    """
    tbl = filename.split('/')[-1]
    partition_spec = 'phase="'+phase+'", record_date="'+record_date+'"'
    spark.sql('alter table '+hive_db+'.'+tbl+' drop if exists partition('+partition_spec+')')
    print('[ '+str(datetime.utcnow())+' ] : Dropped partition ('+partition_spec+') from '+hive_db+'.'+tbl)

    partition_dir = filename+'/phase='+phase+'/record_date='+record_date
    # NOTE(review): the path is interpolated into a shell command unquoted; this is
    # only safe while callers pass trusted, whitespace-free paths.
    exit_status = os.system('hadoop fs -rm -R -skipTrash '+partition_dir)
    # Only claim success when the command actually succeeded (e.g. the folder may
    # simply not exist), so the log never reports a deletion that did not happen.
    if exit_status == 0:
        print('[ '+str(datetime.utcnow())+' ] : Dropped folder '+partition_dir+'\n')
    else:
        print('[ '+str(datetime.utcnow())+' ] : Could not drop folder '+partition_dir+
              ' (hadoop fs -rm exit status '+str(exit_status)+')\n')

In [8]:
def save_orc_update_hive(spark, df, filename, phase, record_date, description, refresh_tbl_schema):
    """Write `df` as one Hive partition and retire the partition from 7 days earlier.

    Steps, in order:
      1. write `df` as ORC under `<filename>/phase=<phase>/record_date=<record_date>`;
      2. if `refresh_tbl_schema` is true, recreate the external table from `df.columns`;
      3. register the new (phase, record_date) partition;
      4. drop the partition (and folder) whose record_date is exactly 7 days
         before `record_date`. Older partitions are not touched.

    Args:
        spark: session object passed through to the Hive helpers.
        df: DataFrame to persist.
        filename: table directory path.
        phase: value of the `phase` partition column.
        record_date: partition date formatted as '%Y-%m-%d'.
        description: label used in the write log line.
        refresh_tbl_schema: whether to drop and recreate the table definition.

    Raises:
        ValueError: if `record_date` is not formatted as '%Y-%m-%d'.
    """
    # Derive the retention cut-off from the partition actually being written rather
    # than from the global DATE_DICT, so the two can never disagree. Doing this
    # first also rejects a malformed date before anything is written.
    date_7d = (datetime.strptime(record_date, '%Y-%m-%d') - timedelta(days=7)).strftime('%Y-%m-%d')

    write_df_to_orc(df, filename+'/phase='+phase+'/record_date='+record_date, description)

    if refresh_tbl_schema:
        create_hive_external_tbl(spark, filename, df.columns)

    add_hive_partition(spark, filename, phase, record_date)
    drop_hive_partition(spark, filename, phase, date_7d)

##### Create Tables

In [9]:
# Display the train/score reference dates resolved for this run.
DATE_DICT

{'score': '2020-08-19', 'train': '2019-08-20'}

In [10]:
def create_pop_tbl(spark, phase, refresh_tbl_schema=False):
    """Build the population snapshot for `phase` and publish it as a Hive partition.

    Reads `cia_realtime.user_attrs` for the record_date lying 12 months and 1 day
    before DATE_DICT[phase], keeps 'groupon' rows whose recency_9block is one of
    the three listed buckets, collapses them to one row per
    (brand, consumer_id, user_key) by taking the minimum of each segment column,
    and passes the result to save_orc_update_hive under FILE_DICT['pop'].

    Args:
        spark: active Spark session.
        phase: key into DATE_DICT.
        refresh_tbl_schema: forwarded to save_orc_update_hive; when true the Hive
            table definition is recreated.
    """
    print(''.join(['[ ', str(datetime.utcnow()), ' ] : Loading population ', phase, ' data']))

    phase_date = datetime.strptime(DATE_DICT[phase], '%Y-%m-%d')
    date_12m1d = (phase_date + relativedelta(months=-12, days=-1)).strftime('%Y-%m-%d')

    kept_recency_blocks = ['1-High Rec (0-30 Days)','2-Med Rec (31-120 Days)','3-Low Rec (121-365 Days)']
    key_cols = ('brand', 'consumer_id', 'user_key')

    user_attrs = spark.sql('select * from cia_realtime.user_attrs')
    snapshot = user_attrs\
        .filter(F.col('record_date') == date_12m1d)\
        .filter(F.col('recency_9block').isin(kept_recency_blocks))\
        .filter(F.col('brand') == 'groupon')

    # One row per user key, carrying the minimum of every segment column.
    df = snapshot\
        .select('brand','consumer_id','user_key','recency_segment','frequency_segment','recency_9block','frequency_9block')\
        .groupby(*key_cols)\
        .agg({'recency_segment':'min','frequency_segment':'min','recency_9block':'min','frequency_9block':'min'})

    # Strip the 'min(...)' wrapper that the dict-style aggregation puts in column names.
    for segment_col in ('frequency_segment', 'recency_segment', 'frequency_9block', 'recency_9block'):
        df = df.withColumnRenamed('min('+segment_col+')', segment_col)

    save_orc_update_hive(
        spark,
        df,
        FILE_DICT['pop'],
        phase,
        DATE_DICT[phase],
        phase+' population from '+date_12m1d,
        refresh_tbl_schema,
    )

In [11]:
def create_response_tbl(spark, phase, refresh_tbl_schema=False):
    """Build the response table for `phase` and publish it as a Hive partition.

    Reads `grp_gdoop_marketing_analytics_db.me_orders_fgt_usd`, keeps rows whose
    transaction_date lies between (DATE_DICT[phase] - 12 months - 1 day) and
    (DATE_DICT[phase] - 2 days) inclusive and that pass the attribution, platform,
    country, non-zero amount and brand filters below, then reduces them to the
    earliest order_date per consumer_id (column `first_order_date`). The result is
    passed to save_orc_update_hive under FILE_DICT['response'].

    Args:
        spark: active Spark session.
        phase: key into DATE_DICT.
        refresh_tbl_schema: forwarded to save_orc_update_hive; when true the Hive
            table definition is recreated.
    """
    print('[ '+str(datetime.utcnow())+' ] : Loading '+phase+' response data')
    phase_date = datetime.strptime(DATE_DICT[phase], '%Y-%m-%d')
    date_2d = (phase_date + timedelta(days=-2)).strftime('%Y-%m-%d')
    date_12m1d = (phase_date + relativedelta(months=-12, days=-1)).strftime('%Y-%m-%d')

    # `!=` replaces the legacy `<>` operator, which is a syntax error on Python 3.
    df = spark.sql('select * from grp_gdoop_marketing_analytics_db.me_orders_fgt_usd')\
        .filter(F.col('attribution_type') == '3.1')\
        .filter(F.col('platform_key') == 1)\
        .filter((F.col('transaction_date') <= date_2d) & (F.col('transaction_date') >= date_12m1d))\
        .filter(F.col('country_id').isin([40,235]))\
        .filter(F.col('txn_amount_loc') != 0)\
        .filter(F.col('user_brand_affiliation') == 'groupon')\
        .select('consumer_id','order_date')\
        .groupby('consumer_id')\
        .agg({'order_date':'min'})\
        .withColumnRenamed('min(order_date)','first_order_date')

    filename = FILE_DICT['response']
    save_orc_update_hive(spark, df, filename, phase, DATE_DICT[phase], phase+' responses from '+date_12m1d+' thru '+date_2d, refresh_tbl_schema)

In [12]:
# Build the population and response tables for every phase configured in DATE_DICT.
for phase in list(DATE_DICT.keys()):
    create_pop_tbl(spark, phase)
    create_response_tbl(spark, phase)

[ 2019-08-20 17:34:32.254752 ] : Loading population train data
[ 2019-08-20 17:34:39.440888 ] : Saving results
[ 2019-08-20 17:38:40.078135 ] : Saved train population from 2018-08-19 to: hdfs://cerebro-namenode-vip.snc1/user/grp_gdoop_clv/deact-model/ce_keep_deact_pop/phase=train/record_date=2019-08-20
[ 2019-08-20 17:38:40.324873 ] : Added partition (phase="train", record_date="2019-08-20") to grp_gdoop_clv_db.ce_keep_deact_pop
[ 2019-08-20 17:38:40.487078 ] : Dropped partition (phase="train", record_date="2019-08-13") from grp_gdoop_clv_db.ce_keep_deact_pop
[ 2019-08-20 17:38:42.616488 ] : Dropped folder hdfs://cerebro-namenode-vip.snc1/user/grp_gdoop_clv/deact-model/ce_keep_deact_pop/phase=train/record_date=2019-08-13

[ 2019-08-20 17:38:43.345827 ] : Saving results
[ 2019-08-20 17:41:06.844544 ] : Saved train responses from 2018-08-19 thru 2019-08-18 to: hdfs://cerebro-namenode-vip.snc1/user/grp_gdoop_clv/deact-model/ce_keep_deact_response/phase=train/record_date=2019-08-20
[ 2019-

In [19]:
# Load the feature rows whose record_date is two days before the current date,
# then print the schema for inspection.
df_train = spark.sql(
    'select * from grp_gdoop_clv_db.eb_pip_deact_all_features '
    'where record_date = date_sub(current_date, 2)'
)
df_train.printSchema()

root
 |-- brand: string (nullable = true)
 |-- consumer_id: string (nullable = true)
 |-- data_set: string (nullable = true)
 |-- recency_segment: string (nullable = true)
 |-- frequency_segment: string (nullable = true)
 |-- recency_9block: string (nullable = true)
 |-- frequency_9block: string (nullable = true)
 |-- deact_date: string (nullable = true)
 |-- next_order_date: string (nullable = true)
 |-- deact_flag: byte (nullable = true)
 |-- tenure_days: integer (nullable = true)
 |-- recency: integer (nullable = true)
 |-- frequency_t24m: integer (nullable = true)
 |-- nob_t24m: float (nullable = true)
 |-- gp_t24m: float (nullable = true)
 |-- frequency_t12m: integer (nullable = true)
 |-- nob_t12m: float (nullable = true)
 |-- gp_t12m: float (nullable = true)
 |-- local_orders_t24m: integer (nullable = true)
 |-- shopping_orders_t24m: integer (nullable = true)
 |-- travel_orders_t24m: integer (nullable = true)
 |-- app_orders_t24m: integer (nullable = true)
 |-- touch_orders_t24m

In [18]:
# Preview the first rows of the training features.
df_train.show()

+-------+--------------------+----------+--------------------+-----------------+--------------------+--------------------+----------+---------------+----------+-----------+-------+--------------+--------+--------+--------------+--------+-------+-----------------+--------------------+------------------+---------------+-----------------+---------------+--------------+--------------------+----------------------+--------------------+-------------+---------------+-----------------------------+---------------+-------------+--------------+-------------+--------------+--------------+------------------------+------------+------------+--------------+--------+------------+--------+---------+------------+------------------+-------------------+-------------+-------------------+--------------------+------------------+-----------+
|  brand|         consumer_id|  data_set|     recency_segment|frequency_segment|      recency_9block|    frequency_9block|deact_date|next_order_date|deact_flag|tenure_days|

In [20]:
# Total number of rows in the training feature set.
df_train.count()

30580049

In [32]:
def feature_engineering(df):
    """Drop rows lacking deact_date or tenure_days and add three derived columns.

    Added columns:
        last_purchase_date: deact_date minus 365 days.
        gap_purchase_last_visit: days from last_purchase_date to last_visit_date.
        days_until_deact: days from record_date to deact_date (negative when
            deact_date is earlier than record_date).

    Args:
        df: DataFrame with deact_date, tenure_days, last_visit_date and
            record_date columns.

    Returns:
        A new DataFrame containing the filtered rows plus the derived columns.
    """
    has_required_fields = df.deact_date.isNotNull() & df.tenure_days.isNotNull()
    usable_rows = df.filter(has_required_fields)

    with_last_purchase = usable_rows.withColumn(
        'last_purchase_date', F.date_sub(df.deact_date, 365)
    )
    with_visit_gap = with_last_purchase.withColumn(
        'gap_purchase_last_visit',
        F.datediff(df.last_visit_date, F.col('last_purchase_date')),
    )
    return with_visit_gap.withColumn(
        'days_until_deact', F.datediff(df.deact_date, df.record_date)
    )

# Apply the feature engineering to the training frame and spot-check the derived
# date columns next to the inputs they were computed from.
a = feature_engineering(df_train)
# a.count()
a.select('last_purchase_date','last_visit_date','gap_purchase_last_visit','deact_date','record_date','days_until_deact').show()

+------------------+---------------+-----------------------+----------+-----------+----------------+
|last_purchase_date|last_visit_date|gap_purchase_last_visit|deact_date|record_date|days_until_deact|
+------------------+---------------+-----------------------+----------+-----------+----------------+
|        2018-07-06|     2018-07-06|                      0|2019-07-06| 2019-08-14|             -39|
|        2018-06-22|     2018-07-25|                     33|2019-06-22| 2019-08-14|             -53|
|        2018-06-19|     2018-07-17|                     28|2019-06-19| 2019-08-14|             -56|
|        2017-08-24|           null|                   null|2018-08-24| 2019-08-14|            -355|
|        2017-12-05|           null|                   null|2018-12-05| 2019-08-14|            -252|
|        2018-01-14|     2018-08-09|                    207|2019-01-14| 2019-08-14|            -212|
|        2018-06-08|     2018-07-26|                     48|2019-06-08| 2019-08-14|        

In [33]:
# Row count after feature_engineering removed rows with a null deact_date or tenure_days.
a.count()

27728816