In [0]:
#import libraries

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
#Check list of available files

dbutils.fs.ls("mnt/ProjectMount/curated/NYC Taxi/")

[FileInfo(path='dbfs:/mnt/ProjectMount/curated/NYC Taxi/trip_data/', name='trip_data/', size=0, modificationTime=1740207467000),
 FileInfo(path='dbfs:/mnt/ProjectMount/curated/NYC Taxi/trip_type/', name='trip_type/', size=0, modificationTime=1740207465000),
 FileInfo(path='dbfs:/mnt/ProjectMount/curated/NYC Taxi/trip_zone/', name='trip_zone/', size=0, modificationTime=1740207466000)]

####Creating Variables

In [0]:
curated = 'dbfs:/mnt/ProjectMount/curated/NYC Taxi'
refined = 'dbfs:/mnt/ProjectMount/refined/NYC Taxi'

###Data Reading

In [0]:
#Read trip_type data from curated layer

trip_type = spark.read.format("parquet").option("header", "true").option("inferSchema", "true").load(f'{curated}/trip_type')

In [0]:
#Read trip_zone data from curated layer

trip_zone = spark.read.format("parquet").option("header", "true").option("inferSchema", "true").load(f'{curated}/trip_zone')

In [0]:
#Read trip_data from curated layer

trip_data = spark.read.format("parquet").option("header", "true").option("inferSchema", "true").load(f'{curated}/trip_data')

####Creating Database

In [0]:
%sql
CREATE DATABASE refined;

####Data Writing 

In [0]:
#Write trip_type in delta format to refined layer

trip_type.write.format("delta")\
                .mode("append")\
                .option("path",f'{refined}/trip_type')\
                .saveAsTable("refined.trip_type")

In [0]:
#Write trip_zone in delta format to refined layer

trip_zone.write.format("delta")\
                .mode("append")\
                .option("path",f'{refined}/trip_zone')\
                .saveAsTable("refined.trip_zone")

In [0]:
#Write trip_data in delta format to refined layer

trip_data.write.format("delta")\
                .mode("append")\
                .option("path",f'{refined}/trip_data')\
                .saveAsTable("refined.trip_data")

In [0]:
%sql
select * from refined.trip_type

trip_type,trip_description
1,Street-hail
2,Dispatch


In [0]:
%sql
select * from refined.trip_zone

LocationID,Borough,Zone,service_zone,Zone1,Zone2
1,EWR,Newark Airport,EWR,Newark Airport,
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay,
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton,Pelham Gardens
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City,
5,Staten Island,Arden Heights,Boro Zone,Arden Heights,
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar,Fort Wadsworth
7,Queens,Astoria,Boro Zone,Astoria,
8,Queens,Astoria Park,Boro Zone,Astoria Park,
9,Queens,Auburndale,Boro Zone,Auburndale,
10,Queens,Baisley Park,Boro Zone,Baisley Park,


In [0]:
%sql
select * from refined.trip_data

VendorID,PULocationID,DOLocationID,fare_amount,total_amount
1,74,265,42.9,45.4
2,216,196,23.3,25.8
2,7,114,30.3,44.44
2,74,239,16.3,21.55
2,82,223,17.0,19.5
2,7,7,10.0,12.36
2,92,121,14.2,16.7
2,244,42,13.5,16.0
2,75,41,8.6,11.1
2,95,121,14.9,17.4


####Delta Lake

In [0]:
%sql
select * from refined.trip_zone

LocationID,Borough,Zone,service_zone,Zone1,Zone2
1,EWR,Newark Airport,EWR,Newark Airport,
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay,
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton,Pelham Gardens
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City,
5,Staten Island,Arden Heights,Boro Zone,Arden Heights,
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar,Fort Wadsworth
7,Queens,Astoria,Boro Zone,Astoria,
8,Queens,Astoria Park,Boro Zone,Astoria Park,
9,Queens,Auburndale,Boro Zone,Auburndale,
10,Queens,Baisley Park,Boro Zone,Baisley Park,


In [0]:
%sql

UPDATE refined.trip_zone
SET Borough = 'EMR'
WHERE LocationID = 1;

num_affected_rows
1


In [0]:
%sql
DELETE FROM refined.trip_zone
WHERE LocationID = 1;

num_affected_rows
1


####Versioning

In [0]:
%sql
DESCRIBE HISTORY refined.trip_zone;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-02-22T08:44:20Z,4968609912607414,rekhabendarkar@gmail.com,DELETE,"Map(predicate -> [""(LocationID#7914 = 1)""])",,List(1111388022156862),0215-061424-jox7o5yk,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1637, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 755, numDeletionVectorsUpdated -> 0, numDeletedRows -> 1, scanTimeMs -> 633, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 121)",,Databricks-Runtime/15.4.x-photon-scala2.12
1,2025-02-22T08:41:01Z,4968609912607414,rekhabendarkar@gmail.com,UPDATE,"Map(predicate -> [""(LocationID#7096 = 1)""])",,List(1111388022156862),0215-061424-jox7o5yk,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 0, numRemovedBytes -> 0, numCopiedRows -> 0, numDeletionVectorsAdded -> 1, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 4086, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2129, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 1637, rewriteTimeMs -> 1896)",,Databricks-Runtime/15.4.x-photon-scala2.12
0,2025-02-22T07:02:18Z,4968609912607414,rekhabendarkar@gmail.com,CREATE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> false, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(1111388022156862),0215-061424-jox7o5yk,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 265, numOutputBytes -> 9790)",,Databricks-Runtime/15.4.x-photon-scala2.12


In [0]:
#Before
%sql
select * from refined.trip_zone
where LocationID = 1;

LocationID,Borough,Zone,service_zone,Zone1,Zone2


####TimeTravel

In [0]:
%sql
RESTORE refined.trip_zone TO VERSION AS OF 0;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
9790,1,1,1,9790,9790


In [0]:
#After
%sql
select * from refined.trip_zone
where LocationID = 1;

LocationID,Borough,Zone,service_zone,Zone1,Zone2
1,EWR,Newark Airport,EWR,Newark Airport,
