In [1]:
import os
from pathlib import Path
from datetime import datetime, date

from pyspark.conf import SparkConf
from pyspark.sql import Row, SparkSession

In [4]:
# Spark configuration for the Hudi quick start
# (see https://hudi.apache.org/docs/quick-start-guide#setup).
# The Hudi Spark bundle and spark-avro jars are expected in ../jars relative to the
# notebook's working directory.
JARS_DIR = Path().resolve().parent / "jars"
JAR_NAMES = [
    "hudi-spark3-bundle_2.12-0.8.0.jar",
    "spark-avro_2.12-3.1.2.jar",
]

spark_conf = SparkConf()
spark_conf.setAll([
    ("spark.app.name", "Hudi"),
    ("spark.ui.showConsoleProgress", "true"),
    ("spark.eventLog.enabled", "false"),
    # Path.as_uri() yields a well-formed file:// URI (percent-encodes spaces and other
    # reserved characters, handles drive letters), unlike prefixing "file://" by hand.
    ("spark.jars", ",".join((JARS_DIR / name).as_uri() for name in JAR_NAMES)),
    # Kryo serializer as specified in the Hudi quick-start setup.
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
]);  # trailing ';' suppresses the SparkConf repr that setAll() returns

<pyspark.conf.SparkConf at 0x7fe285045d60>

In [5]:
# Start (or reuse an already-active) SparkSession configured from spark_conf.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

21/08/22 11:08:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
# Hudi table name and storage location.
tableName = "hudi_trips_cow"
# Set the HUDI_BASE_PATH environment variable to write the table somewhere else;
# the default keeps the original workspace location instead of forcing every reader
# to have the same absolute directory layout.
basePath = os.environ.get(
    "HUDI_BASE_PATH",
    "file:///workspaces/workspace-lakehouse/hudi/tmp/" + tableName,
)
# Sample-data generator from the Hudi bundle jar, reached through the JVM handle
# (spark._jvm is a private PySpark attribute, used here as in the Hudi quick start).
dataGen = spark._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

In [7]:
# Ask the JVM-side generator for 10 insert records, convert them to a list of JSON
# strings, and read those into a DataFrame spread over 2 partitions.
inserts = spark._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
    dataGen.generateInserts(10)
)
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

# Hudi writer options: record key `uuid`, partition path `partitionpath`,
# precombine field `ts`, upsert operation, and small shuffle parallelism for a demo.
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "partitionpath",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
}

# Overwrite mode replaces whatever is already stored at basePath.
(
    df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(basePath)
)



In [8]:
# Snapshot read of the Hudi table. The glob descends four directory levels below
# basePath to reach the data files; load(basePath) on its own only works when the
# table uses a "/partitionKey=partitionValue" folder layout that Spark can
# auto-discover as partitions.
tripsSnapshotDF = (
    spark.read
    .format("hudi")
    .load(f"{basePath}/*/*/*/*")
)

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

# Show expensive trips, then the Hudi metadata columns next to a few payload columns.
for query in (
    "select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0",
    "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot",
):
    spark.sql(query).show()

21/08/22 11:11:50 WARN DefaultSource: Loading Base File Only View.


+------------------+-------------------+-------------------+-------------+
|              fare|          begin_lon|          begin_lat|           ts|
+------------------+-------------------+-------------------+-------------+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1629046157276|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1629561586569|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1629519091043|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1629199623019|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1629574424275|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1629467953558|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1629624440054|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1629235446314|
+------------------+-------------------+-------------------+-------------+

+-------------------+--------------------+----------------------+---------+----------+-------------

In [9]:
# Tear down in dependency order. SparkSession.stop() still makes JVM calls through
# the Py4J gateway, so stop Spark first and only then close the gateway connections.
# Grab the gateway handle up front so the close() does not depend on post-stop state.
py4j_gateway = spark.sparkContext._gateway
spark.stop()
py4j_gateway.close()