
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Databricks Partner Capstone Project

This optional capstone is included in the course to help you solidify key topics related to Databricks, Structured Streaming, and Delta.

# Capstone Overview

In this project you will build a Delta Lake over incoming streaming data by using a series of bronze, silver, and gold tables.

The goal of the project is to gain actionable insights from a data lake, using a series of connected tables that:
* Preserve the raw data
* Enrich the data by joining it with an additional static table
* Use Structured Streaming along with Delta tables to guarantee a robust solution

### Scenario:

A video gaming company stores historical data in a data lake, which is growing exponentially. 

The data isn't sorted in any particular way (actually, it's quite a mess).

It is proving to be _very_ difficult to query and manage this data because there is so much of it.


## Instructions
1. Read streaming data into a Databricks Delta bronze table
2. Create a Databricks Delta silver table
3. Compute aggregate statistics about the data, i.e. create a gold table

## What is a table? 
Before we continue, we need to address a semantic point covered in the [Databricks docs](https://docs.databricks.com/user-guide/tables.html#view-databases-and-tables):

> A Databricks table is a collection of structured data. Tables are equivalent to Apache Spark DataFrames.

Generally, the distinction between tables and DataFrames in Spark can be summarized by discussing scope and persistence:
- Tables are defined at the **workspace** level and **persist** between notebooks.
- DataFrames are defined at the **notebook** level and are **ephemeral**.

When we discuss **Delta tables**, we are always talking about collections of structured data that persist between notebooks. Importantly, we do not need to register a directory of files to Spark SQL in order to refer to them as a table. The directory of files itself _is_ the table; registering it with a useful name to Spark SQL just gives us easy access for querying the underlying data.

A **Delta Lake** can be thought of as a collection of one or many Delta tables. Generally, an entire elastic storage container will be dedicated to a single Delta Lake, and data will be enriched and cleaned as it is promoted through pre-defined logic.

<img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> To make Delta tables easily accessible, register them using Spark SQL. Use table ACLs to control access in workspaces shared by many diverse parties within an organization.

## Getting Started

Run the following cell to configure our environment.

In [0]:
%run "./Includes/Classroom-Setup"

### Set up paths

The cell below sets up relevant paths in DBFS.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> It also clears out the `basePath` directory (to ensure consistent results if the notebook is re-run). This operation can take several minutes.

In [0]:
inputPath = "/mnt/training/gaming_data/mobile_streaming_events"

basePath = userhome + "/capstone"
outputPathBronze = basePath + "/gaming/bronze"
outputPathSilver = basePath + "/gaming/silver"
outputPathGold   = basePath + "/gaming/gold"

dbutils.fs.rm(basePath, True)

### SQL Table Setup

The following cell drops a table that we'll be creating later in the notebook.

(Dropping the table avoids problems if the notebook is run more than once.)

In [0]:
%sql
DROP TABLE IF EXISTS mobile_events_delta_gold;

### Step 1: Prepare Schema and Read Streaming Data from input source

The input source is a folder containing 20 files of around 50 MB each. 

The stream is configured to read one file per trigger. 

Run this code to start the streaming read from the file directory.

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType, DoubleType

eventSchema = ( StructType()
  .add('eventName', StringType()) 
  .add('eventParams', StructType() 
    .add('game_keyword', StringType()) 
    .add('app_name', StringType()) 
    .add('scoreAdjustment', IntegerType()) 
    .add('platform', StringType()) 
    .add('app_version', StringType()) 
    .add('device_id', StringType()) 
    .add('client_event_time', TimestampType()) 
    .add('amount', DoubleType()) 
  )     
)

gamingEventDF = (spark
  .readStream
  .schema(eventSchema) 
  .option('streamName','mobilestreaming_demo') 
  .option("maxFilesPerTrigger", 1)                # process one file per trigger (micro-batch)
  .json(inputPath) 
)
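
To make the effect of `maxFilesPerTrigger` concrete, here is a minimal plain-Python sketch (no Spark involved) of how a file listing is split into micro-batches. The function name `plan_micro_batches` and the file names are hypothetical and chosen only for illustration; the real file source also uses the checkpoint to track which files it has already processed.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split a file listing into consecutive batches of at most N files each."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical listing standing in for the 20 input files.
files = ["part-{:02d}.json".format(i) for i in range(20)]

# One file per trigger, as configured in the stream above.
batches = plan_micro_batches(files, 1)
print(len(batches))
```

With one file per trigger, each of the 20 files becomes its own micro-batch. A larger value would reduce the number of triggers at the cost of bigger batches.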

### Step 2: Write Stream to Bronze Table

Write some code that performs the following tasks:

* Write the stream from `gamingEventDF` (the stream defined above) to a bronze Delta table at the path defined by `outputPathBronze`.
* Convert the nested input column `client_event_time` to a date and store it in a new column named `eventDate`.
* Filter out records with a null value in the `eventDate` column.
* Make sure you provide a checkpoint directory that is unique to this stream.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Using `append` mode when streaming allows us to insert data indefinitely without rewriting already processed data.
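
The date conversion and null filter described above can be modelled in plain Python before writing the streaming version. This is only a sketch of the row-level logic, not the Spark solution: the helper `to_event_date`, the timestamp format, and the sample events (including the event names `purchase` and `login`) are assumptions made for illustration.

```python
from datetime import date, datetime
from typing import Optional


def to_event_date(ts: Optional[str]) -> Optional[date]:
    """Return the calendar date of a 'YYYY-MM-DD HH:MM:SS' string, or None if it is missing or unparseable."""
    if ts is None:
        return None
    try:
        return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()
    except ValueError:
        return None


# Hypothetical events with the same nesting as eventSchema.
events = [
    {"eventName": "scoreAdjustment", "eventParams": {"client_event_time": "2019-08-04 00:06:32"}},
    {"eventName": "purchase", "eventParams": {"client_event_time": None}},
    {"eventName": "login", "eventParams": {"client_event_time": "not a timestamp"}},
]

bronze_rows = []
for event in events:
    event_date = to_event_date(event["eventParams"]["client_event_time"])
    if event_date is not None:  # drop records with a null eventDate
        bronze_rows.append({**event, "eventDate": event_date})

print(bronze_rows)
```

Only the first event survives the filter. In the streaming version, the same two steps map to a derived `eventDate` column followed by a not-null filter.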

In [0]:
###IGNORE CELL-for testing purposes

gamingEventDF.printSchema()


#from pyspark.sql.functions import to_date
#df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
#display(df)
#display(df.select(to_date(df.t).alias('date')).collect())

In [0]:
###IGNORE CELL-for testing purposes

from pyspark.sql.functions import col, to_date
#print(gamingEventDF.withColumnRenamed("eventParams.client_event_time","eventDate").dtypes) #######with columns renamed doesn't work for nested columns
#testdf = gamingEventDF.select("eventName",gamingEventDF.eventParams.client_event_time.alias("abc"))  #######flattening it out

###to date
#testdf = gamingEventDF.select("eventName",gamingEventDF.eventParams.client_event_time)
testdf = gamingEventDF.select("eventName",to_date(gamingEventDF.eventParams.client_event_time).alias("eventDate"))  #######flattening it out
#testdf = gamingEventDF.select("eventName","eventParams")
testdf.printSchema()
#gamingEventDF.withColumnRenamed("eventName","even").printSchema()

In [0]:
# TODO

from pyspark.sql.functions import col, to_date

#eventStream = gamingEventDF.withColumnRenamed("client_event_time","eventDate").filter(col("eventDate").isNotNull()).to_date()

eventsStream = (gamingEventDF
  .withColumn("eventDate",to_date(gamingEventDF.eventParams.client_event_time))
  .filter(col("eventDate").isNotNull())
  .writeStream
  .format("delta")
  .option("checkpointLocation", outputPathBronze + "/_checkpoint") 
  .queryName("bronze_stream")
  .outputMode("append")
  .start(outputPathBronze)
)

In [0]:
eventsStream.recentProgress

In [0]:
###IGNORE CELL-for testing purposes
display(gamingEventDF, "bronze_stream")

eventName,eventParams
,
,
,
,
,
,
,
,
,
,


In [0]:
###IGNORE CELL-for testing purposes
eventsStream.awaitTermination(5)
eventsStream.stop()

### Step 3a: Load static data for enrichment

Register a static lookup table to associate `device_id` with `deviceType` = `{android, ios}`.

While we refer to this as a lookup table, here we'll define it as a DataFrame. This will make it easier for us to define a join on our streaming data in the next step.

Create `deviceLookupDF` from data in `/mnt/training/gaming_data/dimensionData`.

In [0]:
# TODO
lookupPath = "/mnt/training/gaming_data/dimensionData"

deviceLookupDF = spark.read.format("delta").load(lookupPath)

### Step 3b: Create a streaming silver Delta table

A silver table is a table that combines, improves, or enriches bronze data. 

In this case we will join the bronze streaming data with some static data to add useful information. 

#### Steps to complete

Create a new stream by joining `deviceLookupDF` with the bronze table stored at `outputPathBronze` on `device_id`.
* Make sure you do a streaming read and write
* Your selected fields should be:
  - `eventName`
  - `device_id`
  - `client_event_time`
  - `eventDate`
  - `deviceType`
* **NOTE**: some of these fields are nested; alias them to end up with a flat schema
* Write to `outputPathSilver`

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Don't forget to checkpoint your stream!
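
The flattening and enrichment steps listed above can be sketched in plain Python as a row-level model. This is an illustration under stated assumptions, not the streaming solution: the helper names `flatten_event` and `left_join_device_type`, the device IDs, and the lookup contents are hypothetical.

```python
from typing import Dict, List, Optional


def flatten_event(row: dict) -> dict:
    """Pull the nested eventParams fields up to the top level."""
    params = row["eventParams"]
    return {
        "eventName": row["eventName"],
        "device_id": params["device_id"],
        "client_event_time": params["client_event_time"],
        "eventDate": row["eventDate"],
    }


def left_join_device_type(rows: List[dict], lookup: Dict[str, str]) -> List[dict]:
    """Add deviceType to every row; rows with an unknown device_id keep None."""
    enriched = []
    for row in rows:
        device_type: Optional[str] = lookup.get(row["device_id"])
        enriched.append({**row, "deviceType": device_type})
    return enriched


# Hypothetical bronze rows and lookup data.
bronze = [
    {"eventName": "scoreAdjustment", "eventDate": "2019-08-04",
     "eventParams": {"device_id": "dev-1", "client_event_time": "2019-08-04 00:06:32"}},
    {"eventName": "scoreAdjustment", "eventDate": "2019-01-21",
     "eventParams": {"device_id": "dev-9", "client_event_time": "2019-01-21 17:28:11"}},
]
device_lookup = {"dev-1": "ios", "dev-2": "android"}

silver_rows = left_join_device_type([flatten_event(r) for r in bronze], device_lookup)
print(silver_rows)
```

A left join keeps events whose device is missing from the lookup data, at the cost of a null `deviceType`. An inner join would silently drop them instead.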

In [0]:
# flattening tested in cmd 15 test block
#testdf = gamingEventDF.select("eventName",gamingEventDF.eventParams.client_event_time.alias("abc"))  #######flattening it out
##Join example from pyspark documentation
#>>> df.join(df2, 'name', 'inner').drop('age', 'height').collect()
#[Row(name='Bob')]

from pyspark.sql.functions import col

(spark.readStream
  .format("delta")
  .load(outputPathBronze)
  # Flatten the nested fields so the silver table has a flat schema
  .select("eventName",
          col("eventParams.device_id").alias("device_id"),
          col("eventParams.client_event_time").alias("client_event_time"),
          "eventDate")
  # A left join keeps events whose device is missing from the lookup table
  .join(deviceLookupDF, "device_id", "left")
  .writeStream
  .format("delta")
  .option("checkpointLocation", outputPathSilver + "/_checkpoint")
  .queryName("silver_stream")
  .outputMode("append")
  .start(outputPathSilver))


In [0]:
display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(outputPathSilver)))

device_id,eventName,client_event_time,eventDate,deviceType
004a5732188b4b11911c60b5b8eff7f3,scoreAdjustment,2019-08-04T00:06:32.000+0000,2019-08-04,ios
00e2d25735e4497399734e4ec44198d9,scoreAdjustment,2019-01-21T17:28:11.000+0000,2019-01-21,ios
012a3c37d3bf452dbb07050f5a886946,scoreAdjustment,2018-08-18T22:58:39.000+0000,2018-08-18,ios
0176636fc7c94663bfd0afe0ebb9486f,scoreAdjustment,2019-04-03T20:23:23.000+0000,2019-04-03,android
0191025a8616492f9a3ba292baddc953,scoreAdjustment,2019-07-05T09:55:03.000+0000,2019-07-05,android


In [0]:
###IGNORE CELL-for testing purposes
## group by and distinct alternate ways
from pyspark.sql.functions import countDistinct

x = [("week1","id1"),("week2","id1"),("week2","id1"),("week1","id1"),("week1","id2"),("week1","id2"),("week2","id2"),("week3","id3"),("week2","id4")]
y = spark.createDataFrame(x,["year","id"])

display(y.groupby("year").agg(countDistinct("id")).withColumnRenamed("count(DISTINCT id)", "WAU"))
#display(y.distinct().groupby("year").count())

year,WAU
week1,2
week2,3
week3,1


### Step 4a: Batch Process a Gold Table from the Silver Table

The company executives want to look at the number of active users by week. They use SQL so our target will be a SQL table backed by a Delta Lake. 

The table should have the following columns:
- `WAU`: count of weekly active users (distinct device IDs grouped by week)
- `week`: week of year (the appropriate SQL function has been imported for you)

In the first step, calculate these values.
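
As a reference for what the `WAU` column should contain, here is a small plain-Python sketch that counts distinct device IDs per week. It is a model of the aggregate, not the Spark implementation: the function name `weekly_active_users` and the sample rows are hypothetical, and `isocalendar()` is used because Spark's `weekofyear` also follows ISO 8601 week numbering.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List


def weekly_active_users(rows: List[dict]) -> Dict[int, int]:
    """Count distinct device IDs per ISO week of year."""
    devices_by_week = defaultdict(set)
    for row in rows:
        week = row["client_event_time"].isocalendar()[1]
        devices_by_week[week].add(row["device_id"])
    return {week: len(devices) for week, devices in sorted(devices_by_week.items())}


# Hypothetical silver rows.
rows = [
    {"device_id": "a", "client_event_time": datetime(2019, 8, 4, 0, 6, 32)},  # Sunday, ISO week 31
    {"device_id": "a", "client_event_time": datetime(2019, 8, 4, 9, 0, 0)},   # same device, same week
    {"device_id": "b", "client_event_time": datetime(2019, 8, 2, 12, 0, 0)},  # Friday, ISO week 31
    {"device_id": "a", "client_event_time": datetime(2019, 8, 5, 8, 0, 0)},   # Monday, ISO week 32
]

wau = weekly_active_users(rows)
print(wau)  # {31: 2, 32: 1}
```

Device `a` is counted once in week 31 even though it produced two events, which is the difference between a distinct count and a plain row count.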

In [0]:
### This cell does not work; the correct command is in the NEXT CELL.
### countDistinct fails here because distinct aggregations aren't supported on streaming DataFrames.
### We could use approx_count_distinct() instead and set the error margin with its rsd parameter.
## When aggregating streaming data without a watermark we cannot use append mode,
## because the aggregates change as new data arrives. We have to use complete mode.
from pyspark.sql.functions import col, countDistinct, weekofyear

(spark.readStream
  .format("delta")
  .load(outputPathSilver)
  .select(col("device_id"),
          weekofyear(col("client_event_time")).alias("week"))
  .groupBy("week")
  .agg(countDistinct("device_id").alias("WAU"))
  .writeStream
  .format("delta")
  .option("checkpointLocation", outputPathGold + "/_checkpoint")
  .queryName("gold_stream")
  .outputMode("complete")
  .start(outputPathGold)
)

## see next cell

In [0]:
# TODO
## When aggregating streaming data without a watermark we cannot use append mode,
## because the aggregates change as new data arrives. We have to use complete mode.
from pyspark.sql.functions import col, weekofyear

(spark.readStream
  .format("delta")
  .load(outputPathSilver)
  .select(col("device_id"),
          weekofyear(col("client_event_time")).alias("week"))
  .distinct()                          # one row per (device_id, week) pair
  .groupBy("week")
  .count()                             # number of distinct devices per week
  .withColumnRenamed("count", "WAU")
  .writeStream
  .format("delta")
  .option("checkpointLocation", outputPathGold + "/_checkpoint")
  .queryName("gold_stream")
  .outputMode("complete")
  .start(outputPathGold)
)
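
To see why this aggregate is written in complete mode, the following plain-Python sketch models a running distinct count across micro-batches. It is a simplified model under stated assumptions, not how Spark manages state internally; the function name `run_complete_mode` and the sample batches are hypothetical.

```python
from typing import Dict, List, Tuple


def run_complete_mode(batches: List[List[Tuple[int, str]]]) -> List[Dict[int, int]]:
    """Return the full result table (week -> distinct devices) after each micro-batch."""
    seen = set()      # state kept across batches: distinct (week, device_id) pairs
    snapshots = []
    for batch in batches:
        seen.update(batch)
        counts: Dict[int, int] = {}
        for week, _device_id in seen:
            counts[week] = counts.get(week, 0) + 1
        snapshots.append(dict(sorted(counts.items())))
    return snapshots


# Hypothetical (week, device_id) pairs arriving in two micro-batches.
batches = [
    [(31, "a"), (31, "b")],
    [(31, "a"), (31, "c"), (32, "a")],
]

snapshots = run_complete_mode(batches)
print(snapshots)  # [{31: 2}, {31: 3, 32: 1}]
```

The count for week 31 changes from 2 to 3 when the second batch arrives, so a row written earlier is no longer correct. Complete mode handles this by rewriting the whole result table on every trigger.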

In [0]:
display(dbutils.fs.ls(outputPathGold))

path,name,size
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/_checkpoint/,_checkpoint/,0
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/_delta_log/,_delta_log/,0
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-0942a85d-f1d5-4361-8bad-86035364ba3a-c000.snappy.parquet,part-00000-0942a85d-f1d5-4361-8bad-86035364ba3a-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-157c7c2d-89e5-4f10-acb3-67200faaa64d-c000.snappy.parquet,part-00000-157c7c2d-89e5-4f10-acb3-67200faaa64d-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-2c4ffdf0-8168-4959-aa9e-87c10dbba502-c000.snappy.parquet,part-00000-2c4ffdf0-8168-4959-aa9e-87c10dbba502-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-2d582d9f-d41c-4729-b793-7e448a380c13-c000.snappy.parquet,part-00000-2d582d9f-d41c-4729-b793-7e448a380c13-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-42300cb5-8b22-4fea-9bc0-a955e5572067-c000.snappy.parquet,part-00000-42300cb5-8b22-4fea-9bc0-a955e5572067-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-516f87c1-36bc-46e1-9aa8-6f2b977697ea-c000.snappy.parquet,part-00000-516f87c1-36bc-46e1-9aa8-6f2b977697ea-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-559d8943-7b6c-4764-9a68-7f3b19123e1c-c000.snappy.parquet,part-00000-559d8943-7b6c-4764-9a68-7f3b19123e1c-c000.snappy.parquet,357
dbfs:/user/akarsh.pydimarri@wavicledata.com/capstone/gaming/gold/part-00000-602145d0-7cb2-42df-a37e-c8fe67a1bf52-c000.snappy.parquet,part-00000-602145d0-7cb2-42df-a37e-c8fe67a1bf52-c000.snappy.parquet,357


In [0]:
display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(outputPathGold)))

week,WAU
40,54671
20,79488
31,110150
25,109648
49,54626


### Step 4b: Register Gold SQL Table

By linking the Spark SQL table with the Delta Lake file path, we will always get results from the most current valid version of the streaming table.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> It may take some time for the previous streaming operations to start. 

Once they have started, register a SQL table against the gold Delta Lake path.

* Table name: `mobile_events_delta_gold`
* Table location: `outputPathGold`

In [0]:
# TODO
spark.sql("""
   CREATE TABLE IF NOT EXISTS mobile_events_delta_gold
   USING DELTA
   LOCATION '{}'
  """.format(outputPathGold))

### Step 4c: Visualization

The company executives are visual people: they like pretty charts.

Create a bar chart out of `mobile_events_delta_gold` where the horizontal axis is week and the vertical axis is WAU.

Under <b>Plot Options</b>, use the following:
* <b>Keys:</b> `week`
* <b>Values:</b> `WAU`

In <b>Display type</b>, use <b>Bar Chart</b> and click <b>Apply</b>.

<img src="https://s3-us-west-2.amazonaws.com/files.training.databricks.com/images/eLearning/Delta/plot-options-bar.png"/>

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Order by `week` to look for time-based patterns.

In [0]:
%sql
SELECT * FROM mobile_events_delta_gold
ORDER BY week;

week,WAU
1,55064
2,54883
3,55075
4,54996
5,55290
6,54885
7,54670
8,55190
9,55120
10,54944


### Step 5: Wrap-up

* Stop streams

In [0]:
for s in spark.streams.active:
  s.stop()

Congratulations: ALL DONE!!

&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>