In [56]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DoubleType, IntegerType, LongType
from pyspark.sql.functions import sum as _sum, avg, expr, window, from_unixtime, col

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import time, datetime
import os
import random
import uuid

spark = (
    SparkSession.builder.appName("spark application")
    .config(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0,"
        "org.apache.spark:spark-avro_2.12:3.1.1,"
        "org.apache.hadoop:hadoop-aws:3.1.1,"
        "com.amazonaws:aws-java-sdk:1.11.271,"
    )
    .config(
        "spark.sql.extensions",
        "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config(
        "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.connection.maximum", "1000")
    .getOrCreate()
)
# spark.conf.set("spark.sql.session.timeZone", "Asia/Ho_Chi_Minh")


In [57]:
def parsing_timestamp(df, timestamp_col: str):
    """
    parse timestamp following day, month, year
    Args:
        df (DataFrame): DataFrame
        timestamp_col (str): column of timestamp field
        id_col (str): column id which is used to _id of hudi talbe
    Returns:
        DataFrame: return dataframe which adds day, month, year
    """

    df = df.withColumn("ts", (col(f"{timestamp_col}") / 1000).cast("timestamp"))

    df = (
        df.withColumn("year", year(df.ts))
        .withColumn("month", month(df.ts))
        .withColumn("day", dayofmonth(expr("ts")))
    )

    return df


In [58]:
hudi_table_name = "hudi_upsert"
hudi_operation = "UPSERT"
hudi_path = f"/home/ducdn/Desktop/workspace/hudi_mor/{hudi_table_name}"
hudi_path
# hudi_path = "s3a://datalake/hudi_mor"

'/home/ducdn/Desktop/workspace/hudi_mor/hudi_upsert'

In [59]:

hoodie_options = {
    "hoodie.table.name": f"{hudi_table_name}",
    "hoodie.metadata.enable": "true",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": hudi_operation,
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": "partition",
    "hoodie.datasource.write.table.name": f"{hudi_table_name}",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.clean.automatic": "true",
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": 8,
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": 3,
     "hoodie.datasource.hive_sync.support_timestamp": "true",
}

In [60]:
schema = StructType([
    StructField("id", StringType()),
    StructField("value", StringType()),
    StructField("timestamp", LongType()),
])

In [62]:
data = [
    ("1", "value1", 1687996799991),
    ("2", "value2", 1687996799943),
    ("3", "value3", 1687996799383),
    ("4", "value4", 1687996799339),
    ("5", "value5", 1687923298333),
    ("6", "value6", 1687948498111),
    ("7", "value7", 1687991698111),
]

In [63]:
df = spark.createDataFrame(data, schema=schema)
df = parsing_timestamp(df, "timestamp")
df.show(truncate=False)

+---+------+-------------+-----------------------+----+-----+---+
|id |value |timestamp    |ts                     |year|month|day|
+---+------+-------------+-----------------------+----+-----+---+
|1  |value1|1687996799991|2023-06-29 06:59:59.991|2023|6    |29 |
|2  |value2|1687996799943|2023-06-29 06:59:59.943|2023|6    |29 |
|3  |value3|1687996799383|2023-06-29 06:59:59.383|2023|6    |29 |
|4  |value4|1687996799339|2023-06-29 06:59:59.339|2023|6    |29 |
|5  |value5|1687923298333|2023-06-28 10:34:58.333|2023|6    |28 |
|6  |value6|1687948498111|2023-06-28 17:34:58.111|2023|6    |28 |
|7  |value7|1687991698111|2023-06-29 05:34:58.111|2023|6    |29 |
+---+------+-------------+-----------------------+----+-----+---+



In [55]:
df.write.format("hudi") \
        .mode("append") \
        .options(**hoodie_options) \
        .partitionBy("year,month, day") \
        .save("/home/ducdn/Documents/workspace/hudi_mor/test")

23/07/01 11:24:18 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ducdn/Documents/workspace/hudi_mor/test/.hoodie/metadata
                                                                                

#### Write Hudi Mor

In [25]:
def current_timestamp():
    current_time = int(time.time())
    return current_time

def gen_data():
    data = []
    for i in range(0, 3):
        i = uuid.uuid4()
        value = random.randint(0, 1000000)
        timestamp = current_timestamp()
        partition = random.randint(1,3)
        if partition != 2: continue
        x = (str(i), value, timestamp, partition)
        data.append(x)
    return data

while True:
    print("="* 20 + "START GEN DATA" + "="*20)
    data = gen_data()
    stream_df = spark.createDataFrame(data, schema)
    # stream_df = stream_df.withColumn("timestamp", current_timestamp())

    stream_df.show(n=2, truncate=False)

    stream_df.write.format("hudi") \
        .options(**hoodie_options) \
        .mode("append") \
        .save(hudi_path)

    print(20*"-" + "STARTING WRITE HUDI TABLE" + "-"*20)

    print(20*"-" + "DONE" + "-"*20)

    print(f"================== TIME SLEEP ==============")
    time.sleep(10)



+---+-----+---------+---------+
|id |value|timestamp|partition|
+---+-----+---------+---------+
+---+-----+---------+---------+



23/06/20 10:40:39 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/06/20 10:40:39 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/06/20 10:40:40 WARN BaseHoodieCompactionPlanGenerator: No operations are retrieved for /home/ducdn/Desktop/workspace/hudi_mor/hudi_mor


--------------------STARTING WRITE HUDI TABLE--------------------
--------------------DONE--------------------
+------------------------------------+------+----------+---------+
|id                                  |value |timestamp |partition|
+------------------------------------+------+----------+---------+
|4a090c34-4438-4bac-8b08-34d506009485|605253|1687232450|2        |
+------------------------------------+------+----------+---------+



23/06/20 10:40:51 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/06/20 10:40:52 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/06/20 10:40:55 WARN BaseHoodieCompactionPlanGenerator: No operations are retrieved for /home/ducdn/Desktop/workspace/hudi_mor/hudi_mor


--------------------STARTING WRITE HUDI TABLE--------------------
--------------------DONE--------------------


KeyboardInterrupt: 

#### Update

In [24]:

data = [("fd6be044-0e71-4268-b9ae-8ac0a5sqwer", 3306, current_timestamp(), 2)]
stream_df = spark.createDataFrame(data, schema)


stream_df.write.format("hudi") \
    .options(**hoodie_options) \
    .mode("append") \
    .save(hudi_path)

23/06/20 10:35:31 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/06/20 10:35:32 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/06/20 10:35:34 WARN BaseHoodieCompactionPlanGenerator: No operations are retrieved for /home/ducdn/Desktop/workspace/hudi_mor/hudi_mor
