In [None]:
# Apache Hudi-PySpark Configuration

from typing import *

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Build (or reuse) the Spark session used by every later cell.
# The chain sets the serializer, pulls the Hudi/S3A jars, registers the Hudi
# catalog + SQL extension, and configures the S3A filesystem.
spark = (
    SparkSession.builder
    .appName("Hudi Table")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Maven coordinates of the extra jars: Hudi Spark bundle, S3A connector, AWS SDK.
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0",
            "org.apache.hadoop:hadoop-aws:3.2.4",
            "com.amazonaws:aws-java-sdk:1.12.262",
        ]),
    )
    # Route the default catalog and SQL parsing through Hudi's implementations.
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    # S3A settings: credentials provider class, path-style addressing, filesystem class.
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

print("Spark Running")

# Base S3 location for the Hudi sandbox table.
s3_path = "s3a://my-bucket/sandbox/daft_hudi/"

# Handle to the underlying SparkContext of the session above.
sc = spark.sparkContext

In [None]:
# Create Hudi Table in S3 using source data
#
# Reads the source CSV, renames the two price columns to plain identifiers,
# drops any existing catalog table of the same name, and overwrites the Hudi
# table at PATH, partitioned by `category`.

TABLE_NAME = 'aldi_data'
INPUT = 's3a://my-bucket/input/retail/All_Data_Aldi.csv'
# Destination of the Hudi table written below.
PATH = 's3a://my-bucket/sandbox/daft_hudi/'

# Minor Transformation: strip the "(£)" currency suffix from the price columns.
# The pound sign in these keys was previously garbled by an encoding mix-up;
# withColumnRenamed is a no-op when the source column does not exist, so a
# mismatched key silently skips the rename.
# NOTE(review): assumes the CSV header spells these columns with a literal
# "£" -- confirm against df_cow.columns.
df_cow = (
    spark.read.csv(INPUT, header=True, inferSchema=True)
    .withColumnRenamed('prices_(£)', 'prices')
    .withColumnRenamed('prices_unit_(£)', 'prices_unit')
)

# Write the Records
hudi_options = {
    'hoodie.table.name': TABLE_NAME,
    # NOTE(review): this is the table-level key; Spark datasource writes commonly
    # use 'hoodie.datasource.write.keygenerator.class' -- confirm it is honored.
    'hoodie.table.keygenerator.class': "org.apache.hudi.keygen.SimpleKeyGenerator",
    'hoodie.datasource.write.hive_style_partitioning': "false",
    'hoodie.datasource.write.partitionpath.field': "category",
}

# Remove any stale catalog entry before rewriting the table.
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")

# A single overwrite mode is sufficient; the original set it twice.
(
    df_cow.write
    .format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(PATH)
)