## User registrations and app-loaded exploration Notebook

In [19]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL scatch pad") \
    .getOrCreate()

ConnectionRefusedError: [Errno 111] Connection refused

## Task one: events split

Let's have a glance into the data and get the data schema out of input

In [2]:
#pickup input data data 
data_input = "./data-input"
df = spark.read.json(data_input)
df.show(10, truncate=False)

+---------------+--------+-------+-----------+----------+-------------------+------------------------+
|browser_version|campaign|channel|device_type|event     |initiator_id       |timestamp               |
+---------------+--------+-------+-----------+----------+-------------------+------------------------+
|null           |null    |null   |null       |registered|3074457347135400447|2020-01-08T06:21:14.000Z|
|79.0           |null    |null   |desktop    |app_loaded|3074457345816644047|2020-01-08T06:24:42.000Z|
|               |null    |null   |tablet-app |app_loaded|3074457346184244610|2020-01-08T06:25:10.000Z|
|79.0           |null    |null   |desktop    |app_loaded|3074457347135385819|2020-01-08T06:25:11.000Z|
|78.0           |null    |null   |desktop    |app_loaded|3074457346246864126|2020-01-08T06:27:23.000Z|
|76.0           |null    |null   |desktop    |app_loaded|3074457346612629694|2020-01-08T17:54:39.000Z|
|79.0           |null    |null   |desktop    |app_loaded|3074457347100151

In [3]:
df.schema

StructType([StructField('browser_version', StringType(), True), StructField('campaign', StringType(), True), StructField('channel', StringType(), True), StructField('device_type', StringType(), True), StructField('event', StringType(), True), StructField('initiator_id', LongType(), True), StructField('timestamp', StringType(), True)])

Define schema according to the exploration
`|browser_version|campaign|channel|device_type|event|initiator_id|timestamp|`


In [4]:
'''
StructType([StructField('browser_version', StringType(), True), StructField('campaign', StringType(), True), StructField('channel', StringType(), True), StructField('device_type', StringType(), True), StructField('event', StringType(), True), StructField('initiator_id', LongType(), True), StructField('timestamp', StringType(), True)])
'''
from pyspark.sql.types import (
    LongType,
    StringType,
    StructType,
    TimestampType,
)
input_data_schema = StructType() \
      .add("browser_version",StringType(),True) \
      .add("campaign",StringType(),True) \
      .add("channel",StringType(),True) \
      .add("device_type",StringType(),True) \
      .add("event",StringType(),True) \
      .add("initiator_id",LongType(),True) \
      .add("timestamp",TimestampType(),True)


Read data again with data schema

In [5]:
df = spark.read.option("inferSchema", True).schema(input_data_schema) \
    .json(data_input)
    
df.show(5)

+---------------+--------+-------+-----------+----------+-------------------+-------------------+
|browser_version|campaign|channel|device_type|     event|       initiator_id|          timestamp|
+---------------+--------+-------+-----------+----------+-------------------+-------------------+
|           null|    null|   null|       null|registered|3074457347135400447|2020-01-08 06:21:14|
|           79.0|    null|   null|    desktop|app_loaded|3074457345816644047|2020-01-08 06:24:42|
|               |    null|   null| tablet-app|app_loaded|3074457346184244610|2020-01-08 06:25:10|
|           79.0|    null|   null|    desktop|app_loaded|3074457347135385819|2020-01-08 06:25:11|
|           78.0|    null|   null|    desktop|app_loaded|3074457346246864126|2020-01-08 06:27:23|
+---------------+--------+-------+-----------+----------+-------------------+-------------------+
only showing top 5 rows



Split dataframe into two sub dataframes -- user_registration and app_loaded

In [7]:
user_registration_df = df.select("event",
                                 "timestamp",
                                 "initiator_id",
                                 "channel"
                                ).where(df.event == "registered")
app_loaded_df = df.select("event",
                          "timestamp",
                          "initiator_id",
                          "device_type"
                          ).where(df.event == "app_loaded")

Data transformation then Write dataframes partition by day in parquet format

In [9]:
from pyspark.sql.functions import (
    col,
    date_format
)

user_registration_df = user_registration_df.withColumn(
        "derived_tstamp_day", date_format(col("timestamp"), "yyyy-MM-dd")
    ).withColumnRenamed(
    'timestamp', 'time'
    )

app_loaded_df = app_loaded_df.withColumn(
        "derived_tstamp_day", date_format(col("timestamp"), "yyyy-MM-dd")
    ).withColumnRenamed(
    'timestamp', 'time'
    )

Load data in parquet format

In [13]:
user_registration_df.show()
user_registration_df.toJSON().first()

+----------+-------------------+-------------------+-------+------------------+
|     event|               time|       initiator_id|channel|derived_tstamp_day|
+----------+-------------------+-------------------+-------+------------------+
|registered|2020-01-08 06:21:14|3074457347135400447|   null|        2020-01-08|
|registered|2020-01-08 17:55:42|3074457347136974015|   null|        2020-01-08|
|registered|2020-01-08 23:49:14|3074457347138054179|   null|        2020-01-08|
|registered|2020-01-08 12:48:21|3074457347125446331| invite|        2020-01-08|
|registered|2020-01-08 12:52:17|3074457347125292449| invite|        2020-01-08|
|registered|2020-01-08 02:41:00|3074457347135206851|   null|        2020-01-08|
|registered|2020-01-08 10:32:39|3074457347135939667|   null|        2020-01-08|
|registered|2020-01-08 13:50:06|3074457347136484019| invite|        2020-01-08|
|registered|2020-01-08 12:08:15|3074457347136224280|   null|        2020-01-08|
|registered|2020-01-08 12:13:44|30744573

'{"event":"registered","time":"2020-01-08T06:21:14.000Z","initiator_id":3074457347135400447,"derived_tstamp_day":"2020-01-08"}'

In [14]:
write_path = "data-output/user_registration"

user_registration_df.repartition(1)\
  .write.option("compression", "snappy")\
  .save(
    path=write_path,
    format="parquet",
    mode="overwrite",
    partitionBy="derived_tstamp_day",
  )

In [15]:
write_path = "data-output/app_loaded"

app_loaded_df.repartition(1)\
  .write.option("compression", "snappy")\
  .save(
    path=write_path,
    format="parquet",
    mode="overwrite",
    partitionBy="derived_tstamp_day",
  )

In [14]:
import pandas as pd

In [15]:
pd.__version__

'1.5.1'

## Task two: customer journey -- next-week-conversion-rate

In [3]:
#pickup input data data 
user_registration_data_input = "./data-output/user_registration"
udf = spark.read.parquet(user_registration_data_input)
udf.show(10, truncate=False)

+----------+-------------------+-------------------+-------+-------+------------------+
|event     |time               |initiator_id       |channel|format |derived_tstamp_day|
+----------+-------------------+-------------------+-------+-------+------------------+
|registered|2020-01-28 14:19:36|3074457347192467026|null   |parquet|2020-01-28        |
|registered|2020-01-28 16:38:31|3074457347193109377|null   |parquet|2020-01-28        |
|registered|2020-01-28 19:03:38|3074457347192713430|invite |parquet|2020-01-28        |
|registered|2020-01-28 15:01:18|3074457347192562179|invite |parquet|2020-01-28        |
|registered|2020-01-28 15:03:32|3074457347191986668|invite |parquet|2020-01-28        |
|registered|2020-01-28 08:19:09|3074457347191225250|null   |parquet|2020-01-28        |
|registered|2020-01-28 17:22:48|3074457347177297037|invite |parquet|2020-01-28        |
|registered|2020-01-28 14:04:30|3074457347191921811|invite |parquet|2020-01-28        |
|registered|2020-01-28 07:46:36|

Check if there's any duplicates

In [4]:
udf.count()

7318

In [5]:
udf.select("initiator_id").distinct().count()

7317

In [6]:

from pyspark.sql.functions import (
    col,
    count,
)
x = udf.groupby("initiator_id").agg(count('initiator_id').alias('count')).filter(col('count')>1)
x.show()

+-------------------+-----+
|       initiator_id|count|
+-------------------+-----+
|3074457347132582884|    2|
+-------------------+-----+



In [7]:
udf.filter(col("initiator_id")==3074457347132582884).show()

+----------+-------------------+-------------------+-------+-------+------------------+
|     event|               time|       initiator_id|channel| format|derived_tstamp_day|
+----------+-------------------+-------------------+-------+-------+------------------+
|registered|2020-01-07 02:20:49|3074457347132582884|   null|parquet|        2020-01-07|
|registered|2020-01-07 02:20:50|3074457347132582884|   null|parquet|        2020-01-07|
+----------+-------------------+-------------------+-------+-------+------------------+



In [8]:
import pyspark.sql.functions as F

x = udf.groupby("initiator_id").agg(F.min('time').alias("time"),
                                    F.first("channel").alias("channel"),
                                    F.first("event").alias("event"),
                                    F.first("derived_tstamp_day").alias("derived_tstamp_day")
                                   )
x.show(5)

+-------------------+-------------------+-------+----------+------------------+
|       initiator_id|               time|channel|     event|derived_tstamp_day|
+-------------------+-------------------+-------+----------+------------------+
|3074457345618261067|2020-01-10 12:39:25| direct|registered|        2020-01-10|
|3074457345618261107|2020-01-10 12:40:45| direct|registered|        2020-01-10|
|3074457345618261140|2020-01-10 12:55:00| direct|registered|        2020-01-10|
|3074457345618261281|2020-01-10 13:09:14| direct|registered|        2020-01-10|
|3074457345618261321|2020-01-10 13:10:33| direct|registered|        2020-01-10|
+-------------------+-------------------+-------+----------+------------------+
only showing top 5 rows



In [9]:
#pickup input data data 
app_loaded_data_input = "./data-output/app_loaded"
adf = spark.read.parquet(app_loaded_data_input)
adf.show(10, truncate=False)

+----------+-------------------+-------------------+-----------+-------+------------------+
|event     |time               |initiator_id       |device_type|format |derived_tstamp_day|
+----------+-------------------+-------------------+-----------+-------+------------------+
|app_loaded|2020-01-29 06:30:14|3074457346185514918|desktop    |parquet|2020-01-29        |
|app_loaded|2020-01-29 06:31:49|3074457345894038970|desktop-app|parquet|2020-01-29        |
|app_loaded|2020-01-29 06:32:16|3074457347170125143|mobile-app |parquet|2020-01-29        |
|app_loaded|2020-01-29 06:33:42|3074457347115508571|mobile-app |parquet|2020-01-29        |
|app_loaded|2020-01-29 06:34:54|3074457346921032451|desktop    |parquet|2020-01-29        |
|app_loaded|2020-01-29 06:35:21|3074457347085959347|desktop    |parquet|2020-01-29        |
|app_loaded|2020-01-29 06:35:27|3074457346118464419|desktop    |parquet|2020-01-29        |
|app_loaded|2020-01-29 06:36:19|3074457346554502961|desktop    |parquet|2020-01-

In [10]:
y = adf.groupby("initiator_id").agg(F.min('time').alias("time"),
                                    F.first("device_type").alias("device_type"),
                                    F.first("event").alias("event"),
                                    F.first("derived_tstamp_day").alias("derived_tstamp_day"))
y.show(5)

+------------+-------------------+-----------+----------+------------------+
|initiator_id|               time|device_type|     event|derived_tstamp_day|
+------------+-------------------+-----------+----------+------------------+
|       70525|2020-01-09 05:21:09|    desktop|app_loaded|        2020-01-29|
|       70529|2020-01-27 06:10:35|    desktop|app_loaded|        2020-01-28|
|       70593|2020-01-20 08:52:44|    desktop|app_loaded|        2020-01-30|
|       85072|2020-01-01 17:03:40|    desktop|app_loaded|        2020-01-01|
|       89245|2020-01-08 13:27:46|    desktop|app_loaded|        2020-01-29|
+------------+-------------------+-----------+----------+------------------+
only showing top 5 rows



In [11]:
resd=x.join(y, x.initiator_id == y.initiator_id, 'left').select(x.initiator_id, x.time.alias("ut"), y.time.alias("at"))
resd.show()

+-------------------+-------------------+-------------------+
|       initiator_id|                 ut|                 at|
+-------------------+-------------------+-------------------+
|3074457347194263830|2020-01-28 23:46:47|               null|
|3074457347192604456|2020-01-28 14:52:25|               null|
|3074457347196629066|2020-01-29 15:11:05|               null|
|3074457347182996888|2020-01-27 16:04:19|               null|
|3074457347186926395|2020-01-27 09:15:32|2020-01-30 13:44:06|
|3074457347187118466|2020-01-27 01:30:02|               null|
|3074457347135708603|2020-01-08 09:05:45|2020-01-08 09:07:01|
|3074457347170176922|2020-01-21 03:32:41|               null|
|3074457347172852947|2020-01-21 18:47:58|               null|
|3074457347171116641|2020-01-21 11:26:08|               null|
|3074457347167462946|2020-01-21 15:37:50|               null|
|3074457347125292481|2020-01-30 09:09:55|               null|
|3074457347198677110|2020-01-30 03:52:51|               null|
|3074457

In [12]:
print(resd.filter(F.col("at").isNotNull()).count() / resd.count())

0.28030613639469726


In [13]:
udf.count()

7318

In [18]:
adf.count()

ConnectionRefusedError: [Errno 111] Connection refused

In [15]:
resd.count()


7317

In [16]:
df.where(F.col('initiator_id')==3074457347192604456).show()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o115.showString

In [None]:
df.count()

In [None]:
a = resd.withColumn('week_diff',(F.datediff(F.trunc("at","week"), F.trunc("ut","week"))/7).cast('int'))
a.show(20)

In [None]:
print(a.where(F.col("week_diff") == 1).count()/a.count())

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [17]:
a.show(5)

NameError: name 'a' is not defined