<img src="https://store-images.s-microsoft.com/image/apps.22094.728e1f25-a784-458f-90e1-7729049edba2.144bf785-b784-41dd-bcef-c91792108c09.f0be1bc2-af8f-49fc-ac4c-dfd9d53d9e8d" alt="lakeFS logo" width=130/> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <img src="https://www.apache.org/logos/res/iceberg/iceberg.png" alt="Apache Iceberg logo" width=300/>  

## lakeFS ❤️ Apache Iceberg - an example using Airports dataset

# Config

**_If you're not using the provided lakeFS server and MinIO storage then change these values to match your environment_**

### lakeFS endpoint and credentials

In [1]:
lakefsEndPoint = 'http://lakefs:8000' # e.g. 'https://username.aws_region_name.lakefscloud.io' 
lakefsAccessKey = 'V42FCGRVMK24JJ8DHUYG'
lakefsSecretKey = 'bKhWxVF3kQoLY9kFmt91l+tDrEoZjqnWXzY9Eza'

### Object Storage

In [2]:
storageNamespace = 's3://lakefs-demo-bucket' # e.g. "s3://bucket"

---

# Setup

**(you shouldn't need to change anything in this section, just run it)**

In [3]:
repo_name = "demo"

### Versioning Information

In [4]:
mainBranch = "main"
devBranch = "dev"

### Some helper functions

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import os

def print_diff(diff):
    results = map(
        lambda n:[n.path,n.path_type,n.size_bytes,n.type],
        diff)

    from tabulate import tabulate
    print(tabulate(
        results,
        headers=['Path','Path Type','Size(Bytes)','Type']))

def print_commit(log):
    from datetime import datetime
    from pprint import pprint

    print('Message:', log.message)
    print('ID:', log.id)
    print('Committer:', log.committer)
    print('Creation Date:', datetime.utcfromtimestamp(log.creation_date).strftime('%Y-%m-%d %H:%M:%S'))
    print('Parents:', log.parents)
    print('Metadata:')
    pprint(log.metadata)

def lakefs_ui_endpoint(lakefsEndPoint):
    if lakefsEndPoint.startswith('http://host.docker.internal'):
        lakefsUIEndPoint = lakefsEndPoint.replace('host.docker.internal','127.0.0.1')
    elif lakefsEndPoint.startswith('http://lakefs'):
        lakefsUIEndPoint = lakefsEndPoint.replace('lakefs','127.0.0.1')
    else:
        lakefsUIEndPoint = lakefsEndPoint
        
    return lakefsUIEndPoint

### Import libraries

In [6]:
import os
import lakefs

### Set environment variables

In [7]:
os.environ["LAKECTL_SERVER_ENDPOINT_URL"] = lakefsEndPoint
os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"] = lakefsAccessKey
os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"] = lakefsSecretKey

#### Verify lakeFS credentials by getting lakeFS version

In [8]:
print("Verifying lakeFS credentials…")
try:
    v=lakefs.client.Client().version
except:
    print("🛑 failed to get lakeFS version")
else:
    print(f"…✅lakeFS credentials verified\n\nℹ️lakeFS version {v}")

Verifying lakeFS credentials…
…✅lakeFS credentials verified

ℹ️lakeFS version 1.42.0


### Define lakeFS Repository

In [9]:
repo = lakefs.Repository(repo_name).create(storage_namespace=f"{storageNamespace}/{repo_name}", default_branch=mainBranch, exist_ok=True)
branchMain = repo.branch(mainBranch)
print(repo)

{'id': 'demo', 'creation_date': 1732040198, 'default_branch': 'main', 'storage_namespace': 's3://lakefs-demo-bucket/demo'}


### Set up Spark

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Iceberg&LakeFS / Jupyter") \
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,io.lakefs:lakefs-iceberg:0.1.4,io.lakefs:lakefs-spark-extensions_2.12:0.0.3") \
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.endpoint", lakefsEndPoint) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.access.key", lakefsAccessKey) \
        .config("spark.hadoop.fs.s3a.secret.key", lakefsSecretKey) \
        .config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
        .config("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}") \
        .config("spark.sql.catalog.lakefs.uri", lakefsEndPoint) \
        .config("spark.sql.catalog.lakefs.cache-enabled", "false") \
        .config("spark.sql.defaultCatalog", "lakefs") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions") \
        .getOrCreate()
spark.sparkContext.setLogLevel("INFO")

spark

---

---

# Main demo starts here 🚦 👇🏻

# Load some Data

For this demo, we will use [Airports Open Data](https://ourairports.com/data/) with the Airports dataset.

We'll save the sample dataset into an Iceberg table called `airports`, using LakeFS for the catalog.

In [11]:
df = spark.read.option("inferSchema","true").option("multiline","false").json("/data-transfer/airports-data/airports.json")

In [12]:
df.show(20)

+---------+------------+--------+--------------------+---------+------+-----+-----------+----------+--------+------------------+----------+-------------------+------------+--------------------+-----------------+-------------+--------------------+
|continent|elevation_ft|gps_code|           home_link|iata_code|    id|ident|iso_country|iso_region|keywords|      latitude_deg|local_code|      longitude_deg|municipality|                name|scheduled_service|         type|      wikipedia_link|
+---------+------------+--------+--------------------+---------+------+-----+-----------+----------+--------+------------------+----------+-------------------+------------+--------------------+-----------------+-------------+--------------------+
|       NA|          11|    K00A|https://www.pennd...|     NULL|  6523|  00A|         US|     US-PA|    NULL|         40.070985|       00A|         -74.933689|    Bensalem|   Total RF Heliport|               no|     heliport|                NULL|
|       NA| 

In [13]:
df.write.mode("overwrite").saveAsTable("lakefs.main.flights.airports")

In [14]:
from IPython.display import Markdown as md

if lakefsEndPoint=='http://lakefs:8000':
    lakeFSWebUI='http://dataplatform:28220'
else:
    lakeFSWebUI=lakefsEndPoint

md(f"#### 👉🏻 Optionally, go and view the objects in [lakeFS web UI]({lakeFSWebUI}/repositories/{repo_name}/objects?ref=main&path=nyc%2Fpermits%2F)")

#### 👉🏻 Optionally, go and view the objects in [lakeFS web UI](http://dataplatform:28220/repositories/demo/objects?ref=main&path=nyc%2Fpermits%2F)

Taking a quick peek at the data, we can see some of the airports and their columns and that here is a total of 81'193 airports in the table.

In [15]:
%load_ext sql

In [16]:
%sql spark

In [17]:
%%sql
SELECT
  *
FROM
  lakefs.main.flights.airports;

Field 1,Field 2,Field 3,Field 4,Field 5,Field 6,Field 7,Field 8,Field 9,Field 10,Field 11,Field 12,Field 13,Field 14,Field 15,Field 16,Field 17,Field 18
,11,K00A,https://www.penndot.pa.gov/TravelInPA/airports-pa/Pages/Total-RF-Heliport.aspx,,6523,00A,US,US-PA,,40.070985,00A,-74.933689,Bensalem,Total RF Heliport,no,heliport,
,3435,00AA,,,323361,00AA,US,US-KS,,38.704022,00AA,-101.473911,Leoti,Aero B Ranch Airport,no,small_airport,


# Verifying Iceberg Data and Metadata in Minio 

Access the Minio UI on <http://dataplatform:9010> and login.

Once logged in, locate the bucket where the Iceberg table is stored (e.g., the `nessie-demo-bucket` bucket).
                                                                     
Inside the bucket, you will see a directory structure that represents the Iceberg table. The structure typically includes:

* Data Files: These are the physical Parquet files containing the actual data for the table.
* Metadata Files: These are JSON files that track the state and evolution of the table, including schema changes, partitions, snapshots, and more.

Let's Examine the Iceberg Metadata files

In [18]:
import boto3
import json

# Define Minio connection parameters
minio_client = boto3.client(
    's3',
    endpoint_url=lakefsEndPoint, 
    aws_access_key_id=lakefsAccessKey,
    aws_secret_access_key=lakefsSecretKey,
    region_name='us-east-1'
)

# Specify the bucket (actually repo name, because we use LakeFS) and metadata file path
repo_name = 'demo'
metadata_file_key = 'main/flights/airports/metadata/v1.metadata.json'  # Example metadata path

# Download the metadata file
metadata_file = minio_client.get_object(Bucket=repo_name, Key=metadata_file_key)
metadata_content = metadata_file['Body'].read().decode('utf-8')

# Parse and print the metadata content
metadata_json = json.loads(metadata_content)
print(json.dumps(metadata_json, indent=4))


{
    "format-version": 2,
    "table-uuid": "6c738efd-07d6-456a-ba66-e3989674837b",
    "location": "flights/airports",
    "last-sequence-number": 1,
    "last-updated-ms": 1732042561425,
    "last-column-id": 18,
    "current-schema-id": 0,
    "schemas": [
        {
            "type": "struct",
            "schema-id": 0,
            "fields": [
                {
                    "id": 1,
                    "name": "continent",
                    "required": false,
                    "type": "string"
                },
                {
                    "id": 2,
                    "name": "elevation_ft",
                    "required": false,
                    "type": "long"
                },
                {
                    "id": 3,
                    "name": "gps_code",
                    "required": false,
                    "type": "string"
                },
                {
                    "id": 4,
                    "name": "home_link",
          

The metadata JSON file contains important information about the table, including:

* **Schema:** Defines the structure of the table (columns, types, etc.).
* **Snapshots:** Lists all snapshots of the table, which track historical versions of the data.
* **Partition Information:** Details about how the table is partitioned, if applicable. By examining this metadata, you can gain insight into how Apache Iceberg tracks the state of the table, manages schema evolution, and supports features like time travel and partitioning.


### Commit the new table and its data

In [19]:
ref = branchMain.commit(
    message="Initial data load",
    metadata={'author': 'lakefs',
              'data source': 'https://ourairports.com/data/'})
print_commit(ref.get_commit())

Message: Initial data load
ID: a540fc17a2d4e7be8facf74469b692c50489a8ca17ed0f74f02663d4b71de29f
Committer: quickstart
Creation Date: 2024-11-19 18:56:04
Parents: ['65e8d77f883721f05f6e4741467f00ee9e3187c98368b44da1dcb95bae4f23ff']
Metadata:
{'author': 'lakefs', 'data source': 'https://ourairports.com/data/'}


# Create a new branch

_This is copy-on-write; we're not duplicating the data_

In [20]:
branchDev = repo.branch(devBranch).create(source_reference=mainBranch, exist_ok=True)
print(f"{devBranch} ref:", branchDev.get_commit().id)

dev ref: a540fc17a2d4e7be8facf74469b692c50489a8ca17ed0f74f02663d4b71de29f


### Confirm that we can see the data on the `dev` branch

In [21]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.main.flights.airports;

Field 1
81193


# Making [and reverting] changes on the dev branch

Let's go big! Let's see what happens when we delete the contents of the table with a careless `DELETE` omitting an all-important predicate

In [21]:
%sql DELETE FROM lakefs.dev.flights.airports

How's that data looking now?

In [22]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.dev.flights.airports;

Field 1
0


But `main` is safe and unsullied 😌

In [23]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.main.flights.airports;

Field 1
81193


## Reverting changes to the `dev` branch

### Uncommitted objects:

In [26]:
print_diff(branchDev.uncommitted())

Path                                                                                            Path Type      Size(Bytes)  Type
----------------------------------------------------------------------------------------------  -----------  -------------  -------
flights/airports/metadata/6fd965fe-dc76-4e4e-b4b7-55abe8e05256-m0.avro                          object                9952  added
flights/airports/metadata/snap-4756451933891843082-1-6fd965fe-dc76-4e4e-b4b7-55abe8e05256.avro  object                4229  added
flights/airports/metadata/v2.metadata.json                                                      object                4554  added
flights/airports/metadata/version-hint.text                                                     object                   1  changed


### Reset the branch

In [27]:
branchDev.reset_changes(path_type='common_prefix', path="flights/airports/")

_This just resets the changes to the files for this table. To reset the whole branch use_:

```python
branchDev.reset_changes(path_type='reset')
```

### Uncommitted objects:

In [28]:
print_diff(branchDev.uncommitted())

Path    Path Type    Size(Bytes)    Type
------  -----------  -------------  ------


## Our data's back!

In [29]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.dev.flights.airports;

Field 1
81193


# Making changes to the `dev` branch as a collection

## Delete all airports which are not in the US

In [30]:
%sql DELETE FROM lakefs.dev.flights.airports WHERE iso_country!='US'

## Build an aggregate of the data to show the numbers of US airports per region

In [31]:
%%sql
CREATE
OR REPLACE TABLE lakefs.dev.flights.agg_us_airports_per_region AS
SELECT
  iso_region,
  COUNT(*) airports_cnt
FROM
  lakefs.dev.flights.airports
GROUP BY
  iso_region;

In [33]:
%config SqlMagic.displaylimit = None
%sql SELECT * FROM lakefs.dev.flights.agg_us_airports_per_region LIMIT 10;

Field 1,Field 2
US-TN,440
US-OK,696
US-VT,122
US-SD,256
US-WA,742
US-IN,785
US-NY,796
US-AL,500
US-MS,442
US-MT,442


# Compare `main` and `dev`

## `dev`

In [34]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.dev.flights.airports;

Field 1
31695


## `main`

In [35]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.main.flights.airports;

Field 1
81193


## `Data diff`
refs_data_diff is an SQL table-valued function (TVF). The expression:
##### `refs_data_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)`
yields a relation that compares the "from" table PREFIX.FROM_SCHEMA.TABLE with the "to" table PREFIX.TO_SCHEMA.TABLE. Its output is the difference: a relation (like a view) that adds a single column lakefs_change to the table schema.

* Rows that appear only in the first version of the table  (in the example, on branch main) appear in the difference with lakefs_change==’-’.
* Rows that appear only in the second version of the table  (in the example, on branch dev) appear in the difference with lakefs_change==’+’.
* Rows that appear in both versions of the table do not appear in the difference.

In [36]:
%%sql
SELECT
  *
FROM
  refs_data_diff ('lakefs', 'main', 'dev', 'flights.airports')
LIMIT
  5;

Field 1,Field 2,Field 3,Field 4,Field 5,Field 6,Field 7,Field 8,Field 9,Field 10,Field 11,Field 12,Field 13,Field 14,Field 15,Field 16,Field 17,Field 18,Field 19
-,SA,89,SBJE,https://aeroportodejericoacoara.com.br,JJD,323221,JJD,BR,BR-CE,"SSVV, Jericoacoara",-2.906425,CE0003,-40.357338,Cruz,Comandante Ariston Pessoa Airport,yes,medium_airport,https://en.wikipedia.org/wiki/Jericoacoara_airport
-,,972,CYLS,http://www.lakesimcoeairport.com/,YLK,1293,CYLS,CA,CA-ON,"NB9, CNB9, Oro Station",44.485056,CYLS,-79.554663,Barrie,Barrie-Lake Simcoe Regional Airport,yes,medium_airport,https://en.wikipedia.org/wiki/Barrie-Orillia_(Lake_Simcoe_Regional)_Airport
-,EU,40,LIPR,http://www.riminiairport.com/english/,RMI,4362,LIPR,IT,IT-45,"Rimini Miramare Airport, Rimini San Marino International Airport",44.020302,RN01,12.6117,Rimini (RN),Federico Fellini International Airport,yes,medium_airport,https://en.wikipedia.org/wiki/Federico_Fellini_International_Airport
-,EU,36,LIER,http://www.sogeaor.it,FNU,29424,LIER,IT,IT-88,QOS,39.895308,OR01,8.642661,Oristano,Oristano-Fenosu Airport,no,small_airport,http://it.wikipedia.org/wiki/Aeroporto_di_Oristano-Fenosu
-,EU,59,LIPH,http://www.trevisoairport.it/tsf/index.jsp?_requestid=677517&language=en,TSF,4355,LIPH,IT,IT-34,"Venice-Treviso, Treviso-Sant'Angelo",45.648399,TV01,12.1944,Treviso (TV),Treviso Antonio Canova Airport,yes,medium_airport,https://en.wikipedia.org/wiki/Treviso_Airport


In [40]:
%%sql
SELECT
  lakefs_change,
  iso_country,
  COUNT(*) AS airports_per_country
FROM
  refs_data_diff ('lakefs', 'main', 'dev', 'flights.airports')
GROUP BY
  lakefs_change,
  iso_country
LIMIT 20

Field 1,Field 2,Field 3
-,CD,306
-,KE,375
-,NI,44
-,GN,17
-,LU,15
-,ES,553
-,CM,40
-,TZ,211
-,HK,103
-,BA,24


# Partition the data in the `dev` branch

In [42]:
%%sql
CREATE TABLE
  lakefs.dev.flights.airports_partitioned USING iceberg PARTITIONED BY (iso_region) AS
SELECT
  *
FROM
  lakefs.dev.flights.airports
ORDER BY
  iso_region;

In [43]:
%%sql
SELECT
  iso_region,
  COUNT(*) airports_per_region
FROM
  lakefs.dev.flights.airports_partitioned
GROUP BY
  iso_region

Field 1,Field 2
US-WA,742
US-NV,416
US-AK,1065
US-U-A,1
US-SD,256
US-AL,500
US-NY,796
US-CT,168
US-NC,609
US-RI,44


# Commit the changes to the `dev` branch

In [44]:
ref = branchDev.commit(
    message="Remove data for non US airports from the Airports dataset, build region aggregate and add a new, partitioned Airport table",
    metadata={"etl job name": "etl_job_42",
              "author": "lakefs"})
print_commit(ref.get_commit())

Message: Remove data for non US airports from the Airports dataset, build region aggregate and add a new, partitioned Airport table
ID: 7b417a9658390e754ce78382d7857b453c479bfdb612f942273edfa85a97a2f3
Committer: quickstart
Creation Date: 2024-11-19 15:28:54
Parents: ['d3449abf9c42d80efe4b98f9a0eb7221c3e866fdfda8b15c91688892fd536373']
Metadata:
{'author': 'lakefs', 'etl job name': 'etl_job_42'}


## Are the changes already visible in main branch?

Now, as you can see we still get the full data back (81'193) when querying the airports table. We need to Merge the changes first.

In [48]:
%%sql
SELECT
  COUNT(*)
FROM
  lakefs.main.flights.airports;

Field 1
31695


# Merge the branch back into `main`

In [46]:
res = branchDev.merge_into(branchMain)
print(res)

24284d22b1b6bf8c753476b153b493ab76ccd65d844191659a45644c2d3fae7a


---

---

---

In [47]:
from IPython.display import Markdown as md

if lakefsEndPoint=='http://lakefs:8000':
    lakeFSWebUI='http://dataplatform:28220'
else:
    lakeFSWebUI=lakefsEndPoint

md(f"### 👉🏻 View the objects in [lakeFS web UI]({lakeFSWebUI}/repositories/{repo_name}/objects)")

### 👉🏻 View the objects in [lakeFS web UI](http://dataplatform:28220/repositories/demo/objects)

## More Questions?

###### Join the lakeFS Slack group - https://lakefs.io/slack