In [1]:
// Evaluate the session handle; in this notebook that makes the kernel wait
// for the Spark session to start (see the cell output).
spark

Waiting for a Spark session to start...

In [2]:
// Take the first Spark configuration value that contains "/proxy/"; in this
// session's output that value is the application's proxy URL.
sc.getConf.getAll
  .map { case (_, value) => value }
  .filter(value => value.contains("/proxy/"))
  .apply(0)

Waiting for a Spark session to start...

http://rm01.itversity.com:19288/proxy/application_1540458187951_76577

In [3]:
/** Returns the canonical name of the runtime class of `o`. */
def getType(o: Any) = {
  val runtimeClass = o.getClass
  runtimeClass.getCanonicalName
}

getType: (o: Any)String


In [4]:
// Operating-system name as reported by the JVM system properties.
val os_name = System.getProperty("os.name")
// HDFS home directory: "/user/" followed by the piece at index 2 of $HOME
// split on "/" (the user name when HOME has the form /home/<user>).
val hdfs_home = {
  val homeParts = System.getenv("HOME").split("/")
  s"/user/${homeParts(2)}"
}

os_name = Linux
hdfs_home = /user/kranthidr


/user/kranthidr

In [5]:
// Location of the activity data set underneath the HDFS home directory.
val path = s"${hdfs_home}/dataSets/spark-guide/activity-data/"

path = /user/kranthidr/dataSets/spark-guide/activity-data/


/user/kranthidr/dataSets/spark-guide/activity-data/

In [6]:
// in Scala
// Set spark.sql.shuffle.partitions to 5 for this session.
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [7]:
// Batch read of the JSON files under `path`; the schema of this DataFrame is
// reused below for the streaming read of the same directory.
val static = spark.read.json(path)

static = [Arrival_Time: bigint, Creation_Time: bigint ... 8 more fields]


[Arrival_Time: bigint, Creation_Time: bigint ... 8 more fields]

In [8]:
// Streaming read of the same JSON directory, using the schema of the static
// DataFrame and the option maxFilesPerTrigger = 10.
val streaming = spark.readStream
  .option("maxFilesPerTrigger", 10)
  .schema(static.schema)
  .json(path)

streaming = [Arrival_Time: bigint, Creation_Time: bigint ... 8 more fields]


[Arrival_Time: bigint, Creation_Time: bigint ... 8 more fields]

In [9]:
// COMMAND ----------

// Print the schema of the streaming DataFrame.
streaming.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [10]:
// COMMAND ----------

// in Scala
// Keep every input column and add `event_time`: Creation_Time divided by
// 1e9 and cast to a timestamp (presumably Creation_Time is in nanoseconds
// since the epoch -- confirm against the data set).
val withEventTime = streaming.selectExpr("*",
  "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

withEventTime = [Arrival_Time: bigint, Creation_Time: bigint ... 9 more fields]


[Arrival_Time: bigint, Creation_Time: bigint ... 9 more fields]

In [11]:
// COMMAND ----------

// in Scala
import org.apache.spark.sql.functions.{window, col}
// Count events per 10-minute window of event_time and publish the complete
// result to the in-memory table "events_per_window".
withEventTime
  .groupBy(window(col("event_time"), "10 minutes"))
  .count()
  .writeStream
  .format("memory")
  .queryName("events_per_window")
  .outputMode("complete")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4f2ce925

In [12]:
// COMMAND ----------

// Select from the in-memory table "events_per_window" and print its schema.
spark.sql("SELECT * FROM events_per_window").printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [13]:
// COMMAND ----------

// in Scala
import org.apache.spark.sql.functions.{window, col}

// Same 10-minute window count, additionally grouped by the User column;
// results go to the in-memory table "events_per_window1".
withEventTime
  .groupBy(window(col("event_time"), "10 minutes"), col("User"))
  .count()
  .writeStream
  .format("memory")
  .queryName("events_per_window1")
  .outputMode("complete")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@556abaac

In [14]:
// COMMAND ----------

// in Scala
import org.apache.spark.sql.functions.{window, col}
// Count events per window of event_time with a 10-minute window duration and
// a 5-minute slide duration; results go to the in-memory table
// "events_per_window2".
withEventTime
  .groupBy(
    window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .format("memory")
  .queryName("events_per_window2")
  .outputMode("complete")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5a1210a

In [15]:
// COMMAND ----------

// in Scala
import org.apache.spark.sql.functions.{window, col}
// Sliding-window count (10-minute windows, 5-minute slide) with a 5-hour
// watermark declared on event_time; results go to the in-memory table
// "events_per_window3".
// NOTE(review): the query uses outputMode("complete"); the Structured
// Streaming guide lists append/update mode as a condition for watermarks to
// clean up aggregation state, so the watermark presumably does not bound
// state here -- confirm before relying on it.
withEventTime
  .withWatermark("event_time", "5 hours")
  .groupBy(
    window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .format("memory")
  .queryName("events_per_window3")
  .outputMode("complete")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@44adbf0d

In [16]:
// COMMAND ----------

// in Scala
import org.apache.spark.sql.functions.expr

// Declare a 5-second watermark on event_time, drop rows that repeat the same
// (User, event_time) pair, then count the remaining rows per user; results go
// to the in-memory table "deduplicated".
withEventTime
  .withWatermark("event_time", "5 seconds")
  .dropDuplicates("User", "event_time")
  .groupBy("User")
  .count()
  .writeStream
  .format("memory")
  .queryName("deduplicated")
  .outputMode("complete")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@77f291bf

In [17]:
// COMMAND ----------

/** One input event: the user, the event timestamp and the activity label. */
case class InputRow(
  user: String,
  timestamp: java.sql.Timestamp,
  activity: String)

/**
 * Running state kept per user. `activity`, `start` and `end` are mutable and
 * are updated in place as events arrive.
 */
case class UserState(
  user: String,
  var activity: String,
  var start: java.sql.Timestamp,
  var end: java.sql.Timestamp)

defined class InputRow
defined class UserState


In [18]:
// COMMAND ----------

/**
 * Folds a single event into the user's state (mutating `state`) and returns
 * that same state object.
 *
 *  - Events with a null timestamp are ignored.
 *  - Same activity as the state: the [start, end] range is widened to
 *    include the event's timestamp.
 *  - Different activity: the state switches to the event's activity, with
 *    start = end = event timestamp, but only when the event is later than
 *    the current end.
 */
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  Option(input.timestamp).foreach { eventTime =>
    if (state.activity != input.activity) {
      // a different activity only takes over when it is the newest event
      if (eventTime.after(state.end)) {
        state.start = eventTime
        state.end = eventTime
        state.activity = input.activity
      }
    } else {
      // same activity: stretch the range in whichever direction is needed
      if (eventTime.after(state.end)) {
        state.end = eventTime
      }
      if (eventTime.before(state.start)) {
        state.start = eventTime
      }
    }
  }
  state
}

updateUserStateWithEvent: (state: UserState, input: InputRow)UserState


In [19]:
// COMMAND ----------

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
/**
 * Applies every input event of one user to that user's state, writing the
 * state back to `oldState` after each event, and returns the final state.
 *
 * When no state exists yet, a placeholder is used whose `start` is far in
 * the future (6284160000000 ms) and whose `end` is close to the epoch
 * (6284160 ms), with an empty activity, so the first event with a non-null
 * timestamp replaces these placeholder values.
 */
def updateAcrossEvents(user:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserState]):UserState = {
  val initial: UserState =
    if (oldState.exists) {
      oldState.get
    } else {
      UserState(user,
        "",
        new java.sql.Timestamp(6284160000000L),
        new java.sql.Timestamp(6284160L))
    }

  inputs.foldLeft(initial) { (current, event) =>
    val next = updateUserStateWithEvent(current, event)
    oldState.update(next)
    next
  }
}

updateAcrossEvents: (user: String, inputs: Iterator[InputRow], oldState: org.apache.spark.sql.streaming.GroupState[UserState])UserState


In [20]:
// COMMAND ----------

import org.apache.spark.sql.streaming.GroupStateTimeout
// Project the stream onto the InputRow fields (user, timestamp, activity),
// group by user and maintain one UserState per user with
// mapGroupsWithState (no timeout); updates go to the in-memory table
// "mapGroupsWithState".
withEventTime
  .selectExpr(
    "User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "gt as activity")
  .as[InputRow]
  .groupByKey(row => row.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .format("memory")
  .queryName("mapGroupsWithState")
  .outputMode("update")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@2604e04

In [21]:
// COMMAND ----------

/** One input event: the device, the event timestamp and the x reading. */
case class InputRow(
  device: String,
  timestamp: java.sql.Timestamp,
  x: Double)

/** Mutable per-device state: the collected values and how many were seen. */
case class DeviceState(
  device: String,
  var values: Array[Double],
  var count: Int)

/** Output record: a device together with an average value. */
case class OutputRow(
  device: String,
  previousAverage: Double)

defined class InputRow
defined class DeviceState
defined class OutputRow


In [22]:
// COMMAND ----------

/**
 * Records one event in the device state: increments the counter and appends
 * the event's x value. The state is mutated in place and returned.
 */
def updateWithEvent(state:DeviceState, input:InputRow):DeviceState = {
  state.count = state.count + 1
  // keep the x-axis readings in arrival order
  state.values = state.values :+ input.x
  state
}

updateWithEvent: (state: DeviceState, input: InputRow)DeviceState


In [23]:
// COMMAND ----------

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

In [24]:
/**
 * Processes a device's events in timestamp order. Each event is added to
 * the device state; once the state has counted 500 events, the average of
 * the collected values is emitted and the state is reset to an empty one.
 * Events that do not complete a batch of 500 emit nothing.
 */
def updateAcrossEvents(device:String, inputs: Iterator[InputRow],
  oldState: GroupState[DeviceState]):Iterator[OutputRow] = {
  val chronological = inputs.toSeq.sortBy(_.timestamp.getTime)

  chronological.toIterator.flatMap { event =>
    val current =
      if (oldState.exists) oldState.get
      else DeviceState(device, Array(), 0)
    val updated = updateWithEvent(current, event)

    if (updated.count < 500) {
      // batch still filling up: store the state, output nothing
      oldState.update(updated)
      Iterator()
    } else {
      // batch complete: start over with an empty state and output the
      // average of the values collected so far
      oldState.update(DeviceState(device, Array(), 0))
      val average = updated.values.sum / updated.values.length.toDouble
      Iterator(OutputRow(device, average))
    }
  }
}

updateAcrossEvents: (device: String, inputs: Iterator[InputRow], oldState: org.apache.spark.sql.streaming.GroupState[DeviceState])Iterator[OutputRow]


In [25]:
// COMMAND ----------

import org.apache.spark.sql.streaming.GroupStateTimeout

// Project the stream onto the InputRow fields (device, timestamp, x), group
// by device and run updateAcrossEvents through flatMapGroupsWithState in
// append mode without a timeout; emitted rows go to the in-memory table
// "flatMapGroupsWithState".
withEventTime
  .selectExpr(
    "Device as device",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x")
  .as[InputRow]
  .groupByKey(row => row.device)
  .flatMapGroupsWithState(
    OutputMode.Append,
    GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .format("memory")
  .queryName("flatMapGroupsWithState")
  .outputMode("append")
  .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1e1d7b58

In [26]:
// COMMAND ----------

/** One input event: user id, event timestamp, x reading and activity label. */
case class InputRow(
  uid: String,
  timestamp: java.sql.Timestamp,
  x: Double,
  activity: String)

/**
 * Mutable per-user session state: the latest timestamp, the distinct
 * activities seen and the collected x values.
 */
case class UserSession(
  uid: String,
  var timestamp: java.sql.Timestamp,
  var activities: Array[String],
  var values: Array[Double])

/** Output record for a session: its activities and an average x value. */
case class UserSessionOutput(
  uid: String,
  var activities: Array[String],
  var xAvg: Double)

defined class InputRow
defined class UserSession
defined class UserSessionOutput


In [27]:
// COMMAND ----------

/**
 * Adds one event to the session (mutating `state`) and returns the same
 * state object: the session timestamp becomes the event's timestamp, the x
 * value is appended, and the activity is recorded if not seen before.
 * Events with a null timestamp (malformed dates) leave the state unchanged.
 */
def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
  Option(input.timestamp).foreach { eventTime =>
    state.timestamp = eventTime
    state.values = state.values :+ input.x
    val alreadySeen = state.activities.contains(input.activity)
    if (!alreadySeen) {
      state.activities = state.activities :+ input.activity
    }
  }
  state
}

updateWithEvent: (state: UserSession, input: InputRow)UserSession


In [28]:
// COMMAND ----------

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

/**
 * Session logic for one user, meant to be used with
 * flatMapGroupsWithState and an event-time timeout.
 *
 *  - Timeout call (`oldState.hasTimedOut`): the stored session is removed
 *    and emitted as a UserSessionOutput with the average of its x values.
 *  - Otherwise the inputs are processed in timestamp order. Each event is
 *    added to the session; when the session holds more than 1000 values it
 *    is emitted and removed, otherwise it is stored and its timeout is set
 *    to the event's timestamp plus "5 seconds".
 *
 * Inputs with a null timestamp are skipped, matching the malformed-date
 * guard in updateWithEvent (sorting them would otherwise throw).
 *
 * @param uid      grouping key (user id)
 * @param inputs   new events for this user in the current trigger
 * @param oldState Spark-managed state handle for this user
 * @return zero or more completed sessions
 */
def updateAcrossEvents(uid:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {

  if (oldState.hasTimedOut) {
    // Per the GroupState documentation, a timed-out group is passed to the
    // function with no input values, so this check must live outside the
    // per-input loop; inside it the branch could never run.
    val expired = oldState.get
    oldState.remove()
    Iterator(UserSessionOutput(uid,
      expired.activities,
      expired.values.sum / expired.values.length.toDouble))
  } else {
    val ordered = inputs
      .filter(_.timestamp != null)
      .toSeq
      .sortBy(_.timestamp.getTime)

    ordered.toIterator.flatMap { input =>
      val state = if (oldState.exists) oldState.get else UserSession(
        uid,
        new java.sql.Timestamp(6284160000000L),
        Array(),
        Array())
      // updateWithEvent mutates and returns `state`, so newState is the
      // session including the current event
      val newState = updateWithEvent(state, input)

      if (newState.values.length > 1000) {
        // session is full: emit it and start from scratch on the next event
        oldState.remove()
        Iterator(UserSessionOutput(uid,
          newState.activities,
          newState.values.sum / newState.values.length.toDouble))
      } else {
        oldState.update(newState)
        oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
        Iterator()
      }
    }
  }
}

updateAcrossEvents: (uid: String, inputs: Iterator[InputRow], oldState: org.apache.spark.sql.streaming.GroupState[UserSession])Iterator[UserSessionOutput]


In [29]:
// COMMAND ----------

import org.apache.spark.sql.streaming.GroupStateTimeout

// Keep rows with a non-null x, project them onto the InputRow fields
// (uid, timestamp, x, activity), declare a 5-second watermark on timestamp,
// group by uid and run updateAcrossEvents through flatMapGroupsWithState in
// append mode with an event-time timeout. Emitted rows go to the in-memory
// table "count_based_device".
// NOTE(review): no outputMode is set on the writer here, unlike the other
// queries; presumably the writer default (append) applies -- confirm.
withEventTime
  .where("x is not null")
  .selectExpr(
    "user as uid",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x",
    "gt as activity")
  .as[InputRow]
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(row => row.uid)
  .flatMapGroupsWithState(
    OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("memory")
  .queryName("count_based_device")
  .start()


// COMMAND ----------

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@23eb1fe3

In [30]:
// List the streaming queries that are currently active in this session.
spark.streams.active

[org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@556abaac, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@23eb1fe3, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5a1210a, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@44adbf0d, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@2604e04, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1e1d7b58, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@77f291bf, org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4f2ce925]