# Databricks Overview 

Databricks provides a Unified Analytics Platform to help businesses transform their data strategy. We do this by providing a collaborative, scalable, and easy-to-use environment built around Databricks Runtime and Apache Spark.

Databricks provides several key differentiators that will be shown in this demo, including data engineering capabilities, Databricks Delta, and Databricks Runtime.

In this demo, we'll cover the basics of Databricks using the following architecture:
<div style="text-align: center; line-height: 0; padding-top: 0px;"><br><img src="https://i.imgur.com/5n2ZX8o.png" height="00" width="700"></div>

We'll break this down into five sections:
- Platform Overview
- Loading and viewing data
- Manipulating and enriching data
- Databricks Delta: updating and inserting data
- Streaming data

In [2]:
%run ./Delta-Setup

In [3]:
%fs ls /databricks/ganeshrj/stocks

path,name,size
dbfs:/databricks/ganeshrj/stocks/AMZN.csv,AMZN.csv,98657
dbfs:/databricks/ganeshrj/stocks/APPL.csv,APPL.csv,102568
dbfs:/databricks/ganeshrj/stocks/GOOG.csv,GOOG.csv,103177
dbfs:/databricks/ganeshrj/stocks/NFLX.csv,NFLX.csv,1524


In [4]:
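# Load ticker -> company name metadata over JDBC.
# jdbcUrl and connectionProps are assumed to be defined by ./Delta-Setup.
stocksDF = spark.read.jdbc(url=jdbcUrl, table="metadata", properties=connectionProps)
display(stocksDF)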

sticker,name
APPL,Apple Computers Limited
GOOG,Google Inc
NFLX,Netflix Inc
AMZN,Amazon Inc


Run the following code cell to set up paths and remove any leftover data.

In [6]:
genericDataPath = userhome + "/generic/stock-data/"
deltaDataPath = userhome + "/delta/stock-data/"
backfillDataPath = userhome + "/delta/backfill-data/"
deltaMiniDataPath = userhome + "/delta/mini-delta"

dbutils.fs.rm(genericDataPath, True)
dbutils.fs.rm(deltaDataPath, True)
dbutils.fs.rm(backfillDataPath, True)
dbutils.fs.rm(deltaMiniDataPath, True)

### READ CSV data for each stock, then WRITE to Parquet / Delta

Read the data into a DataFrame. Since this is a CSV file, take the column names from the first row and let Spark infer the column types by setting
* `header` to `true`
* `inferSchema` to `true` (this costs one extra pass over the data)

Drop the `Adj Close` column, as it's not needed.
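To make the two options concrete, here is a simplified, stdlib-only sketch of what `header` and `inferSchema` do together: take column names from the first row, then scan every data row and widen each column's type as needed (integer, then double, then string). This is an illustration under simplifying assumptions, not Spark's inference code. Spark also recognizes dates and timestamps (the notebook's `Date` column comes back as a timestamp), while this sketch leaves it as a string. `infer_csv_schema` is a hypothetical helper.

```python
import csv
import io

# Widening order used by this simplified sketch (an assumption, not Spark's full type lattice).
_ORDER = ["int", "double", "string"]

def _value_type(text):
    """Return the narrowest type name that can represent one CSV field."""
    try:
        int(text)
        return "int"
    except ValueError:
        pass
    try:
        float(text)
        return "double"
    except ValueError:
        return "string"

def infer_csv_schema(csv_text):
    """Hypothetical helper: names from the header row, types from scanning all data rows."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]
    types = ["int"] * len(header)  # start narrow and widen as rows are scanned
    for row in data:
        for i, field in enumerate(row):
            t = _value_type(field)
            if _ORDER.index(t) > _ORDER.index(types[i]):
                types[i] = t
    return list(zip(header, types))

sample = "Date,Open,Volume\n2018-01-02,170.16,25555900\n2018-01-03,172.53,29517900\n"
print(infer_csv_schema(sample))
# [('Date', 'string'), ('Open', 'double'), ('Volume', 'int')]
```

Scanning every row is what makes `inferSchema` cost an extra pass: a single `2.5` late in the file is enough to widen an integer column to double.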

## Part 2: Loading and viewing data

Now that we've seen the environment, we can load some data. In this case, we combine company metadata (ticker and name), loaded over JDBC from a MySQL database, with daily stock prices read from CSV files on DBFS.
<div style="text-align: center; line-height: 0; padding-top: 9px;"><img src="https://i.imgur.com/r4hY0rS.png" height="00" width="700"></div>

On Databricks, it's simple to unify many different sources across traditional silos into a single environment; it's just a few lines of code.

In [9]:
from pyspark.sql.functions import broadcast, lit, month, year

for ticker in ['APPL', 'GOOG', 'AMZN', 'NFLX']:
  path = "dbfs:/databricks/ganeshrj/stocks/" + ticker + ".csv"
  print(path)

  # Read one stock's daily prices; column names come from the header row
  rawDataDF = (spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(path)
    .drop("Adj Close"))

  # Derive month/Year from the date and tag each row with its ticker
  rawDataDF = (rawDataDF
    .withColumn("month", month(rawDataDF.date))
    .withColumn("Year", year(rawDataDF.date))
    .withColumn("Stickers", lit(ticker)))

  # Join with stocksDF (small, so broadcast it) to get the company name
  joinedDF = (rawDataDF
    .join(broadcast(stocksDF), rawDataDF.Stickers == stocksDF.sticker)
    .drop("Stickers"))

  # Write the same data as Parquet (non-Delta pipeline) ...
  joinedDF.write.format("parquet").partitionBy("Year").mode("append").save(genericDataPath)

  # ... and as Delta
  joinedDF.write.format("delta").partitionBy("Year").mode("append").save(deltaDataPath)

  display(joinedDF)

In [10]:
rawDataDF.printSchema()

### CREATE Using Non-Delta Pipeline

Create a table called `generic_stocks` using `parquet` out of the above data.

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Notice how you MUST specify a schema and partitioning info!

In [12]:
spark.sql("""
    use ngcdemo
  """)
spark.sql("""
    DROP TABLE IF EXISTS generic_stocks
  """)
spark.sql("""
    CREATE TABLE generic_stocks (
      Date TIMESTAMP,
      Open DOUBLE,
      High DOUBLE,
      Low DOUBLE,
      Close DOUBLE,
      Volume INTEGER,
      Month INTEGER,
      Year INTEGER, 
      Sticker STRING,
      name STRING)
    USING parquet 
    OPTIONS (path = '{}' )
    PARTITIONED BY (Year)
  """.format(genericDataPath))

Perform a simple `count` query to verify the number of records.

In [14]:
%sql
SELECT count(*) FROM generic_stocks

count(1)
0



<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Wait, no results? 

The problem stems from the table's Apache Hive origins and the concept of <b>schema on read</b>: a schema is applied to data as it is pulled out of storage, rather than as it goes into storage.

This means that when you write files directly into a data lake, the table's metadata knows nothing about them until you tell it. For a partitioned table like this one, the metastore also tracks which partitions exist, and creating the table over files that are already in place does not register their partition directories (`Year=2013/`, `Year=2014/`, ...). Until they are registered, queries see an empty table.

To fix this, repair the table using `MSCK REPAIR TABLE`, which scans the table location and adds any missing partitions to the metastore.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Only after the table is repaired is the count of stock data correct.
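A rough way to picture the repair step: list the files under the table's location, parse the Hive-style `column=value` directory names into partition specs, and register any spec the metastore doesn't know yet. In the sketch below the metastore is modeled as a plain Python set, and `find_unregistered_partitions` and the sample paths are hypothetical stand-ins for the real catalog and storage listing.

```python
import posixpath

def partition_spec(file_path, table_root):
    """Parse Hive-style 'col=value' directories between the table root and the file."""
    rel = posixpath.relpath(posixpath.dirname(file_path), table_root)
    spec = []
    for part in rel.split("/"):
        if "=" in part:
            col, value = part.split("=", 1)
            spec.append((col, value))
    return tuple(spec)

def find_unregistered_partitions(file_paths, table_root, registered):
    """Return partitions present on storage but missing from the (modeled) metastore."""
    on_storage = {partition_spec(p, table_root) for p in file_paths}
    on_storage.discard(())  # files directly under the root belong to no partition
    return sorted(on_storage - registered)

root = "/generic/stock-data"  # hypothetical table location
files = [
    root + "/Year=2013/part-00000.snappy.parquet",
    root + "/Year=2014/part-00000.snappy.parquet",
    root + "/Year=2014/part-00001.snappy.parquet",
]
registered = set()  # freshly created table: no partitions registered yet
missing = find_unregistered_partitions(files, root, registered)
print(missing)  # [(('Year', '2013'),), (('Year', '2014'),)]
registered.update(missing)  # the "repair" step
print(find_unregistered_partitions(files, root, registered))  # []
```

Until the second step runs, a query planner that only consults `registered` finds no partitions to read, which matches the empty count above.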

In [16]:
%sql
MSCK REPAIR TABLE generic_stocks;

SELECT Sticker, High, Low FROM generic_stocks WHERE date(Date) = '2019-03-25'


Sticker,High,Low
GOOG,1206.4,1187.31
AMZN,1782.68,1747.5
NFLX,367.04,357.44
APPL,191.976,186.6


In [17]:
%sql 

select * from generic_stocks;

Date,Open,High,Low,Close,Volume,Month,Sticker,name,Year
2013-12-31T00:00:00.000+0000,79.1671,80.1829,79.1429,80.1457,55771100,12,APPL,Apple Computers Limited,2013
2013-12-30T00:00:00.000+0000,79.6371,80.0129,78.9029,79.2171,63407400,12,APPL,Apple Computers Limited,2013
2013-12-27T00:00:00.000+0000,80.5457,80.63,79.9286,80.0129,56471100,12,APPL,Apple Computers Limited,2013
2013-12-26T00:00:00.000+0000,81.1571,81.3571,80.4829,80.5571,51002000,12,APPL,Apple Computers Limited,2013
2013-12-24T00:00:00.000+0000,81.4129,81.6971,80.8614,81.0957,41888700,12,APPL,Apple Computers Limited,2013
2013-12-23T00:00:00.000+0000,81.1429,81.5314,80.3943,81.4414,125326600,12,APPL,Apple Computers Limited,2013
2013-12-20T00:00:00.000+0000,77.9186,78.8014,77.8314,78.4314,109103400,12,APPL,Apple Computers Limited,2013
2013-12-19T00:00:00.000+0000,78.5,78.5714,77.6757,77.78,80077200,12,APPL,Apple Computers Limited,2013
2013-12-18T00:00:00.000+0000,78.5286,78.7786,76.9714,78.6814,141465800,12,APPL,Apple Computers Limited,2013
2013-12-17T00:00:00.000+0000,79.4014,79.92,79.0543,79.2843,57475600,12,APPL,Apple Computers Limited,2013


In [18]:
%sql 

select * from stocks_delta;

date,open,high,low,close,volume,month,Year,sticker,name
2014-12-31T00:00:00.000+0000,528.345,529.688,522.925,523.521,1372000,12,2014,GOOG,Google Inc
2014-12-30T00:00:00.000+0000,525.202,528.245,524.247,527.519,878600,12,2014,GOOG,Google Inc
2014-12-29T00:00:00.000+0000,529.28,532.552,527.113,527.43,2284800,12,2014,GOOG,Google Inc
2014-12-26T00:00:00.000+0000,525.878,531.328,524.426,531.11,1043400,12,2014,GOOG,Google Inc
2014-12-24T00:00:00.000+0000,527.609,528.851,524.138,525.878,707800,12,2014,GOOG,Google Inc
2014-12-23T00:00:00.000+0000,524.118,531.637,523.412,527.688,2203600,12,2014,GOOG,Google Inc
2014-12-22T00:00:00.000+0000,513.258,523.581,513.258,522.0,2731200,12,2014,GOOG,Google Inc
2014-12-19T00:00:00.000+0000,508.713,514.889,504.139,513.526,3700300,12,2014,GOOG,Google Inc
2014-12-18T00:00:00.000+0000,510.145,511.06,501.94,508.305,2934700,12,2014,GOOG,Google Inc
2014-12-17T00:00:00.000+0000,494.282,504.227,494.093,502.129,2891000,12,2014,GOOG,Google Inc


## Part 3: Manipulating and Enriching Data

We've loaded some data into Databricks, but now what? Databricks makes it easy to clean, manipulate, enrich, and then store data for downstream use. 

We'll blend our two data sources together, then view the results. We'll also use __Delta__ to persist the data into a permanent, performant, and easily accessible format.

<div style="text-align: center; line-height: 0; padding-top: 9px;"><img src="https://i.imgur.com/N2ASM7K.png" height="00" width="700"></div>

In [20]:
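# Read APPL news sentiment scores from HDFS
sentimentsDF = (spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("hdfs://hdp-c1.ganeshrj.com:8020/user/root/APPL_Sentiments.csv"))

# Column names containing spaces are awkward in SQL, so rename "published at"
refinedDF = sentimentsDF.withColumnRenamed("published at", "publishTime")

# Save as a Delta table (overwrite so the cell can be rerun)
refinedDF.write.format("delta").mode("overwrite").saveAsTable("ngcdemo.appl_sentiments")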


In [21]:
%sql describe ngcdemo.appl_sentiments 

col_name,data_type,comment
title,string,
ticker,string,
author,string,
description,string,
source,string,
url,string,
publishTime,string,
sentiment,double,


In [22]:
%sql 
select count(title) as cnt, date(publishTime) as pdate, avg(sentiment) as senti from ngcdemo.appl_sentiments group by date(publishTime) order by count(title) desc;

cnt,pdate,senti
102,2019-03-25,0.3596078431372548
66,2019-04-23,0.018030303030303
66,2019-04-09,0.2872727272727273
60,2019-03-26,0.1465
57,2018-11-28,0.2545614035087719
57,2019-04-25,0.2180701754385964
56,2019-04-26,0.3037499999999999
55,2019-04-27,0.3045454545454546
54,2019-04-10,0.1687037037037037
53,2019-04-24,0.2815094339622641


In [23]:
%sql 

select ticker ,title, date(publishTime), sentiment from ngcdemo.appl_sentiments where date(publishTime) = '2019-03-25'

ticker,title,publishTime,sentiment
AAPL,Jennifer Aniston Reese Witherspoon and Steve Carell preview The Morning Show at Apple launch event,2019-03-25,0.34
AAPL,Why Roku Was Also a Big Winner After Apple's Special TV Launch Event,2019-03-25,0.76
AAPL,Apple Enters the Credit Card Market With?Yep?Apple Card,2019-03-25,0.38
AAPL,How to Sign Up For an Apple News+ Subscription,2019-03-25,0.57
AAPL,LAUGHABLE: Apple's Big Idea Is? a Credit Card?,2019-03-25,0.8
AAPL,Apple Releases macOS Mojave 10.14.4 With Safari Automatic Dark Mode and Apple News+ Support,2019-03-25,0.33
AAPL,Why Today's Apple Event Was So Unusual,2019-03-25,0.64
AAPL,iOS 12.2 Suggests ECG App May Be Coming to UK and Other European Countries With watchOS 5.2,2019-03-25,0.44
AAPL,Here are the top 6 announcements from Apple's 'Show Time' event,2019-03-25,0.56
AAPL,First pictures of Apple's truly wireless Powerbeats Pro leak,2019-03-25,0.18


In [24]:
%sql 

SELECT Sticker, Open, High, Low, Close, Volume, temp.senti AS avg_sentiment
FROM generic_stocks,
  -- the single day with the most APPL news articles, and its average sentiment
  (SELECT count(title) AS cnt, date(publishTime) AS pdate, avg(sentiment) AS senti
   FROM ngcdemo.appl_sentiments
   GROUP BY date(publishTime)
   ORDER BY count(title) DESC
   LIMIT 1) temp
WHERE date(Date) = temp.pdate AND Sticker = 'APPL';


Sticker,Open,High,Low,Close,Volume,avg_sentiment
APPL,191.51,191.976,186.6,188.74,43326124,0.3596078431372548


## DELTA

Create a table called `stocks_delta` using `DELTA` out of the above data.

In [26]:

spark.sql("""
  USE ngcdemo
""")

spark.sql("""
  DROP TABLE IF EXISTS stocks_delta
""")
spark.sql("""
  CREATE TABLE stocks_delta
  USING DELTA 
  LOCATION '{}' 
""".format(deltaDataPath))

#### Metadata

Since data backing `stocks_delta` is already in place,
the table in the Hive metastore automatically inherits the schema, partitioning,
and table properties of the existing data.

Note that the Hive metastore stores only the table name, path, and database information;
the actual schema is stored in the Delta transaction log, the `_delta_log` directory at the table's location.

Metadata is displayed through `DESCRIBE DETAIL <tableName>`.

As long as a Delta table already has data in place, you don't need to specify its schema: it is read from the transaction log.
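The schema lookup can be sketched with the standard library. Per the Delta Lake transaction log protocol, each commit is a file in `_delta_log/` named by its zero-padded version (for example `00000000000000000000.json`) containing one JSON action per line, and a `metaData` action carries the schema as a JSON string (`schemaString`) plus `partitionColumns`. This is a simplified sketch: real readers also handle periodic Parquet checkpoints and protocol versions, and the commit below is hand-written sample data, not this table's actual log. `latest_table_metadata` is hypothetical.

```python
import json

def latest_table_metadata(commit_files):
    """Replay commit files in version order and return (columns, partitionColumns).

    `commit_files` maps a version number to the newline-delimited JSON text of
    that commit. Checkpoint files are ignored in this sketch.
    """
    metadata = None
    for version in sorted(commit_files):
        for line in commit_files[version].splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "metaData" in action:
                metadata = action["metaData"]  # a later metaData action replaces earlier ones
    if metadata is None:
        raise ValueError("no metaData action found")
    schema = json.loads(metadata["schemaString"])  # the schema is itself a JSON string
    columns = [(f["name"], f["type"]) for f in schema["fields"]]
    return columns, metadata["partitionColumns"]

# Hand-written sample commit (abbreviated schema), not taken from stocks_delta.
schema_string = json.dumps({"type": "struct", "fields": [
    {"name": "date", "type": "timestamp", "nullable": True, "metadata": {}},
    {"name": "close", "type": "double", "nullable": True, "metadata": {}},
    {"name": "Year", "type": "integer", "nullable": True, "metadata": {}},
]})
commit0 = "\n".join([
    json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
    json.dumps({"metaData": {"id": "example-id", "format": {"provider": "parquet", "options": {}},
                             "schemaString": schema_string, "partitionColumns": ["Year"],
                             "configuration": {}}}),
    json.dumps({"add": {"path": "Year=2014/part-00000.snappy.parquet",
                        "partitionValues": {"Year": "2014"}, "size": 1234,
                        "modificationTime": 0, "dataChange": True}}),
])
print(latest_table_metadata({0: commit0}))
# ([('date', 'timestamp'), ('close', 'double'), ('Year', 'integer')], ['Year'])
```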

In [28]:
%sql
DESCRIBE  stocks_delta

col_name,data_type,comment
date,timestamp,
open,double,
high,double,
low,double,
close,double,
volume,int,
month,int,
Year,int,
sticker,string,
name,string,


Perform a simple `count` query to verify the number of records.

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Notice that the count is correct right away; there is no need to repair the table.

In [30]:
%sql
SELECT count(*) FROM stocks_delta

count(1)
5503


## Append

Now let's append some Netflix stock data.

In [32]:
from pyspark.sql.functions import lit, month, year

janDataDF = (spark
  .read                                                   # Read a DataFrame from storage
  .option("inferSchema","true")                           # Infer schema
  .option("header","true")                                # File has a header
  .csv("dbfs:/databricks/ganeshrj/apple/NFLX_JAN.csv")    # Path to file
  .drop("Adj Close"))

# Name the ticker column `sticker` so it matches the existing tables' schema
janDataDF = (janDataDF
  .withColumn("Month", month(janDataDF.Date))
  .withColumn("Year", year(janDataDF.Date))
  .withColumn("sticker", lit("NFLX")))

display(janDataDF)

Date,Open,High,Low,Close,Volume,Month,Year,sticker
2018-01-02T00:00:00.000+0000,170.160004,172.300003,169.259995,172.259995,25555900,1,2018,NFLX
2018-01-03T00:00:00.000+0000,172.529999,174.550003,171.960007,172.229996,29517900,1,2018,NFLX
2018-01-04T00:00:00.000+0000,172.539993,173.470001,172.080002,173.029999,22434600,1,2018,NFLX
2018-01-05T00:00:00.000+0000,173.440002,175.369995,173.050003,175.0,23660000,1,2018,NFLX
2018-01-08T00:00:00.000+0000,174.350006,175.610001,173.929993,174.350006,20567800,1,2018,NFLX
2018-01-09T00:00:00.000+0000,174.550003,175.059998,173.410004,174.330002,21584000,1,2018,NFLX
2018-01-10T00:00:00.000+0000,173.160004,174.300003,173.0,174.289993,23959900,1,2018,NFLX
2018-01-11T00:00:00.000+0000,174.589996,175.490005,174.490005,175.279999,18667700,1,2018,NFLX
2018-01-12T00:00:00.000+0000,176.179993,177.360001,175.649994,177.089996,25418100,1,2018,NFLX
2018-01-16T00:00:00.000+0000,177.899994,179.389999,176.139999,176.190002,29565900,1,2018,NFLX


Count the number of new rows to be added to the production data.

In [34]:
janDataDF.count()

## APPEND Using Non-Delta pipeline
Append to the production table.

In the next cell, write the new data in `parquet` format to `genericDataPath` (`userhome + "/generic/stock-data/"`).

In [36]:
(janDataDF
  .write
  .format("parquet")
  .partitionBy("Year")
  .mode("append")
  .save(genericDataPath)
)

The query should now include the new rows.

The count should be `5503 + 21 = 5524` rows.

In [38]:
%sql

select count(*) from generic_stocks

count(1)
5503



That's not the right count: the new files were written to the table's location, but the table's metadata doesn't reflect them yet.

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Repair the table again and count the number of records.

In [40]:
%sql
MSCK REPAIR TABLE generic_stocks;

SELECT count(*) FROM generic_stocks

count(1)
5524


## APPEND Using Delta Pipeline

Next, repeat the process by writing to Delta format. 

Check the table schema, then append the new data in Delta format to `deltaDataPath` (`userhome + "/delta/stock-data/"`).

In [42]:
%sql 

describe stocks_delta;

col_name,data_type,comment
date,timestamp,
open,double,
high,double,
low,double,
close,double,
volume,int,
month,int,
Year,int,
sticker,string,
name,string,


In [43]:
(janDataDF
  .write
  .format("delta")
  .partitionBy("Year")
  .mode("append")
  .save(deltaDataPath)
)

In [44]:
janDataDF = (spark       
  .read                                              # Read a DataFrame from storage
  .option("inferSchema","true")                      # Infer schema
  .option("header","true")                           # File has a header
  .csv("dbfs:/databricks/ganeshrj/apple/NFLX_JAN.csv")     # Path to file 
  .drop("Adj Close"))

janDataDF = janDataDF.withColumn("Month", month(janDataDF.Date)).withColumn("Year",(year(janDataDF.Date))).withColumn("sticker1", lit("NFLX")) 

## Join with StocksDF to get the company name
janDataDF=janDataDF.join(broadcast(stocksDF), janDataDF.sticker1 == stocksDF.sticker).drop("sticker1") 

display(janDataDF)

Date,Open,High,Low,Close,Volume,Month,Year,sticker,name
2018-01-02T00:00:00.000+0000,170.160004,172.300003,169.259995,172.259995,25555900,1,2018,NFLX,Netflix Inc
2018-01-03T00:00:00.000+0000,172.529999,174.550003,171.960007,172.229996,29517900,1,2018,NFLX,Netflix Inc
2018-01-04T00:00:00.000+0000,172.539993,173.470001,172.080002,173.029999,22434600,1,2018,NFLX,Netflix Inc
2018-01-05T00:00:00.000+0000,173.440002,175.369995,173.050003,175.0,23660000,1,2018,NFLX,Netflix Inc
2018-01-08T00:00:00.000+0000,174.350006,175.610001,173.929993,174.350006,20567800,1,2018,NFLX,Netflix Inc
2018-01-09T00:00:00.000+0000,174.550003,175.059998,173.410004,174.330002,21584000,1,2018,NFLX,Netflix Inc
2018-01-10T00:00:00.000+0000,173.160004,174.300003,173.0,174.289993,23959900,1,2018,NFLX,Netflix Inc
2018-01-11T00:00:00.000+0000,174.589996,175.490005,174.490005,175.279999,18667700,1,2018,NFLX,Netflix Inc
2018-01-12T00:00:00.000+0000,176.179993,177.360001,175.649994,177.089996,25418100,1,2018,NFLX,Netflix Inc
2018-01-16T00:00:00.000+0000,177.899994,179.389999,176.139999,176.190002,29565900,1,2018,NFLX,Netflix Inc


In [45]:
%sql
SELECT count(*) FROM stocks_delta

count(1)
5524


## UPSERT

## UPSERT Using Non-Delta Pipeline 

Upserts are not supported in non-Delta pipelines.

More generally, non-Delta Parquet tables offer no row-level updates, deletes, or other in-place modifications; you would have to rewrite the affected files or partitions yourself.

## UPSERT Using Delta Pipeline

## Part 4: Update and Insert Data

**Databricks Delta** delivers a powerful transactional storage layer by harnessing the power of Apache Spark and Databricks DBFS. The core abstraction of Databricks Delta is an optimized Spark table that stores data as Parquet files in DBFS and maintains a transaction log that efficiently tracks changes to the table.

You read and write data stored in the Delta format using the same familiar Apache Spark SQL batch and streaming APIs that you use to work with Hive tables and DBFS directories. With the addition of the transaction log and other enhancements, Databricks Delta offers significant benefits:

#### ACID transactions
Multiple writers can simultaneously modify a dataset and see consistent views. For limitations, see the Databricks documentation on multi-cluster writes.
Writers can modify a dataset without interfering with jobs reading the dataset.
#### Fast read access
Automatic file management organizes data into large files that can be read efficiently.
Statistics can speed up reads by 10-100x, and data skipping avoids reading irrelevant data.
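Data skipping relies on per-file column statistics (such as min and max values) that Delta records in its transaction log: a query with a selective filter can skip every file whose value range cannot overlap the filter. The minimal sketch below uses hypothetical file names and statistics for the `Date` column; it shows the pruning rule, not Delta's implementation.

```python
from datetime import date

# Hypothetical per-file statistics for the Date column: (file, min, max).
FILE_STATS = [
    ("part-0000.parquet", date(2013, 1, 2), date(2013, 12, 31)),
    ("part-0001.parquet", date(2014, 1, 2), date(2014, 12, 31)),
    ("part-0002.parquet", date(2018, 1, 2), date(2018, 1, 31)),
]

def files_to_scan(stats, lo, hi):
    """Keep only files whose [min, max] range overlaps the query range [lo, hi]."""
    return [name for name, fmin, fmax in stats if fmax >= lo and fmin <= hi]

# WHERE Date BETWEEN '2018-01-02' AND '2018-01-12' needs only one of the three files.
print(files_to_scan(FILE_STATS, date(2018, 1, 2), date(2018, 1, 12)))
# ['part-0002.parquet']
```

The tighter each file's min/max range, the more files a filter can skip, which is why data layout (for example `OPTIMIZE ... ZORDER BY`) matters for this optimization.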

#### Upserts (MERGE INTO)
The MERGE INTO statement allows you to merge a set of updates and insertions into an existing dataset.
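The semantics of `MERGE INTO ... WHEN MATCHED THEN UPDATE ... WHEN NOT MATCHED THEN INSERT` can be modeled in plain Python over lists of dicts. This sketch covers the row logic only (Delta itself rewrites the affected files and commits them atomically); `merge_upsert` is hypothetical, and the key columns are whatever the caller chooses. It also shows why the choice of match keys matters: matching stock rows on the date alone updates every ticker's row for that date.

```python
def merge_upsert(target, source, keys, update_cols):
    """Upsert `source` rows into `target` (lists of dicts), matching on `keys`.

    Matched target rows get `update_cols` copied from the source row; unmatched
    source rows are inserted whole. Returns a new list. Delta's MERGE fails when
    several source rows match the same target row; this sketch is stricter and
    rejects any duplicate source key.
    """
    def key_of(row):
        return tuple(row[k] for k in keys)

    source_by_key = {}
    for row in source:
        k = key_of(row)
        if k in source_by_key:
            raise ValueError(f"duplicate source key {k}")
        source_by_key[k] = row

    result, matched = [], set()
    for row in target:
        k = key_of(row)
        if k in source_by_key:  # WHEN MATCHED THEN UPDATE SET ...
            updated = dict(row)
            for c in update_cols:
                updated[c] = source_by_key[k][c]
            result.append(updated)
            matched.add(k)
        else:
            result.append(dict(row))
    # WHEN NOT MATCHED THEN INSERT ...
    result.extend(dict(r) for k, r in source_by_key.items() if k not in matched)
    return result

# Sample rows; the GOOG volume is made up for illustration.
target = [
    {"Date": "2018-01-02", "sticker": "NFLX", "Volume": 25555900},
    {"Date": "2018-01-02", "sticker": "GOOG", "Volume": 1000000},
]
source = [{"Date": "2018-01-02", "sticker": "NFLX", "Volume": 4000000}]

by_date = merge_upsert(target, source, keys=["Date"], update_cols=["Volume"])
by_date_and_ticker = merge_upsert(target, source, keys=["Date", "sticker"], update_cols=["Volume"])
print([r["Volume"] for r in by_date])             # [4000000, 4000000]
print([r["Volume"] for r in by_date_and_ticker])  # [4000000, 1000000]
```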

In [50]:
from pyspark.sql.functions import col

# Use a fresh path for the mini table and clear anything left over from a previous run
deltaMiniDataPath = userhome + "/delta/mini-delta1"
dbutils.fs.rm(deltaMiniDataPath, True)

# NFLX rows dated before 2018-01-13
newJanDataDF = janDataDF.where((col("Date") < '2018-01-13T00:00:00.000+0000') & (col("sticker") == 'NFLX'))

(newJanDataDF
  .write
  .format("delta")
  .partitionBy("Year")
  .save(deltaMiniDataPath)
)

display(newJanDataDF)

# Register the Delta files as a table; the schema comes from the transaction log
spark.sql("use ngcdemo")
spark.sql("""
    DROP TABLE IF EXISTS jan_nflx_delta_mini
  """)
spark.sql("""
    CREATE TABLE jan_nflx_delta_mini
    USING DELTA
    LOCATION '{}'
  """.format(deltaMiniDataPath))

Date,Open,High,Low,Close,Volume,Month,Year,sticker,name
2018-01-02T00:00:00.000+0000,170.160004,172.300003,169.259995,172.259995,25555900,1,2018,NFLX,Netflix Inc
2018-01-03T00:00:00.000+0000,172.529999,174.550003,171.960007,172.229996,29517900,1,2018,NFLX,Netflix Inc
2018-01-04T00:00:00.000+0000,172.539993,173.470001,172.080002,173.029999,22434600,1,2018,NFLX,Netflix Inc
2018-01-05T00:00:00.000+0000,173.440002,175.369995,173.050003,175.0,23660000,1,2018,NFLX,Netflix Inc
2018-01-08T00:00:00.000+0000,174.350006,175.610001,173.929993,174.350006,20567800,1,2018,NFLX,Netflix Inc
2018-01-09T00:00:00.000+0000,174.550003,175.059998,173.410004,174.330002,21584000,1,2018,NFLX,Netflix Inc
2018-01-10T00:00:00.000+0000,173.160004,174.300003,173.0,174.289993,23959900,1,2018,NFLX,Netflix Inc
2018-01-11T00:00:00.000+0000,174.589996,175.490005,174.490005,175.279999,18667700,1,2018,NFLX,Netflix Inc
2018-01-12T00:00:00.000+0000,176.179993,177.360001,175.649994,177.089996,25418100,1,2018,NFLX,Netflix Inc


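## Fix the data issue

Suppose the `Volume` values for these NFLX rows were reported incorrectly. Build a corrected copy of the rows (for this demo, every `Volume` is set to 4,000,000) and save it as a table to upsert from.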

In [52]:
from pyspark.sql.functions import lit
correctedDataDF = (newJanDataDF
  .withColumn("Volume", lit(4000000))
 )

spark.sql("use ngcdemo")
spark.sql("DROP TABLE IF EXISTS corrected_nflx_delta_to_upsert")
correctedDataDF.write.saveAsTable("corrected_nflx_delta_to_upsert")

In [53]:
%sql select * from corrected_nflx_delta_to_upsert

Date,Open,High,Low,Close,Volume,Month,Year,sticker,name
2018-01-02T00:00:00.000+0000,170.160004,172.300003,169.259995,172.259995,4000000,1,2018,NFLX,Netflix Inc
2018-01-03T00:00:00.000+0000,172.529999,174.550003,171.960007,172.229996,4000000,1,2018,NFLX,Netflix Inc
2018-01-04T00:00:00.000+0000,172.539993,173.470001,172.080002,173.029999,4000000,1,2018,NFLX,Netflix Inc
2018-01-05T00:00:00.000+0000,173.440002,175.369995,173.050003,175.0,4000000,1,2018,NFLX,Netflix Inc
2018-01-08T00:00:00.000+0000,174.350006,175.610001,173.929993,174.350006,4000000,1,2018,NFLX,Netflix Inc
2018-01-09T00:00:00.000+0000,174.550003,175.059998,173.410004,174.330002,4000000,1,2018,NFLX,Netflix Inc
2018-01-10T00:00:00.000+0000,173.160004,174.300003,173.0,174.289993,4000000,1,2018,NFLX,Netflix Inc
2018-01-11T00:00:00.000+0000,174.589996,175.490005,174.490005,175.279999,4000000,1,2018,NFLX,Netflix Inc
2018-01-12T00:00:00.000+0000,176.179993,177.360001,175.649994,177.089996,4000000,1,2018,NFLX,Netflix Inc


In [54]:
%sql
-- Caution: this ON clause matches on Date only, so the update also applies to the other
-- tickers' rows on those dates, not just NFLX (likely why 36 rows, not 9, have
-- Volume = 4000000 at version 5). To restrict the upsert to NFLX, also match on the ticker:
--   AND corrected_nflx_delta_to_upsert.sticker = stocks_delta.sticker
MERGE INTO stocks_delta
USING corrected_nflx_delta_to_upsert
ON corrected_nflx_delta_to_upsert.Date = stocks_delta.Date
WHEN MATCHED THEN
  UPDATE SET
    stocks_delta.Volume = corrected_nflx_delta_to_upsert.Volume
WHEN NOT MATCHED
  THEN INSERT (Date, Open, High, Low, Close, Volume, Month, Year, sticker, name)
  VALUES (
    corrected_nflx_delta_to_upsert.Date,
    corrected_nflx_delta_to_upsert.Open,
    corrected_nflx_delta_to_upsert.High,
    corrected_nflx_delta_to_upsert.Low,
    corrected_nflx_delta_to_upsert.Close,
    corrected_nflx_delta_to_upsert.Volume,
    corrected_nflx_delta_to_upsert.Month,
    corrected_nflx_delta_to_upsert.Year,
    corrected_nflx_delta_to_upsert.sticker,
    corrected_nflx_delta_to_upsert.name)

In [55]:
%sql
SELECT * FROM stocks_delta
where Date < '2018-01-13T00:00:00.000+0000' and sticker = 'NFLX'

date,open,high,low,close,volume,month,Year,sticker,name
2018-01-08T00:00:00.000+0000,174.350006,175.610001,173.929993,174.350006,4000000,1,2018,NFLX,
2018-01-09T00:00:00.000+0000,174.550003,175.059998,173.410004,174.330002,4000000,1,2018,NFLX,
2018-01-04T00:00:00.000+0000,172.539993,173.470001,172.080002,173.029999,4000000,1,2018,NFLX,
2018-01-05T00:00:00.000+0000,173.440002,175.369995,173.050003,175.0,4000000,1,2018,NFLX,
2018-01-10T00:00:00.000+0000,173.160004,174.300003,173.0,174.289993,4000000,1,2018,NFLX,
2018-01-03T00:00:00.000+0000,172.529999,174.550003,171.960007,172.229996,4000000,1,2018,NFLX,
2018-01-02T00:00:00.000+0000,170.160004,172.300003,169.259995,172.259995,4000000,1,2018,NFLX,
2018-01-11T00:00:00.000+0000,174.589996,175.490005,174.490005,175.279999,4000000,1,2018,NFLX,
2018-01-12T00:00:00.000+0000,176.179993,177.360001,175.649994,177.089996,4000000,1,2018,NFLX,


## Time travel

In [57]:
%sql 

describe history stocks_delta;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel
5,2019-04-29T02:24:22.000+0000,100959,ganesh.rajagopal@databricks.com,MERGE,Map(predicate -> (ngcdemo.corrected_nflx_delta_to_upsert.`Date` = ngcdemo.stocks_delta.`Date`)),,List(2911691),0426-204644-shear440,4.0,WriteSerializable
4,2019-04-29T01:44:46.000+0000,100959,ganesh.rajagopal@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [""Year""])",,List(2911691),0426-204644-shear440,3.0,WriteSerializable
3,2019-04-29T00:38:54.000+0000,100959,ganesh.rajagopal@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [""Year""])",,List(2911691),0426-204644-shear440,2.0,WriteSerializable
2,2019-04-29T00:38:47.000+0000,100959,ganesh.rajagopal@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [""Year""])",,List(2911691),0426-204644-shear440,1.0,WriteSerializable
1,2019-04-29T00:38:32.000+0000,100959,ganesh.rajagopal@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [""Year""])",,List(2911691),0426-204644-shear440,0.0,WriteSerializable
0,2019-04-29T00:38:17.000+0000,100959,ganesh.rajagopal@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [""Year""])",,List(2911691),0426-204644-shear440,,WriteSerializable

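`TIMESTAMP AS OF` resolves to the latest version committed at or before the requested time, so a timestamp earlier than the first commit has no version to read. The sketch below models that lookup using the commit timestamps from the history above; `resolve_version` is hypothetical and simplified (it doesn't model every check Delta performs).

```python
from datetime import datetime

# Commit timestamps copied from the DESCRIBE HISTORY output above.
HISTORY = {
    0: "2019-04-29 00:38:17",
    1: "2019-04-29 00:38:32",
    2: "2019-04-29 00:38:47",
    3: "2019-04-29 00:38:54",
    4: "2019-04-29 01:44:46",
    5: "2019-04-29 02:24:22",
}

def resolve_version(history, as_of):
    """Return the latest version committed at or before `as_of` (simplified model)."""
    ts = datetime.strptime(as_of, "%Y-%m-%d %H:%M:%S")
    candidates = [v for v, t in history.items()
                  if datetime.strptime(t, "%Y-%m-%d %H:%M:%S") <= ts]
    if not candidates:
        raise ValueError(f"{as_of} is before the earliest commit")
    return max(candidates)

print(resolve_version(HISTORY, "2019-04-29 01:00:00"))  # 3
```

Under this model, a timestamp such as `2019-04-12 13:53:01` falls before version 0 and cannot be resolved against this history.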

In [58]:
%sql 

SELECT count(*) FROM stocks_delta VERSION AS OF 1 where volume = 4000000;

count(1)
0


In [59]:
%sql 

SELECT count(*) FROM stocks_delta VERSION AS OF 5 where volume = 4000000;

count(1)
36


In [60]:
%sql 

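-- The timestamp must fall within the table's history (see DESCRIBE HISTORY above); this one resolves to version 3
SELECT count(*) FROM stocks_delta TIMESTAMP AS OF '2019-04-29 01:00:00' WHERE volume = 4000000;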

In [61]:
%sql 

OPTIMIZE stocks_delta
ZORDER BY (Date)

path
""

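`ZORDER BY` co-locates rows with nearby values in the Z-order columns in the same files, which tightens each file's min/max statistics and makes data skipping more effective. With a single column such as `Date`, this essentially clusters the data by that column; with several columns, the classic Z-order (Morton) curve interleaves the bits of each column's value so that rows close in all dimensions sort near each other. The sketch below shows only that bit-interleaving idea on small non-negative integers; it is not Databricks' implementation, and `morton_code` is hypothetical.

```python
def morton_code(x, y, bits=8):
    """Interleave the low `bits` bits of x and y: bit i of x -> bit 2i, bit i of y -> bit 2i+1."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code

points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: morton_code(*p))
print(z_sorted[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

The first four points in Z-order all come from the same 2x2 block, so a file holding them would have narrow ranges on both columns.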

In [62]:
%sql
SELECT * FROM stocks_delta
where Date < '2018-01-13T00:00:00.000+0000' and sticker = 'NFLX'

date,open,high,low,close,volume,month,Year,sticker,name
2018-01-12T00:00:00.000+0000,176.179993,177.360001,175.649994,177.089996,4000000,1,2018,NFLX,
2018-01-02T00:00:00.000+0000,170.160004,172.300003,169.259995,172.259995,4000000,1,2018,NFLX,
2018-01-11T00:00:00.000+0000,174.589996,175.490005,174.490005,175.279999,4000000,1,2018,NFLX,
2018-01-08T00:00:00.000+0000,174.350006,175.610001,173.929993,174.350006,4000000,1,2018,NFLX,
2018-01-09T00:00:00.000+0000,174.550003,175.059998,173.410004,174.330002,4000000,1,2018,NFLX,
2018-01-04T00:00:00.000+0000,172.539993,173.470001,172.080002,173.029999,4000000,1,2018,NFLX,
2018-01-05T00:00:00.000+0000,173.440002,175.369995,173.050003,175.0,4000000,1,2018,NFLX,
2018-01-03T00:00:00.000+0000,172.529999,174.550003,171.960007,172.229996,4000000,1,2018,NFLX,
2018-01-10T00:00:00.000+0000,173.160004,174.300003,173.0,174.289993,4000000,1,2018,NFLX,


## Part 5: Streaming Data - combine live and historical data (without the complexity of traditional lambda architecture)

In Databricks, incorporating real-time or near-real-time data is just as simple as incorporating static data. Let's look at how we can combine a streaming source, in this case simulated stock events arriving as JSON files, with our existing data, using the power of Delta and Databricks Runtime.

<div style="text-align: center; line-height: 0; padding-top: 9px;"><br><img src="https://i.imgur.com/RYOs8lB.png" height="00" width="700"></div>
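Conceptually, a file-based streaming source treats new files in a directory as new data: each micro-batch picks up files it hasn't processed yet (at most `maxFilesPerTrigger` of them, the option used in the streaming cell of this section), and the checkpoint records which files are done so a restarted query resumes where it left off. A stdlib-only sketch of that loop follows; the directory listing, the checkpoint (a set), and `run_micro_batches` are hypothetical stand-ins for what Structured Streaming manages for you.

```python
def run_micro_batches(listing, checkpoint, max_files_per_trigger, sink):
    """Process unseen files in name order, at most `max_files_per_trigger` per batch.

    `listing` maps file name -> records; `checkpoint` is a set of processed file
    names, updated after each batch is written so a rerun skips committed work.
    Returns the number of batches run.
    """
    batches = 0
    while True:
        pending = sorted(f for f in listing if f not in checkpoint)
        if not pending:
            return batches
        batch = pending[:max_files_per_trigger]
        for name in batch:
            sink.extend(listing[name])  # "write" this file's records to the sink
        checkpoint.update(batch)        # record progress only after the write
        batches += 1

files = {"part-0.json": [1, 2], "part-1.json": [3], "part-2.json": [4, 5]}
checkpoint, sink = set(), []
print(run_micro_batches(files, checkpoint, 1, sink), sink)  # 3 [1, 2, 3, 4, 5]
files["part-3.json"] = [6]  # a new file arrives
print(run_micro_batches(files, checkpoint, 1, sink), sink)  # 1 [1, 2, 3, 4, 5, 6]
```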

In [64]:
streamingDataPath = userhome + "/delta/streaming/"
eventStreamPath = userhome + "/delta/eventStream"
dbutils.fs.rm(streamingDataPath, True)

In [65]:
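from datetime import datetime

def createEvents(count, date, eventType, open1, high, low, close, month, year, sticker, name):
  # Build `count` rows for a single day. Every row carries the same prices;
  # volume cycles through count*1, count*2, ..., count*5.
  # (`open1` avoids shadowing Python's built-in `open`.)
  return [(datetime.strptime(date, '%Y-%m-%d'), eventType, open1, high, low, close, month, year, sticker, name, ((i % 5) + 1) * count) for i in range(count)]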

In [66]:
from pyspark.sql.types import *
schemaFields = [StructField('date', DateType(), True), 
                StructField('eventType', StringType(), True), 
                StructField('open1', DoubleType(), True), 
                StructField('high', DoubleType(), True), 
                StructField('low', DoubleType(), True), 
                StructField('close', DoubleType(), True),
                StructField('month', IntegerType(), True), 
                StructField('year', IntegerType(), True),
                StructField('sticker', StringType(), True), 
                StructField('name', StringType(), True),               
                StructField('volume', IntegerType(), True)]
schema = StructType(schemaFields)

## Simulate a Batch Insert

In [68]:
insertDataDF = spark.createDataFrame(createEvents( 1000, "2017-10-01", "batch-insert", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'NFLX', 'Netflix Inc'), schema)
insertDataDF.write.format("delta").save(streamingDataPath)


In [69]:
display (insertDataDF)

date,eventType,open1,high,low,close,month,year,sticker,name,volume
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,1000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,2000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,3000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,4000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,5000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,1000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,2000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,3000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,4000
2017-10-01,batch-insert,123.33,128.33,121.4,124.33,10,2017,NFLX,Netflix Inc,5000


## Simulate a Batch Append for a Couple of Stocks

In [71]:
(spark.createDataFrame(createEvents(5000, "2017-10-02", "batch-append-1", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'APPL', 'Apple Computers Inc'), schema)
  .write
  .format("delta")
  .mode("append")
  .save(str(streamingDataPath)))

In [72]:
(spark.createDataFrame(createEvents(5000, "2017-10-03", "batch-append-2", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'GOOG', 'Google Inc'), schema)
  .write
  .format("delta")
  .mode("append")
  .save(str(streamingDataPath)))

## Display the results

In [74]:
display(
  (spark
  .readStream
  .format("delta")
  .load(str(streamingDataPath))
  .groupBy("date","eventType", "sticker")
  .count()
  .orderBy("date"))
)

date,eventType,sticker,count
2017-10-01,batch-insert,NFLX,1000
2017-10-02,batch-append-1,APPL,5000
2017-10-03,batch-append-2,GOOG,5000
2017-10-10,stream,NFLX,10000

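As a sanity check on the counts above, the same event-generation logic can be run in plain Python (no Spark), grouping rows by `(date, eventType, sticker)` with `collections.Counter`, much like the `groupBy(...).count()` in the streaming query. The `createEvents` body is the one from the notebook cell; only the grouping code is new.

```python
from collections import Counter
from datetime import datetime

def createEvents(count, date, eventType, open1, high, low, close, month, year, sticker, name):
    # Same logic as the notebook cell: `count` rows whose volume cycles count*1 .. count*5.
    return [(datetime.strptime(date, '%Y-%m-%d'), eventType, open1, high, low, close,
             month, year, sticker, name, ((i % 5) + 1) * count) for i in range(count)]

rows = (createEvents(1000, "2017-10-01", "batch-insert", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'NFLX', 'Netflix Inc')
        + createEvents(5000, "2017-10-02", "batch-append-1", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'APPL', 'Apple Computers Inc')
        + createEvents(5000, "2017-10-03", "batch-append-2", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'GOOG', 'Google Inc')
        + createEvents(10000, "2017-10-10", "stream", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'NFLX', 'Netflix Inc'))

# Group by (date, eventType, sticker); index 8 of each tuple is the sticker.
counts = Counter((r[0].date().isoformat(), r[1], r[8]) for r in rows)
for key in sorted(counts):
    print(*key, counts[key], sep=",")
# 2017-10-01,batch-insert,NFLX,1000
# 2017-10-02,batch-append-1,APPL,5000
# 2017-10-03,batch-append-2,GOOG,5000
# 2017-10-10,stream,NFLX,10000
```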

## Kick off the streaming job and watch the data being processed

In [76]:
df = spark.createDataFrame(createEvents(10000, "2017-10-10", "stream", 123.33, 128.33, 121.40, 124.33, 10, 2017, 'NFLX', 'Netflix Inc'), schema)
schemaNew = df.schema
(df.write
  .format("json")
  .mode("overwrite")
  .save(str(eventStreamPath)))

streamDF = spark.readStream.schema(schemaNew).option("maxFilesPerTrigger", 1).json(str(eventStreamPath))

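(streamDF
  .writeStream
  .format("delta")
  .option("path", streamingDataPath)
  # Keep the checkpoint under the table path so dbutils.fs.rm(streamingDataPath, True) also resets it
  .option("checkpointLocation", streamingDataPath + "_checkpoints/stream-from-json")
  .start())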

## Summary

Databricks provides a transformative, best-in-class platform that unifies Data Science, Analytics, and Data Engineering to help solve the toughest business problems in the world.

In a few lines of code, we combined stock prices, company metadata, news sentiment, and live streaming data into a single view; this data can be fed directly to data scientists, analysts, or other downstream users, who can immediately access and further manipulate it.

We just demonstrated that Databricks Delta supports batch reports, exploratory analytics, data warehouse queries (such as aggregations on a dataset), and BI/interactive queries on a **single storage/copy of data**.