### Setting up the environment and loading the data:

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, max

# Initializing the Spark session 
spark = SparkSession.builder.appName("Data Analysis").getOrCreate()

# Or if you are running with error (reproducibility of the code) set up spark as following:
# spark = SparkSession.builder \
#     .appName("Inventory Analysis") \
#     .config("spark.driver.python", "/path/to/python") \
#     .config("spark.executor.python", "/path/to/python") \
#     .getOrCreate()

In [3]:
inventory_df = spark.read.parquet("./../data/inventory.parquet")
selected_users_df = spark.read.parquet("./../data/selected_users.parquet")

### Tasks:

#### 1. Find what is the percentage of logged in users every day.

In [4]:
from pyspark.sql.functions import count, sum as sum_

# Calculation of the percentage of logged-in users per day (DAU)
logged_in_percentage = inventory_df.groupBy("date").agg(
    (sum_(col("is_logged_in").cast("int")) / count("*") * 100).alias("logged_in_percentage")
)

logged_in_percentage.show()

+----------+--------------------+
|      date|logged_in_percentage|
+----------+--------------------+
|2023-09-03|  50.062735257214555|
|2023-09-01|   49.90146396396396|
|2023-09-05|                 0.0|
|2023-09-07|   48.79081841781664|
|2023-09-02|  48.885483421565894|
|2023-09-04|   48.90631448617416|
|2023-09-06|  50.148033272240234|
+----------+--------------------+


#### 2. Which site has the most logged in users?

In [5]:
most_logged_in_site = inventory_df.filter(col("is_logged_in")).groupBy("site").count().orderBy(col("count").desc()).limit(3)

most_logged_in_site.show()

+--------------------+-----+
|                site|count|
+--------------------+-----+
|play.google.com/s...| 3648|
|        dailynews.no| 3629|
|apps.apple.com/no...| 3569|
+--------------------+-----+


#### 3. Calculate the share of logged in users who are using Mobile App

In [6]:
mobile_app_share = inventory_df.filter(col("is_logged_in") & col("is_mobile_app")).count() / inventory_df.filter(col("is_logged_in")).count()

print(f"Share of logged-in users on mobile app: {mobile_app_share}")

Share of logged-in users on mobile app: 0.10361965491372843


#### 4. Create a new column called identity_type which will take the following value:

 
* If device_type is “Mobile Phone” and is_mobile_app is set to True then “Mobile Phone App”
* If device_type is “Mobile Phone” and is_mobile_app is set to False then “Mobile Phone Web”
* If device_type is “Desktop” then “Desktop”
* Otherwise “Unknown”

In [7]:
inventory_df = inventory_df.withColumn(
    "identity_type",
    when((col("device_type") == "Mobile Phone") & (col("is_mobile_app")), "Mobile Phone App")
    .when((col("device_type") == "Mobile Phone") & (~col("is_mobile_app")), "Mobile Phone Web")
    .when(col("device_type") == "Desktop", "Desktop")
    .otherwise("Unknown")
)

inventory_df.show()

+--------+------------+------------+-----+-------------+--------------------+--------+----------+----------------+
| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|   identity_type|
+--------+------------+------------+-----+-------------+--------------------+--------+----------+----------------+
|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|Mobile Phone Web|
|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|Mobile Phone Web|
|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|         Desktop|
|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|         Desktop|
|51217510|       false|Mobile Phone| view|        false|apps.apple.com/no...|     976|2023-09-05|Mobile Phone Web|
|51217510|       false|Mobile Phone|click|         true|apps.apple.com/no...|   

#### 5. Create a new column in the dataset called max_order_id which will show the maximum order_id for each identity_type. The DataFrame must persist the original number of records:

In [8]:
from pyspark.sql.functions import col, when, max

max_order_id = inventory_df.groupBy("identity_type").agg(max("order_id").alias("max_order_id"))
inventory_df = inventory_df.join(max_order_id, "identity_type")

In [9]:
max_order_id.show()

+----------------+------------+
|   identity_type|max_order_id|
+----------------+------------+
|Mobile Phone Web|         976|
|Mobile Phone App|         995|
|         Desktop|       10000|
+----------------+------------+


In [10]:
inventory_df.show()

+----------------+--------+------------+------------+-----+-------------+--------------------+--------+----------+------------+
|   identity_type| user_id|is_logged_in| device_type|event|is_mobile_app|                site|order_id|      date|max_order_id|
+----------------+--------+------------+------------+-----+-------------+--------------------+--------+----------+------------+
|Mobile Phone Web|51217510|       false|Mobile Phone|click|        false|        dailynews.no|     976|2023-09-07|         976|
|Mobile Phone Web|51217510|       false|Mobile Phone| view|        false|play.google.com/s...|     976|2023-09-04|         976|
|         Desktop|51217510|       false|     Desktop| view|        false|apps.apple.com/no...|     877|2023-09-02|       10000|
|         Desktop|51217510|       false|     Desktop| view|        false|play.google.com/s...|    1075|2023-09-02|       10000|
|Mobile Phone Web|51217510|       false|Mobile Phone| view|        false|apps.apple.com/no...|     976|2

#### 6. You have been notified by the Marketing team that they would like to know what was the number of clicks (event column equals to “click”) each day for a given campaign. They sent you the list of users taking part in this campaign (selected_users.parquet). Your goal is to filter the dataset to include only selected users and calculate the total number of clicks per day:

In [11]:
from pyspark.sql.functions import count

# Join with the selected_users DataFrame and then filter and count
campaign_clicks_per_day = inventory_df.join(selected_users_df, "user_id") \
    .filter(col("event") == "click") \
    .groupBy("date").count()

campaign_clicks_per_day.show()

+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 1082|
|2023-09-01| 1142|
|2023-09-05|  604|
|2023-09-07| 1134|
|2023-09-02| 1171|
|2023-09-04| 1101|
|2023-09-06| 1085|
+----------+-----+


#### 7. What was the number of clicks per day for users who weren’t in this campaign:

In [12]:
from pyspark.sql.functions import count

non_campaign_users_df = inventory_df.join(selected_users_df, "user_id", "left_anti")
non_campaign_clicks_per_day = non_campaign_users_df.filter(
    col("event") == "click"
).groupBy("date").count()

non_campaign_clicks_per_day.show()

+----------+-----+
|      date|count|
+----------+-----+
|2023-09-03| 2499|
|2023-09-01| 2419|
|2023-09-05| 1206|
|2023-09-07| 2528|
|2023-09-02| 2434|
|2023-09-04| 2522|
|2023-09-06| 2425|
+----------+-----+
