In [1]:
%%configure
{ "conf": {
            "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
            "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.convertMetastoreParquet":"false"
          }}

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1628248144302_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Name of the Hudi table and the S3 prefix its files are written under.
tableName = "hudi_cow_table_mobikwik"
tablePath = f"s3://khokharn-hudi/MobiKwik/{tableName}"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1628248144302_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Hudi writer options: upsert into a copy-on-write table, keyed by `name`,
# de-duplicated on `date`, and partitioned by name/year/month/day through the
# CustomKeyGenerator (every partition field is declared SIMPLE).
hudiWriteConfig = {
    'className' : 'org.apache.hudi',
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.precombine.field': 'date',
    'hoodie.datasource.write.recordkey.field': 'name',
    'hoodie.datasource.write.partitionpath.field': 'name:SIMPLE,year:SIMPLE,month:SIMPLE,day:SIMPLE',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    # NOTE(review): the timebased.* options presumably only matter for
    # partition fields declared as TIMESTAMP (see the commented alternative
    # below) -- confirm against the Hudi key generator documentation.
    'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'MIXED',
    # Java date patterns are case-sensitive: 'MM' is month, 'mm' is minutes.
    'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-MM-dd',
    'hoodie.deltastreamer.keygen.timebased.output.dateformat':'yyyy/MM/dd'
}

# Hive/Glue catalog sync options: register the table in the `default`
# database with hive-style (key=value) partition directories.
hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.table': tableName,
    'hoodie.datasource.write.hive_style_partitioning' : 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': 'name,year,month,day'
}

# Alternative settings kept for reference (not active):
#'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://localhost:10000',
#'hoodie.datasource.write.partitionpath.field': 'name:SIMPLE,dt:TIMESTAMP',

# Single options dict handed to every DataFrame write; on a key clash the
# Glue settings (unpacked last) would win.
combinedConf = {
    **hudiWriteConfig,
    **hudiGlueConfig
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# First run: seed the table with four initial records.
simpleData = [
    (person, "2021-07-22", "1234", "White")
    for person in ("Person1", "Person2", "Person3", "Person4")
]

columns = ["name","date","col_to_update_integer","col_to_update_string"]
df = spark.createDataFrame(simpleData, columns)

# Append the year/month/day partition columns derived from the date string.
df1 = df.select(
    *columns,
    year("date").alias("year"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
)
df1.printSchema()
df1.show(truncate=False)

# Write to the Hudi table with the shared options.
(
    df1.write.format("hudi")
    .options(**combinedConf)
    .mode("append")
    .save(tablePath)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- col_to_update_integer: string (nullable = true)
 |-- col_to_update_string: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+-------+----------+---------------------+--------------------+----+-----+---+
|name   |date      |col_to_update_integer|col_to_update_string|year|month|day|
+-------+----------+---------------------+--------------------+----+-----+---+
|Person1|2021-07-22|1234                 |White               |2021|7    |22 |
|Person2|2021-07-22|1234                 |White               |2021|7    |22 |
|Person3|2021-07-22|1234                 |White               |2021|7    |22 |
|Person4|2021-07-22|1234                 |White               |2021|7    |22 |
+-------+----------+---------------------+--------------------+----+-----+---+

In [29]:
## Check via PySpark
# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                        |name   |date      |col_to_update_integer|col_to_update_string|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+----+-----+---+
|20210808155456     |20210808155456_2_3  |Person2           |name=Person2/year=2021/month=7/day=22|c2f7bac6-24ab-4661-8791-4e299781dd92-0_2-228-72348_20210808155456.parquet|Person2|2021-07-22|1234                 |White               |2021|7    |2

In [30]:
# Second run: same record keys with new values, i.e. an in-place update.
simpleData = [
    (person, "2021-07-22", "4567", "Yellow")
    for person in ("Person1", "Person2", "Person3", "Person4")
]

columns = ["name","date","col_to_update_integer","col_to_update_string"]
df = spark.createDataFrame(simpleData, columns)

# Append the year/month/day partition columns derived from the date string.
df1 = df.select(
    *columns,
    year("date").alias("year"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
)
df1.printSchema()
df1.show(truncate=False)

# Write to the Hudi table with the shared options.
(
    df1.write.format("hudi")
    .options(**combinedConf)
    .mode("append")
    .save(tablePath)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- col_to_update_integer: string (nullable = true)
 |-- col_to_update_string: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+-------+----------+---------------------+--------------------+----+-----+---+
|name   |date      |col_to_update_integer|col_to_update_string|year|month|day|
+-------+----------+---------------------+--------------------+----+-----+---+
|Person1|2021-07-22|4567                 |Yellow              |2021|7    |22 |
|Person2|2021-07-22|4567                 |Yellow              |2021|7    |22 |
|Person3|2021-07-22|4567                 |Yellow              |2021|7    |22 |
|Person4|2021-07-22|4567                 |Yellow              |2021|7    |22 |
+-------+----------+---------------------+--------------------+----+-----+---+

In [31]:
## Check via PySpark
# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                        |name   |date      |col_to_update_integer|col_to_update_string|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+----+-----+---+
|20210808155922     |20210808155922_0_5  |Person3           |name=Person3/year=2021/month=7/day=22|f265b2d9-0b54-4c0d-bdf8-25326d6454e9-0_0-265-85904_20210808155922.parquet|Person3|2021-07-22|4567                 |Yellow              |2021|7    |2

In [33]:
# Forward schema evolution: the incoming records carry an extra column.
# New columns are accepted by the write without further handling.
simpleData = [
    (person, "2021-07-22", "8910", "Silver", "abc")
    for person in ("Person1", "Person2", "Person3", "Person4")
]

columns = ["name","date","col_to_update_integer","col_to_update_string","new_col"]
df = spark.createDataFrame(simpleData, columns)

# Append the year/month/day partition columns derived from the date string.
df1 = df.select(
    *columns,
    year("date").alias("year"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
)
df1.printSchema()
df1.show(truncate=False)

# Write to the Hudi table with the shared options.
(
    df1.write.format("hudi")
    .options(**combinedConf)
    .mode("append")
    .save(tablePath)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- col_to_update_integer: string (nullable = true)
 |-- col_to_update_string: string (nullable = true)
 |-- new_col: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+-------+----------+---------------------+--------------------+-------+----+-----+---+
|name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------+----------+---------------------+--------------------+-------+----+-----+---+
|Person1|2021-07-22|8910                 |Silver              |abc    |2021|7    |22 |
|Person2|2021-07-22|8910                 |Silver              |abc    |2021|7    |22 |
|Person3|2021-07-22|8910                 |Silver              |abc    |2021|7    |22 |
|Person4|2021-07-22|8910                 |Silver              |abc    |2021|7    |22 |
+-------+----------+---------------------+----------------

In [34]:
## Check via PySpark
# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                         |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808160422     |20210808160422_0_10 |Person3           |name=Person3/year=2021/month=7/day=22|f265b2d9-0b54-4c0d-bdf8-25326d6454e9-0_0-336-113015_20210808160422.parquet|Person3|2021-07-22|8910                 |Silve

In [35]:
# Backward schema evolution (expected to fail): the incoming records no
# longer contain the previously added column, and the write aborts with
# Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'new_col' not found
simpleData = [
    (person, "2021-07-22", "11121314", "Purple")
    for person in ("Person1", "Person2", "Person3", "Person4")
]

columns = ["name","date","col_to_update_integer","col_to_update_string"]
df = spark.createDataFrame(simpleData, columns)

# Append the year/month/day partition columns derived from the date string.
df1 = df.select(
    *columns,
    year("date").alias("year"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
)
df1.printSchema()
df1.show(truncate=False)

# Write to the Hudi table with the shared options.
(
    df1.write.format("hudi")
    .options(**combinedConf)
    .mode("append")
    .save(tablePath)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o843.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 373.0 failed 4 times, most recent failure: Lost task 0.3 in stage 373.0 (TID 126585) (ip-172-31-58-135.ap-south-1.compute.internal executor 10): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:279)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:135)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitio

In [109]:
# Ensure the incoming records match the table's current schema. Brand-new columns are fine; if a column exists in the current schema but not in the incoming records, it must be added before inserting.
def evolveSchema(df,table,forcecast=False):
    """Align an incoming DataFrame with the schema of an existing table.

    The current schema of ``table`` is read through the ambient ``spark``
    session with a ``LIMIT 0`` query, and the Hudi metadata columns are
    dropped from it. If the incoming schema differs, the incoming frame is
    unioned by name with that empty frame (``allowMissingColumns=True``), so
    columns that exist only in the table are added to the result.

    Args:
        df: incoming DataFrame that is about to be written.
        table: name of the table whose schema is looked up.
        forcecast: accepted for backward compatibility; currently unused.

    Returns:
        ``df`` itself when the schemas already match, or when the lookup or
        merge raises (the error is printed, best effort); otherwise the
        merged DataFrame.
    """
    # Hudi bookkeeping columns that are never part of the incoming records.
    columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno','_hoodie_record_key','_hoodie_partition_path','_hoodie_file_name']
    try:
        #get existing table's schema
        print("\nexisting schema in hive is :")
        original_df = spark.sql("SELECT * FROM "+table+" LIMIT 0")
        original_df.printSchema()

        #sanitize for hudi specific system columns
        print("\nexisting schema in hive (sanitized for hudi columns) is :")
        odf = original_df.drop(*columns_to_drop)
        odf.printSchema()

        # Nothing to reconcile: return the input instead of falling through
        # to an unassigned result variable.
        if df.schema == odf.schema:
            return df

        return df.unionByName(odf, allowMissingColumns=True)
    except Exception as e:
        # Deliberate best effort: report the problem and let the caller
        # continue with the unmodified frame.
        print (e)
        return (df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [111]:
# Backward schema evolution, handled: the incoming records lack a column the
# table already has, so evolveSchema adds the missing columns before the write.

simpleData = [
    (person, "2021-07-22", "11121314", "Purple")
    for person in ("Person1", "Person2", "Person3", "Person4")
]

columns = ["name","date","col_to_update_integer","col_to_update_string"]
df = spark.createDataFrame(simpleData, columns)

# Append the year/month/day partition columns derived from the date string.
df1 = df.select(
    *columns,
    year("date").alias("year"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
)
df1.printSchema()
df1.show(truncate=False)

# Reconcile the incoming frame with the table's current schema.
df2 = evolveSchema(df1, tableName, False)

df2.printSchema()
df2.show(truncate=False)

# Write the reconciled frame to the Hudi table with the shared options.
(
    df2.write.format("hudi")
    .options(**combinedConf)
    .mode("append")
    .save(tablePath)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- col_to_update_integer: string (nullable = true)
 |-- col_to_update_string: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+-------+----------+---------------------+--------------------+----+-----+---+
|name   |date      |col_to_update_integer|col_to_update_string|year|month|day|
+-------+----------+---------------------+--------------------+----+-----+---+
|Person1|2021-07-22|11121314             |Purple              |2021|7    |22 |
|Person2|2021-07-22|11121314             |Purple              |2021|7    |22 |
|Person3|2021-07-22|11121314             |Purple              |2021|7    |22 |
|Person4|2021-07-22|11121314             |Purple              |2021|7    |22 |
+-------+----------+---------------------+--------------------+----+-----+---+


existing schema in hive is :
root
 |-- _hoodie_commit_time: string (null

In [113]:
## Check via PySpark
# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+----+-----+---+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                         |name   |date      |col_to_update_integer|col_to_update_string|year|month|day|new_col|
+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+----+-----+---+-------+
|20210808171322     |20210808171322_3_2  |Person4           |name=Person4/year=2021/month=7/day=22|a1544097-1394-4214-8de5-43aa8f27f273-0_3-510-167394_20210808171322.parquet|Person4|2021-07-22|11121314             |Purpl

In [114]:
# Follow-up write: the incoming records carry new_col again.
# evolveSchema is not applied here; df1 is written directly.

simpleData = [
    (person, "2021-07-22", "15161718", "Orange", "again")
    for person in ("Person1", "Person2", "Person3", "Person4")
]

columns = ["name","date","col_to_update_integer","col_to_update_string","new_col"]
df = spark.createDataFrame(simpleData, columns)

# Append the year/month/day partition columns derived from the date string.
df1 = df.select(
    *columns,
    year("date").alias("year"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
)
df1.printSchema()
df1.show(truncate=False)

# Write to the Hudi table with the shared options.
(
    df1.write.format("hudi")
    .options(**combinedConf)
    .mode("append")
    .save(tablePath)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- col_to_update_integer: string (nullable = true)
 |-- col_to_update_string: string (nullable = true)
 |-- new_col: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+-------+----------+---------------------+--------------------+-------+----+-----+---+
|name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------+----------+---------------------+--------------------+-------+----+-----+---+
|Person1|2021-07-22|15161718             |Orange              |again  |2021|7    |22 |
|Person2|2021-07-22|15161718             |Orange              |again  |2021|7    |22 |
|Person3|2021-07-22|15161718             |Orange              |again  |2021|7    |22 |
|Person4|2021-07-22|15161718             |Orange              |again  |2021|7    |22 |
+-------+----------+---------------------+----------------

In [115]:
## Check via PySpark
# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                         |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808171924     |20210808171924_0_12 |Person3           |name=Person3/year=2021/month=7/day=22|f265b2d9-0b54-4c0d-bdf8-25326d6454e9-0_0-581-194531_20210808171924.parquet|Person3|2021-07-22|15161718             |Orang

In [3]:
#TimeTravel
# Incremental query bounded by a begin and an end commit instant.
# available commit timestamps in .hoodie folder of S3 :
# 20210808155456
# 20210808155922
# 20210808160202
# 20210808160422
# 20210808171322
# 20210808171924

starttime = "0000"
endtime = "20210808155456"

# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", starttime)
    .option("hoodie.datasource.read.end.instanttime", endtime)
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                        |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808155456     |20210808155456_2_3  |Person2           |name=Person2/year=2021/month=7/day=22|c2f7bac6-24ab-4661-8791-4e299781dd92-0_2-228-72348_20210808155456.parquet|Person2|2021-07-22|1234                 |White    

In [4]:
# Incremental query for the window between the first and second commit.
starttime = "20210808155456"
endtime = "20210808155922"

# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", starttime)
    .option("hoodie.datasource.read.end.instanttime", endtime)
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                        |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808155922     |20210808155922_2_6  |Person2           |name=Person2/year=2021/month=7/day=22|c2f7bac6-24ab-4661-8791-4e299781dd92-0_2-265-85906_20210808155922.parquet|Person2|2021-07-22|4567                 |Yellow   

In [5]:
# Incremental query for the window between the second and third commit.
starttime = "20210808155922"
endtime = "20210808160202"

# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", starttime)
    .option("hoodie.datasource.read.end.instanttime", endtime)
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                        |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808160202     |20210808160202_1_8  |Person1           |name=Person1/year=2021/month=7/day=22|7e28e1f2-9cee-4b6c-ada0-3bf587f8aa7c-0_1-302-99463_20210808160202.parquet|Person1|2021-07-22|8910                 |Silver   

In [6]:
# Incremental query for the window between the third and fourth commit.
starttime = "20210808160202"
endtime = "20210808160422"

# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", starttime)
    .option("hoodie.datasource.read.end.instanttime", endtime)
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                         |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808160422     |20210808160422_0_10 |Person3           |name=Person3/year=2021/month=7/day=22|f265b2d9-0b54-4c0d-bdf8-25326d6454e9-0_0-336-113015_20210808160422.parquet|Person3|2021-07-22|8910                 |Silve

In [7]:
# Incremental query for the window between the fourth and fifth commit.
starttime = "20210808160422"
endtime = "20210808171322"

# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", starttime)
    .option("hoodie.datasource.read.end.instanttime", endtime)
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                         |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808171322     |20210808171322_2_1  |Person2           |name=Person2/year=2021/month=7/day=22|c2f7bac6-24ab-4661-8791-4e299781dd92-0_2-510-167393_20210808171322.parquet|Person2|2021-07-22|11121314             |Purpl

In [8]:
# Incremental query for the window between the fifth and sixth commit.
starttime = "20210808171322"
endtime = "20210808171924"

# Keep the DataFrame in hudiDF; show() returns None, so it must not be the
# last call of the assignment chain.
hudiDF = (
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", starttime)
    .option("hoodie.datasource.read.end.instanttime", endtime)
    .load(tablePath)
)
hudiDF.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path               |_hoodie_file_name                                                         |name   |date      |col_to_update_integer|col_to_update_string|new_col|year|month|day|
+-------------------+--------------------+------------------+-------------------------------------+--------------------------------------------------------------------------+-------+----------+---------------------+--------------------+-------+----+-----+---+
|20210808171924     |20210808171924_2_11 |Person2           |name=Person2/year=2021/month=7/day=22|c2f7bac6-24ab-4661-8791-4e299781dd92-0_2-581-194533_20210808171924.parquet|Person2|2021-07-22|15161718             |Orang