# Web user clickstream streaming data pipeline on Databricks and GCP

## This Jupyter notebook runs on the Databricks platform, which connects to GCP

1. Databricks connects to and reads data directly from Kafka topics on Confluent Cloud, a managed Kafka platform. I created two Kafka topics: 'clickstream', and 'pksqlc-n0r06USER_CLICKSTREAM', which contains both user clickstream and user information.

2. Databricks connects to Google Cloud Storage, which serves as the sink for the streaming data.

3. This notebook uses both SQL and Python to build a streaming data pipeline that collects data from the Kafka topics and loads it into Google Cloud Storage.

## Key and secret for connecting to the Kafka topics, generated in Confluent Cloud

In [3]:
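# Placeholder credentials from Confluent Cloud; avoid hard-coding real values in a notebook
# (on Databricks they can be read from a secret scope with dbutils.secrets.get(scope, key))
kafkaUser = "xxxxxxx"
kafkaSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"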

## 1. TOPIC: clickstream

### Use Spark's Kafka connector to connect Databricks to Confluent Cloud and read streaming data from the Kafka topic 'clickstream'. Register the stream as a temp view so it can be queried with SQL later.


In [0]:
# Connect to the Kafka topic 'clickstream', which contains users' web click events
(
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "pkc-3w22w.us-central1.gcp.confluent.cloud:9092")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(kafkaUser, kafkaSecret))
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", "clickstream")
    .option("startingOffsets", "earliest")
    .load()
    .createOrReplaceTempView("click_stream")
)
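Both topics in this notebook are read with the same six connection options. As a minimal sketch, assuming you want to avoid repeating them, a hypothetical helper `kafka_options` (not part of any library) could build the option map once. The `kafkashaded.` login-module prefix is copied from the cell above.

```python
def kafka_options(bootstrap_servers, topic, user, secret, starting_offsets="earliest"):
    """Return the Kafka source options used by both readStream cells (hypothetical helper)."""
    # SASL/PLAIN login for Confluent Cloud; the kafkashaded prefix matches the cell above
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f"required username='{user}' password='{secret}';"
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.sasl.jaas.config": jaas,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }
```

In the notebook it could be used as `spark.readStream.format("kafka").options(**kafka_options("pkc-3w22w.us-central1.gcp.confluent.cloud:9092", "clickstream", kafkaUser, kafkaSecret)).load()`, changing only the topic name for the second stream.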

In [0]:
%sql
select * from click_stream

### Cast the binary key and value columns to strings (value holds a JSON string), and select the useful columns.

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW click_stream_parsed
AS
SELECT string(key) AS key, string(value) AS value, topic, partition, offset, timestamp, timestampType
FROM click_stream

### Parse the JSON string into separate columns and cast each to an appropriate data type

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW click_stream_final
AS
SELECT key, value:_time AS _time, cast(value:time AS bigint) AS time, value:ip AS ip, value:request AS request,
cast(value:status AS int) AS status, value:userid AS userid, cast(value:bytes AS bigint) AS bytes, value:agent AS agent,
topic, partition, offset, timestamp, timestampType
FROM click_stream_parsed
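To make the colon-path extraction and casts concrete, here is a plain-Python sketch (no Spark needed) of what the two views do to one Kafka value: decode the JSON string, pull out each field, and cast `time`, `status` and `bytes` to integers. `parse_click` and the constants are hypothetical names, and returning `None` for an unparseable number is a simplification of Spark's non-ANSI casting.

```python
import json

# Field order and casts mirror the click_stream_final SQL cell; fields without a cast stay strings
FIELDS = ["_time", "time", "ip", "request", "status", "userid", "bytes", "agent"]
INT_FIELDS = {"time", "status", "bytes"}  # bigint/int in SQL; both map to Python int


def _to_int(raw):
    """Cast to int; unparseable or missing values become None, similar to NULL in SQL."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def parse_click(value):
    """Turn one Kafka value (a JSON string) into a flat row dict."""
    record = json.loads(value)
    row = {}
    for field in FIELDS:
        raw = record.get(field)
        if field in INT_FIELDS:
            row[field] = _to_int(raw)
        else:
            # Colon-path extraction yields strings, so non-cast fields are kept as str
            row[field] = None if raw is None else str(raw)
    return row
```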

In [0]:
%sql
select * from click_stream_final

In [0]:
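from pyspark.sql.functions import col, count, session_window, window

# Streaming DataFrame over the parsed clickstream view, reused by the aggregations below
df_click_stream = spark.table("click_stream_final")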

In [0]:
%sql
select count(*) from click_stream_final

### Aggregate and analyse the streaming table: count HTML pages per minute for each user, using a sliding window

In [0]:
from pyspark.sql.functions import col, count, window

# Count HTML page requests per user in 60-second windows sliding every 5 seconds
(df_click_stream
 .withWatermark("timestamp", "60 seconds")  # tolerate up to 60 seconds of late data
 .filter("request LIKE '%html%'")
 .groupBy(window(col("timestamp"), "60 seconds", "5 seconds"), "userid")
 .agg(count("*").alias("pages"))
 .select(col("userid"), col("window.start").alias("EVENT_start"), col("window.end").alias("EVENT_end"), col("pages"))
 .createOrReplaceTempView("click_stream_analyse"))
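A 60-second window sliding every 5 seconds means each event is counted in 60 / 5 = 12 overlapping windows. The sketch below, in plain Python with integer epoch seconds, shows that window assignment and the per-user count; the function names are hypothetical and it ignores watermarking and late data.

```python
from collections import Counter


def sliding_windows(ts, size=60, slide=5):
    """All [start, end) windows of length `size` containing `ts`, with starts at multiples of
    `slide` seconds since the epoch (Spark aligns windows to the epoch by default)."""
    windows = []
    start = ts - ts % slide          # latest window start that is <= ts
    while start > ts - size:         # earlier starts still cover ts while start + size > ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def html_pages_per_window(events, size=60, slide=5):
    """events: iterable of (epoch_seconds, userid, request).
    Count requests containing 'html' per (userid, window)."""
    counts = Counter()
    for ts, userid, request in events:
        if "html" not in request:    # same effect as LIKE '%html%'
            continue
        for window in sliding_windows(ts, size, slide):
            counts[(userid, window)] += 1
    return counts
```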

In [0]:
%sql
select * from click_stream_analyse
where userid = 19
order by event_start desc

### Create a new table pages_per_min from the click_stream_analyse temp view, and load it into GCS continuously

In [0]:
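# Stream the windowed aggregates into the table pages_per_min; the checkpoint lets the query resume after a restart
query = (spark
         .readStream
         .table("click_stream_analyse")
         .writeStream
         .option("checkpointLocation", "/FileStore/tables/checkpoint/pages_per_min")
         .outputMode("append")  # a window is written once the watermark passes its end
         .trigger(processingTime='4 seconds')
         .toTable("pages_per_min"))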
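Because the query uses append mode with a watermark, a window is only written to the table after the watermark (the maximum event time seen minus the delay) moves past the window's end; until then its count may still change. The simulation below is a simplified sketch of that rule, using tumbling windows and integer epoch seconds for brevity. `run_append_mode` is a hypothetical name; real Spark advances the watermark between micro-batches and may run an extra no-data batch to emit results, which this sketch folds into each batch.

```python
from collections import Counter


def run_append_mode(batches, size=60, delay=60):
    """Simulate append-mode output of a tumbling-window count with a watermark.

    batches: list of micro-batches, each a list of event times in epoch seconds.
    Returns one list per batch of the (window_start, window_end, count) rows it emits.
    """
    state = Counter()            # open windows -> running count
    max_event_time = None
    watermark = float("-inf")    # no watermark before any data arrives
    outputs = []
    for batch in batches:
        for ts in batch:
            if ts < watermark:
                continue         # simplified: events behind the watermark are dropped as too late
            start = ts - ts % size
            state[(start, start + size)] += 1
            max_event_time = ts if max_event_time is None else max(max_event_time, ts)
        if max_event_time is not None:
            watermark = max(watermark, max_event_time - delay)
        # Windows whose end the watermark has reached are final: emit them and drop their state
        ready = sorted(w for w in state if w[1] <= watermark)
        outputs.append([(s, e, state.pop((s, e))) for s, e in ready])
    return outputs
```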

In [0]:
%sql
select count(*) from pages_per_min

In [0]:
%sql
describe detail pages_per_min

### Aggregate and analyse the streaming table: count errors per minute (status codes > 400) using a sliding window, and keep only windows with more than 5 errors

In [0]:
# Count HTTP errors (status > 400) per status code in 60-second windows sliding every 20 seconds
(df_click_stream
 .withWatermark("timestamp", "60 seconds")
 .filter("status > 400")
 .groupBy(window(col("timestamp"), "60 seconds", "20 seconds"), col("status"))
 .agg(count("*").alias("errors"))
 .filter("errors > 5")  # alert only when a window has more than 5 errors
 .select(col("status"), col("window.start").alias("EVENT_start"), col("window.end").alias("EVENT_end"), col("errors"))
 .createOrReplaceTempView("click_stream_error"))
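The error alert combines three steps: keep status codes above 400, count them per status in 60-second windows that slide every 20 seconds (so each event lands in 3 windows), and keep only windows with more than 5 errors. A plain-Python sketch under those assumptions, with a hypothetical `error_alerts` function and integer epoch seconds, ignoring watermarks:

```python
from collections import Counter


def error_alerts(events, size=60, slide=20, threshold=5):
    """events: iterable of (epoch_seconds, status).
    Returns {(status, (start, end)): errors} for windows with more than `threshold` errors."""
    counts = Counter()
    for ts, status in events:
        if status <= 400:            # mirrors .filter("status > 400")
            continue
        start = ts - ts % slide      # latest epoch-aligned window start <= ts
        while start > ts - size:     # every window of length `size` that contains ts
            counts[(status, (start, start + size))] += 1
            start -= slide
    # mirrors .filter("errors > 5")
    return {key: n for key, n in counts.items() if n > threshold}
```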

In [0]:
%sql
select * from click_stream_error

In [0]:
%sql
select * from click_stream_error
where status = 404
order by event_start desc

### Create a new table errors_per_min_alert from the click_stream_error temp view, and load it into GCS continuously

In [0]:
query = (spark
         .readStream
         .table("click_stream_error")
         .writeStream
         .option("checkpointLocation", "/FileStore/tables/checkpoint/errors_per_min_alert")
         .outputMode("append")
         .trigger(processingTime='4 seconds')
         .toTable("errors_per_min_alert"))  # toTable writes the stream to a table

## 2. TOPIC: pksqlc-n0r06USER_CLICKSTREAM (clickstream and user information)

### Load the stream from the Kafka topic pksqlc-n0r06USER_CLICKSTREAM and apply some streaming transformations
### This topic contains both user and clickstream information

In [0]:
(
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "pkc-3w22w.us-central1.gcp.confluent.cloud:9092")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(kafkaUser, kafkaSecret))
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", "pksqlc-n0r06USER_CLICKSTREAM")
    .option("startingOffsets", "earliest")
    .load()
    .createOrReplaceTempView("user_clicks")
)

In [0]:
%sql
select count(*) from user_clicks

In [0]:
%sql
select * from user_clicks

### Cast the binary key and value columns to strings (value holds a JSON string), and select the useful columns.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW user_click_string
AS
SELECT string(key) AS key, string(value) AS value, topic, partition, offset, timestamp, timestampType
FROM user_clicks

In [0]:
%sql
select * from user_click_string

### Parse the JSON string in the value column into a struct, using schema_of_json to infer the schema from a sample record

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW parsed_user_click AS
SELECT key, from_json(value, schema_of_json('{"USERNAME":"ArlyneW8ter","REGISTERED_AT":1420654807513,"IP":"222.245.174.222","CITY":"London","REQUEST":"GET /index.html HTTP/1.1","STATUS":404,"BYTES":4006}')) AS value, topic, partition, offset, timestamp, timestampType
FROM user_click_string;

SELECT * FROM parsed_user_click
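`schema_of_json` infers a schema from the single sample record, and `from_json` then uses that schema for every message, so the sample should be representative: a field absent from the sample is not parsed. The sketch below is a simplified plain-Python approximation of that inference (integers as BIGINT, floats as DOUBLE, fields sorted by name); `schema_from_sample` and its helpers are hypothetical, not Spark's implementation, and Spark's exact output format may differ.

```python
import json


def infer_sql_type(value):
    """Map one decoded JSON value to a Spark SQL type name (simplified)."""
    if isinstance(value, bool):      # check bool before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, list):
        inner = infer_sql_type(value[0]) if value else "STRING"
        return f"ARRAY<{inner}>"
    if isinstance(value, dict):
        return infer_struct(value)
    return "STRING"                  # null: simplified fallback


def infer_struct(obj):
    fields = ", ".join(f"{name}: {infer_sql_type(v)}" for name, v in sorted(obj.items()))
    return f"STRUCT<{fields}>"


def schema_from_sample(json_str):
    """Infer a STRUCT type string from one sample JSON record."""
    return infer_struct(json.loads(json_str))
```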

### Parse json string to different columns, and assign different data types

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW user_click_final AS
SELECT key, value.*, topic, partition, offset, timestamp, timestampType
FROM parsed_user_click;

SELECT * FROM user_click_final

### Session window analysis of the temp view user_click_final
### A session expires after 30 seconds of inactivity; count the number of events within each session

In [0]:
(spark
 .table("user_click_final")
 .withWatermark("timestamp", "30 seconds")
 .groupBy(session_window(col("timestamp"), "30 seconds"), col("USERNAME"))
 .agg(count("*").alias("events"))
 .select(col("USERNAME"), col("session_window.start").alias("EVENT_start"), col("session_window.end").alias("EVENT_end"), col("events"))
 .createOrReplaceTempView("click_user_session_temp"))
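In a session window, each event opens or extends a per-user session, and the session closes once 30 seconds pass with no new event; the session's start is its first event and its end is the last event plus the gap. A plain-Python sketch of that grouping, with a hypothetical `session_windows` function and integer epoch seconds, ignoring watermarks:

```python
from collections import defaultdict


def session_windows(events, gap=30):
    """events: iterable of (epoch_seconds, username).
    Returns {username: [(start, end, events), ...]}, where end = last event time + gap."""
    by_user = defaultdict(list)
    for ts, user in events:
        by_user[user].append(ts)
    sessions = {}
    for user, times in by_user.items():
        times.sort()
        result = []
        start = last = times[0]
        n = 1
        for ts in times[1:]:
            if ts < last + gap:      # still inside the current session: extend it
                last = ts
                n += 1
            else:                    # gap exceeded: close the session and open a new one
                result.append((start, last + gap, n))
                start = last = ts
                n = 1
        result.append((start, last + gap, n))
        sessions[user] = result
    return sessions
```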

In [0]:
%sql
select * from click_user_session_temp

In [0]:
%sql
select * from click_user_session_temp
where username = 'Nathan_126'
order by event_start desc

### Write the analysed data to a new table stored in Google Cloud Storage

In [0]:
query = (spark
         .readStream
         .table("click_user_session_temp")
         .writeStream
         .option("checkpointLocation", "/FileStore/tables/checkpoint/click_user_sessions")
         .outputMode("append")
         .trigger(processingTime='4 seconds')
         .toTable("click_user_sessions"))  # toTable writes the stream to a table

### Enriched user details table
### Aggregate (count and groupBy) using a tumbling window
### Count each user's activity per 60-second window, keeping only counts greater than 1

In [0]:
(spark
 .table("user_click_final")
 .withWatermark("timestamp", "60 seconds")
 .groupBy(window(col("timestamp"), "60 seconds"), col("username"), col("ip"), col("city"))
 .agg(count("*").alias("count"))
 .filter("count > 1")
 .select(col("username"), col("ip"), col("city"), col("window.start").alias("EVENT_start"), col("window.end").alias("EVENT_end"), col("count"))
 .createOrReplaceTempView("user_ip_activity_temp"))
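Unlike the sliding windows used for the first topic, a tumbling window has no overlap, so each event belongs to exactly one 60-second window. A plain-Python sketch of this cell's count-and-filter, with a hypothetical `user_ip_activity` function and integer epoch seconds, ignoring watermarks:

```python
from collections import Counter


def user_ip_activity(events, size=60, min_count=1):
    """events: iterable of (epoch_seconds, username, ip, city).
    Returns sorted rows (username, ip, city, start, end, count) with count > min_count."""
    counts = Counter()
    for ts, username, ip, city in events:
        start = ts - ts % size       # the single epoch-aligned window containing ts
        counts[(username, ip, city, start)] += 1
    return sorted(
        (u, ip, c, s, s + size, n)
        for (u, ip, c, s), n in counts.items()
        if n > min_count             # mirrors .filter("count > 1")
    )
```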

In [0]:
%sql
select * from user_ip_activity_temp

In [0]:
%sql
select * from user_ip_activity_temp
where username = "Reeva43" and ip = "111.245.174.111" and city = "Raleigh"
order by event_start desc

### Write the analysed data to a new table stored in Google Cloud Storage

In [0]:
query = (spark
         .readStream
         .table("user_ip_activity_temp")
         .writeStream
         .option("checkpointLocation", "/FileStore/tables/checkpoint/user_ip_activity")
         .outputMode("append")
         .trigger(processingTime='4 seconds')
         .toTable("user_ip_activity"))  # toTable writes the stream to a table