In [None]:
import pyspark.sql.functions as f
from pyspark.sql import Window

## Project Introduction

In this final project, you will apply everything you've learned throughout the course in a setting similar to the one you face in your daily work as a Data Analyst at Mercedes.

The goal of this project is to understand how you can manipulate and analyze Google Analytics data about user interactions on a website.

For that, you'll work with the [Google Analytics Sample dataset](https://console.cloud.google.com/marketplace/product/obfuscated-ga360-data/obfuscated-ga360-data?inv=1&invt=AbmlmQ), which contains real data from the [Google Merchandise Store](https://shop.googlemerchandisestore.com/), a real ecommerce store that sells Google-branded merchandise.

The data is typical of what an ecommerce website would see and includes the following information:
- **Traffic source data**: information about where website visitors originate, including data about organic traffic, paid search traffic, and display traffic
- **Content data**: information about the behavior of users on the site, such as URLs of pages that visitors look at, how they interact with content, etc.
- **Transactional data**: information about the transactions on the Google Merchandise Store website.

## Download the data

The data is available in a zip file, which contains three Parquet files:
- `ga_sessions_main.parquet`: the main information about each session
- `ga_sessions_hits.parquet`: detailed information about hits in each session
- `ga_sessions_network.parquet`: information about traffic sources, device and geographic information

**NOTE:** To make things a bit easier, only data from the first 15 days of August 2016 was included in the dataset. Also, some noisy information about `hits` was removed from the original data.

Let's download the data and save it to the Databricks File System (DBFS).

In [None]:
%sh wget https://raw.githubusercontent.com/inesmcm26/lp-big-data-mercedes/main/data/ga_sessions.zip

In [None]:
%sh unzip ga_sessions.zip

In [None]:
dbutils.fs.cp('file:/databricks/driver/ga_sessions_main.parquet', 'dbfs:/FileStore/final_project/ga_sessions_main.parquet')
dbutils.fs.cp('file:/databricks/driver/ga_sessions_network.parquet', 'dbfs:/FileStore/final_project/ga_sessions_network.parquet')
dbutils.fs.cp('file:/databricks/driver/ga_sessions_hits.parquet', 'dbfs:/FileStore/final_project/ga_sessions_hits.parquet')

The column that identifies a session and is **common to all tables** is the `sessionId` column.

Run the following cell to load each dataset into a Spark DataFrame.

In [None]:
df_main = spark.read.parquet('/FileStore/final_project/ga_sessions_main.parquet')
df_hits = spark.read.parquet('/FileStore/final_project/ga_sessions_hits.parquet')
df_network = spark.read.parquet('/FileStore/final_project/ga_sessions_network.parquet')

### Datasets Overview

#### Main dataset

In [None]:
df_main.printSchema()

Besides the session id, the main dataset contains the following columns:
- **visitorId**: The unique identifier for a visitor
- **visitNumber**: The visit number of this user. If this is the first visit to the website, then this is set to 1.
- **visitStartTime**: The timestamp (expressed as POSIX time) of the beginning of the session
- **totals**: A struct with statistics about the session, such as total number of hits, time on site, number of transactions and revenue, etc.
- **channelGrouping**: The channel via which the user came to the Store

#### Hits dataset

In [None]:
df_hits.printSchema()

Besides the session id, the hits dataset contains the following columns:

- **hits**: An array of structs representing all the hits in this session. A hit is an interaction that results in data being sent to Google Analytics. Each struct is a hit defined by the following fields:
    - **hitNumber**: The number of this hit in the session
    - **type**: Type of the hit (PAGE or EVENT)
    - **hour**: Hour of the hit
    - **minute**: Minute of the hit
    - **time**: Time spent on the hit
    - **page**: Information about the page
    - **contentGroup**: Information about the content categorization of the page on the website
    - **product**: Array of structs with product information of all products displayed on the page
    - **eventInfo**: If hit is of type 'EVENT', this field contains information about the event
    - **promotion**: Array of structs with promotion information of all promotions displayed on the page.
    - **promotionActionInfo**: Present when there is a promotion on the hit. It indicates whether the promotion was clicked (a hit of type 'EVENT' whose event action is 'Promotion Click') or only viewed on the page without being clicked.
    - **transaction**: Information about the transaction when the hit is an event 'Confirm Checkout'. Null otherwise.



#### Network dataset


In [None]:
df_network.printSchema()

Besides the session id, the network dataset contains the following columns:

- **trafficSource**: A struct with information about the source of the session, as well as ads and campaign information
- **device**: A struct with information about the device used in the session
- **geoNetwork**: A struct with information about the geographic location of the user. Most of this information is obscured and only the city and country are available.
- **customDimensions**: Extra traffic information. You can ignore this column.


## Dataset analysis and cleaning

Start by checking how many rows each dataset has.

In [None]:
print(df_main.count())
print(df_hits.count())
print(df_network.count())

Now, check whether there are any missing values in the main dataset.

In [None]:
df_main.describe().display()

Assume that a missing channel grouping means the user came to the Store through the 'Direct' channel, and fill the missing values accordingly.

In [None]:
df_main = (
    df_main
    .fillna('Direct', subset=['channelGrouping'])
)

df_main.describe().display()

## Answer business questions

### Easy questions


Users access the store through different channels, and each session has a corresponding revenue value.

1. Which channel generates the highest total revenue across all sessions?

Notes:
- Use the `channelGrouping` column in the main dataset for channel types.
- Calculate the revenue using the `totalTransactionRevenue` field within the `totals` column.

In [None]:
(
    df_main
    .groupBy('channelGrouping')
    .agg(
        f.sum(f.col('totals').getField('totalTransactionRevenue')).alias('revenue')
    )
    .orderBy(f.desc('revenue'))
).display()

In [None]:
res = "Referral"

2. Users access the store through different browsers. Which are the top 3 browsers ranked by the total time users spent on the site?

Notes:
- You can find the browser used in a session in the `device` column of the network dataframe
- The total time spent on the site in a session is recorded in the `totals` column of the main dataframe

In [None]:
(
    df_main
    .join(
        df_network,
        on=['sessionId']
    )
    .select(
        f.col('device').getField('browser').alias('browser'),
        f.col('totals').getField('timeOnSite').alias('time')
    )
    .groupBy('browser')
    .agg(f.sum(f.col('time')).alias('total_time'))
    .orderBy(f.desc('total_time'))
).display()

In [None]:
res = ['Chrome', 'Safari', 'Firefox']

3. Analyse the website traffic (total number of sessions) per hour of the day and day of the week.

Visualize the result using a pivot table.

**NOTE:** The start time of each session is in UNIX time. You may have to first transform it to a date before being able to extract the hour and day of week.

What is the total number of sessions registered at 8 PM on Tuesdays?
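
As a quick sanity check on the conversion, the sketch below reproduces the hour and day-of-week extraction in plain Python rather than PySpark. It assumes the timestamps are interpreted in UTC (Spark's `from_unixtime` uses the session time zone, which may differ), and the helper name `hour_and_spark_dayofweek` is hypothetical. It follows the numbering of Spark's `dayofweek`, where 1 is Sunday and 7 is Saturday, so Tuesday is 3.

```python
from datetime import datetime, timezone


def hour_and_spark_dayofweek(unix_seconds):
    """Return (hour, day_of_week) using Spark's dayofweek numbering (1 = Sunday)."""
    # Assumption: timestamps are read as UTC
    dt = datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
    # isoweekday(): Monday = 1 ... Sunday = 7; shift so Sunday = 1 ... Saturday = 7
    spark_dow = dt.isoweekday() % 7 + 1
    return dt.hour, spark_dow


# 1470168000 is 2016-08-02 20:00:00 UTC, a Tuesday
print(hour_and_spark_dayofweek(1470168000))
```

When reading the pivot table, the Tuesday column is therefore the one labelled 3, not 2.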

In [None]:
(
    df_main
    .withColumn('date', f.from_unixtime('visitStartTime'))
    .withColumn('day_of_week', f.dayofweek('date'))
    .withColumn('hour', f.hour('date'))
    .groupBy('hour')
    .pivot('day_of_week')
    .agg(
        f.count('sessionId')
    )
    .orderBy('hour')
).display()

In [None]:
result = 330

4. Identify the `visitorId` of the user with the highest average time gap between two consecutive sessions. Consider only visitors that have more than 6 registered sessions.
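
The gap computation can be checked on a handful of values in plain Python before running it on the full dataset. This is a minimal sketch, not the PySpark solution: `average_gap_days` is a hypothetical helper, the timestamps are made up, and gaps are measured in fractional days rather than the whole calendar days that a date difference would return. Note that a visitor with n sessions has n - 1 gaps, because the first session has no predecessor.

```python
def average_gap_days(start_times):
    """Average gap, in days, between consecutive session start times (UNIX seconds)."""
    ordered = sorted(start_times)
    # Pair each session with the previous one, like lag() over an ordered window
    gaps = [(cur - prev) / 86400 for prev, cur in zip(ordered, ordered[1:])]
    # A visitor with a single session has no gap at all
    return sum(gaps) / len(gaps) if gaps else None


# Hypothetical visitor with three sessions: gaps of 1 day and 3 days
sessions = [1470009600, 1470096000, 1470355200]
print(average_gap_days(sessions))
```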

In [None]:
window = Window.partitionBy('visitorId').orderBy('visitStartTime')

(
    df_main
    .withColumn('date', f.from_unixtime('visitStartTime'))
    .withColumn('prev_date', f.lag('date').over(window))
    .withColumn('days_gap', f.date_diff('date', 'prev_date'))
    .groupBy('visitorId')
    .agg(
        f.count('sessionId').alias('nr_sessions'),
        f.avg('days_gap').alias('avg_days_gap')
    )
    .filter(f.col('nr_sessions') > 6)
    .orderBy(f.desc('avg_days_gap'))
    .limit(1)
).display()

In [None]:
result = '8436426603099391262'

### Medium questions

5. What are the top 5 products that are most added to the cart?

**NOTES:**
- A hit of type 'EVENT' can correspond to one of the following event actions (`eventAction` field of `eventInfo`):
    - Product Click
    - Add to Cart
    - Remove from Cart
    - Quickview Click
    - Onsite Click
    - Promotion Click
- A product is identified by its SKU value. You can find this value in field `productSKU` of a product. Remember that the `product` field is an array of product information of all products involved in a hit.
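
To make the nested structure concrete, here is a small plain-Python sketch of the counting logic, using dictionaries in place of Spark structs. The helper name `top_added_to_cart`, the sample sessions and the SKU values are all hypothetical. The sketch counts every product attached to an 'Add to Cart' hit, which is an assumption: taking only the first element of the `product` array is another valid reading.

```python
from collections import Counter


def top_added_to_cart(sessions, n=5):
    """Count 'Add to Cart' events per product SKU and return the n most common."""
    counts = Counter()
    for hits in sessions:
        for hit in hits:
            event = hit.get("eventInfo")
            if hit["type"] == "EVENT" and event and event["eventAction"] == "Add to Cart":
                # Assumption: count every product listed on the hit
                for product in hit.get("product") or []:
                    counts[product["productSKU"]] += 1
    return counts.most_common(n)


# Hypothetical data: each session is a list of hits
sessions = [
    [
        {"type": "PAGE", "eventInfo": None, "product": [{"productSKU": "SKU-A"}]},
        {"type": "EVENT", "eventInfo": {"eventAction": "Add to Cart"},
         "product": [{"productSKU": "SKU-A"}]},
    ],
    [
        {"type": "EVENT", "eventInfo": {"eventAction": "Add to Cart"},
         "product": [{"productSKU": "SKU-A"}]},
        {"type": "EVENT", "eventInfo": {"eventAction": "Add to Cart"},
         "product": [{"productSKU": "SKU-B"}]},
        {"type": "EVENT", "eventInfo": {"eventAction": "Product Click"},
         "product": [{"productSKU": "SKU-B"}]},
    ],
]
print(top_added_to_cart(sessions))
```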

In [None]:
(
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
    .filter(
        (f.col('type') == 'EVENT')
        & (f.col('eventInfo').getField('eventAction') == "Add to Cart")
    )
    .groupBy(
        f.element_at(f.col('product'), 1).getField('productSKU').alias('product_sku'),
        f.element_at(f.col('product'), 1).getField('v2ProductName').alias('product_name')
    )
    .agg(f.count('sessionId').alias('nr_added_to_cart'))
    .orderBy(f.desc('nr_added_to_cart'))
).display()

In [None]:
result = ["GGOEGFKQ020399", "GGOEGAAX0037", "GGOEGAAX0104", "GGOEGAAX0342", "GGOEGAAX0074"]

6. What is the average time spent by users on the 'Shopping Cart' page in sessions where a purchase was made?

Answer with 2 decimal places.

**NOTES**
- To determine sessions where purchases were made, filter the main dataframe by checking the `transactions` field of the `totals` column. If the field is non-null and greater than 0, it indicates that a purchase occurred during the session.
- Hits that correspond to users being on the 'Shopping Cart' page are of type 'PAGE', and the `pageTitle` field in `page` is 'Shopping Cart'.
- The time spent on a hit is available in the `time` field of the `hits` column of the hits dataframe

In [None]:
(
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
    .join(
        df_main,
        on=['sessionId']
    )
    .filter(
        f.col('totals').getField('transactions').isNotNull()
        & (f.col('totals').getField('transactions') > 0)
        & (f.col('type') == 'PAGE')
        & (f.col('page').getField('pageTitle') == 'Shopping Cart')
    )
    .select(f.round(f.avg('time'), 2).alias('avg_time'))
).display()

In [None]:
result = 767495.67

### Hard questions

7. Considering only sessions where there was a promotion click and at least one product was added to the cart, what is the id of the most clicked promotion?

**NOTES:**
- You can check if a product was added to the cart or a promotion was clicked by analysing the `eventInfo` column. A hit of type 'EVENT' can correspond to one of the following event actions (`eventAction` field of `eventInfo` column):
    - Product Click
    - Add to Cart
    - Remove from Cart
    - Quickview Click
    - Onsite Click
    - Promotion Click
- For hits where there was a promotion click, the `promotion` column contains an array with a single element: the details of the clicked promotion. You can find the promotion id in the `promoId` field of that element.
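
The two-step logic (first keep the sessions that contain both event actions, then count the promotion clicks inside them) can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the PySpark solution: `most_clicked_promos` is a hypothetical helper, and the sessions and promotion ids are made up.

```python
from collections import Counter


def most_clicked_promos(sessions):
    """Count promotion clicks, restricted to sessions that also contain an 'Add to Cart'."""
    counts = Counter()
    for hits in sessions:
        # Event actions seen in the session (eventInfo is only present on EVENT hits)
        actions = {h["eventInfo"]["eventAction"] for h in hits if h.get("eventInfo")}
        if not {"Add to Cart", "Promotion Click"} <= actions:
            continue  # session-level filter: both actions must be present
        for h in hits:
            if h.get("eventInfo") and h["eventInfo"]["eventAction"] == "Promotion Click":
                # On a promotion click, the array holds only the clicked promotion
                counts[h["promotion"][0]["promoId"]] += 1
    return counts.most_common()


def click(promo_id):
    return {"eventInfo": {"eventAction": "Promotion Click"}, "promotion": [{"promoId": promo_id}]}


add_to_cart = {"eventInfo": {"eventAction": "Add to Cart"}, "promotion": []}

# Hypothetical sessions: the second one has no 'Add to Cart' and is ignored
sessions = [
    [click("Promo A"), add_to_cart],
    [click("Promo B"), click("Promo B")],
    [click("Promo B"), add_to_cart, click("Promo A")],
]
print(most_clicked_promos(sessions))
```

Only the 'Promotion Click' hits are counted here. Hits that merely display a promotion also carry a `promotion` array, so counting every hit would mix views with clicks.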

In [None]:
(
    df_hits
    .withColumn(
        'events_list',
        f.transform(
            'hits',
            lambda hit: f.when(hit.getField('type') == 'EVENT', hit.getField('eventInfo').getField('eventAction')).otherwise('None')
        )
    )
    .filter(
        f.array_contains('events_list', 'Add to Cart')
        & f.array_contains('events_list', 'Promotion Click')
    )
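    .select(
        'visitId',
        f.inline('hits')
    )
    # Keep only the promotion click hits, so that promotion views are not counted as clicks
    .filter(f.col('eventInfo').getField('eventAction') == 'Promotion Click')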
    .groupBy(f.element_at('promotion', 1).getField('promoId'))
    .agg(f.count('visitId').alias('nr_clicks'))
    .orderBy(f.desc('nr_clicks'))
).display()

In [None]:
result = "Apparel Row 1"

8. Identify the user that most views promotions in sessions but never clicks on them.

Use a UDF to answer the question.

**NOTES:**
- You can check if promotions were viewed on a hit by checking the `promoIsView` field of the `promotionActionInfo` column.
- Similarly, you can see if a user clicked on a promotion on a hit by checking the `promoIsClick` field of the `promotionActionInfo` column.

In [None]:
from pyspark.sql.types import BooleanType

def view_but_no_click(hits_list):
    clicked = False
    view = False
    for hit in hits_list:
        if hit['promotionActionInfo']:
            if hit['promotionActionInfo']['promoIsClick']:
                clicked = True
            if hit['promotionActionInfo']['promoIsView']:
                view = True
    
    return view and not clicked

view_but_no_click_udf = f.udf(view_but_no_click, BooleanType())

(
    df_hits
    .join(
        df_main,
        on=['sessionId']
    )
    .withColumn(
        'view_but_no_click',
        view_but_no_click_udf(f.col('hits'))
    )
    .filter(f.col('view_but_no_click'))
    .groupBy('visitorId')
    .agg(f.sum(f.when(f.col('view_but_no_click'), 1).otherwise(0)).alias('nr_sessions'))
    .orderBy(f.desc('nr_sessions'))
).display()

In [None]:
res = '0593150394512575588'

## Sequential analysis

##### Sessions path analysis - Simple analysis

We'll focus on these pages to understand where users are most lost in the process:

| Page | pagePathLevel1 |
| - | - |
| Home | /home |
| Item | /google+redesign/ |
| Shopping Cart | /basket.html|
| Checkout Your Information | /yourinfo.html |
| Checkout Review | /revieworder.html |
| Payment Method | /payment.html |
| Checkout Confirmation | /ordercompleted.html |

We want to calculate the number and percentage of sessions that stop at each of these stages.
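
To make "reaching a stage" concrete, the sketch below computes the funnel in plain Python with a single pass over each session's ordered page paths. It is only an illustration of the definition, not the approach used in this notebook: the helper names `stages_reached` and `funnel_counts` and the sample sessions are hypothetical. A stage counts as reached only if all the previous stages were reached earlier in the session.

```python
STAGES = [
    "/home", "/google+redesign/", "/basket.html", "/yourinfo.html",
    "/revieworder.html", "/payment.html", "/ordercompleted.html",
]


def stages_reached(page_paths):
    """Number of funnel stages a session reached, in order (0 if it never hit /home)."""
    reached = 0
    for path in page_paths:  # page paths must be ordered by hitNumber
        if reached < len(STAGES) and path == STAGES[reached]:
            reached += 1
    return reached


def funnel_counts(sessions):
    """For each stage, count the sessions that reached it."""
    counts = [0] * len(STAGES)
    for page_paths in sessions:
        for stage in range(stages_reached(page_paths)):
            counts[stage] += 1
    return counts


# Hypothetical sessions, each one a list of page paths ordered by hitNumber
sessions = [
    ["/home", "/google+redesign/", "/basket.html"],
    ["/google+redesign/", "/home"],  # item page seen before home: only home counts
    ["/home", "/google+redesign/", "/home", "/basket.html", "/yourinfo.html",
     "/revieworder.html", "/payment.html", "/ordercompleted.html"],
    ["/basket.html"],  # never visited home: not in the funnel
]
counts = funnel_counts(sessions)
print(counts)
# Percentage of sessions moving from the first stage to the second
print(counts[1] / counts[0] * 100)
```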

**The way you currently do it**

To understand how many sessions stop at each stage, you create one table containing the hits for each page and do multiple left joins to track the sequence of hits.

First of all, let's reshape the hits table into the format you're used to working with: one row per hit.

In [None]:
df_exploded_hits = (
    df_hits
    .select(
        'sessionId',
        f.inline('hits')
    )
    .select(
        'sessionId',
        'hitNumber',
        f.col('page').getField('pagePathLevel1').alias('pagePath')
    )
)

df_exploded_hits.display()

Now let's create one table with the hits for each page and then do the multiple left joins.

This corresponds to the following SQL code:

In [None]:
# Register the PySpark DataFrame as a SQL temporary view to manipulate it with SQL syntax
df_exploded_hits.createOrReplaceTempView('hits_view')

In [None]:
%sql

WITH filtered_hits AS (
  SELECT sessionId, pagePath, hitNumber
  FROM hits_view
  WHERE pagePath IN ("/home", "/google+redesign/", "/basket.html", "/yourinfo.html", "/revieworder.html", "/payment.html", "/ordercompleted.html")
),
home_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/home"
),
item_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/google+redesign/"
),
basket_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/basket.html"
),
user_info_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/yourinfo.html"
),
review_order_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/revieworder.html"
),
payment_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/payment.html"
),
order_completed_hits AS (
  SELECT sessionId, hitNumber
  FROM filtered_hits
  WHERE pagePath == "/ordercompleted.html"
),
uni AS (
  SELECT
    home_hits.sessionId AS home_sessionId, 
    item_hits.sessionId AS item_sessionId,
    basket_hits.sessionId AS basket_sessionId,
    user_info_hits.sessionId AS user_info_sessionId,
    review_order_hits.sessionId AS review_order_sessionId,
    payment_hits.sessionId AS payment_sessionId,
    order_completed_hits.sessionId AS order_completed_sessionId
  FROM home_hits
  LEFT JOIN item_hits ON home_hits.sessionId == item_hits.sessionId AND home_hits.hitNumber < item_hits.hitNumber
  LEFT JOIN basket_hits ON home_hits.sessionId == basket_hits.sessionId AND item_hits.hitNumber < basket_hits.hitNumber
  LEFT JOIN user_info_hits ON home_hits.sessionId == user_info_hits.sessionId AND basket_hits.hitNumber < user_info_hits.hitNumber
  LEFT JOIN review_order_hits ON home_hits.sessionId == review_order_hits.sessionId AND user_info_hits.hitNumber < review_order_hits.hitNumber
  LEFT JOIN payment_hits ON home_hits.sessionId == payment_hits.sessionId AND review_order_hits.hitNumber < payment_hits.hitNumber
  LEFT JOIN order_completed_hits ON home_hits.sessionId == order_completed_hits.sessionId AND payment_hits.hitNumber < order_completed_hits.hitNumber
)

SELECT
  COUNT(DISTINCT home_sessionId) AS total_home,
  COUNT(DISTINCT item_sessionId) AS total_item,
  (COUNT(DISTINCT item_sessionId) / COUNT(DISTINCT home_sessionId)) * 100 AS 1st_2nd,
  COUNT(DISTINCT basket_sessionId) AS basket_item,
  (COUNT(DISTINCT basket_sessionId) / COUNT(DISTINCT item_sessionId)) * 100 AS 2nd_3rd,
  COUNT(DISTINCT user_info_sessionId) AS user_info_item,
  (COUNT(DISTINCT user_info_sessionId) / COUNT(DISTINCT basket_sessionId)) * 100 AS 3rd_4th,
  COUNT(DISTINCT review_order_sessionId) AS review_order_item,
  (COUNT(DISTINCT review_order_sessionId) / COUNT(DISTINCT user_info_sessionId)) * 100 AS 4th_5th,
  COUNT(DISTINCT payment_sessionId) AS payment_item,
  (COUNT(DISTINCT payment_sessionId) / COUNT(DISTINCT review_order_sessionId)) * 100 AS 5th_6th,
  COUNT(DISTINCT order_completed_sessionId) AS order_completed_item,
  (COUNT(DISTINCT order_completed_sessionId) / COUNT(DISTINCT payment_sessionId)) * 100 AS 6th_7th
FROM uni

Transform the SQL code into PySpark code.

In [None]:
# Filter hits based on specific page paths
filtered_hits = (
    df_exploded_hits
    .filter(
        f.col("pagePath").isin(
            "/home",
            "/google+redesign/",
            "/basket.html",
            "/yourinfo.html",
            "/revieworder.html",
            "/payment.html",
            "/ordercompleted.html"
        )
    )
)

# Create individual DataFrames for each page path
home_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/home")
    .select(
        f.col("sessionId").alias("home_sessionId"),
        f.col("hitNumber").alias("home_hitNumber")
    )
)
item_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/google+redesign/")
    .select(
        f.col("sessionId").alias("item_sessionId"),
        f.col("hitNumber").alias("item_hitNumber")
    )
)
basket_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/basket.html")
    .select(
        f.col("sessionId").alias("basket_sessionId"),
        f.col("hitNumber").alias("basket_hitNumber")
    )
)
user_info_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/yourinfo.html")
    .select(
        f.col("sessionId").alias("user_info_sessionId"),
        f.col("hitNumber").alias("user_info_hitNumber")
    )
)
review_order_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/revieworder.html")
    .select(
        f.col("sessionId").alias("review_order_sessionId"),
        f.col("hitNumber").alias("review_order_hitNumber")
    )
)
payment_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/payment.html")
    .select(
        f.col("sessionId").alias("payment_sessionId"),
        f.col("hitNumber").alias("payment_hitNumber")
    )
)
order_completed_hits = (
    filtered_hits
    .filter(f.col("pagePath") == "/ordercompleted.html")
    .select(
        f.col("sessionId").alias("order_completed_sessionId"),
        f.col("hitNumber").alias("order_completed_hitNumber")
    )
)

# Perform left joins in sequence
uni = (
    home_hits
    .join(
        item_hits,
        on=[(home_hits.home_sessionId == item_hits.item_sessionId)
            & (home_hits.home_hitNumber < item_hits.item_hitNumber)
        ],
        how="left"
    )
    .join(
        basket_hits,
        on=[(home_hits.home_sessionId == basket_hits.basket_sessionId)
            & (item_hits.item_hitNumber < basket_hits.basket_hitNumber)
        ],
        how="left"
    )
    .join(
        user_info_hits,
        on=[(home_hits.home_sessionId == user_info_hits.user_info_sessionId)
            & (basket_hits.basket_hitNumber < user_info_hits.user_info_hitNumber)
        ],
        how="left"
    )
    .join(
        review_order_hits,
        on=[(home_hits.home_sessionId == review_order_hits.review_order_sessionId)
            & (user_info_hits.user_info_hitNumber < review_order_hits.review_order_hitNumber)
        ],
        how="left"
    )
    .join(
        payment_hits,
        on=[(home_hits.home_sessionId == payment_hits.payment_sessionId)
            & (review_order_hits.review_order_hitNumber < payment_hits.payment_hitNumber)
        ],
        how="left"
    )
    .join(
        order_completed_hits,
        on=[(home_hits.home_sessionId == order_completed_hits.order_completed_sessionId)
            & (payment_hits.payment_hitNumber < order_completed_hits.order_completed_hitNumber)
        ],
        how="left"
    )
)

result = (
    uni.select(
        f.countDistinct('home_sessionId').alias('total_home'),
        f.countDistinct('item_sessionId').alias('total_item'),
        f.countDistinct('basket_sessionId').alias('basket_item'),
        f.countDistinct('user_info_sessionId').alias('user_info_item'),
        f.countDistinct('review_order_sessionId').alias('review_order_item'),
        f.countDistinct('payment_sessionId').alias('payment_item'),
        f.countDistinct('order_completed_sessionId').alias('order_completed_item')
    )
    .withColumn("1st_2nd", (f.col("total_item") / f.col("total_home")) * 100)
    .withColumn("2nd_3rd", (f.col("basket_item") / f.col("total_item")) * 100)
    .withColumn("3rd_4th", (f.col("user_info_item") / f.col("basket_item")) * 100)
    .withColumn("4th_5th", (f.col("review_order_item") / f.col("user_info_item")) * 100)
    .withColumn("5th_6th", (f.col("payment_item") / f.col("review_order_item")) * 100)
    .withColumn("6th_7th", (f.col("order_completed_item") / f.col("payment_item")) * 100)
)

result.display()


##### Promotion effectiveness - More complex path analysis


Earlier, we saw which promotion was the most clicked in sessions where a promotion was clicked and products were added to the cart.

However, we are not sure whether clicking the promotion is what led to the products being added to the cart.

So now let's see which promotions actually led to additions to the cart. For that, we need to do sequential analysis.

In [None]:
from pyspark.sql.types import MapType, StringType, IntegerType

def promotion_purchases(hits):
    # hits is an array of dicts

    promos = {}
    current_promo = None
    prev_page = None
    last_was_promo = False

    for hit in hits:
        current_page = hit['page']['pageTitle']

        # If the previous hit was a promotion click, this hit is the first one on the resulting page,
        # so there is no need to compare the current page to the previous one
        if last_was_promo:
            last_was_promo = False
        else:
            # Reset current promo if the page changes
            if current_page != prev_page:
                current_promo = None

        # Check if the event is a promotion click and update the current promo
        if hit['eventInfo'] and (hit['eventInfo']['eventAction'] == 'Promotion Click'):
            current_promo = hit['promotion'][0]['promoId']
            last_was_promo = True

        # If the event is 'Add to Cart' and a promo is active, update the count
        if current_promo and hit['eventInfo'] and (hit['eventInfo']['eventAction'] == 'Add to Cart'):
            if current_promo not in promos:
                promos[current_promo] = 0
            promos[current_promo] += 1
        
        prev_page = current_page
    
    return promos


promotion_purchases_udf = f.udf(promotion_purchases, MapType(StringType(), IntegerType()))

res = (
    df_hits
    .withColumn(
        'promotion_purchases',
        promotion_purchases_udf(f.col('hits'))
    )
)

res.display()

(
    res
    .select(
        f.explode('promotion_purchases')
    )
    .withColumnRenamed('key', 'promoId')
    .withColumnRenamed('value', 'nr_purchases')
    .groupby('promoId')
    .agg(f.sum('nr_purchases').alias('total_purchases'))
    .orderBy(f.desc('total_purchases'))
).display()

In [None]:
# Get the origin promo
# If we got to a page because a promo was clicked, set the origin promo to that promo
# Every time a promo is clicked, the next hit is a 'PAGE' hit with the resulting page

window_origin_promo = Window.partitionBy('visitId', 'visitNumber', 'visitorId', 'visitStartTime').orderBy('hitNumber')

df_origin_promo = (
    df_hits
    .select(
        'visitId',
        'visitNumber',
        'visitorId',
        'visitStartTime',
        f.inline('hits')
    )
    .withColumn('origin_promo',
                f.when(
                    f.lag(f.col('eventInfo')).over(window_origin_promo).isNotNull()
                    & (f.lag(f.col('eventInfo').getField('eventAction')).over(window_origin_promo) == 'Promotion Click'),
                    f.lag('promotion').over(window_origin_promo)
                ).otherwise(None)
    )
    .select(
        'visitId',
        'visitNumber',
        'visitorId',
        'visitStartTime',
        'type',
        'hitNumber',
        f.col('page').getField('pageTitle').alias('pageTitle'),
        f.col('eventInfo').getField('eventAction').alias('eventAction'),
        'promotion',
        'origin_promo'
    )
)

# df_origin_promo.display()


# Now let's see what happens in each session and on each visited page
# We can only conclude that an item was added to the cart because of a promotion if a promotion click led to a page and the user never left that page before adding an item to the cart
# But how can we see if the user never left the page between a promotion click and adding an item to the cart? We need to check whether the hit numbers on that page are consecutive between these two events

# First create a new column with the previous hit number on the same page in the session
# Then create a 'sequential' column that is True if the row's hitNumber is equal to the previous hit number plus 1 and False otherwise

# 'sequential' is False every time the user enters the page during the session: the first time the user enters the page, and also every time the user comes back to it after visiting another page

# It is important to track these events because we only want to attribute an addition to the cart to a promotion click if the user never left the page between clicking the promotion and adding the item to the cart
# Imagine the scenario where the user clicks a promotion -> gets into page A -> exits page A and visits page B -> goes back to page A -> adds an item to the cart
# This case should not count as a success for the promotion
window_page = Window.partitionBy('visitId', 'visitNumber', 'visitorId', 'visitStartTime', 'pageTitle').orderBy('hitNumber')

df_sequential = (
    df_origin_promo
    .withColumn('lastHitNumber', f.lag('hitNumber').over(window_page))
    .withColumn('sequential', f.when(f.col('hitNumber') == (f.col('lastHitNumber') + 1), True).otherwise(False))
)

# df_sequential.display()

# Now let's register the page view: a column that identifies which visit to the page within the session each hit belongs to
# Page views could be numbered sequentially, but it doesn't really matter. To make things easier, let's use as the page view the hit number of the hit that entered the page after being on a different page

# Let's set the page view to the hit number where sequential is False, and then propagate the last non-null value to all the rows where sequential is True
# This requires ignorenulls=True

df_page_view = (
    df_sequential
    .withColumn(
        'pageView',
        f.when(
            f.col('sequential') == False,
            f.col('hitNumber')
        ).otherwise(None)
    )
    .withColumn(
        'pageView',
        f.last('pageView', ignorenulls=True).over(window_page)
    )
)

# df_page_view.display()

# Finally, we can update the origin promo column to propagate it to all the hits of a page view that originated from a promotion click

# For that we can get the last non-null origin promo for each page view in a session
window_page_view = Window.partitionBy('visitId', 'visitNumber', 'visitorId', 'visitStartTime', 'pageTitle', 'pageView').orderBy('hitNumber')

df_final = (
    df_page_view
    .withColumn(
        'origin_promo',
        f.last('origin_promo', ignorenulls=True).over(window_page_view)
    )
)

# df_final.display()

(
    df_final
    .filter(
        (f.col('eventAction') == 'Add to Cart')
        & (f.col('origin_promo').isNotNull())
    )
    .groupBy(f.element_at(f.col('origin_promo'), 1).getField('promoId').alias('promoId'))
    .agg(f.count('visitId').alias('nr_purchases'))
    .orderBy(f.desc('nr_purchases'))
).display()