# Unit 9: Insert into MOR tables

In this unit, we will insert a record into a MOR table and take a peek under the hood.<br>

This unit takes about 5 minutes to complete.

In [1]:
from pyspark.sql import SparkSession  # needed for SparkSession.builder below
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql.types import LongType
import pyspark.sql.functions as F
from datetime import datetime

spark = SparkSession.builder \
  .appName("Hudi-Learning-Unit-09-PySpark") \
  .master("yarn")\
  .enableHiveSupport()\
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
  .getOrCreate()

spark

23/08/02 06:29:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Variables

In [2]:
PROJECT_ID_OUTPUT=!gcloud config get-value core/project
PROJECT_ID=PROJECT_ID_OUTPUT[0]
PROJECT_NBR_OUTPUT=!gcloud projects describe $PROJECT_ID --format="value(projectNumber)"
PROJECT_NBR=PROJECT_NBR_OUTPUT[0]
LOCATION = "us-central1" #Replace with your GCP region

HUDI_MOR_BASE_GCS_URI = f"gs://gaia_data_bucket-{PROJECT_NBR}/nyc-taxi-trips-hudi-mor"
TRIP_DATE="2020-01-30"
DATAPROC_METASTORE_THRIFT_URI_LIST=!gcloud metastore services list --location $LOCATION | grep thrift | cut -d' ' -f11
DATAPROC_METASTORE_THRIFT_URI=DATAPROC_METASTORE_THRIFT_URI_LIST[0]

print(f"Project ID is {PROJECT_ID}")
print(f"Project number is {PROJECT_NBR}")
print(f"Project location is {LOCATION}")
print(f"Dataproc Metastore Service thrift URI is {DATAPROC_METASTORE_THRIFT_URI}")
print(f"Trip date partition we will insert into is {TRIP_DATE}")

Project ID is apache-hudi-lab
Project number is 623600433888
Project location is us-central1
Dataproc Metastore Service thrift URI is thrift://10.60.192.28:9080
Trip date partition we will insert into is 2020-01-30


**Note**: Ensure you have the right URI for the Dataproc Metastore.

## 1. [HUDI INSERT FEATURE] Insert into MoR table

In [3]:
# This is in case you are re-running this notebook, we need the MoR table definition
spark.sql(f"CREATE TABLE IF NOT EXISTS taxi_db.nyc_taxi_trips_hudi_mor USING hudi LOCATION \"{HUDI_MOR_BASE_GCS_URI}/\";")

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
23/08/02 06:29:35 WARN GhfsStorageStatistics: Detected potential high latency for operation stream_write_close_operations. latencyMs=124; previousMaxLatencyMs=0; operationCount=1; context=gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/hoodie.properties
23/08/02 06:29:35 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]

In [4]:
spark.sql("SHOW tables IN taxi_db;").show(truncate=False)

23/08/02 06:29:36 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.

+---------+-----------------------+-----------+
|namespace|tableName              |isTemporary|
+---------+-----------------------+-----------+
|taxi_db  |nyc_taxi_trips_hudi_cow|false      |
|taxi_db  |nyc_taxi_trips_hudi_mor|false      |
+---------+-----------------------+-----------+



                                                                                

The MoR table listed above was registered when we created the MoR dataset earlier in the lab.

### 1.1. Determine trip ID to clone
(with some modifications and then insert)

In [5]:
ORIGINAL_TRIP_ID=spark.sql(f"select trip_id  from taxi_db.nyc_taxi_trips_hudi_mor WHERE trip_date='2020-01-30' AND trip_hour < 12 LIMIT 1").collect()[0][0]
print(f"ID of the trip cloned: {ORIGINAL_TRIP_ID}")


ID of the trip cloned: 8589935026


                                                                                

### 1.2. Create a record / taxi trip that we will use for our insert trial
We'll grab an existing record and shift its pickup and dropoff times to be 5 hours later.<br>
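The shift can be sketched in plain Python, independent of Spark. This is a minimal illustration, assuming a trip is a dict with `pickup_datetime`, `dropoff_datetime` and `trip_hour` keys; `shift_trip` and `stays_in_partition` are hypothetical helper names, not part of the lab, and the sample `trip_hour` of 8 is assumed from the pickup time. It also suggests one likely reason for picking a morning trip (`trip_hour < 12`): a 5-hour shift then cannot cross midnight, so the clone stays in the same `trip_date` partition.

```python
from datetime import datetime, timedelta


def shift_trip(trip: dict, hours: int) -> dict:
    """Return a copy of `trip` with pickup and dropoff moved `hours` later."""
    shifted = dict(trip)
    shifted["pickup_datetime"] = trip["pickup_datetime"] + timedelta(hours=hours)
    shifted["dropoff_datetime"] = trip["dropoff_datetime"] + timedelta(hours=hours)
    # Derive trip_hour from the new pickup time so it can never exceed 23.
    shifted["trip_hour"] = shifted["pickup_datetime"].hour
    return shifted


def stays_in_partition(trip: dict, shifted: dict) -> bool:
    """True when the shifted pickup still falls on the original trip date."""
    return trip["pickup_datetime"].date() == shifted["pickup_datetime"].date()


# Sample values modeled on the trip used in this unit (trip_hour is assumed).
original = {
    "pickup_datetime": datetime(2020, 1, 30, 8, 11, 51),
    "dropoff_datetime": datetime(2020, 1, 30, 8, 36, 48),
    "trip_hour": 8,
}
shifted = shift_trip(original, hours=5)
print(shifted["pickup_datetime"], shifted["trip_hour"], stays_in_partition(original, shifted))
```

A trip picked up at 21:00 would fail the `stays_in_partition` check after the same shift, which is the case the morning filter avoids.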

#### 1.2.1. Generate a new trip ID to use for the record we will be inserting

In [6]:
NEW_TRIP_ID=spark.sql(f"select max(trip_id) as max_trip_id from taxi_db.nyc_taxi_trips_hudi_mor WHERE trip_date='2020-01-30'").collect()[0][0] + 1
print(f"New trip ID is: {NEW_TRIP_ID}")



New trip ID is: 309237772005


                                                                                

#### 1.2.3. Identify a record to use that we will morph and insert

In [7]:
# This query returns exactly one record
candidateTripDFMor=spark.sql(f"SELECT * FROM taxi_db.nyc_taxi_trips_hudi_mor WHERE trip_date='2020-01-30' AND trip_id={ORIGINAL_TRIP_ID}")
candidateTripDFMor.show(truncate=False)
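# Note: without parentheses this only displays the bound method (its repr includes the schema);
# call candidateTripDFMor.printSchema() to print the schema as a tree instead
candidateTripDFMor.printSchema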

23/08/02 06:30:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------------------+-----------------------+------------------+----------------------+-----------------------------------------------------------------------------+---------+---------+----------+--------+---------+-----------+---------+-------------------+-------------------+-----------------+---------+------------------+-------------------+---------------+-------------+------------+-----------+-----------+-----------+------------+---------------------+------------+-----------------+--------------------+---------+---------+--------------+------------------------+--------------------+----------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno   |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                            |taxi_type|trip_year|trip_month|trip_day|trip_hour|trip_minute|vendor_id|pickup_datetime    |dropoff_datetime   |store_and_forward|rate_code|pickup_location_id|dropoff_location_id|passenger_count|trip_distance|fare_amount 

                                                                                

<bound method DataFrame.printSchema of DataFrame[_hoodie_commit_time: string, _hoodie_commit_seqno: string, _hoodie_record_key: string, _hoodie_partition_path: string, _hoodie_file_name: string, taxi_type: string, trip_year: int, trip_month: int, trip_day: int, trip_hour: int, trip_minute: int, vendor_id: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_forward: string, rate_code: string, pickup_location_id: string, dropoff_location_id: string, passenger_count: bigint, trip_distance: decimal(38,9), fare_amount: decimal(38,9), surcharge: decimal(38,9), mta_tax: decimal(38,9), tip_amount: decimal(38,9), tolls_amount: decimal(38,9), improvement_surcharge: decimal(10,0), total_amount: decimal(38,9), payment_type_code: string, congestion_surcharge: decimal(10,0), trip_type: string, ehail_fee: decimal(10,0), partition_date: date, distance_between_service: decimal(38,9), time_between_service: bigint, trip_id: bigint, trip_date: string]>

#### 1.2.4. Create the record to insert

In [8]:
insertTripDFMor = candidateTripDFMor.withColumn('pickup_datetime', candidateTripDFMor.pickup_datetime + F.expr('INTERVAL 5 HOURS')) \
                                    .withColumn('dropoff_datetime', candidateTripDFMor.dropoff_datetime + F.expr('INTERVAL 5 HOURS')) \
                                    .withColumn('trip_hour', candidateTripDFMor.trip_hour + 5) \
                                    .withColumn('trip_id', lit(NEW_TRIP_ID)) \
                                    .drop("_hoodie_commit_time") \
                                    .drop("_hoodie_commit_seqno") \
                                    .drop("_hoodie_record_key") \
                                    .drop("_hoodie_partition_path") \
                                    .drop("_hoodie_file_name")

insertTripDFMor.show(truncate=False)
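# Note: without parentheses this only displays the bound method (its repr includes the schema);
# call insertTripDFMor.printSchema() to print the schema as a tree instead
insertTripDFMor.printSchema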

                                                                                

+---------+---------+----------+--------+---------+-----------+---------+-------------------+-------------------+-----------------+---------+------------------+-------------------+---------------+-------------+------------+-----------+-----------+-----------+------------+---------------------+------------+-----------------+--------------------+---------+---------+--------------+------------------------+--------------------+------------+----------+
|taxi_type|trip_year|trip_month|trip_day|trip_hour|trip_minute|vendor_id|pickup_datetime    |dropoff_datetime   |store_and_forward|rate_code|pickup_location_id|dropoff_location_id|passenger_count|trip_distance|fare_amount |surcharge  |mta_tax    |tip_amount |tolls_amount|improvement_surcharge|total_amount|payment_type_code|congestion_surcharge|trip_type|ehail_fee|partition_date|distance_between_service|time_between_service|trip_id     |trip_date |
+---------+---------+----------+--------+---------+-----------+---------+-------------------+---

<bound method DataFrame.printSchema of DataFrame[taxi_type: string, trip_year: int, trip_month: int, trip_day: int, trip_hour: int, trip_minute: int, vendor_id: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_forward: string, rate_code: string, pickup_location_id: string, dropoff_location_id: string, passenger_count: bigint, trip_distance: decimal(38,9), fare_amount: decimal(38,9), surcharge: decimal(38,9), mta_tax: decimal(38,9), tip_amount: decimal(38,9), tolls_amount: decimal(38,9), improvement_surcharge: decimal(10,0), total_amount: decimal(38,9), payment_type_code: string, congestion_surcharge: decimal(10,0), trip_type: string, ehail_fee: decimal(10,0), partition_date: date, distance_between_service: decimal(38,9), time_between_service: bigint, trip_id: bigint, trip_date: string]>

### 1.3. Prepare for insert

#### 1.3.1. Let's review the timeline

In [9]:
!gsutil ls -al $HUDI_MOR_BASE_GCS_URI/.hoodie

   1606987  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit#1690957706235286  metageneration=1
    459360  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit.inflight#1690957706231625  metageneration=1
         0  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit.requested#1690957706301122  metageneration=1
    799752  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731214644879.deltacommit#1690957706670514  metageneration=1
    459971  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731214644879.deltacommit.inflight#1690957706273032  metageneration=1
         0  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731214644879.deltacommit.requested#1
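The file names in this listing follow a visible pattern: an instant time, an action (here `deltacommit`), and an optional state suffix (`requested` or `inflight`; no suffix for a completed instant). The sketch below parses that pattern from a `gsutil ls -a` URL. It is a minimal illustration based only on the names shown in this listing; `parse_instant` and `Instant` are hypothetical names, and other Hudi versions may lay out the timeline differently.

```python
import re
from typing import NamedTuple, Optional

# <instant time>.<action>[.<state>]; a missing state suffix means "completed".
INSTANT_RE = re.compile(
    r"^(?P<time>\d{17})\.(?P<action>[a-z]+)(?:\.(?P<state>requested|inflight))?$"
)


class Instant(NamedTuple):
    time: str
    action: str
    state: str


def parse_instant(object_url: str) -> Optional[Instant]:
    """Parse one object URL from the .hoodie listing; None for non-instant files."""
    # Drop the "#<generation>" suffix that `gsutil ls -a` appends, then the directory.
    name = object_url.split("#", 1)[0].rsplit("/", 1)[-1]
    match = INSTANT_RE.match(name)
    if match is None:
        return None
    return Instant(match["time"], match["action"], match["state"] or "completed")


urls = [
    "gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit#1690957706235286",
    "gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit.inflight#1690957706231625",
    "gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/hoodie.properties",
]
for url in urls:
    print(parse_instant(url))
```

Each write should show up as three files sharing one instant time, one per state, which is how the listing reads once parsed.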

#### 1.3.2. Review the files in the DFS for the specific trip_date 2020-01-30

In [10]:
# File system layout, files, types & counts by types, byte sizes
!gsutil ls -alh $HUDI_MOR_BASE_GCS_URI/trip_date=2020-01-30/

     373 B  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/.hoodie_partition_metadata.parquet#1690957717957913  metageneration=1
  4.28 MiB  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/b56cd539-2355-46cd-b260-c98366952ea4-0_228-38-12729_20230731214644879.parquet#1690957717969042  metageneration=1
  4.27 MiB  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/d168cab4-e2ba-4c55-b756-fa5ffe621bb4-0_229-38-12731_20230731214644879.parquet#1690957717956170  metageneration=1
805.64 KiB  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_230-38-12733_20230731214644879.parquet#1690957717977586  metageneration=1
TOTAL: 4 objects, 9789865 bytes (9.34 MiB)


#### 1.3.3. Capture record count prior to insert

In [11]:
print(f"Trip Date: {TRIP_DATE}")
TRIP_COUNT_BEFORE_INSERT_RT=spark.sql(f"SELECT count(*) as trip_count from taxi_db.nyc_taxi_trips_hudi_mor where trip_date='2020-01-30'").collect()[0][0]
print(f"Original Trip Count: {TRIP_COUNT_BEFORE_INSERT_RT}")

Trip Date: 2020-01-30




Original Trip Count: 257927


                                                                                

#### 1.3.4. Review the original record

In [12]:
spark.sql(f"SELECT trip_id,taxi_type,vendor_id,pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id,trip_date " \
          f" FROM taxi_db.nyc_taxi_trips_hudi_mor "\
          f" WHERE trip_date='2020-01-30' AND trip_id={ORIGINAL_TRIP_ID}") \
        .show(truncate=False)

+----------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|trip_id   |taxi_type|vendor_id|pickup_datetime    |dropoff_datetime   |pickup_location_id|dropoff_location_id|trip_date |
+----------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|8589935026|yellow   |2        |2020-01-30 08:11:51|2020-01-30 08:36:48|262               |107                |2020-01-30|
+----------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+



                                                                                

#### 1.3.5. Capture core attributes/columns of the record to be inserted

In [13]:
# The record we want to insert - note its pickup_datetime and dropoff_datetime are different
insertTripDFMor.select("trip_id","taxi_type","vendor_id","pickup_datetime","dropoff_datetime","pickup_location_id","dropoff_location_id","trip_date") \
               .show(truncate=False)

+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|trip_id     |taxi_type|vendor_id|pickup_datetime    |dropoff_datetime   |pickup_location_id|dropoff_location_id|trip_date |
+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|309237772005|yellow   |2        |2020-01-30 13:11:51|2020-01-30 13:36:48|262               |107                |2020-01-30|
+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+



In [14]:
# The full record we will insert
insertTripDFMor.show(truncate=False)



+---------+---------+----------+--------+---------+-----------+---------+-------------------+-------------------+-----------------+---------+------------------+-------------------+---------------+-------------+------------+-----------+-----------+-----------+------------+---------------------+------------+-----------------+--------------------+---------+---------+--------------+------------------------+--------------------+------------+----------+
|taxi_type|trip_year|trip_month|trip_day|trip_hour|trip_minute|vendor_id|pickup_datetime    |dropoff_datetime   |store_and_forward|rate_code|pickup_location_id|dropoff_location_id|passenger_count|trip_distance|fare_amount |surcharge  |mta_tax    |tip_amount |tolls_amount|improvement_surcharge|total_amount|payment_type_code|congestion_surcharge|trip_type|ehail_fee|partition_date|distance_between_service|time_between_service|trip_id     |trip_date |
+---------+---------+----------+--------+---------+-----------+---------+-------------------+---

                                                                                

#### 1.3.5. Review tables in the Hive Metastore

In [15]:
spark.sql("SHOW tables IN taxi_db;").show(truncate=False)

+---------+-----------------------+-----------+
|namespace|tableName              |isTemporary|
+---------+-----------------------+-----------+
|taxi_db  |nyc_taxi_trips_hudi_cow|false      |
|taxi_db  |nyc_taxi_trips_hudi_mor|false      |
+---------+-----------------------+-----------+



#### 1.3.6. Delete the existing MoR external table definition
When we created the MoR data early on in this lab, we registered an external table.<br> 
As part of our Hudi options (below), we will enable Hive sync.<br> 
With Hive sync, Hudi registers two tables named after the base table plus a suffix: _ro for the read-optimized table and _rt for the real-time table.<br>
Let's drop the MoR table definition that already exists to see what Hudi auto-creates.

In [16]:
spark.sql("DROP TABLE IF EXISTS taxi_db.nyc_taxi_trips_hudi_mor;")

DataFrame[]

In [17]:
spark.sql("SHOW tables IN taxi_db;").show(truncate=False)

+---------+-----------------------+-----------+
|namespace|tableName              |isTemporary|
+---------+-----------------------+-----------+
|taxi_db  |nyc_taxi_trips_hudi_cow|false      |
+---------+-----------------------+-----------+



#### 1.3.7. Hudi options

In [18]:
hudi_options = {
            'hoodie.database.name': 'taxi_db',
            'hoodie.table.name': 'nyc_taxi_trips_hudi_mor',
            'hoodie.datasource.write.table.name': 'nyc_taxi_trips_hudi_mor',
            'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
            'hoodie.datasource.write.recordkey.field': 'trip_id',
            'hoodie.datasource.write.partitionpath.field': 'trip_date',
            'hoodie.datasource.write.precombine.field': 'pickup_datetime',
            'hoodie.datasource.write.hive_style_partitioning': 'true',
            'hoodie.partition.metafile.use.base.format': 'true', 
            'hoodie.datasource.write.drop.partition.columns': 'true',
            'hoodie.datasource.write.operation': 'insert',
            'hoodie.datasource.hive_sync.enable': 'true',
            'hoodie.meta.sync.client.tool.class': 'org.apache.hudi.hive.HiveSyncTool',
            'hoodie.datasource.hive_sync.mode':'hms',
            'hoodie.datasource.hive_sync.metastore.uris':DATAPROC_METASTORE_THRIFT_URI,
            'hoodie.datasource.hive_sync.auto_create_database':'true',
            'hoodie.datasource.hive_sync.database': 'taxi_db',
            'hoodie.datasource.hive_sync.table': 'nyc_taxi_trips_hudi_mor',
            'hoodie.datasource.hive_sync.partition_fields': 'trip_date', 
            'hoodie.datasource.hive_sync.partition_extractor_class':'org.apache.hudi.hive.MultiPartKeysValueExtractor',
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.hive_sync.support_timestamp': 'true'
        }
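Before writing, it can help to confirm that the columns named in the key-related options actually exist in the DataFrame. The helper below is a minimal sketch of such a check in plain Python; `missing_write_fields` is a hypothetical name, not a Hudi API, and the sample dict repeats only three of the options above.

```python
def missing_write_fields(options: dict, columns: list) -> dict:
    """Map each key-related option to the configured columns absent from `columns`."""
    field_options = (
        "hoodie.datasource.write.recordkey.field",
        "hoodie.datasource.write.partitionpath.field",
        "hoodie.datasource.write.precombine.field",
    )
    missing = {}
    for option in field_options:
        # Record key and partition path may be comma-separated lists;
        # splitting is harmless for single column names.
        configured = [c.strip() for c in options.get(option, "").split(",") if c.strip()]
        absent = [c for c in configured if c not in columns]
        if absent:
            missing[option] = absent
    return missing


sample_options = {
    "hoodie.datasource.write.recordkey.field": "trip_id",
    "hoodie.datasource.write.partitionpath.field": "trip_date",
    "hoodie.datasource.write.precombine.field": "pickup_datetime",
}
print(missing_write_fields(sample_options, ["trip_id", "trip_date", "pickup_datetime"]))
print(missing_write_fields(sample_options, ["trip_id", "trip_date"]))
```

In the notebook, the equivalent call would be `missing_write_fields(hudi_options, insertTripDFMor.columns)`, which should return an empty dict for the record prepared above.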

### 1.4. Insert the record 

#### 1.4.1. Insert into Hudi MoR

In [19]:
# Insert in append mode to the base path, and with a hive metastore metadata sync
insertTripDFMor.write.format("hudi"). \
                options(**hudi_options). \
                mode("append"). \
                save(HUDI_MOR_BASE_GCS_URI)

23/08/02 06:30:21 WARN GhfsStorageStatistics: Detected potential high latency for operation stream_read_operations. latencyMs=123; previousMaxLatencyMs=94; operationCount=4561; context=gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/hoodie.properties
23/08/02 06:30:26 WARN GhfsStorageStatistics: Detected potential high latency for operation op_create. latencyMs=102; previousMaxLatencyMs=97; operationCount=4; context=gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230802063019539.deltacommit.inflight
23/08/02 06:30:26 WARN GhfsStorageStatistics: Detected potential high latency for operation stream_write_close_operations. latencyMs=128; previousMaxLatencyMs=124; operationCount=3; context=gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230802063019539.deltacommit.inflight
23/08/02 06:30:27 WARN GhfsStorageStatistics: Detected potential high latency for operation stream_write_close_operations. latencyMs=145; previousMaxLatencyMs=

#### 1.4.2. Review tables in the Hive Metastore

Note how Hudi registers two MoR tables.<br>
With Hive sync enabled (in our Hudi options), Hudi creates two tables named after the base table plus a suffix: _ro for the read-optimized table and _rt for the real-time table.<br>

In [20]:
spark.sql("SHOW tables IN taxi_db;").show(truncate=False)

+---------+--------------------------+-----------+
|namespace|tableName                 |isTemporary|
+---------+--------------------------+-----------+
|taxi_db  |nyc_taxi_trips_hudi_cow   |false      |
|taxi_db  |nyc_taxi_trips_hudi_mor_ro|false      |
|taxi_db  |nyc_taxi_trips_hudi_mor_rt|false      |
+---------+--------------------------+-----------+



#### 1.4.3. Review the file system for trip_date=2020-01-30

In [21]:
# File system layout, files, types & counts by types, byte sizes
!gsutil ls -alh $HUDI_MOR_BASE_GCS_URI/trip_date=2020-01-30/

     373 B  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/.hoodie_partition_metadata.parquet#1690957717957913  metageneration=1
  4.28 MiB  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/b56cd539-2355-46cd-b260-c98366952ea4-0_228-38-12729_20230731214644879.parquet#1690957717969042  metageneration=1
  4.27 MiB  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/d168cab4-e2ba-4c55-b756-fa5ffe621bb4-0_229-38-12731_20230731214644879.parquet#1690957717956170  metageneration=1
802.23 KiB  2023-08-02T06:30:29Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_0-29-2907_20230802063019539.parquet#1690957829374342  metageneration=1
805.64 KiB  2023-08-02T06:28:37Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/trip_date=2020-01-30/f5c1f85b-c2f9-45c3-9602-
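The base file names in these listings have the shape `<fileId>_<writeToken>_<instantTime>.parquet`, so two files that share a file ID are two versions of the same file group. The sketch below groups names by file ID and keeps the newest version of each. It is an illustration only: `parse_base_file` and `latest_base_files` are hypothetical helpers, and the sample names are copied from the complete entries in the listings in this unit.

```python
from collections import defaultdict


def parse_base_file(name: str) -> tuple:
    """Split '<fileId>_<writeToken>_<instantTime>.parquet' into its three parts."""
    stem = name.rsplit(".", 1)[0]
    file_id, write_token, instant_time = stem.split("_")
    return file_id, write_token, instant_time


def latest_base_files(names: list) -> dict:
    """Return {fileId: newest base file name}, comparing instant times."""
    versions = defaultdict(list)
    for name in names:
        if name.startswith("."):
            continue  # skip metadata such as .hoodie_partition_metadata.parquet
        file_id, _, instant_time = parse_base_file(name)
        versions[file_id].append((instant_time, name))
    # Instant times are fixed-width digit strings, so string order is time order.
    return {file_id: max(files)[1] for file_id, files in versions.items()}


listing = [
    ".hoodie_partition_metadata.parquet",
    "b56cd539-2355-46cd-b260-c98366952ea4-0_228-38-12729_20230731214644879.parquet",
    "d168cab4-e2ba-4c55-b756-fa5ffe621bb4-0_229-38-12731_20230731214644879.parquet",
    "f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_0-29-2907_20230802063019539.parquet",
    "f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_230-38-12733_20230731214644879.parquet",
]
latest = latest_base_files(listing)
for file_id, name in sorted(latest.items()):
    print(file_id, "->", name)
```

Only the `f5c1f85b...` file group gains a second version after the insert; the other two file groups are untouched.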

#### 1.4.4. Review the commits

In [22]:
spark.sql("call show_commits(table => 'taxi_db.nyc_taxi_trips_hudi_mor_ro', limit => 100);").show(100, truncate=False)

+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|commit_time      |total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20230802063019539|821488             |0                |1                  |1                       |12142                |0                           |0           |
|20230731220137856|1414387506         |438              |0                  |337                     |37023925             |0                           |0           |
|20230731215336903|1223009119         |405              |0                  |365                     |31972637             |0                           |0           

In [23]:
spark.sql("call show_commits(table => 'taxi_db.nyc_taxi_trips_hudi_mor_rt', limit => 100);").show(100, truncate=False)

23/08/02 06:33:33 WARN GhfsStorageStatistics: Detected potential high latency for operation op_open. latencyMs=107; previousMaxLatencyMs=91; operationCount=125; context=gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731220137856.deltacommit


+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|commit_time      |total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20230802063019539|821488             |0                |1                  |1                       |12142                |0                           |0           |
|20230731220137856|1414387506         |438              |0                  |337                     |37023925             |0                           |0           |
|20230731215336903|1223009119         |405              |0                  |365                     |31972637             |0                           |0           
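The `commit_time` values above are 17-digit instant times of the form `yyyyMMddHHmmssSSS`. The sketch below decodes one into a Python `datetime`; `instant_to_datetime` is a hypothetical helper, and the result is a naive datetime because the string itself carries no time zone.

```python
from datetime import datetime


def instant_to_datetime(instant_time: str) -> datetime:
    """Decode a 17-digit yyyyMMddHHmmssSSS instant time into a naive datetime."""
    if len(instant_time) != 17 or not instant_time.isdigit():
        raise ValueError(f"unexpected instant time: {instant_time!r}")
    return datetime(
        int(instant_time[0:4]),    # year
        int(instant_time[4:6]),    # month
        int(instant_time[6:8]),    # day
        int(instant_time[8:10]),   # hour
        int(instant_time[10:12]),  # minute
        int(instant_time[12:14]),  # second
        int(instant_time[14:17]) * 1000,  # milliseconds -> microseconds
    )


print(instant_to_datetime("20230802063019539"))  # 2023-08-02 06:30:19.539000
```

The decoded time lines up with the 06:30 log lines emitted while the insert in section 1.4.1 was running.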

#### 1.4.5. Review commit log

In [24]:
LOG_FILE_LIST=!gsutil ls $HUDI_MOR_BASE_GCS_URI/.hoodie/*.deltacommit | tail -n 1 
LOG_FILE=LOG_FILE_LIST[0]
print(f"Log file FQP is {LOG_FILE} and it correlates with a commit time in the table above")

!gsutil cat $LOG_FILE

Log file FQP is gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230802063019539.deltacommit and it correlates with a commit time in the table above
{
  "partitionToWriteStats" : {
    "trip_date=2020-01-30" : [ {
      "fileId" : "f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0",
      "path" : "trip_date=2020-01-30/f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_0-29-2907_20230802063019539.parquet",
      "prevCommit" : "20230731214644879",
      "numWrites" : 12142,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 1,
      "totalWriteBytes" : 821488,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "trip_date=2020-01-30",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 821488,
      "minEventTime" : null,
      "max

#### 1.4.6. Review the timeline

In [25]:
!gsutil ls -al $HUDI_MOR_BASE_GCS_URI/.hoodie

   1606987  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit#1690957706235286  metageneration=1
    459360  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit.inflight#1690957706231625  metageneration=1
         0  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731212147518.deltacommit.requested#1690957706301122  metageneration=1
    799752  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731214644879.deltacommit#1690957706670514  metageneration=1
    459971  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731214644879.deltacommit.inflight#1690957706273032  metageneration=1
         0  2023-08-02T06:28:26Z  gs://gaia_data_bucket-623600433888/nyc-taxi-trips-hudi-mor/.hoodie/20230731214644879.deltacommit.requested#1

#### 1.4.7. Let's do a count on the RT & RO tables 

In [26]:
# Run a count to ensure that the record count increased against the MOR_RT table
AFTER_INSERT_TRIP_COUNT=spark.sql(f"SELECT COUNT(*) as trip_count FROM taxi_db.nyc_taxi_trips_hudi_mor_rt WHERE trip_date='2020-01-30'").collect()[0][0]
print(f"MOR_RT: Trip count before insert was: {TRIP_COUNT_BEFORE_INSERT_RT} and latest trip count is {AFTER_INSERT_TRIP_COUNT}")



MOR_RT: Trip count before insert was: 257927 and latest trip count is 257928


                                                                                

In [27]:
# Run a count to ensure that the record count increased against the MOR_RO table
AFTER_INSERT_TRIP_COUNT_RO=spark.sql(f"SELECT COUNT(*) as trip_count FROM taxi_db.nyc_taxi_trips_hudi_mor_ro WHERE trip_date='2020-01-30'").collect()[0][0]
print(f"MOR_RO: Trip count before insert was: {TRIP_COUNT_BEFORE_INSERT_RT} and latest trip count is {AFTER_INSERT_TRIP_COUNT_RO}")



MOR_RO: Trip count before insert was: 257927 and latest trip count is 257928


                                                                                

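For this insert, the MoR table behaved the same way a CoW table would: Hudi wrote and indexed a new Parquet base file (a new version of an existing file group, as the file listing and delta commit above show) instead of a log file. Because the new record sits in a base file, the _ro and _rt tables report identical record counts, both reflecting the insert.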
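The delta commit metadata printed in section 1.4.5 supports this reading. The sketch below loads a trimmed copy of that JSON (selected fields only) and computes how many records were carried over from the previous file version. `summarize_write_stats` is a hypothetical helper, and treating `numWrites - numInserts - numUpdateWrites` as "carried over" is an interpretation of the counters, not something the metadata states directly.

```python
import json

# Trimmed copy of the delta commit metadata shown earlier (selected fields only).
commit_json = """
{
  "partitionToWriteStats": {
    "trip_date=2020-01-30": [
      {
        "fileId": "f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0",
        "path": "trip_date=2020-01-30/f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_0-29-2907_20230802063019539.parquet",
        "prevCommit": "20230731214644879",
        "numWrites": 12142,
        "numDeletes": 0,
        "numUpdateWrites": 0,
        "numInserts": 1
      }
    ]
  }
}
"""


def summarize_write_stats(metadata: dict) -> list:
    """Flatten partitionToWriteStats into one summary row per written file."""
    rows = []
    for partition, stats in metadata["partitionToWriteStats"].items():
        for stat in stats:
            rows.append({
                "partition": partition,
                "file_id": stat["fileId"],
                "is_base_file": stat["path"].endswith(".parquet"),
                "inserted": stat["numInserts"],
                # Assumed meaning: records rewritten unchanged from the prior version.
                "carried_over": stat["numWrites"] - stat["numInserts"] - stat["numUpdateWrites"],
            })
    return rows


rows = summarize_write_stats(json.loads(commit_json))
print(rows[0])
```

Under that interpretation, the commit rewrote 12141 existing records plus the 1 new record into a new Parquet base file, with nothing written to a log file.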

In [28]:
# Let's review which file has the record
spark.sql(f"SELECT _hoodie_file_name,_hoodie_commit_time,trip_id,taxi_type,vendor_id,pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id,trip_date " \
          f" FROM taxi_db.nyc_taxi_trips_hudi_mor_rt "\
          f" WHERE trip_date='2020-01-30' AND trip_id={NEW_TRIP_ID}") \
        .show(truncate=False)

                                                                                

+--------------------------------------------------------------------------+-------------------+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|_hoodie_file_name                                                         |_hoodie_commit_time|trip_id     |taxi_type|vendor_id|pickup_datetime    |dropoff_datetime   |pickup_location_id|dropoff_location_id|trip_date |
+--------------------------------------------------------------------------+-------------------+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|f5c1f85b-c2f9-45c3-9602-3b13bd1e4617-0_0-29-2907_20230802063019539.parquet|20230802063019539  |309237772005|yellow   |2        |2020-01-30 13:11:51|2020-01-30 13:36:48|262               |107                |2020-01-30|
+--------------------------------------------------------------------------+-------------------+------------+---------+-

### 1.5. Querying Hudi MoR tables with DataSource API

There are two query options:<br>
**1. Snapshot query (default):** Queries see the latest snapshot of the merge-on-read table. It exposes near-real-time data by merging the base and delta files of the latest file slice on the fly.<br>
**2. Read optimized query:** Queries see the latest snapshot of the table as of a given commit/compaction action. It exposes only the base/columnar files in the latest file slices and guarantees the same columnar query performance as a non-Hudi columnar table.
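The difference between the two query types can be illustrated with a toy model in plain Python. This is a deliberately simplified sketch, not Hudi's merge logic (which also handles ordering fields, payload classes, and multiple log files); the `file_slice` structure and both function names are hypothetical.

```python
def read_optimized(file_slice: dict) -> dict:
    """Return only the records held in the base file."""
    return dict(file_slice["base"])


def snapshot(file_slice: dict) -> dict:
    """Merge log records over the base file; later log entries win."""
    merged = dict(file_slice["base"])
    for key, record in file_slice["log"]:
        if record is None:  # a None payload stands in for a delete
            merged.pop(key, None)
        else:
            merged[key] = record
    return merged


# Hypothetical file slice: two records in the base file, plus one update and
# one brand-new record waiting in the log.
file_slice = {
    "base": {1: {"fare": 10.0}, 2: {"fare": 7.5}},
    "log": [(2, {"fare": 8.0}), (3, {"fare": 12.0})],
}
print(len(read_optimized(file_slice)), len(snapshot(file_slice)))

# With an empty log, as after the insert in this unit, both views agree.
base_only = {"base": {1: {"fare": 10.0}}, "log": []}
print(read_optimized(base_only) == snapshot(base_only))
```

In this model the two query types differ only while the log holds unmerged records, which is why both return the same count in this unit.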

#### 1.5.1. Snapshot or real time query (merges base columnar with row based delta log files) 
Queries see the latest snapshot of the merge-on-read table. It exposes near-real-time data (a few minutes old) by merging the base and delta files of the latest file slice on the fly.<br>
Slower, but returns the freshest data.

In [29]:
from pyspark.sql.functions import col

spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type','snapshot') \
    .load(f"{HUDI_MOR_BASE_GCS_URI}/trip_date=2020-01-30/") \
    .filter(col("trip_id")==NEW_TRIP_ID) \
    .select("trip_id","taxi_type","vendor_id","pickup_datetime","dropoff_datetime","pickup_location_id","dropoff_location_id","trip_date") \
    .show(truncate=False)


                                                                                

+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|trip_id     |taxi_type|vendor_id|pickup_datetime    |dropoff_datetime   |pickup_location_id|dropoff_location_id|trip_date |
+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|309237772005|yellow   |2        |2020-01-30 13:11:51|2020-01-30 13:36:48|262               |107                |2020-01-30|
+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+



In [34]:
spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type','snapshot') \
    .load(f"{HUDI_MOR_BASE_GCS_URI}/trip_date=2020-01-30/") \
    .count()


                                                                                

257928

#### 1.5.2. Read optimized query 
Queries see the latest snapshot of the table as of a given commit/compaction action. It exposes only the base/columnar files in the latest file slices and guarantees the same columnar query performance as a non-Hudi columnar table.<br>
Faster, at the cost of some staleness.

In [35]:
from pyspark.sql.functions import col

spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type','read_optimized') \
    .load(f"{HUDI_MOR_BASE_GCS_URI}/trip_date=2020-01-30/") \
    .filter(col("trip_id")==NEW_TRIP_ID) \
    .select("trip_id","taxi_type","vendor_id","pickup_datetime","dropoff_datetime","pickup_location_id","dropoff_location_id","trip_date") \
    .show(truncate=False)

                                                                                

+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|trip_id     |taxi_type|vendor_id|pickup_datetime    |dropoff_datetime   |pickup_location_id|dropoff_location_id|trip_date |
+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+
|309237772005|yellow   |2        |2020-01-30 13:11:51|2020-01-30 13:36:48|262               |107                |2020-01-30|
+------------+---------+---------+-------------------+-------------------+------------------+-------------------+----------+



In [36]:
spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type','read_optimized') \
    .load(f"{HUDI_MOR_BASE_GCS_URI}/trip_date=2020-01-30/") \
    .count()

                                                                                

257928

### 1.6.  Querying Hudi MoR tables - with Spark SQL against the Hive Metastore Service

#### 1.6.1. Tables in the HMS database
Make note of the two MoR tables: _mor_ro and _mor_rt were auto-created by Hive sync upon the first commit (our insert) to the table. The base _mor table, which we had created explicitly, was dropped in section 1.3.6 and is no longer listed.

In [31]:
spark.sql("SHOW tables in taxi_db;").show(truncate=False)

+---------+--------------------------+-----------+
|namespace|tableName                 |isTemporary|
+---------+--------------------------+-----------+
|taxi_db  |nyc_taxi_trips_hudi_cow   |false      |
|taxi_db  |nyc_taxi_trips_hudi_mor_ro|false      |
|taxi_db  |nyc_taxi_trips_hudi_mor_rt|false      |
+---------+--------------------------+-----------+



#### 1.6.2. Querying the MOR_RO and MOR_RT tables

In [32]:
# Run a count to ensure that the record count increased against the MOR_RO table
AFTER_INSERT_TRIP_COUNT_RO=spark.sql(f"SELECT COUNT(*) as trip_count FROM taxi_db.nyc_taxi_trips_hudi_mor_ro WHERE trip_date='2020-01-30'").collect()[0][0]
print(f"MOR_RO: Trip count before insert into RO table was: {TRIP_COUNT_BEFORE_INSERT_RT} and latest trip count is {AFTER_INSERT_TRIP_COUNT_RO}")

MOR_RO: Trip count before insert into RO table was: 257927 and latest trip count is 257928


In [33]:
# Run a count to ensure that the record count increased against the MOR_RT table
AFTER_INSERT_TRIP_COUNT_RT=spark.sql(f"SELECT COUNT(*) as trip_count FROM taxi_db.nyc_taxi_trips_hudi_mor_rt WHERE trip_date='2020-01-30'").collect()[0][0]
print(f"MOR_RT: Trip count before insert into RT table was: {TRIP_COUNT_BEFORE_INSERT_RT} and latest trip count in RT table is {AFTER_INSERT_TRIP_COUNT_RT}")

MOR_RT: Trip count before insert into RT table was: 257927 and latest trip count in RT table is 257928


Once again, the record is in the file listed under _hoodie_file_name (see the query at the end of section 1.4.7).

This concludes the lab unit. Please proceed to the next notebook.