In [1]:
from pyspark.sql import SparkSession
import os

# Name under which the Iceberg catalog is registered with Spark SQL; it is
# interpolated into the `spark.sql.catalog.<name>` option keys below.
ICEBERG_CATALOG = "ic_minio"
# Warehouse root handed to the catalog (an s3a:// URI).
DW_PATH = 's3a://warehouse/iceberg'


def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``, failing fast if absent.

    ``os.environ.get`` returns ``None`` for an unset variable; passing that on
    to the session builder would postpone the failure to the first S3A access,
    far away from the actual cause. Raising here names the missing variable.

    Raises:
        RuntimeError: if the variable is unset or empty.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Required environment variable {name!r} is not set or is empty"
        )
    return value


spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("spark-minio")
    # Event logging for this application.
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/opt/spark/spark-events")
    # NOTE(review): spark.history.fs.* is normally consumed by the History
    # Server rather than by an application - confirm it is needed here.
    .config("spark.history.fs.logDirectory", "/opt/spark/spark-events")
    # Extra jars loaded from local paths: Iceberg Spark runtime, hadoop-aws
    # and the AWS SDK bundle.
    .config('spark.jars', '/opt/extra-jars/iceberg-spark-runtime-3.5_2.12-1.4.2.jar,/opt/extra-jars/hadoop-aws-3.3.4.jar,/opt/extra-jars/aws-java-sdk-bundle-1.12.262.jar')
    # S3A connection settings; endpoint and credentials come from the
    # environment so that nothing secret is stored in the notebook.
    .config("spark.hadoop.fs.s3a.endpoint", _require_env('MINIO_ENDPOINT'))
    .config("spark.hadoop.fs.s3a.access.key", _require_env('MINIO_ACCESS_KEY'))
    .config("spark.hadoop.fs.s3a.secret.key", _require_env('MINIO_SECRET_KEY'))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    # Register an Iceberg SparkCatalog of type "hadoop" rooted at DW_PATH.
    .config(f'spark.sql.catalog.{ICEBERG_CATALOG}', 'org.apache.iceberg.spark.SparkCatalog')
    .config(f'spark.sql.catalog.{ICEBERG_CATALOG}.type', 'hadoop')
    .config(f'spark.sql.catalog.{ICEBERG_CATALOG}.warehouse', DW_PATH)
    .getOrCreate()
)

24/12/22 06:50:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Smoke test: build a tiny two-row DataFrame and print it to confirm that the
# session can run a job.
df = spark.createDataFrame(
    data=[("foo", 1), ("bar", 2)],
    schema=["key", "value"],
)
df.show()

                                                                                

+---+-----+
|key|value|
+---+-----+
|foo|    1|
|bar|    2|
+---+-----+



In [3]:
# Read test: load the existing sales table through the Iceberg catalog
# configured on the session.
TABLE_NAME = ICEBERG_CATALOG + ".db.sales"
ic_df = spark.table(TABLE_NAME)

24/12/22 06:50:44 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [4]:
# Print the table contents; n=20 and truncate=True are show()'s defaults,
# spelled out here so the row cap is visible to the reader.
ic_df.show(n=20, truncate=True)

[Stage 3:>                                                          (0 + 1) / 1]

+------------+------------+-------+--------+
|order_number|product_code|year_id|month_id|
+------------+------------+-------+--------+
|           7|         ABC|   2024|       2|
|           8|         XYZ|   2024|       2|
|           5|         ABC|   2024|       1|
|           6|         XYZ|   2024|       1|
|           3|         ABC|   2023|       2|
|           4|         XYZ|   2023|       2|
|           1|         ABC|   2023|       1|
|           2|         XYZ|   2023|       1|
+------------+------------+-------+--------+



                                                                                

In [5]:
# Write test: append two new rows to the sales table from the notebook.
# Note that every execution of this cell appends the same rows again.
sales_df = spark.createDataFrame(
    data=[(9, 'ABC', 2024, 3), (10, 'XYZ', 2024, 3)],
    schema=["order_number", "product_code", "year_id", "month_id"],
)
# coalesce(1) collapses the frame to one partition before writing -
# presumably so the append produces a single data file; TODO confirm.
sales_df.coalesce(1).writeTo(TABLE_NAME).append()

                                                                                

In [None]:
# Show the table's `history` metadata table without truncating column values,
# to check that the append above was recorded.
spark.table(TABLE_NAME + ".history").show(truncate=False)