Create spark session with delta

In [2]:
from pyspark.sql import SparkSession
# Build (or reuse, via getOrCreate) a local Spark session that pulls in the
# Delta Lake package and registers Delta's SQL extension and catalog.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

Download January and February 2021 Yellow Taxi Data (uncomment the next two cells to download)

In [96]:
#!wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-01.parquet

In [97]:
#!wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-02.parquet

Read in the first dataset and print the schema

In [43]:
# Read the January 2021 parquet file (relative path) into a DataFrame.
df = spark.read.parquet('yellow_tripdata_2021-01.parquet')

In [44]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



Create a delta database and table using the schema printed above as the schema for the table

In [46]:
# Directory that backs the Delta table (relative path).
delta_dir = './output/delta'

# Create the database first so the qualified table name below resolves.
spark.sql('CREATE DATABASE IF NOT EXISTS RIDES')

# DDL for the Delta table; the column list mirrors the schema printed from the
# January parquet file, and {0} is filled with the storage location.
create_table_ddl = '''
    CREATE TABLE IF NOT EXISTS RIDES.YELLOW_TAXI(
         VendorID long,
         tpep_pickup_datetime timestamp,
         tpep_dropoff_datetime timestamp,
         passenger_count double,
         trip_distance double,
         RatecodeID double,
         store_and_fwd_flag string,
         PULocationID long,
         DOLocationID long,
         payment_type long,
         fare_amount double,
         extra double,
         mta_tax double,
         tip_amount double,
         tolls_amount double,
         improvement_surcharge double,
         total_amount double,
         congestion_surcharge double,
         airport_fee double
    ) USING DELTA
    LOCATION "{0}"
    '''.format(delta_dir)

spark.sql(create_table_ddl)

DataFrame[]

Create a temporary view (`load_table`) from the dataframe created by the January 2021 data

In [47]:
# Expose the DataFrame to SQL under the temporary view name `load_table`.
df.createOrReplaceTempView('load_table')

Check whether the VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID and DOLocationID of each incoming record match a record
already in the table. If they are not present, insert the record.<br>
***NOTE*** I made the below into a function so that I can simply place new data into a TempView called "load_table" and then call the function ```update()```

In [48]:
# Columns that must all be equal for a source row to count as already present.
_MERGE_KEY_COLUMNS = (
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
)

# Every column copied from the source into the target on insert. Kept in one
# place so the INSERT column list and the VALUES list cannot drift apart.
_INSERT_COLUMNS = (
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "airport_fee",
)


def update(source_view="load_table", target_table="RIDES.YELLOW_TAXI", session=None):
    """Insert rows from ``source_view`` that are not yet in ``target_table``.

    Runs a single ``MERGE INTO`` statement. A source row counts as already
    present when a target row has equal values in all of the key columns
    (VendorID, pickup/dropoff timestamps, PULocationID, DOLocationID); such
    rows are left alone and every other source row is inserted.

    Args:
        source_view: Name of the table or temp view to merge from. Defaults to
            ``"load_table"``, so a bare ``update()`` behaves as before.
        target_table: Name of the table to merge into. Defaults to
            ``"RIDES.YELLOW_TAXI"``.
        session: Object whose ``sql(str)`` method executes the statement.
            Defaults to the notebook-level ``spark`` session.

    Returns:
        None.

    Note:
        The two names are interpolated into the SQL text as identifiers, so
        only pass trusted names.
    """
    if session is None:
        # Looked up at call time so the cell can be defined before `spark` exists.
        session = spark

    # One equality predicate per key column, joined exactly as in a hand-written ON clause.
    on_clause = " and\n           ".join(
        "{target}.{col} = {source}.{col}".format(
            target=target_table, source=source_view, col=key_col
        )
        for key_col in _MERGE_KEY_COLUMNS
    )
    column_list = ",\n            ".join(_INSERT_COLUMNS)

    merge_sql = """MERGE INTO {target}
    USING {source}
       ON  {on_clause}
     WHEN NOT MATCHED THEN
          INSERT ({columns}) VALUES ({columns})
    """.format(
        target=target_table,
        source=source_view,
        on_clause=on_clause,
        columns=column_list,
    )
    session.sql(merge_sql)

Call update and January data should be loaded. Your numbers will most likely be different from that shown below because I messed up a bit loading the data initially.

In [None]:
# Merge the rows currently in `load_table` into RIDES.YELLOW_TAXI.
update()

Get a count of the records. This should match what is currently in the df dataframe and the load_table TempView

In [None]:
# Show the total number of rows currently in the Delta table.
spark.sql("SELECT count(1) FROM RIDES.YELLOW_TAXI").show()

Read in the February data into the load_table

In [98]:
# Point `load_table` at the February 2021 file so the next update() merges new rows.
# (This previously re-read the January file, which would add nothing on a clean run.)
spark.read.parquet('yellow_tripdata_2021-02.parquet').createOrReplaceTempView('load_table')

Run the update again

In [41]:
# Merge the rows currently in `load_table` into RIDES.YELLOW_TAXI.
update()

Now the count should include January and February data, excluding any duplicates as specified in the merge statement in the update function

In [99]:
# Show the total number of rows in the Delta table after the second merge.
spark.sql("SELECT count(1) FROM RIDES.YELLOW_TAXI").show()

Check the history of the table

In [50]:
# Import only the name this cell uses instead of a wildcard import.
from delta.tables import DeltaTable

# Open the Delta table by its storage path.
deltaTable = DeltaTable.forPath(spark, './output/delta')

# history() returns the table's commit log as a DataFrame.
fullHistoryDF = deltaTable.history()

In [100]:
# Print the history DataFrame.
fullHistoryDF.show()

Or another way to see the history. In this case, just version is selected.

In [74]:
# Same history via SQL, addressed by path; keep only the `version` column.
spark.sql("DESCRIBE HISTORY './output/delta/'").select('version').show()

+-------+
|version|
+-------+
|      3|
|      2|
|      1|
|      0|
+-------+



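Not sure why the below query doesn't work. What the heck Databricks!? (Likely cause, to be confirmed: the SQL `VERSION AS OF` time-travel syntax is not supported by the open-source Delta Lake 0.7.0 package configured in this notebook; it only became available in later open-source releases.)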

In [101]:
#spark.sql("SELECT * FROM RIDES.YELLOW_TAXI VERSION AS OF 0")

But we CAN read in specific versions with the syntax below. 

In [88]:
# Time travel: load the table as it was at version 0.
test1 = spark.read.format("delta").option("versionAsOf", "0").load("./output/delta")

In [89]:
# Row count at version 0.
test1.count()

0

In [90]:
# Time travel: load the table as it was at version 1.
test2 = spark.read.format("delta").option("versionAsOf", "1").load("./output/delta")

In [91]:
# Row count at version 1.
test2.count()

1371709

In [94]:
# Time travel: load the table as it was at version 3.
test3 = spark.read.format("delta").option("versionAsOf", "3").load("./output/delta")

In [95]:
# Row count at version 3.
test3.count()

2741478