## 1. Data Wrangling


In this part of the notebook I show how to wrangle the data so that features can be extracted easily. The goal is to label each event with the subscription phase it belongs to, i.e. the period between a user's upgrade and the following downgrade.

Example:

| userId | upgrade_time | downgrade_time | ...event |
|--------|--------------|----------------|----------|
|  1111  |  2020-12-05  |   2020-12-29   | ...event |
|  2222  |  2020-11-12  |      null      | ...event |
|  3333  |  2020-10-15  |   2020-10-29   | ...event |

A null `downgrade_time` means the user has not downgraded after that upgrade, i.e. the user has not churned.
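As a rough illustration of the labeling rule, the plain-Python sketch below models one row of the table above. The `SubscriptionPhase` class and its members are hypothetical names used only for this illustration. A missing `downgrade_time` (`None`) is treated as an open-ended phase, i.e. a user who has not churned, and both phase boundaries are inclusive.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class SubscriptionPhase:
    """One paid-subscription phase of a user (hypothetical helper)."""
    user_id: str
    upgrade_time: datetime
    downgrade_time: Optional[datetime]  # None = no downgrade yet

    @property
    def is_churn(self) -> bool:
        # A phase counts as churned only if a downgrade closed it.
        return self.downgrade_time is not None

    def contains(self, event_time: datetime) -> bool:
        # An event belongs to the phase if it happened at or after the upgrade
        # and, when the phase is closed, at or before the downgrade.
        if event_time < self.upgrade_time:
            return False
        return self.downgrade_time is None or event_time <= self.downgrade_time


closed = SubscriptionPhase("1111", datetime(2020, 12, 5), datetime(2020, 12, 29))
open_ended = SubscriptionPhase("2222", datetime(2020, 11, 12), None)
print(closed.is_churn, open_ended.is_churn)        # True False
print(closed.contains(datetime(2020, 12, 10)))     # True
print(open_ended.contains(datetime(2021, 3, 1)))   # True
```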

### Import Needed Libraries and Initialize PySpark

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import SparkContext, SparkConf

In [None]:
spark = SparkSession.builder.appName('sparkify') \
    .config('spark.driver.maxResultSize', '3g') \
    .getOrCreate()

### Load the Dataset from Google Cloud Storage

In [None]:
# Load the dataset from GCS and convert ts from epoch milliseconds (bigint) to a timestamp
df = spark.read.parquet('gs://udacity-dsnd/sparkify_event_data.parquet/')
df = df.withColumnRenamed("ts", "ts_temp") \
    .withColumn("ts", (F.col("ts_temp") / 1000).cast(T.TimestampType())) \
    .drop("ts_temp")
df.cache()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, userAgent: string, userId: string, ts: timestamp]
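`ts` is stored as epoch milliseconds, which is why the cell above divides by 1000 before casting: Spark treats a numeric value cast to `TimestampType` as seconds since the Unix epoch. The plain-Python sketch below mirrors that conversion for a single value. The helper name `ms_to_datetime` is hypothetical, and unlike Spark, which displays timestamps in the session time zone, it pins the result to UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_datetime(ts_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Adding a timedelta keeps the arithmetic exact, whereas dividing by 1000
    first would go through a float.
    """
    return EPOCH + timedelta(milliseconds=ts_ms)


print(ms_to_datetime(1538352055000))  # 2018-10-01 00:00:55+00:00
```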

In [None]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts: timestamp (nullable = true)



### Wrangle the DataFrame to Get an Event-Labeled DataFrame

First, we find when each user upgrades and downgrades their subscription.
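Conceptually, this step is just two filters on the `page` column that keep `userId` and `ts`. A plain-Python sketch over a hypothetical in-memory list of event dicts (field names copied from the schema above; the `submissions` helper is also hypothetical) looks like this:

```python
from datetime import datetime

# Hypothetical sample events; only the fields used by this step are kept.
events = [
    {"userId": "234124", "ts": datetime(2018, 1, 1, 12), "page": "NextSong"},
    {"userId": "234124", "ts": datetime(2018, 1, 2, 12), "page": "Submit Upgrade"},
    {"userId": "234124", "ts": datetime(2018, 1, 5, 12), "page": "Submit Downgrade"},
]


def submissions(rows, page):
    """Return (userId, ts) pairs for every event on the given page."""
    return [(r["userId"], r["ts"]) for r in rows if r["page"] == page]


up = submissions(events, "Submit Upgrade")      # plays the role of up_df
down = submissions(events, "Submit Downgrade")  # plays the role of down_df
print(up)
print(down)
```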

In [None]:
# up_df holds one row per upgrade submission
# | userId |      up_ts        |
# | 234124 | 2018-1-2 12:00:00 |
up_df = df.filter(df.page == "Submit Upgrade") \
    .select("userId", "ts") \
    .withColumnRenamed('ts', 'up_ts')

# down_df holds one row per downgrade submission; userId is renamed to
# userIdTemp so the join below has no ambiguous column
# | userIdTemp |      down_ts      |
# | 234124     | 2018-1-5 12:00:00 |
down_df = df.filter(df.page == "Submit Downgrade") \
    .select("userId", "ts") \
    .withColumnRenamed('ts', 'down_ts') \
    .withColumnRenamed("userId", "userIdTemp")


Second, we pair every upgrade event with the first downgrade by the same user that follows it.
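The join-then-`min` pattern in the next cell pairs each upgrade with the earliest downgrade by the same user that happens strictly after it. Upgrades with no later downgrade get the far-future sentinel `2099-12-31 00:00:00` and `isChurn = False`. A plain-Python model of that logic is sketched below; `pair_upgrades` is a hypothetical helper, and the sample tuples reuse the illustrative values from the cell comments.

```python
from datetime import datetime

SENTINEL = datetime(2099, 12, 31)  # same far-future value the Spark cell uses


def pair_upgrades(up, down):
    """Pair each (userId, up_ts) with the first later downgrade by that user.

    Returns (userId, up_ts, down_ts, isChurn) tuples sorted by up_ts.
    """
    result = []
    for user_id, up_ts in up:
        later = [d_ts for d_user, d_ts in down if d_user == user_id and d_ts > up_ts]
        if later:
            result.append((user_id, up_ts, min(later), True))
        else:
            result.append((user_id, up_ts, SENTINEL, False))
    return sorted(result, key=lambda row: row[1])


up = [("234124", datetime(2018, 1, 2, 12)), ("234124", datetime(2018, 1, 6, 12)),
      ("999", datetime(2018, 1, 3))]
down = [("234124", datetime(2018, 1, 5, 12)), ("234124", datetime(2018, 1, 9, 12))]
for row in pair_upgrades(up, down):
    print(row)
```

Scanning every downgrade for every upgrade is the same work the non-equi join does; it keeps the sketch short but is not how one would process the full dataset outside Spark.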

In [None]:
# key_df joins up_df and down_df so that each upgrade and the first downgrade
# that follows it end up in the same row, like below
# | userId |      up_ts        |      down_ts      |  isChurn  |
# | 234124 | 2018-1-2 12:00:00 | 2018-1-5 12:00:00 |   True    |
# | 234124 | 2018-1-6 12:00:00 | 2018-1-9 12:00:00 |   True    |
# Upgrades with no later downgrade get a far-future down_ts and isChurn = False
key_df = up_df.join(down_df,
        (down_df.userIdTemp == up_df.userId) &
        (down_df.down_ts > up_df.up_ts), how="left") \
    .drop("userIdTemp") \
    .groupBy("userId", "up_ts") \
    .agg(F.min("down_ts")) \
    .withColumn("down_ts",
        F.when(F.col("min(down_ts)").isNull(), '2099-12-31 00:00:00')
        .otherwise(F.col("min(down_ts)"))) \
    .withColumn("isChurn", F.col("min(down_ts)").isNotNull()) \
    .orderBy("up_ts")

key_df.cache()

DataFrame[userId: string, up_ts: timestamp, min(down_ts): timestamp, down_ts: string, isChurn: boolean]

In [None]:
key_df.show()

+-------+-------------------+-------------------+-------------------+-------+
| userId|              up_ts|       min(down_ts)|            down_ts|isChurn|
+-------+-------------------+-------------------+-------------------+-------+
|1448964|2018-10-01 00:00:55|2018-11-18 05:04:25|2018-11-18 05:04:25|   true|
|1712107|2018-10-01 00:02:02|               null|2099-12-31 00:00:00|  false|
|1652185|2018-10-01 00:06:41|               null|2099-12-31 00:00:00|  false|
|1851656|2018-10-01 00:11:30|2018-11-01 10:24:19|2018-11-01 10:24:19|   true|
|1588246|2018-10-01 00:13:55|2018-10-01 12:27:24|2018-10-01 12:27:24|   true|
|1585800|2018-10-01 00:15:59|               null|2099-12-31 00:00:00|  false|
|1808085|2018-10-01 00:21:03|2018-10-18 23:55:46|2018-10-18 23:55:46|   true|
|1995340|2018-10-01 00:22:00|2018-10-09 07:40:04|2018-10-09 07:40:04|   true|
|1417592|2018-10-01 00:30:01|2018-11-30 08:43:17|2018-11-30 08:43:17|   true|
|1875033|2018-10-01 00:30:48|               null|2099-12-31 00:0

In [None]:
# Write the result to GCS and read it back from the same path, so later
# queries start from the materialized table instead of the whole join lineage
key_df.drop("min(down_ts)").write.parquet("gs://udacity-dsnd/key_df.parquet")
key_df = spark.read.parquet("gs://udacity-dsnd/key_df.parquet")
key_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- up_ts: timestamp (nullable = true)
 |-- down_ts: string (nullable = true)
 |-- isChurn: boolean (nullable = true)



Last, we label every event that falls inside a subscription phase from key_df and save the result to GCS.
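The final cell is a non-equi (range) join: an event gets a phase's labels when it has the same user and its `ts` lies in `[up_ts, down_ts]`, both ends inclusive. Because `key_df` is on the left side of a left join, a phase with no matching events still yields one row whose event columns are null, while events outside every phase are dropped. The plain-Python sketch below models those semantics; `label_events` and the tuple layouts are hypothetical.

```python
from datetime import datetime

SENTINEL = datetime(2099, 12, 31)


def label_events(windows, events):
    """Left-join windows to events on userId and up_ts <= ts <= down_ts.

    windows: (userId, up_ts, down_ts, isChurn) tuples
    events:  (userId, ts, page) tuples
    Returns (userId, up_ts, down_ts, isChurn, event) rows, where event is None
    for a window that matched nothing (the left-join null row).
    """
    rows = []
    for user_id, up_ts, down_ts, is_churn in windows:
        matched = [e for e in events
                   if e[0] == user_id and up_ts <= e[1] <= down_ts]
        for event in matched or [None]:
            rows.append((user_id, up_ts, down_ts, is_churn, event))
    return rows


windows = [("234124", datetime(2018, 1, 2, 12), datetime(2018, 1, 5, 12), True),
           ("777", datetime(2018, 1, 1), SENTINEL, False)]
events = [("234124", datetime(2018, 1, 1), "Home"),                 # before the window: dropped
          ("234124", datetime(2018, 1, 3), "NextSong"),             # inside the window
          ("234124", datetime(2018, 1, 5, 12), "Submit Downgrade")]  # on the boundary: kept
for row in label_events(windows, events):
    print(row)
```

Note that with inclusive bounds the downgrade submission itself is labeled as part of the phase it closes.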

In [None]:
# Label every event of the same user whose ts falls between a key_df row's
# up_ts and down_ts (inclusive), then save the result to GCS.
# The resulting table will look like below
# | userId |      up_ts        |      down_ts      |  isChurn  | event  |
# | 234124 | 2018-1-2 12:00:00 | 2018-1-5 12:00:00 |   True    | event1 |
# | 234124 | 2018-1-2 12:00:00 | 2018-1-5 12:00:00 |   True    | event2 |
# down_ts is a string column in key_df, so cast it back to a timestamp before
# comparing it with ts
df = df.withColumnRenamed("userId", "userIdTemp")
key_df.join(df,
        (key_df.userId == df.userIdTemp) &
        (key_df.up_ts <= df.ts) &
        (df.ts <= key_df.down_ts.cast(T.TimestampType())),
        how='left') \
    .write.parquet('gs://udacity-dsnd/event_labeled.parquet')