# Azure Databricks Quickstart for Data Engineers
Welcome to the quickstart lab for data analysts on Azure Databricks! Over the course of this notebook, you will use a real-world dataset and learn how to:
1. Access your enterprise data lake in Azure using Databricks
2. Transform and store your data in a reliable and performance Delta Lake
3. Use Update,Delete,Merge,Schema Evolution and Time Travel Capabilities of Delta Lake

## The Use Case
We will analyze public subscriber data from a popular Korean music streaming service called KKbox stored in Azure Blob Storage. The goal of the notebook is to answer a set of business-related questions about our business, subscribers and usage.

## Accessing Your Enterprise Data Lake
Databricks enables an architecture where your analytics is decoupled from your data storage. This allows organizations to store their data cost effectively in Azure Storage and share their data across best of breed tools in Azure without duplicating it in data silos. 

<img src="https://sguptasa.blob.core.windows.net/random/Delta%20Lakehouse.png" width=800>

In this notebook, we focus exclusively on the **Data Engineers** user. Subsequent quickstart labs will demonstrate data science, SQL Analytics and machine learning on the same data set without duplicating it. 

Run the code below to set up your storage access in Databricks.

## Accessing Azure Storage in Databricks
There are two common ways to access Data Lake stores in Azure Databricks: More information [here](https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-datalake-gen2?toc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fazure-databricks%2Ftoc.json&bc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fbread%2Ftoc.json). 
1. Mounting your storage container to the Databricks workspace to be shared by all users and clusters. This can be done via a Service principal or using Access Keys
2. Passing your Azure AD credentials to the storage for fine-grained access security

### Mounting Azure Storage using an Access Key or Service Principal
We will mount an Azure blob storage container to the workspace using a shared Access Key. More instructions can be found [here](https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage#--mount-azure-blob-storage-containers-to-dbfs).

In [0]:
dbutils.fs.unmount("/mnt/adbquickstart")

In [0]:
BLOB_CONTAINER = "blobcontainer"
BLOB_ACCOUNT = "blobstor301041"
ACCOUNT_KEY = "K0lJ/F73UDaW6wJqtL7o4ybRsJV9XACWSSzH5bICwz6doxoaCfBlrXqiUUvrexY4tHKFLKB89raf5LQELSyvHA=="
ADLS_CONTAINER = "adlscontainer"
ADLS_ACCOUNT = "adls301041"

In [0]:
DIRECTORY = "/"
MOUNT_PATH = "/mnt/adbquickstart"

dbutils.fs.mount(
  source = f"wasbs://{BLOB_CONTAINER}@{BLOB_ACCOUNT}.blob.core.windows.net/KKBox-Dataset-orig/",
  mount_point = MOUNT_PATH,
  extra_configs = {
    f"fs.azure.account.key.{BLOB_ACCOUNT}.blob.core.windows.net":ACCOUNT_KEY
  }
)

Once mounted, we can view and navigate the contents of our container using Databricks `%fs` file system commands.

In [0]:
%fs ls /mnt/adbquickstart/bronze/

path,name,size
dbfs:/mnt/adbquickstart/bronze/members/,members/,0
dbfs:/mnt/adbquickstart/bronze/transactions/,transactions/,0


In [0]:
%fs head /mnt/adbquickstart/members/members_v3.csv

## Explore Your Data
In 2018, [KKBox](https://www.kkbox.com/) - a popular music streaming service based in Taiwan - released a [dataset](https://www.kaggle.com/c/kkbox-churn-prediction-challenge/data) consisting of a little over two years of (anonymized) customer transaction and activity data with the goal of challenging the Data & AI community to predict which customers would churn in a future period.  

The primary data files are organized in the storage container:

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/kkbox_filedownloads.png' width=150>

Read into dataframes, these files form the following data model:

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/kkbox_schema.png' width=150>

Each subscriber is uniquely identified by a value in the `msno` field of the `members` table. Data in the `transactions` and `user_logs` tables provide a record of subscription management and streaming activities, respectively.

In [0]:
import shutil
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import ChiSquareTest
from pyspark.sql import functions
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd
import numpy as np
import matplotlib.pyplot as mplt
import matplotlib.ticker as mtick

In [0]:
import shutil
from pyspark.sql.types import *
# delete the old database and tables if needed
_ = spark.sql('DROP DATABASE IF EXISTS kkbox CASCADE')

# drop any old delta lake files that might have been created
shutil.rmtree('/dbfs/mnt/adbquickstart/bronze', ignore_errors=True)
shutil.rmtree('/dbfs/mnt/adbquickstart/gold', ignore_errors=True)
shutil.rmtree('/dbfs/mnt/adbquickstart/silver', ignore_errors=True)
shutil.rmtree('/dbfs/mnt/adbquickstart/checkpoint', ignore_errors=True)
# create database to house SQL tables
_ = spark.sql('CREATE DATABASE kkbox')

##In this Demo notebook we will showcase some of the most common scenarios Data Engineers encouter while working on ingesting and processing data
####1. Ingest Data in Batch Process
####2. Ingest Data from a streaming source
####3. Perform operations such as Update, Merge , Delete on data

### SCENARIO 1  : INGEST DATA in BATCH PROCESS (Reading CSV or Parquet File)
 ##### In this scenario we will ingest an inital load of transactional data to Delta format. We will ingest two data sets : (Transaction Dataset : Parquet Format) and (Members data : csv Format) and convert it to Delta(bronze layer)

In [0]:
# transaction dataset schema
transaction_schema = StructType([
  StructField('msno', StringType()),
  StructField('payment_method_id', IntegerType()),
  StructField('payment_plan_days', IntegerType()),
  StructField('plan_list_price', IntegerType()),
  StructField('actual_amount_paid', IntegerType()),
  StructField('is_auto_renew', IntegerType()),
  StructField('transaction_date', DateType()),
  StructField('membership_expire_date', DateType()),
  StructField('is_cancel', IntegerType())  
  ])

# read data from parquet
transactions = (
  spark
    .read
    .parquet(
      '/mnt/adbquickstart/transactions',
      schema=transaction_schema,
      header=True,
      dateFormat='yyyyMMdd'
      )
    )

# persist in delta lake format
( transactions
    .write
    .format('delta')
    .partitionBy('transaction_date')
    .mode('overwrite')
    .save('/mnt/adbquickstart/bronze/transactions')
  )

# create table object to make delta lake queriable
spark.sql('''
  CREATE TABLE kkbox.transactions
  USING DELTA 
  LOCATION '/mnt/adbquickstart/bronze/transactions'
  ''')

In [0]:
display(transactions)

msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel
++6eU4LsQ3UQ20ILS7d99XK8WbiVgbyYL4FUgzZR134=,32,90,298,298,0,20170131,20170504,0
++lvGPJOinuin/8esghpnqdljm6NXS8m8Zwchc7gOeA=,41,30,149,149,1,20150809,20190412,0
+/GXNtXWQVfKrEDqYAzcSw2xSPYMKWNj22m+5XkVQZc=,36,30,180,180,1,20170303,20170422,0
+/w1UrZwyka4C9oNH3+Q8fUf3fD8R3EwWrx57ODIsqk=,36,30,180,180,1,20170329,20170331,1
+00PGzKTYqtnb65mPKPyeHXcZEwqiEzktpQksaaSC3c=,41,30,99,99,1,20170323,20170423,0
+0KcMm8JNCW08lTp3Lyz5Ger/47u3yj9H2xLf8lyAj8=,41,30,149,149,1,20151112,20180613,0
+0MeUJe1cGb4O97gJoTPUPjiONQzi9BmxDuKRZn+E2o=,41,30,99,99,1,20170313,20170413,0
+0W3Q9FSw8TLgoNke9JFl6dUeoxAOfuHarEexRPcJsA=,41,30,99,99,1,20170318,20170418,0
+0aofSeKQ/F2bhtAGW/zrdrw+lGVV6nmIdt8hJRTR/8=,41,30,99,99,1,20170316,20170416,0
+0c0aay0C6GHocNyfqE+46Ih4dYOy2P5xjc0GjGgBTE=,41,30,149,149,1,20170307,20170407,0


In [0]:
%sql
SELECT * FROM kkbox.transactions

msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel
m+pq8Tb2lxn1TB/12q2vD39g7ZuVwRPfBozroqf1YJY=,41,30,149,149,1,20150125,20180203,0
WOI+isWP6hjUJ34Wm7Fo63ajkmK6SMIEqoIjBgts1xg=,41,30,149,149,1,20150125,20180327,0
TC7Gm7m/z1KZaKV47Ds9ilhoGYguUAUmAIFmu/JPhvE=,41,30,149,149,1,20150125,20170607,0
A1WxXNkMY1sCQHRCmnWXzZHgexWVzX5C2GzdK7r91bU=,41,30,149,149,1,20150125,20170403,0
hqWmt5OeZ3ab3MZdXmYlEdMtmsBkKH9/wuqzFYh26W8=,41,30,149,149,1,20150125,20180207,0
yb3ewDKL4vIjn/5iW3KCcT9u0BHQJijt5wJy8npWCFQ=,41,30,149,149,1,20150125,20171024,0
Du6MaYC9gJo8KEu2QVKWpzl448tbmM2iTImTAHAg8EY=,41,30,149,149,1,20150125,20170918,0
MCOWVhzR0JDf1+PSmbynbSVfJTfaMu4pvK9w9XKEyaU=,41,30,149,149,1,20150125,20180413,0
gKwtlqsPFV9kYu6I7ejrAWiIwWnLWFnvcV0+iR2u+3Y=,41,30,149,149,1,20150125,20171224,0
o5aFK3grQrjlHACDVvS0cPa1KvLq9x9KeTM/L2SgLz0=,41,30,149,149,1,20150125,20180522,0


In [0]:
# members dataset schema
member_schema = StructType([
  StructField('msno', StringType()),
  StructField('city', IntegerType()),
  StructField('bd', IntegerType()),
  StructField('gender', StringType()),
  StructField('registered_via', IntegerType()),
  StructField('registration_init_time', DateType())
  ])

# read data from csv
members = (
  spark
    .read
    .csv(
      'dbfs:/mnt/adbquickstart/members/members_v3.csv',
      schema=member_schema,
      header=True,
      dateFormat='yyyyMMdd'
      )
    )

# persist in delta lake format
(
  members
    .write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/adbquickstart/bronze/members')
  )

# create table object to make delta lake queriable
spark.sql('''
  CREATE TABLE kkbox.members 
  USING DELTA 
  LOCATION '/mnt/adbquickstart/bronze/members'
  ''')

In [0]:
%sql
select * from kkbox.members

msno,city,bd,gender,registered_via,registration_init_time
Rb9UwLQTrxzBVwCB6+bCcSQWZ9JiNLC9dXtM1oEsZA8=,1,0,,11,2011-09-11
+tJonkh+O1CA796Fm5X60UMOtB6POHAwPjbTRVl/EuU=,1,0,,7,2011-09-14
cV358ssn7a0f7jZOwGNWS07wCKVqxyiImJUX6xcIwKw=,1,0,,11,2011-09-15
9bzDeJP6sQodK73K5CBlJ6fgIQzPeLnRl0p5B77XP+g=,1,0,,11,2011-09-15
WFLY3s7z4EZsieHCt63XrsdtfTEmJ+2PnnKLH5GY4Tk=,6,32,female,9,2011-09-15
yLkV2gbZ4GLFwqTOXLVHz0VGrMYcgBGgKZ3kj9RiYu8=,4,30,male,9,2011-09-16
jNCGK78YkTyId3H3wFavcBLDmz7pfqlvCfUKf4G1Lw4=,1,0,,7,2011-09-16
WH5Jq4mgtfUFXh2yz+HrcTXKS4Oess4k4W3qKolAeb0=,5,34,male,9,2011-09-16
tKmbR4X5VXjHmxERrckawEMZ4znVy1lAQIR1vV5rdNk=,5,19,male,9,2011-09-17
I0yFvqMoNkM8ZNHb617e1RBzIS/YRKemHO7Wj13EtA0=,13,63,male,9,2011-09-18


### SCENARIO 2  : INGEST DATA from a streaming source
 ##### For demo purpose we create a stream from the user_log file which is csv format and then convert to Delta which acts as a sink. In real time we would be using Eventhub or Kafka

In [0]:
%scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val log_schema = StructType(Seq(
  StructField("msno", StringType, true), 
  StructField("date", StringType, true),
  StructField("num_25", IntegerType , true),  
  StructField("num_50", IntegerType, true),
  StructField("num_75", IntegerType, true),
  StructField("num_985", IntegerType, true),
  StructField("num_100", IntegerType, true),
  StructField("num_unq", IntegerType, true),
  StructField("total_secs", StringType, true),
))

val streamingDF = spark.readStream.format("com.databricks.spark.csv").schema(log_schema).option("mode","APPEND").load("dbfs:/mnt/adbquickstart/user_logs/")

In [0]:
%scala
import org.apache.spark.sql.streaming.Trigger

streamingDF 
    .repartition(1) 
    .writeStream 
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/adbquickstart/checkpoint") 
    .start("dbfs:/mnt/adbquickstart/bronze/user_logs/")

In [0]:
%fs
ls /mnt/adbquickstart/bronze

path,name,size
dbfs:/mnt/adbquickstart/bronze/members/,members/,0
dbfs:/mnt/adbquickstart/bronze/transactions/,transactions/,0
dbfs:/mnt/adbquickstart/bronze/user_logs/,user_logs/,0


In [0]:
%sql
  CREATE TABLE kkbox.user_logs 
  USING DELTA 
  LOCATION 'dbfs:/mnt/adbquickstart/bronze/user_logs/'


###Now we have ingested the raw data (both Batch and streaming) and landed it in Delta format. The Next Step is to do some ETL and move it to Silver layer
<img src="https://kpistoropen.blob.core.windows.net/collateral/quickstart/etl.png" width=1500>

#####For this Demo we are moving the just data as is from Bronze to Silver landing Zone

In [0]:
## Read the Bronze Data
transactions_bronze = spark.read.format("delta").load('/mnt/adbquickstart/bronze/transactions/')
members_bronze = spark.read.format("delta").load('/mnt/adbquickstart/bronze/members/')
user_logs_bronze = spark.read.format("delta").load('/mnt/adbquickstart/bronze/user_logs/')

##Write the Bronze to Silver Location
transactions_bronze.write.format('delta').mode('overwrite').save('/mnt/adbquickstart/silver/transactions/')
members_bronze.write.format('delta').mode('overwrite').save('/mnt/adbquickstart/silver/members/')
user_logs_bronze.write.format('delta').mode('overwrite').save('/mnt/adbquickstart/silver/user_logs/')

##Read Silver Data
transactions_silver = spark.read.format("delta").load('/mnt/adbquickstart/silver/transactions/')
members_silver = spark.read.format("delta").load('/mnt/adbquickstart/silver/members/')
user_logs_silver = spark.read.format("delta").load('/mnt/adbquickstart/silver/user_logs/')

### Create Gold table
#### Further we are concentrating on Members dataset. We will create a Gold table (Aggregated table)

In [0]:
import pyspark.sql.functions as f
members_silver = members_silver.withColumn('years',f.year(f.to_timestamp('registration_init_time', 'yyyy-MM-dd')))

members_gold = members_silver.groupBy('years').count()

members_gold.createOrReplaceTempView("member_gold")

display(members_gold)

years,count
2007,89830
2015,1620525
2006,53953
2013,524722
2014,975776
2004,26234
2012,283190
2009,63633
2016,2246761
2005,41349


In [0]:
%python
# Save our Gold table in Delta format
members_gold.write.format('delta').mode('overwrite').save('/mnt/adbquickstart/gold/members/')

# Create SQL table
spark.sql(f"CREATE TABLE kkbox.members_gold USING delta LOCATION '/mnt/adbquickstart/gold/members/'") 

In [0]:
%sql
select * from kkbox.members_gold

years,count
2007,89830
2015,1620525
2006,53953
2013,524722
2014,975776
2004,26234
2012,283190
2009,63633
2016,2246761
2005,41349


## Scenario 3. Delta as Unified Batch and Streaming Source and Sink

These cells showcase streaming and batch concurrent queries (inserts and reads)
* This notebook will run an `INSERT` every 10s against our `members_gold` table
* We will run two streaming queries concurrently against this data and update the table

In [0]:
dbutils.notebook.exit("stop") 

stop

In [0]:
# Read the insertion of data
members_gold_readStream = spark.readStream.format("delta").load('/mnt/adbquickstart/gold/members/')
members_gold_readStream.createOrReplaceTempView("members_gold_readStream")

In [0]:
%sql
SELECT years, sum(`count`) AS members
FROM members_gold_readStream
GROUP BY years
ORDER BY years

years,members
2004,2726234
2005,41349
2006,53953
2007,89830
2008,67690
2009,63633
2010,115075
2011,179051
2012,283190
2013,524722


In [0]:
import time
i = 1
while i <= 6:
  # Execute Insert statement
  insert_sql = "INSERT INTO kkbox.members_gold VALUES (2004, 450000)"
  spark.sql(insert_sql)
  print('members_gold_delta: inserted new row of data, loop: [%s]' % i)
    
  # Loop through
  i = i + 1
  time.sleep(10)

###Secnario 4 : Perform DML operations , Schema Evolution and Time Travel
#####Delta Lake supports standard DML including UPDATE, DELETE and MERGE INTO providing data engineers more controls to manage their big datasets.

#####Let's pick the member's gold data

### A. DELETE Support

In [0]:
%sql
-- Running `DELETE` on the Delta Lake table to remove records from year 2009
DELETE FROM kkbox.members_gold WHERE years = 2009

In [0]:
%sql
SELECT * FROM kkbox.members_gold
ORDER BY years

years,count
2004,450000
2004,26234
2004,450000
2004,450000
2004,450000
2004,450000
2004,450000
2005,41349
2006,53953
2007,89830


### B. UPDATE Support

In [0]:
%sql
UPDATE kkbox.members_gold SET `count` = 50000 WHERE years = 2010

In [0]:
%sql
SELECT * FROM kkbox.members_gold
ORDER BY years

years,count
2004,26234
2004,450000
2004,450000
2004,450000
2004,450000
2004,450000
2004,450000
2005,41349
2006,53953
2007,89830


### C. MERGE INTO Support

#### INSERT or UPDATE with Delta Lake: 2-step process

With Delta Lake, inserting or updating a table is a simple 2-step process: 
1. Identify rows to insert or update
2. Use the `MERGE` command

In [0]:
items = [(2009, 50000), (2021, 250000), (2012, 35000)]
cols = ['years', 'count']
merge_table = spark.createDataFrame(items, cols)
merge_table.createOrReplaceTempView("merge_table")
display(merge_table)

years,count
2009,50000
2021,250000
2012,35000


Instead of writing separate `INSERT` and `UPDATE` statements, we can use a `MERGE` statement.

In [0]:
%sql
MERGE INTO kkbox.members_gold as d
USING merge_table as m
on d.years = m.years
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *

In [0]:
%sql
SELECT * FROM kkbox.members_gold
ORDER BY years

years,count
2004,450000
2004,26234
2004,450000
2004,450000
2004,450000
2004,450000
2004,450000
2005,41349
2006,53953
2007,89830


## D. Schema Evolution
With the `mergeSchema` option, you can evolve your Delta Lake table schema

In [0]:
member_dummy = sql("SELECT years, count, CAST(rand(10) * 10 * count AS double) AS usage FROM kkbox.members_gold")
display(member_dummy)

years,count,usage
2004,26234,44846.94791712637
2005,41349,332906.75151956774
2006,53953,311628.5126337073
2007,89830,851233.3801514034
2008,67690,141722.88993218713
2009,50000,183321.11308973908
2010,50000,403934.4089185941
2011,179051,1277554.5669030966
2012,35000,251836.39482071187
2013,524722,1644231.7252104608


In [0]:
%python
# Add the mergeSchema option
member_dummy.write.option("mergeSchema","true").format("delta").mode("append").save('/mnt/adbquickstart/gold/members/')

In [0]:
%sql
select * from kkbox.members_gold

years,count,usage
2004,26234,44846.94791712637
2005,41349,332906.75151956774
2006,53953,311628.5126337073
2007,89830,851233.3801514034
2008,67690,141722.88993218713
2009,50000,183321.11308973908
2010,50000,403934.4089185941
2011,179051,1277554.5669030966
2012,35000,251836.39482071187
2013,524722,1644231.7252104608


## E. Let's Travel back in Time!
Databricks Delta’s time travel capabilities simplify building data pipelines for the following use cases. 

* Audit Data Changes
* Reproduce experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

You can query by:
1. Using a timestamp
1. Using a version number

using Python, Scala, and/or Scala syntax; for these examples we will use the SQL syntax.  

For more information, refer to [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)

### Review Delta Lake Table History
All the transactions for this table are stored within this table including the initial set of insertions, update, delete, merge, and inserts with schema modification

In [0]:
%sql
DESCRIBE HISTORY kkbox.members_gold

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
10,2021-02-10T02:41:21.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,9.0,WriteSerializable,False,"Map(numFiles -> 7, numOutputBytes -> 6907, numOutputRows -> 21)",
9,2021-02-10T02:41:18.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,MERGE,"Map(predicate -> (CAST(d.`years` AS BIGINT) = m.`years`), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3992027805255818),0208-135108-bowel826,8.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 12, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetRowsInserted -> 2, numTargetRowsUpdated -> 1, numOutputRows -> 15, numSourceRows -> 3, numTargetFilesRemoved -> 1)",
8,2021-02-10T02:40:53.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,UPDATE,Map(predicate -> (years#13142 = 2010)),,List(3992027805255818),0208-135108-bowel826,7.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 12)",
7,2021-02-10T02:40:43.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,DELETE,"Map(predicate -> [""(`years` = 2009)""])",,List(3992027805255818),0208-135108-bowel826,6.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numDeletedRows -> 1, numAddedFiles -> 1, numCopiedRows -> 13)",
6,2021-02-10T02:39:57.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,5.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 702, numOutputRows -> 1)",
5,2021-02-10T02:39:45.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,4.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 702, numOutputRows -> 1)",
4,2021-02-10T02:39:31.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,3.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 702, numOutputRows -> 1)",
3,2021-02-10T02:39:13.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,2.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 702, numOutputRows -> 1)",
2,2021-02-10T02:39:01.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 702, numOutputRows -> 1)",
1,2021-02-10T02:38:50.000+0000,1573660417364949,odl_user_301041@databrickslabs.onmicrosoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3992027805255818),0208-135108-bowel826,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 702, numOutputRows -> 1)",


###  Time Travel via Version Number
Below are SQL syntax examples of Delta Time Travel by using a Version Number

In [0]:
%sql
SELECT * FROM kkbox.members_gold VERSION AS OF 0
order by years

years,count
2004,26234
2005,41349
2006,53953
2007,89830
2008,67690
2009,63633
2010,115075
2011,179051
2012,283190
2013,524722


###  OPTIMIZE (Delta Lake on Databricks)
Optimizes the layout of Delta Lake data. Optionally optimize a subset of data or colocate data by column. If you do not specify colocation, bin-packing optimization is performed.

In [0]:
%sql OPTIMIZE kkbox.user_logs ZORDER BY (date)

path,metrics
,"List(2, 1, List(501389559, 597230681, 5.4931012E8, 2, 1098620240), List(1098831361, 1098831361, 1.098831361E9, 1, 1098831361), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 1098831361), 0, List(1, 1098831361), 1, null), 1)"


In [0]:
dbutils.fs.unmount('/mnt/adbquickstart')