# Databricks Delta Streaming

The data in `/mnt/training/definitive-guide/data/activity-data` contains smartphone accelerometer samples from all devices and users.

The files are read as JSON in this notebook; each record has the following fields:

- `Index`
- `Arrival_Time`
- `Creation_Time`
- `x`
- `y`
- `z`
- `User`
- `Model`
- `Device`
- `gt`

In [2]:
%run "./Reference/Setup"

In [3]:
dataPath = "/mnt/training/definitive-guide/data/activity-data"
outputPath = userhome + "/gaming"
basePath = userhome + "/advanced-streaming"
checkpointPath = basePath + "/checkpoints"
activityPath = basePath + "/activityCount"

A streaming system consists of:
* **Input sources** such as Kafka, Azure Event Hubs, files on a distributed file system, or TCP/IP sockets
* **Sinks** such as Kafka, Azure Event Hubs, various file formats, `foreach` sinks, console sinks, or memory sinks

### Streaming and Databricks Delta

In streaming, the problems of traditional data pipelines are exacerbated: metadata refreshes, table repairs, and the accumulation of small files all recur every few seconds or minutes.

Many small files result because data may be streamed in at low volumes with short trigger intervals.

Databricks Delta is designed to address these problems.
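To make the small-files problem concrete, here is a rough back-of-the-envelope sketch in plain Python. The trigger intervals and the number of files written per micro-batch are hypothetical values chosen only to illustrate the arithmetic; real numbers depend on partitioning and data volume.

```python
# Hypothetical numbers, chosen only to illustrate the arithmetic.
SECONDS_PER_DAY = 24 * 60 * 60

def files_per_day(trigger_interval_s: int, files_per_batch: int) -> int:
    """Files written per day if every micro-batch writes files_per_batch files."""
    batches_per_day = SECONDS_PER_DAY // trigger_interval_s
    return batches_per_day * files_per_batch

# Assume each micro-batch writes 8 files (for example, one per partition).
ten_second_trigger = files_per_day(trigger_interval_s=10, files_per_batch=8)
one_hour_trigger = files_per_day(trigger_interval_s=3600, files_per_batch=8)

print(ten_second_trigger)  # 69120
print(one_hour_trigger)    # 192
```

Under these assumptions, a 10-second trigger produces several hundred times more files per day than an hourly batch job writing the same data.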

In [6]:
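# Infer the schema from a static read; streaming file sources need an explicit schema by default.
static = spark.read.json(dataPath)
dataSchema = static.schema

# Stream the JSON source, one file per micro-batch.
deltaStreamWithTimestampDF = (spark
  .readStream
  .option("maxFilesPerTrigger", 1)
  .schema(dataSchema)
  .json(dataPath)  # the source is JSON; .json() sets the format, so no .format("delta") is needed
  .withColumnRenamed('Index', 'User_ID')
  # Dividing by 1e9 treats Arrival_Time as epoch nanoseconds.
  .selectExpr("*","cast(cast(Arrival_Time as double)/1000000000 as timestamp) as event_time")
)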
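The divisor in the cast above decides which epoch unit is assumed. The following plain-Python sketch shows the effect, using a hypothetical `Arrival_Time` value and assuming it is in epoch milliseconds (an assumption, not something the notebook states). If that assumption holds for this dataset, it would explain why the windowed output later in the notebook falls in 1970.

```python
from datetime import datetime, timezone

# Hypothetical Arrival_Time value, assumed here to be epoch milliseconds.
arrival_time_ms = 1424686735090

# Treating the value as nanoseconds (divide by 1e9) collapses it to ~24 minutes after the epoch.
as_if_nanoseconds = datetime.fromtimestamp(arrival_time_ms / 1_000_000_000, tz=timezone.utc)

# Treating the value as milliseconds (divide by 1e3) gives a realistic date.
as_milliseconds = datetime.fromtimestamp(arrival_time_ms / 1_000, tz=timezone.utc)

print(as_if_nanoseconds.year)  # 1970
print(as_milliseconds.year)    # 2015
```

Checking one raw value against both divisors is a quick way to confirm which unit a timestamp column uses before building windows on it.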

In [7]:
# Append the stream to a Delta table stored at basePath.
deltaStreamingQuery = (deltaStreamWithTimestampDF
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath)
  .outputMode("append")
  .start(basePath)
)

List the active streams.

In [9]:
for s in spark.streams.active:
  print(s.id)

In [10]:
# Read the Delta table back as a stream and maintain a running count per activity (gt).
activityCountsQuery = (spark.readStream
  .format("delta")
  .load(basePath)
  .groupBy("gt")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath + "/activityCount")
  .outputMode("complete")
  .start(activityPath)
)
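The query above uses `complete` output mode, in which the whole aggregated result is rewritten after every micro-batch. A minimal plain-Python sketch of that behavior follows; the function name `run_complete_mode` and the sample batches are hypothetical and only mimic the idea, not Spark's implementation.

```python
from collections import Counter

def run_complete_mode(micro_batches):
    """Yield the full result table after each micro-batch, as complete mode would rewrite it."""
    running = Counter()
    for batch in micro_batches:
        running.update(batch)   # fold the new rows into the running counts
        yield dict(running)     # emit every group, not just the changed ones

# Hypothetical micro-batches of activity labels.
batches = [["sit", "walk", "sit"], ["walk", "bike"]]
snapshots = list(run_complete_mode(batches))

print(snapshots[0])  # {'sit': 2, 'walk': 1}
print(snapshots[1])  # {'sit': 2, 'walk': 2, 'bike': 1}
```

Each snapshot contains every group seen so far, which is why a table written in complete mode holds one row per `gt` value rather than one row per micro-batch.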

In [11]:
from pyspark.sql.functions import hour, window, col

# Count activities per 60-minute event-time window, with a 180-minute watermark for late data.
countsDF = (deltaStreamWithTimestampDF
  .withWatermark("event_time", "180 minutes")
  .groupBy(window("event_time", "60 minutes"), "gt")
  .count()
  .withColumn('hour', hour(col('window.start')))
)

# countsDF already has the 'hour' column, so it is displayed as-is.
display(countsDF)

window,gt,count,hour
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",sit,984714,0
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",stairsup,836598,0
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",stairsdown,749059,0
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",,835725,0
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",stand,910783,0
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",bike,863710,0
"List(1970-01-01T00:00:00.000+0000, 1970-01-01T01:00:00.000+0000)",walk,1060402,0

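The windowed count groups events into 60-minute tumbling windows and uses a 180-minute watermark. The sketch below illustrates both ideas in plain Python. The helper names `tumbling_window` and `is_behind_watermark` are hypothetical, and the watermark check is a simplification of the concept rather than Spark's exact state-eviction rule.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=60)
WATERMARK_DELAY = timedelta(minutes=180)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(event_time):
    """Return (start, end) of the 60-minute window that contains event_time."""
    offset = (event_time - EPOCH) % WINDOW   # time elapsed since the window started
    start = event_time - offset
    return start, start + WINDOW

def is_behind_watermark(event_time, max_event_time_seen):
    """Simplified check: True if the event is older than max event time minus the delay."""
    watermark = max_event_time_seen - WATERMARK_DELAY
    return event_time < watermark

event = datetime(1970, 1, 1, 0, 23, 44, tzinfo=timezone.utc)
start, end = tumbling_window(event)
print(start.isoformat(), end.isoformat())
# 1970-01-01T00:00:00+00:00 1970-01-01T01:00:00+00:00

latest = datetime(1970, 1, 1, 14, 0, tzinfo=timezone.utc)
print(is_behind_watermark(datetime(1970, 1, 1, 10, 0, tzinfo=timezone.utc), latest))  # True
print(is_behind_watermark(datetime(1970, 1, 1, 12, 0, tzinfo=timezone.utc), latest))  # False
```

An event at 00:23:44 lands in the 00:00 to 01:00 window, matching the shape of the `window` column in the output above.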

In [12]:
# Stop all active streaming queries.
for streamingQuery in spark.streams.active:
  streamingQuery.stop()

In [13]:
%sql
select count(*) from demo_iot_data_delta


count(1)
110000


In [14]:
print(basePath)
print(activityPath)

In [15]:
%python

spark.sql("""
  DROP TABLE IF EXISTS stream_activity_delta
""")
spark.sql("""
  CREATE TABLE stream_activity_delta 
  USING DELTA 
  LOCATION '{}' 
""".format(activityPath))

In [16]:
%sql

select * from stream_activity_delta

gt,count
stairsdown,402587
stairsup,449685
stand,489536
bike,464277
,449217
walk,569997
sit,529241


In [17]:
%python

spark.sql("""
  DROP TABLE IF EXISTS stream_delta
""")
spark.sql("""
  CREATE TABLE stream_delta 
  USING DELTA 
  LOCATION '{}' 
""".format(basePath))

In [18]:
%sql

select count(1) from stream_delta

count(1)
3276526


In [19]:
%sql

select count(1) from stream_delta

count(1)
1794269
