# ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png)  3/ GOLD table: extract the sessions

<img style="float:right; height: 250px; margin: 0px 30px 0px 30px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/product/streaming-sessionization/session_diagram.png">

**Scala version:** This notebook implements the same logic as [the Python version]($../03-Delta-session-GOLD), but in Scala. As you'll see, the function signature is slightly different because we don't receive an iterator of pandas DataFrames, but the logic remains identical.

### Why is this a challenge?
Because no event flags the user's disconnection, detecting the end of a session is hard. After 10 minutes without any event, we want to be notified that the session has ended.
However, Spark only reacts to events, not to the absence of events.

Thankfully, Spark Structured Streaming has the concept of a timeout.

**We can set a 10-minute timeout in the state engine** and be notified 10 minutes later in order to close the session.
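
To illustrate what the timeout gives us, here is a minimal, Spark-free sketch. `SessionTimeoutTracker` is a hypothetical helper, not a Spark API: it remembers when each user was last seen and, when polled, reports the users who have been idle for at least the timeout. In Structured Streaming, the state engine plays this role when the query uses `GroupStateTimeout.ProcessingTimeTimeout` and the state function calls `state.setTimeoutDuration`.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free stand-in for the state engine's processing-time timeout.
// It only models the idea: a user's session ends once no event has arrived for `timeoutMs`.
final class SessionTimeoutTracker(timeoutMs: Long) {
  // user_id -> processing time (ms) of the most recent event seen for that user
  private val lastSeen = mutable.Map.empty[String, Long]

  // Record an event, pushing that user's timeout forward (like calling setTimeoutDuration again).
  def onEvent(userId: String, nowMs: Long): Unit = lastSeen.update(userId, nowMs)

  // Polled periodically (Spark checks timeouts on each trigger): returns the users idle for
  // at least `timeoutMs` and forgets them, i.e. their sessions are now closed.
  def expire(nowMs: Long): Set[String] = {
    val expired = lastSeen.iterator.collect { case (user, seen) if nowMs - seen >= timeoutMs => user }.toSet
    lastSeen --= expired
    expired
  }
}
```

For example, with a 10-minute timeout, a user last seen at minute 0 is reported as expired when polled at minute 10, but not at minute 9.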

### A cluster has been created for this demo
To run this demo, just select the cluster `dbdemos-streaming-sessionization-david_lopez` from the dropdown menu ([open cluster configuration](https://adb-1126292079753158.18.azuredatabricks.net/#setting/clusters/0102-204425-e6rayurl/configuration)). <br />
*Note: If the cluster was deleted after 30 days, you can re-create it with `dbdemos.create_cluster('streaming-sessionization')` or re-install the demo: `dbdemos.install('streaming-sessionization')`*

In [0]:
%run ../_resources/00-setup-scala $reset_all_data=false


### Implementing the aggregation function to update our Session

In this simple example, we'll just count the number of clicks in the session.

In [0]:
import java.sql.Timestamp

waitForTable("events") // Wait until the previous table is created to avoid error if all notebooks are started at once

//Event (from the silver table)
case class ClickEvent(user_id: String, event_id: String, event_datetime: Timestamp, event_date: Long, platform: String, action: String, uri: String)
//Session (from the gold table)
case class UserSession(
  user_id: String, 
  var click_count: Integer = 0, 
  var start_time: Timestamp = Timestamp.valueOf("9999-12-31 23:59:59"), 
  var end_time: Timestamp = new Timestamp(0L), 
  var status: String = "online"
)

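The function `updateState` will be called for each user with an iterator over that user's new events in the current micro-batch. When the user's timeout expires, it is called again with an empty iterator and `state.hasTimedOut` set to `true`.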
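
As a rough, Spark-free model of this contract, the sketch below groups one micro-batch of events by user and folds each user's events into that user's previous session. `Click`, `Session` and `MicroBatchSketch` are hypothetical, simplified types (immutable, with fewer fields than `ClickEvent`/`UserSession`), and the sketch ignores checkpointing and timeouts.

```scala
import java.sql.Timestamp

// Hypothetical, simplified stand-ins for ClickEvent / UserSession (immutable, fewer fields).
final case class Click(userId: String, eventTime: Timestamp)
final case class Session(userId: String, clickCount: Int, start: Timestamp, end: Timestamp)

object MicroBatchSketch {
  // Like UserSession's defaults: a far-future start and an epoch end, so the first event sets both.
  private def empty(userId: String): Session =
    Session(userId, 0, Timestamp.valueOf("9999-12-31 23:59:59"), new Timestamp(0L))

  // One call per user: fold that user's new events into the previous session (if any).
  def update(userId: String, events: Iterator[Click], previous: Option[Session]): Session =
    events.foldLeft(previous.getOrElse(empty(userId))) { (s, e) =>
      s.copy(
        clickCount = s.clickCount + 1,
        start = if (e.eventTime.before(s.start)) e.eventTime else s.start,
        end = if (e.eventTime.after(s.end)) e.eventTime else s.end
      )
    }

  // Roughly what groupByKey(_.user_id) + flatMapGroupsWithState does for one micro-batch,
  // without checkpointing or timeouts: only users present in the batch are updated.
  def processBatch(batch: Seq[Click], state: Map[String, Session]): Map[String, Session] =
    batch.groupBy(_.userId).foldLeft(state) { case (acc, (user, events)) =>
      acc.updated(user, update(user, events.iterator, acc.get(user)))
    }
}
```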

In [0]:
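import org.apache.spark.sql.streaming.{ GroupState, GroupStateTimeout, OutputMode }
import spark.implicits._ // encoders for .as[ClickEvent] and the UserSession state

// Session timeout, in milliseconds. 30 seconds keeps the demo short; use 10 * 60 * 1000 for the 10-minute timeout described above.
val MaxSessionDuration = 30000

def updateState(user_id: String, events: Iterator[ClickEvent], state: GroupState[UserSession]): Iterator[UserSession] = {
  val curState = state.getOption.getOrElse { UserSession(user_id) } // get previous state or instantiate new with default
  if (state.hasTimedOut) {
    // No event for MaxSessionDuration: emit the session one last time as offline and drop its state
    curState.status = "offline"
    state.remove()
    Iterator(curState)
  } else {
    // New events for this user: updateStateWithEvent marks the session "online" and updates count/start/end
    val updatedState = events.foldLeft(curState){ updateStateWithEvent }
    state.update(updatedState)
    // The timeout is reset on every call, so re-arm it: we'll be called back if no event arrives in time
    state.setTimeoutDuration(MaxSessionDuration)
    Iterator(updatedState)
  }
}

def updateStateWithEvent(state: UserSession, input: ClickEvent): UserSession = {
  state.status = "online"
  state.click_count += 1
  // Update the beginning and end of our session
  if (input.event_datetime.after(state.end_time)) {
    state.end_time = input.event_datetime
  }
  if (input.event_datetime.before(state.start_time)) {
    state.start_time = input.event_datetime
  }
  state
}

val sessions = spark
  .readStream
  .format("delta")
  .table("events")
  .as[ClickEvent]
  .groupByKey(_.user_id)
  // Processing-time timeout: Spark calls updateState for a user even when no new event arrives
  // (an EventTimeTimeout would additionally require a watermark defined with withWatermark)
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout)(updateState)
  .toDF

display(sessions)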

### Updating the session table with the number of clicks and start/end time

We want each user's session information available in real time.

To do that, we'll create a `sessions` table. Every time we update the state, we'll UPSERT the session information:

- if the session doesn't exist, we add it
- if it exists, we update it with the new count and, potentially, the new status

This is easily done with a Delta `MERGE` operation called from `foreachBatch`.
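
The upsert semantics can be modeled without Spark as a map keyed by `user_id`: matched keys are overwritten (like `whenMatched().updateAll()`) and new keys are inserted (like `whenNotMatched().insertAll()`). `SessionRow` and `UpsertSketch` are hypothetical names used only for this sketch.

```scala
// Hypothetical row type for this sketch (the real table has more columns).
final case class SessionRow(userId: String, clickCount: Int, status: String)

object UpsertSketch {
  // Models the MERGE on user_id: existing keys are overwritten (whenMatched().updateAll()),
  // new keys are added (whenNotMatched().insertAll()).
  def merge(target: Map[String, SessionRow], source: Seq[SessionRow]): Map[String, SessionRow] = {
    // Delta's MERGE errors when several source rows match the same target row;
    // this sketch simply rejects any duplicate key in the source batch up front.
    require(source.map(_.userId).distinct.size == source.size, "duplicate user_id in source batch")
    source.foldLeft(target)((table, row) => table.updated(row.userId, row))
  }
}
```

Since `flatMapGroupsWithState` calls the state function at most once per user per micro-batch and `updateState` returns a single row, each batch passed to `foreachBatch` should contain at most one row per `user_id`, which keeps the real MERGE clear of the duplicate-match error.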

In [0]:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

def updateSessions(df: DataFrame, epochId: Long): Unit = {
  // Create the table if it doesn't exist (we need it to be able to perform the merge)
  if (!spark.catalog.tableExists("sessions")) {
    df.limit(0).write.option("mergeSchema", "true").mode("append").saveAsTable("sessions")
  }

  DeltaTable.forName(spark, "sessions").alias("s")
    .merge(source = df.alias("u"), condition = "s.user_id = u.user_id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

sessions
  .writeStream
  .option("checkpointLocation", s"$cloudStoragePath/checkpoints/sessions")
  .foreachBatch(updateSessions _)
  .start()

waitForTable("sessions")

In [0]:
%sql SELECT * FROM sessions

In [0]:
%sql SELECT CAST(avg(unix_timestamp(end_time) - unix_timestamp(start_time)) AS INT) AS average_session_duration FROM sessions
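
For reference, this metric is the mean of `end_time - start_time` over all sessions. Below is a plain-Scala sketch of the same computation in whole seconds, assuming sessions are given as (start, end) timestamp pairs; `SessionStats` is a hypothetical helper. Like SQL `avg`, it returns no value for an empty input.

```scala
import java.sql.Timestamp

object SessionStats {
  // Mean of (end - start) over all sessions, truncated to whole seconds.
  // Returns None for no sessions, mirroring SQL avg() returning NULL on an empty table.
  def averageDurationSeconds(sessions: Seq[(Timestamp, Timestamp)]): Option[Long] =
    if (sessions.isEmpty) None
    else {
      val totalMs = sessions.map { case (start, end) => end.getTime - start.getTime }.sum
      Some(totalMs / sessions.size / 1000)
    }
}
```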

In [0]:
stopAllStreams(sleepTime=120)

### We now have our sessions stream running!

We could also send the output of this streaming job to a SQL database or another queuing system.

We'll be able to automatically detect cart abandonments on our website and send an email to our customers, or maybe just give them a call asking if they need some help!