<img src="https://projectnessie.org/img/nessie.svg" alt="Project Nessie logo" width=100/> <img src="https://www.apache.org/logos/res/iceberg/iceberg.png" alt="Apache Iceberg logo" width=300/>  

## Project Nessie ❤️ Apache Iceberg - an example using the Flight (airports) dataset

# Config

**_If you're not using the provided Nessie server and MinIO storage then change these values to match your environment_**

### Nessie endpoint and credentials

In [123]:
nessieEndpoint = 'http://nessie:19120'  # base URL of the Nessie server
minioEndpoint = 'http://172.20.0.7:9000'  # you may have to replace this with the IP address of your MinIO container
nessieWarehouse = 's3://nessie-demo-bucket/'
accessKey = 'V42FCGRVMK24JJ8DHUYG'
secretKey = 'bKhWxVF3kQoLY9kFmt91l+tDrEoZjqnWXzY9Eza'

In [2]:
storageNamespace = 's3://nessie-demo-bucket' # e.g. "s3://bucket"
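
If you would rather not edit the cells above, one option is to read the settings from environment variables and fall back to the demo defaults. This is only a minimal sketch: the variable names `NESSIE_ENDPOINT`, `MINIO_ENDPOINT` and `NESSIE_WAREHOUSE` and the helper `load_settings` are hypothetical and are not defined by the demo stack.

```python
import os

# Demo defaults copied from the config cell above.
DEFAULTS = {
    "NESSIE_ENDPOINT": "http://nessie:19120",
    "MINIO_ENDPOINT": "http://172.20.0.7:9000",
    "NESSIE_WAREHOUSE": "s3://nessie-demo-bucket/",
}


def load_settings(env=os.environ):
    """Return the settings, preferring values from `env` over the demo defaults."""
    return {name: env.get(name, default) for name, default in DEFAULTS.items()}


# Hypothetical variable names; set them before starting Jupyter to override a default.
settings = load_settings()
nessieEndpoint = settings["NESSIE_ENDPOINT"]
minioEndpoint = settings["MINIO_ENDPOINT"]
nessieWarehouse = settings["NESSIE_WAREHOUSE"]
print(nessieEndpoint, minioEndpoint, nessieWarehouse)
```

The same pattern would also keep `accessKey` and `secretKey` out of the notebook.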

### Verifying Nessie with basic curl request

This command queries the list of available "trees" (branches or tags) in the Nessie catalog. If Nessie is running properly, you should receive a JSON response that includes information about the default branch, typically called main.

In [3]:
!curl -X GET http://nessie:19120/api/v2/trees

{
  "token" : null,
  "references" : [ {
    "type" : "BRANCH",
    "name" : "main",
    "hash" : "8a0875cb7a3a0d72cd474d0a3727a5383ca852c765303b44c21871a44b49aee2"
  } ],
  "hasMore" : false
}
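
The same check can be scripted. The sketch below parses a `/api/v2/trees` response and maps each reference name to its hash; it runs against a copy of the response shown above, so it needs no server. The helper names `reference_hashes` and `fetch_reference_hashes` are made up for this example.

```python
import json
from urllib.request import urlopen


def reference_hashes(payload):
    """Map each reference name in a /api/v2/trees response to its commit hash."""
    return {ref["name"]: ref["hash"] for ref in payload.get("references", [])}


def fetch_reference_hashes(endpoint):
    """Query a running Nessie server (needs network access to `endpoint`)."""
    with urlopen(f"{endpoint}/api/v2/trees") as response:
        return reference_hashes(json.load(response))


# Copy of the response shown above, so the sketch runs without a server.
sample = json.loads("""
{
  "token" : null,
  "references" : [ {
    "type" : "BRANCH",
    "name" : "main",
    "hash" : "8a0875cb7a3a0d72cd474d0a3727a5383ca852c765303b44c21871a44b49aee2"
  } ],
  "hasMore" : false
}
""")

hashes = reference_hashes(sample)
assert "main" in hashes, "default branch not found"
print(hashes["main"])
```

In the notebook, `fetch_reference_hashes(nessieEndpoint)` would run the same check against the live server.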

In [4]:
!pip install pyspark==3.5.3

Collecting pyspark==3.5.3
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 133.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.7 (from pyspark==3.5.3)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=1d810c7a4c6084136bd170010f1bfe2511cbd82205e6f45092f604ec406aa2c1
  Stored in directory: /home/jovyan/.cache/pip/wheels/97/f5/c0/947e2c0942b361ffe58651f36bd7f13772675b3863fd63d1b1
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3


# Setup

**(you shouldn't need to change anything in this section, just run it)**

### Versioning Information

In [6]:
mainBranch = "main"
devBranch = "dev"

### Import libraries

In [8]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import os

### Set environment variables

In [9]:
os.environ["CATALOG_URI"] = nessieEndpoint
os.environ["WAREHOUSE"] = nessieWarehouse
os.environ["STORAGE_URI"] = minioEndpoint

### Set up Spark

In [10]:
# Configure Spark with necessary packages and Iceberg/Nessie settings
conf = (
    pyspark.SparkConf()
        .setAppName('Iceberg & Nessie / Jupyter')
        # Include necessary packages
        .set('spark.jars.packages', 'org.postgresql:postgresql:42.7.3,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,software.amazon.awssdk:bundle:2.24.8,software.amazon.awssdk:url-connection-client:2.24.8')
        # Enable Iceberg and Nessie extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        # Configure Nessie catalog
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', nessieEndpoint + '/api/v1')
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        # Set Minio as the S3 endpoint for Iceberg storage
        .set('spark.sql.catalog.nessie.s3.endpoint', minioEndpoint)
        .set('spark.sql.catalog.nessie.warehouse', nessieWarehouse)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
# Start Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Session Started")

spark

Spark Session Started
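
The catalog-specific settings above all share the prefix `spark.sql.catalog.<name>`. As a sketch of how they fit together, the hypothetical helper `nessie_catalog_conf` below builds the same key/value pairs as a plain dictionary, which makes it easy to inspect them or to register a second catalog under another name. It uses no Spark APIs and only mirrors the keys already used in the cell above.

```python
def nessie_catalog_conf(catalog, endpoint, ref, warehouse, s3_endpoint):
    """Return the per-catalog Spark settings from the cell above as a dict."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"{prefix}.uri": endpoint.rstrip("/") + "/api/v1",
        f"{prefix}.ref": ref,
        f"{prefix}.authentication.type": "NONE",
        f"{prefix}.s3.endpoint": s3_endpoint,
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }


catalog_settings = nessie_catalog_conf(
    catalog="nessie",
    endpoint="http://nessie:19120",
    ref="main",
    warehouse="s3://nessie-demo-bucket/",
    s3_endpoint="http://172.20.0.7:9000",
)
for key, value in sorted(catalog_settings.items()):
    print(f"{key} = {value}")
```

In the notebook, the pairs could be applied with a loop such as `for key, value in catalog_settings.items(): conf.set(key, value)`.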


---
---

# Main demo starts here 🚦 👇🏻

# Load some Data

For this demo, we will use the airports data from the Flight dataset.

We'll save the sample dataset into an Iceberg table called `airports`, using Nessie for the catalog.

In [11]:
df = spark.read.option("inferSchema","true").option("multiline","true").json("/data-transfer/flight-data/airports.json")

In [28]:
df.printSchema()

root
 |-- airport: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- state: string (nullable = true)



In [29]:
df.show()

+--------------------+------------------+-------+----+-----------+------------+-----+
|             airport|              city|country|iata|        lat|        long|state|
+--------------------+------------------+-------+----+-----------+------------+-----+
|            Thigpen |       Bay Springs|    USA| 00M|31.95376472|-89.23450472|   MS|
|Livingston Municipal|        Livingston|    USA| 00R|30.68586111|-95.01792778|   TX|
|         Meadow Lake|  Colorado Springs|    USA| 00V|38.94574889|-104.5698933|   CO|
|        Perry-Warsaw|             Perry|    USA| 01G|42.74134667|-78.05208056|   NY|
|    Hilliard Airpark|          Hilliard|    USA| 01J| 30.6880125|-81.90594389|   FL|
|   Tishomingo County|           Belmont|    USA| 01M|34.49166667|-88.20111111|   MS|
|         Gragg-Wade |           Clanton|    USA| 02A|32.85048667|-86.61145333|   AL|
|             Capitol|        Brookfield|    USA| 02C|   43.08751|-88.17786917|   WI|
|   Columbiana County|    East Liverpool|    USA| 02G|

In [14]:
%load_ext sql

In [15]:
%sql spark

In [27]:
%%sql
CREATE NAMESPACE IF NOT EXISTS nessie.flights;

In [30]:
%%sql
DROP TABLE IF EXISTS nessie.flights.airports;

In [31]:
df.write.saveAsTable("nessie.flights.airports")

Taking a quick peek at the data, you can see that there are airports located in the USA.

In [39]:
%%sql
SELECT
  *
FROM
  nessie.flights.airports;

airport,city,country,iata,lat,long,state
Thigpen,Bay Springs,USA,00M,31.95376472,-89.23450472,MS
Livingston Municipal,Livingston,USA,00R,30.68586111,-95.01792778,TX


# Verifying Nessie

For the Nessie UI, navigate to <http://dataplatform:19120>

In [46]:
!curl -X GET "http://nessie:19120/api/v2/trees"

{
  "token" : null,
  "references" : [ {
    "type" : "BRANCH",
    "name" : "main",
    "hash" : "a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b"
  } ],
  "hasMore" : false
}

In [64]:
!curl -X GET "http://nessie:19120/api/v2/trees/main/contents/nessie.flights"

{
  "status" : 404,
  "reason" : "Not Found",
  "message" : "Could not find content for key 'nessie.flights' in reference 'main'.",
  "errorCode" : "CONTENT_NOT_FOUND",
  "serverStackTrace" : null
}
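
The 404 above is consistent with how the Nessie commit log later in this notebook names the table (`flights.airports`): the content key apparently does not include the Spark catalog name `nessie`, so the namespace would be addressed as `flights`. The sketch below shows that mapping. `nessie_content_key` and `contents_url` are hypothetical helpers, and they assume that no name element contains a dot or a character that needs URL escaping.

```python
def nessie_content_key(spark_identifier, catalog="nessie"):
    """Drop the leading Spark catalog name from a dotted identifier."""
    parts = spark_identifier.split(".")
    if len(parts) < 2 or parts[0] != catalog:
        raise ValueError(f"expected '{catalog}.<namespace>[.<table>]', got {spark_identifier!r}")
    return ".".join(parts[1:])


def contents_url(endpoint, ref, spark_identifier, catalog="nessie"):
    """Build the contents URL in the shape used by the curl call above."""
    key = nessie_content_key(spark_identifier, catalog)
    return f"{endpoint}/api/v2/trees/{ref}/contents/{key}"


url = contents_url("http://nessie:19120", "main", "nessie.flights")
print(url)
```

Passing the printed URL to `curl` would be one way to check this assumption against the running server.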

In [66]:
%%sql
LIST REFERENCES IN nessie

refType,name,hash
Branch,main,a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b


In [54]:
spark.sql("LIST REFERENCES IN nessie").toPandas()

Unnamed: 0,refType,name,hash
0,Branch,main,a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0...


In [121]:
spark.sql("SHOW TABLES IN nessie").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,flights,agg_airpors_per_state,False
1,flights,airports,False


# Verifying Iceberg Data and Metadata in MinIO

Access the MinIO UI on <http://dataplatform:9010> and log in.

Once logged in, locate the bucket where the Iceberg table is stored (e.g., the `nessie-demo-bucket` bucket).

Inside the bucket, you will see a directory structure that represents the Iceberg table. The structure typically includes:

* Data Files: These are the physical Parquet files containing the actual data for the table.
* Metadata Files: These are JSON files that track the state and evolution of the table, including schema changes, partitions, snapshots, and more.

Examine the Iceberg metadata files:

In [127]:
import boto3
import json

# Define Minio connection parameters
minio_client = boto3.client(
    's3',
    endpoint_url='http://minio-1:9000',  # MinIO endpoint (container hostname, or its IP address from docker inspect)
    aws_access_key_id=accessKey,
    aws_secret_access_key=secretKey,
    region_name='us-east-1'
)

# Specify the bucket and metadata file path
bucket_name = 'nessie-demo-bucket'
metadata_file_key = 'flights/airports_610e4b2a-bc43-447c-b43d-c20c087afe41/metadata/00000-04688b6e-190e-45c1-be1a-9bd4a6af49fa.metadata.json'  # Example metadata path

# Download the metadata file
metadata_file = minio_client.get_object(Bucket=bucket_name, Key=metadata_file_key)
metadata_content = metadata_file['Body'].read().decode('utf-8')

# Parse and print the metadata content
metadata_json = json.loads(metadata_content)
print(json.dumps(metadata_json, indent=4))


{
    "format-version": 2,
    "table-uuid": "55badf73-a9b5-4ddb-9074-e9e2634ab807",
    "location": "s3://nessie-demo-bucket/flights/airports_610e4b2a-bc43-447c-b43d-c20c087afe41",
    "last-sequence-number": 1,
    "last-updated-ms": 1731957518327,
    "last-column-id": 7,
    "current-schema-id": 0,
    "schemas": [
        {
            "type": "struct",
            "schema-id": 0,
            "fields": [
                {
                    "id": 1,
                    "name": "airport",
                    "required": false,
                    "type": "string"
                },
                {
                    "id": 2,
                    "name": "city",
                    "required": false,
                    "type": "string"
                },
                {
                    "id": 3,
                    "name": "country",
                    "required": false,
                    "type": "string"
                },
                {
                    "id": 4,


The metadata JSON file contains important information about the table, including:

* **Schema:** Defines the structure of the table (columns, types, etc.).
* **Snapshots:** Lists all snapshots of the table, which track historical versions of the data.
* **Partition Information:** Details about how the table is partitioned, if applicable.

By examining this metadata, you can gain insight into how Apache Iceberg tracks the state of the table, manages schema evolution, and supports features like time travel and partitioning.
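
To make the three parts concrete, here is a small sketch that pulls them out of a parsed metadata document. The helper `summarize_metadata` is hypothetical; the sample dictionary reuses the first two schema fields printed above, while its `snapshots`, `partition-specs` and `default-spec-id` entries are illustrative values, not copied from the real file.

```python
def summarize_metadata(metadata):
    """Extract the current schema, snapshot count and default partition fields."""
    current_id = metadata["current-schema-id"]
    schema = next(s for s in metadata["schemas"] if s["schema-id"] == current_id)
    default_spec_id = metadata.get("default-spec-id")
    spec = next(
        (p for p in metadata.get("partition-specs", []) if p["spec-id"] == default_spec_id),
        None,
    )
    return {
        "columns": [(field["name"], field["type"]) for field in schema["fields"]],
        "snapshot_count": len(metadata.get("snapshots", [])),
        "partition_fields": [field["name"] for field in spec["fields"]] if spec else [],
    }


sample_metadata = {
    "format-version": 2,
    "current-schema-id": 0,
    "schemas": [
        {
            "type": "struct",
            "schema-id": 0,
            "fields": [
                {"id": 1, "name": "airport", "required": False, "type": "string"},
                {"id": 2, "name": "city", "required": False, "type": "string"},
            ],
        }
    ],
    # Illustrative values below; they are not taken from the real metadata file.
    "default-spec-id": 0,
    "partition-specs": [{"spec-id": 0, "fields": []}],
    "snapshots": [{"snapshot-id": 1, "timestamp-ms": 1731957518327}],
}

summary = summarize_metadata(sample_metadata)
print(summary)
```

In the notebook, `summarize_metadata(metadata_json)` would apply the same summary to the file downloaded from MinIO.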


# Create a new branch

_Creating a branch is a metadata-only operation: the new branch points at the same commit as `main`, so no data is duplicated_

In [72]:
# Create the 'dev' branch from 'main' branch
spark.sql("CREATE BRANCH dev IN nessie FROM main").toPandas()

Unnamed: 0,refType,name,hash
0,Branch,dev,a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0...
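
The `mainBranch` and `devBranch` variables defined in the setup section can be used instead of hard-coding the names. A minimal sketch: `create_branch_sql` is a hypothetical helper that builds the same statement as above, and the name pattern it checks is a deliberately conservative assumption, not Nessie's actual naming rule.

```python
import re

# Conservative pattern for names we are willing to splice into SQL (an assumption).
_SAFE_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_\-]*$")


def create_branch_sql(branch, source, catalog="nessie"):
    """Build the CREATE BRANCH statement used above from variables."""
    for name in (branch, source, catalog):
        if not _SAFE_NAME.match(name):
            raise ValueError(f"unexpected reference or catalog name: {name!r}")
    return f"CREATE BRANCH {branch} IN {catalog} FROM {source}"


mainBranch = "main"
devBranch = "dev"
statement = create_branch_sql(devBranch, mainBranch)
print(statement)
```

The result can then be passed to `spark.sql(statement)` exactly like the literal string in the cell above.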


### Confirm that we can see the data on the `dev` branch

In [87]:
spark.sql("USE REFERENCE dev IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [88]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3376


# Making [and reverting] changes on the dev branch

Let's go big! Let's see what happens when we delete the contents of the table with a careless `DELETE` that omits the all-important predicate.

In [75]:
%sql DELETE FROM nessie.flights.airports

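How's that data looking now? A `DELETE` without a predicate removes every row, so right after it the count on `dev` is 0. (The `3376` shown below comes from re-running this cell after the revert described further down, as the cell number `In [89]` suggests.)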

In [89]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3376


But `main` is safe and not touched 😌

In [78]:
spark.sql("USE REFERENCE main IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [79]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3376
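
Why `main` stays untouched can be illustrated with a toy model in which a branch is just a name pointing at an immutable commit. This is a conceptual sketch only: `ToyCatalog` is invented for this example and says nothing about how Nessie is actually implemented.

```python
class ToyCatalog:
    """Toy model: branches are names pointing at immutable commits."""

    def __init__(self):
        self.commits = {}   # commit id -> (parent id, {table name: rows})
        self.branches = {}  # branch name -> commit id
        self._next_id = 0

    def _new_commit(self, parent, tables):
        commit_id = f"c{self._next_id}"
        self._next_id += 1
        self.commits[commit_id] = (parent, dict(tables))
        return commit_id

    def init_branch(self, branch, tables):
        self.branches[branch] = self._new_commit(None, tables)

    def create_branch(self, name, source):
        # Only the pointer is copied; no table data is duplicated.
        self.branches[name] = self.branches[source]

    def tables(self, branch):
        return self.commits[self.branches[branch]][1]

    def commit(self, branch, table, rows):
        # A change creates a new commit and moves only this branch's pointer.
        parent = self.branches[branch]
        tables = dict(self.commits[parent][1])
        tables[table] = rows
        self.branches[branch] = self._new_commit(parent, tables)


catalog = ToyCatalog()
catalog.init_branch("main", {"flights.airports": ("Thigpen", "Livingston Municipal", "Meadow Lake")})
catalog.create_branch("dev", "main")
shared_before_change = catalog.branches["dev"] == catalog.branches["main"]

catalog.commit("dev", "flights.airports", ())  # the careless DELETE, on dev only
print(shared_before_change)                           # True
print(len(catalog.tables("dev")["flights.airports"]))   # 0
print(len(catalog.tables("main")["flights.airports"]))  # 3
```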


## Reverting changes to the `dev` branch

### Install the nessie-cli into the Jupyter container

We can install the `nessie-cli`, which is needed to revert changes, into the Jupyter container. Unfortunately, it does not work properly when called from Jupyter. We therefore run it from a terminal on the Docker machine instead, using the `nessie-cli` that runs as a separate Docker container in the stack.

In [290]:
!wget https://github.com/projectnessie/nessie/releases/download/nessie-0.100.0/nessie-cli-0.100.0.jar -O nessie-cli.jar && echo "java -jar nessie-cli.jar -q -u http://nessie:19120/api/v2 \"\$@\"" > nessie.sh && chmod +x nessie.sh

--2024-11-19 11:10:57--  https://github.com/projectnessie/nessie/releases/download/nessie-0.100.0/nessie-cli-0.100.0.jar
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/254450741/864708a6-45f8-4432-b7a0-b0023c0082db?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20241119%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20241119T111057Z&X-Amz-Expires=300&X-Amz-Signature=2ec3ec6442b8a6386875197b8e198007d4667d94e5f57c03d57d70f55eb0ec58&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dnessie-cli-0.100.0.jar&response-content-type=application%2Foctet-stream [following]
--2024-11-19 11:10:57--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/254450741/864708a6-45f8-4432-b7a0-b0023c0082db?X-Amz-Algorithm=AW

### Reset the branch

In a terminal on the docker machine, attach to the `nessie-cli`:

```
docker attach nessie-cli
```

You should automatically be connected to the Nessie server:

```
ubuntu@ip-172-26-6-68:~/platys-nessie$ docker attach nessie-cli
Successfully connected to Iceberg REST at http://nessie:19120/iceberg/
Connecting to Nessie REST at http://nessie:19120/api/v2/ ...
Successfully connected to Nessie REST at http://nessie:19120/api/v2/ - Nessie API version 2, spec version 2.2.0
main>
```

Let's list all the named references

```sql
LIST REFERENCES
```

Now let's see the Nessie commit log on the dev branch

```sql
SHOW LOG ON dev
```

```
main> SHOW LOG ON dev
commit 197726500f10072ca97f6c6f4dced160bcffeacf3acad812e15e90bc7a977a94
Author:  jovyan
Date:    Nov 18, 2024, 8:10:23 PM UTC (committed: 2024-11-18T20:10:23.319845054Z)
Parents: a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b

    Iceberg delete against flights.airports

commit a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b
Author:  jovyan
Date:    Nov 18, 2024, 7:28:05 PM UTC (committed: 2024-11-18T19:28:05.247013319Z)
Parents: 5bd6bf11aa5b1e347477e464e11d56fb60465912df3305adf0d17b1006eb8304

    Iceberg append against flights.airports

commit 5bd6bf11aa5b1e347477e464e11d56fb60465912df3305adf0d17b1006eb8304
Author:  jovyan
Date:    Nov 18, 2024, 7:27:59 PM UTC (committed: 2024-11-18T19:27:59.571172865Z)
Parents: c0b526f2ae512fc769847cffcd9a8b64d2852a965fbce63fc2d0b4209981cf9a

    Iceberg delete table flights.airports

commit c0b526f2ae512fc769847cffcd9a8b64d2852a965fbce63fc2d0b4209981cf9a
Author:  jovyan
Date:    Nov 18, 2024, 7:18:38 PM UTC (committed: 2024-11-18T19:18:38.761418322Z)
Parents: 8a0875cb7a3a0d72cd474d0a3727a5383ca852c765303b44c21871a44b49aee2

    Iceberg append against flights.airports

commit 8a0875cb7a3a0d72cd474d0a3727a5383ca852c765303b44c21871a44b49aee2
Author:  jovyan
Date:    Nov 18, 2024, 6:51:38 PM UTC (committed: 2024-11-18T18:51:38.893701908Z)
Parents: 2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d

    create namespace flights

main> list content on 197726500f10072ca97f6c6f4dced160bcffeacf3acad812e15e90bc7a977a94
Encountered an error parsing the statement around line 1, column 6 .. line 1 column 12

Found: content
Expected one of the following: CONTENTS , REFERENCES

list content on 197726500f10072ca97f6c6f4dced160bcffeacf3acad812e15e90bc7a977a94
```

List all tables, views and namespaces on the `dev` branch at commit `197726500f10072ca97f6c6f4dced160bcffeacf3acad812e15e90bc7a977a94` (the latest commit in the log above):

```
LIST CONTENTS ON dev AT 197726500f10072ca97f6c6f4dced160bcffeacf3acad812e15e90bc7a977a94
```

```
    NAMESPACE flights
  ICEBERG_TABLE flights.airports
```

Revert the table to its state at the previous commit (`a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b`):

```
REVERT CONTENT OF flights.airports ON dev TO STATE ON dev AT a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b
```
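
Copying hashes by hand is error-prone. As a sketch, the text printed by `SHOW LOG` can be parsed to find the parent of the newest commit and to build the statement above. `parse_log` and `revert_statement` are hypothetical helpers, and they assume the log layout shown in this transcript, which may differ between CLI versions.

```python
def parse_log(text):
    """Parse SHOW LOG output into a list of commits, newest first."""
    commits = []
    current = None
    for line in text.splitlines():
        stripped = line.strip()
        if line.startswith("commit "):
            current = {"hash": stripped.split()[1], "parents": [], "message": ""}
            commits.append(current)
        elif current is None or not stripped:
            continue
        elif stripped.startswith("Parents:"):
            current["parents"] = stripped.split(":", 1)[1].split()
        elif stripped.startswith(("Author:", "Date:")):
            continue
        else:
            current["message"] = stripped  # indented commit message line
    return commits


def revert_statement(commits, table, branch):
    """Build a REVERT CONTENT statement targeting the parent of the newest commit."""
    parent = commits[0]["parents"][0]
    return f"REVERT CONTENT OF {table} ON {branch} TO STATE ON {branch} AT {parent}"


# First two commits of the log shown above.
log_text = """\
commit 197726500f10072ca97f6c6f4dced160bcffeacf3acad812e15e90bc7a977a94
Author:  jovyan
Date:    Nov 18, 2024, 8:10:23 PM UTC (committed: 2024-11-18T20:10:23.319845054Z)
Parents: a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b

    Iceberg delete against flights.airports

commit a92d4ec7cbf2b0edb8bf9409980ff6f2af2b07cacdd5f0c7b6d5837947d75b0b
Author:  jovyan
Date:    Nov 18, 2024, 7:28:05 PM UTC (committed: 2024-11-18T19:28:05.247013319Z)
Parents: 5bd6bf11aa5b1e347477e464e11d56fb60465912df3305adf0d17b1006eb8304

    Iceberg append against flights.airports
"""

commits = parse_log(log_text)
revert_sql = revert_statement(commits, "flights.airports", "dev")
print(commits[0]["message"])
print(revert_sql)
```

Check the message of the newest commit before reverting, so that only the unwanted change is undone.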

## Our data is back!

In [90]:
spark.sql("USE REFERENCE dev IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [91]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3376


# Making changes to the `dev` branch as a collection

## Delete all rows for state `TX` from the airports table

In [94]:
%sql DELETE FROM nessie.flights.airports WHERE state='TX'

In [95]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3167


## Build an aggregate table of the data to show how many airports there are per state

In [96]:
%%sql
CREATE
OR REPLACE TABLE nessie.flights.agg_airpors_per_state AS
SELECT
  state,
  COUNT(*) airports_cnt
FROM
  nessie.flights.airports
GROUP BY
  state;

In [97]:
%sql SELECT * FROM nessie.flights.agg_airpors_per_state LIMIT 5;

state,airports_cnt
AZ,59
SC,52
LA,55
MN,89
NJ,35
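
For readers less familiar with SQL, the two steps above (delete the `TX` rows, then count airports per state) can be mirrored in plain Python. The sketch uses only the first eight airports printed by `df.show()` earlier, so its counts are for that small sample and not for the full table.

```python
from collections import Counter

# (airport, state) pairs from the first rows shown by df.show() above.
rows = [
    ("Thigpen", "MS"),
    ("Livingston Municipal", "TX"),
    ("Meadow Lake", "CO"),
    ("Perry-Warsaw", "NY"),
    ("Hilliard Airpark", "FL"),
    ("Tishomingo County", "MS"),
    ("Gragg-Wade", "AL"),
    ("Capitol", "WI"),
]

# DELETE ... WHERE state='TX'
remaining = [row for row in rows if row[1] != "TX"]

# SELECT state, COUNT(*) ... GROUP BY state
airports_cnt = Counter(state for _, state in remaining)
for state, count in sorted(airports_cnt.items()):
    print(state, count)
```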


# Compare `main` and `dev`

In [98]:
spark.sql("USE REFERENCE dev IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [99]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3167


In [100]:
spark.sql("USE REFERENCE main IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [101]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3376
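
The comparison above repeats the same two statements per branch, which could be wrapped in a small helper. In this sketch, `counts_per_reference` is hypothetical and takes the SQL runner as a parameter; a stand-in runner that replays the counts shown above lets it run without Spark.

```python
def counts_per_reference(run_sql, table, references, catalog="nessie"):
    """Run the same row count on each reference and collect the results."""
    counts = {}
    for ref in references:
        run_sql(f"USE REFERENCE {ref} IN {catalog}")
        rows = run_sql(f"SELECT COUNT(*) FROM {table}")
        counts[ref] = rows[0][0]
    return counts


class FakeRunner:
    """Stand-in for Spark that replays the counts shown above."""

    def __init__(self, counts):
        self.counts = counts
        self.current = None

    def __call__(self, statement):
        words = statement.split()
        if words[:2] == ["USE", "REFERENCE"]:
            self.current = words[2]
            return []
        return [(self.counts[self.current],)]


fake = FakeRunner({"dev": 3167, "main": 3376})
counts = counts_per_reference(fake, "nessie.flights.airports", ["dev", "main"])
print(counts)
print(counts["main"] - counts["dev"])  # rows removed on dev
```

In the notebook, the runner could be `lambda q: spark.sql(q).collect()`. Note that the session stays on the last reference in the list afterwards.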


# Merge the changes from the `dev` branch into `main`

In [105]:
spark.sql("MERGE BRANCH dev INTO main IN nessie").toPandas()

Unnamed: 0,name,hash
0,main,80dd3c5fc1ae87c02a3265db87f389720d9f4bbf1232cb...


In [106]:
spark.sql("USE REFERENCE main IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [107]:
%%sql
SELECT
  COUNT(*)
FROM
  nessie.flights.airports;

count(1)
3167


# Drop the branch

In [109]:
spark.sql("DROP BRANCH dev IN nessie")

DataFrame[status: string]