## PySpark data analysis

The goal is to explore the data in the file `inventory.parquet` and perform various operations on it to extract the requested information. All operations are performed on Spark DataFrames, and the structure of the original data stays the same throughout the processing.

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder.appName("Dataframe").getOrCreate()

inventory = spark.read.parquet("../inventory.parquet")
users = spark.read.parquet("../selected_users.parquet")

inventory.show(5)
users.show(5)

+--------+------------+------------+-----+-------------+--------------------+--------+----------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|
|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|
|51217510|       false|Mobile Phone| view|        false|apps.apple.com/no...|     976|2023-09-05|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+
only showing top 5 rows

+--------+
| user_id|
+--------+
|93330430|
|30514363|
|43260855|
|72293645|
|60562662|
+--------+

In [7]:
# Task number 1 - Find what is the percentage of logged in users every day
unique_users_count = inventory.select(["user_id", "is_logged_in"]).distinct().filter(inventory.is_logged_in == "true").count()
print(f'Number of unique users: {unique_users_count}')
print("Users logged in % for every day")
inventory.select(["user_id", "date", "is_logged_in"]).distinct().filter(inventory.is_logged_in == "true").groupBy("date").agg(F.round(F.count("user_id")/unique_users_count*100, 2).alias("% logged in")).show()

Number of unique users: 2470
Users logged in % for every day
+----------+-----------+
|      date|% logged in|
+----------+-----------+
|2023-09-03|      70.73|
|2023-09-01|      71.09|
|2023-09-07|      70.81|
|2023-09-02|      68.74|
|2023-09-04|      70.12|
|2023-09-06|       71.3|
+----------+-----------+



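The daily values are very similar across this period, ranging from 68.74% to 71.3%. Each value is the number of distinct users logged in on that day divided by the 2470 distinct users who were logged in at any point in the data.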

In [5]:
# Task number 2 - Which site has the most logged in users?
inventory.select(["user_id", "site", "is_logged_in"]).distinct().filter(inventory.is_logged_in == "true").groupBy("site").count().show(truncate=False)

+-----------------------------------------------------------------+-----+
|site                                                             |count|
+-----------------------------------------------------------------+-----+
|play.google.com/store/apps/details?id=com.agens.android.dailynews|1765 |
|sportinfo.no                                                     |1755 |
|play.google.com/store/apps/details?id=com.agens.android.sportinfo|1703 |
|apps.apple.com/no/app/sportingo/id87564849                       |1728 |
|dailynews.no                                                     |1775 |
|apps.apple.com/no/app/dailynews/id5637563                        |1740 |
+-----------------------------------------------------------------+-----+



The `dailynews.no` website has the most logged-in users (1775), although the differences between sites are very small.

In [8]:
# Task number 3 - Calculate the share of logged in users who are using Mobile App
mobile_users = inventory.select(["user_id", "device_type", "is_logged_in"]).filter((inventory.device_type == "Mobile Phone") & (inventory.is_logged_in == "true")).distinct().count()

mobile_share = mobile_users / unique_users_count
mobile_share

0.9331983805668016

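About 93.3% of logged-in users use a mobile phone. This value is the number of unique logged-in users with `device_type` equal to "Mobile Phone" divided by the number of unique logged-in users. Note that the filter is on `device_type`, not on `is_mobile_app`, so it counts mobile web and mobile app users together.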

In [7]:
# Task number 4 - Create a new column called identity_type which will take the following value:
# ○ If device_type is “Mobile Phone” and is_mobile_app is set to True then “Mobile Phone App”
# ○ If device_type is “Mobile Phone” and is_mobile_app is set to False then “Mobile Phone Web”
# ○ If device_type is “Desktop” then “Desktop”
# ○ Otherwise “Unknown”
inv_identity = inventory.withColumn("identity_type",
    F.when((inventory.device_type == "Desktop"), F.lit("Desktop")).
    when(((inventory.device_type == "Mobile Phone") & (~inventory.is_mobile_app)),
       F.lit("Mobile Phone Web")).
    when(((inventory.device_type == "Mobile Phone") & (inventory.is_mobile_app)),
       F.lit("Mobile Phone App")).
    otherwise(F.lit("Unknown")))
# F.when is called only once, at the start; the remaining conditions are chained with Column.when

In [8]:
inv_identity.show()

+--------+------------+------------+-----+-------------+--------------------+--------+----------+----------------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|   identity_type|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+----------------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|Mobile Phone Web|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|Mobile Phone Web|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|         Desktop|
|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|         Desktop|
|51217510|       false|Mobile Phone| view|        false|apps.apple.com/no...|     976|2023-09-05|Mobile Phone Web|
|51217510|       false|Mobile Phone|click|         true|apps.apple.com/no...|   

In [9]:
inventory.show()

+--------+------------+------------+-----+-------------+--------------------+--------+----------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|
|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|
|51217510|       false|Mobile Phone| view|        false|apps.apple.com/no...|     976|2023-09-05|
|51217510|       false|Mobile Phone|click|         true|apps.apple.com/no...|     995|2023-09-02|
|51217510|       false|     Desktop|click|        false|play.google.com/s...|     368|2023-09-03|
|51217510|       fal

As seen above, identity types in the new column are properly assigned and the original dataframe `inventory` is unchanged.

In [10]:
# Task number 5 - Create a new column in the dataset called max_order_id which will show the
# maximum order_id for each identity_type. The DataFrame must persist the original
# number of records.
# helper dataframe to get maximum order_id per identity type
maxord = inv_identity.groupBy("identity_type").agg(F.max("order_id").alias("order_id"))
maxord.show()
# helper dict for the following query
map_max = {dev: maxord.where(maxord.identity_type == dev).select("order_id").first()[0] for dev in ["Desktop", "Mobile Phone App", "Mobile Phone Web"]}     
print(f"{map_max=}")
inv_max_order = inv_identity.withColumn("max_order_id",
    F.when((inv_identity.identity_type == "Desktop"), 
       F.lit(map_max["Desktop"])).
    when((inv_identity.identity_type == "Mobile Phone App"),
       F.lit(map_max["Mobile Phone App"])).
    when((inv_identity.identity_type == "Mobile Phone Web"),
       F.lit(map_max["Mobile Phone Web"])))
inv_max_order.show()



+----------------+--------+
|   identity_type|order_id|
+----------------+--------+
|Mobile Phone Web|     976|
|Mobile Phone App|     995|
|         Desktop|   10000|
+----------------+--------+

map_max={'Desktop': 10000, 'Mobile Phone App': 995, 'Mobile Phone Web': 976}
+--------+------------+------------+-----+-------------+--------------------+--------+----------+----------------+------------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|   identity_type|max_order_id|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+----------------+------------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|Mobile Phone Web|         976|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|Mobile Phone Web|         976|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|

In [11]:
print(inventory.count())
print(inv_max_order.count())

46733
46733


For this task, the new column `max_order_id` stores the maximum `order_id` for a given identity type. For better readability, a helper DataFrame `maxord` and a helper dict `map_max` have been created.  
As seen in the cell above, the number of records in `inv_max_order` (46733) is the same as in the original DataFrame.

Task 6 and Task 7 are similar to each other, so they are placed in the same cell.

In [14]:
# Task number 6 - You have been notified by the Marketing team that they would like to know what was
# the number of clicks (event column equals to “click”) each day for a given campaign.
# They sent you the list of users taking part in this campaign
# (selected_users.parquet). Your goal is to filter the dataset to include only selected
# users and calculate the total number of clicks per day.

# Task number 7 - What was the number of clicks per day for users who weren’t in this campaign?

# helper column "check" marks which rows belong to users taking part in the campaign
# (lit must be referenced as F.lit, since only the functions module alias F is imported)
campaign_result = inventory.join(users.withColumn("check", F.lit(True)), "user_id", "left").fillna(False)
print("Total clicks during this period of time")
campaign_result.select(["event", "date"]).where(inventory.event == "click").groupBy("date").count().show()
# number 6
print("Clicks of users taking part in the campaign")
campaign_result.filter(campaign_result.check == "True").select(["event", "date"]).where(inventory.event == "click").groupBy("date").count().show()

# number 7
print("Clicks of users not taking part in the campaign")
campaign_result.filter(campaign_result.check == "False").select(["event", "date"]).where(inventory.event == "click").groupBy("date").count().show()


Total clicks during this period of time
+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 3581|
|2023-09-01| 3561|
|2023-09-05| 1810|
|2023-09-07| 3662|
|2023-09-02| 3605|
|2023-09-04| 3623|
|2023-09-06| 3510|
+----------+-----+

Clicks of users taking part in the campaign
+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 1082|
|2023-09-01| 1142|
|2023-09-05|  604|
|2023-09-07| 1134|
|2023-09-02| 1171|
|2023-09-04| 1101|
|2023-09-06| 1085|
+----------+-----+

Clicks of users not taking part in the campaign
+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 2499|
|2023-09-01| 2419|
|2023-09-05| 1206|
|2023-09-07| 2528|
|2023-09-02| 2434|
|2023-09-04| 2522|
|2023-09-06| 2425|
+----------+-----+



Adding the click counts of campaign and non-campaign users for each day gives the total number of clicks for that day, for example 1082 + 2499 = 3581 for 2023-09-03.