# INFO 408 Lab Exercise 13: Clickstream Analysis in Python using Apache Spark and Apache Kafka

This is a clickstream processing demo using Apache Kafka and Spark Structured Streaming. It is based on the original Scala version described at https://github.com/IBM/kafka-streaming-click-analysis/blob/master/README.md. The clickstream data is from Wikipedia, and is streamed line-by-line into a Kafka topic called `clickstream` by the script `clickstream_producer.py` (see the main lab document). Each line comprises four tab-separated values: the previous page visited (`prev`), the current page (`curr`), the type of page (`type`), and the number of clicks for that navigation path (`n`). (In other words, how many times have people navigated from the `prev` page to the `curr` page?)
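
To make the record layout concrete, here is a minimal pure-Python sketch of splitting one such line into its four fields. The `Click` type, the `parse_click` helper, and the sample line are hypothetical and for illustration only; they are not part of `clickstream_producer.py` or of the real dataset.

```python
from typing import NamedTuple


class Click(NamedTuple):
    prev: str  # page the visitor came from
    curr: str  # page the visitor navigated to
    type: str  # type of page, as given in the data
    n: int     # number of clicks recorded for prev -> curr


def parse_click(line: str) -> Click:
    """Split one tab-separated clickstream line into its four fields."""
    prev, curr, kind, n = line.rstrip("\n").split("\t")
    return Click(prev, curr, kind, int(n))


# A made-up line, for illustration only (not taken from the real dataset).
sample = "Page_A\tPage_B\tlink\t42\n"
click = parse_click(sample)
print(click.curr, click.n)
```

The Spark code in the next cell does the equivalent split on each Kafka record, using `psf.split` on the tab character.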

The following code cell subscribes to the `clickstream` topic, extracts the columns from the received records, and starts an in-memory query (more or less like an SQL view) that summarises the most popular destination pages by total clicks. The results of the in-memory query can then be read as an SQL “table” named `clicks` using `spark.sql`.

It will usually take a few seconds to start the query, because we are running the entire cluster on one machine. Normally the Spark and Kafka nodes would be separate servers. **The second code cell will not do anything until the first cell has finished.**

In [None]:
import pyspark.sql.functions as psf

# Subscribe to the "clickstream" topic.
records = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "clickstream")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

messages = (
    records
    # Extract the individual columns from the current line.
    .withColumn("prev", psf.split(records.value, "\t")[0])
    .withColumn("curr", psf.split(records.value, "\t")[1])
    .withColumn("type", psf.split(records.value, "\t")[2])
    .withColumn("n", psf.split(records.value, "\t")[3])
    # Group by the current page.
    .groupBy("curr")
    # Sum the number of clicks.
    .agg(psf.sum("n").alias("num_clicks"))
    # Sort descending (most-clicked pages first).
    .orderBy("num_clicks", ascending=False)
)

# Create an in-memory query (view) called "clicks".
query = (
    messages.writeStream
    .queryName("clicks")
    .format("memory")
    .outputMode("complete")
    .start()
)

The following code cell loops for a while (until `terminate`, default 2 minutes), printing out the results of the query every few seconds (default 5). Ideally, this should grab the query results and do something useful like plot them in a bar chart. This is left as an exercise for the reader ☺.

You can change the following values in the code to change its behaviour:
* `seconds` (line 4) is how long the consumer will run for in seconds
* `refresh` (line 5) is the refresh rate in seconds, but it will probably take longer than that in practice because we are running the entire cluster on one machine

Try changing the SQL `select` statement on line 8 below.
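
As one possible starting point, the sketch below builds a variation of the query that keeps only the busiest pages. The helper `top_pages_sql` and its parameters are hypothetical. The only assumption about the `clicks` table is that it has the two columns produced by the previous cell, `curr` and `num_clicks`.

```python
def top_pages_sql(limit: int = 10, min_clicks: int = 0) -> str:
    """Build a query over the in-memory `clicks` table (columns: curr, num_clicks)."""
    # int() guards against accidentally pasting arbitrary text into the SQL.
    return (
        "select curr, num_clicks from clicks "
        f"where num_clicks >= {int(min_clicks)} "
        "order by num_clicks desc "
        f"limit {int(limit)}"
    )


print(top_pages_sql(limit=5, min_clicks=100))
```

To try it, pass the generated string to `spark.sql(...)` in place of `"select * from clicks"` in the code cell.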

Of course, you are fairly limited in what you can change here. To make larger changes you will need to change how the in-memory query is generated by modifying the definition of the `messages` variable on lines 14–27 of the previous code cell. You will also need to stop the query (run the second-last code cell below) and restart it (run the previous code cell again) before you can see any changes.

**Remember that if you want to preserve any changes you make to this notebook, you must use the `docker cp` command to copy it back to your `lab13` directory (see the main lab document).**


In [None]:
import time
from datetime import datetime, timedelta

terminate = datetime.now() + timedelta(seconds=120)
refresh = 5

while datetime.now() < terminate:
    result = spark.sql("select * from clicks")
    result.show()
    print("==========")
    time.sleep(refresh)
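
As a first step towards the bar chart mentioned above, here is a rough sketch that draws the results as text bars without any plotting library. The function `text_bar_chart` is hypothetical and the numbers are made up. It assumes each row is a `(page, clicks)` pair, which is the shape you would get by calling `.collect()` on a two-column result such as `select curr, num_clicks from clicks limit 10`.

```python
def text_bar_chart(rows, width=40):
    """Render (page, clicks) pairs as horizontal bars scaled to the largest value."""
    # Treat missing click counts as zero and force labels to strings.
    rows = [(str(page), float(clicks or 0)) for page, clicks in rows]
    if not rows:
        return "(no data yet)"
    biggest = max(clicks for _, clicks in rows) or 1.0  # avoid dividing by zero
    label_width = max(len(page) for page, _ in rows)
    lines = []
    for page, clicks in rows:
        bar = "#" * round(width * clicks / biggest)
        lines.append(f"{page:<{label_width}} | {bar} {clicks:g}")
    return "\n".join(lines)


# Made-up numbers for illustration; in the notebook the rows could come from
# spark.sql("select curr, num_clicks from clicks limit 10").collect()
print(text_bar_chart([("Page_A", 200), ("Page_B", 100), ("Page_C", 50)], width=20))
```

Inside the loop, you could print the chart instead of calling `result.show()`. A graphical version using a plotting library is still left as an exercise.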

Terminate the query when done. **Only do this when you are sure you are finished with the query.**

In [None]:
query.stop()

## For office use only ☺

Query monitoring and debugging.

In [None]:
print(query.isActive)
print(query.name)
print(query.id)
# spark.streams.get(query.id).stop()
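
For slightly more detail when debugging, the sketch below gathers a few fields into one dictionary. The helper `summarise_query` is hypothetical. It assumes the object has the `name`, `isActive`, `status` and `lastProgress` attributes of a PySpark streaming query, and that `lastProgress` is `None` until the first batch has been processed. The `fake_query` object is only a stand-in so that the example runs without a cluster.

```python
from types import SimpleNamespace


def summarise_query(query):
    """Collect a few monitoring fields from a streaming-query-like object."""
    progress = query.lastProgress  # assumed to be None before the first batch
    return {
        "name": query.name,
        "active": query.isActive,
        "status": query.status["message"],
        "input_rows": progress["numInputRows"] if progress else None,
    }


# Stand-in object with made-up values, so the sketch runs without Spark.
fake_query = SimpleNamespace(
    name="clicks",
    isActive=True,
    status={"message": "Waiting for data to arrive"},
    lastProgress=None,
)
print(summarise_query(fake_query))
```

In the notebook you would call `summarise_query(query)` on the real query object instead.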