In [0]:
#Set the values to add Data Lake Gen2 Access Key to Spark config

spark.conf.set(
    "fs.azure.account.key.***DataLakeName***.dfs.core.windows.net",
  
    "***Access Key***")

In [0]:
# Define input & output paths

greenTaxisInputPath = "abfss://***ContainerName***@***DataLakeName***.dfs.core.windows.net/Raw/"

greenTaxisOutputPath = "abfss://***ContainerName***@***DataLakeName***.dfs.core.windows.net/Output/OReilly/"

In [0]:
# Create schema for Green Taxi Data

from pyspark.sql.functions import *
from pyspark.sql.types import *
  
greenTaxiSchema = (
            StructType()               
               .add("VendorId", "integer")
               .add("lpep_pickup_datetime", "timestamp")
               .add("lpep_dropoff_datetime", "timestamp")
               .add("store_and_fwd_flag", "string")
               .add("RatecodeID", "integer")
               .add("PULocationID", "integer")
               .add("DOLocationID", "integer")
  
              .add("passenger_count", "integer")
              .add("trip_distance", "double")
              .add("fare_amount", "double")
              .add("extra", "double")
              .add("mta_tax", "double")
              .add("tip_amount", "double")
  
              .add("tolls_amount", "double")
              .add("ehail_fee", "double")
              .add("improvement_surcharge", "double")
              .add("total_amount", "double")
              .add("payment_type", "integer")
              .add("trip_type", "integer")
         )

In [0]:
# Read csv file
greenTaxiDF = (
                  spark
                    .read                     
                    .option("header", "true")
                    .schema(greenTaxiSchema)
                    .csv(greenTaxisInputPath + "GreenTaxis_201911.csv")
              )

display(greenTaxiDF)

### Save DataFrame in Parquet and Delta formats

In [0]:
# Write in parquet format
(
    greenTaxiDF
        .write
        .mode("overwrite")        
  
        .partitionBy("VendorId")
  
        .format("parquet")
  
        .save(greenTaxisOutputPath + "GreenTaxis.parquet")
)

In [0]:
# Write in delta format
(
    greenTaxiDF
        .write
        .mode("overwrite")        
  
        .partitionBy("VendorId")
  
        .format("delta")
  
        .save(greenTaxisOutputPath + "GreenTaxis.delta")
)

#####Check output folder to see differences between parquet & delta outputs
  - Do you see _delta_log folder in delta directory?

### Create Parquet Table

In [0]:
%sql
-- Drop tables if they exist
DROP TABLE IF EXISTS TaxiDB.GreenTaxisParquet;
DROP TABLE IF EXISTS TaxiDB.GreenTaxis;

In [0]:
%sql

DROP DATABASE IF EXISTS TaxiDB;

CREATE DATABASE TaxiDB;

In [0]:
filepath = greenTaxisOutputPath + "GreenTaxis.parquet"

spark.sql(f"""

    CREATE TABLE TaxiDB.GreenTaxisParquet
      USING PARQUET
      OPTIONS (path = '{filepath}')
      
""")

In [0]:
filepath = greenTaxisOutputPath + "GreenTaxis.delta"

spark.sql(f"""

    CREATE TABLE TaxiDB.GreenTaxis
      USING DELTA
      OPTIONS (path = '{filepath}')
      
""")

#####From left pane, navigate to Data tab and verify the tables

In [0]:
%sql
SELECT COUNT(*) FROM TaxiDB.GreenTaxis

### Check Audit History of Delta Table

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

### Overwrite Parquet folder

In [0]:
# Overwrite data in parquet format
(
    greenTaxiDF
        .write
        .mode("overwrite")        
  
        .partitionBy("VendorId")
  
        .format("parquet")
  
        .save(greenTaxisOutputPath + "GreenTaxis.parquet")
)

### Overwrite Delta folder

In [0]:
# Overwrite data in delta format
(
    greenTaxiDF
        .write
        .mode("overwrite")        
  
        .partitionBy("VendorId")
  
        .format("delta")
  
        .save(greenTaxisOutputPath + "GreenTaxis.delta")
)

In [0]:
%sql
SELECT COUNT(*) FROM TaxiDB.GreenTaxis

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

#####Notice number of records have not changed (they are overwritten), but log maintains the overwrite operation

### Schema Enforcement

In [0]:
%sql

-- Schema enforcement
INSERT INTO TaxiDB.GreenTaxis
(VendorId, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type)

-- VendorId should be integer, adding it as string
VALUES ('JUNK', '2019-12-01T00:00:00.000Z', '2019-12-01T00:15:34.000Z', 'N', 1, 145, 148, 1, 2.9, 100.0, 15.3, 13.0, 0.5, 0.5, 1.0, 0.0, 140.0, 1, 1)

#####Previous command will fail since it does not match the schema

### Insert Data to Delta Table: Insert Command

In [0]:
%sql

INSERT INTO TaxiDB.GreenTaxis
(VendorId, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type)

VALUES (4, '2019-12-01T00:00:00.000Z', '2019-12-01T00:15:34.000Z', 'N', 1, 145, 148, 1, 2.9, 100.0, 15.3, 13.0, 0.5, 0.5, 1.0, 0.0, 140.0, 1, 1)

In [0]:
%sql

SELECT *
FROM TaxiDB.GreenTaxis
WHERE VendorId = 4

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

### Insert Data to Delta Table: Append DataFrame

In [0]:
# Extract new records from Data Lake
# Read csv file to append - this file only has one record for VendorId 3

greenTaxiAppendDF = (
                        spark
                          .read                     
                          .option("header", "true")
                          .schema(greenTaxiSchema)
                          .csv(greenTaxisInputPath + "GreenTaxis_201911_append.csv")
                    )

display(greenTaxiAppendDF)

In [0]:
%sql

SELECT COUNT(*) FROM TaxiDB.GreenTaxis

In [0]:
# Append to Delta table
(
    greenTaxiAppendDF
        .write
        .mode("append")        
  
        .partitionBy("VendorId")
  
        .format("delta")
  
        .save(greenTaxisOutputPath + "GreenTaxis.delta")
)

In [0]:
%sql

SELECT COUNT(*) FROM TaxiDB.GreenTaxis

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

### Update Data in Delta Table

In [0]:
%sql

SELECT RateCodeID
FROM TaxiDB.GreenTaxis
WHERE VendorId = 4

In [0]:
%sql

UPDATE TaxiDB.GreenTaxis

SET RateCodeID = 2

WHERE VendorId = 4

In [0]:
%sql

SELECT RateCodeID
FROM TaxiDB.GreenTaxis
WHERE VendorId = 4

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

#####Check how update operation has removed and added a new file in delta transaction log

### Merge Data to Delta Table

In [0]:
%sql

SELECT store_and_fwd_flag
FROM TaxiDB.GreenTaxis
WHERE VendorId = 3

In [0]:
# Extract new records from Data Lake
# Read csv file to append - this file only has one record for VendorId 3

greenTaxiChangesDF = (
                        spark
                          .read                     
                          .option("header", "true")
                          .schema(greenTaxiSchema)
                          .csv(greenTaxisInputPath + "GreenTaxis_201911_changes.csv")
                    )

display(greenTaxiChangesDF)

In [0]:
# Create a temporary view on top of DataFrame

greenTaxiChangesDF.createOrReplaceTempView("GreenTaxiChanges")

In [0]:
%sql
SELECT * FROM GreenTaxiChanges

In [0]:
%sql

MERGE INTO TaxiDB.GreenTaxis AS target

  USING GreenTaxiChanges     AS source
  
ON target.VendorID = source.VendorId
  AND target.lpep_pickup_datetime = source.lpep_pickup_datetime
  AND target.PULocationID = source.PULocationID
  AND target.DOLocationID = source.DOLocationID
  
WHEN MATCHED THEN
  UPDATE SET
    target.store_and_fwd_flag = source.store_and_fwd_flag
    
WHEN NOT MATCHED
  THEN INSERT *

In [0]:
%sql

SELECT store_and_fwd_flag
FROM TaxiDB.GreenTaxis
WHERE VendorId = 3

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

### Time Travel in Delta Lake

In [0]:
%sql

SELECT store_and_fwd_flag
FROM TaxiDB.GreenTaxis
WHERE VendorId = 3

In [0]:
%sql

DESCRIBE HISTORY TaxiDB.GreenTaxis

In [0]:
%sql

SELECT store_and_fwd_flag
FROM TaxiDB.GreenTaxis    VERSION AS OF 4
WHERE VendorId = 3

In [0]:
%sql

SELECT store_and_fwd_flag
FROM TaxiDB.GreenTaxis    TIMESTAMP AS OF '<add timestamp>'
WHERE VendorId = 3

### Table Constraints

In [0]:
%sql

--Table constraints

ALTER TABLE TaxiDB.GreenTaxis

ADD CONSTRAINT PassengerCountCheck CHECK (passenger_count IS NULL OR passenger_count <= 6)

#####Previous statement will fail since table already has records that does not satisfy constraint conditions

In [0]:
%sql

--Table constraints

ALTER TABLE TaxiDB.GreenTaxis

ADD CONSTRAINT PassengerCountCheck CHECK (passenger_count IS NULL OR passenger_count <= 9)

In [0]:
%sql

INSERT INTO TaxiDB.GreenTaxis
(VendorId, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type)

VALUES (1, '2019-12-01T00:00:00.000Z', '2019-12-01T00:15:34.000Z', 'N', 1, 145, 148, 

10,  -- passenger_count

2.9, 100.0, 15.3, 13.0, 0.5, 0.5, 1.0, 0.0, 140.0, 1, 1)