In [2]:
import org.apache.spark.sql.types._

// Explicit read schema for the yellow-taxi trip parquet files.
// Every column is declared nullable; only the name and Spark SQL type vary.
val yellowSchema = StructType(
  Seq[(String, DataType)](
    "VendorID"              -> IntegerType,
    "tpep_pickup_datetime"  -> TimestampType,
    "tpep_dropoff_datetime" -> TimestampType,
    "passenger_count"       -> LongType,
    "trip_distance"         -> DoubleType,
    "RatecodeID"            -> LongType,
    "store_and_fwd_flag"    -> StringType,
    "PULocationID"          -> IntegerType,
    "DOLocationID"          -> IntegerType,
    "payment_type"          -> LongType,
    "fare_amount"           -> DoubleType,
    "extra"                 -> DoubleType,
    "mta_tax"               -> DoubleType,
    "tip_amount"            -> DoubleType,
    "tolls_amount"          -> DoubleType,
    "improvement_surcharge" -> DoubleType,
    "total_amount"          -> DoubleType,
    "congestion_surcharge"  -> DoubleType
  ).map { case (columnName, columnType) =>
    StructField(columnName, columnType, nullable = true)
  }
)

import org.apache.spark.sql.types._
yellowSchema: org.apache.spark.sql.types.StructType = StructType(StructField(VendorID,IntegerType,true),StructField(tpep_pickup_datetime,TimestampType,true),StructField(tpep_dropoff_datetime,TimestampType,true),StructField(passenger_count,LongType,true),StructField(trip_distance,DoubleType,true),StructField(RatecodeID,LongType,true),StructField(store_and_fwd_flag,StringType,true),StructField(PULocationID,IntegerType,true),StructField(DOLocationID,IntegerType,true),StructField(payment_type,LongType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),Struct...


In [3]:
import spark.implicits._
import org.apache.spark.sql.functions.current_timestamp
import io.delta.tables._

// Load every parquet file under sample_data/trips with the explicit schema and
// append two audit columns, both populated with current_timestamp().
val df = {
  val rawTrips = spark.read.schema(yellowSchema).parquet("sample_data/trips")
  val loadedAt = current_timestamp()
  rawTrips
    .withColumn("insertion_ts", loadedAt)
    .withColumn("update_ts", loadedAt)
}

import spark.implicits._
import org.apache.spark.sql.functions.current_timestamp
import io.delta.tables._
df: org.apache.spark.sql.DataFrame = [VendorID: int, tpep_pickup_datetime: timestamp ... 18 more fields]


In [4]:
// Print the first rows of df without truncating long cell values.
df.show(truncate = false)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+--------------------------+--------------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|insertion_ts              |update_ts                 |
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+--------------------------+--------------------------+
|1       |2023-07-01 00:29:59 |2023-07-01 00:40:15  |1       

In [5]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

// Persist df as the Delta table my_trips_with_dv, replacing any existing contents.
df.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("my_trips_with_dv")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType


In [6]:
// Set the table property delta.enableDeletionVectors to true on my_trips_with_dv.
spark.sql("ALTER TABLE my_trips_with_dv SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")

res2: org.apache.spark.sql.DataFrame = []


In [8]:
// Show the commit history of my_trips_with_dv with untruncated columns.
spark
  .sql("DESCRIBE HISTORY my_trips_with_dv")
  .show(truncate = false)

+-------+-----------------------+------+--------+---------------------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation                        |operationParameters                                                          |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                      |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------------------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------+---------

In [9]:
// Build the change set for the merge: an independent ~0.1% random sample of each
// monthly file, unioned together, with fresh audit timestamps.
//
// Each file is sampled exactly once *before* the union. Previously the trailing
// `.sample(0.001)` calls were attached to the running union result, so the
// 2023-07 rows were sampled three times (~1e-9) and the 2023-08 rows twice
// (~1e-6); only 2024-01 was sampled at the intended rate.
//
// No seed is passed to sample, so the selected rows differ between runs.
val df2 = Seq("2023-07", "2023-08", "2024-01")
  .map { month =>
    spark.read
      .schema(yellowSchema)
      .parquet(s"sample_data/trips/yellow_tripdata_$month.parquet")
      .sample(0.001)
  }
  .reduce(_.union(_))
  .withColumn("insertion_ts", current_timestamp())
  .withColumn("update_ts", current_timestamp())


df2: org.apache.spark.sql.DataFrame = [VendorID: int, tpep_pickup_datetime: timestamp ... 18 more fields]


In [10]:
// Upsert df2 ("newData") into the Delta table my_trips_with_dv ("oldData").
// Rows are matched on VendorID + pickup timestamp + dropoff timestamp:
//   - matched rows have all columns overwritten from newData,
//   - unmatched newData rows are inserted.
// NOTE(review): updateAll also overwrites insertion_ts with the new load's value;
// confirm that is intended rather than only refreshing update_ts.
DeltaTable.forName("my_trips_with_dv").as("oldData")
  .merge(
    df2.as("newData"),
    "oldData.VendorID = newData.VendorID AND oldData.tpep_pickup_datetime = newData.tpep_pickup_datetime AND oldData.tpep_dropoff_datetime = newData.tpep_dropoff_datetime"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

In [11]:
// Show the commit history of my_trips_with_dv again, now including the merge commit.
spark
  .sql("DESCRIBE HISTORY my_trips_with_dv")
  .show(truncate = false)

+-------+-----------------------+------+--------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------