In [1]:
# Initialization: point this Python kernel at the local Spark installation so
# that `pyspark` can be imported and the JVM gateway can be launched.
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Interpreters for Spark's Python workers (PYSPARK_PYTHON) and the driver
# launcher (PYSPARK_DRIVER_PYTHON). Use /usr/bin/python2.7 for both to run
# under Python 2.
# NOTE(review): the worker is pinned to python3.6 while the driver is the
# generic python3; PySpark refuses workers whose minor version differs from
# the driver's — confirm /usr/bin/python3 resolves to 3.6 on this machine.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Prepend the bundled py4j and pyspark archives to sys.path. Each insert goes
# to index 0, so after this loop pyspark.zip precedes py4j on the path.
for _archive in ("/py4j-0.10.7-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + _archive)

# Extra Maven packages pulled in when the JVM starts. Smaller combinations
# used earlier: spark-xml 0.6.0 alone, spark-avro 2.4.0 alone, or
# spark-xml + spark-avro (2.4.0 / 2.4.3) without the Kafka connector.
# NOTE(review): spark-avro (2.4.3) and spark-sql-kafka (2.4.5) target different
# Spark patch versions; connectors should normally match the installed Spark
# version — confirm against $SPARK_HOME.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 pyspark-shell'


In [2]:
# Entry point (Spark 2.x): create, or reuse, the SparkSession for this notebook.
from pyspark.sql import SparkSession

# NOTE(review): PYSPARK_SUBMIT_ARGS (set during initialization) already passes
# spark-sql-kafka via --packages; spark-submit appears to prefer --packages over
# spark.jars.packages, so this config entry is likely redundant — confirm.
spark_builder = SparkSession.builder.appName("Spark SQL basic example")
spark_builder = spark_builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5",
)
spark = spark_builder.getOrCreate()

# To run on YARN, add .master("yarn") (and optionally .enableHiveSupport()):
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

sc = spark.sparkContext

In [3]:
# Read stream from Kafka: subscribe to the "clickstream" topic on the local
# broker. The result is a streaming DataFrame; records are only consumed once a
# writeStream query is started further below. No startingOffsets option is set,
# so Spark's default starting position applies.
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "clickstream")
    .load()
)


In [4]:
from pyspark.sql.functions import col

# The Kafka message payload arrives in the binary `value` column; cast it to a
# string so it can be parsed as JSON in the next steps.
json_value_col = col("value").cast("string").alias("json_value")
json_df = stream_df.select(json_value_col)

json_df.printSchema()

root
 |-- json_value: string (nullable = true)



In [5]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# JSON schema of a single clickstream event. Every field is read as a nullable
# string; `timestamp` is turned into a real TIMESTAMP column in a later step.
EVENT_FIELDS = ("timestamp", "visitor_id", "event_type", "item_id", "transaction_id")

event_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in EVENT_FIELDS]
)



In [6]:
from pyspark.sql.functions import from_json

# Parse each JSON string against `event_schema` into a struct column named
# `data`, then flatten it so every event field becomes a top-level column.
# NOTE(review): records that do not match the schema presumably come through
# with null fields rather than failing the stream — confirm for this Spark version.
data_struct = from_json(col("json_value"), event_schema).alias("data")
parsed_df = json_df.select(data_struct)
events_stream_df = parsed_df.select("data.*")

events_stream_df.printSchema()


root
 |-- timestamp: string (nullable = true)
 |-- visitor_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)



In [7]:
from pyspark.sql.functions import col

# Derive `event_time` (TIMESTAMP) from `timestamp`, which is treated as epoch
# milliseconds (hence the division by 1000).
#
# Casting the fractional epoch seconds straight to TIMESTAMP keeps the
# millisecond part. It also never goes through a formatted string. The previous
# from_unixtime -> to_timestamp round trip dropped milliseconds, because
# from_unixtime takes whole seconds. Its session-time-zone string was also
# ambiguous during DST fall-back hours.
events_stream_df = events_stream_df.withColumn(
    "event_time",
    (col("timestamp").cast("long") / 1000).cast("timestamp"),
)

events_stream_df.printSchema()



root
 |-- timestamp: string (nullable = true)
 |-- visitor_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [8]:
# Debug sink: print each micro-batch of parsed events to the console.
# awaitTermination() blocks until the query ends, so this cell is normally
# ended with a kernel interrupt. Interrupting only unblocks Python; it does not
# stop the streaming query. Stop it explicitly so it does not keep printing
# alongside the queries started in later cells.
query = (
    events_stream_df.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

try:
    query.awaitTermination()
finally:
    query.stop()


KeyboardInterrupt: 

In [9]:
from pyspark.sql.functions import (
    col,
    window,
    count
)
# Count events per visitor in fixed 30-minute event-time windows.
#
# NOTE: despite the variable name, this is not gap-based sessionization.
# window(col, "30 minutes") is a tumbling window: events are bucketed into
# consecutive, non-overlapping 30-minute intervals. A visit that crosses an
# interval boundary is split across two rows, and there is no inactivity-gap
# logic. Built-in session windows only arrived in Spark 3.2.
#
# The 30-minute watermark bounds how late an event may arrive and still be
# counted. This lets Spark discard state for windows that can no longer change.
sessionized_df = events_stream_df \
    .withWatermark("event_time", "30 minutes") \
    .groupBy(
        col("visitor_id"),
        window(col("event_time"), "30 minutes")
    ) \
    .agg(
        count("*").alias("total_events")
    )


In [10]:
# `sessionized_df` is already defined by the previous cell with exactly the
# same watermark, window and aggregation; this cell used to be a copy-paste
# duplicate of that definition. Inspect the result's schema instead.
sessionized_df.printSchema()


In [11]:
# Console sink for the per-visitor window counts. "update" mode emits only the
# window rows whose aggregate changed in each micro-batch.
query = (
    sessionized_df.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

try:
    query.awaitTermination()
finally:
    # Interrupting awaitTermination() does not stop the streaming query; stop
    # it so it does not keep running alongside queries started later.
    query.stop()


KeyboardInterrupt: 

In [12]:
from pyspark.sql.functions import col, window, count, when

# Per-visitor funnel counts in 30-minute tumbling event-time windows.
# count(when(cond, True)) counts only the rows where cond holds: when() yields
# null otherwise, and count() skips nulls.
FUNNEL_EVENTS = [
    ("views", "view"),
    ("add_to_cart", "addtocart"),
    ("purchases", "transaction"),
]

kpi_df = (
    events_stream_df
    .withWatermark("event_time", "30 minutes")
    .groupBy(col("visitor_id"), window(col("event_time"), "30 minutes"))
    .agg(*[
        count(when(col("event_type") == event_type, True)).alias(alias)
        for alias, event_type in FUNNEL_EVENTS
    ])
)


In [13]:
from pyspark.sql.functions import col, when

# Cart-to-purchase conversion for each visitor/window: purchases / add_to_cart,
# or 0 when the window had no add-to-cart events (no ratio to compute).
# NOTE(review): purchases and add-to-carts are counted independently, so this
# ratio can exceed 1 (e.g. a purchase whose add-to-cart fell in another window).
has_cart_adds = col("add_to_cart") > 0
kpi_df = kpi_df.withColumn(
    "conversion_rate",
    when(has_cart_adds, col("purchases") / col("add_to_cart")).otherwise(0),
)


In [14]:
# Console sink for the funnel KPIs. "update" mode emits only the window rows
# whose aggregates changed in each micro-batch.
kpi_query = (
    kpi_df.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

try:
    kpi_query.awaitTermination()
finally:
    # Interrupting awaitTermination() does not stop the streaming query; stop
    # it explicitly so it does not keep running in the background.
    kpi_query.stop()


KeyboardInterrupt: 