In [0]:
spark.conf.set("fs.azure.account.auth.type.nycprojectstorageaccount.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.nycprojectstorageaccount.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.nycprojectstorageaccount.dfs.core.windows.net", "cf34b65d-6559-4842-831d-d9cc06a4bcb5")
spark.conf.set("fs.azure.account.oauth2.client.secret.nycprojectstorageaccount.dfs.core.windows.net", "1U58Q~DeK7xpi3jgkMAga00JjNZLZglaiSIeBdi9")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.nycprojectstorageaccount.dfs.core.windows.net", "https://login.microsoftonline.com/92ed5742-9b9e-4f11-a129-0437edfdab2d/oauth2/token")

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Database creation

In [0]:
%sql
-- CREATE DATABASE IF NOT EXISTS gold_layer LOCATION 'abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/';

CREATE DATABASE IF NOT EXISTS gold_layer LOCATION 'abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/gold_layer';

## Trip zone

In [0]:
silver = 'abfss://silver@nycprojectstorageaccount.dfs.core.windows.net'
gold = 'abfss://gold@nycprojectstorageaccount.dfs.core.windows.net'

In [0]:
df_zone = spark.read.format("parquet")\
   .option("header", "true")\
    .option("inferSchema", "true")\
    .load(f"{silver}/trip_zone_2024")

In [0]:
df_zone.display()

LocationID,Borough,Zone,service_zone,Zone1,Zone2
1,EWR,Newark Airport,EWR,Newark Airport,
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay,
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton,Pelham Gardens
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City,
5,Staten Island,Arden Heights,Boro Zone,Arden Heights,
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar,Fort Wadsworth
7,Queens,Astoria,Boro Zone,Astoria,
8,Queens,Astoria Park,Boro Zone,Astoria Park,
9,Queens,Auburndale,Boro Zone,Auburndale,
10,Queens,Baisley Park,Boro Zone,Baisley Park,


### Writing a parquet file as a delta table

In [0]:
dbutils.fs.ls("abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/")

[FileInfo(path='abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_data_delta/', name='trip_data_delta/', size=0, modificationTime=1738338015000),
 FileInfo(path='abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_type_delta/', name='trip_type_delta/', size=0, modificationTime=1738337755000),
 FileInfo(path='abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_zone_delta/', name='trip_zone_delta/', size=0, modificationTime=1738336701000)]

#### 1.) First - Write the parquet file as a delta file

In [0]:
df_zone.write.format("delta")\
      .mode("overwrite")\
      .option('path', "abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_zone_delta")\
      .save()

#### 2.) Second - Create table from the delta file using CTAS

In [0]:
%sql
CREATE TABLE IF NOT EXISTS gold_layer.trip_zone_delta as SELECT * FROM delta.`abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_zone_delta`;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from gold_layer.trip_zone_delta

LocationID,Borough,Zone,service_zone,Zone1,Zone2
1,EWR,Newark Airport,EWR,Newark Airport,
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay,
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton,Pelham Gardens
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City,
5,Staten Island,Arden Heights,Boro Zone,Arden Heights,
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar,Fort Wadsworth
7,Queens,Astoria,Boro Zone,Astoria,
8,Queens,Astoria Park,Boro Zone,Astoria Park,
9,Queens,Auburndale,Boro Zone,Auburndale,
10,Queens,Baisley Park,Boro Zone,Baisley Park,


## Trip Type

In [0]:
df_trip_type = spark.read.format("parquet")\
   .option("header", "true")\
    .option("inferSchema", "true")\
    .load(f"{silver}/trip_type_2024")

In [0]:
df_trip_type.write.format("delta")\
      .mode("overwrite")\
      .option('path', "abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_type_delta")\
      .save()

In [0]:
%sql
CREATE TABLE if NOT EXISTS gold_layer.trip_type_delta as SELECT * FROM delta.`abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_type_delta`;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM gold_layer.trip_type_delta

trip_type,trip_description
1,Street-hail
2,Dispatch


## Trip Data

In [0]:
df_trip_data = spark.read.format("parquet")\
   .option("header", "true")\
    .option("inferSchema", "true")\
    .load(f"{silver}/trip_data_2024")

In [0]:
df_trip_data.write.format("delta")\
      .mode("overwrite")\
      .option('path', "abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_data_delta")\
      .save()

In [0]:
%sql
CREATE TABLE IF NOT EXISTS gold_layer.trip_data_delta as SELECT * FROM delta.`abfss://gold@nycprojectstorageaccount.dfs.core.windows.net/trip_data_delta`;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM gold_layer.trip_data_delta

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,trip_date,trip_year,trip_month
2,2024-11-01T00:09:19,2024-11-01T00:49:44,N,5,97,50,1,6.68,65.0,0.0,0.0,0.0,0.0,,1.0,68.75,1,2.0,2.75,2024-11-01,2024,11
2,2024-11-01T00:28:47,2024-11-01T00:34:16,N,1,166,41,1,1.44,8.6,1.0,0.5,3.0,0.0,,1.0,14.1,1,1.0,0.0,2024-11-01,2024,11
2,2024-11-01T00:24:54,2024-11-01T00:26:55,N,1,129,82,1,0.21,4.4,1.0,0.5,0.0,0.0,,1.0,6.9,2,1.0,0.0,2024-11-01,2024,11
1,2024-11-01T00:03:21,2024-11-01T00:43:38,N,1,66,164,2,6.6,38.7,3.75,1.5,10.0,0.0,,1.0,53.95,1,1.0,2.75,2024-11-01,2024,11
2,2024-11-01T00:05:54,2024-11-01T00:12:42,N,1,83,83,1,0.97,8.6,1.0,0.5,2.22,0.0,,1.0,13.32,1,1.0,0.0,2024-11-01,2024,11
2,2024-10-31T22:58:17,2024-10-31T23:04:02,N,1,236,151,1,0.99,7.9,1.0,0.5,0.0,0.0,,1.0,10.4,1,1.0,0.0,2024-10-31,2024,10
2,2024-11-01T00:44:10,2024-11-01T01:39:10,N,1,74,68,1,7.78,51.3,1.0,0.5,0.0,0.0,,1.0,56.55,2,1.0,2.75,2024-11-01,2024,11
2,2024-11-01T00:55:16,2024-11-01T01:12:07,N,1,255,79,2,3.3,18.4,1.0,0.5,0.0,0.0,,1.0,23.65,1,1.0,2.75,2024-11-01,2024,11
2,2024-11-01T00:30:47,2024-11-01T00:40:31,N,1,152,42,2,1.61,12.1,1.0,0.5,0.0,0.0,,1.0,14.6,2,1.0,0.0,2024-11-01,2024,11
2,2024-11-01T00:17:47,2024-11-01T00:32:47,N,5,82,195,1,2.55,35.0,0.0,0.0,5.0,0.0,,1.0,41.0,1,2.0,0.0,2024-11-01,2024,11


In [0]:
%sql
DESCRIBE HISTORY gold_layer.trip_data_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-01-31T15:41:04Z,4520695247759210,scpowar@gmail.com,CREATE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(2011661020344616),0129-013309-kmyggzs9,,WriteSerializable,True,"Map(numFiles -> 4, numOutputRows -> 606224, numOutputBytes -> 13959409)",,Databricks-Runtime/15.4.x-photon-scala2.12


## Delta table versioning

In [0]:
%sql 
select * from gold_layer.trip_type_delta

trip_type,trip_description
1,Street-hail
2,Dispatch


In [0]:
%sql
update gold_layer.trip_type_delta set trip_description = 'Round Trip' where trip_type = 2

num_affected_rows
1


In [0]:
%sql
select * from gold_layer.trip_type_delta

trip_type,trip_description
2,Round Trip
1,Street-hail


In [0]:
%sql
describe history gold_layer.trip_type_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-01-31T19:18:04Z,4520695247759210,scpowar@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(2011661020344616),0129-013309-kmyggzs9,1.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 1769, p25FileSize -> 898, numDeletionVectorsRemoved -> 1, minFileSize -> 898, numAddedFiles -> 1, maxFileSize -> 898, p75FileSize -> 898, p50FileSize -> 898, numAddedBytes -> 898)",,Databricks-Runtime/15.4.x-photon-scala2.12
1,2025-01-31T19:17:59Z,4520695247759210,scpowar@gmail.com,UPDATE,"Map(predicate -> [""(trip_type#3253 = 2)""])",,List(2011661020344616),0129-013309-kmyggzs9,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 0, numRemovedBytes -> 0, numCopiedRows -> 0, numDeletionVectorsAdded -> 1, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 5077, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2774, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 877, rewriteTimeMs -> 2255)",,Databricks-Runtime/15.4.x-photon-scala2.12
0,2025-01-31T15:36:26Z,4520695247759210,scpowar@gmail.com,CREATE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(2011661020344616),0129-013309-kmyggzs9,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 892)",,Databricks-Runtime/15.4.x-photon-scala2.12


In [0]:
%sql
select * from gold_layer.trip_type_delta version as of 0

trip_type,trip_description
1,Street-hail
2,Dispatch


In [0]:
%sql
restore table gold_layer.trip_type_delta version as of 0

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
892,1,1,1,898,892


In [0]:
%sql
select * from gold_layer.trip_type_delta

trip_type,trip_description
1,Street-hail
2,Dispatch
