Setup and Spark configuration: connections to the Nessie catalog and the MinIO object store

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

## CONNECTION SETTINGS
# Defaults are the docker-network addresses this notebook was written against;
# override them with environment variables instead of editing the notebook.
# NOTE(review): S3 credentials/region are not configured here — presumably they
# come from AWS_* environment variables in the container; confirm.
NESSIE_SERVER_URI = os.environ.get("NESSIE_SERVER_URI", "http://172.19.0.3:19120/api/v2")
WAREHOUSE_BUCKET = os.environ.get("WAREHOUSE_BUCKET", "s3://warehouse")
MINIO_URI = os.environ.get("MINIO_URI", "http://172.19.0.2:9000")

# Pinned runtime packages: Iceberg Spark runtime, Nessie SQL extensions, and the
# AWS SDK v2 bundle + HTTP client used by S3FileIO (configured below).
SPARK_PACKAGES = ",".join([
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2",
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.91.3",
    "software.amazon.awssdk:bundle:2.20.131",
    "software.amazon.awssdk:url-connection-client:2.20.131",
])

# Iceberg extensions enable DDL such as ALTER TABLE ... ADD PARTITION FIELD and
# MERGE INTO; the Nessie extensions add Nessie branch/tag SQL commands.
SPARK_SQL_EXTENSIONS = ",".join([
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
])

## Configurations for Spark Session
conf = (
    pyspark.SparkConf()
    .setAppName("app_name")
    # Packages
    .set("spark.jars.packages", SPARK_PACKAGES)
    # SQL extensions
    .set("spark.sql.extensions", SPARK_SQL_EXTENSIONS)
    # Iceberg catalog named `nessie`, backed by the Nessie server on branch `main`
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.nessie.uri", NESSIE_SERVER_URI)
    .set("spark.sql.catalog.nessie.ref", "main")
    .set("spark.sql.catalog.nessie.authentication.type", "NONE")
    .set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    # Table files are stored in MinIO and accessed through Iceberg's S3FileIO
    .set("spark.sql.catalog.nessie.s3.endpoint", MINIO_URI)
    .set("spark.sql.catalog.nessie.warehouse", WAREHOUSE_BUCKET)
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## TEST QUERY TO CHECK IT WORKING
# CREATE OR REPLACE keeps this smoke test re-runnable: a second run recreates the
# table instead of failing with "table already exists" or piling up duplicate
# rows. DDL/DML statements return empty DataFrames, so only the SELECT is shown.
spark.sql("CREATE OR REPLACE TABLE nessie.example (name STRING) USING iceberg")
spark.sql("INSERT INTO nessie.example VALUES ('Pooja Shingavi')")
spark.sql("SELECT * FROM nessie.example").show()

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0eb70725-f1cb-43d0-a578-61d8d9d3ee1f;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.91.3 in central
	found software.amazon.awssdk#bundle;2.20.131 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.20.131 in central
	found software.amazon.awssdk#utils;2.20.131 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central

Spark Running


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


++
||
++
++



                                                                                

++
||
++
++



                                                                                

+--------------+
|          name|
+--------------+
|Pooja Shingavi|
+--------------+



Create a table from a Spark DataFrame

In [2]:

from pyspark.sql.functions import current_timestamp

# Create DataFrame: three sample rows plus a `ts` column filled by current_timestamp().
data = [(1, "Pooja"), (2, "Foo"), (3, "Bar")]
columns = ["id", "name"]
df = spark.createDataFrame(data, columns).withColumn("ts", current_timestamp())

# Write DataFrame to an Iceberg table. createOrReplace() (rather than create())
# keeps the cell re-runnable: a second run replaces the table instead of failing
# because it already exists. The table schema is taken from `df`.
df.writeTo("nessie.df_table").createOrReplace()
print("Iceberg table created using DataFrames")

# Query table
spark.sql("SELECT * FROM nessie.df_table").show()


                                                                                

Iceberg table created using DataFrames
+---+-----+--------------------+
| id| name|                  ts|
+---+-----+--------------------+
|  1|Pooja|2025-01-04 20:36:...|
|  2|  Foo|2025-01-04 20:36:...|
|  3|  Bar|2025-01-04 20:36:...|
+---+-----+--------------------+



Create a table using a SQL statement

In [3]:
# Create an Iceberg table using SQL DDL. CREATE OR REPLACE makes the cell
# re-runnable: the table is recreated (empty) instead of raising
# "table already exists", so the INSERT below never duplicates rows.
spark.sql("""
    CREATE OR REPLACE TABLE nessie.sql_table (
        id INT,
        name STRING
    )
    USING iceberg
""")

spark.sql("INSERT INTO nessie.sql_table VALUES (1, 'Pooja Shingavi'), (2, 'Foo Bar')")

spark.sql("SELECT * FROM nessie.sql_table").show()

                                                                                

+---+--------------+
| id|          name|
+---+--------------+
|  1|Pooja Shingavi|
|  2|       Foo Bar|
+---+--------------+



Inserting Data Into an Iceberg Table

In [4]:
import json
from pyspark.sql.functions import col

# Step 1: Create a JSON file with a few records, written as JSON Lines (one
# object per line) — the layout spark.read.json parses by default.
json_data = [
    {"id": 1, "name": "Pooja", "ts": "2025-01-04T12:00:00"},
    {"id": 2, "name": "Foo", "ts": "2025-01-04T12:05:00"},
    {"id": 3, "name": "Bar", "ts": "2025-01-04T12:10:00"}
]

json_file_path = "/tmp/data.json"
with open(json_file_path, "w") as json_file:
    json_file.writelines(json.dumps(record) + "\n" for record in json_data)

# Step 2: Read the JSON file into a DataFrame; `ts` is read as a string, so cast it.
df = spark.read.json(json_file_path)
df = df.withColumn("ts", col("ts").cast("timestamp"))
df.show()

# Step 3: Create the "db" namespace in Nessie
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.db")
print("Namespace 'db' created in Nessie")

# Step 4: Create an empty Iceberg table using SQL. CREATE OR REPLACE guarantees
# it really is empty, even on a re-run. The original `IF NOT EXISTS` kept the
# old table, so each re-run appended a duplicate copy of the rows.
spark.sql("""
    CREATE OR REPLACE TABLE nessie.db.example_table (
        id INT,
        name STRING,
        ts TIMESTAMP
    )
    USING iceberg
""")
print("Iceberg table created with no records")

# Step 5: Insert the data from JSON DataFrame into the Iceberg table
df.writeTo("nessie.db.example_table").append()
print("Data inserted into Iceberg table from JSON DataFrame")

spark.sql("SELECT * FROM nessie.db.example_table").show()

+---+-----+-------------------+
| id| name|                 ts|
+---+-----+-------------------+
|  1|Pooja|2025-01-04 12:00:00|
|  2|  Foo|2025-01-04 12:05:00|
|  3|  Bar|2025-01-04 12:10:00|
+---+-----+-------------------+

Namespace 'db' created in Nessie
Iceberg table created with no records
Data inserted into Iceberg table from JSON DataFrame
+---+-----+-------------------+
| id| name|                 ts|
+---+-----+-------------------+
|  1|Pooja|2025-01-04 12:00:00|
|  2|  Foo|2025-01-04 12:05:00|
|  3|  Bar|2025-01-04 12:10:00|
+---+-----+-------------------+



In [None]:
Using MERGE INTO

In [5]:
# Imported here so this cell does not silently depend on an earlier cell having run.
import json
from pyspark.sql.functions import col

# Step 1: Create a JSON file with a few records, written as JSON Lines (one
# object per line) — the layout spark.read.json parses by default.
json_data = [
    {"id": 1, "name": "Pooja", "count": None, "ts": "2025-01-04T12:00:00"},
    {"id": 2, "name": "Foo", "count": 5, "ts": "2025-01-04T12:05:00"},
    {"id": 3, "name": "Bar", "count": 10, "ts": "2025-01-04T12:10:00"}
]

json_file_path = "/tmp/data.json"
with open(json_file_path, "w") as json_file:
    json_file.writelines(json.dumps(record) + "\n" for record in json_data)

# Step 2: Read the JSON file into a DataFrame (`ts` is read as a string, so cast
# it) and expose it to SQL as the temp view `json_table`.
df = spark.read.json(json_file_path)
df = df.withColumn("ts", col("ts").cast("timestamp"))
df.createOrReplaceTempView("json_table")
df.show()

# Step 3: Create the "merge_example" namespace in Nessie
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.merge_example")
print("Namespace 'merge_example' created in Nessie")

# Step 4: Create an empty Iceberg table using SQL (CREATE OR REPLACE so a
# re-run starts from a clean table instead of failing).
spark.sql("""
    CREATE OR REPLACE TABLE nessie.merge_example.example_table (
        id INT,
        name STRING,
        count INT,
        ts TIMESTAMP
    )
    USING iceberg
""")
print("Iceberg table created with no records")

# Insert the initial data. Columns are selected explicitly because JSON schema
# inference orders them alphabetically (count, id, name, ts).
spark.sql("""
    INSERT INTO nessie.merge_example.example_table
    SELECT id, name, count, ts FROM json_table
""")
print("Initial data inserted into Iceberg table")

spark.sql("SELECT * FROM nessie.merge_example.example_table").show()

# Step 5: Create a source DataFrame of changes; `op` says what to do with each id.
update_data = [
    {"id": 1, "name": "Pooja", "count": 1, "op": "increment"},
    {"id": 2, "name": "Foo", "count": 1, "op": "increment"},
    {"id": 3, "name": "Bar", "count": None, "op": "delete"},
    {"id": 4, "name": "Funion", "count": 1, "op": "insert"}
]

update_file_path = "/tmp/update_data.json"
with open(update_file_path, "w") as update_file:
    update_file.writelines(json.dumps(record) + "\n" for record in update_data)

source_df = spark.read.json(update_file_path)
source_df.createOrReplaceTempView("source_table")
source_df.show()

# Step 6: Use MERGE INTO to apply the changes. WHEN clauses are tried in order
# and the first one whose condition holds is applied:
#   - 'delete' removes the matching target row;
#   - 'increment' on a NULL count sets it to 0. NOTE(review): this gives 0, not
#     1 — use coalesce(t.count, 0) + 1 if true increment semantics are intended;
#   - 'increment' otherwise adds 1 (the source `count` value is not used);
#   - ids missing from the target are inserted, except for 'delete' ops. Without
#     that guard, deleting an id that does not exist would create it.
spark.sql("""
    MERGE INTO nessie.merge_example.example_table t
    USING source_table s
    ON t.id = s.id
    WHEN MATCHED AND s.op = 'delete' THEN DELETE
    WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
    WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
    WHEN NOT MATCHED AND s.op != 'delete' THEN INSERT (id, name, count, ts) VALUES (s.id, s.name, s.count, current_timestamp())
""")
print("Data merged into Iceberg table using MERGE INTO")

spark.sql("SELECT * FROM nessie.merge_example.example_table").show()

+-----+---+-----+-------------------+
|count| id| name|                 ts|
+-----+---+-----+-------------------+
| NULL|  1|Pooja|2025-01-04 12:00:00|
|    5|  2|  Foo|2025-01-04 12:05:00|
|   10|  3|  Bar|2025-01-04 12:10:00|
+-----+---+-----+-------------------+

Namespace 'merge_example' created in Nessie
Iceberg table created with no records
Initial data inserted into Iceberg table
+---+-----+-----+-------------------+
| id| name|count|                 ts|
+---+-----+-----+-------------------+
|  1|Pooja| NULL|2025-01-04 12:00:00|
|  2|  Foo|    5|2025-01-04 12:05:00|
|  3|  Bar|   10|2025-01-04 12:10:00|
+---+-----+-----+-------------------+

+-----+---+------+---------+
|count| id|  name|       op|
+-----+---+------+---------+
|    1|  1| Pooja|increment|
|    1|  2|   Foo|increment|
| NULL|  3|   Bar|   delete|
|    1|  4|Funion|   insert|
+-----+---+------+---------+



                                                                                

Data merged into Iceberg table using MERGE INTO
+---+------+-----+--------------------+
| id|  name|count|                  ts|
+---+------+-----+--------------------+
|  1| Pooja|    0| 2025-01-04 12:00:00|
|  2|   Foo|    6| 2025-01-04 12:05:00|
|  4|Funion|    1|2025-01-04 20:50:...|
+---+------+-----+--------------------+



Partitioning and partition evolution with Spark

In [None]:
# Step 1: Create the "partitioning_example" namespace in Nessie
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.partitioning_example")
print("Namespace 'partitioning_example' created in Nessie")

# Step 2: Create an Iceberg table with initial partitioning on the 'name' column.
# CREATE OR REPLACE makes the cell re-runnable: it recreates the table with the
# name-only spec, so the ADD PARTITION FIELD in Step 4 does not fail because
# the field already exists.
spark.sql("""
    CREATE OR REPLACE TABLE nessie.partitioning_example.example_table (
        id INT,
        name STRING,
        count INT,
        ts TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (name)
""")
print("Iceberg table created with initial partitioning on 'name'")

# Step 3: Insert initial data into the Iceberg table (written under the name-only spec)
spark.sql("""
    INSERT INTO nessie.partitioning_example.example_table VALUES
    (1, 'Pooja', NULL, TIMESTAMP('2024-07-02T12:00:00')),
    (2, 'Foo', 5, TIMESTAMP('2024-07-02T12:05:00')),
    (3, 'Bar', 10, TIMESTAMP('2024-07-02T12:10:00'))
""")
print("Initial data inserted into Iceberg table")

# Step 4: Evolve the partition spec by adding a 4-way hash bucket on 'ts'.
# Under Iceberg partition evolution, files that already exist are not rewritten;
# only data written after this point uses the new (name, ts_bucket) spec.
spark.sql("""
    ALTER TABLE nessie.partitioning_example.example_table
    ADD PARTITION FIELD bucket(4, ts) AS ts_bucket
""")
print("Iceberg table partitioning updated to include 'ts' column")

# Step 5: Insert additional data into the Iceberg table (written under the new spec)
spark.sql("""
    INSERT INTO nessie.partitioning_example.example_table VALUES
    (4, 'Funion', 15, TIMESTAMP('2024-07-02T12:15:00')),
    (5, 'Minion', 20, TIMESTAMP('2024-07-02T12:20:00'))
""")
print("Additional data inserted into Iceberg table")

spark.sql("SELECT * FROM nessie.partitioning_example.example_table").show()

# Step 6: Inspect the partitions metadata table to see the evolution.
# Partitions written before the ALTER are expected to show a null ts_bucket.
spark.sql("SELECT * FROM nessie.partitioning_example.example_table.partitions").show(truncate=False)