### Start Spark Session

In [1]:
from pyspark import SparkContext, SparkConf

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,,pyspark3,idle,,,✔


SparkSession available as 'spark'.


In [2]:
%%configure -f
{"executorCores":4,"numExecutors":13}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,,pyspark3,idle,,,✔


SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,,pyspark3,idle,,,✔


### Pre-Process Data

In [3]:
basePath = 's3://srdata-lab/Matt/DeepARInputs/deepar_input/hivejob0'
paths    =['s3://srdata-lab/Matt/DeepARInputs/deepar_input/hivejob0/*tuning_evnt_start_dt=*']
df=spark.read.option("basePath",basePath).parquet(*paths)

In [4]:
from pyspark.sql.functions import col
# Subset markets to periods of "good" data
df=df.where((col("mkt_cd").isin(["501",  "635"]))
           |((col("mkt_cd").isin(["539"])) & (df.tuning_evnt_start_dt >= "2017-04-01"))
           | ((col("mkt_cd").isin(["609"])) & (df.tuning_evnt_start_dt >= "2018-01-01")))

In [None]:
#df.groupby('mkt_cd').agg(f.min('tuning_evnt_start_dt')).show()

In [5]:
import pyspark.sql.functions as f
df.groupby('mkt_cd').agg(f.countDistinct('syscode'),f.countDistinct('stn_id'),f.min('tuning_evnt_start_dt')).show()

+------+-----------------------+----------------------+-------------------------+
|mkt_cd|count(DISTINCT syscode)|count(DISTINCT stn_id)|min(tuning_evnt_start_dt)|
+------+-----------------------+----------------------+-------------------------+
|   635|                      6|                   116|               2016-09-01|
|   539|                     11|                   121|               2017-04-01|
|   609|                     13|                   106|               2018-01-01|
|   501|                     12|                   123|               2016-09-01|
+------+-----------------------+----------------------+-------------------------+

In [None]:


# # Add MarchMad and CFBNT indicators
# df = df.withColumn("MarchMadInd",
#                    f.when((df["tuning_evnt_start_dt"] >= '2018-03-15' ) & (df["tuning_evnt_start_dt"] <= '2018-04-10'), 1)
#                     .otherwise(0))

# df.groupby('MarchMadInd').agg(f.countDistinct('tuning_evnt_start_dt')).show()

In [None]:
# df = df.withColumn("CFBNT_Ind",
#                    f.when((df["tuning_evnt_start_dt"] >= '2018-01-06' ) & (df["tuning_evnt_start_dt"] <= '2018-01-10'), 1)
#                     .otherwise(0))

# df.groupby('CFBNT_Ind').agg(f.countDistinct('tuning_evnt_start_dt')).show()

In [6]:
from pyspark.sql import functions as f
import pyspark.sql.functions as f
# Sysocode Subscriber Buckets
# Sagemaker can't train on cat fields with missing attributes (since 6th and 7th buckets were missing, training threw an error)
# Limit buckets to 6 total
df = df.withColumn("Syscode_Subsc_Buckt",
                   f.when((df["sub_cnt"] <= 5000 ) & (df["sub_cnt"] > 0), 0)
                    .when((df["sub_cnt"] <= 10000 ) & (df["sub_cnt"] > 5000), 1)
                    .when((df["sub_cnt"] <= 20000 ) & (df["sub_cnt"] > 10000), 2)
                    .when((df["sub_cnt"] <= 30000 ) & (df["sub_cnt"] > 20000), 3)
                    .when((df["sub_cnt"] <= 40000 ) & (df["sub_cnt"] > 30000), 4)
                    .when((df["sub_cnt"] <= 50000 ) & (df["sub_cnt"] > 40000), 5)
                    .when((df["sub_cnt"] > 50000 ), 6)
                    .otherwise(0)) #if NULLs exist, place them in 0th bucket

df.groupby('Syscode_Subsc_Buckt').agg(f.countDistinct('syscode')).sort('Syscode_Subsc_Buckt').show()

+-------------------+-----------------------+
|Syscode_Subsc_Buckt|count(DISTINCT syscode)|
+-------------------+-----------------------+
|                  0|                     42|
|                  1|                      3|
|                  2|                      6|
|                  3|                      5|
|                  4|                      6|
|                  5|                      3|
|                  6|                     18|
+-------------------+-----------------------+

In [7]:
df = df.withColumn('mindate', df['mindate'].cast('string'))

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

df = df.withColumn('Syscode_Stn', F.concat(F.col('syscode'), F.lit(" - "), F.col('stn_id')))
df.withColumn("Syscode_Stn", df["Syscode_Stn"].cast(DoubleType()).alias("Syscode_Stn")) 

DataFrame[daypart_hour: int, syscode: string, stn_id: string, Syscode_Stn: double, mindate: string, stn_qlty_cd: string, ntwrk_genre_cd: string, ntwrk_genre_recd: string, stn_norm_nm: string, mkt_cd: string, total_events: double, spp_impressions: double, sub_cnt: int, tuning_evnt_start_dt: date, Syscode_Subsc_Buckt: int]

In [9]:
keep_columns=["Syscode_Stn", "tuning_evnt_start_dt", "daypart_hour", "mindate","stn_qlty_cd","ntwrk_genre_recd",
              "Syscode_Subsc_Buckt","spp_impressions"] #"MarchMadInd","CFBNT_Ind"

df=df.select([column for column in df.columns if column in keep_columns])

In [10]:
# Aggregate at Daily level
# Since 0-impression records are getting assigned "Syscode_Subsc_Buckt"=0, take the max("Syscode_Subsc_Buckt")
from pyspark.sql.functions import *

df = df.groupBy("Syscode_Stn", "mindate","stn_qlty_cd","ntwrk_genre_recd","tuning_evnt_start_dt").agg(f.sum("spp_impressions").alias('spp_imp_daily'), f.max("Syscode_Subsc_Buckt").alias("Syscode_Subsc_Buckt"))

df.show(5)

+------------+-------------------+-----------+----------------+--------------------+------------------+-------------------+
| Syscode_Stn|            mindate|stn_qlty_cd|ntwrk_genre_recd|tuning_evnt_start_dt|     spp_imp_daily|Syscode_Subsc_Buckt|
+------------+-------------------+-----------+----------------+--------------------+------------------+-------------------+
|3466 - 14899|2017-03-07 00:00:00|         SD|          SPORTS|          2018-03-12|5.1591689999999994|                  6|
|1299 - 64549|2016-09-01 00:00:00|         HD|            NEWS|          2018-03-12|         91.556397|                  1|
|6487 - 10222|2016-09-02 00:00:00|         SD|         SPANISH|          2018-03-12|            0.0175|                  2|
|3370 - 61854|2017-10-28 00:00:00|         HD|          SPORTS|          2018-03-12|448.64582099999996|                  4|
|3370 - 59432|2017-10-28 00:00:00|         HD|            KIDS|          2018-03-12|3914.1021940000005|                  4|
+-------

In [11]:
df.orderBy(["Syscode_Stn","tuning_evnt_start_dt"]).show()

+------------+-------------------+-----------+----------------+--------------------+------------------+-------------------+
| Syscode_Stn|            mindate|stn_qlty_cd|ntwrk_genre_recd|tuning_evnt_start_dt|     spp_imp_daily|Syscode_Subsc_Buckt|
+------------+-------------------+-----------+----------------+--------------------+------------------+-------------------+
|0318 - 10021|2017-10-28 00:00:00|         SD| Movies & Series|          2018-01-01| 89.31305300000002|                  3|
|0318 - 10021|2017-10-28 00:00:00|         SD| Movies & Series|          2018-01-02|123.86610299999998|                  3|
|0318 - 10021|2017-10-28 00:00:00|         SD| Movies & Series|          2018-01-03|        121.907775|                  3|
|0318 - 10021|2017-10-28 00:00:00|         SD| Movies & Series|          2018-01-04|164.37999100000002|                  3|
|0318 - 10021|2017-10-28 00:00:00|         SD| Movies & Series|          2018-01-05|210.98889800000003|                  3|
|0318 - 

In [12]:
# Encode cat field as 0-based sequence of positive integers
from pyspark.sql import functions as f
from pyspark.sql.functions import isnan
df = df.withColumn("stn_qlty_index",
                   f.when(df["stn_qlty_cd"] == 'HD',0)
                    .otherwise(1))

df = df.withColumn("ntwrk_genre_index",
                   f.when(df["ntwrk_genre_recd"] == 'SPORTS',0)
                    .when(df["ntwrk_genre_recd"] == 'NEWS',1)
                    .when(df["ntwrk_genre_recd"] == 'SPANISH',2)
                    .when(df["ntwrk_genre_recd"] == 'Major Sports & Misc',3)
                    .when(df["ntwrk_genre_recd"].isNull(), 4)
                    .when(df["ntwrk_genre_recd"] == 'Movies & Series',5)
                    .when(df["ntwrk_genre_recd"] == 'Music',6)
                    .when(df["ntwrk_genre_recd"] == 'KIDS',7))

df.schema

StructType(List(StructField(Syscode_Stn,StringType,true),StructField(mindate,StringType,true),StructField(stn_qlty_cd,StringType,true),StructField(ntwrk_genre_recd,StringType,true),StructField(tuning_evnt_start_dt,DateType,true),StructField(spp_imp_daily,DoubleType,true),StructField(Syscode_Subsc_Buckt,IntegerType,true),StructField(stn_qlty_index,IntegerType,false),StructField(ntwrk_genre_index,IntegerType,true)))

In [13]:
df.groupby('ntwrk_genre_index').agg(f.countDistinct('Syscode_Stn')).sort('ntwrk_genre_index').show()

+-----------------+---------------------------+
|ntwrk_genre_index|count(DISTINCT Syscode_Stn)|
+-----------------+---------------------------+
|                0|                        693|
|                1|                        563|
|                2|                        176|
|                3|                        187|
|                4|                       1984|
|                5|                        559|
|                6|                        162|
|                7|                        280|
+-----------------+---------------------------+

### Prepare training set and test set

In [14]:
# Separate Full minus (total-2) months for training; testing data should be full data
train_data = df.filter(df.tuning_evnt_start_dt <= "2018-04-30")
test_data = df

In [15]:
train_data.select(f.date_format('tuning_evnt_start_dt','yyyy-MM').alias('month')).groupby('month').count().sort('month').show()

+-------+------+
|  month| count|
+-------+------+
|2016-09| 58806|
|2016-10| 61380|
|2016-11| 59400|
|2016-12| 61380|
|2017-01| 61380|
|2017-02| 55440|
|2017-03| 61380|
|2017-04| 95850|
|2017-05| 99220|
|2017-06| 96426|
|2017-07|100334|
|2017-08|102605|
|2017-09| 99369|
|2017-10|102703|
|2017-11| 99495|
|2017-12|102827|
|2018-01|144460|
|2018-02|130480|
|2018-03|144461|
|2018-04|139870|
+-------+------+

In [17]:
test_data.select(f.date_format('tuning_evnt_start_dt','yyyy-MM').alias('month')).groupby('month').count().sort('month').show(22)

+-------+------+
|  month| count|
+-------+------+
|2016-09| 58806|
|2016-10| 61380|
|2016-11| 59400|
|2016-12| 61380|
|2017-01| 61380|
|2017-02| 55440|
|2017-03| 61380|
|2017-04| 95850|
|2017-05| 99220|
|2017-06| 96426|
|2017-07|100334|
|2017-08|102605|
|2017-09| 99369|
|2017-10|102703|
|2017-11| 99495|
|2017-12|102827|
|2018-01|144460|
|2018-02|130480|
|2018-03|144461|
|2018-04|139870|
|2018-05|144600|
|2018-06|140058|
+-------+------+

In [18]:
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *

model_input_train = (train_data
  .groupby("Syscode_Stn")
  .agg(F.min("MinDate"),
    F.collect_list("spp_imp_daily"),
    F.min("stn_qlty_index"),
    F.min("ntwrk_genre_index"),
    F.min("Syscode_Subsc_Buckt")))
    #F.collect_list("MarchMadInd"),
    #F.collect_list("CFBNT_Ind")))

model_train_w_combo = model_input_train.select(col("Syscode_Stn"),
                                 col("min(MinDate)").alias("start"),
                                 col("collect_list(spp_imp_daily)").alias("target"), 
                                 col("min(stn_qlty_index)").alias("cat1"),
                                 col("min(ntwrk_genre_index)").alias("cat2"),
                                 col("min(Syscode_Subsc_Buckt)").alias("cat3"))
                                 #col("collect_list(MarchMadInd)").alias("dynamic_feat"),
                                 #col("collect_list(CFBNT_Ind)").alias("dynamic_feat2"))

model_train_actual_input = model_input_train.select(col("min(MinDate)").alias("start"),
                                 col("collect_list(spp_imp_daily)").alias("target"), 
                                 col("min(stn_qlty_index)").alias("cat1"),
                                 col("min(ntwrk_genre_index)").alias("cat2"),
                                 col("min(Syscode_Subsc_Buckt)").alias("cat3"))
                                 #col("collect_list(MarchMadInd)").alias("dynamic_feat"),
                                 #col("collect_list(CFBNT_Ind)").alias("dynamic_feat2"))


model_input_test = (test_data
  .groupby("Syscode_Stn")
  .agg(F.min("MinDate"),
    F.collect_list("spp_imp_daily"),
    F.min("stn_qlty_index"),
    F.min("ntwrk_genre_index"),
    F.min("Syscode_Subsc_Buckt")))
    #F.collect_list("MarchMadInd"),
    #F.collect_list("CFBNT_Ind"))

model_test_w_combo = model_input_test.select(col("Syscode_Stn"),
                                 col("min(MinDate)").alias("start"),
                                 col("collect_list(spp_imp_daily)").alias("target"), 
                                 col("min(stn_qlty_index)").alias("cat1"),
                                 col("min(ntwrk_genre_index)").alias("cat2"),
                                 col("min(Syscode_Subsc_Buckt)").alias("cat3"))
                                 #col("collect_list(MarchMadInd)").alias("dynamic_feat"),
                                 #col("collect_list(CFBNT_Ind)").alias("dynamic_feat2"))

model_test_actual_input = model_input_test.select(col("min(MinDate)").alias("start"),
                                 col("collect_list(spp_imp_daily)").alias("target"), 
                                 col("min(stn_qlty_index)").alias("cat1"),
                                 col("min(ntwrk_genre_index)").alias("cat2"),
                                 col("min(Syscode_Subsc_Buckt)").alias("cat3"))
                                 #col("collect_list(MarchMadInd)").alias("dynamic_feat"),
                                 #col("collect_list(CFBNT_Ind)").alias("dynamic_feat2"))


model_train_actual_input.show()

+-------------------+--------------------+----+----+----+
|              start|              target|cat1|cat2|cat3|
+-------------------+--------------------+----+----+----+
|2017-10-28 00:00:00|[827.80993, 965.9...|   0|   4|   3|
|2016-09-01 00:00:00|[1.655002, 7.3641...|   1|   4|   0|
|2016-09-01 00:00:00|[1.813888, 10.992...|   1|   4|   0|
|2016-09-01 00:00:00|[1827.48557300000...|   0|   1|   2|
|2017-03-15 00:00:00|[5.965, 13.972223...|   1|   7|   6|
|2017-03-15 00:00:00|[1201.162801, 676...|   0|   4|   6|
|2017-03-07 00:00:00|[1964.33801099999...|   0|   4|   6|
|2017-03-07 00:00:00|[21.3363869999999...|   1|   4|   6|
|2017-03-07 00:00:00|[1574.029682, 122...|   0|   4|   6|
|2017-10-28 00:00:00|[251.058605000000...|   1|   1|   5|
|2017-10-28 00:00:00|[338.069443000000...|   1|   4|   4|
|2017-10-28 00:00:00|[2190.96271900000...|   0|   4|   4|
|2017-10-28 00:00:00|[1308.10083399999...|   0|   5|   3|
|2016-09-01 00:00:00|[8.71139000000000...|   1|   1|   0|
|2016-09-01 00

In [19]:
train_final_input = model_train_actual_input.select("start","target", f.array(["cat1","cat2","cat3"]).alias("cat"))
                                                    #f.array(["dynamic_feat","dynamic_feat2"]).alias("dynamic_feat"))
test_final_input = model_test_actual_input.select("start","target", f.array(["cat1","cat2","cat3"]).alias("cat"))
                                                    #f.array(["dynamic_feat","dynamic_feat2"]).alias("dynamic_feat"))

model_train_w_combo = model_train_w_combo.select("Syscode_Stn","start","target", f.array(["cat1","cat2","cat3"]).alias("cat"))
                                                    #f.array(["dynamic_feat","dynamic_feat2"]).alias("dynamic_feat"))
model_test_w_combo = model_test_w_combo.select("Syscode_Stn","start","target", f.array(["cat1","cat2","cat3"]).alias("cat"))
                                                    #f.array(["dynamic_feat","dynamic_feat2"]).alias("dynamic_feat"))

train_final_input.show()

+-------------------+--------------------+---------+
|              start|              target|      cat|
+-------------------+--------------------+---------+
|2017-10-28 00:00:00|[54.84166, 58.684...|[1, 6, 3]|
|2016-09-01 00:00:00|[9.77249999999999...|[1, 4, 0]|
|2016-09-01 00:00:00|[2709.37080799999...|[0, 1, 2]|
|2017-10-28 00:00:00|[100.683337000000...|[1, 4, 2]|
|2017-03-15 00:00:00|[0.620834, 1.5358...|[1, 7, 6]|
|2017-03-15 00:00:00|[1230.692473, 884...|[0, 4, 6]|
|2017-03-07 00:00:00|[1625.81554999999...|[0, 1, 6]|
|2017-03-07 00:00:00|[2631.80556199999...|[0, 4, 6]|
|2017-03-07 00:00:00|[64.481668, 9.488...|[1, 4, 6]|
|2017-03-07 00:00:00|[2159.36526299999...|[0, 4, 6]|
|2017-10-28 00:00:00|[217.632774, 234....|[1, 1, 5]|
|2017-10-28 00:00:00|[1987.56245299999...|[0, 4, 4]|
|2017-10-28 00:00:00|[1371.844955, 135...|[0, 5, 3]|
|2016-09-01 00:00:00|[3.290279, 2.2597...|[1, 1, 0]|
|2016-09-01 00:00:00|[0.36194400000000...|[1, 4, 5]|
|2016-09-01 00:00:00|[1.053889, 7.4197...|[1, 

In [20]:
print(model_test_w_combo.count())

4581

In [21]:
# Write test and train data sets to JSON format on S3
model_test_w_combo.write.mode('overwrite').json('s3://srdata-lab/Matt/DeepARInputs/deepar_input/final_model_input/deepar_4mkt_2mo_test')
model_train_w_combo.write.mode('overwrite').json('s3://srdata-lab/Matt/DeepARInputs/deepar_input/final_model_input/deepar_4mkt_2mo_train')

test_final_input.write.mode('overwrite').json('s3://srdata-lab/Matt/DeepARInputs/deepar_input/final_model_input/deepar_4mkt_2mo_test_input')
train_final_input.write.mode('overwrite').json('s3://srdata-lab/Matt/DeepARInputs/deepar_input/final_model_input/deepar_4mkt_2mo_train_input')