In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`org.apache.spark::spark-streaming:2.4.5`

import org.apache.log4j.{Level, Logger}

// Mute every logger under the "org" hierarchy (this includes Spark's own org.apache.spark.*
// loggers) so notebook output stays readable. Level.OFF also hides ERROR messages; raise it
// to Level.ERROR when something needs debugging.
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql._

// Local SparkSession for the demo. NotebookSparkSession is provided by the notebook kernel
// (it is not imported in this cell). Settings:
//   * a single shuffle partition instead of Spark's default,
//   * no Parquet summary-metadata files,
//   * streaming queries without their own checkpoint location checkpoint under ./checkpoints.
val spark = NotebookSparkSession
    .builder()
    .config("spark.sql.shuffle.partitions", 1)
    .config("parquet.enable.summary-metadata", "false")
    .config("spark.sql.streaming.checkpointLocation", "checkpoints")
    .master("local[*]")
    .getOrCreate()

import scala.reflect.io.Directory
import java.io.File

// Drop whatever a previous run of this notebook wrote, so every table below is rebuilt from scratch.
locally {
    val previousOutput = new Directory(new File("tables"))
    previousOutput.deleteRecursively()
}

// Input files, one JSON event per line: the original feed, and the corrected input that the
// Reprocessing sections replay.
val dataSourceFile = "./sample_data.json"
val correctDataSourceFile = "./correct_data.json"

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36m$ivy.$                                        

[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@74579c87
[32mimport [39m[36mscala.reflect.io.Directory
[39m
[32mimport [39m[36mjava.io.File

[39m
[36mres0_8[39m: [32mBoolean[39m = false
[36mdataSourceFile[39m: [32mString[39m = [32m"./sample_data.json"[39m
[36mcorrectDataSourceFile[39m: [32mString[39m = [32m"./correct_data.json"[39m

# Lambda

## Streaming layer

In [2]:
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import spark.implicits._

// Implicit context picked up by MemoryStream.apply when it creates in-memory sources.
// Implicit definitions should carry an explicit type so their resolution does not depend
// on type inference.
implicit val sqlContext: SQLContext = spark.sqlContext

// In-memory source feeding the Lambda streaming layer; each element is one raw JSON event.
val sampleDataStream = MemoryStream[String]

// Shape of each JSON event. `timestamp` is numeric (epoch seconds, converted to a timestamp
// by the layers that read it); `user_id` is parsed but not used by any aggregation here.
val schema = new StructType()
    .add(StructField("timestamp", LongType))
    .add(StructField("page", StringType))
    .add(StructField("user_id", LongType))

/**
 * Starts the streaming layer over `dataStream`.
 *
 * Each element is parsed as a JSON event using `schema`. Events are counted per 1-minute
 * event-time window, calendar day and page. Finalised counts are appended as Parquet files
 * under `outputDir`, partitioned by `event_date`.
 *
 * @param dataStream in-memory source of raw JSON strings, one event per element
 * @param outputDir  directory the Parquet sink writes into
 * @return the started streaming query
 */
def streamingLayer(dataStream: MemoryStream[String], outputDir: String) = {

    // Raw JSON -> typed columns. The numeric `timestamp` (seconds since the epoch, as
    // expected by from_unixtime) becomes a real timestamp so it can drive the watermark.
    // The 1-second watermark is what lets append mode emit an aggregated window once
    // event time has moved past it.
    val events = dataStream
        .toDF
        .select(from_json($"value", schema).as("value"))
        .select("value.*")
        .withColumn("timestamp", to_timestamp(from_unixtime($"timestamp")))
        .withWatermark("timestamp", "1 second")

    // Count events per tumbling 1-minute window, calendar day and page.
    val pageCounts = events
        .withColumn("event_date", to_date($"timestamp"))
        .groupBy(window($"timestamp", "1 minute"), $"event_date", $"page")
        .count

    pageCounts.writeStream
        .outputMode("append")
        .format("parquet")
        .partitionBy("event_date")
        .option("path", outputDir)
        .start()
}

// Launch the Lambda streaming layer; it only produces output once events are pushed into sampleDataStream.
val streamingQuery = streamingLayer(
    dataStream = sampleDataStream,
    outputDir = "tables/streaming_table"
)

[32mimport [39m[36morg.apache.spark.sql.execution.streaming.MemoryStream
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.types._

[39m
[32mimport [39m[36mspark.implicits._

[39m
[36msqlContext[39m: [32mSQLContext[39m = org.apache.spark.sql.SQLContext@3bde0c63
[36msampleDataStream[39m: [32mMemoryStream[39m[[32mString[39m] = [33mMemoryStream[39m(
  [32m0[39m,
  org.apache.spark.sql.SQLContext@3bde0c63
)
[36mschema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"timestamp"[39m, LongType, true, {}),
  [33mStructField[39m([32m"page"[39m, StringType, true, {}),
  [33mStructField[39m([32m"user_id"[39m, LongType, true, {})
)
defined [32mfunction[39m [36mstreamingLayer[39m
[36mstreamingQuery[39m: [32mstreaming[39m.[32mStreamingQuery[39m = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5ae1c99a

In [3]:
import scala.io.Source

/**
 * Pushes every line of `filename` into `stream`, one `addData` call per line, in file order.
 *
 * The file is decoded as UTF-8 rather than with the platform-default charset (JSON text is
 * UTF-8). The file handle is always closed, even if reading or adding data fails.
 *
 * @param filename path of a text file, expected to hold one JSON event per line
 * @param stream   in-memory streaming source to feed
 */
def loadDataToStream(filename: String, stream: MemoryStream[String]) = {
    val source = Source.fromFile(filename, "UTF-8")
    // getLines is lazy, so it must be fully consumed (foreach) before the handle is closed.
    try source.getLines.foreach(line => stream.addData(line))
    finally source.close()
}

// Feed the sample events into the stream, then block until the query has processed everything available.
loadDataToStream(filename = dataSourceFile, stream = sampleDataStream)
streamingQuery.processAllAvailable()

[32mimport [39m[36mscala.io.Source

[39m
defined [32mfunction[39m [36mloadDataToStream[39m

In [4]:
// Inspect the raw streaming-layer output (per-window counts) without truncating the window column.
spark.read.parquet("tables/streaming_table").show(truncate = false)

+------------------------------------------+-------+-----+----------+
|window                                    |page   |count|event_date|
+------------------------------------------+-------+-----+----------+
|[2020-05-03 15:47:00, 2020-05-03 15:48:00]|catalog|3    |2020-05-03|
|[2020-05-03 15:47:00, 2020-05-03 15:48:00]|payment|1    |2020-05-03|
|[2020-05-03 15:47:00, 2020-05-03 15:48:00]|order  |1    |2020-05-03|
|[2020-05-03 15:47:00, 2020-05-03 15:48:00]|product|1    |2020-05-03|
|[2020-05-02 15:47:00, 2020-05-02 15:48:00]|catalog|1    |2020-05-02|
+------------------------------------------+-------+-----+----------+



## Batch layer

In [5]:
import $ivy.`joda-time:joda-time:2.10.5`

import org.joda.time.LocalDate
import org.apache.spark.sql.functions._


/**
 * Batch layer: recomputes per-page event counts for one calendar day.
 *
 * The counts are written to that day's partition of `tables/batch_table`, replacing any
 * previous output for the day (write mode "overwrite"), so reruns do not duplicate data.
 *
 * @param filename            JSON-lines file with at least `timestamp` (epoch seconds) and `page`
 * @param executionDateString day to process, as an ISO-8601 date such as "2020-05-02"
 */
def processDay(filename: String, executionDateString: String) = {

    val executionDate = LocalDate.parse(executionDateString)

    // Day boundaries in epoch seconds, forming the half-open range [start, end).
    // NOTE(review): these use Joda's default time zone, while from_unixtime/to_date below use
    // Spark's session time zone; they only agree while spark.sql.session.timeZone is left at
    // its default.
    val startUnixTime = executionDate.toDateTimeAtStartOfDay.getMillis / 1000
    val endUnixTime = executionDate.plusDays(1).toDateTimeAtStartOfDay.getMillis / 1000

    import spark.implicits._

    // Exclusive upper bound: `between` is inclusive on both ends. With it, an event stamped
    // exactly at next-day midnight would be selected by two consecutive daily runs.
    def loadBatch = spark
        .read
        .json(filename)
        .where('timestamp >= startUnixTime && 'timestamp < endUnixTime)

    loadBatch
        .withColumn("event_date", to_date(from_unixtime('timestamp)))
        .groupBy('event_date, 'page)
        .count
        .write
        .mode("overwrite")
        .parquet(s"tables/batch_table/event_date=$executionDateString")
}

// Run the same day three times. processDay writes with mode("overwrite"), so each rerun
// replaces the day's partition instead of appending to it.
(1 to 3).foreach(_ => processDay(dataSourceFile, "2020-05-02"))

[32mimport [39m[36m$ivy.$                           

[39m
[32mimport [39m[36morg.joda.time.LocalDate
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._


[39m
defined [32mfunction[39m [36mprocessDay[39m

## Serving layer

In [6]:
// Expose both layers' Parquet output to SQL.
Seq(
    "tables/streaming_table" -> "streaming_table",
    "tables/batch_table" -> "batch_table"
).foreach { case (path, viewName) =>
    spark.read.parquet(path).createOrReplaceTempView(viewName)
}

// Lambda serving view. It contains:
//   * every batch-layer row, and
//   * streaming-layer rows only for dates strictly after the most recent batch date.
// The streaming rows' per-window counts are summed into one row per (event_date, page).
spark
    .sql("""
    SELECT event_date, page, count
    FROM batch_table
    UNION ALL (
        SELECT event_date, page, SUM(count) as count
        FROM streaming_table
        WHERE event_date > (SELECT MAX(event_date) FROM batch_table)
        GROUP BY event_date, page
    )
    ORDER BY event_date, page
""")
    .createOrReplaceTempView("lambda_serving_layer")

spark.sql("SELECT * FROM lambda_serving_layer").show()

+----------+-------+-----+
|event_date|   page|count|
+----------+-------+-----+
|2020-05-02|catalog|    1|
|2020-05-03|catalog|    3|
|2020-05-03|  order|    1|
|2020-05-03|payment|    1|
|2020-05-03|product|    1|
+----------+-------+-----+



## Reprocessing

In [7]:
import scala.reflect.io.Directory
import java.io.File

// Lambda reprocessing: rebuild one day of the batch layer from the corrected input.
val dateToReprocess = "2020-05-02"

// Remove the day's existing partition. processDay's overwrite would replace it anyway;
// deleting first just makes the intent explicit.
val directory = new Directory(new File(s"tables/batch_table/event_date=$dateToReprocess"))
directory.deleteRecursively()

// Reuse dateToReprocess so the partition rebuilt here is always the one deleted above.
processDay(correctDataSourceFile, dateToReprocess)

// Refresh the cached metadata behind the batch_table view so the serving layer sees the rewritten files.
spark.sql("REFRESH TABLE batch_table")
spark.sql("SELECT * FROM lambda_serving_layer").show

+----------+-------+-----+
|event_date|   page|count|
+----------+-------+-----+
|2020-05-02|catalog|    2|
|2020-05-03|catalog|    3|
|2020-05-03|  order|    1|
|2020-05-03|payment|    1|
|2020-05-03|product|    1|
+----------+-------+-----+



[32mimport [39m[36mscala.reflect.io.Directory
[39m
[32mimport [39m[36mjava.io.File

[39m
[36mdateToReprocess[39m: [32mString[39m = [32m"2020-05-02"[39m
[36mdirectory[39m: [32mDirectory[39m = tables/batch_table/event_date=2020-05-02
[36mres6_4[39m: [32mBoolean[39m = true
[36mres6_6[39m: [32mDataFrame[39m = []

# Kappa

## Streaming layer

In [8]:
// Kappa: a second, independent streaming job over its own in-memory source is the only
// processing path; replay the same sample data through it and wait until it is processed.
val sampleDataStream2 = MemoryStream[String]
val streamingQuery2 = streamingLayer(
    dataStream = sampleDataStream2,
    outputDir = "tables/kappa_table"
)
loadDataToStream(filename = dataSourceFile, stream = sampleDataStream2)
streamingQuery2.processAllAvailable()

[36msampleDataStream2[39m: [32mMemoryStream[39m[[32mString[39m] = [33mMemoryStream[39m(
  [32m1[39m,
  org.apache.spark.sql.SQLContext@3bde0c63
)
[36mstreamingQuery2[39m: [32mstreaming[39m.[32mStreamingQuery[39m = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@2a33c6f

In [9]:
// Kappa serving layer: the streaming table is the single source of data, so the view only has
// to sum its per-window counts into one row per (event_date, page).
spark.read
    .parquet("tables/kappa_table")
    .createOrReplaceTempView("kappa_table")

spark
    .sql("""
    SELECT event_date, page, SUM(count) as count
    FROM kappa_table
    GROUP BY event_date, page
    ORDER BY event_date, page
""")
    .createOrReplaceTempView("kappa_serving_layer")

spark.sql("SELECT * FROM kappa_serving_layer").show()

+----------+-------+-----+
|event_date|   page|count|
+----------+-------+-----+
|2020-05-02|catalog|    1|
|2020-05-03|catalog|    3|
|2020-05-03|  order|    1|
|2020-05-03|payment|    1|
|2020-05-03|product|    1|
+----------+-------+-----+



## Reprocessing

In [10]:
import scala.reflect.io.Directory
import java.io.File

// Kappa reprocessing means replaying the corrected input through a brand-new streaming job.

// 1. Stop the current job, then delete everything it wrote.
streamingQuery2.stop()
new Directory(new File("tables/kappa_table")).deleteRecursively()

// 2. Start a fresh job on a fresh in-memory source, writing to the same location.
//    Replay the corrected data through it and wait until it has all been processed.
val sampleDataStream3 = MemoryStream[String]
val streamingQuery3 = streamingLayer(
    dataStream = sampleDataStream3,
    outputDir = "tables/kappa_table"
)
loadDataToStream(filename = correctDataSourceFile, stream = sampleDataStream3)
streamingQuery3.processAllAvailable()

[32mimport [39m[36mscala.reflect.io.Directory
[39m
[32mimport [39m[36mjava.io.File

// Delete old Kappa
[39m
[36mres9_3[39m: [32mBoolean[39m = true
[36msampleDataStream3[39m: [32mMemoryStream[39m[[32mString[39m] = [33mMemoryStream[39m(
  [32m2[39m,
  org.apache.spark.sql.SQLContext@3bde0c63
)
[36mstreamingQuery3[39m: [32mstreaming[39m.[32mStreamingQuery[39m = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6de67bb8

In [11]:
// Re-create Serving layer: kappa_table was deleted and rebuilt by the reprocessing job, so
// register a fresh read of it.
spark.read
    .parquet("tables/kappa_table")
    .createOrReplaceTempView("kappa_table")

// Then redefine the aggregation view on top: sum the per-window counts into one row per
// (event_date, page).
spark
    .sql("""
    SELECT event_date, page, SUM(count) as count
    FROM kappa_table
    GROUP BY event_date, page
    ORDER BY event_date, page
""")
    .createOrReplaceTempView("kappa_serving_layer")

spark.sql("SELECT * FROM kappa_serving_layer").show()

+----------+-------+-----+
|event_date|   page|count|
+----------+-------+-----+
|2020-05-02|catalog|    2|
|2020-05-03|catalog|    3|
|2020-05-03|  order|    1|
|2020-05-03|payment|    1|
|2020-05-03|product|    1|
+----------+-------+-----+

