In [1]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, when, max
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

inventory = spark.read.parquet("inventory.parquet")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/27 09:48:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
inventory.show(5)

+--------+------------+------------+-----+-------------+--------------------+--------+----------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|
|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|
|51217510|       false|Mobile Phone| view|        false|apps.apple.com/no...|     976|2023-09-05|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+
only showing top 5 rows



Find what is the percentage of logged in users every day.

In [4]:
inventory = inventory.withColumn("is_logged_in_numeric",
                                 inventory.is_logged_in.cast(IntegerType()))
logged_in_users_percentage_grouped_by_date = inventory.groupBy("date").mean(
    "is_logged_in_numeric").orderBy("date")
logged_in_users_percentage_grouped_by_date.show()

+----------+-------------------------+
|      date|avg(is_logged_in_numeric)|
+----------+-------------------------+
|2023-09-01|      0.49901463963963966|
|2023-09-02|      0.48885483421565895|
|2023-09-03|       0.5006273525721455|
|2023-09-04|       0.4890631448617416|
|2023-09-05|                      0.0|
|2023-09-06|       0.5014803327224023|
|2023-09-07|       0.4879081841781664|
+----------+-------------------------+



Which site has the most logged in users?

In [5]:
logged_in_users_grouped_by_site = inventory.groupBy("site").sum("is_logged_in_numeric")
sorted_by_logged_in_users = logged_in_users_grouped_by_site.orderBy(
    col("sum(is_logged_in_numeric)").desc())
sorted_by_logged_in_users.show(1)

+--------------------+-------------------------+
|                site|sum(is_logged_in_numeric)|
+--------------------+-------------------------+
|play.google.com/s...|                     3648|
+--------------------+-------------------------+
only showing top 1 row



Calculate the share of logged in users who are using Mobile App.

In [6]:
inventory_logged_in = inventory.filter(col("is_logged_in") == 1)
mobile_app_users = inventory_logged_in.filter(col("is_mobile_app") == 1).count()

share_of_mobile_app_users_all = (mobile_app_users / inventory.count()) * 100
print(
    f'Share of logged in users who are using mobile app compared to all users: {share_of_mobile_app_users_all:.2f}%')

share_of_mobile_app_users_logged_in = (
                                              mobile_app_users / inventory_logged_in.count()) * 100
print(
    f'Share of logged in users who are using mobile app compared to other logged in users: {share_of_mobile_app_users_logged_in:.2f}%')

Share of logged in users who are using mobile app compared to all users: 4.73%
Share of logged in users who are using mobile app compared to other logged in users: 10.36%


Create a new column called identity_type which will take the following value:
○ If device_type is “Mobile Phone” and is_mobile_app is set to True then “Mobile Phone App”
○ If device_type is “Mobile Phone” and is_mobile_app is set to False then “Mobile Phone Web”
○ If device_type is “Desktop” then “Desktop”
○ Otherwise “Unknown”

In [7]:
inventory = inventory.withColumn(
    "identity_type",
    when((col("device_type") == "Mobile Phone") & (col("is_mobile_app") == True),
         "Mobile Phone App")
    .when((col("device_type") == "Mobile Phone") & (col("is_mobile_app") == False),
          "Mobile Phone Web")
    .when(col("device_type") == "Desktop", "Desktop")
    .otherwise("Unknown")
)
inventory.show()

+--------+------------+------------+-----+-------------+--------------------+--------+----------+--------------------+----------------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|is_logged_in_numeric|   identity_type|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+--------------------+----------------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|                   0|Mobile Phone Web|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|                   0|Mobile Phone Web|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|                   0|         Desktop|
|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|                   0|         Desktop|
|51217510|       false|Mobile Phone| view|      

Create a new column in the dataset called max_order_id which will show the
maximum order_id for each identity_type. The DataFrame must persist the original
number of records.

In [8]:
inventory_max_order_id = inventory.groupBy("identity_type").agg(
    max(col("order_id")).alias("max_order_id"))

# Join the result DataFrame back to the original DataFrame to add the max_order_id column
inventory = inventory.join(inventory_max_order_id, on="identity_type", how="left")

# Show the DataFrame with the new column
inventory.show()

+----------------+--------+------------+------------+-----+-------------+--------------------+--------+----------+--------------------+------------+
|   identity_type| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|is_logged_in_numeric|max_order_id|
+----------------+--------+------------+------------+-----+-------------+--------------------+--------+----------+--------------------+------------+
|Mobile Phone Web|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|                   0|         976|
|Mobile Phone Web|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|                   0|         976|
|         Desktop|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|                   0|       10000|
|         Desktop|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|202

You have been notified by the Marketing team that they would like to know what was
the number of clicks (event column equals to “click”) each day for a given campaign. They sent you the list of users taking part in this campaign (selected_users.parquet). Your goal is to filter the dataset to include only selected users and calculate the total number of clicks per day.

In [9]:
selected_users = spark.read.parquet("selected_users.parquet")
selected_users.show()

+--------+
| user_id|
+--------+
|93330430|
|30514363|
|43260855|
|72293645|
|60562662|
|82779235|
|25861362|
|28056430|
|61316195|
|30120729|
|91790087|
|98278464|
|44932320|
|86908090|
|44735657|
|97066027|
|97196946|
|88742844|
|37133593|
|89365279|
+--------+
only showing top 20 rows



In [10]:
inventory_with_selected_users = inventory.join(
    selected_users,
    inventory.user_id == selected_users.user_id,
    "inner"
)
inventory_with_selected_users.show()

+----------------+--------+------------+------------+-----+-------------+--------------------+--------+----------+--------------------+------------+--------+
|   identity_type| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|is_logged_in_numeric|max_order_id| user_id|
+----------------+--------+------------+------------+-----+-------------+--------------------+--------+----------+--------------------+------------+--------+
|         Desktop|35849347|        true|     Desktop|click|        false|play.google.com/s...|    9414|2023-09-03|                   1|       10000|35849347|
|Mobile Phone Web|35849347|        true|Mobile Phone| view|        false|apps.apple.com/no...|     976|2023-09-04|                   1|         976|35849347|
|Mobile Phone Web|35849347|        true|Mobile Phone|click|        false|play.google.com/s...|     976|2023-09-02|                   1|         976|35849347|
|Mobile Phone Web|35849347|        true|Mobile Phone

In [11]:
inventory_with_selected_users_clicks_number_grouped_by_date = inventory_with_selected_users.filter(
    inventory_with_selected_users.event == "click").groupBy("date").count()
inventory_with_selected_users_clicks_number_grouped_by_date.show()

+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 1082|
|2023-09-01| 1142|
|2023-09-05|  604|
|2023-09-07| 1134|
|2023-09-02| 1171|
|2023-09-04| 1101|
|2023-09-06| 1085|
+----------+-----+



What was the number of clicks per day for users who weren’t in this campaign?

In [12]:
inventory_without_selected_users = inventory.join(
    selected_users,
    inventory.user_id == selected_users.user_id,
    "left_anti"
)
inventory_without_selected_users_clicks_number_grouped_by_date = inventory_without_selected_users.filter(
    inventory_without_selected_users.event == "click").groupBy("date").count()
inventory_without_selected_users_clicks_number_grouped_by_date.show()


+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 2499|
|2023-09-01| 2419|
|2023-09-05| 1206|
|2023-09-07| 2528|
|2023-09-02| 2434|
|2023-09-04| 2522|
|2023-09-06| 2425|
+----------+-----+

