# 📊 Marketing Funnel Analysis with PySpark & Spark SQL
Using a Databricks notebook

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType

# Obtain (or reuse) the Spark session under this notebook's application name.
spark = SparkSession.builder.appName("MarketingFunnelAnalysis").getOrCreate()

# Explicit schemas for the three CSV inputs, so column types are declared
# here rather than inferred from the files.
visit_schema = (
    StructType()
    .add("user_id", IntegerType())
    .add("visit_time", TimestampType())
    .add("utm_source", StringType())
)

signup_schema = (
    StructType()
    .add("user_id", IntegerType())
    .add("signup_time", TimestampType())
)

purchase_schema = (
    StructType()
    .add("user_id", IntegerType())
    .add("purchase_time", TimestampType())
    .add("amount", DoubleType())
)

# Load each CSV with its schema; the first line of every file is a header row.
visits_df = spark.read.schema(visit_schema).csv("/path/web_visits.csv", header=True)
signups_df = spark.read.schema(signup_schema).csv("/path/user_signups.csv", header=True)
purchases_df = spark.read.schema(purchase_schema).csv("/path/purchases.csv", header=True)

# Register the DataFrames as temp views so the Spark SQL cells can query them.
for view_name, frame in (
    ("web_visits", visits_df),
    ("user_signups", signups_df),
    ("purchases", purchases_df),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# Build the `funnel_users` temp view: every web visit, LEFT JOINed to the
# same user's signups and purchases.
# NOTE: both joins match on user_id only, so the view holds one row per
# (visit, signup, purchase) combination for a user -- not one row per user.
# Queries that aggregate over it must account for that fan-out.
funnel_users_ddl = """
CREATE OR REPLACE TEMP VIEW funnel_users AS
SELECT
  v.user_id,
  v.utm_source,
  v.visit_time,
  s.signup_time,
  p.purchase_time,
  p.amount
FROM web_visits v
LEFT JOIN user_signups s ON v.user_id = s.user_id
LEFT JOIN purchases p ON v.user_id = p.user_id
"""
spark.sql(funnel_users_ddl)

In [None]:
def funnel_summary():
    """Return a one-row DataFrame summarising the overall funnel.

    Columns:
        visits: distinct users that appear in ``funnel_users``.
        signups: distinct users that have a non-null ``signup_time``.
        purchases: distinct users that have a non-null ``purchase_time``.
        visit_to_signup_rate: ``signups / visits`` as a percentage (2 dp).
        signup_to_purchase_rate: ``purchases / signups`` as a percentage
            (2 dp).

    Every stage is counted in distinct users. Counting distinct timestamps
    instead would merge users who share a timestamp and would count a
    repeat buyer once per purchase, so the rates would not be comparable.
    A rate is NULL when its denominator is zero.
    """
    return spark.sql("""
        WITH stage_counts AS (
          SELECT
            COUNT(DISTINCT user_id) AS visits,
            COUNT(DISTINCT CASE WHEN signup_time IS NOT NULL THEN user_id END) AS signups,
            COUNT(DISTINCT CASE WHEN purchase_time IS NOT NULL THEN user_id END) AS purchases
          FROM funnel_users
        )
        SELECT
          visits,
          signups,
          purchases,
          -- NULLIF keeps an empty stage from raising a divide-by-zero error
          -- when ANSI mode is enabled; the rate is NULL instead.
          ROUND(signups / NULLIF(visits, 0) * 100, 2) AS visit_to_signup_rate,
          ROUND(purchases / NULLIF(signups, 0) * 100, 2) AS signup_to_purchase_rate
        FROM stage_counts
    """)

In [None]:
def conversion_by_channel():
    """Return funnel counts and conversion rates per ``utm_source``.

    Columns:
        utm_source: acquisition channel taken from the visit row.
        visitors: distinct users with a visit from that channel.
        signups: distinct users in the channel with a non-null
            ``signup_time``.
        purchases: distinct users in the channel with a non-null
            ``purchase_time``.
        signup_rate: ``signups / visitors`` as a percentage (2 dp).
        purchase_rate: ``purchases / signups`` as a percentage (2 dp).

    Every stage is counted in distinct users, not distinct timestamps, so
    users who share a timestamp are not merged and repeat buyers are not
    counted more than once. A rate is NULL when its denominator is zero.
    Rows are ordered by ``visitors`` descending.
    """
    return spark.sql("""
        WITH channel_counts AS (
          SELECT
            utm_source,
            COUNT(DISTINCT user_id) AS visitors,
            COUNT(DISTINCT CASE WHEN signup_time IS NOT NULL THEN user_id END) AS signups,
            COUNT(DISTINCT CASE WHEN purchase_time IS NOT NULL THEN user_id END) AS purchases
          FROM funnel_users
          GROUP BY utm_source
        )
        SELECT
          utm_source,
          visitors,
          signups,
          purchases,
          -- NULLIF: channels with no signups yield a NULL rate rather than
          -- a divide-by-zero error when ANSI mode is enabled.
          ROUND(signups / NULLIF(visitors, 0) * 100, 2) AS signup_rate,
          ROUND(purchases / NULLIF(signups, 0) * 100, 2) AS purchase_rate
        FROM channel_counts
        ORDER BY visitors DESC
    """)

In [None]:
from pyspark.sql.functions import datediff

def time_to_conversion():
    """Return the number of days between signup and purchase.

    The result has the columns ``user_id``, ``utm_source``, ``signup_time``,
    ``purchase_time`` and ``days_to_purchase`` (``purchase_time`` minus
    ``signup_time`` in whole days). Rows where the difference is NULL --
    the signup or the purchase is missing -- are dropped.

    ``funnel_users`` has one row per visit, and ``visit_time`` is not part
    of the projection, so a user with several visits from the same source
    would otherwise appear as several identical rows. ``distinct()``
    collapses them to one row per (user, source, signup, purchase).
    """
    funnel = spark.sql("SELECT * FROM funnel_users")
    with_lag = funnel.withColumn(
        "days_to_purchase",
        datediff("purchase_time", "signup_time"),
    )
    return (
        with_lag
        .select("user_id", "utm_source", "signup_time", "purchase_time", "days_to_purchase")
        .filter("days_to_purchase IS NOT NULL")
        # Remove duplicates produced by the per-visit fan-out of the view.
        .distinct()
    )

In [None]:
def aov_by_channel():
    """Return purchase count and average order value per ``utm_source``.

    Columns:
        utm_source: acquisition channel taken from the visit row.
        purchase_count: number of distinct purchases seen for the channel.
        avg_order_value: mean ``amount`` of those purchases (2 dp).

    ``funnel_users`` repeats each purchase once per visit by the same user,
    so aggregating the view directly would weight every purchase by the
    buyer's number of visits. Purchases are first de-duplicated on
    (utm_source, user_id, purchase_time, amount), then aggregated.

    NOTE(review): a user who visited through several channels still has
    each purchase counted once under every one of those channels, since
    the view joins on ``user_id`` only -- confirm this is the intended
    attribution.

    Rows are ordered by ``avg_order_value`` descending.
    """
    return spark.sql("""
        SELECT
          utm_source,
          COUNT(*) AS purchase_count,
          ROUND(AVG(amount), 2) AS avg_order_value
        FROM (
          SELECT DISTINCT utm_source, user_id, purchase_time, amount
          FROM funnel_users
          WHERE purchase_time IS NOT NULL
        ) AS unique_purchases
        GROUP BY utm_source
        ORDER BY avg_order_value DESC
    """)

In [None]:
# Run every report in order and print its result to the cell output.
for build_report in (
    funnel_summary,
    conversion_by_channel,
    time_to_conversion,
    aov_by_channel,
):
    build_report().show()