# This follows the official [quick-start guide](https://hudi.apache.org/docs/quick-start-guide)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col

spark = (
    SparkSession.builder.appName("Hudi_Data_Processing_Framework")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .config("fs.default.name", "hdfs://namenode:9000")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions","org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1"
    )
    .getOrCreate()
)

spark.sparkContext.setLogLevel('ERROR')



:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hudi#hudi-spark3.3-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ef08f1e7-c886-44d8-a734-9630ce718482;1.0
	confs: [default]
	found org.apache.hudi#hudi-spark3.3-bundle_2.12;0.13.1 in central
downloading https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.3-bundle_2.12/0.13.1/hudi-spark3.3-bundle_2.12-0.13.1.jar ...
	[SUCCESSFUL ] org.apache.hudi#hudi-spark3.3-bundle_2.12;0.13.1!hudi-spark3.3-bundle_2.12.jar (97843ms)
:: resolution report :: resolve 2613ms :: artifacts dl 97848ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.3-bundle_2.12;0.13.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	------------------------------------

23/09/10 17:21:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# pyspark
from pyspark import SparkContext as sc
tableName = "hudi_trips_cow"
basePath = f"hdfs://namenode:9000/tmp/{tableName}"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

# Generate data
Trip data with starting and ending coordinates, and the cost. 
Only 3 trips are generated

In [3]:
# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(3))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 1,
    'hoodie.insert.shuffle.parallelism': 1
}

                                                                                

In [4]:
df.toPandas().head()

Unnamed: 0,begin_lat,begin_lon,driver,end_lat,end_lon,fare,partitionpath,rider,ts,uuid
0,0.472691,0.461579,driver-213,0.754803,0.967116,34.158285,americas/brazil/sao_paulo,rider-213,1694264657063,57cac6b3-4d3a-4c08-a769-a23a4c6e090e
1,0.610007,0.87794,driver-213,0.340787,0.50308,43.492381,americas/brazil/sao_paulo,rider-213,1694122729122,f22ea337-61ce-40b8-b0b4-6380c7c8208e
2,0.573184,0.492348,driver-213,0.089886,0.425209,64.276963,americas/united_states/san_francisco,rider-213,1694210071708,1b59d447-8ec0-4ee2-9f2b-021cf0191656


# Write data - overwrite

In [5]:
df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)

                                                                                

23/09/10 17:41:40 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
23/09/10 17:41:40 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:218)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:923)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:154)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:262)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:169)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

# Read data with SQL

In [72]:
# pyspark
tripsSnapshotDF = spark. \
  read. \
  format("hudi"). \
  load(basePath)
# load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

+------------------+--------------------+-------------------+-------------+
|              fare|           begin_lon|          begin_lat|           ts|
+------------------+--------------------+-------------------+-------------+
| 64.51866124948768|0.011933855867048981| 0.3643791915968686|1694177464554|
|30.958879211588386|  0.4188521186617712|0.06076955804980588|1694045480078|
+------------------+--------------------+-------------------+-------------+

+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|  20230910142900300|5f52d781-a905-415...|  americas/united_s...|rider-047|driver-047| 64.51866124948768|
|  20230910142900300|9614a1c4-0176-43b...|  americas/brazil/s...|rider-047|driver-047|17.496376187467867|
|  2023091014

# Read with Pandas

In [73]:
tripsSnapshotDF.toPandas().head(15)

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,begin_lat,begin_lon,driver,end_lat,end_lon,fare,rider,ts,uuid,partitionpath
0,20230910142900300,20230910142900300_1_0,5f52d781-a905-4155-a0f2-027fb5808729,americas/united_states/san_francisco,5b051018-29c0-4ddd-b0da-896770ec7284-0_1-676-8...,0.364379,0.011934,driver-047,0.162582,0.963531,64.518661,rider-047,1694177464554,5f52d781-a905-4155-a0f2-027fb5808729,americas/united_states/san_francisco
1,20230910142900300,20230910142900300_0_0,9614a1c4-0176-43ba-82c5-c2b909313af0,americas/brazil/sao_paulo,3081a60e-7ce2-4b67-ba9e-6ce164192c5a-0_0-676-8...,0.236524,0.571546,driver-047,0.880675,0.495799,17.496376,rider-047,1694198389570,9614a1c4-0176-43ba-82c5-c2b909313af0,americas/brazil/sao_paulo
2,20230910142736668,20230910142736668_0_1,4deb37fc-e68c-4d2a-a3c7-e60c6289c38b,americas/brazil/sao_paulo,3081a60e-7ce2-4b67-ba9e-6ce164192c5a-0_0-676-8...,0.06077,0.418852,driver-146,0.492771,0.693565,30.958879,rider-146,1694045480078,4deb37fc-e68c-4d2a-a3c7-e60c6289c38b,americas/brazil/sao_paulo


# Update data

In [90]:
# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
(
df.write.format("hudi"). 
  options(**hudi_options). 
  mode("append"). 
  save(basePath)
)

In [91]:
df.toPandas().head(15)

Unnamed: 0,begin_lat,begin_lon,driver,end_lat,end_lon,fare,partitionpath,rider,ts,uuid
0,0.479578,0.685089,driver-932,0.147921,0.486872,3.482702,americas/brazil/sao_paulo,rider-932,1693955532276,9614a1c4-0176-43ba-82c5-c2b909313af0
1,0.75681,0.673252,driver-932,0.722576,0.392536,57.610972,americas/united_states/san_francisco,rider-932,1693913875613,5f52d781-a905-4155-a0f2-027fb5808729
2,0.723062,0.360805,driver-932,0.825075,0.030241,74.734102,americas/brazil/sao_paulo,rider-932,1693816310709,9614a1c4-0176-43ba-82c5-c2b909313af0


# Time travel query

In [97]:
df_time_travel = (
    spark.read. 
      format("hudi"). 
      option("as.of.instant", "20230910142736670"). 
      load(basePath)
)

df_time_travel.filter(col('uuid')=='5f52d781-a905-4155-a0f2-027fb5808729').toPandas().head(15)

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,begin_lat,begin_lon,driver,end_lat,end_lon,fare,rider,ts,uuid,partitionpath
0,20230910142736668,20230910142736668_1_0,5f52d781-a905-4155-a0f2-027fb5808729,americas/united_states/san_francisco,5b051018-29c0-4ddd-b0da-896770ec7284-0_1-607-7...,0.0014,0.515296,driver-146,0.754257,0.740671,82.201951,rider-146,1693811790644,5f52d781-a905-4155-a0f2-027fb5808729,americas/united_states/san_francisco


# Incremental query

In [96]:
# pyspark
# reload data
spark. \
  read. \
  format("hudi"). \
  load(basePath). \
  createOrReplaceTempView("hudi_trips_snapshot")

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in

# incrementally query data
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': '20230910144418889736',
}

tripsIncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, uuid, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 0.0").show()

+-------------------+--------------------+------------------+------------------+------------------+-------------+
|_hoodie_commit_time|                uuid|              fare|         begin_lon|         begin_lat|           ts|
+-------------------+--------------------+------------------+------------------+------------------+-------------+
|  20230910144449253|5f52d781-a905-415...|57.610971935361185| 0.673252109470476|0.7568098508318892|1693913875613|
|  20230910144449253|9614a1c4-0176-43b...| 3.482702091010481|0.6850887026829963|0.4795784679677898|1693955532276|
+-------------------+--------------------+------------------+------------------+------------------+-------------+



In [80]:
print(commits)

['20230910142736668', '20230910142900300']


In [84]:
from datetime import datetime

In [89]:
datetime.utcnow().strftime('%Y%m%d%H%M%S%f')

'20230910144418889736'