In [1]:
!python --version

Python 3.11.12


In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create SparkSession with Hudi configuration

spark = SparkSession.builder \
    .appName("HudiJob") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()

/home/spark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/11 02:14:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.functions import month, year
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import to_timestamp
import os

# Define the schema
schema = StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),  # Read as String first, to convert later
    StructField("UnitPrice", FloatType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Country", StringType(), True)
])

# Try different file path approaches
print("Current working directory:", os.getcwd())
print("Directory contents:", os.listdir())

# Approach 1: Use relative path directly
csv_file_path = "online_retail.csv"

# Debugging information
print(f"Checking if file exists at relative path: {csv_file_path}")
print(f"File exists: {os.path.exists(csv_file_path)}")

try:
    df = spark.read.csv(csv_file_path, header=True, schema=schema)
    df.show()
except Exception as e:
    print(f"Error with relative path: {e}")
    # Approach 2: Try using the full path within the container
    csv_file_path = "/home/sparkuser/app/online_retail.csv"
    print(f"Trying with container path: {csv_file_path}")
    print(f"File exists: {os.path.exists(csv_file_path)}")
    
    try:
        df = spark.read.csv(csv_file_path, header=True, schema=schema)
        df.show()
    except Exception as e:
        print(f"Error with container path: {e}")
        # Approach 3: Use file protocol properly
        csv_file_path = "file:/" + csv_file_path
        print(f"Trying with file protocol: {csv_file_path}")
        df = spark.read.csv(csv_file_path, header=True, schema=schema)
        df.show()

df_with_record_id = df.withColumn("InvoiceDateTS", to_timestamp(df["InvoiceDate"], "dd/MM/yy HH:mm")) \
        .withColumn("RecordId",(f.floor(f.rand() * 900000) + 100000).cast("int")) # Generates a number between 100000 and 999999

df_with_record_id.show()

df_with_partition_column = df_with_record_id.withColumn("InvoiceMonth", month(df_with_record_id["InvoiceDateTS"])) \
       .withColumn("InvoiceYear", year(df_with_record_id["InvoiceDateTS"])) \
       .drop("InvoiceDate")  # Optionally drop original InvoiceDate


df_with_partition_column.show()

Current working directory: /home/sparkuser/app
Directory contents: ['.ipynb_checkpoints', 'event_logs', 'online_retail.csv', 'spark-hudi-minio.ipynb', 'spark-warehouse']
Checking if file exists at relative path: online_retail.csv
File exists: True
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/10 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/10 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/10 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/10 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY

In [4]:
# Print schema
df_with_partition_column.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- InvoiceDateTS: timestamp (nullable = true)
 |-- RecordId: integer (nullable = true)
 |-- InvoiceMonth: integer (nullable = true)
 |-- InvoiceYear: integer (nullable = true)



In [5]:
df_with_partition_column.describe().show()

25/05/11 02:14:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------------+------------------+--------------------+------------------+-----------------+------------------+-----------+------------------+------------------+------------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|        UnitPrice|        CustomerID|    Country|          RecordId|      InvoiceMonth|       InvoiceYear|
+-------+-----------------+------------------+--------------------+------------------+-----------------+------------------+-----------+------------------+------------------+------------------+
|  count|           541909|            541909|              540455|            541909|           541909|            406829|     541909|            541909|            541909|            541909|
|   mean| 559965.752026781|27623.240210938104|             20713.0|  9.55224954743324|4.611113614622465|15287.690570239585|       NULL| 549895.9258639365| 7.553127923691985|2010.9216086095637|
| stddev|13428.41728079929|16799.73

                                                                                

In [6]:
df_filtered = df_with_partition_column.filter(df_with_partition_column["RecordId"].isNull())
df_filtered.show()



+---------+---------+-----------+--------+---------+----------+-------+-------------+--------+------------+-----------+
|InvoiceNo|StockCode|Description|Quantity|UnitPrice|CustomerID|Country|InvoiceDateTS|RecordId|InvoiceMonth|InvoiceYear|
+---------+---------+-----------+--------+---------+----------+-------+-------------+--------+------------+-----------+
+---------+---------+-----------+--------+---------+----------+-------+-------------+--------+------------+-----------+



                                                                                

In [12]:
df_filtered.count()
df_with_record_id.show(truncate=False)  # Show full contents
df_with_record_id.filter(df_with_record_id.RecordId.isNull() | (df_with_record_id.RecordId == "")).show()

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+-------------------+--------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |InvoiceDateTS      |RecordId|
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+-------------------+--------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01/12/10 08:26|2.55     |17850     |United Kingdom|2010-12-01 08:26:00|638357  |
|536365   |71053    |WHITE METAL LANTERN                |6       |01/12/10 08:26|3.39     |17850     |United Kingdom|2010-12-01 08:26:00|374306  |
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01/12/10 08:26|2.75     |17850     |United Kingdom|2010-12-01 08:26:00|355909  |
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01/12/10 08:26|3.39     |17850     |United Kingdom|2

In [7]:
hudi_options = {
    'hoodie.table.name':'my_hudi_table',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'RecordId',
    'hoodie.datasource.write.precombine.field': 'InvoiceDateTS',
    'hoodie.datasource.write.partitionpath.field': 'InvoiceYear,InvoiceMonth',  # Consider changing this
    'hoodie.datasource.write.table.name': 'my_hudi_table2',
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.datasource.write.operation': 'upsert',
}

# Sample Hudi write code
df_with_partition_column.write.format("hudi").options(**hudi_options).mode("overwrite").save("s3a://hudi-minio-bucket/my_hudi_table")

25/05/11 02:14:58 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/05/11 02:14:59 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
25/05/11 02:14:59 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
25/05/11 02:14:59 WARN HoodieSparkSqlWriterInternal: hoodie table at s3a://hudi-minio-bucket/my_hudi_table already exists. Deleting existing data & overwriting with new data.
25/05/11 02:15:21 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to my_hudi_table/.hoodie/metadata/files/.files-0000-0_00000000000000010.log.1_0-0-0. This is unsupported
25/05/11 02:15:27 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
                                                                     



[Stage 41:>                                                         (0 + 1) / 1]



25/05/11 02:16:11 WARN HoodieSparkSqlWriterInternal: Closing write client       


In [17]:
hudi_read_options = {
    'hoodie.datasource.query.type': 'snapshot'
}

# Reading the Hudi table from the base path
hudi_df = spark.read.format("hudi").options(**hudi_read_options).load("s3a://hudi-minio-bucket/my_hudi_table")


hudi_df.printSchema()
# Display the data
# hudi_df.show(truncate=False)

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- InvoiceDateTS: timestamp (nullable = true)
 |-- RecordId: integer (nullable = true)
 |-- InvoiceYear: integer (nullable = true)
 |-- InvoiceMonth: integer (nullable = true)



In [18]:
hudi_df.show()

                                                                                

+-------------------+--------------------+------------------+----------------------+--------------------+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+--------+-----------+------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|      InvoiceDateTS|RecordId|InvoiceYear|InvoiceMonth|
+-------------------+--------------------+------------------+----------------------+--------------------+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+--------+-----------+------------+
|  20250511015430109|20250511015430109...|            609623|               2011/11|f5bd4b1b-d6ef-422...|   577740|    21034|REX CASH+CARRY JU...|       1|     0.95|     17625|United Kingdom|2011-11-21 13:36:00|  609623|       2011|          11|
|  2025051101543