In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, when, sum as _sum, expr, from_unixtime, lead, lag, min, max, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

import plotly.express as px
import pandas as pd

spark = SparkSession.builder \
   .appName("Sessionizing") \
   .getOrCreate()

In [0]:
# Would have been applied to the full solution, a utils notebook with the functions that would be ran from this notebook.
# %run /Users/idan/.../utils   

In [0]:
# Define schema
schema = StructType([
    StructField("visitor_id", StringType(), True),
    StructField("site_url", StringType(), True),
    StructField("page_view_url", StringType(), True),
    StructField("timestamp", IntegerType(), True)
])

In [0]:
# Method to load all CSV from the S3 folder and union them, with a note if there were no files

# List files in the S3 directory using Databricks utilities
s3_path = "s3://data-integrity/Idan/inputs/"
file_info = dbutils.fs.ls(s3_path)

# Filter for CSV files and get their paths
csv_files = [file.path for file in file_info if file.path.endswith(".csv")]

# Print the names of the files that were loaded
if not csv_files:
    print("No CSV files found in the folder.")
else:
    print("CSV files loaded:")
    for file in csv_files:
        print(file)

df = spark.read.option("header", True).schema(schema).csv(csv_files[0])
for file in csv_files[1:]:
    df = df.union(spark.read.option("header", True).schema(schema).csv(file))

df = df.na.drop().dropDuplicates()

CSV files loaded:
s3://data-integrity/Idan/inputs/input_1.csv
s3://data-integrity/Idan/inputs/input_2.csv
s3://data-integrity/Idan/inputs/input_3.csv


In [0]:
# Loading all CSV files from the S3 folder into DF. For a more complex logic we will make sure we load only files uploaded to S3 in the past day, week, or any timeframe agreed.

#df = spark \
#    .read \
#    .option("header", False) \
#    .schema(schema) \
#    .csv("s3://data-integrity/Idan/inputs/*.csv")
#
# Write files to S3 mock:
# df.write.parquet("s3://data-integrity/Idan/inputs/file")

In [0]:
# Another option to load files from s3 by 3 DFs if we know the excact files we need:

df1 = spark \
   .read \
   .option("header", True) \
   .option("inferSchema", False) \
   .schema(schema) \
   .csv("s3://data-integrity/Idan/inputs/input_1.csv")

df2 = spark \
   .read \
   .option("header", True) \
   .option("inferSchema", False) \
   .schema(schema) \
   .csv("s3://data-integrity/Idan/inputs/input_2.csv")

df3 = spark \
   .read \
   .option("header", True) \
   .option("inferSchema", False) \
   .schema(schema) \
   .csv("s3://data-integrity/Idan/inputs/input_3.csv")

In [0]:
df = df1.union(df2).union(df3).na.drop().dropDuplicates()


In [0]:
df.show(50)

+-------------+------------+-------------------+----------+
|   visitor_id|    site_url|      page_view_url| timestamp|
+-------------+------------+-------------------+----------+
| visitor_8424| www.s_1.com| www.s_1.com/page_1|1347844479|
| visitor_5300| www.s_5.com| www.s_5.com/page_1|1347844474|
| visitor_9640| www.s_6.com| www.s_6.com/page_1|1347844481|
| visitor_7955| www.s_7.com| www.s_7.com/page_1|1347844464|
| visitor_9431| www.s_6.com| www.s_6.com/page_1|1347844476|
| visitor_8475| www.s_4.com| www.s_4.com/page_1|1347844488|
| visitor_8511| www.s_8.com| www.s_8.com/page_1|1347844494|
| visitor_4275| www.s_8.com| www.s_8.com/page_1|1347844470|
| visitor_6355| www.s_6.com| www.s_6.com/page_1|1347844477|
| visitor_4334| www.s_9.com| www.s_9.com/page_1|1347844502|
| visitor_5122| www.s_9.com| www.s_9.com/page_1|1347844487|
| visitor_8460| www.s_1.com| www.s_1.com/page_1|1347844479|
| visitor_1327| www.s_4.com| www.s_4.com/page_1|1347844504|
| visitor_9024|www.s_10.com|www.s_10.com

In [0]:
df.count()

145964

In [0]:
# Convert integer Unix timestamp to datetime
df = df.withColumn("timestamp", from_unixtime(col("timestamp")).cast("timestamp"))

# Sort the DataFrame by timestamp, visitor_id, site_url, and page_view_url
df = df.orderBy(["timestamp", "visitor_id", "site_url", "page_view_url"])

# Define session window based on visitor_id, site_url, and ordered by timestamp
session_window = Window.partitionBy("visitor_id", "site_url").orderBy("timestamp", "page_view_url")

# Calculate session length
df = df.withColumn("next_timestamp", lead("timestamp").over(session_window)) \
       .withColumn("session_length", (col("next_timestamp").cast("long") - col("timestamp").cast("long"))) \
       .withColumn("session_length", when(col("session_length").isNull(), 0).otherwise(col("session_length")))

# Identify sessions
df = df.withColumn("new_session", when(col("session_length") > 30*60, 1).otherwise(0)) \
       .withColumn("session_id", _sum("new_session").over(session_window) + 1)

# Calculate the actual session length as the difference between the first and last timestamp in each session
df = df.withColumn("session_start", min("timestamp").over(Window.partitionBy("visitor_id", "site_url", "session_id"))) \
       .withColumn("session_end", max("timestamp").over(Window.partitionBy("visitor_id", "site_url", "session_id"))) \
       .withColumn("actual_session_length", (col("session_end").cast("long") - col("session_start").cast("long")))

# Filter out null values in case there are sessions with no page views
df_filtered = df.filter(col("actual_session_length").isNotNull())

In [0]:
# Mock snowflake connection options
sfOptions = {
  "sfUrl": dbutils.secrets.get("snowflake-credentials-data-integrity", "url"),
  "sfUser": dbutils.secrets.get("snowflake-credentials-data-integrity", "user"),
  "sfPassword": dbutils.secrets.get("snowflake-credentials-data-integrity", "password"),
  "sfDatabase": "DW",
  "sfSchema": "PUBLIC",
  "sfWarehouse": "RESEARCH_WAREHOUSE"
}

In [0]:
# Append the final data to a research table where we can also query it through snowflake
table_name = "research.filtered_Sessions"

df_filtered.write\
    .format("snowflake")\
    .options(**sfOptions)\
    .option("dbtable", table_name)\
    .mode("append")\
    .save()

In [0]:
# Query functions
def num_sessions(site_url):
    if df_filtered.filter(col("site_url") == site_url).select("site_url").first() is None:
        return "Site URL not found"
    return df_filtered.filter(col("site_url") == site_url).groupBy("visitor_id", "session_id").count().count()

def median_session_length(site_url):
    filtered_df = df_filtered.filter(col("site_url") == site_url)
    if filtered_df.select("site_url").first() is None:
        return "Site URL not found"
    unique_sessions_df = filtered_df.dropDuplicates(["visitor_id", "site_url", "session_id"])
    median_session_length_df = unique_sessions_df.groupBy("site_url") \
                                                 .agg(expr("percentile_approx(actual_session_length, 0.5)").alias("median_session_length"))
    result = median_session_length_df.collect()
    if result:
        return result[0]["median_session_length"]
    else:
        return "No session lengths found for the given site URL"

def num_unique_visited_sites(visitor_id):
    if df_filtered.filter(col("visitor_id") == visitor_id).select("visitor_id").first() is None:
        return "Visitor ID not found"
    return df_filtered.filter(col("visitor_id") == visitor_id).select("site_url").distinct().count()

In [0]:
# Example usage
site_url = "www.s_5.com"
user_id = "visitor_4030"
result = num_sessions(site_url)
if result == "Site URL not found":
    print(result)
else:
    print(f"Number of sessions for {site_url}: {result}")

result = median_session_length(site_url)
if result == "Site URL not found":
    print(result)
else:
    print(f"Median session length for {site_url}: {result}")

result = num_unique_visited_sites(user_id)
if result == "Visitor ID not found":
    print(result)
else:
    print(f"Number of unique visited sites by {user_id}: {result}")


Number of sessions for www.s_5.com: 3555
Median session length for www.s_5.com: 1634
Number of unique visited sites by visitor_4030: 3


In [0]:
# Use dbutils.widgets to create input widgets
dbutils.widgets.dropdown("command", "num_sessions", ["num_sessions", "median_session_length", "num_unique_visited_sites"], "Command")
dbutils.widgets.text("site_url", "", "Site URL")
dbutils.widgets.text("visitor_id", "", "Visitor ID")

# Function to get widget values
def get_widget_value(widget_name):
    try:
        return dbutils.widgets.get(widget_name)
    except:
        return None

# Use the get_widget_value function to retrieve the values
command = get_widget_value("command")
site_url = get_widget_value("site_url")
visitor_id = get_widget_value("visitor_id")

# Example usage based on the command widget's value
# Example usage based on the command widget's value
if command == 'num_sessions':
    result = num_sessions(site_url)
    if result == "Site URL not found":
        print(result)
    else:
        print(f"Executing num_sessions command for site URL: {site_url}, number of sessions is:")
        print(result)
elif command == 'median_session_length':
    result = median_session_length(site_url)
    if result == "Site URL not found":
        print(result)
    else:
        print(f"Executing median_session_length command for site URL: {site_url}, median session length is:")
        print(result)
elif command == 'num_unique_visited_sites':
    result = num_unique_visited_sites(visitor_id)
    if result == "Visitor ID not found":
        print(result)
    else:
        print(f"Executing num_unique_visited_sites command for visitor ID: {visitor_id}, amount of unique visited sites is:")
        print(result)



Executing num_sessions command for site URL: www.s_8.com, number of sessions is:
3608


### Misc

In [0]:
# Exploratory Data Analysis (EDA)
# Count total number of rows
total_rows = df.count()
print(f"Total number of rows: {total_rows}")

# Count number of unique visitors
unique_visitors = df.select("visitor_id").distinct().count()
print(f"Number of unique visitors: {unique_visitors}")

# Count number of unique sites
unique_sites = df.select("site_url").distinct().count()
print(f"Number of unique sites: {unique_sites}")

# Count number of unique pages
unique_page_view_url = df.select("page_view_url").distinct().count()
print(f"Number of page view urls: {unique_page_view_url}")

# Find the most visited site
most_visited_site = df.groupBy("site_url").count().orderBy(desc("count")).limit(1).collect()[0][0]
print(f"Most visited site: {most_visited_site}")


Total number of rows: 145964
Number of unique visitors: 10000
Number of unique sites: 10
Number of page view urls: 70
Most visited site: www.s_6.com


In [0]:
timestamp_min_max = df.select(
    min("timestamp").alias("Minimum Timestamp"),
    max("timestamp").alias("Maximum Timestamp")
)
display(timestamp_min_max)

Minimum Timestamp,Maximum Timestamp
2012-09-17T01:14:06Z,2012-09-17T19:44:14Z


In [0]:
df_pd = df_filtered.toPandas()

# Actual Session Length count BoxPlot
fig_session_length = px.box(df_pd, y="actual_session_length")
fig_session_length.update_layout(
    title_text="Actual Session Length Distribution",
    yaxis_title="Length"
)
fig_session_length.show()

# Grouping by 'session_id' and counting occurrences bar graph
session_id_counts = df_pd.groupby('session_id').size().reset_index(name='counts')

fig_session_id = px.bar(session_id_counts, x='session_id', y='counts', labels={'counts': 'Count'})
fig_session_id.update_layout(
    title_text="Sessions Distribution",
    xaxis_title="Session IDs",
    yaxis_title="Count"
)
fig_session_id.show()

In [0]:
# Calculate Q1, Q3, and IQR for session_length
session_length_stats = df.selectExpr(
    "percentile(session_length, 0.25) as Q1",
    "percentile(session_length, 0.75) as Q3"
).collect()
Q1_session_length = session_length_stats[0]["Q1"]
Q3_session_length = session_length_stats[0]["Q3"]
IQR_session_length = Q3_session_length - Q1_session_length

# Calculate lower and upper bounds for session_length outliers
lower_bound_session_length = Q1_session_length - 1.5 * IQR_session_length
upper_bound_session_length = Q3_session_length + 1.5 * IQR_session_length

# Calculate percentage of outliers for session_length
total_count = df.count()
outliers_count_session_length = df.filter(
    (col("session_length") < lower_bound_session_length) | 

    (col("session_length") > upper_bound_session_length)
).count()
percentage_outliers_session_length = (outliers_count_session_length / total_count) * 100

# Calculate Q1, Q3, and IQR for session_id
session_id_stats = df.selectExpr(
    "percentile(session_id, 0.25) as Q1",
    "percentile(session_id, 0.75) as Q3"
).collect()
Q1_session_id = session_id_stats[0]["Q1"]
Q3_session_id = session_id_stats[0]["Q3"]
IQR_session_id = Q3_session_id - Q1_session_id

# Calculate lower and upper bounds for session_id outliers
lower_bound_session_id = Q1_session_id - 1.5 * IQR_session_id
upper_bound_session_id = Q3_session_id + 1.5 * IQR_session_id

# Calculate percentage of outliers for session_id
outliers_count_session_id = df.filter(
    (col("session_id") < lower_bound_session_id) | 
    (col("session_id") > upper_bound_session_id)
).count()
percentage_outliers_session_id = (outliers_count_session_id / total_count) * 100

# Display the results
print(f"Percentage of session_length suspected outliers (very long sessions, maybe the person left the site open): {percentage_outliers_session_length}%")
print(f"Percentage of session_id suspected outliers: {percentage_outliers_session_id}%")

Percentage of session_length suspected outliers (very long sessions, maybe the person left the site open): 3.8824641692472115%
Percentage of session_id suspected outliers: 18.875887205064263%
