In [None]:
import hashlib
import pyspark.sql.functions as f
from pyspark.sql import Window
import time
import seaborn as sns

**Initial Note:** Initialize a cluster with runtime 13.3 LTS and Spark version 3.4.1.

## Project Overview

Let's do a quick recap of the project: 
- You'll be working with the [Google Analytics Sample dataset](https://console.cloud.google.com/marketplace/product/obfuscated-ga360-data/obfuscated-ga360-data?inv=1&invt=AbmlmQ), which contains real data from the [Google Merchandise Store](https://shop.googlemerchandisestore.com/), a real ecommerce store that sells Google-branded merchandise. This data is similar to what you deal with in your day-to-day work as a data analyst.
- In Part 1 you downloaded the data, preprocessed it and answered some simple analytics questions.
- In Part 2 you answered more complex analytics questions.

### Part 3 - Sequential Analysis

In this final part of the project you will do sequential analysis. This type of analysis enables you to monitor the behavior of users in a session.

### Task Completion and Validation
Throughout the notebooks, you will be asked to complete a series of tasks and answer questions. You’ll encounter empty cells where you need to implement the solution, as well as commented-out cells that you should uncomment and fill in with your responses. Afterward, assertion cells will check whether you've completed the tasks correctly.

This way you get immediate feedback on your work and can ask questions if you get stuck.

## Load the data

If you have completed the first part of this project, you should already have the data saved in the DBFS.

In [None]:
df_main = spark.read.parquet('/FileStore/final_project/ga_sessions_main.parquet')
df_hits = spark.read.parquet('/FileStore/final_project/ga_sessions_hits.parquet')
df_network = spark.read.parquet('/FileStore/final_project/ga_sessions_network.parquet')

### [OPTIONAL] In case you don't have the data yet, run the cells below

In [None]:
%sh wget https://raw.githubusercontent.com/inesmcm26/lp-big-data-mercedes/main/data/ga_sessions.zip

In [None]:
%sh unzip ga_sessions.zip

In [None]:
dbutils.fs.cp('file:/databricks/driver/ga_sessions_main.parquet', 'dbfs:/FileStore/final_project/ga_sessions_main.parquet')
dbutils.fs.cp('file:/databricks/driver/ga_sessions_network.parquet', 'dbfs:/FileStore/final_project/ga_sessions_network.parquet')
dbutils.fs.cp('file:/databricks/driver/ga_sessions_hits.parquet', 'dbfs:/FileStore/final_project/ga_sessions_hits.parquet')

df_main = spark.read.parquet('/FileStore/final_project/ga_sessions_main.parquet')
df_hits = spark.read.parquet('/FileStore/final_project/ga_sessions_hits.parquet')
df_network = spark.read.parquet('/FileStore/final_project/ga_sessions_network.parquet')

### Data Cleaning

Run the cells below to apply the data cleaning operations you implemented in the first part of this project.

In [None]:
df_main = (
    df_main
    .fillna('Direct', subset=['channelGrouping'])
)

df_network = (
    df_network
    .withColumn(
        'geoNetwork',
        f.col('geoNetwork').withField('continent', f.lower(f.col('geoNetwork').getField('continent')))
    )
)

## Datasets Overview

The column that identifies a session and is **common to all tables** is the `sessionId` column.


**Main dataset:**
Besides the session id, the main dataset contains the following columns:
- **visitorId**: The unique identifier for a visitor
- **visitNumber**: The visit number of this visitor. If this is the first visit to the website, then this is set to 1.
- **visitStartTime**: The timestamp (expressed as POSIX time) of the beginning of the session
- **totals**: A struct with statistics about the session, such as total number of hits, time on site, number of transactions and revenue, etc.
- **channelGrouping**: The channel via which the user came to the Store

**Hits dataset:**
Besides the session id, the hits dataset contains the following column:
- **hits**: An array of structs representing all the hits in this session. A hit is an interaction that results in data being sent to Google Analytics. It can either be a page visit or an interaction with some page element. Each struct is a hit defined by the following fields:
    - **hitNumber**: The number of this hit in the session
    - **type**: Type of the hit: 'PAGE' (Page visit) or 'EVENT' (Interaction with some element on the page)
    - **hour**: Hour of the hit
    - **minute**: Minute of the hit
    - **time**: Time spent on the hit
    - **page**: Structured information about the page
    - **contentGroup**: Information about the content categorization of the page on the website
    - **product**: Array of structs with product information of all products displayed on the page
    - **eventInfo**: If hit is of type 'EVENT', this field contains information about the event
    - **promotion**: Array of structs with promotion information of all promotions displayed on the page.
    - **promotionActionInfo**: Present when there is a promotion on the hit. It indicates whether the promotion was clicked (which corresponds to a hit of type 'EVENT' whose event is a 'Promotion Click') or was only viewed on the page without being clicked.
    - **transaction**: Information about the transaction when the hit is an event 'Confirm Checkout'. Null otherwise.

**Network dataset:** Besides the session id, the network dataset contains the following columns:

- **trafficSource**: A struct with information about the source of the session, as well as ads and campaign information
- **device**: A struct with information about the device used in the session
- **geoNetwork**: A struct with information about the geographic location of the user. Most of this information is obscured and only city, country and continent are available.
- **customDimensions**: Extra traffic information. You can ignore this column.



## Sequential analysis

### Sessions path analysis - Simple analysis

To analyze the session paths and understand where most users drop off, we'll calculate the number and percentage of sessions that stop at each stage of the sequence: Home -> Item -> Shopping Cart -> Payment Method -> Checkout Confirmation.

Each of these page types is identified by a `pagePathLevel1`. This is a field of the `page` struct in the `hits` column. We use the page path instead of the page title to identify the page on a hit mostly because each item page has a different title, but the `pagePathLevel1` is the same for all items.

| Page | pagePathLevel1 |
| - | - |
| Home | /home |
| Item | /google+redesign/ |
| Shopping Cart | /basket.html |
| Payment Method | /payment.html |
| Checkout Confirmation | /ordercompleted.html |

The goal is to compute the number and percentage of sessions that reached each stage, which shows where most users drop off.

#### The way you currently do it

Your current approach to this analysis relies on SQL syntax and functionalities. This method requires creating separate tables for each page type and performing multiple left joins to track the sequence of hits.

To begin, we'll transform the hits table into a more familiar format: one row per hit. We'll focus on extracting only the essential information from each hit:
- Session ID
- Hit number
- pagePathLevel1
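
To make the target shape concrete, here is a plain-Python sketch (no Spark needed) of what exploding the `hits` array amounts to. The sample sessions and the helper name `explode_hits` are invented for illustration; only the field names `sessionId`, `hitNumber` and `page.pagePathLevel1` come from the dataset description above.

```python
# Toy stand-in for two rows of the hits dataset (all values are invented).
sessions = [
    {"sessionId": "s1", "hits": [
        {"hitNumber": 1, "page": {"pagePathLevel1": "/home"}},
        {"hitNumber": 2, "page": {"pagePathLevel1": "/google+redesign/"}},
    ]},
    {"sessionId": "s2", "hits": [
        {"hitNumber": 1, "page": {"pagePathLevel1": "/basket.html"}},
    ]},
]


def explode_hits(sessions):
    """Return one (sessionId, hitNumber, pagePath) tuple per hit."""
    return [
        (s["sessionId"], h["hitNumber"], h["page"]["pagePathLevel1"])
        for s in sessions
        for h in s["hits"]
    ]


rows = explode_hits(sessions)
for row in rows:
    print(row)
```

Two session rows become three hit rows, which is the same change of granularity that the next cell performs on the real dataframe.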


In [None]:
df_exploded_hits = (
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
    .select(
        'sessionId',
        'hitNumber',
        f.col('page').getField('pagePathLevel1').alias('pagePath')
    )
)

df_exploded_hits.display()

Now let's create a separate table for the hits corresponding to each page and then perform multiple left joins to track the progression through the sequence.

This corresponds to the following SQL code:

In [None]:
# Register PySpark DataFrame as SQL temporary view to manipulate it with SQL syntax
df_exploded_hits.createOrReplaceTempView('hits_view')

In [None]:
%sql

WITH filtered_hits AS (
  SELECT sessionId, pagePath, hitNumber
  FROM hits_view
  WHERE pagePath IN ("/home", "/google+redesign/", "/basket.html", "/yourinfo.html", "/payment.html", "/revieworder.html", "/ordercompleted.html")
),
home_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/home"
),
item_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/google+redesign/"
),
basket_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/basket.html"
),
payment_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/payment.html"
),
order_completed_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/ordercompleted.html"
),
uni AS (
  SELECT
    home_hits.sessionId AS home_sessionId, 
    item_hits.sessionId AS item_sessionId,
    basket_hits.sessionId AS basket_sessionId,
    payment_hits.sessionId AS payment_sessionId,
    order_completed_hits.sessionId AS order_completed_sessionId
  FROM home_hits
  LEFT JOIN item_hits ON home_hits.sessionId == item_hits.sessionId AND home_hits.hitNumber < item_hits.hitNumber
  LEFT JOIN basket_hits ON home_hits.sessionId == basket_hits.sessionId AND item_hits.hitNumber < basket_hits.hitNumber
  LEFT JOIN payment_hits ON home_hits.sessionId == payment_hits.sessionId AND basket_hits.hitNumber < payment_hits.hitNumber
  LEFT JOIN order_completed_hits ON home_hits.sessionId == order_completed_hits.sessionId AND payment_hits.hitNumber < order_completed_hits.hitNumber
)

SELECT
  COUNT(DISTINCT home_sessionId) AS total_home,
  COUNT(DISTINCT item_sessionId) AS total_item,
  (COUNT(DISTINCT item_sessionId) / COUNT(DISTINCT home_sessionId)) * 100 AS 1st_2nd,
  COUNT(DISTINCT basket_sessionId) AS total_basket,
  (COUNT(DISTINCT basket_sessionId) / COUNT(DISTINCT item_sessionId)) * 100 AS 2nd_3rd,
  COUNT(DISTINCT payment_sessionId) AS total_payment,
  (COUNT(DISTINCT payment_sessionId) / COUNT(DISTINCT basket_sessionId)) * 100 AS 3rd_4th,
  COUNT(DISTINCT order_completed_sessionId) AS total_order_completed,
  (COUNT(DISTINCT order_completed_sessionId) / COUNT(DISTINCT payment_sessionId)) * 100 AS 4th_5th
FROM uni

In the next cell you can find the equivalent code in PySpark. Let's run the code and measure how long it takes to finish:

In [None]:
start = time.time()

# Create the exploded hits dataframe
df_exploded_hits = (
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
    .select(
        'sessionId',
        'hitNumber',
        f.col('page').getField('pagePathLevel1').alias('pagePath')
    )
)

# Filter hits based on specific page paths
filtered_hits = (
    df_exploded_hits
    .filter(
        f.col('pagePath').isin(
            '/home',
            '/google+redesign/',
            '/basket.html',
            '/payment.html',
            '/ordercompleted.html'
        )
    )
)

# Create individual DataFrames for each page path
home_hits = (
    filtered_hits
    .filter(f.col('pagePath') == '/home')
    .select(
        f.col('sessionId').alias('home_sessionId'),
        f.col('hitNumber').alias('home_hitNumber')
    )
)
item_hits = (
    filtered_hits
    .filter(f.col('pagePath') == '/google+redesign/')
    .select(
        f.col('sessionId').alias('item_sessionId'),
        f.col('hitNumber').alias('item_hitNumber')
    )
)
basket_hits = (
    filtered_hits
    .filter(f.col('pagePath') == '/basket.html')
    .select(
        f.col('sessionId').alias('basket_sessionId'),
        f.col('hitNumber').alias('basket_hitNumber')
    )
)
payment_hits = (
    filtered_hits
    .filter(f.col('pagePath') == '/payment.html')
    .select(
        f.col('sessionId').alias('payment_sessionId'),
        f.col('hitNumber').alias('payment_hitNumber')
    )
)
order_completed_hits = (
    filtered_hits
    .filter(f.col('pagePath') == '/ordercompleted.html')
    .select(
        f.col('sessionId').alias('order_completed_sessionId'),
        f.col('hitNumber').alias('order_completed_hitNumber')
    )
)

# Perform left joins in sequence
uni = (
    home_hits
    .join(
        item_hits,
        on=[(home_hits.home_sessionId == item_hits.item_sessionId)
            & (home_hits.home_hitNumber < item_hits.item_hitNumber)
        ],
        how='left'
    )
    .join(
        basket_hits,
        on=[(home_hits.home_sessionId == basket_hits.basket_sessionId)
            & (item_hits.item_hitNumber < basket_hits.basket_hitNumber)
        ],
        how='left'
    )
    .join(
        payment_hits,
        on=[(home_hits.home_sessionId == payment_hits.payment_sessionId)
            & (basket_hits.basket_hitNumber < payment_hits.payment_hitNumber)
        ],
        how='left'
    )
    .join(
        order_completed_hits,
        on=[(home_hits.home_sessionId == order_completed_hits.order_completed_sessionId)
            & (payment_hits.payment_hitNumber < order_completed_hits.order_completed_hitNumber)
        ],
        how='left'
    )
)

# Calculate the conversion rates
result = (
    uni.select(
        f.countDistinct('home_sessionId').alias('total_home'),
        f.countDistinct('item_sessionId').alias('total_item'),
        f.countDistinct('basket_sessionId').alias('total_basket'),
        f.countDistinct('payment_sessionId').alias('total_payment'),
        f.countDistinct('order_completed_sessionId').alias('total_order_completed')
    )
    .withColumn('1st_2nd', (f.col('total_item') / f.col('total_home')) * 100)
    .withColumn('2nd_3rd', (f.col('total_basket') / f.col('total_item')) * 100)
    .withColumn('3rd_4th', (f.col('total_payment') / f.col('total_basket')) * 100)
    .withColumn('4th_5th', (f.col('total_order_completed') / f.col('total_payment')) * 100)
)

result.display()

end = time.time()

joins_time = end-start
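
The same start/end timing pattern is repeated for each approach in this notebook. As a hedged alternative, the sketch below wraps it in a small context manager. The names `timed` and `timings` are hypothetical and not part of the notebook, and the workload is a stand-in. Because Spark transformations are lazy, the timed block would need to include the action (for example `result.display()`) for the measurement to cover the actual computation.

```python
import time
from contextlib import contextmanager

timings = {}  # label -> elapsed seconds (hypothetical helper state)


@contextmanager
def timed(label):
    """Record the wall-clock time of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


# Stand-in workload; in the notebook this would wrap the whole cell body,
# including the action that triggers execution.
with timed("demo"):
    total = sum(range(100_000))

print(f"demo took {timings['demo']:.4f} s")
```

With this helper, the three measurements could be collected as `timings['joins']`, `timings['udf']` and `timings['built_in']` instead of three separate variables.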

#### The UDFs way

Now let's see a different approach using PySpark that eliminates the need for multiple left joins. We'll be using User Defined Functions (UDFs).

Let's start by creating a function that receives the array of hits of a session and returns 1 if there is a hit on an `item` page after a hit on the `home` page, and 0 otherwise. Then we'll register this function as a UDF. After that, we'll create a new 0/1 flag column on the hits dataframe by applying this UDF to the `hits` column.

It is easier to use UDFs on top of the original hits dataframe rather than the exploded one. Since we have one row for each session, and each row has an array of hits, we can iterate through the array using Python, and it becomes easy to deal with this complex column.

In [None]:
from pyspark.sql.types import IntegerType

@f.udf(IntegerType())
def item_after_home(hits):
    # hits arrives as a list of Row objects (one per hit); fields are accessed by name
    visited_home = False
    
    for hit in hits:
        page_path = hit['page']['pagePathLevel1']
        if page_path == '/home':
            visited_home = True
        
        if visited_home and page_path == '/google+redesign/':
            return 1

    return 0


df_hits_item_after_home = (
    df_hits
    .withColumn('item_after_home', item_after_home(f.col('hits')))
)

df_hits_item_after_home.display()
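
Because the body of a UDF is ordinary Python, its logic can be checked on a few hand-made inputs before running it on the cluster. The sketch below is a plain-Python twin of the function above; `item_after_home_py` and `make_hits` are hypothetical names, and the toy hits are dictionaries that only mimic the `page.pagePathLevel1` field.

```python
def item_after_home_py(hits):
    """Plain-Python twin of the UDF body: 1 if an item page follows a home page."""
    visited_home = False
    for hit in hits:
        page_path = hit["page"]["pagePathLevel1"]
        if page_path == "/home":
            visited_home = True
        if visited_home and page_path == "/google+redesign/":
            return 1
    return 0


def make_hits(*paths):
    """Build a toy hits list from page paths (hypothetical helper)."""
    return [{"page": {"pagePathLevel1": p}} for p in paths]


home_then_item = item_after_home_py(make_hits("/home", "/google+redesign/"))
item_then_home = item_after_home_py(make_hits("/google+redesign/", "/home"))
neither = item_after_home_py(make_hits("/basket.html"))

print(home_then_item, item_then_home, neither)  # 1 0 0
```

Only the first toy session counts, since the order of the two pages matters.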

Now we're going to elaborate on that function. Instead of returning just a 1 or a 0, we'll return an array of 1s and 0s, one for each stage.

To be precise:
- The first element of the array should be 1 if there is a hit on the `home` page.
- The second element should be 1 if there is a hit on an `item` page after there was a hit on the `home` page.
- The third element should be 1 if there is a hit on the `basket` page after there was a hit on an `item` page, which, in turn, was after a hit on the `home` page.
- etc.

After that, we can register this function as a UDF and apply it to the `hits` column on the hits dataframe.

In [None]:
from pyspark.sql.types import IntegerType, ArrayType

@f.udf(ArrayType(IntegerType()))
def stages(hits):
    result = []

    home = 0
    item_after_home = 0
    basket_after_item = 0
    payment_after_basket = 0
    order_completed_after_payment = 0
    
    for hit in hits:
        page_path = hit['page']['pagePathLevel1']

        if page_path == '/home':
            home = 1
        
        if home and (page_path == '/google+redesign/'):
            item_after_home = 1
        
        if item_after_home and (page_path == '/basket.html'):
            basket_after_item = 1
        
        if basket_after_item and (page_path == '/payment.html'):
            payment_after_basket = 1
        
        if payment_after_basket and page_path == '/ordercompleted.html':
            order_completed_after_payment = 1 

    # Store results in a list
    result = [
        home,
        item_after_home,
        basket_after_item,
        payment_after_basket,
        order_completed_after_payment
    ]
    
    return result


df_stages = (
    df_hits
    .withColumn('stages', stages(f.col('hits')))
)

df_stages.display()
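
To see how the ordering rule plays out, here is a compact plain-Python sketch of the same idea. It is not the UDF itself: `stages_py` and `FUNNEL` are hypothetical names, and for brevity the function takes a list of page paths instead of hit structs.

```python
# Funnel stages in order, identified by pagePathLevel1.
FUNNEL = [
    "/home",
    "/google+redesign/",
    "/basket.html",
    "/payment.html",
    "/ordercompleted.html",
]


def stages_py(page_paths):
    """Return a 0/1 list: stage i is 1 only if it was hit after stage i-1 was reached."""
    reached = [0] * len(FUNNEL)
    for path in page_paths:
        for i, stage_path in enumerate(FUNNEL):
            previous_ok = i == 0 or reached[i - 1]
            if path == stage_path and previous_ok:
                reached[i] = 1
    return reached


in_order = stages_py(["/home", "/google+redesign/", "/basket.html"])
out_of_order = stages_py(["/google+redesign/", "/home", "/basket.html", "/payment.html"])
with_detour = stages_py([
    "/home", "/google+redesign/", "/home",
    "/basket.html", "/payment.html", "/ordercompleted.html",
])

print(in_order)      # [1, 1, 1, 0, 0]
print(out_of_order)  # [1, 0, 0, 0, 0]
print(with_detour)   # [1, 1, 1, 1, 1]
```

The second session visits an item page before the home page, so its basket and payment hits do not count towards the funnel. The third session shows that detours between stages do not break the sequence.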

Now we can create a 0/1 flag column for each stage. This means creating one column for each array element.

In [None]:
df_stages_cols = (
    df_stages
    .select(
        'sessionId',
        f.element_at('stages', 1).alias('home_seen'),
        f.element_at('stages', 2).alias('item_after_home'),
        f.element_at('stages', 3).alias('basket_after_item'),
        f.element_at('stages', 4).alias('payment_after_basket'),
        f.element_at('stages', 5).alias('order_completed_after_payment'),
    )
)

Now, we have a dataframe where each row represents a single session, and we can clearly see which stages were reached during that session.

To calculate the number of sessions that reached each stage we just need to count the number of sessions where each stage column is 1.

After this, we can calculate the percentage of sessions that reached each stage from the previous one. This is done by dividing the number of sessions that reach a stage by the number of sessions that reached the previous stage.
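
As a worked example of this division, the sketch below uses invented totals (they are not results from the dataset) and computes the step-to-step ratios in plain Python. Multiplying each ratio by 100 gives the percentage. A real implementation may also want to guard against a zero denominator.

```python
stage_names = ["home", "item", "basket", "payment", "order_completed"]
totals = [1000, 400, 100, 50, 40]  # invented counts, for illustration only

# Ratio of sessions reaching each stage relative to the previous stage.
step_ratios = {}
for i in range(1, len(totals)):
    key = f"{stage_names[i - 1]}->{stage_names[i]}"
    step_ratios[key] = totals[i] / totals[i - 1]

for key, ratio in step_ratios.items():
    print(f"{key}: {ratio:.2f} ({ratio * 100:.0f}%)")
```

With these made-up numbers, the largest drop is between the item and basket stages, where only a quarter of the sessions continue.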

In [None]:
result = (
    df_stages_cols
    .agg(
        f.sum("home_seen").alias("total_home"),
        f.sum("item_after_home").alias("total_item_after_home"),
        f.sum("basket_after_item").alias("total_basket_after_item"),
        f.sum("payment_after_basket").alias("total_payment_after_basket"),
        f.sum("order_completed_after_payment").alias("total_order_completed_after_payment")
    )
    .withColumn(
        "item_after_home_ratio",  f.col("total_item_after_home") / f.col("total_home")
    ).withColumn(
        "basket_after_item_ratio",  f.col("total_basket_after_item") / f.col("total_item_after_home")
    ).withColumn(
        "payment_after_basket_ratio", f.col("total_payment_after_basket") / f.col("total_basket_after_item")
    ).withColumn(
        "order_completed_after_payment_ratio", 
        f.col("total_order_completed_after_payment") / f.col("total_payment_after_basket")
    )
)

result.display()

Finally, let's run everything at once and measure how long it takes:

In [None]:
start = time.time()

result = (
    df_hits
    .withColumn('stages', stages(f.col('hits')))
    .select(
        'sessionId',
        f.element_at('stages', 1).alias('home_seen'),
        f.element_at('stages', 2).alias('item_after_home'),
        f.element_at('stages', 3).alias('basket_after_item'),
        f.element_at('stages', 4).alias('payment_after_basket'),
        f.element_at('stages', 5).alias('order_completed_after_payment'),
    )
    .agg(
        f.sum("home_seen").alias("total_home"),
        f.sum("item_after_home").alias("total_item_after_home"),
        f.sum("basket_after_item").alias("total_basket_after_item"),
        f.sum("payment_after_basket").alias("total_payment_after_basket"),
        f.sum("order_completed_after_payment").alias("total_order_completed_after_payment")
    )
    .withColumn(
        "item_after_home_ratio",  f.col("total_item_after_home") / f.col("total_home")
    ).withColumn(
        "basket_after_item_ratio",  f.col("total_basket_after_item") / f.col("total_item_after_home")
    ).withColumn(
        "payment_after_basket_ratio", f.col("total_payment_after_basket") / f.col("total_basket_after_item")
    ).withColumn(
        "order_completed_after_payment_ratio", 
        f.col("total_order_completed_after_payment") / f.col("total_payment_after_basket")
    )
)

result.display()

end = time.time()

udf_time = end-start

Using UDFs is an alternative way of finding out if a session reached a certain stage.

You may find it more intuitive to iterate through an array of hits than to work with one row per hit and rely on window functions or multiple left joins to track a sequence.

#### The PySpark built-in functions way

Finally, let's see how we could achieve the same result using only PySpark built-in functions.

This approach requires more steps and more complex reasoning than using UDFs. You'll be guided through each step to achieve the final result. The aim is not to challenge your thinking process, but rather to show that PySpark offers tools and syntax that allow you to perform the same analysis without the need for joins or UDFs.

We'll be working with the exploded hits dataframe: the dataframe that has one row per session hit.

First, let's start by adding a new boolean column to the `df_exploded_hits` dataframe for each page path, which indicates if the hit is on that page.

In [None]:
df_pages_flag = (
    df_exploded_hits
    .withColumn("is_home", (f.col("pagePath") == "/home"))
    .withColumn("is_item", (f.col("pagePath") == "/google+redesign/"))
    .withColumn("is_basket", (f.col("pagePath") == "/basket.html"))
    .withColumn("is_payment", (f.col("pagePath") == "/payment.html"))
    .withColumn("is_order_completed", (f.col("pagePath") == "/ordercompleted.html"))
)

Next, we'll create a dataframe called `df_stages` with the following columns:
- `home_seen`: boolean column that is true if the current hit or any earlier hit in the session is on the home page.
- `item_after_home`: boolean column that is true if the hit is on an item page and the home page has been seen.
- `basket_after_item`: boolean column that is true if the hit is on the basket page and there was a **previous hit on an item page, which, in turn, had a previous hit on the home page**.
- `payment_after_basket`: boolean column that is true if the hit is on the payment page and there was a previous hit on the home, item and basket pages in sequence
- `order_completed_after_payment`: boolean column that indicates if the hit is on the order completed page and there was a previous hit on the home, item, basket and payment pages in sequence

**NOTES:**
- Columns should be created one after the other. The column created in the previous step should be used to create the new column.
- To create some of these columns, we need a window that spans from the first hit of the session up to the current hit.
- The function applied over the window must return True if a condition holds for at least one row in the window. The condition is that there was a previous hit on the page we're interested in, and that hit was preceded by another hit on another page, and so on.

**HINT**:
- The function `f.max`, when applied to a boolean column, returns True if at least one value on that column is True, and False otherwise.
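
To build intuition for this hint, the plain-Python sketch below mimics a running maximum over a boolean column for a single session, ordered by hit number. The flag values are invented, and `itertools.accumulate` is used here only as a stand-in for the window aggregation.

```python
from itertools import accumulate
from operator import or_

# One toy session, hits already ordered by hitNumber (values are invented).
is_home = [False, False, True, False, False]
is_item = [True, False, False, True, False]

# Running "any so far": the plain-Python analogue of max over a window
# from the first hit up to the current hit.
home_seen = list(accumulate(is_home, or_))

# An item hit only counts if the home page was seen at or before it.
item_after_home = [item and seen for item, seen in zip(is_item, home_seen)]

print(home_seen)        # [False, False, True, True, True]
print(item_after_home)  # [False, False, False, True, False]
```

The item hit at the first position is ignored because no home hit precedes it, while the one at the fourth position counts.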

In [None]:
window = Window.partitionBy("sessionId").orderBy("hitNumber").rowsBetween(Window.unboundedPreceding, 0)

df_stages = (
    df_pages_flag
    .withColumn("home_seen", f.max("is_home").over(window))
    .withColumn("item_after_home", f.col('is_item') & f.col('home_seen'))
    .withColumn("basket_after_item", f.col('is_basket') & f.max('item_after_home').over(window))
    .withColumn("payment_after_basket",
                f.col("is_payment") & f.max('basket_after_item').over(window)
    )
    .withColumn("order_completed_after_payment",
                f.col("is_order_completed") & f.max('payment_after_basket').over(window)
    )
)

At this point, we have information on whether a hit on a page was preceded by hits on all previous pages in the sequence. This data is enough to calculate the number of sessions that reached each stage of the sequence.

The boolean columns we created indicate whether a certain stage was reached in a session. For instance, if a session has a hit with a True value in the `order_completed_after_payment` column, it means that the session reached the final stage of the sequence. This is because for this value to be True, there must have been a previous hit on a `payment` page with a True value in the `payment_after_basket` column, and so on, tracing back through the entire funnel.

Now let's take a moment to consider something. It does not matter if there is more than one hit on an `item` page after a hit on the `home` page in some session, for example. We only need at least one hit on each page that respects the sequence to consider that the session reached a certain stage.

Therefore, to calculate the number of sessions that reached each stage, we need to count the number of **distinct** sessions where each of these columns is true.
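
The difference between counting flagged hits and counting distinct flagged sessions can be seen in a small plain-Python sketch. The rows below are invented `(sessionId, flag)` pairs that stand in for one stage column.

```python
# (sessionId, stage flag) per hit; values are invented for illustration.
hit_rows = [
    ("s1", True),
    ("s1", True),
    ("s2", False),
    ("s3", True),
    ("s2", False),
]

# Counting True flags counts hits, so session s1 is counted twice.
flagged_hits = sum(flag for _, flag in hit_rows)

# Collecting session IDs in a set counts each session once.
flagged_sessions = len({sid for sid, flag in hit_rows if flag})

print(flagged_hits, flagged_sessions)  # 3 2
```

Only the second number answers the question of how many sessions reached the stage.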

We can store these values either in columns of a final dataframe or in separate variables for printing. 

After this, we calculate the percentage of sessions that reached each stage from the previous one.

In [None]:
result = (
    df_stages
    .agg(
        f.countDistinct(f.when(f.col("home_seen"), f.col("sessionId"))).alias("total_home"),
        f.countDistinct(f.when(f.col("item_after_home"), f.col("sessionId"))).alias("total_item_after_home"),
        f.countDistinct(f.when(f.col("basket_after_item"), f.col("sessionId"))).alias("total_basket_after_item"),
        f.countDistinct(f.when(f.col("payment_after_basket"), f.col("sessionId"))).alias("total_payment_after_basket"),
        f.countDistinct(f.when(f.col("order_completed_after_payment"), f.col("sessionId"))).alias("total_order_completed_after_payment")
    )
    .withColumn(
        "item_after_home_ratio",  f.col("total_item_after_home") / f.col("total_home")
    ).withColumn(
        "basket_after_item_ratio",  f.col("total_basket_after_item") / f.col("total_item_after_home")
    ).withColumn(
        "payment_after_basket_ratio", f.col("total_payment_after_basket") / f.col("total_basket_after_item")
    ).withColumn(
        "order_completed_after_payment_ratio", 
        f.col("total_order_completed_after_payment") / f.col("total_payment_after_basket")
    )
)

result.display()

Finally, let's do it all at once and see how long it takes to run:

In [None]:
window = Window.partitionBy("sessionId").orderBy("hitNumber").rowsBetween(Window.unboundedPreceding, 0)

start = time.time()

result = (
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
    .select(
        'sessionId',
        'hitNumber',
        f.col('page').getField('pagePathLevel1').alias('pagePath')
    )
    .withColumn("is_home", (f.col("pagePath") == "/home"))
    .withColumn("is_item", (f.col("pagePath") == "/google+redesign/"))
    .withColumn("is_basket", (f.col("pagePath") == "/basket.html"))
    .withColumn("is_payment", (f.col("pagePath") == "/payment.html"))
    .withColumn("is_order_completed", (f.col("pagePath") == "/ordercompleted.html"))
    .withColumn("home_seen", f.max("is_home").over(window))
    .withColumn("item_after_home", f.col('is_item') & f.col('home_seen'))
    .withColumn("basket_after_item", f.col('is_basket') & f.max('item_after_home').over(window))
    .withColumn("payment_after_basket",
                f.col("is_payment") & f.max('basket_after_item').over(window)
    )
    .withColumn("order_completed_after_payment",
                f.col("is_order_completed") & f.max('payment_after_basket').over(window)
    )
    .agg(
        f.countDistinct(f.when(f.col("home_seen"), f.col("sessionId"))).alias("total_home"),
        f.countDistinct(f.when(f.col("item_after_home"), f.col("sessionId"))).alias("total_item_after_home"),
        f.countDistinct(f.when(f.col("basket_after_item"), f.col("sessionId"))).alias("total_basket_after_item"),
        f.countDistinct(f.when(f.col("payment_after_basket"), f.col("sessionId"))).alias("total_payment_after_basket"),
        f.countDistinct(f.when(f.col("order_completed_after_payment"), f.col("sessionId"))).alias("total_order_completed_after_payment")
    )
    .withColumn(
        "item_after_home_ratio",  f.col("total_item_after_home") / f.col("total_home")
    ).withColumn(
        "basket_after_item_ratio",  f.col("total_basket_after_item") / f.col("total_item_after_home")
    ).withColumn(
        "payment_after_basket_ratio", f.col("total_payment_after_basket") / f.col("total_basket_after_item")
    ).withColumn(
        "order_completed_after_payment_ratio", 
        f.col("total_order_completed_after_payment") / f.col("total_payment_after_basket")
    )
)

result.display()

end = time.time()

built_in_time = end-start

We can now analyse the running times of each approach:

In [None]:
sns.barplot(x=['Joins', 'UDF', 'PySpark Built-In Functions'], y=[joins_time, udf_time, built_in_time])

### Promotion effectiveness - More complex path analysis

In Part 2, question 7, we identified the most clicked promotion in sessions where at least one product was added to the cart.

However, we are not sure if clicking the promotion is what led to the products being added to the cart. 

So now let's try to understand which promotions actually led to additions to the cart. For that we need to do sequential analysis.

We want to find out which promotion led to the most product additions to the cart.

Let's start by thinking how we can infer that a product being added to the cart was due to a promotion click:
- When a promotion is clicked, the user enters a new page. This corresponds to the following hits: 
    - 'EVENT' hit, with `eventInfo.eventAction` equal to 'Promotion Click'
    - 'PAGE' hit, immediately after the 'EVENT' hit, with a certain `pageTitle`
- If the user leaves the page before adding a product to the cart, we conclude that the promotion click did not lead to the product addition.
- On the other hand, if the user navigates through the page (for instance, clicks on the 'Quick View' to see the details of a product, etc.), never leaves, and adds a product to the cart, we conclude that the promotion click led to the product addition.


#### Using UDFs - The easy way

Define a function that receives an array of hits from a session and returns a map where the keys are promotion IDs, and the values represent the number of times each promotion resulted in a product being added to the cart.

The function should follow these steps:

1. Iterate through the hits:
    - When encountering a hit of type 'EVENT' where `eventInfo.eventAction` equals 'Promotion Click', the subsequent hit will be a 'PAGE' hit. Any hits that follow on the same page are considered a result of this promotion click.
    - If one of these subsequent hits is of type 'EVENT' and has `eventInfo.eventAction` equal to 'Add to Cart', it indicates that the promotion led to a product being added to the cart.
    - Keep in mind that if the user leaves the page in the meantime, the promotion click did not lead to a product addition.
2. Update the map:
    - Use the promotion ID as the key. If the promotion ID is not already in the map, initialize its value to 1. If the promotion ID already exists, increment its value by 1.

Finally, register this function as a UDF (User-Defined Function) and apply it to the `hits` column of the hits dataframe.
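
Before writing the UDF, it can help to prototype the attribution logic in plain Python. The sketch below is one possible reading of the steps above, not a reference solution. It runs on flattened toy hits, where the keys `promoId` and `pageTitle` and the 'Quick View Click' action are assumptions made for illustration, so the field access will need adapting to the real structs. It also assumes that "leaving the page" means a 'PAGE' hit with a different title.

```python
def promo_cart_additions(hits):
    """Map promotion ID -> number of 'Add to Cart' events credited to it."""
    counts = {}
    pending_promo = None  # promotion just clicked, waiting for its PAGE hit
    active_promo = None   # promotion credited for the page being viewed
    promo_page = None     # title of the page opened by that promotion click

    for hit in hits:
        action = hit.get("eventAction")
        if hit["type"] == "EVENT" and action == "Promotion Click":
            pending_promo = hit["promoId"]
            active_promo, promo_page = None, None
        elif hit["type"] == "PAGE":
            if pending_promo is not None:
                # First page after the click: the promotion's landing page.
                active_promo, promo_page = pending_promo, hit["pageTitle"]
                pending_promo = None
            elif hit["pageTitle"] != promo_page:
                # The user moved to another page: stop crediting the promotion.
                active_promo, promo_page = None, None
        elif action == "Add to Cart" and active_promo is not None:
            counts[active_promo] = counts.get(active_promo, 0) + 1
    return counts


# Toy session (all values invented).
session_hits = [
    {"type": "PAGE", "pageTitle": "Home"},
    {"type": "EVENT", "eventAction": "Promotion Click", "promoId": "promo_A", "pageTitle": "Home"},
    {"type": "PAGE", "pageTitle": "Apparel"},
    {"type": "EVENT", "eventAction": "Quick View Click", "pageTitle": "Apparel"},
    {"type": "EVENT", "eventAction": "Add to Cart", "pageTitle": "Apparel"},
    {"type": "PAGE", "pageTitle": "Home"},
    {"type": "EVENT", "eventAction": "Add to Cart", "pageTitle": "Home"},
]

result = promo_cart_additions(session_hits)
print(result)  # {'promo_A': 1}
```

The second 'Add to Cart' is not credited because the user had already left the page opened by the promotion.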

In [None]:
# WRITE YOUR CODE HERE

For each session, you already have information about which promotions led to product additions and how many times each promotion contributed to a product being added to the cart.

To calculate the total number of product additions driven by each promotion across all sessions:
1. Explode the column created with the UDF, so each promotion ID and its count are represented as individual rows.
2. Group the exploded data by the promotion ID.
3. Aggregate the counts by summing the values for each promotion ID.

This will give you the total number of product additions attributed to each promotion.
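As a rough mental model of these three steps, here is the same explode, group and sum logic on plain Python dicts. The per-session maps are invented values, and this is not the PySpark solution itself, only a sketch of what the aggregation should produce.

```python
from collections import Counter

# Hypothetical UDF output: one map per session (promotion ID -> cart additions)
per_session_counts = [
    {"P1": 2, "P2": 1},
    {},              # session where no promotion led to a cart addition
    {"P1": 1},
]

# 1. "Explode": one (promotion ID, count) pair per map entry
exploded = [
    (promo_id, n)
    for counts in per_session_counts
    for promo_id, n in counts.items()
]

# 2. and 3. Group by promotion ID and sum the counts
totals = Counter()
for promo_id, n in exploded:
    totals[promo_id] += n

print(dict(totals))  # {'P1': 3, 'P2': 1}
```

Sessions with an empty map simply contribute no pairs here, so they never reach the totals.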

In [None]:
# WRITE YOUR CODE HERE

#### Using PySpark Built-In Functions - The hard way

We can do this analysis using PySpark built-in functions. However, it involves more steps than using UDFs and a more complex thought process. 

The first thing we need to identify is the 'PAGE' hits that happened right after a promotion click.

Let's transform the dataframe to a more friendly format: one row per hit.

In [None]:
df_exploded_hits = (
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
)

Now, create a new column `origin_promo` that holds the promotion of the promotion click hit that originated the 'PAGE' hit. This column will be null for all hits that did not originate from a promotion click. You can check whether a hit originated from a promotion click by checking if the previous hit in the session was an 'EVENT' hit with `eventInfo.eventAction` equal to 'Promotion Click'.

Then select only the following columns for simplification:
- sessionId
- hitNumber
- type
- page.pageTitle
- eventInfo.eventAction
- promotion
- origin_promo

In [None]:
window_origin_promo = Window.partitionBy('sessionId').orderBy('hitNumber')

df_origin_promo = (
    df_exploded_hits
    .withColumn('origin_promo',
                f.when(
                    f.lag(f.col('eventInfo')).over(window_origin_promo).isNotNull()
                    & (f.lag(f.col('eventInfo').getField('eventAction')).over(window_origin_promo) == 'Promotion Click'),
                    f.lag('promotion').over(window_origin_promo)
                ).otherwise(None)
    )
    .select(
        'sessionId',
        'type',
        'hitNumber',
        f.col('page').getField('pageTitle').alias('pageTitle'),
        f.col('eventInfo').getField('eventAction').alias('eventAction'),
        'promotion',
        'origin_promo'
    )
)

So now we know whether a page was reached via a promotion click. However, the user may enter the same page more than once during a session, once via a promotion click and at other times from another page. So we need to group the hits on a page into 'page views'. Imagine this scenario:

1. The user enters page A via a promotion click. This corresponds to hit number 1.
2. The user navigates in page A, which results in multiple hits on that page (hit 2 to 10).
3. The user leaves the page.
4. The user enters page A again later, this time not via a promotion click. This corresponds to hit number 20.
5. The user navigates in page A again, which results in more hits on that page (hit 21 to 23).
6. The user adds a product to the cart. This is hit number 24. We cannot conclude that the promotion click led to the product addition, since the user left the page in the meantime, before adding the product to the cart.

Hits 1-10 correspond to one page view, and hits 20-24 correspond to another page view.

Let's create a new column `page_view` that indicates the page view number of the hit. To simplify, this column can be the hit number of the hit that originated the page view. For instance, if the hit that originated the page view is hit number 1, then all hits from 1 to 10 are part of the page view '1'. On the other hand, if the hit that originated the page view is hit number 20, then all hits from 20 to 24 are part of the page view '20'.

In our scenario, page view '1' was originated by a promotion click, and page view '20' was not.

To create the `page_view` column, we can group the hits by session and page, order them by hit number, and see if the hit numbers are sequential. If there is a break in the sequence, we can assume that a new page view started.

In [None]:
window_page = Window.partitionBy('sessionId', 'pageTitle').orderBy('hitNumber')

df_page_view = (
    df_origin_promo
    # See if hits on a page are sequential
    .withColumn('lastHitNumber', f.lag('hitNumber').over(window_page))
    .withColumn('sequential', f.when(f.col('hitNumber') == (f.col('lastHitNumber') + 1), True).otherwise(False))
    # Set page view on the first hit of that page view
    .withColumn(
        'pageView',
        f.when(
            f.col('sequential') == False,
            f.col('hitNumber')
        ).otherwise(None)
    )
    # Propagate the page view to all sequential hits on the page after that one
    .withColumn(
        'pageView',
        f.last('pageView', ignorenulls=True).over(window_page)
    )
)
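The break-in-sequence idea behind `pageView` can be checked on plain Python lists. This is a minimal sketch that assumes we already have the hit numbers of a single page within a single session (that is, one window partition); the helper name `assign_page_views` is hypothetical.

```python
def assign_page_views(hit_numbers):
    """Label each hit number with the first hit number of its consecutive run."""
    labelled = []
    previous = None
    current_view = None
    for n in sorted(hit_numbers):
        # A gap in the sequence (or the very first hit) starts a new page view
        if previous is None or n != previous + 1:
            current_view = n
        labelled.append((n, current_view))
        previous = n
    return labelled


# Scenario from the text: hits 1-10 on page A, then hits 20-24 on page A again
hits_on_page_a = list(range(1, 11)) + list(range(20, 25))
page_views = dict(assign_page_views(hits_on_page_a))

print(page_views[10], page_views[24])  # 1 20
```

The first hit of each run plays the role of the rows where `sequential` is False, and carrying `current_view` forward corresponds to `f.last('pageView', ignorenulls=True)` over the ordered window.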

With this information, we can now identify all the hits on a page that were originated by a promotion click. These hits correspond to hits of a page view that was itself originated by a promotion click.

So we just need to propagate the `origin_promo` value through the hits of a page view.

In [None]:
window_page_view = Window.partitionBy('sessionId', 'pageTitle', 'pageView').orderBy('hitNumber')

df_final = (
    df_page_view
    .withColumn(
        'origin_promo',
        f.last('origin_promo', ignorenulls=True).over(window_page_view)
    )
)
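The propagation step behaves like a forward fill within each page view. The sketch below mimics it on plain Python lists, assuming Spark's default frame for an ordered window (from the start of the partition up to the current row); the helper `forward_fill` and the sample values are made up for illustration.

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value seen so far."""
    filled = []
    last_seen = None
    for value in values:
        if value is not None:
            last_seen = value
        filled.append(last_seen)
    return filled


# origin_promo per hit, ordered by hit number, for two hypothetical page views
view_from_promo = ["P1", None, None, None]  # first hit came from a promotion click
view_without_promo = [None, None, None]     # page reached some other way

print(forward_fill(view_from_promo))     # ['P1', 'P1', 'P1', 'P1']
print(forward_fill(view_without_promo))  # [None, None, None]
```

Because the window is partitioned by `pageView`, a promotion never leaks from one page view into a later one on the same page.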

We now have one row per hit, and we know whether that hit was originated by a promotion click and whether that hit corresponds to adding a product to the cart.

We just need to count the number of times a product was added to the cart because of a certain promotion.

In [None]:
(
    df_final
    .filter(
        (f.col('eventAction') == 'Add to Cart')
        & (f.col('origin_promo').isNotNull())
    )
    .groupBy(f.element_at(f.col('origin_promo'), 1).getField('promoId').alias('promoId'))
    # df_final has no 'visitId' column, so count the filtered 'Add to Cart' rows directly
    .agg(f.count('*').alias('nr_purchases'))
    .orderBy(f.desc('nr_purchases'))
).display()