# Configurations: Spark-Hudi-S3

In [1]:
from typing import *

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Hudi Table") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.2.4,com.amazonaws:aws-java-sdk:1.12.262") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print("Spark Running")

s3_path = "s3a://my-bucket/dip/"

# Access SparkContext
sc = spark.sparkContext

:: loading settings :: url = jar:file:/Users/dipankarmazumdar/Documents/spark-3.4.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/dipankarmazumdar/.ivy2/cache
The jars for the packages stored in: /Users/dipankarmazumdar/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d91d5fc5-a08c-436f-8bf8-20c018d7273c;1.0
	confs: [default]
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in central
	found org.apache.hadoop#hadoop-aws;3.2.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found com.amazonaws#aws-java-sdk;1.12.262 in central
	found com.amazonaws#aws-java-sdk-iamrolesanywhere;1.12.262 in central
	found com.amazonaws#aws-java-sdk-core;1.12.262 in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found commons-codec#commons-codec;1.15 in local-m2-cache
	found org.apache.httpcomponents#httpclient;4.5.13 in local-m2-cache
	found org.apache.http

Spark Running


# CREATE HUDI TABLE: Spark SQL

In [13]:
spark.sql(
    """CREATE TABLE hudi_table 
        (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING)
       USING HUDI 
       PARTITIONED BY (city)
       LOCATION 's3a://my-bucket/dip/'"""
);

# INSERT Data: Spark SQL

In [15]:
spark.sql(
    """INSERT INTO hudi_table VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai')"""
);

24/01/25 12:40:29 WARN AutoRecordKeyGenerationUtils$: Precombine field  will be ignored with auto record key generation enabled
24/01/25 12:40:48 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
[Stage 32:>                                                         (0 + 1) / 1]



                                                                                

# Query Data: Spark SQL

In [18]:
spark.sql("SELECT * FROM hudi_table").toPandas()

                                                                                

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,ts,uuid,rider,driver,fare,city
0,20240125124028836,20240125124028836_0_0,20240125124028836_0_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-25-83...,1695160000000.0,334e26e9-8355-45cc-97c6-c31daf0df330,rider-A,driver-K,19.1,san_francisco
1,20240125124028836,20240125124028836_0_1,20240125124028836_1_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-25-83...,1695092000000.0,e96c4396-3fad-413a-a942-4cb36106d721,rider-C,driver-M,27.7,san_francisco
2,20240125124028836,20240125124028836_0_2,20240125124028836_2_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-25-83...,1695046000000.0,9909a8b1-2d15-4d3d-8ec9-efc48c536a00,rider-D,driver-L,33.9,san_francisco
3,20240125124028836,20240125124028836_0_3,20240125124028836_3_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-25-83...,1695332000000.0,1dced545-862b-4ceb-8b43-d2a568f6616b,rider-E,driver-O,93.5,san_francisco
4,20240125124028836,20240125124028836_1_0,20240125124028836_4_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_1-25-84...,1695516000000.0,e3cf430c-889d-4015-bc98-59bdce1e530c,rider-F,driver-P,34.15,sao_paulo
5,20240125124028836,20240125124028836_1_1,20240125124028836_5_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_1-25-84...,1695376000000.0,7a84095f-737f-40bc-b62f-6b69664712d2,rider-G,driver-Q,43.4,sao_paulo
6,20240125124028836,20240125124028836_2_0,20240125124028836_6_0,city=chennai,b91d1cc8-c6ba-4ccd-b1ce-92e256ff4d40-0_2-25-85...,1695174000000.0,3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04,rider-I,driver-S,41.06,chennai
7,20240125124028836,20240125124028836_2_1,20240125124028836_7_0,city=chennai,b91d1cc8-c6ba-4ccd-b1ce-92e256ff4d40-0_2-25-85...,1695116000000.0,c8abbe79-8d89-47ea-b4ce-4d224bae5bfa,rider-J,driver-T,17.85,chennai
8,20240122124049429,20240122124049429_11_6,20240122124049429_11_0,city=DFW,afaf022e-3029-4862-8057-348868920b32-0_11-19-0...,,,,,,people/city=DFW
9,20240122124049429,20240122124049429_5_1,20240122124049429_5_0,city=ORD,ed86d420-d8a3-48cd-b591-09f204ac0362-0_5-13-0_...,,,,,,people/city=ORD


# Update Data: Spark SQL

In [19]:
spark.sql("UPDATE hudi_table SET fare = 25.0 WHERE rider = 'rider-D'");

24/01/25 12:43:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/01/25 12:43:17 WARN AutoRecordKeyGenerationUtils$: Precombine field  will be ignored with auto record key generation enabled
                                                                                

In [20]:
spark.sql("SELECT * FROM hudi_table where rider = 'rider-D'").toPandas()

                                                                                

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,ts,uuid,rider,driver,fare,city
0,20240125124317346,20240125124317346_0_2,20240125124028836_2_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-48-16...,1695046462179,9909a8b1-2d15-4d3d-8ec9-efc48c536a00,rider-D,driver-L,25.0,san_francisco


# Upsert Data: Spark SQL

In [21]:
# Create a source table `fare_adjustment` & insert some records. This will then be used to upsert records into
# the target Hudi table `hudi_table`
spark.sql(
    """CREATE TABLE fare_adjustment
        (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING) 
        USING HUDI 
        PARTITIONED BY (city)
        LOCATION 's3a://my-bucket/dip/fare'"""
);

spark.sql(
    """INSERT INTO fare_adjustment VALUES 
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),
(1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),
(1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo'),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai')"""
);

In [23]:
# MERGE INTO operation using Spark SQL

spark.sql(
"""
    MERGE INTO hudi_table AS target
    USING fare_adjustment AS source
    ON target.uuid = source.uuid
    WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare
    WHEN NOT MATCHED THEN INSERT *
""");

24/01/25 13:04:57 WARN MergeIntoHoodieTableCommand: Updates without precombine can have nondeterministic behavior
24/01/25 13:04:59 WARN AutoRecordKeyGenerationUtils$: Precombine field  will be ignored with auto record key generation enabled
                                                                                

In [24]:
spark.sql("SELECT * FROM hudi_table").toPandas()

                                                                                

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,ts,uuid,rider,driver,fare,city
0,20240125124028836,20240125124028836_0_0,20240125124028836_0_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-96-27...,1695160000000.0,334e26e9-8355-45cc-97c6-c31daf0df330,rider-A,driver-K,19.1,san_francisco
1,20240125130458980,20240125130458980_0_1,20240125124028836_1_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-96-27...,1695092000000.0,e96c4396-3fad-413a-a942-4cb36106d721,rider-C,driver-M,25.0,san_francisco
2,20240125124317346,20240125124317346_0_2,20240125124028836_2_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-96-27...,1695046000000.0,9909a8b1-2d15-4d3d-8ec9-efc48c536a00,rider-D,driver-L,25.0,san_francisco
3,20240125124028836,20240125124028836_0_3,20240125124028836_3_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-96-27...,1695332000000.0,1dced545-862b-4ceb-8b43-d2a568f6616b,rider-E,driver-O,93.5,san_francisco
4,20240125130458980,20240125130458980_0_4,20240125130458980_0_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-96-27...,1695530000000.0,3f3d9565-7261-40e6-9b39-b8aa784f95e2,rider-K,driver-U,64.2,san_francisco
5,20240125124028836,20240125124028836_1_0,20240125124028836_4_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_2-96-27...,1695516000000.0,e3cf430c-889d-4015-bc98-59bdce1e530c,rider-F,driver-P,34.15,sao_paulo
6,20240125124028836,20240125124028836_1_1,20240125124028836_5_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_2-96-27...,1695376000000.0,7a84095f-737f-40bc-b62f-6b69664712d2,rider-G,driver-Q,43.4,sao_paulo
7,20240125130458980,20240125130458980_2_2,20240125130458980_1_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_2-96-27...,1695241000000.0,ea4c36ff-2069-4148-9927-ef8c1a5abd24,rider-H,driver-R,66.6,sao_paulo
8,20240125124028836,20240125124028836_2_0,20240125124028836_6_0,city=chennai,b91d1cc8-c6ba-4ccd-b1ce-92e256ff4d40-0_1-96-27...,1695174000000.0,3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04,rider-I,driver-S,41.06,chennai
9,20240125130458980,20240125130458980_1_1,20240125124028836_7_0,city=chennai,b91d1cc8-c6ba-4ccd-b1ce-92e256ff4d40-0_1-96-27...,1695116000000.0,c8abbe79-8d89-47ea-b4ce-4d224bae5bfa,rider-J,driver-T,19.7,chennai


# DELETE Records: Spark SQL

In [25]:
spark.sql("DELETE FROM hudi_table WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2'");

24/01/25 13:10:05 WARN AutoRecordKeyGenerationUtils$: Precombine field ts will be ignored with auto record key generation enabled
                                                                                

# Time Travel Query: Spark SQL
### query the table as of a point-in-time in history

In [30]:
spark.sql("SELECT * FROM hudi_table TIMESTAMP AS OF '2024-01-24 12:43:00'").toPandas()

                                                                                

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,ts,uuid,rider,driver,fare,city
0,20240122124049429,20240122124049429_11_6,20240122124049429_11_0,city=DFW,afaf022e-3029-4862-8057-348868920b32-0_11-19-0...,,,,,,people/city=DFW
1,20240122124049429,20240122124049429_5_1,20240122124049429_5_0,city=ORD,ed86d420-d8a3-48cd-b591-09f204ac0362-0_5-13-0_...,,,,,,people/city=ORD
2,20240122124049429,20240122124049429_7_5,20240122124049429_7_0,city=NYC,87e551de-c5cb-4a12-b60e-7d669eb8fb32-0_7-15-0_...,,,,,,people/city=NYC
3,20240122124049429,20240122124049429_3_3,20240122124049429_3_0,city=SFO,ef67a0f6-8646-4659-a61c-c078ba55e119-0_3-11-0_...,,,,,,people/city=SFO
4,20240122124049429,20240122124049429_1_2,20240122124049429_1_0,city=NYC,caf29b67-ebf1-499e-931f-5338a3eea72c-0_1-9-0_2...,,,,,,people/city=NYC
5,20240122124049429,20240122124049429_9_4,20240122124049429_9_0,city=SEA,6f7f053b-f719-4cc5-b160-5698fcba830b-0_9-17-0_...,,,,,,people/city=SEA


# Incremental Query: Spark SQL
### lets you obtain a set of records that changed between a start and end commit time, providing you with the "latest state" for each such record as of the end commit time

In [31]:
spark.sql("SELECT * FROM hudi_table_changes('hudi_table', 'latest_state', 'earliest')").toPandas()

                                                                                

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,ts,uuid,rider,driver,fare,city
0,20240125124028836,20240125124028836_0_0,20240125124028836_0_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-116-3...,1695159649087,334e26e9-8355-45cc-97c6-c31daf0df330,rider-A,driver-K,19.1,san_francisco
1,20240125130458980,20240125130458980_0_1,20240125124028836_1_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-116-3...,1695091554788,e96c4396-3fad-413a-a942-4cb36106d721,rider-C,driver-M,25.0,san_francisco
2,20240125124317346,20240125124317346_0_2,20240125124028836_2_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-116-3...,1695046462179,9909a8b1-2d15-4d3d-8ec9-efc48c536a00,rider-D,driver-L,25.0,san_francisco
3,20240125124028836,20240125124028836_0_3,20240125124028836_3_0,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-116-3...,1695332066204,1dced545-862b-4ceb-8b43-d2a568f6616b,rider-E,driver-O,93.5,san_francisco
4,20240125124028836,20240125124028836_1_0,20240125124028836_4_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_2-96-27...,1695516137016,e3cf430c-889d-4015-bc98-59bdce1e530c,rider-F,driver-P,34.15,sao_paulo
5,20240125124028836,20240125124028836_1_1,20240125124028836_5_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_2-96-27...,1695376420876,7a84095f-737f-40bc-b62f-6b69664712d2,rider-G,driver-Q,43.4,sao_paulo
6,20240125130458980,20240125130458980_2_2,20240125130458980_1_0,city=sao_paulo,3f87deb7-6caa-40a8-ae92-a51c0e19fe02-0_2-96-27...,1695241330902,ea4c36ff-2069-4148-9927-ef8c1a5abd24,rider-H,driver-R,66.6,sao_paulo
7,20240125124028836,20240125124028836_2_0,20240125124028836_6_0,city=chennai,b91d1cc8-c6ba-4ccd-b1ce-92e256ff4d40-0_1-96-27...,1695173887231,3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04,rider-I,driver-S,41.06,chennai
8,20240125130458980,20240125130458980_1_1,20240125124028836_7_0,city=chennai,b91d1cc8-c6ba-4ccd-b1ce-92e256ff4d40-0_1-96-27...,1695115999911,c8abbe79-8d89-47ea-b4ce-4d224bae5bfa,rider-J,driver-T,19.7,chennai


# Check Table Properties

In [41]:
spark.sql("call show_table_properties(table => 'hudi_table')").toPandas()

Unnamed: 0,key,value
0,hoodie.datasource.write.drop.partition.columns,false
1,hoodie.table.partition.fields,city
2,hoodie.table.type,COPY_ON_WRITE
3,hoodie.archivelog.folder,archived
4,hoodie.timeline.layout.version,1
5,hoodie.table.version,6
6,hoodie.table.metadata.partitions,files
7,hoodie.datasource.write.partitionpath.urlencode,false
8,hoodie.database.name,default
9,hoodie.table.name,hudi_table


# Check files in a Hudi table by partition

In [47]:
spark.sql("call show_metadata_table_files(table => 'hudi_table', partition => 'city=san_francisco')").toPandas()

Unnamed: 0,file_path
0,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-116-3...
1,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-25-83...
2,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-48-16...
3,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0_0-96-27...


# Check Commits information on the table

In [42]:
spark.sql("call show_commits(table => 'hudi_table')").toPandas()

Unnamed: 0,commit_time,state_transition_time,action,total_bytes_written,total_files_added,total_files_updated,total_partitions_written,total_records_written,total_update_records_written,total_errors
0,20240125131004830,20240125131038000,commit,436280,0,1,1,4,0,0
1,20240125130458980,20240125130531000,commit,1308427,0,3,3,10,2,0
2,20240125124317346,20240125124348000,commit,436206,0,1,1,4,1,0
3,20240125124028836,20240125124128000,commit,1308315,3,0,3,8,0,0


# Check Commit files for specific timestamp

In [45]:
spark.sql("call show_commit_files(table => 'hudi_table', instant_time => '20240125124317346')").toPandas()

Unnamed: 0,action,partition_path,file_id,previous_commit,total_records_updated,total_records_written,total_bytes_written,total_errors,file_size
0,commit,city=san_francisco,00a54be5-0248-4cb6-8a58-f9fa5e408fe8-0,20240125124028836,1,4,436206,0,436206
