In [0]:
%fs ls

path,name,size
dbfs:/FileStore/,FileStore/,0
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-results/,databricks-results/,0
dbfs:/user/,user/,0


In [0]:
%fs ls dbfs:/databricks-datasets/

path,name,size
dbfs:/databricks-datasets/COVID/,COVID/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0


In [0]:
%fs ls dbfs:/FileStore/tables/

path,name,size
dbfs:/FileStore/tables/UsedCars.csv,UsedCars.csv,69603
dbfs:/FileStore/tables/hotel_bookings.xls,hotel_bookings.xls,16855599


# Getting started with <img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

An open-source storage layer for data lakes that brings ACID transactions to Apache Spark™ and big data workloads.

* **ACID Transactions**: Ensures data integrity and read consistency with complex, concurrent data pipelines.
* **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box. 
* **Schema Enforcement and Evolution**: Ensures data cleanliness by blocking writes with unexpected.
* **Time Travel**: Query previous versions of the table by time or version number.
* **Deletes and upserts**: Supports deleting and upserting into tables with programmatic APIs.
* **Open Format**: Stored as Parquet format in blob storage.
* **Audit History**: History of all the operations that happened in the table.
* **Scalable Metadata management**: Able to handle millions of files are scaling the metadata operations with Spark.

<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=1012/>

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Time Travel

Delta Lake’s time travel capabilities simplify building data pipelines for use cases including:

* Auditing Data Changes
* Reproducing experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

<img src="https://github.com/risan4841/img/blob/master/transactionallogs.png?raw=true" width=250/>

You can query snapshots of your tables by:
1. **Version number**, or
2. **Timestamp.**

using Python, Scala, and/or SQL syntax; for these examples we will use the SQL syntax.  

For more information, refer to the [docs](https://docs.delta.io/latest/delta-utility.html#history), or [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)

In [0]:
spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

import random
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# File location and type
file_location = "dbfs:/FileStore/tables/UsedCars.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
new_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(new_df)

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Convert to Delta Lake format

Delta Lake is 100% compatible with Apache Spark&trade;, which makes it easy to get started with if you already use Spark for your big data workflows.
Delta Lake features APIs for **SQL**, **Python**, and **Scala**, so that you can use it in whatever language you feel most comfortable in.

<img src="https://databricks.com/wp-content/uploads/2020/12/simplysaydelta.png" width=600/>

**Pythonic way**: Read the data into a Spark DataFrame, then write it out in Delta Lake format directly, with no upfront schema definition needed.

In [0]:
new_df.write.format("delta").mode("overwrite").saveAsTable("Used_Car")

In [0]:
%sql
Describe History used_car

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,2021-07-02T18:22:17.000+0000,3894196996570842,yoga@datadna.in,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {""delta.autoOptimize.optimizeWrite"":""true""})",,List(2414558159189506),0701-052402-cants185,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 17969, numOutputRows -> 1446)",


In [0]:
%sql
select count(*) from used_car

count(1)
1446


In [0]:
%sql
Select distinct fueltype from used_car

fueltype
Diesel
diesel
petrol
CNG
methane
Petrol
CompressedNaturalGas


In [0]:
%sql
Select distinct doors from used_car

doors
3
5
4
2


In [0]:
%sql
select 
  count(*) from used_car
Where
  fueltype = 'Diesel' And Doors = 5

count(1)
73


In [0]:
%sql
Delete From
  used_car
Where
  fueltype = 'Diesel' And Doors = 5

num_affected_rows
73


In [0]:
%sql
Describe History used_car

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
1,2021-07-02T18:24:43.000+0000,3894196996570842,yoga@datadna.in,DELETE,"Map(predicate -> [""((spark_catalog.default.used_car.`fueltype` = 'Diesel') AND (spark_catalog.default.used_car.`Doors` = 5))""])",,List(2414558159189506),0701-052402-cants185,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1373, numAddedChangeFiles -> 0, executionTimeMs -> 1152, numDeletedRows -> 73, scanTimeMs -> 654, numAddedFiles -> 1, rewriteTimeMs -> 496)",
0,2021-07-02T18:22:17.000+0000,3894196996570842,yoga@datadna.in,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {""delta.autoOptimize.optimizeWrite"":""true""})",,List(2414558159189506),0701-052402-cants185,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 17969, numOutputRows -> 1446)",


In [0]:
%sql
Select count(*) 
From
  used_car

count(1)
1373


In [0]:
%sql
Insert into 
  used_car
Select * 
From 
  used_car VERSION AS OF 0
Where 
  fueltype = 'Diesel' And Doors = 5

num_affected_rows,num_inserted_rows
73,73


In [0]:
%sql
Describe History Used_Car

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
2,2021-07-02T18:25:56.000+0000,3894196996570842,yoga@datadna.in,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2414558159189506),0701-052402-cants185,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 3959, numOutputRows -> 73)",
1,2021-07-02T18:24:43.000+0000,3894196996570842,yoga@datadna.in,DELETE,"Map(predicate -> [""((spark_catalog.default.used_car.`fueltype` = 'Diesel') AND (spark_catalog.default.used_car.`Doors` = 5))""])",,List(2414558159189506),0701-052402-cants185,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1373, numAddedChangeFiles -> 0, executionTimeMs -> 1152, numDeletedRows -> 73, scanTimeMs -> 654, numAddedFiles -> 1, rewriteTimeMs -> 496)",
0,2021-07-02T18:22:17.000+0000,3894196996570842,yoga@datadna.in,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {""delta.autoOptimize.optimizeWrite"":""true""})",,List(2414558159189506),0701-052402-cants185,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 17969, numOutputRows -> 1446)",


#### Rollback a table to a specific version using `RESTORE`

In [0]:
%sql 
RESTORE Used_Car  VERSION AS OF 0

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
17969,1,2,1,21115,17969


In [0]:
%sql
Select 
  count(*) 
From used_car

count(1)
1446


In [0]:
spark.sql("""select count(*) from used_car""").show()