#Databricks Delta table demo covering the following:

         1. Create delta table from a csv using tempview
         2. Data exploration to find a good column for partitioning
         3. Partition the table 
         4. Time Travel
         5. Optimize and Analyze
         6. Vacuum for data retention, especially its effect on time travel to older versions

###Note : We are going to use the Bike Sharing dataset provided by Databricks Datasets

####Step 1. Creating delta table from csv using tempview

In [0]:
df = spark.read.format('csv').load(
  '/databricks-datasets/bikeSharing/data-001/hour.csv',
  header=True,
  inferSchema=True
)

Create a temp view so the data can be queried with SQL commands

In [0]:
%fs ls "/"

In [0]:
df.createOrReplaceTempView("temp")

In [0]:
%sh
rm -rf /dbfs/tmp/bikeSharing
rm -rf /dbfs/tmp/bikeSharingPartition
rm -rf /dbfs/tmp/bikeSharingDay
rm -rf /dbfs/tmp/bikeSharingDayOptPar
rm -rf /dbfs/tmp/bikeSharingDayOptDel

In [0]:
%sql
DROP TABLE IF EXISTS bikeSharing;
CREATE TABLE bikeSharing 
USING DELTA
LOCATION '/tmp/bikeSharing/'
AS SELECT * from  temp

Let's take a look at the data in the table

In [0]:
%sql
SELECT * FROM bikeSharing LIMIT 10

####STEP 2 : Data exploration to find the right partition column
Let's find a column to partition on. The goal is a column whose values split the data evenly. Candidates include date, month, year and season. Let's look at the data distribution for date and month.

In [0]:
%sql
SELECT dteday,count(1) FROM bikeSharing GROUP BY dteday order by 2

In [0]:
%sql
SELECT mnth,count(1) FROM bikeSharing GROUP BY mnth order by 2

####STEP 3 : Partition the table
Month (mnth) looks like a good candidate. Unlike dteday, whose number of distinct values keeps growing as new days arrive, mnth is fixed at 12 values, and the data is evenly distributed across them.
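
The reasoning above can be checked outside Spark with a minimal sketch. It assumes a synthetic sample (24 hourly rows for each day of one non-leap year) standing in for hour.csv, and `partition_profile` is a hypothetical helper name, not part of any Databricks API.

```python
from collections import Counter
from datetime import date, timedelta


def partition_profile(rows, column):
    """Summarize how evenly `column` would split `rows` into partitions."""
    counts = Counter(row[column] for row in rows)
    sizes = list(counts.values())
    return {
        "partitions": len(counts),        # one partition directory per distinct value
        "smallest": min(sizes),
        "largest": max(sizes),
        "skew": max(sizes) / min(sizes),  # 1.0 means perfectly even
    }


# Synthetic stand-in for the real table (assumption): 24 rows per day for 2011.
start = date(2011, 1, 1)
rows = []
for offset in range(365):
    day = start + timedelta(days=offset)
    for _ in range(24):
        rows.append({"dteday": day.isoformat(), "mnth": day.month})

by_day = partition_profile(rows, "dteday")
by_month = partition_profile(rows, "mnth")
print("dteday:", by_day)
print("mnth:  ", by_month)
```

On this sample dteday yields 365 tiny partitions per year of data, while mnth stays at 12 partitions of similar size, which is the trade-off that motivates the choice.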

In [0]:
%sql
DROP TABLE IF EXISTS bikeSharingPartition;
CREATE TABLE bikeSharingPartition
USING DELTA
PARTITIONED BY (mnth)
LOCATION '/tmp/bikeSharingPartition/'
AS SELECT * FROM  bikeSharing

In [0]:
%sql
select * from bikeSharingPartition

Let's compare the partitioned table with the original non-partitioned one. You can see the number of folders/blobs created for the partitioned table.

In [0]:
%fs ls "/tmp/bikeSharing/"

In [0]:
%fs ls "/tmp/bikeSharingPartition/"
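
To make the difference in the two listings concrete, here is a small sketch of how rows map to partition directories. It assumes the common Hive-style `column=value` directory naming (which is what the partitioned listing typically shows when column mapping is not enabled); the helper names and the sample rows are hypothetical.

```python
from collections import defaultdict


def partition_dir(table_root, column, value):
    """Build a Hive-style partition directory path such as .../mnth=1/."""
    return f"{table_root.rstrip('/')}/{column}={value}/"


def group_by_partition(rows, column, table_root):
    """Group rows by the partition directory they would be written to."""
    layout = defaultdict(list)
    for row in rows:
        layout[partition_dir(table_root, column, row[column])].append(row)
    return dict(layout)


# Hypothetical rows; only the partition column matters for the layout.
sample_rows = [
    {"mnth": 1, "cnt": 16},
    {"mnth": 1, "cnt": 40},
    {"mnth": 2, "cnt": 32},
]
layout = group_by_partition(sample_rows, "mnth", "/tmp/bikeSharingPartition/")
for path in sorted(layout):
    print(path, len(layout[path]))
```

The non-partitioned location holds data files directly next to `_delta_log`, whereas the partitioned one is expected to hold one directory per distinct mnth value.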

####STEP 4 : Time travel in delta table
#####Let's use the bike sharing dataset day.csv

In [0]:
daydf = spark.read.format('csv').load(
  '/databricks-datasets/bikeSharing/data-001/day.csv',
  header=True,
  inferSchema=True
)
daydf.createOrReplaceTempView("temp")

In [0]:
%sql
DROP TABLE IF EXISTS bikeSharingDay;
CREATE TABLE bikeSharingDay 
USING DELTA
LOCATION '/tmp/bikeSharingDay/'
AS SELECT * from  temp

In [0]:
%sql
select count(1) from bikeSharingDay;
DELETE FROM bikeSharingDay where mnth=12;
select count(1) from bikeSharingDay;

In [0]:
%sql
DESCRIBE HISTORY bikeSharingDay

In [0]:
%sql
SELECT count(1) from bikeSharingDay VERSION AS OF 1

In [0]:
%sql
SELECT count(1) from bikeSharingDay VERSION AS OF 0
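
Why version 0 still returns the full row count can be illustrated with a toy model of a transaction log. This is a deliberately simplified sketch, not Delta's actual implementation: the real log stores JSON commits and checkpoints under `_delta_log` with more action types. The file names and the `files_at_version` helper are hypothetical.

```python
def files_at_version(log, version):
    """Replay commits 0..version and return the set of live data files."""
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for commit in log[: version + 1]:
        live -= set(commit.get("remove", []))
        live |= set(commit.get("add", []))
    return live


# Hypothetical log mirroring the cells above:
# version 0 is the CREATE TABLE ... AS SELECT, version 1 is the DELETE.
log = [
    {"operation": "CREATE TABLE AS SELECT", "add": ["part-0000-a.parquet"]},
    {
        "operation": "DELETE",
        "remove": ["part-0000-a.parquet"],  # old file is unreferenced, not deleted
        "add": ["part-0000-b.parquet"],     # rewritten without the mnth=12 rows
    },
]

print(files_at_version(log, 0))
print(files_at_version(log, 1))
```

In this model the DELETE only marks the old file as removed in the log. The file stays on disk, which is what lets a query against an older version read it.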

####STEP 5 : Optimize and Analyze
#####Let's use the bike sharing dataset day.csv

In [0]:
daydf = spark.read.format('csv').load(
  '/databricks-datasets/bikeSharing/data-001/day.csv',
  header=True,
  inferSchema=True
)
daydf.createOrReplaceTempView("tempopt")

#####Let's create a parquet table and note the runtime of a query

In [0]:
%sql
DROP TABLE IF EXISTS bikeSharingDayOptPar;
CREATE TABLE bikeSharingDayOptPar 
USING PARQUET
LOCATION '/tmp/bikeSharingDayOptPar/'
AS SELECT * from  tempopt

In [0]:
%sql
SELECT weekday,mnth, count(*) FROM bikeSharingDayOptPar group by weekday,mnth order by weekday,mnth

#####Let's create a delta table and run the same query before and after OPTIMIZE. Observe the runtime.

In [0]:
%sql
DROP TABLE IF EXISTS bikeSharingDayOptDel;
CREATE TABLE bikeSharingDayOptDel
USING DELTA
LOCATION '/tmp/bikeSharingDayOptDel/'
AS SELECT * from  tempopt

In [0]:
%sql
SELECT weekday,mnth, count(*) FROM bikeSharingDayOptDel group by weekday,mnth order by weekday,mnth

In [0]:
%sql
OPTIMIZE bikeSharingDayOptDel

In [0]:
%sql
SELECT weekday,mnth, count(*) FROM bikeSharingDayOptDel group by weekday,mnth order by weekday,mnth
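
OPTIMIZE compacts many small files into fewer larger ones. The idea can be sketched as a greedy grouping by size. This is an illustration only: Delta's real planning and its target file size differ, and `plan_compaction`, the file names and the sizes are all hypothetical.

```python
def plan_compaction(file_sizes_mb, target_mb):
    """Greedily group files (smallest first) into bins of at most target_mb.

    Returns only the bins that merge two or more files, since rewriting a
    single file on its own gains nothing.
    """
    bins, current, current_size = [], [], 0
    for name, size in sorted(file_sizes_mb.items(), key=lambda item: item[1]):
        if current and current_size + size > target_mb:
            bins.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        bins.append(current)
    return [group for group in bins if len(group) > 1]


# Hypothetical table with three small files and one large file.
plan = plan_compaction({"f1": 10, "f2": 20, "f3": 30, "f4": 900}, target_mb=128)
print(plan)

# A table that already consists of a single file has nothing to compact.
print(plan_compaction({"only": 5}, target_mb=128))
```

A table as small as the one built from day.csv likely consists of very few files, so the runtime difference before and after OPTIMIZE may be negligible here. The benefit shows up on tables with many small files.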

#####We can compute statistics that help the optimizer choose a better plan based on data distribution and skew (and histograms, if histogram collection is enabled)

In [0]:
%sql
ANALYZE TABLE bikeSharingDayOptDel COMPUTE STATISTICS FOR ALL COLUMNS
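
As a rough picture of what per-column statistics look like, the sketch below computes a null count, distinct count, minimum and maximum for one column. It is a plain-Python illustration with hypothetical sample rows and a hypothetical `column_stats` helper. The statistics Spark actually stores, and how the optimizer uses them, are more extensive.

```python
def column_stats(rows, column):
    """Compute simple per-column statistics over a list of dict rows."""
    values = [row[column] for row in rows]
    non_null = [value for value in values if value is not None]
    return {
        "num_nulls": len(values) - len(non_null),
        "distinct_count": len(set(non_null)),
        "min": min(non_null) if non_null else None,
        "max": max(non_null) if non_null else None,
    }


# Hypothetical rows shaped like the day-level table.
sample = [
    {"weekday": 6, "cnt": 985},
    {"weekday": 0, "cnt": 801},
    {"weekday": 1, "cnt": None},
    {"weekday": 6, "cnt": 1349},
]
cnt_stats = column_stats(sample, "cnt")
weekday_stats = column_stats(sample, "weekday")
print(cnt_stats)
print(weekday_stats)
```

With numbers like these an optimizer can estimate, for example, how many rows a filter on weekday is likely to keep before reading any data.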

####STEP 6 : Vacuum for Databricks delta table

######Let's check the number of parquet files for our bikeSharingDayOptDel. There should be only one parquet file.

In [0]:
%fs ls "/tmp/bikeSharingDayOptDel"

######Let's update the cnt column to 0 to ensure we get a new parquet file

In [0]:
%sql
UPDATE bikeSharingDayOptDel SET cnt=0

In [0]:
%fs ls "/tmp/bikeSharingDayOptDel"

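VACUUM has a default retention period of 168 hours (7 days) and rejects a shorter one, so the first attempt below is expected to fail the retention check. To retain less, disable the check with the config shown and run VACUUM again.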

In [0]:
%sql
VACUUM bikeSharingDayOptDel RETAIN 0 HOURS

In [0]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

In [0]:
%sql
VACUUM bikeSharingDayOptDel RETAIN 0 HOURS

In [0]:
%fs ls "/tmp/bikeSharingDayOptDel"
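
The retention rule can be sketched as follows: a file is deleted only when the current table version no longer references it and it was removed longer ago than the retention window. This is a simplified model under those stated assumptions, and the `vacuum` function, file names and timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def vacuum(files_on_disk, live_files, removed_at, now, retain_hours=168):
    """Return the set of files a vacuum would delete under this toy model."""
    cutoff = now - timedelta(hours=retain_hours)
    return {
        name
        for name in files_on_disk
        if name not in live_files and removed_at[name] <= cutoff
    }


# Hypothetical state after the UPDATE: the old file was replaced one hour ago.
now = datetime(2024, 1, 1, 12, 0)
on_disk = {"old.parquet", "new.parquet"}
live = {"new.parquet"}
removed_at = {"old.parquet": now - timedelta(hours=1)}

default_run = vacuum(on_disk, live, removed_at, now)              # 168-hour window
zero_run = vacuum(on_disk, live, removed_at, now, retain_hours=0)
print(default_run)
print(zero_run)
```

With the default window nothing is deleted, because the old file is still within seven days. With a zero-hour window the old file goes away, and with it the ability to time travel to versions that depended on it.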