#### Import libraries and initialize the helper class that creates the Spark session and exports data

In [1]:
import pandas as pd, re, sys
from datetime import datetime
from bqWrapper.bq import bqWrapper
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Show every column when pandas DataFrames are displayed in the notebook.
pd.set_option('display.max_columns', None)

In [2]:
# The project helper bqWrapper exposes the Spark session used throughout this
# notebook as `.connection` (later cells call spark.conf.set on it).
# NOTE(review): how the session is configured is not visible here — see bqWrapper.
bqw = bqWrapper()
spark = bqw.connection

In [3]:
# Load the three source tables through the wrapper, in the order ltv, event, session.
df_ltv, df_event, df_session = (
    bqw.create_bigquery_connection(connection=spark, table=_table_name)
    for _table_name in ('ltv_data', 'event_data', 'session_data')
)

In [4]:
# Columns that together identify one cohort; used as the groupBy, window
# partition and join key in the cells below.
COHORTS_DIMENSIONS = [
    'first_touch_date',
    'traffic_source',
    'os',
    'country',
]

#### Process event data

In [5]:
# Tag each event with the first d_* flag (presumably day-N activity markers —
# confirm) that equals 1, checked in the order d_1, d_3, d_5, d_7.
# No .otherwise() is given, so an event whose flags are all != 1 (or null)
# gets d_cat = null; an explicit "none matched -> None" branch is unnecessary.
_d_buckets = ['d_1', 'd_3', 'd_5', 'd_7']
_d_cat_expr = when(col(_d_buckets[0]) == 1, _d_buckets[0])
for _bucket in _d_buckets[1:]:
    _d_cat_expr = _d_cat_expr.when(col(_bucket) == 1, _bucket)
df_event = df_event.withColumn('d_cat', _d_cat_expr)

In [6]:
# Per-session metric columns to aggregate (e.g. session_played_levels).
# NOTE(review): 'id' is a substring test, so a metric whose name merely
# contains "id" would be excluded too — confirm only *_id columns are meant.
session_cols = [c for c in df_event.columns if 'session' in c and 'id' not in c]
agg_dict = dict.fromkeys(session_cols, 'sum')

# Sum every metric per cohort, one column per d_cat bucket. Passing the pivot
# values explicitly saves Spark a pass to collect distinct d_cat values and
# leaves out the null bucket, so no *_null columns need dropping afterwards.
# A bucket absent from the data yields an all-null column instead of none.
df_agg = (
    df_event.groupBy(COHORTS_DIMENSIONS)
    .pivot('d_cat', ['d_1', 'd_3', 'd_5', 'd_7'])
    .agg(agg_dict)
)

# Pivoted sum columns are named '<bucket>_sum(<metric>)', e.g.
# d_1_sum(session_played_levels); rename them to '<metric>_<bucket>',
# e.g. session_played_levels_d_1. Other columns keep their names.
_pivot_col_re = re.compile(r'(d_\d+)_sum\((\w+)\)')

def _flatten_pivot_col(name):
    """Return '<metric>_<bucket>' for a pivoted sum column, else *name* unchanged."""
    match = _pivot_col_re.fullmatch(name)
    if match and match.group(2) in session_cols:
        return f'{match.group(2)}_{match.group(1)}'
    return name

df_agg = df_agg.toDF(*[_flatten_pivot_col(c) for c in df_agg.columns])

#### Remove duplicate cohorts from the LTV table

In [7]:
# Keep one LTV row per cohort: the row with the highest cohort_ltv_avg_lifetime.
# NOTE(review): rows tied on cohort_ltv_avg_lifetime are numbered by
# row_number() in no guaranteed order, so which tied row survives is arbitrary.
window = (
    Window.partitionBy(*COHORTS_DIMENSIONS)
    .orderBy(desc('cohort_ltv_avg_lifetime'))
)
df_ltv = (
    df_ltv.withColumn('row', row_number().over(window))
    .where(col('row') == 1)
    .drop('row')
)

#### Join the tables to obtain the final DataFrame

In [8]:
# Attach the per-bucket event aggregates and then the session statistics to
# every LTV cohort. Left joins keep cohorts without a match; their columns
# from the right-hand table come back null.
df = (
    df_ltv
    .join(df_agg, COHORTS_DIMENSIONS, 'left')
    .join(df_session, COHORTS_DIMENSIONS, 'left')
)

#### Process final data

In [9]:
# Sanity check: list cohort keys that occur more than once after the joins,
# most frequent first. An empty table means every cohort is unique.
(
    df.groupBy(*COHORTS_DIMENSIONS)
    .count()
    .filter(col('count') > 1)
    .orderBy(desc('count'))
    .show()
)

+----------------+--------------+---+-------+-----+
|first_touch_date|traffic_source| os|country|count|
+----------------+--------------+---+-------+-----+
+----------------+--------------+---+-------+-----+



In [10]:
# Count, per column, the cells that look missing in any form: text containing
# 'None' or 'NULL', an empty string, null, or NaN. first_touch_date is skipped
# (presumably because isnan() does not apply to its date type — confirm).
_missing_checks = {
    c: (
        col(c).contains('None')
        | col(c).contains('NULL')
        | (col(c) == '')
        | col(c).isNull()
        | isnan(c)
    )
    for c in df.columns
    if c != 'first_touch_date'
}
df.select([count(when(cond, c)).alias(c) for c, cond in _missing_checks.items()])

traffic_source,os,country,cohort_ltv_avg_lifetime,session_used_extra_steps_d_1,session_completed_levels_d_1,session_ended_levels_d_1,session_reseted_levels_d_1,session_played_levels_d_1,session_no_more_extra_steps_d_1,session_challenged_friend_d_1,session_user_engagement_sec_d_1,session_retried_levels_d_1,session_was_challenged_d_1,session_claimed_ad_rewards_d_1,session_failed_levels_d_1,session_used_extra_steps_d_3,session_completed_levels_d_3,session_ended_levels_d_3,session_reseted_levels_d_3,session_played_levels_d_3,session_no_more_extra_steps_d_3,session_challenged_friend_d_3,session_user_engagement_sec_d_3,session_retried_levels_d_3,session_was_challenged_d_3,session_claimed_ad_rewards_d_3,session_failed_levels_d_3,session_used_extra_steps_d_5,session_completed_levels_d_5,session_ended_levels_d_5,session_reseted_levels_d_5,session_played_levels_d_5,session_no_more_extra_steps_d_5,session_challenged_friend_d_5,session_user_engagement_sec_d_5,session_retried_levels_d_5,session_was_challenged_d_5,session_claimed_ad_rewards_d_5,session_failed_levels_d_5,session_used_extra_steps_d_7,session_completed_levels_d_7,session_ended_levels_d_7,session_reseted_levels_d_7,session_played_levels_d_7,session_no_more_extra_steps_d_7,session_challenged_friend_d_7,session_user_engagement_sec_d_7,session_retried_levels_d_7,session_was_challenged_d_7,session_claimed_ad_rewards_d_7,session_failed_levels_d_7,avg_session_duration_sec,avg_sessions_count
0,0,21,0,3348,2646,2581,3407,2646,3631,4078,512,2777,4156,4024,3322,3924,3687,3627,3901,3687,4008,4136,2990,3684,4157,4093,3787,4043,3929,3875,4061,3929,4088,4147,3346,3907,4158,4131,3974,4071,3981,3935,4072,3981,4109,4150,3446,3964,4158,4128,4017,0,0


In [11]:
# Nulls in the per-bucket metrics (columns ending in _d_<n>, e.g.
# session_played_levels_d_1) mean "no such events", i.e. zero. The fill is
# limited to those columns so that a genuinely missing LTV or session average
# is not silently turned into a fake 0.
_d_metric_cols = [c for c in df.columns if re.search(r'_d_\d+$', c)]
df = df.fillna(0, subset=_d_metric_cols)

In [12]:
# Cohorts with an unknown country (empty string or null) are grouped under a
# single 'Rest of the World' label; every other country value is kept as is.
# The explicit isNull() test matters: `null == ''` is null, which would fall
# through to otherwise() and leave the null in place.
df = df.withColumn(
    'country',
    when((col('country') == '') | col('country').isNull(), 'Rest of the World')
    .otherwise(col('country')),
)

#### Write the processed data to a BigQuery table

In [15]:
# GCS bucket the BigQuery connector uses as temporary storage for the write.
# No .mode() is set, so Spark's default applies: the save fails if
# dsc_511.processed_data already exists.
# NOTE(review): this cell runs before the engagement columns are cast to
# bigint, so this table keeps their original type — confirm that is intended.
spark.conf.set('temporaryGcsBucket', 'processed-data-bucket')
(
    df.write
    .format('bigquery')
    .options(table='dsc_511.processed_data')
    .save()
)

In [17]:
# Cast the summed engagement-seconds column of every bucket to bigint.
# NOTE(review): casting a fractional value to bigint truncates it — confirm
# that losing sub-second precision is acceptable.
for _engagement_col in (
    'session_user_engagement_sec_d_1',
    'session_user_engagement_sec_d_3',
    'session_user_engagement_sec_d_5',
    'session_user_engagement_sec_d_7',
):
    df = df.withColumn(_engagement_col, col(_engagement_col).cast('bigint'))

In [20]:
def melt(df, id_vars, value_vars, var_name="variable", value_name="value"):
    """Unpivot a Spark DataFrame from wide to long format (like ``pandas.melt``).

    For every input row, each column in *value_vars* becomes one output row
    holding that column's name in *var_name* and its value in *value_name*.
    The *id_vars* columns are copied onto every output row.

    Args:
        df: Spark DataFrame to unpivot.
        id_vars: Column name, or iterable of names/Columns, kept as identifiers.
        value_vars: Column name or iterable of column names to unpivot. They are
            packed into one array column, so they must share (or be coercible
            to) a single type.
        var_name: Name of the output column holding the source column name.
        value_name: Name of the output column holding the value.

    Returns:
        A DataFrame with columns ``[*id_vars, var_name, value_name]``.

    Raises:
        ValueError: if *value_vars* is empty.
    """
    # Accept a single column name as well as any iterable (list, tuple, ...).
    id_vars = [id_vars] if isinstance(id_vars, str) else list(id_vars)
    value_vars = [value_vars] if isinstance(value_vars, str) else list(value_vars)
    if not value_vars:
        raise ValueError("melt() requires at least one column in value_vars")

    # array<struct<var_name: str, value_name: ...>>, one struct per value column
    vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars
    ))

    # explode() emits one row per struct, i.e. one row per (input row, value column)
    exploded = df.withColumn("_vars_and_vals", explode(vars_and_vals))

    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in (var_name, value_name)
    ]
    return exploded.select(*cols)

In [25]:
# Keep cohort keys and cohort-level averages as identifier columns; unpivot
# every remaining column (the per-bucket session metrics) into
# session_metric / session_metric_value rows.
_id_cols = COHORTS_DIMENSIONS + [
    'cohort_ltv_avg_lifetime',
    'avg_session_duration_sec',
    'avg_sessions_count',
]
_metric_cols = [c for c in df.columns if c not in _id_cols]
df_melted = melt(
    df,
    _id_cols,
    _metric_cols,
    var_name='session_metric',
    value_name='session_metric_value',
)

In [26]:
# GCS bucket the BigQuery connector uses as temporary storage for the write.
# No .mode() is set, so Spark's default applies: the save fails if
# dsc_511.processed_data_long already exists.
spark.conf.set('temporaryGcsBucket', 'processed-data-bucket')
(
    df_melted.write
    .format('bigquery')
    .options(table='dsc_511.processed_data_long')
    .save()
)

In [27]:
# Display the session metric names that were summed per d_cat bucket.
session_cols

['session_user_engagement_sec',
 'session_claimed_ad_rewards',
 'session_challenged_friend',
 'session_was_challenged',
 'session_played_levels',
 'session_completed_levels',
 'session_retried_levels',
 'session_failed_levels',
 'session_reseted_levels',
 'session_ended_levels',
 'session_used_extra_steps',
 'session_no_more_extra_steps']