In [3]:
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
NESSIE_URI = os.environ.get("NESSIE_URI") ## Nessie Server URI
WAREHOUSE = os.environ.get("WAREHOUSE") ## BUCKET TO WRITE DATA TOO
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS
AWS_S3_ENDPOINT= os.environ.get("AWS_S3_ENDPOINT") ## MINIO ENDPOINT

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.67.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.s3.endpoint', AWS_S3_ENDPOINT)
        .set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)


## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")


## Test Run a Query
spark.sql("CREATE TABLE IF NOT EXISTS nessie.test1 (name string) USING iceberg;").show()
spark.sql("INSERT INTO nessie.test1 VALUES ('test');").show()
spark.sql("SELECT * FROM nessie.test1;").show()

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.3_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f0601cfb-cc41-4b45-b811-bb4bdf690974;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.3.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.3_2.12;0.67.0 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central

24/02/05 05:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark Running


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


++
||
++
++



                                                                                

++
||
++
++

+----+
|name|
+----+
|test|
+----+



In [7]:
## LOAD A CSV INTO AN SQL VIEW
csv_df = spark.read.format("csv").option("header", "true").load("../datasets/sales_data_sample.csv")
csv_df.createOrReplaceTempView("sales_data")

## CREATE AN ICEBERG TABLE FROM THE SQL VIEW
spark.sql("CREATE TABLE IF NOT EXISTS nessie.sales USING iceberg AS SELECT * FROM sales_data;").show()

## QUERY THE ICEBERG TABLE
spark.sql("SELECT * FROM nessie.sales limit 10;").show()

++
||
++
++

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2|   2871| 2/24/2003 0

In [8]:
# Demonstration of zero copy experimentation using nessie 

## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.sales").show()

## CREATE A BRANCH WITH NESSIE
spark.sql("CREATE BRANCH IF NOT EXISTS demo IN nessie")

## SWTICH TO THE NEW BRANCH
spark.sql("USE REFERENCE demo IN nessie")

+-----+
|Total|
+-----+
| 2823|
+-----+



DataFrame[refType: string, name: string, hash: string]

In [9]:
## DELETE ALL RECORDS WHERE countryOfOriginCode = 'FR'
spark.sql("DELETE FROM nessie.sales WHERE COUNTRY = 'France'")

## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.sales").show()

24/02/05 05:48:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

+-----+
|Total|
+-----+
| 2509|
+-----+



In [10]:
## SWITCH BACK TO MAIN BRANCH
spark.sql("USE REFERENCE main IN nessie")

## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.sales").show()

+-----+
|Total|
+-----+
| 2823|
+-----+



In [11]:
## MERGE THE CHANGES
spark.sql("MERGE BRANCH demo INTO main IN nessie")

## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.sales").show()

+-----+
|Total|
+-----+
| 2823|
+-----+



In [17]:
# Performing update

spark.sql("UPDATE nessie.sales SET PRODUCTLINE = 'Books' where COUNTRY = 'Norway'")

                                                                                

DataFrame[]

In [19]:
## QUERY THE ICEBERG TABLE
spark.sql("SELECT * FROM nessie.sales where COUNTRY ='Norway' limit 10;").show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+------------------+-------------+--------------------+------------+-------+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|      CUSTOMERNAME|        PHONE|        ADDRESSLINE1|ADDRESSLINE2|   CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+------------------+-------------+--------------------+------------+-------+-----+----------+-------+---------+---------------+----------------+--------+
|      10188|             48|      100|              1|5512.32|11/18/2003 0:00|Shipped|     4|      11|   2003|      Book

In [26]:
spark.sql("SELECT * FROM nessie.sales.history").show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2024-02-05 05:48:...|2863199929315987761|               null|               true|
|2024-02-05 05:48:...| 319468681545983666|2863199929315987761|               true|
|2024-02-05 06:07:...|8967947541697229411| 319468681545983666|               true|
|2024-02-05 06:10:...|3709300837485886423|8967947541697229411|               true|
+--------------------+-------------------+-------------------+-------------------+



In [27]:
spark.sql("SELECT * FROM nessie.sales.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-02-05 05:48:...|2863199929315987761|               null|   append|s3a://warehouse//...|{spark.app.id -> ...|
|2024-02-05 05:48:...| 319468681545983666|2863199929315987761|overwrite|s3a://warehouse//...|{spark.app.id -> ...|
|2024-02-05 06:07:...|8967947541697229411| 319468681545983666|overwrite|s3a://warehouse//...|{spark.app.id -> ...|
|2024-02-05 06:10:...|3709300837485886423|8967947541697229411|overwrite|s3a://warehouse//...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [28]:
spark.sql("SELECT * FROM nessie.sales.files").show()

+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|content|           file_path|file_format|spec_id|record_count|file_size_in_bytes|        column_sizes|        value_counts|   null_value_counts|nan_value_counts|        lower_bounds|        upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|    readable_metrics|
+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|      0|s3a://warehouse//...|    PARQUET|      0|        2509|             55849|{1 -> 3196, 2 -> ...|{1 -> 2509, 2 -> ...|{1 -> 0, 2 -> 0, ...|              {

In [35]:
spark.sql("SELECT * FROM nessie.sales TIMESTAMP AS OF '2024-02-05 06:10' ").show()

+-----------+---------------+---------+---------------+-------+---------------+--------+------+--------+-------+-----------+----+-----------+--------------------+---------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE|  STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|          PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+--------+------+--------+-------+-----------+----+-----------+--------------------+---------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2|   2871| 2/24/2003