In [None]:
import warnings
warnings.simplefilter('ignore')


import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import BooleanType, DoubleType, LongType, StringType, TimestampType, NestedField, IntegerType
from pyiceberg.transforms import MonthTransform

First we need to connect to our Iceberg catalogue - since currently the client is doing the reading and writing, we also set the access credentials. This can also be done in a `.pyiceberg.yaml` file.

Note that normally, the REST catalogue can handle all S3 auth, sending a signed S3 url to the client to upload - but that would require additional auth setup, so we pass the client credentials directly here

In [None]:
# Settings for the Nessie Iceberg REST catalog. The S3 credentials are passed
# explicitly because the client itself reads and writes the object storage.
catalog_properties = {
    "uri": "http://nessie:19120/iceberg",
    "s3.access-key-id": "minio",
    "s3.secret-access-key": "minio1234",
}
catalog = load_catalog("nessie", **catalog_properties)

Iceberg metadata is organized in `namespaces` which would be the equivalent of a schema in a database.

Here we name our namespace `steam` to represent data that comes from Steam

In [None]:
# Create the "steam" namespace unless it already exists.
catalog.create_namespace_if_not_exists("steam")
# Last expression of the cell: shows the namespaces the catalog reports.
catalog.list_namespaces()

We can define a schema using pyiceberg. Note that each field needs a unique id within the schema, as one of the ways that Iceberg can handle schema migrations is by referencing each field by its id rather than by its name or position, but we'll see an example of that later.

We will create a table containing a small subset for demonstration purposes, as it's a bit tedious to write out the whole schema by hand 😅

In [None]:
# Four-column subset of the review data; every field carries its own unique id.
review_fields = [
    NestedField(id=1, name="recommendationid", type=LongType()),
    NestedField(id=2, name="language", type=StringType()),
    NestedField(id=3, name="timestamp_created", type=TimestampType()),
    NestedField(id=4, name="voted_up", type=BooleanType()),
]
schema = Schema(*review_fields)

Now we're ready to create the table, by passing the schema to the catalog. It will take care of writing a metadata file in the object storage.

In [None]:
# Create `steam.languages` from the schema unless the table already exists,
# and keep the resulting table handle for the following cells.
table = catalog.create_table_if_not_exists("steam.languages", schema=schema)

Let's insert some data into the table. Pyiceberg supports Arrow out of the box, so we use Polars to read data in and turn it into Arrow format

In [None]:
# Project the four schema columns out of the raw CSV: the epoch column is
# converted with `pl.from_epoch`, the vote flag is cast to a boolean, and
# rows without a recommendation id are dropped.
review_columns = [
    pl.col("recommendationid"),
    pl.col("language"),
    pl.from_epoch(pl.col("timestamp_created")),
    pl.col("voted_up").cast(pl.Boolean),
]
has_recommendation_id = pl.col("recommendationid").is_not_null()
df = pl.read_csv("data/10.csv").select(review_columns).filter(has_recommendation_id)

Now we can `overwrite` or `append` this data

In [None]:
# Convert the Polars frame to Arrow and append it to the Iceberg table.
table.append(df.to_arrow())

Now that there's data in the table, we can scan the Iceberg table, filtering out the records we don't need. Pyiceberg will use the metadata stored in Iceberg to quickly locate the correct files, and only read the necessary parts of those files

In [None]:
# Unfiltered scan of the whole table, materialised as an Arrow table.
t = table.scan().to_arrow()

In [None]:
# Scan the full table into Arrow and report how many rows it holds.
total_rows = table.scan().to_arrow().num_rows
print(f"Total rows in table: {total_rows:,}")

In [None]:
# Read only two columns, and only the rows whose language is 'english'.
# Alternatively use expressions -> EqualTo('language', 'english')
english_scan = table.scan(
    row_filter="language == 'english'",
    selected_fields=["language", "voted_up"],
)
new_df = pl.from_arrow(english_scan.to_arrow())
new_df

One problem we have now, is that we don't actually have the game id in the table - that would be pretty useful. If we just try to insert data with the added column, that wouldn't work as the Iceberg schema doesn't contain the `game_id` column and will error to protect the data.

In [None]:
# Add a constant `game_id` column to the frame loaded earlier.
df_game = df.with_columns(game_id=pl.lit("10"))
# NOTE: per the accompanying text this write is expected to be rejected,
# because the table schema has no `game_id` column at this point.
table.overwrite(df_game.to_arrow())

Luckily in Iceberg, we can update the schema without having to rewrite all the physical files, much like in a traditional RDBMS

In [None]:
# Schema evolution: add a string column `game_id` inside an update_schema
# context (presumably committed when the `with` block exits — see pyiceberg docs).
with table.update_schema() as update:
    update.add_column('game_id', StringType())

In [None]:
# Display the table's current schema.
table.schema()

PyIceberg makes sure to give it a valid ID and puts it at the end of the schema. We can move the fields around as we want by updating the metadata, Iceberg keeps track of the corresponding data positions without having to rewrite the files

In [None]:
# Schema evolution: reorder the columns so that `game_id` comes first.
with table.update_schema() as update:
    update.move_first('game_id')

In [None]:
# Display the schema again to inspect the new column order.
table.schema()

In [None]:
# Retry the overwrite with the frame that includes `game_id`, now that the
# column has been added to the table schema.
table.overwrite(df_game.to_arrow())

`game_id` should now be the first column of the table

In [None]:
# Scan the whole table and display it as a Polars DataFrame.
pl.from_arrow(table.scan().to_arrow())

Now we're ready to insert some more data using `.append`

In [None]:
# Reviews for game 289070: the game id is injected as a literal column, the
# epoch column is converted and the vote flag is cast to a boolean.
civ6_columns = [
    pl.lit("289070").alias("game_id"),
    pl.col("recommendationid"),
    pl.col("language"),
    pl.from_epoch(pl.col("timestamp_created")),
    pl.col("voted_up").cast(pl.Boolean),
]
civ6_df = pl.read_csv("data/289070.csv").select(civ6_columns)
civ6_df.head()

In [None]:
# Append the game 289070 reviews to the existing table contents.
table.append(civ6_df.to_arrow())

In [None]:
# Re-scan the table, report its size, then list the distinct game ids in it.
t = table.scan().to_arrow()
row_count = t.num_rows
print(f"Current rows: {row_count:,}")
pl.from_arrow(t).select(pl.col("game_id").unique())

Let's add some more data.

In [None]:
# Reviews for game 730, shaped to match the table schema (literal game id,
# converted timestamp, boolean vote flag).
cs2_columns = [
    pl.lit("730").alias("game_id"),
    pl.col("recommendationid"),
    pl.col("language"),
    pl.from_epoch(pl.col("timestamp_created")),
    pl.col("voted_up").cast(pl.Boolean),
]
cs2 = pl.read_csv("data/730.csv").select(cs2_columns)
cs2.head()

In [None]:
# Append the game 730 reviews to the table.
table.append(cs2.to_arrow())

In [None]:
# Full scan of the table, used only to report the current row count.
print(f"The table has {table.scan().to_arrow().num_rows:,} rows")

What if I find out that the end-users are mainly trying to analyze the upvotes over time? The Dashboard has been built, but it's not performing as they want. Partitioning might be the answer, but with Iceberg, we don't have to physically rewrite all the data:

In [None]:
# Partition evolution: add a partition field named 'month_created' that
# applies a month transform to `timestamp_created`.
with table.update_spec() as update:
    update.add_field("timestamp_created", MonthTransform(), 'month_created')

Nothing has happened to the data yet: only the Iceberg metadata has been updated with the new partition spec - no need to rewrite all the files. 

If we insert new data, it will be partitioned by our new partitioning schema

In [None]:
# Reviews for game 578080, shaped to match the table schema (literal game id,
# converted timestamp, boolean vote flag).
pubg_columns = [
    pl.lit("578080").alias("game_id"),
    pl.col("recommendationid"),
    pl.col("language"),
    pl.from_epoch(pl.col("timestamp_created")),
    pl.col("voted_up").cast(pl.Boolean),
]
pubg = pl.read_csv("data/578080.csv").select(pubg_columns)
pubg.head()

In [None]:
# First append after the partition spec change; per the accompanying text
# this new data is written using the new partitioning.
table.append(pubg.to_arrow())

Iceberg knows the partitioning scheme for each of the manifest lists and will generate a plan for each file independently

![Partition Spec Evolution](images/partition_spec_evolution.png)

> https://iceberg.apache.org/docs/latest/evolution/#partition-evolution

# Write-Audit-Publish with Git-for-data branches

Nessie provides the ability to perform a Write-Audit-Publish pattern, through git branching of data. Using the mechanisms of Iceberg, Nessie can keep track of the different branches of data, allowing us git-like semantics for working with data. Pyiceberg doesn't yet support this Nessie-specific syntax, so we're going to switch to another project, Dremio. We could also have chosen a number of other query engines, such as Apache Spark or Trino/Presto.

A nice thing about Dremio is that since it's Arrow-backed internally, they also expose FlightSQL endpoints, letting us use ADBC as a generic DB client.

In [None]:
from adbc_driver_flightsql.dbapi import connect
from adbc_driver_flightsql import DatabaseOptions

In [None]:
# FlightSQL connection to Dremio, with the driver's cookie middleware option
# switched on.
dremio_options = {
    "username": "dremio",
    "password": "dremio123",
    DatabaseOptions.WITH_COOKIE_MIDDLEWARE.value: "true",
}
conn = connect("grpc+tcp://dremio:32010", db_kwargs=dremio_options)

In [None]:
# Smoke test: run a trivial query to confirm the connection works.
with conn.cursor() as c:
    c.execute("SELECT 1")
    print(c.fetchone())

Dremio has been pre-configured to talk to Nessie, and since Iceberg is client-agnostic, Dremio can read all the Iceberg tables registered in the Nessie catalogue. We're taking advantage of the Arrow-based nature of Dremio, Polars and ADBC to be able to interact directly

In [None]:
# Preview ten rows of the Iceberg table through Dremio, as a Polars DataFrame.
pl.read_database("SELECT * FROM Nessie.steam.languages LIMIT 10", conn)

Where before, `pyiceberg` let us filter the data, a query engine like Dremio can do SQL to do all the analytics we're used to from a database.

In [None]:
# Share of positive votes per language: up-votes (boolean cast to int and
# summed) divided by the number of non-null votes, highest ratio first.
sql = """
    SELECT "language", 
    SUM(CAST(voted_up as int)) / CAST(COUNT(voted_up) as float) as ratio_positive_votes
    FROM Nessie.steam.languages 
    GROUP BY "language"
    ORDER BY 2 DESC
    """
pl.read_database(sql, conn)

## Write
In this example, we have our raw extract data in the Extract source - a CSV file for each game. Let's prepare a staging table for the CSV files, as they have a slightly different format than our final table

In [None]:
# Create the staging table for the raw CSV layout unless it already exists.
# `timestamp_created` is declared as an int here; a later cell converts it
# with to_timestamp() when moving rows into the final table.
with conn.cursor() as c:
    c.execute(""" 
    CREATE TABLE IF NOT EXISTS Nessie.steam.staging.languages (
        game_id varchar(50),
        recommendationid int,
        "language" varchar(20),
        timestamp_created int,
        voted_up boolean
    )
    """)
    # Print the second column of the result row (presumably Dremio's status message).
    print(c.fetchone()[1])

In the Write stage, we start by creating a new branch where we can stage all our data changes

In [None]:
# Write phase: create the branch `insert_demo` from `main` in the Nessie catalog.
with conn.cursor() as c:
    c.execute("CREATE BRANCH insert_demo AT BRANCH main IN Nessie")
    # Print the second column of the result row (presumably the status message).
    print(c.fetchone()[1])

Now we can `COPY INTO` our staging table from our Extract file source

In [None]:
# Load the CSV for game 550 from the Extract source into the staging table,
# writing to the `insert_demo` branch only.
with conn.cursor() as c:
    c.execute("""
    COPY INTO Nessie.steam.staging.languages 
    AT BRANCH insert_demo
    FROM '@Extract/extract/reviews/550.csv'
    ( EXTRACT_HEADER true, TRIM_SPACE true )
    """)
    # The first column of the result row is reported as the number of loaded rows.
    print(f"Inserted {c.fetchone()[0]:,} rows")

We can verify that the main branch doesn't see any data

In [None]:
# No AT BRANCH clause: reads the staging table on the session's current
# reference (described as `main` in the accompanying text).
pl.read_database("SELECT * FROM Nessie.steam.staging.languages", conn)

While the `insert_demo` branch does

In [None]:
# Same query, pinned explicitly to the `insert_demo` branch.
pl.read_database("SELECT * FROM Nessie.steam.staging.languages AT BRANCH insert_demo", conn)

Since the `game_id` is stored in the file name instead of the data itself, we need to update our staged data with the game id.

In [None]:
# Switch to the `insert_demo` branch.
# NOTE(review): presumably this changes the default branch for later
# statements on this connection that have no AT BRANCH clause — confirm.
with conn.cursor() as c:
    c.execute("USE BRANCH insert_demo IN Nessie;")
    print(c.fetchone()[1])

In [None]:
# Back-fill the game id on the staged rows that do not have one yet; the
# change is made on the `insert_demo` branch only.
with conn.cursor() as c:
    c.execute("UPDATE Nessie.steam.staging.languages AT BRANCH insert_demo SET game_id = '550' where game_id is null")
    # The statement is an UPDATE, so report the affected rows as updated
    # rather than inserted.
    print(f"Updated {c.fetchone()[0]:,} rows")

Still no rows in the main branch

In [None]:
# Run the same staging query against each branch in turn and print the
# result under a heading naming the branch.
for branch in ("main", "insert_demo"):
    print(f"Branch {branch}")
    staged_rows = pl.read_database(
        f"SELECT * FROM Nessie.steam.staging.languages AT BRANCH {branch} WHERE game_id = '550'",
        conn,
    )
    print(staged_rows)

To finish our Write phase, we can move the staging data into the `languages` table with the correct conversions

In [None]:
# Move the staged rows into the final table on the `insert_demo` branch,
# converting the integer timestamp with to_timestamp() on the way.
with conn.cursor() as c:
    c.execute("""
    INSERT INTO Nessie.steam.languages AT BRANCH insert_demo 
    SELECT 
        game_id,
        recommendationid,
        "language",
        to_timestamp(timestamp_created) as timestamp_created, 
        voted_up
    FROM Nessie.steam.staging.languages AT BRANCH insert_demo
    """)
    # The first column of the result row is reported as the number of inserted rows.
    print(f"Inserted {c.fetchone()[0]:,} rows")

In [None]:
# Row count of the final table as seen on the `insert_demo` branch.
pl.read_database("SELECT COUNT(*) as num_rows FROM Nessie.steam.languages AT BRANCH insert_demo", conn)

Now we can run our Audit step - verifying data to ensure the data quality before consumers get it

In [None]:
# Audit query: for every (game, language) pair, the fraction of that game's
# reviews written in that language.
# NOTE(review): the tables are referenced without AT BRANCH, so the query runs
# on whatever reference the session currently uses (an earlier cell issues
# USE BRANCH insert_demo) — confirm this audits the intended branch.
sql = """
     WITH num_reviews as (
        select game_id, count(*) as num_reviews
        FROM Nessie.steam.languages
        GROUP BY game_id
    ), num_language_reviews as (
        SELECT game_id, "language", COUNT(*) as num_language_reviews
        FROM Nessie.steam.languages l
        GROUP BY game_id, "language"
    )
    SELECT l.game_id, l."language",  num_language_reviews / cast(num_reviews as float) as language_ratio
    FROM num_reviews r join num_language_reviews as l on r.game_id = l.game_id
    """

ratio_df = pl.read_database(sql, conn)
ratio_df

We can perform sanity checks, such as checking that English reviews make up at least 15% of each game's reviews, or compare data across branches to make sure the difference is correct

In [None]:
# Audit check: no game may have an English review share below 15%.
# `is_empty` is a method and must be called; asserting on the bare bound
# method is always truthy, so the check could never fail.
low_english_share = ratio_df.filter(
    (pl.col('language') == 'english') & (pl.col('language_ratio') < 0.15)
)
assert low_english_share.is_empty(), "some games have fewer than 15% English reviews"

In [None]:
# Audit check: the row-count difference between `insert_demo` and `main` must
# equal the hard-coded number of rows expected from this load.
branch_diff_sql = """
SELECT (SELECT COUNT(*) from Nessie.steam.languages AT BRANCH insert_demo) 
     - (SELECT COUNT(*) From Nessie.steam.languages AT BRANCH main)
"""
expected_new_rows = 858570
assert pl.read_database(branch_diff_sql, conn).item() == expected_new_rows

Given that we're happy with the new data - it passes all our data quality checks - we're ready for the Publish step. 

In a git-like fashion, we can merge the two branches and all our changes will be visible to the "regular" users

In [None]:
# Publish phase: merge the audited `insert_demo` branch into `main`.
with conn.cursor() as c:
    c.execute("MERGE BRANCH insert_demo into main in Nessie")
    # Print the second column of the result row (presumably the status message).
    print(c.fetchone()[1])

In [None]:
# Row count on `main` after the merge.
pl.read_database("SELECT COUNT(*) as num_reviews FROM Nessie.steam.languages at branch main", conn)

The changes have been merged in, and we can clean up

In [None]:
# Switch back to `main` first, then drop the merged `insert_demo` branch,
# printing the second column of each statement's result row.
branch_cleanup_statements = (
    "USE BRANCH main in Nessie",
    "DROP BRANCH insert_demo in Nessie",
)
with conn.cursor() as c:
    for statement in branch_cleanup_statements:
        c.execute(statement)
        print(c.fetchone()[1])

# Time Travel

Not only can we do Git branching, we can also do time travel, which is part of the Iceberg spec. 

We could imagine that after the insert, we want to store a pointer to this version of the data, so we can go back to how data looked at an exact point in time. Iceberg has been keeping snapshots of each of our operations

In [None]:
# List the snapshots recorded for the table.
pl.read_database("SELECT * FROM TABLE(table_snapshot('Nessie.steam.languages'))", conn)

I can choose between a snapshot_id:

In [None]:
# Row count of the table as of one specific snapshot.
# NOTE: the snapshot id is hard-coded; presumably it has to be replaced with
# an id from the table_snapshot listing of the current run.
pl.read_database("SELECT COUNT(*) FROM Nessie.steam.languages at snapshot '3558310252565355831'", conn)

Or a timestamp:

In [None]:
# Row count of the table as of a point in time.
# NOTE: the timestamp is hard-coded; presumably it has to fall within the
# snapshot history of the current run.
pl.read_database("SELECT COUNT(*) FROM Nessie.steam.languages at TIMESTAMP '2024-12-01 21:25:00'", conn)

After publishing, we may want to store a tag on the data in order to be able to audit how the data looked at a specific point in time

In [None]:
# Create the tag `my_report` from the current state of `main`.
with conn.cursor() as c:
    c.execute('CREATE TAG my_report FROM BRANCH main IN Nessie')
    # Print the second column of the result row (presumably the status message).
    print(c.fetchone()[1])

In [None]:
# Simulate an accidental deletion: remove all rows for game 550. There is no
# AT BRANCH clause, so this acts on the session's current reference.
with conn.cursor() as c:
    c.execute("DELETE FROM Nessie.steam.languages where game_id = '550'")
    # The first column of the result row is reported as the number of deleted rows.
    print(f"Deleted {c.fetchone()[0]:,} rows")

In [None]:
# Row count after the deletion.
pl.read_database("SELECT COUNT(*) as num_rows FROM Nessie.steam.languages", conn)

Oops! Good thing I have a tag - my report is unaffected

In [None]:
# Row count as seen through the `my_report` tag, which was created before the deletion.
pl.read_database("SELECT COUNT(*) as num_rows FROM Nessie.steam.languages AT TAG my_report", conn)

Let's roll back the accidental deletion by going to a snapshot taken before it

In [None]:
# List the snapshots again to find the one to roll back to.
pl.read_database("SELECT * FROM TABLE(table_snapshot('Nessie.steam.languages'))", conn)

In [None]:
# Roll the table back to a snapshot taken before the deletion.
# NOTE: the snapshot id is hard-coded; presumably it has to be replaced with
# an id from the table_snapshot listing of the current run.
with conn.cursor() as c:
    c.execute("ROLLBACK TABLE Nessie.steam.languages TO SNAPSHOT '7086946646526996956'")
    print(c.fetchone()[1])

In [None]:
# Row count after the rollback, to confirm the deleted rows are back.
pl.read_database("SELECT COUNT(*) as num_rows FROM Nessie.steam.languages", conn)

Back to normal! Time to clean up a bit

In [None]:
# Final cleanup: drop both demo tables and then the report tag, printing the
# second column of each statement's result row.
final_cleanup_statements = (
    "DROP TABLE Nessie.steam.languages",
    "DROP TABLE Nessie.steam.staging.languages",
    "DROP TAG my_report IN Nessie",
)
with conn.cursor() as c:
    for statement in final_cleanup_statements:
        c.execute(statement)
        print(c.fetchone()[1])