In [2]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import os
import openai
from openai import OpenAI
from flask import Flask, request, jsonify
import traceback
import json

In [3]:
# Create a Spark session with Iceberg configurations, remember to start docker container ('docker compose up -d', using environment '.venv')

# Initialize Spark session with Iceberg configurations
spark = SparkSession.builder \
  .appName("IcebergLocalDevelopment") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0') \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.iceberg.type", "hadoop") \
  .config("spark.sql.catalog.iceberg.warehouse", "spark-warehouse/iceberg") \
  .getOrCreate()

:: loading settings :: url = jar:file:/Users/francescogalli/Desktop/Iceberg_Thesis_Work/.venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/francescogalli/.ivy2/cache
The jars for the packages stored in: /Users/francescogalli/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f18f2205-651f-4073-8a26-0b18409c7116;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.8.0 in central
:: resolution report :: resolve 71ms :: artifacts dl 1ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.8.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving

25/04/14 16:29:55 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
# See databases in Iceberg catalog

spark.sql("SHOW DATABASES IN iceberg").show()

+-----------+
|  namespace|
+-----------+
|employee_db|
| iceberg_db|
+-----------+



25/03/26 15:13:26 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [18]:
# Make sure we are referring to the correct database without having to write it down every time

spark.sql("USE iceberg.iceberg_db")

DataFrame[]

In [19]:
# See what tables are in the database we are using

spark.sql("SHOW TABLES").show()

+----------+-----------------+-----------+
| namespace|        tableName|isTemporary|
+----------+-----------------+-----------+
|iceberg_db|           table2|      false|
|iceberg_db|             test|      false|
|iceberg_db|           table3|      false|
|iceberg_db|            table|      false|
|iceberg_db|changelog_testing|      false|
+----------+-----------------+-----------+



In [None]:
# Create a database (redundant)

spark.sql(""" CREATE DATABASE IF NOT EXISTS iceberg_db """)
spark.sql("SHOW DATABASES").show()

+----------+
| namespace|
+----------+
|iceberg_db|
+----------+



In [None]:
# Create a table

spark.sql("""
          CREATE TABLE IF NOT EXISTS table (
          id INT,
          name STRING,
          added_at TIMESTAMP)
          USING iceberg
          PARTITIONED BY (day(added_at))""")

spark.sql("SHOW TABLES").show()


+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|iceberg_db|   table2|      false|
|iceberg_db|    table|      false|
+----------+---------+-----------+



In [None]:
# Create a table

spark.sql("""
          CREATE TABLE IF NOT EXISTS table2 (
          id INT,
          name STRING)
          USING iceberg
          PARTITIONED BY (id)""")

spark.sql("SHOW TABLES").show()


+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|iceberg_db|   table2|      false|
|iceberg_db|    table|      false|
+----------+---------+-----------+



In [None]:
# Append data into table2

'''
spark.sql("""
          INSERT INTO iceberg.iceberg_db.table2 
          VALUES 
          (1, 'Alice'), 
          (2, 'Bob'), 
          (3, 'Charlie')""")

spark.sql("SELECT * FROM iceberg.iceberg_db.table2").show()

'''

'\nspark.sql("""\n          INSERT INTO iceberg.iceberg_db.table2 \n          VALUES \n          (1, \'Alice\'), \n          (2, \'Bob\'), \n          (3, \'Charlie\')""")\n\nspark.sql("SELECT * FROM iceberg.iceberg_db.table2").show()\n\n'

In [None]:
# See data in table2

spark.sql("SELECT * FROM table2").show()

+---+-------+--------------------+
| id|   name|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-02-23 19:18:...|
|  1|  Alice|2025-02-25 18:05:...|
|  2|    Bob|2025-02-25 18:05:...|
|  2|    Bob|2025-02-23 19:18:...|
|  3|Charlie|2025-02-25 18:05:...|
|  3|Charlie|2025-02-23 19:18:...|
+---+-------+--------------------+



                                                                                

In [None]:
# Append data into table2

spark.sql("""
            INSERT INTO table2 
            VALUES
            (1, 'Alice', current_timestamp()),
            (2, 'Bob', current_timestamp()),
            (3, 'Charlie', current_timestamp())
          """)

spark.sql("""DELETE FROM table2 
          WHERE added_at IS NULL
           """)

spark.sql("SELECT * FROM table2").show()

+---+-------+--------------------+
| id|   name|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-02-25 18:05:...|
|  2|    Bob|2025-02-25 18:05:...|
|  3|Charlie|2025-02-25 18:05:...|
|  1|  Alice|2025-02-23 19:18:...|
|  2|    Bob|2025-02-23 19:18:...|
|  3|Charlie|2025-02-23 19:18:...|
+---+-------+--------------------+



In [49]:
# Testing the time travel feature

# x will be the number of minutes ago you want to query the table
x = 1

# Get the timestamp x minutes ago
timestamp_x_minutes_ago = spark.sql(f"SELECT current_timestamp() - INTERVAL '{x}' DAYS").collect()[0][0]

# Query the table for the timestamp x minutes ago
spark.sql(f"""
    SELECT * FROM table2
    FOR SYSTEM_TIME AS OF '{timestamp_x_minutes_ago}'
""").show()

+---+-------+--------------------+
| id|   name|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-02-23 19:18:...|
|  2|    Bob|2025-02-23 19:18:...|
|  3|Charlie|2025-02-23 19:18:...|
|  1|  Alice|2025-02-25 18:05:...|
|  2|    Bob|2025-02-25 18:05:...|
|  3|Charlie|2025-02-25 18:05:...|
+---+-------+--------------------+



In [None]:
# This command replaces table with a new version containing only some rows from table2 

spark.sql("""
REPLACE TABLE table
USING iceberg
AS 
SELECT *
FROM table2
WHERE DATE(added_at) = '2025-02-23'
""")

DataFrame[]

In [16]:
spark.sql("SELECT * FROM table").show()

+---+-------+--------------------+
| id|   name|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-02-23 19:18:...|
|  2|    Bob|2025-02-23 19:18:...|
|  3|Charlie|2025-02-23 19:18:...|
+---+-------+--------------------+



In [None]:
# 'Superficial' table description

spark.sql("DESCRIBE TABLE table").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|      int|   NULL|
|    name|   string|   NULL|
|added_at|timestamp|   NULL|
+--------+---------+-------+



In [50]:
# See the history of the table

df = spark.read.format("iceberg").load("iceberg_db.table2.history")
df.show(truncate=False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-02-23 18:51:24.064|3195098715316441258|NULL               |true               |
|2025-02-23 18:52:03.372|5374994393316461417|3195098715316441258|true               |
|2025-02-23 19:07:34.053|9187256101761608397|5374994393316461417|true               |
|2025-02-23 19:07:38.157|3014219229801966893|9187256101761608397|true               |
|2025-02-23 19:07:40.841|9153823259828413158|3014219229801966893|true               |
|2025-02-23 19:07:54.007|3264605670712922968|9153823259828413158|true               |
|2025-02-23 19:18:46.366|293882809901543937 |3264605670712922968|true               |
|2025-02-23 19:20:23.355|4306225710289109897|293882809901543937 |true               |
|2025-02-25 18:05:57.074|3285014568135250506|430622571

In [None]:
# See snapshots of the table, seems like a more detailed version of 'history'

df = spark.read.format("iceberg").load("iceberg_db.table2.snapshots")
df.show(truncate=False)

+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                          |summary                                                                                                                                          

In [None]:
# Update the table properties to perform row level changes

spark.sql("""
    ALTER TABLE table2 SET TBLPROPERTIES (
        'format-version'='2',
        'write.delete.mode'='merge-on-read',
        'write.update.mode'='merge-on-read',
        'write.merge.mode'='merge-on-read'
    )
""")

DataFrame[]

In [None]:
# Row level operations

spark.sql('''
          UPDATE table2
          SET name = 'Francesco'
          WHERE id = 1
          ''')

spark.sql("SELECT * FROM table2").show()

+---+---------+--------------------+
| id|     name|            added_at|
+---+---------+--------------------+
|  2|      Bob|2025-02-25 18:05:...|
|  1|Francesco|2025-02-23 19:18:...|
|  1|Francesco|2025-02-25 18:05:...|
|  3|  Charlie|2025-02-25 18:05:...|
|  2|      Bob|2025-02-23 19:18:...|
|  3|  Charlie|2025-02-23 19:18:...|
+---+---------+--------------------+



In [42]:
# See extended description of table properties

spark.sql("DESCRIBE EXTENDED table2").show(truncate=False, n=1000)

+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                                                                                                            |comment|
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |int                                                                                                                                                                                                                

In [None]:
# Create new table to test row level operations

spark.sql('''CREATE TABLE iceberg_db.table3 (
    id BIGINT,
    name STRING,
    age INT
) USING iceberg
TBLPROPERTIES (
    'format-version'='2',  -- Iceberg v2 is required for row-level deletes
    'write.delete.mode'='merge-on-read', -- Enables row-level deletes
    'write.update.mode'='merge-on-read', -- Enables row-level updates
    'write.merge.mode'='merge-on-read'   -- Enables merge-on-read behavior
);''').show()


++
||
++
++



In [None]:
# Append some data into table3

spark.sql('''INSERT INTO table3
          VALUES
(1, 'Alice', 30),
(2, 'Bob', 40),
(3, 'Charlie', 50)
''').show()

++
||
++
++



In [None]:
# See data in table

spark.sql("SELECT * FROM table3").show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 40|
|  3|Charlie| 50|
+---+-------+---+



In [43]:
# Perform row level operation in table3, update

spark.sql('''
          UPDATE table3
          SET age = 31
          WHERE id = 1
          ''')
spark.sql("SELECT * FROM table3").show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    Bob| 40|
|  1|  Alice| 31|
|  3|Charlie| 50|
+---+-------+---+



In [51]:
# Attempt to see table3 changelog (not working)

df = spark.read.format("iceberg").load("iceberg_db.table3.changelog")
df.show(truncate=False)

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view iceberg_db.table3.changelog cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.

In [7]:
# Create a table

spark.sql("""
          CREATE TABLE IF NOT EXISTS test (
          id INT,
          name STRING,
          added_at TIMESTAMP)
          USING iceberg
          PARTITIONED BY (day(added_at))""")

spark.sql("SHOW TABLES").show()


+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|iceberg_db|   table2|      false|
|iceberg_db|     test|      false|
|iceberg_db|   table3|      false|
|iceberg_db|    table|      false|
+----------+---------+-----------+



In [8]:
# Append data into test

spark.sql("""
          INSERT INTO iceberg.iceberg_db.test
          VALUES 
          (1, 'Alice', current_timestamp()), 
          (2, 'Bob', current_timestamp()), 
          (3, 'Charlie', current_timestamp())"""
          )

spark.sql("SELECT * FROM iceberg.iceberg_db.test").show()


+---+-------+--------------------+
| id|   name|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-03-05 14:34:...|
|  2|    Bob|2025-03-05 14:34:...|
|  3|Charlie|2025-03-05 14:34:...|
+---+-------+--------------------+



In [None]:
# Renaming a column 

spark.sql("""
          ALTER TABLE test
          RENAME COLUMN name TO new_name
          """)

spark.sql("SELECT * FROM iceberg.iceberg_db.test").show()

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'RENAME'.(line 2, pos 10)

== SQL ==

          RENAME COLUMN test.new_name TO name
----------^^^
          


In [18]:
spark.sql(f"ALTER TABLE test ADD COLUMN age STRING")

spark.sql("SELECT * FROM iceberg.iceberg_db.test").show()


+---+-------+--------------------+--------+----+
| id|   name|            added_at|birthday| age|
+---+-------+--------------------+--------+----+
|  1|  Alice|2025-03-05 14:34:...|    NULL|NULL|
|  2|    Bob|2025-03-05 14:34:...|    NULL|NULL|
|  3|Charlie|2025-03-05 14:34:...|    NULL|NULL|
+---+-------+--------------------+--------+----+



In [22]:
df = spark.read.format("iceberg").load("iceberg_db.test.history")
df.show(truncate=False)

+-----------------------+-------------------+---------+-------------------+
|made_current_at        |snapshot_id        |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2025-03-05 14:34:12.473|1156343851529063989|NULL     |true               |
+-----------------------+-------------------+---------+-------------------+



In [6]:
df = spark.sql(f"SELECT * FROM iceberg.iceberg_db.test.history").show()
df

+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2025-03-05 14:34:...|1156343851529063989|     NULL|               true|
+--------------------+-------------------+---------+-------------------+



In [57]:
spark.sql("""CALL iceberg.system.create_changelog_view(
  table => 'iceberg_db.table3'
)""").show()

#options => map('start-snapshot-id','1','end-snapshot-id', '2')

+----------------+
|  changelog_view|
+----------------+
|`table3_changes`|
+----------------+



In [56]:
spark.sql("SELECT * FROM iceberg_db.table3.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-02-27 11:30:...|6029896344713609890|               NULL|   append|spark-warehouse/i...|{spark.app.id -> ...|
|2025-02-27 11:51:...|3798345041391162531|6029896344713609890|overwrite|spark-warehouse/i...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [60]:
spark.sql("""
	SELECT * FROM table3_changes 
	WHERE _change_type != 'Delete'
""").show()

UnsupportedOperationException: Delete files are currently not supported in changelog scans

In [13]:
spark.sql("SELECT DISTINCT _change_type FROM table3_changes").show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `table3_changes` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 34;
'Distinct
+- 'Project ['_change_type]
   +- 'UnresolvedRelation [table3_changes], [], false


In [21]:
spark.sql("""
          CREATE TABLE IF NOT EXISTS changelog_testing (
          id INT,
          name STRING,
          added_at TIMESTAMP)
          USING iceberg
          PARTITIONED BY (day(added_at))""")

"""spark.sql('''INSERT INTO changelog_testing
          VALUES
(1, 'Alice', current_timestamp()),
(2, 'Bob', current_timestamp()),
(3, 'Charlie', current_timestamp())
''')"""

spark.sql("SELECT * FROM iceberg.iceberg_db.changelog_testing").show()

+------+-------+--------------------+
|id_new|  name7|            added_at|
+------+-------+--------------------+
|     4|  David|2025-03-10 14:38:...|
|     1|  Alice|2025-03-10 13:20:...|
|     2|    Bob|2025-03-10 13:20:...|
|     3|Charlie|2025-03-10 13:20:...|
|     5|Richard|2025-03-10 14:40:...|
+------+-------+--------------------+



In [None]:
# Change the name of the 'name' column into 'new_name'

spark.sql("""ALTER TABLE changelog_testing RENAME COLUMN name TO new_name""")

spark.sql("""CALL iceberg.system.create_changelog_view(
  table => 'iceberg_db.changelog_testing'
)""").show()

spark.sql("SELECT * FROM changelog_testing_changes").show()

+--------------------+
|      changelog_view|
+--------------------+
|`changelog_testin...|
+--------------------+

+---+--------+--------------------+------------+---------------+-------------------+
| id|new_name|            added_at|_change_type|_change_ordinal|_commit_snapshot_id|
+---+--------+--------------------+------------+---------------+-------------------+
|  1|   Alice|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|  2|     Bob|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|  3| Charlie|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
+---+--------+--------------------+------------+---------------+-------------------+



In [68]:
spark.sql("""ALTER TABLE changelog_testing RENAME COLUMN new_name TO name3""")

spark.sql("""CALL iceberg.system.create_changelog_view(
  table => 'iceberg_db.changelog_testing'
)""").show()

spark.sql("SELECT * FROM changelog_testing_changes").show()

+--------------------+
|      changelog_view|
+--------------------+
|`changelog_testin...|
+--------------------+

+---+-------+--------------------+------------+---------------+-------------------+
| id|  name3|            added_at|_change_type|_change_ordinal|_commit_snapshot_id|
+---+-------+--------------------+------------+---------------+-------------------+
|  1|  Alice|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|  2|    Bob|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|  3|Charlie|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
+---+-------+--------------------+------------+---------------+-------------------+



In [22]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing.metadata_log_entries").show()

+--------------------+--------------------+-------------------+----------------+----------------------+
|           timestamp|                file| latest_snapshot_id|latest_schema_id|latest_sequence_number|
+--------------------+--------------------+-------------------+----------------+----------------------+
|2025-03-10 13:20:...|spark-warehouse/i...|               NULL|            NULL|                  NULL|
|2025-03-10 13:20:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:24:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:25:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:38:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:38:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 14:36:...|spark-warehouse/i...|2510292383777452097| 

In [112]:
spark.sql("""ALTER TABLE changelog_testing RENAME COLUMN name6 TO name7""")

DataFrame[]

In [113]:
spark.sql("INSERT INTO changelog_testing VALUES (5, 'Richard', current_timestamp())")
spark.sql("SELECT * FROM iceberg_db.changelog_testing").show()

+---+-------+--------------------+
| id|  name7|            added_at|
+---+-------+--------------------+
|  5|Richard|2025-03-10 14:40:...|
|  4|  David|2025-03-10 14:38:...|
|  1|  Alice|2025-03-10 13:20:...|
|  2|    Bob|2025-03-10 13:20:...|
|  3|Charlie|2025-03-10 13:20:...|
+---+-------+--------------------+



In [25]:
spark.sql("""SELECT * FROM iceberg_db.changelog_testing FOR SYSTEM_TIME AS OF '2025-03-10 14:41:00'""").show()

+---+-------+--------------------+
| id|  name7|            added_at|
+---+-------+--------------------+
|  4|  David|2025-03-10 14:38:...|
|  5|Richard|2025-03-10 14:40:...|
|  1|  Alice|2025-03-10 13:20:...|
|  2|    Bob|2025-03-10 13:20:...|
|  3|Charlie|2025-03-10 13:20:...|
+---+-------+--------------------+



In [88]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing").show()

+---+-------+--------------------+
| id|  name5|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-03-10 13:20:...|
|  2|    Bob|2025-03-10 13:20:...|
|  3|Charlie|2025-03-10 13:20:...|
+---+-------+--------------------+



In [99]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing.refs").show()

+----+------+-------------------+-----------------------+---------------------+----------------------+
|name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|2510292383777452097|                   NULL|                 NULL|                  NULL|
+----+------+-------------------+-----------------------+---------------------+----------------------+



In [101]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing VERSION AS OF 2510292383777452097").show()
#SELECT * FROM prod.db.table.`tag_historical-snapshot`;

+---+-------+--------------------+
| id|   name|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-03-10 13:20:...|
|  2|    Bob|2025-03-10 13:20:...|
|  3|Charlie|2025-03-10 13:20:...|
+---+-------+--------------------+



In [119]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-03-10 13:20:...|2510292383777452097|               NULL|   append|spark-warehouse/i...|{spark.app.id -> ...|
|2025-03-10 14:38:...|5657999723248319641|2510292383777452097|   append|spark-warehouse/i...|{spark.app.id -> ...|
|2025-03-10 14:40:...|3984517976872106574|5657999723248319641|   append|spark-warehouse/i...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [151]:
df = spark.sql("SELECT file FROM iceberg_db.changelog_testing.metadata_log_entries")
data = df.toPandas().to_dict(orient="records")
data

[{'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v1.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v2.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v3.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v4.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v5.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v6.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v7.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v8.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v9.metadata.json'},
 {'file': 'spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v10.metadata.json'}]

In [10]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing.history").show()

AnalysisException: [REQUIRES_SINGLE_PART_NAMESPACE] spark_catalog requires a single-part namespace, but got `iceberg_db`.`changelog_testing`.

In [167]:
df = spark.read.json("spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v1.metadata.json")
df

DataFrame[_corrupt_record: string]

In [None]:
metadata_files = [
    row["file"] for row in spark.sql("SELECT file FROM iceberg_db.changelog_testing.metadata_log_entries").collect()
]

for file in metadata_files:
    df = spark.read.json(f"spark-warehouse/{file}")
    print(f"Schema in {file}:")
    df.select("schemas").show(truncate=False)


df = spark.read.json("spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v1.metadata.json")
df

DataFrame[_corrupt_record: string]

In [None]:
import json

# The last version of the table will have information about ALL PAST SCHEMAS
# The last version number will be stored in the version-hint.text file
last_version_path = "spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/version-hint.text"
with open(last_version_path, "r") as f:
    last_version = f.read()

metadata_path = f"spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v{last_version}.metadata.json"

# Open and parse the metadata JSON file
with open(metadata_path, "r") as f:
    metadata = json.load(f)

# Print ALL available schema versions
print("Schemas in Metadata File:")
for schema in metadata.get("schemas", []):
    print(json.dumps(schema, indent=4))

Schemas in Metadata File:
{
    "type": "struct",
    "schema-id": 0,
    "fields": [
        {
            "id": 1,
            "name": "id",
            "required": false,
            "type": "int"
        },
        {
            "id": 2,
            "name": "name",
            "required": false,
            "type": "string"
        },
        {
            "id": 3,
            "name": "added_at",
            "required": false,
            "type": "timestamptz"
        }
    ]
}
{
    "type": "struct",
    "schema-id": 1,
    "fields": [
        {
            "id": 1,
            "name": "id",
            "required": false,
            "type": "int"
        },
        {
            "id": 2,
            "name": "new_name",
            "required": false,
            "type": "string"
        },
        {
            "id": 3,
            "name": "added_at",
            "required": false,
            "type": "timestamptz"
        }
    ]
}
{
    "type": "struct",
    "schema-id": 2,
    

In [312]:
spark.sql("SELECT file FROM iceberg_db.changelog_testing.metadata_log_entries").show(truncate=False)

+-------------------------------------------------------------------------------+
|file                                                                           |
+-------------------------------------------------------------------------------+
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v1.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v2.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v3.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v4.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v5.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v6.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v7.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v8.metadata.json |
|spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/v9.metadata.json |
|spark-warehouse

In [314]:
last_version_path = "spark-warehouse/iceberg/iceberg_db/changelog_testing/metadata/version-hint.text"
with open(last_version_path, "r") as f:
        last_version = f.read()
        print(last_version)

11


In [5]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing").show()

                                                                                

+------+-------+--------------------+
|id_new|  name7|            added_at|
+------+-------+--------------------+
|     1|  Alice|2025-03-10 13:20:...|
|     2|    Bob|2025-03-10 13:20:...|
|     3|Charlie|2025-03-10 13:20:...|
|     4|  David|2025-03-10 14:38:...|
|     5|Richard|2025-03-10 14:40:...|
+------+-------+--------------------+



In [47]:
import json
column_name = input("Enter the name of the column you would like to retrieve: ")

columns_set = set(spark.table("employee_db.employee").columns)

# Chec if the column exists in current schema, if it does simply retireve it
if column_name in columns_set:
    query = spark.sql(f"SELECT `{column_name}` FROM employee_db.employee")
    query.show()

# If it doesn't, we look at all past schema versions of the table to check if there is a column with this name
else:
    print(f"Column '{column_name}' does not exist in the current table.")

    # Now we look for a column with this name in previous schema versions
    last_version_path = "spark-warehouse/iceberg/employee_db/employee/metadata/version-hint.text"
    with open(last_version_path, "r") as f:
        last_version = f.read()

    metadata_path = f"spark-warehouse/iceberg/employee_db/employee/metadata/v{last_version}.metadata.json"

    # Load the JSON file with the metadata of the LAST VERSION OF THE TABLE, which will contain information about ALL PAST SCHEMAS
    with open(metadata_path, "r") as f:
        metadata = json.load(f)

    column_name_versions = set()
    schema_id = 0
    # Instantiating the id of the column name we care about
    id_of_interest = None

    for schema in metadata.get("schemas", []):
        schema_id = max(schema_id, schema['schema-id'])
        for field in schema.get('fields', []):
            column_name_versions.add(field['name'])
            if field['name'] == column_name:
                # Remembering the id of the column the user looked for, so that we can retrieve it later
                id_of_interest = field['id']


    if column_name in column_name_versions:
        last_schema_fields = metadata.get("schemas")[schema_id]['fields']
        # We are looking for the name of the column that the user looked for in the LAST SCHEMA VERSION
        last_schema_fields_of_interest = [field for field in last_schema_fields if field['id'] == id_of_interest]
        print(f"Column '{column_name}' has existed in the past. The current column of this table is named {last_schema_fields_of_interest[0]['name']}'.")
        query = spark.sql(f"SELECT `{last_schema_fields_of_interest[0]['name']}` FROM employee_db.employee")
        query.show()
    else:
        print("Could not find the column in neither the current table schema nor the schema history.")


Column 'Phone' does not exist in the current table.
Column 'Phone' has existed in the past. The current column of this table is named Phone number'.
+--------------------+
|        Phone number|
+--------------------+
|  (971)643-6089x9160|
|+1-114-355-1841x7...|
|          9017807728|
|+1-607-333-9911x5...|
|          3739847538|
|001-314-829-5014x...|
|       (314)591-7413|
|               -7199|
|   166-234-6882x7457|
|  (389)824-3204x8287|
|  (285)029-1604x5466|
|   (233)811-1749x417|
|       (831)049-2030|
|          7788378816|
|  990-374-0521x33156|
|          2534420151|
|   355-863-2311x6315|
|   (566)667-8566x109|
|          4735530004|
|001-940-671-0693x345|
+--------------------+
only showing top 20 rows



In [None]:
spark.sql("SELECT name FROM iceberg_db.changelog_testing").show()

In [32]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing").show()

+------+-------+--------------------+
|id_new|  name7|            added_at|
+------+-------+--------------------+
|     4|  David|2025-03-10 14:38:...|
|     5|Richard|2025-03-10 14:40:...|
|     1|  Alice|2025-03-10 13:20:...|
|     2|    Bob|2025-03-10 13:20:...|
|     3|Charlie|2025-03-10 13:20:...|
+------+-------+--------------------+



In [34]:
spark.sql("SELECT * FROM iceberg_db.table2").show()

+---+---------+--------------------+
| id|     name|            added_at|
+---+---------+--------------------+
|  2|      Bob|2025-02-23 19:18:...|
|  3|  Charlie|2025-02-23 19:18:...|
|  1|Francesco|2025-02-23 19:18:...|
|  1|Francesco|2025-02-25 18:05:...|
|  2|      Bob|2025-02-25 18:05:...|
|  3|  Charlie|2025-02-25 18:05:...|
+---+---------+--------------------+



In [307]:
spark.sql("ALTER TABLE changelog_testing RENAME COLUMN id TO id_new")
spark.sql("SELECT * FROM iceberg_db.changelog_testing").show()

+------+-------+--------------------+
|id_new|  name7|            added_at|
+------+-------+--------------------+
|     5|Richard|2025-03-10 14:40:...|
|     4|  David|2025-03-10 14:38:...|
|     1|  Alice|2025-03-10 13:20:...|
|     2|    Bob|2025-03-10 13:20:...|
|     3|Charlie|2025-03-10 13:20:...|
+------+-------+--------------------+



In [4]:
spark.sql("""CALL iceberg.system.create_changelog_view(
  table => 'iceberg_db.changelog_testing'
  )""").show(truncate=False)

+---------------------------+
|changelog_view             |
+---------------------------+
|`changelog_testing_changes`|
+---------------------------+



In [5]:
spark.sql("SELECT * FROM changelog_testing_changes").show()

+------+-------+--------------------+------------+---------------+-------------------+
|id_new|  name7|            added_at|_change_type|_change_ordinal|_commit_snapshot_id|
+------+-------+--------------------+------------+---------------+-------------------+
|     1|  Alice|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|     2|    Bob|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|     3|Charlie|2025-03-10 13:20:...|      INSERT|              0|2510292383777452097|
|     4|  David|2025-03-10 14:38:...|      INSERT|              1|5657999723248319641|
|     5|Richard|2025-03-10 14:40:...|      INSERT|              2|3984517976872106574|
+------+-------+--------------------+------------+---------------+-------------------+



In [27]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-03-10 13:20:...|2510292383777452097|               NULL|   append|spark-warehouse/i...|{spark.app.id -> ...|
|2025-03-10 14:38:...|5657999723248319641|2510292383777452097|   append|spark-warehouse/i...|{spark.app.id -> ...|
|2025-03-10 14:40:...|3984517976872106574|5657999723248319641|   append|spark-warehouse/i...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [31]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing VERSION AS OF 5657999723248319641").show()

+---+-------+--------------------+
| id|  name6|            added_at|
+---+-------+--------------------+
|  1|  Alice|2025-03-10 13:20:...|
|  2|    Bob|2025-03-10 13:20:...|
|  3|Charlie|2025-03-10 13:20:...|
|  4|  David|2025-03-10 14:38:...|
+---+-------+--------------------+



In [26]:
spark.sql("SELECT * FROM iceberg_db.changelog_testing.metadata_log_entries").show()

+--------------------+--------------------+-------------------+----------------+----------------------+
|           timestamp|                file| latest_snapshot_id|latest_schema_id|latest_sequence_number|
+--------------------+--------------------+-------------------+----------------+----------------------+
|2025-03-10 13:20:...|spark-warehouse/i...|               NULL|            NULL|                  NULL|
|2025-03-10 13:20:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:24:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:25:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:38:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 13:38:...|spark-warehouse/i...|2510292383777452097|               0|                     1|
|2025-03-10 14:36:...|spark-warehouse/i...|2510292383777452097| 

In [49]:
# TESTING CODE, NOT ACTUAL CODE

# The last version of the table will have information about ALL PAST SCHEMAS
# The last version number will be stored in the version-hint.text file
last_version_path = "spark-warehouse/iceberg/employee_db/employee/metadata/version-hint.text"
with open(last_version_path, "r") as f:
    last_version = f.read()

metadata_path = f"spark-warehouse/iceberg/employee_db/employee/metadata/v{last_version}.metadata.json"

# Open and parse the metadata JSON file
with open(metadata_path, "r") as f:
    metadata = json.load(f)

# Print all the names that the column 'name' has had across all schema versions
column_name_versions = set()
schema_id = 0
id_of_interest = None

for schema in metadata.get("schemas", []):
    schema_id = max(schema_id, schema['schema-id'])
    for field in schema.get('fields', []):
        column_name_versions.add(field['name'])
        if field['name'] == 'name':
                id_of_interest = field['id']

column_name_versions
# Problem: there is no recollection of WHEN THE COLUMNS WERE NAMED AS SUCH, and when the schema changed.
# Do we care? 

last_schema_fields = metadata.get("schemas")[schema_id]['fields']
last_schema_fields_of_interest = [field for field in last_schema_fields if field['id'] == 2]
print(last_schema_fields_of_interest[0]['name'])

First Name


In [10]:
# Loading the employee 1000x dataset
df = pd.read_csv('Employee 1000x.csv')

# Converting the 'Date of birth' column to datetime format
df['Date of birth'] = pd.to_datetime(df['Date of birth'], format='%d-%m-%y', errors='coerce').dt.date

# Replacing the year of birth if it is in the future (I was getting years of birth like 2059)
df['Date of birth'] = df['Date of birth'].apply(lambda x: x.replace(year=x.year - 100) if x and x.year > pd.Timestamp.now().year else x)

# Creating a Spark DataFrame from the pandas DataFrame
spark_df = spark.createDataFrame(df)
spark_df.show()

+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|Index|First Name| Last Name|   Sex|               Email|               Phone|Date of birth|           Job Title|
+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|    1|      Sara|   Mcguire|Female|  tsharp@example.net|  (971)643-6089x9160|   2021-08-17|Editor, commissio...|
|    2|    Alisha|    Hebert|  Male|vincentgarrett@ex...|+1-114-355-1841x7...|   1969-06-28|  Broadcast engineer|
|    3| Gwendolyn|  Sheppard|  Male|mercadojonathan@e...|          9017807728|   2015-09-25|    Industrial buyer|
|    4|  Kristine|    Mccann|Female|lindsay55@example...|+1-607-333-9911x5...|   1978-07-27|Multimedia specia...|
|    5|     Bobby|   Pittman|Female|blevinsmorgan@exa...|          3739847538|   1989-11-17|Planning and deve...|
|    6|    Calvin|    Ramsey|Female|loretta85@example...|001-314-829-5014x...|   2017-08

In [4]:
spark_df.dtypes

[('Index', 'bigint'),
 ('First Name', 'string'),
 ('Last Name', 'string'),
 ('Sex', 'string'),
 ('Email', 'string'),
 ('Phone', 'string'),
 ('Date of birth', 'date'),
 ('Job Title', 'string')]

In [11]:
spark_df.createOrReplaceTempView("Employee1000")
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg.employee_db")
spark.sql("USE iceberg.employee_db")

spark.sql("""
          CREATE TABLE IF NOT EXISTS
          employee
          USING iceberg
          AS SELECT * FROM Employee1000
          """).show()

spark.sql("SELECT * FROM iceberg.employee_db.employee LIMIT 20").show()

++
||
++
++

+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|Index|First Name| Last Name|   Sex|               Email|               Phone|Date of birth|           Job Title|
+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|    1|      Sara|   Mcguire|Female|  tsharp@example.net|  (971)643-6089x9160|   2021-08-17|Editor, commissio...|
|    2|    Alisha|    Hebert|  Male|vincentgarrett@ex...|+1-114-355-1841x7...|   1969-06-28|  Broadcast engineer|
|    3| Gwendolyn|  Sheppard|  Male|mercadojonathan@e...|          9017807728|   2015-09-25|    Industrial buyer|
|    4|  Kristine|    Mccann|Female|lindsay55@example...|+1-607-333-9911x5...|   1978-07-27|Multimedia specia...|
|    5|     Bobby|   Pittman|Female|blevinsmorgan@exa...|          3739847538|   1989-11-17|Planning and deve...|
|    6|    Calvin|    Ramsey|Female|loretta85@example...|001-314-829-5014x.

In [15]:
spark.sql("DESCRIBE employee").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|        Index|   bigint|   NULL|
|   First Name|   string|   NULL|
|    Last Name|   string|   NULL|
|          Sex|   string|   NULL|
|        Email|   string|   NULL|
|        Phone|   string|   NULL|
|Date of birth|     date|   NULL|
|    Job Title|   string|   NULL|
+-------------+---------+-------+



In [28]:
spark.sql("SELECT * FROM employee WHERE YEAR(`Date of birth`) < 1927").show()

+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|Index|First Name| Last Name|   Sex|               Email|               Phone|Date of birth|           Job Title|
+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|  128|      Glen|  Hamilton|  Male|nashantonio@examp...|+1-478-796-8374x9...|   1926-04-14|Special education...|
|  155|     Jamie|     Riggs|  Male| pmalone@example.net|   (185)304-1686x938|   1926-01-09|Conservation offi...|
|  161|    Hunter|    Romero|Female|austinruben@examp...|   977-555-6372x3075|   1926-08-21|Therapeutic radio...|
|  209|   Maxwell|     Floyd|Female|fryejoanne@exampl...|          1197827255|   1926-01-24|           Osteopath|
|  258|     Diana|    Travis|Female|bishopkayla@examp...|   771-351-1656x3614|   1926-07-12|Volunteer coordin...|
|  426|      Sean|   Shelton|Female|mccarthyvernon@ex...|   585-660-0047x2644|   1926-05

In [36]:
spark.sql("ALTER TABLE employee_db.employee RENAME COLUMN Phone TO `Phone number`")

DataFrame[]

In [50]:
spark.sql("SELECT * FROM employee_db.employee.metadata_log_entries").show()

+--------------------+--------------------+-------------------+----------------+----------------------+
|           timestamp|                file| latest_snapshot_id|latest_schema_id|latest_sequence_number|
+--------------------+--------------------+-------------------+----------------+----------------------+
|2025-03-26 15:15:...|spark-warehouse/i...|1563362177603208424|               0|                     1|
|2025-03-26 15:28:...|spark-warehouse/i...|1563362177603208424|               0|                     1|
+--------------------+--------------------+-------------------+----------------+----------------------+



In [51]:
spark.sql("SELECT * FROM employee_db.employee.snapshots").show()

+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2025-03-26 15:15:...|1563362177603208424|     NULL|   append|spark-warehouse/i...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+



In [77]:
table_name = "employee"

last_version_path = f"spark-warehouse/iceberg/employee_db/{table_name}/metadata/version-hint.text"
with open(last_version_path, "r") as f:
    last_version = f.read()
metadata_path = f"spark-warehouse/iceberg/employee_db/{table_name}/metadata/v{last_version}.metadata.json"
with open(metadata_path, "r") as f:
    metadata = json.load(f)

column_name_versions = set()
schema_id = 0
# Instantiating the id of the column name we care about
id_of_interest = 2


column_info = metadata.get('schemas')[-1].get('fields')
column_name = column_info[id_of_interest - 1].get('name')
print(column_name)

First Name


In [82]:
spark.sql(f"SELECT * FROM iceberg.employee_db.employee WHERE `First Name` = 'Sarah'").show()

+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|Index|First Name| Last Name|   Sex|               Email|        Phone number|Date of birth|           Job Title|
+-----+----------+----------+------+--------------------+--------------------+-------------+--------------------+
|  146|     Sarah|  Humphrey|Female|tammie97@example.org| (135)381-5257x34881|   2007-08-03|Conservator, muse...|
|  160|     Sarah|     Huber|Female|wilkersonlucas@ex...|        893.355.7289|   2021-05-20|Chartered managem...|
| 2436|     Sarah|  Franklin|Female|connor73@example.net|  189.043.5346x35696|   1960-04-17|           Ecologist|
| 2464|     Sarah|    Church|  Male|   vbond@example.com|001-224-972-5823x...|   1942-10-31|  Broadcast engineer|
| 2661|     Sarah|    Lowery|  Male|atkinscheryl@exam...|               -4781|   2002-03-20|Diplomatic Servic...|
| 4759|     Sarah|     Nixon|  Male|candice04@example...| +1-364-324-2704x270|   1996-04

In [None]:
from openai import OpenAI
client = OpenAI(
    api_key="xxx"
)

response = client.responses.create(
    model="gpt-4o",
    input="Write a one-sentence bedtime story about a unicorn."
)

print(response.output_text)

Under the shimmering glow of the moonlit forest, a gentle unicorn named Luna gracefully danced among the stars, spreading dreams of magic and wonder to all sleeping creatures below.


25/04/06 13:31:07 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [None]:
import os

def create_new_api(table_name):

    # Get the current directory
    current_directory = "/Users/francescogalli/Desktop/Iceberg_Thesis_Work"
    table_name = "employee"

    # Find the latest version number
    last_version_path = f"spark-warehouse/iceberg/employee_db/{table_name}/metadata/version-hint.text"
    with open(last_version_path, "r") as f:
        last_version = f.read()

    # Increment the version number
    #new_version = last_version + 1
    new_file_name = f"apiv{last_version}.py"

    # Create the new file
    with open(new_file_name, "w") as file:
        file.write("# This is versioned API file\n")
        file.write(f"# Version: {last_version}\n")

    print(f"Created new file: {new_file_name}")

In [26]:
# Funcion that finds the semantically closest column name in the current schema 

def find_closest_column(table_name, column):

    current_schema_columns = set(spark.table(f"iceberg.employee_db.{table_name}").columns)

    #prompt = f"The column '{column}' does not exist. Based on these columns {latest_columns}, which one is the closest match in meaning?"
    prompt = f"The column '{column}' does not exist. Based on these columns: {', '.join(current_schema_columns)}, \
                is there a column that is very close in meaning? \
                If there is, please return the name of the column and nothing else. \
                If there is no such column, please return 'NO MATCH'. \
                If there are multiple columns that are close in meaning, please return 'AMBIGUOUS', and the names of the possible columns."

    response = client.responses.create(
        model="gpt-4o",
        input=prompt
    )

    return response.output_text

In [27]:
current_schema_columns = set(spark.table(f"iceberg.employee_db.employee").columns)
print(current_schema_columns)

{'Phone number', 'Job Title', 'First Name', 'Sex', 'Email', 'Last Name', 'Index', 'Date of birth'}


In [28]:
find_closest_column("employee", "Phone")

'Phone number'

In [29]:
find_closest_column("employee", "Social Security Number")

'NO MATCH'

In [30]:
find_closest_column("employee", "Name")

'AMBIGUOUS: First Name, Last Name'

In [None]:

class GetColumn(Resource):
    def get(self, table_name, column):
        try:
            # Check if the column exists in the current schema
            columns_set = set(spark.table(f"iceberg.employee_db.{table_name}").columns)
            if column in columns_set:
                query = spark.sql(f"SELECT `{column}` FROM iceberg.employee_db.{table_name}")
                data = query.toPandas().to_dict(orient="records")
                return jsonify(data)
            else:
                # If the column doesn't exist, find the closest column name
                closest_column = find_closest_column(table_name, column)
                query = spark.sql(f"SELECT `{closest_column}` FROM iceberg.employee_db.{table_name}")
                data = query.toPandas().to_dict(orient="records")
                return jsonify(data)
            
        except Exception as e:
                print("Error occurred:", e)
                traceback.print_exc()
                return {"error": str(e)}, 500

In [None]:
# Example get requests (STATIC FOR SPECIFIC COLUMN NAMES)

class GetPhoneNumber(Resource):
    def get(self, table_name):
        try:
            query = spark.sql(f"SELECT `Phone number` FROM iceberg.employee_db.{table_name}")
            data = query.toPandas().to_dict(orient="records")
            return jsonify(data)
        
        except Exception as e:
                print("Error occurred:", e)
                traceback.print_exc()
                return {"error": str(e)}, 500
        

class GetFirstName(Resource):
     def get(self, table_name):
        try:
            query = spark.sql(f"SELECT `First Name` FROM iceberg.employee_db.{table_name}")
            data = query.toPandas().to_dict(orient="records")
            return jsonify(data)
        
        except Exception as e:
                print("Error occurred:", e)
                traceback.print_exc()
                return {"error": str(e)}, 500
        
class GetDateOfBirth(Resource):
     def get(self, table_name):
        try:
            query = spark.sql(f"SELECT `Date of birth` FROM iceberg.employee_db.{table_name}")
            data = query.toPandas().to_dict(orient="records")
            return jsonify(data)
          
        except Exception as e:
                print("Error occurred:", e)
                traceback.print_exc()
                return {"error": str(e)}, 500

In [41]:
def ai_rewrite_api(table_name, column, new_column):

    prompt = f"""The name of the column '{column}' has just been changed to '{new_column}'.
            Please rewrite the API code to reflect this change.
            The structure you are to use is the following:
            'class GetColumnName(Resource):
                def get(self, table_name):
                    try:
                        query = spark.sql(f"SELECT `Column name` FROM iceberg.employee_db.{table_name}")
                        data = query.toPandas().to_dict(orient="records")
                        return jsonify(data)
                    
                    except Exception as e:
                            print("Error occurred:", e)
                            traceback.print_exc()
                            return {{"error": str(e)}}, 500'. 
            Please do not include any other information, just the code, and pay attention to indentation
            since this code will be directly copied into another API file.
            Do not include '''python at the beginning of the code, and do not include ''' at the end of the code,
            since this will be actual code written in a .py file.
    """

    response = client.responses.create(
        model="gpt-4o",
        input=prompt
    )
    return response.output_text

In [42]:
ai_rewrite_api("employee", "Phone number", "Phone")

'class GetColumnName(Resource):\n    def get(self, table_name):\n        try:\n            query = spark.sql(f"SELECT `Phone` FROM iceberg.employee_db.employee")\n            data = query.toPandas().to_dict(orient="records")\n            return jsonify(data)\n        \n        except Exception as e:\n            print("Error occurred:", e)\n            traceback.print_exc()\n            return {"error": str(e)}, 500'

In [33]:
def create_new_api_file(table_name, column_name, new_column_name):

    # Get the current directory
    current_directory = "/Users/francescogalli/Desktop/Iceberg_Thesis_Work"
    table_name = "employee"

    # Find the latest version number (remember we are calling this function only AFTER a column name is changed)
    last_version_path = f"spark-warehouse/iceberg/employee_db/{table_name}/metadata/version-hint.text"
    with open(last_version_path, "r") as f:
        last_version = f.read()

    new_file_name = f"apiv{last_version}.py"

    # Create the new file in the curernt_directory
    new_file_path = os.path.join(current_directory, new_file_name)
    with open(new_file_path, "w") as file:
        file.write(f"# This is the file with the new static API for the column {new_column_name}\n")
        file.write(f"# Version: {last_version}\n")
        file.write(f"New API code:\n")
        file.write(ai_rewrite_api(table_name, column_name, new_column_name))
        


    print(f"Created new file: {new_file_name}")

In [None]:
class ChangeColumnName(Resource):
    def post(self, table_name, column, new_column):
        try:
            query = spark.sql(f"ALTER TABLE iceberg.employee_db.{table_name} RENAME COLUMN `{column}` TO `{new_column}`")
            data = query.toPandas().to_dict(orient="records")
            create_new_api_file(table_name, request, column, new_column)
            return jsonify(data)
        
        except Exception as e:
                print("Error occurred:", e)
                traceback.print_exc()
                return {"error": str(e)}, 500

NameError: name 'Resource' is not defined

In [21]:
api_path = "/Users/francescogalli/Desktop/Iceberg_Thesis_Work/api.py"
Column = "Phone number"

with open(api_path, "r") as file:
    api_code = file.read()
    for line in api_code.splitlines():
        if Column in line:
            print(line)

            query = spark.sql(f"SELECT `Phone number` FROM iceberg.employee_db.{table_name}")
        # THIS WORKS ONLY IF THE COLUMN NAME IS 'Phone number' (STATIC)


In [None]:
# Path to your API file
api_path = "/Users/francescogalli/Desktop/Iceberg_Thesis_Work/api.py"

# Column rename
old_column = "Phone number"
new_column = "Phone"

# Read the contents of api.py
with open(api_path, "r") as file:
    api_code = file.read()

client = OpenAI(
    api_key="xxx"
)

# Build the prompt
prompt = f"""
You are a helpful assistant with expertise in code editing and API design.

This is the current content of an API file (in Python, using Flask and Spark):

{api_code}


Now: the column name in the database has changed from '{old_column}' to '{new_column}'.

Your task:
- Keep the file structure and logic exactly the same.
- ONLY change the lines that would break due to the column name change.
- Update the SQL queries or any other references to the old column name so that the API continues to work.
- Do not rename endpoint routes or class/function names unless strictly necessary.

Return ONLY the full updated Python file content, without adding any markdown formatting, code fences, or extra characters.

Do not include ```python at the beginning or ``` at the end — only return the pure Python code so it can be written directly to a .py file.
"""

response = client.responses.create(
        model="gpt-4o",
        input=prompt
    )

# Extract the edited code from the response
#updated_code = response['choices'][0]['message']['content']
#print(response.output_text)

updated_code = response.output_text

last_schema_path = f"spark-warehouse/iceberg/employee_db/employee/metadata/version-hint.text"
with open(last_schema_path, "r") as f:
    last_schema = f.read()


# Write to a new file (safer than overwriting directly)
with open(f"apiv{last_schema}.py", "w") as out_file:
    out_file.write(updated_code)


25/04/12 09:21:38 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1275304 ms exceeds timeout 120000 ms
25/04/12 09:21:38 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/12 09:21:39 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [None]:
def load_json(path):
    with open(path, "r") as f:
        return json.load(f)

def save_json(path, data):
    with open(path, "w") as f:
        json.dump(data, f, indent=2)

def update_schema_mapping():
    schema_mapping_path = '/Users/francescogalli/Desktop/Iceberg_Thesis_Work/schema_api_mapping.json'

    schema_mapping = load_json(schema_mapping_path)

    last_schema_path = f"spark-warehouse/iceberg/employee_db/employee/metadata/version-hint.text"
    with open(last_schema_path, "r") as f:
        last_schema = f.read()

    # Update or add the table mapping
    additional_schema = {
        "schema_id": last_schema,
        "api_version": f"v{last_schema}",  # Simple versioning scheme
    }


    schema_mapping.append(additional_schema)

    # Save the updated mapping back to the JSON file
    save_json(schema_mapping_path, schema_mapping)
    print(load_json(schema_mapping_path))
    


In [42]:
update_schema_mapping()

[{'schema_id': 1, 'api_version': 'v1'}, {'schema_id': '8', 'api_version': 'v8'}]


In [None]:
def rewrite_api(old_column, new_column):
    # Load original API code
    with open(os.path.abspath(__file__), "r") as file:
        api_code = file.read()

    # Load current schema version
    metadata_path = f"spark-warehouse/iceberg/employee_db/employee/metadata/version-hint.text"
    with open(metadata_path, "r") as f:
        last_schema = f.read().strip()

    # Build OpenAI prompt
    prompt = f"""
You are a helpful assistant with expertise in code editing and API design.

This is the current content of an API file (in Python, using Flask and Spark):

{api_code}


Now: the column name in the database has changed from '{old_column}' to '{new_column}'.

Your task:
- Keep the file structure and logic exactly the same.
- ONLY change the lines that would break due to the column name change.
- Update the SQL queries or any other references to the old column name so that the API continues to work.
- Do not rename endpoint routes or class/function names unless strictly necessary.

Return ONLY the full updated Python file content, without adding any markdown formatting, code fences, or extra characters.

Do not include ```python at the beginning or ``` at the end — only return the pure Python code so it can be written directly to a .py file.
    """

    # Send prompt to OpenAI
    client = OpenAI(api_key="xxxx")

    response = client.chat.completions.create(
        model='gpt-4o',
        messages=[
            {"role": "user", "content": prompt}
        ]
    )

    updated_code = response.choices[0].message.content

    # Save updated API file
    output_path = f"/Users/francescogalli/Desktop/Iceberg_Thesis_Work/apiv{last_schema}.py"
    with open(output_path, "w") as out_file:
        out_file.write(updated_code)

    print(f"✅ API updated and written to {output_path}")


In [47]:
with open("apiv12.py", 'r') as f:
    f = f.read()

def clean_code(response_text):
    return response_text.strip().removeprefix("```python").removesuffix("```").strip()

f = clean_code(f)
print(f)

from flask import Flask, jsonify, request
from flask_restful import Api, Resource
from pyspark.sql import SparkSession
import traceback
import pandas as pd
import os
import json
from openai import OpenAI

client = OpenAI(
    api_key="xxxxx"
)

# Initialize Flask App
app = Flask(__name__)
api = Api(app)

# Connect to Spark inside Docker
spark = SparkSession.builder \
    .appName("IcebergFlaskAPI") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "spark-warehouse/iceberg") \
    .getOrCreate()

def load_json(path):
    with open(path, "r") as f:
        return json.load(f)

def save_json(path, data):
    with open(path, "w") as f:
        json