###  AWS Glue session config

In [1]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}

###  Local bucket config

In [2]:
try:
    import os, uuid, sys, boto3, time, sys
    from pyspark.context import SparkContext
    from pyspark.sql.functions import lit, udf
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from awsglue.job import Job
except Exception as e:
    print("Modules are missing : {} ".format(e))


spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://minio:9000/")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minioadmin")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minioadmin")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")


# sc = SparkSession.builder.getOrCreate()
# glueContext = GlueContext(sc)
# job = Job(glueContext)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
12,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

###  Import demo data applying its schema

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("document", StringType(), True),
    StructField("payer", StringType(), True),
    StructField("amount", IntegerType(), True),
])

df = spark.read.format("json") \
    .schema(schema) \
    .load("s3a://warehouse/raw")

df.head(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(id='99e7a457-c86d-47de-824e-dcb90865a388', type='debit', created_at=datetime.datetime(2022, 9, 13, 0, 0), document='80', payer='Joyce Wimberly', amount=3814)]

###  Hudi configuration builder method

In [4]:
def build_hudi_config(
    database, 
    table, 
    record_id, 
    precomb_key, 
    table_type, 
    partition_fields,
    operation,
    enable_partition, 
    enable_cleaner, 
    enable_hive_sync, 
    enable_clustering,
    enable_metadata_indexing, 
    index_type='BLOOM', 
    clustering_column='default'
    ):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        database (str):                     Name of the glue database.
        table (str):                        Name of the Hudi table.
        record_id (str):                    Field in the dataframe that will be used as the record key.
        precomb_key (str):                  Field in the dataframe that will be used for pre-combine.
        table_type (str):                   COPY_ON_WRITE or MERGE_ON_READ.
        partition_fields(str):              Filds used in the partitioning criteria 
        operation (str):                    The Hudi write method to use.
        enable_partition (bool):            Whether or not to enable partitioning.
        enable_cleaner (bool):              Whether or not to enable data cleaning.
        enable_hive_sync (bool):            Whether or not to enable syncing with Hive.
        enable_clustering (bool):           Whether or not to enable clustering.
        enable_metadata_indexing (bool):    Whether or not to enable metadata indexing.
        index_type :                        BLOOM or GLOBAL_BLOOM
    Returns:
        Dict
    """
    # These are the basic settings for the Hoodie table
    hudi_base_settings = {
        "hoodie.table.name": table,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
        "hoodie.index.type": index_type,
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,    # 512MB
        "hoodie.parquet.small.file.limit": 100 * 1024 * 1024, # 100MB
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": database,
        "hoodie.datasource.hive_sync.table": table,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    if enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_base_settings[key] = value

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    if enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_base_settings[key] = value

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    if enable_partition == True:
        for key, value in partition_settings.items(): hudi_base_settings[key] = value

    # These settings enable data clustering
    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    if enable_clustering == True:
        for key, value in hudi_clustering.items():
            hudi_base_settings[key] = value

    # These settings enable metadata indexing
    hudi_metadata_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_metadata_indexing == True:
        for key, value in hudi_metadata_indexing.items():
            hudi_base_settings[key] = value 

    return hudi_base_settings

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
database_name = "gold"
table_name = "transactions"
s3_path = f"s3://warehouse/{database_name}/{table_name}"

hudi_config = build_hudi_config(
    database                 = database_name, 
    table                    = table_name, 
    record_id                = "id", 
    precomb_key              = "id", 
    table_type               = "MERGE_ON_READ", 
    partition_fields         = "document",
    operation                = "upsert",
    enable_partition         = True, 
    enable_cleaner           = True, 
    enable_hive_sync         = False, 
    enable_clustering        = False, 
    enable_metadata_indexing = False, 
    index_type               = "BLOOM", 
    clustering_column        = "default"
)

hudi_config

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'hoodie.table.name': 'transactions', 'hoodie.datasource.write.table.type': 'MERGE_ON_READ', 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.precombine.field': 'id', 'hoodie.index.type': 'BLOOM', 'hoodie.parquet.max.file.size': 536870912, 'hoodie.parquet.small.file.limit': 104857600, 'hoodie.clean.automatic': 'true', 'hoodie.clean.async': 'true', 'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS', 'hoodie.cleaner.fileversions.retained': '3', 'hoodie-conf hoodie.cleaner.parallelism': '200', 'hoodie.cleaner.commits.retained': 5, 'hoodie.datasource.write.partitionpath.field': 'document', 'hoodie.datasource.hive_sync.partition_fields': 'document', 'hoodie.datasource.write.hive_style_partitioning': 'true'}

###  Query helper method

In [6]:
def run_query(query):
    transactions_df = spark.read.format("hudi").load(s3_path)
    transactions_df.createOrReplaceTempView("transactions")
    return spark.sql(query)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

###  Create Hudi table from df top 5 rows

In [8]:
top_df = df.limit(5)
top_df.write.format("hudi").options(**hudi_config).mode("overwrite").save(s3_path)

run_query("SELECT COUNT(*) FROM transactions").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
|       5|
+--------+

###  Add top 100 df data to existing table

In [9]:
# As the operation was setted as upsert by id the count should result in 100 instead of 105 
top_df = df.limit(100)
top_df.write.format("hudi").options(**hudi_config).mode("append").save(s3_path)

run_query("SELECT COUNT(*) FROM transactions").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
|     100|
+--------+

###  Update Hudi table data

In [10]:
# Selects de 50th row to update its content
row_to_update = df.collect()[50].asDict() 
row_to_update

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'id': '61a606dd-9f4b-42a2-a95d-1f18f0ed753d', 'type': 'credit', 'created_at': datetime.datetime(2022, 4, 2, 0, 0), 'document': '34', 'payer': 'Sheila Ellard', 'amount': 1933}

In [11]:
row_to_update["amount"] += 1000000

df_to_update = spark.createDataFrame(data=[row_to_update], schema=schema)
df_to_update.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(id='61a606dd-9f4b-42a2-a95d-1f18f0ed753d', type='credit', created_at=datetime.datetime(2022, 4, 2, 0, 0), document='34', payer='Sheila Ellard', amount=1001933)

In [12]:
df_to_update.write.format("hudi").options(**hudi_config).mode("append").save(s3_path)

run_query('''
    SELECT *
    FROM transactions 
    WHERE id = '61a606dd-9f4b-42a2-a95d-1f18f0ed753d'
''').show(1, truncate=False, vertical=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------
 _hoodie_commit_time    | 20240111132422890                      
 _hoodie_commit_seqno   | 20240111132422890_0_75                 
 _hoodie_record_key     | 61a606dd-9f4b-42a2-a95d-1f18f0ed753d   
 _hoodie_partition_path | document=34                            
 _hoodie_file_name      | 45785df7-c465-4a7d-8013-06752c36ba37-0 
 id                     | 61a606dd-9f4b-42a2-a95d-1f18f0ed753d   
 type                   | credit                                 
 created_at             | 2022-04-02 00:00:00                    
 document               | 34                                     
 payer                  | Sheila Ellard                          
 amount                 | 1001933

###  Delete Hudi table data

In [15]:
delete_options = hudi_config
delete_options['hoodie.datasource.write.operation'] = 'delete'

hard_delete_df = run_query("SELECT * FROM transactions WHERE id = '61a606dd-9f4b-42a2-a95d-1f18f0ed753d'")
hard_delete_df.write.format("hudi").options(**delete_options).mode("append").save(s3_path)

run_query("SELECT COUNT(*) FROM transactions WHERE id = '61a606dd-9f4b-42a2-a95d-1f18f0ed753d'").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
|       0|
+--------+