#### Configure Iceberg catalog

An Iceberg catalog keeps track of Iceberg tables and where their metadata lives; the metadata in turn points to the table's data files.
There are various catalog implementations, including REST, JDBC, Hive, Glue and Hadoop.

In this case, a `hadoop` catalog type is used to define a catalog called `local`, whose warehouse is the directory `spark-warehouse/iceberg` on the local filesystem.

Within this directory, each table gets its own subdirectory holding that table's Iceberg metadata and data.
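
A minimal sketch of that setup, assuming the usual `spark.sql.catalog.<name>` property convention (the same one the builder cell further down uses). The first helper produces the Spark properties for a Hadoop catalog called `local`; the second shows where a Hadoop catalog puts one table's metadata and data. Both helper names are hypothetical, and mapping each namespace level to a nested directory is an assumption about the Hadoop catalog layout.

```python
def hadoop_catalog_conf(name: str, warehouse: str) -> dict:
    """Spark properties that register an Iceberg Hadoop catalog called `name`."""
    prefix = f"spark.sql.catalog.{name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse,
    }


def hadoop_table_dirs(warehouse: str, namespace: str, table: str) -> dict:
    """Directories for one table: <warehouse>/<namespace levels>/<table>."""
    location = "/".join([warehouse.rstrip("/"), *namespace.split("."), table])
    return {
        "location": location,
        "metadata": f"{location}/metadata",  # *.metadata.json, manifest lists, manifests
        "data": f"{location}/data",          # Parquet data files
    }


conf = hadoop_catalog_conf("local", "spark-warehouse/iceberg")
for key, value in conf.items():
    print(key, "=", value)
print(hadoop_table_dirs("spark-warehouse/iceberg", "default", "users")["data"])
```

Each key/value pair can be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.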


# Stuff I've learned

## File paths
For the S3 and Glue catalogs, the metadata JSON files and the Avro files (manifest lists and manifests) contain absolute path references.
For example: `s3a://bjorn-test-bucket-902439737514/glue-warehouse/iceberg/default.db/users/data/00000-0-0f9846f1-7773-46fe-870d-5336aa0122d3-0-00001.parquet`
This makes it impossible to simply clone an Iceberg table's files into another catalog: the copied metadata would still point at the original locations.

For the local catalog the paths are relative, which is fair enough.
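
One way to see this is to read a table's metadata JSON and check which paths carry a scheme. A minimal sketch, assuming the `location` and `snapshots[].manifest-list` fields from the Iceberg table spec; the helper names and the trimmed sample document are hypothetical. It only covers the metadata JSON: data file paths live inside the Avro manifests and are not inspected here.

```python
import json
from urllib.parse import urlparse


def referenced_paths(metadata: dict) -> list:
    """Collect the table location and every snapshot's manifest-list path."""
    paths = [metadata["location"]]
    paths += [snap["manifest-list"] for snap in metadata.get("snapshots", [])]
    return paths


def is_absolute(path: str) -> bool:
    """True if the path has a URI scheme (s3a://, file:) or starts at the root."""
    return bool(urlparse(path).scheme) or path.startswith("/")


# Hypothetical, heavily trimmed metadata file for illustration only
sample = json.loads("""{
  "location": "s3a://example-bucket/warehouse/default/users",
  "snapshots": [
    {"snapshot-id": 1,
     "manifest-list": "s3a://example-bucket/warehouse/default/users/metadata/snap-1.avro"}
  ]
}""")

for p in referenced_paths(sample):
    print(is_absolute(p), p)
```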

## Adding files
The `add_files` procedure requires the Iceberg Spark SQL extensions (`IcebergSparkSessionExtensions`).

I uploaded some new Parquet files to the same bucket, but under a different prefix.
This doesn't work, because `add_files` doesn't support globbing:
source_table => '`parquet`.`s3a://bjorn-test-bucket-902439737514/extra-parquet-files/*.parquet`'

But pointing at the directory works:
source_table => '`parquet`.`s3a://bjorn-test-bucket-902439737514/extra-parquet-files/`'
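
Since only the directory form is accepted, a small guard can turn a glob path into the directory form before building the `source_table` argument. A minimal sketch; `add_files_source` is a hypothetical helper, and it assumes the glob appears only in the last path segment (anything else is rejected).

```python
import posixpath


def add_files_source(path: str, file_format: str = "parquet") -> str:
    """Build an add_files source_table value, dropping a trailing glob like *.parquet."""
    directory, last = posixpath.split(path.rstrip("/"))
    if any(ch in last for ch in "*?["):
        path = directory  # keep only the directory part
    if any(ch in path for ch in "*?["):
        raise ValueError(f"glob characters only supported in the last segment: {path}")
    return f"`{file_format}`.`{path.rstrip('/')}/`"


source = add_files_source("s3a://example-bucket/extra-parquet-files/*.parquet")
print(source)
```

The trailing slash matches the form that worked above.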

# Create table

What if we have an existing non-Iceberg, parquet table and want to convert it into an Iceberg table?

First, define an empty Iceberg table for the data.

`CREATE TABLE ... LIKE ...` is not supported, but `CREATE TABLE ... AS SELECT ... WHERE 1=0` works: the `WHERE 1=0` filter matches no rows, so only the schema is copied.

(I've confirmed this approach actually works.)
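
The `WHERE 1=0` trick is plain SQL, so it can be illustrated with Python's built-in `sqlite3` (used here only to demonstrate the idiom; it is not Iceberg). The table names mirror the notebook but the data is a stand-in. In Spark the same statement additionally needs `USING iceberg`; note that CTAS copies only the column schema, so any partitioning would have to be declared explicitly.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE users_parquet (id INTEGER, name TEXT, age INTEGER)")
con.execute("INSERT INTO users_parquet VALUES (1, 'Alice', 30), (2, 'Bob', 25)")

# WHERE 1=0 is never true: the new table gets the columns but none of the rows
con.execute("CREATE TABLE users_iceberg AS SELECT * FROM users_parquet WHERE 1=0")

# PRAGMA table_info returns one row per column; index 1 is the column name
columns = [row[1] for row in con.execute("PRAGMA table_info(users_iceberg)")]
row_count = con.execute("SELECT COUNT(*) FROM users_iceberg").fetchone()[0]
print(columns, row_count)
```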


Then you may want to copy the Parquet files into the table location first, because once Iceberg's metadata references them their paths are absolute and they can no longer be moved.

Copy them into the table's `data` folder so they look like native Iceberg files.
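
Working out the destination keys before copying makes it easy to review what will land in the table's `data` folder. A minimal sketch that mirrors the boto3 copy loop later in this notebook; `staged_key`, `plan_copies` and the example keys are hypothetical. Each resulting pair could then be copied with boto3's `client.copy(...)`.

```python
import posixpath


def staged_key(source_key: str, table_prefix: str, subfolder: str) -> str:
    """Destination key for a Parquet file copied under <table>/data/<subfolder>/."""
    file_name = posixpath.basename(source_key)
    return posixpath.join(table_prefix.rstrip("/"), "data", subfolder, file_name)


def plan_copies(source_keys, table_prefix, subfolder):
    """Pair each .parquet key with its destination; other objects are skipped."""
    return [
        (key, staged_key(key, table_prefix, subfolder))
        for key in source_keys
        if key.endswith(".parquet")
    ]


keys = ["extra-parquet-files/part-0.parquet", "extra-parquet-files/_SUCCESS"]
copies = plan_copies(keys, "s3-warehouse/iceberg/default/extra_users", "extra-parquet-files")
for src, dst in copies:
    print(src, "->", dst)
```

Using a dedicated subfolder (here `extra-parquet-files`) keeps the imported files apart from the ones Iceberg writes itself.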

What if you call `add_files` twice on the same directory, e.g. on the table's own `data` directory?
Luckily it throws an error:

java.lang.IllegalStateException: Cannot complete import because data files to be imported already exist within the target table: s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00000-4-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet,s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00001-5-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet,s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00002-6-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet.  This is disabled by default as Iceberg is not designed for multiple references to the same file within the same table.  If you are sure, you may set 'check_duplicate_files' to false to force the import.

So if you're bringing over partitions, it's better to put each one in its own folder.
Likewise, if you're bringing over an unpartitioned table, put its files in their own folder to avoid this clash.
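
Conceptually, the duplicate check compares the files to be imported with the files the table already references. A rough illustration of that idea, not Iceberg's implementation; `already_in_table` and the paths are hypothetical. In Spark, the table's current file list could come from its `files` metadata table (`SELECT file_path FROM <catalog>.<db>.<table>.files`).

```python
def already_in_table(table_files, candidate_files):
    """Return candidate paths the table already references, sorted for stable output."""
    return sorted(set(table_files) & set(candidate_files))


table_files = [
    "s3a://example-bucket/wh/default/extra_users/data/00000-a.parquet",
]
# Importing the table's own data directory lists that same file again
same_dir = ["s3a://example-bucket/wh/default/extra_users/data/00000-a.parquet"]
# A separate subfolder holds only new files, so nothing overlaps
own_folder = ["s3a://example-bucket/wh/default/extra_users/data/imported/part-0.parquet"]

print(already_in_table(table_files, same_dir))
print(already_in_table(table_files, own_folder))
```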

# Deleting files
Deleting a data file from the underlying storage while the table still references it breaks the table.
Unlike Hive, where you can delete partitions or individual files directly, you can't do that with Iceberg; reads then fail with:

ERROR BaseReader: Error reading file(s): s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00000-4-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet
org.apache.iceberg.exceptions.NotFoundException: File does not exist: s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00000-4-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet
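
If you suspect files were removed behind Iceberg's back, you can check the referenced paths before querying. A minimal sketch for a local warehouse, assuming you already have the list of referenced data file paths (for example from the table's `files` metadata table, with any `file:` prefix stripped); `missing_files` is a hypothetical helper, and for S3 you would check object existence with an S3 client instead of `os.path.exists`. Removing data properly goes through Iceberg itself, e.g. `DELETE FROM`.

```python
import os
import tempfile


def missing_files(referenced_paths):
    """Return the referenced local paths that no longer exist on disk."""
    return [path for path in referenced_paths if not os.path.exists(path)]


with tempfile.TemporaryDirectory() as data_dir:
    kept = os.path.join(data_dir, "00000-kept.parquet")
    deleted = os.path.join(data_dir, "00001-deleted.parquet")
    for path in (kept, deleted):
        open(path, "wb").close()  # empty stand-ins for Parquet data files
    os.remove(deleted)  # simulate deleting a file behind Iceberg's back
    missing = missing_files([kept, deleted])

print(missing)
```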

# Overview

Iceberg is very popular, but there is a lot of Parquet data sitting around.

How can we migrate?
Iceberg's migration options are:
- Snapshot
- Migrate (in place)

Neither offers a mechanism to do a file-level copy.

You may want to try out Iceberg first and only then switch over.

Copy the data into a new table

# Another issue

If you try to snapshot a table from a non-Iceberg catalog, it fails:

IllegalArgumentException: Cannot create Iceberg table in non-Iceberg Catalog. Catalog 'spark_catalog' was of class 'org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog' but 'org.apache.iceberg.spark.SparkSessionCatalog' or 'org.apache.iceberg.spark.SparkCatalog' are required

# Another issue

Creating a regular Parquet table in an Iceberg-enabled catalog just creates an Iceberg table anyway.

Even if I manually create a Parquet table in the Iceberg database from the Glue console, put some data in it, and can query it with Athena, it still doesn't work from Spark.
The Iceberg-enabled catalog lists only Iceberg tables.

So in this setup it's basically impossible to use the `snapshot` procedure on a non-Iceberg table.

# So it seems my only option is:

1. Create an empty Iceberg table
2. Use `add_files`

This approach does actually seem to work.
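
The two steps can be written down as two Spark SQL statements. A minimal sketch, assuming a hypothetical helper `conversion_statements` and placeholder table and bucket names; it only builds the SQL strings, so you would still run each with `spark.sql(...)`. The `table` and `source_table` arguments are the ones this notebook passes to `add_files`.

```python
def conversion_statements(target: str, source_table: str, source_dir: str) -> list:
    """SQL for the two-step conversion: empty Iceberg table, then register existing files."""
    catalog, table = target.split(".", 1)  # "<catalog>", "<namespace>.<table>"
    create = (
        f"CREATE TABLE IF NOT EXISTS {target} USING iceberg "
        f"AS SELECT * FROM {source_table} WHERE 1=0"
    )
    add_files = (
        f"CALL {catalog}.system.add_files("
        f"table => '{table}', "
        f"source_table => '`parquet`.`{source_dir.rstrip('/')}/`')"
    )
    return [create, add_files]


statements = conversion_statements(
    "glue_iceberg_catalog.iceberg_test_db.users_from_parquet",
    "spark_catalog.iceberg_test_db.users_parquet",
    "s3a://example-bucket/extra-parquet-files/",
)
for stmt in statements:
    print(stmt)
```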

Now you've played around with your Iceberg table and are happy.
You want to convert your existing table into an Iceberg table.

The problem is that the registered file paths are absolute, so you cannot move the files into the new table location.
If you copy the files and then call `add_files` on the copies, you will have duplicate data, because the originals are still registered.

If you move them instead, the table will break, because its metadata still points at the old paths.

# Can you create an Iceberg table on top of an existing table?




In [1]:
%env AWS_ACCESS_KEY_ID=<your-access-key-id>
%env AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
%env AWS_SESSION_TOKEN=<your-session-token>



In [2]:
import os
from pyspark.sql import SparkSession

S3_LOCATION = "bjorn-test-bucket-902439737514/tests"

builder = SparkSession.builder \
    .appName("IcebergLocalDevelopment") \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.spark:spark-hadoop-cloud_2.12:3.5.4') \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

# Configure AWS credentials
builder = builder \
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"]) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

# Configure local_iceberg_catalog
builder = builder \
    .config("spark.sql.catalog.local_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local_iceberg_catalog.type", "hadoop") \
    .config("spark.sql.catalog.local_iceberg_catalog.warehouse", "local-warehouse")

# Configure s3_iceberg_catalog
builder = builder \
    .config("spark.sql.catalog.s3_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.s3_iceberg_catalog.type", "hadoop") \
    .config("spark.sql.catalog.s3_iceberg_catalog.warehouse", f"s3a://{S3_LOCATION}/s3-warehouse/iceberg")

# Configure glue_iceberg_catalog
builder = builder \
    .config("spark.sql.catalog.glue_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_iceberg_catalog.type", "glue") \
    .config("spark.sql.catalog.glue_iceberg_catalog.warehouse", f"s3a://{S3_LOCATION}/glue-warehouse/iceberg") \
    .config("spark.sql.catalog.glue_iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

spark = builder.getOrCreate()

# Verify Spark session creation
spark.sql("SHOW DATABASES").show()


+---------+
|namespace|
+---------+
|  default|
+---------+



In [3]:
# database_name aka schema_name
database_name = "iceberg_test_db"

spark.sql(f"CREATE DATABASE IF NOT EXISTS local_iceberg_catalog.{database_name}")
spark.sql(f"USE local_iceberg_catalog.{database_name}")

spark.sql(f"CREATE DATABASE IF NOT EXISTS s3_iceberg_catalog.{database_name}")
spark.sql(f"USE s3_iceberg_catalog.{database_name}")

spark.sql(f"CREATE DATABASE IF NOT EXISTS glue_iceberg_catalog.{database_name}")
spark.sql(f"USE glue_iceberg_catalog.{database_name}")

spark.sql(f"CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}")
spark.sql(f"USE spark_catalog.{database_name}")

# Verify Spark session creation
spark.sql("SHOW CATALOGS").show()



+--------------------+
|             catalog|
+--------------------+
|glue_iceberg_catalog|
|local_iceberg_cat...|
|  s3_iceberg_catalog|
|       spark_catalog|
+--------------------+



# Create and query a regular parquet table

In [6]:
# Create a regular Parquet table
# Specify the catalog name
catalog_name = "glue_iceberg_catalog"
# Specify the database_name aka schema_name
database_name = "iceberg_test_db"

# This doesn't work: in an Iceberg-enabled catalog, USING PARQUET still creates an ICEBERG table!

# spark.sql(f"""
#   CREATE TABLE IF NOT EXISTS {catalog_name}.{database_name}.users (
#     id INT,
#     name STRING,
#     age INT
#   ) 
#   USING PARQUET
# """)


# Insert some sample data
# spark.sql(f"""
#   INSERT INTO {catalog_name}.{database_name}.users VALUES
#     (1, 'Alice', 30),
#     (2, 'Bob', 25),
#     (3, 'Charlie', 35)
# """)

spark.sql("USE glue_iceberg_catalog.iceberg_test_db")
spark.sql("show tables").show()

# spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.manual_glue_table").show()

+---------------+------------------+-----------+
|      namespace|         tableName|isTemporary|
+---------------+------------------+-----------+
|iceberg_test_db|users_from_parquet|      false|
+---------------+------------------+-----------+



In [9]:
spark.sql(f"show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



#### Create and query an Iceberg table

In [7]:
# Create an Iceberg table

# Specify the catalog name
catalog_name = "local_iceberg_catalog"
# Specify the database_name aka schema_name
database_name = "iceberg_test_db"

spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {catalog_name}.{database_name}.users (
    id INT,
    name STRING,
    age INT
  ) 
  USING iceberg
  """)

# Insert some sample data
spark.sql(f"""
  INSERT INTO {catalog_name}.{database_name}.users VALUES
    (1, 'Alice', 30),
    (2, 'Bob', 25),
    (3, 'Charlie', 35)
    """)

spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.users").show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  1|  Alice| 30|
|  2|    Bob| 25|
|  2|    Bob| 25|
|  3|Charlie| 35|
|  3|Charlie| 35|
+---+-------+---+



In [7]:
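import duckdb
local_warehouse_directory = "local-warehouse"  # same warehouse as the read_parquet path further down
schema = "default"
duckdb.sql(""" INSTALL avro FROM community; LOAD avro; """)
# snap-*.avro files are the manifest lists (one per snapshot)
duckdb.sql(f""" FROM read_avro('{local_warehouse_directory}/iceberg/{schema}/users/metadata/snap*.avro'); """)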

┌─────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────┬───────────────────┬─────────┬─────────────────┬─────────────────────┬─────────────────────┬───────────────────┬──────────────────────┬─────────────────────┬──────────────────┬─────────────────────┬────────────────────┬───────────────────────────────────────────────────────────────────────────────────────────┐
│                                        manifest_path                                        │ manifest_length │ partition_spec_id │ content │ sequence_number │ min_sequence_number │  added_snapshot_id  │ added_files_count │ existing_files_count │ deleted_files_count │ added_rows_count │ existing_rows_count │ deleted_rows_count │                                        partitions                                         │
│                                           varchar                                           │      int64      │       int32       │  int32  │      i

In [8]:
# Now let's test adding new files to an Iceberg table.

# First generate some local data

spark.sql("""
  INSERT INTO local_catalog.default.users VALUES
    (4, 'Dhiya', 20),
    (5, 'Elian', 50),
    (6, 'Freya', 50)
""")

DataFrame[]

In [9]:
duckdb.sql(f""" FROM read_parquet('local-warehouse/iceberg/default/users/data/*4a6*.parquet'); """)

┌───────┬─────────┬───────┐
│  id   │  name   │  age  │
│ int32 │ varchar │ int32 │
├───────┼─────────┼───────┤
│     4 │ Dhiya   │    20 │
│     5 │ Elian   │    50 │
│     6 │ Freya   │    50 │
└───────┴─────────┴───────┘

In [5]:
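# Register the extra Parquet files with an existing Iceberg `users` table.
# `catalog` and `schema` are not defined in the cells shown here; set them to the
# target Iceberg catalog and namespace before running this cell.
# source_table must point at a directory: globs such as *.parquet are not supported.
spark.sql(f"""\
CALL {catalog}.system.add_files(
table => '{schema}.users',
source_table => '`parquet`.`s3a://bjorn-test-bucket-902439737514/extra-parquet-files/`'
)
""")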

                                                                                

DataFrame[added_files_count: bigint, changed_partition_count: bigint]

In [9]:
# Create a spark table over the data
# First try with a temp table

df = spark.read.parquet('s3a://bjorn-test-bucket-902439737514/extra-parquet-files/*.parquet')
df.createOrReplaceTempView("users")
spark.sql(""" SELECT * FROM users """).show()

# Create an empty Iceberg table
spark.sql("""\
CREATE TABLE s3_catalog.default.extra_users
USING iceberg
AS SELECT * FROM users WHERE 1=0
""")

+---+-----+---+
| id| name|age|
+---+-----+---+
|  4|Dhiya| 20|
|  5|Elian| 50|
|  6|Freya| 50|
+---+-----+---+



DataFrame[]

In [10]:
spark.sql(""" SELECT * FROM s3_catalog.default.extra_users """).show()

+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+



In [25]:
# Copy the Parquet files into a subfolder of the table's data directory
import boto3
s3 = boto3.resource('s3')

bucket = 'bjorn-test-bucket-902439737514'

files = s3.Bucket(bucket).objects.filter(Prefix='extra-parquet-files/')

for file in files:
    key = file.key
    if not key.endswith('.parquet'):
        continue

    object_name = key.split("/")[-1]

    copy_source = {
        'Bucket': bucket,
        'Key': key
    }
    s3.meta.client.copy(copy_source, bucket, 's3-warehouse/iceberg/default/extra_users/data/extra-parquet-files/' + object_name)

In [26]:
spark.sql(""" SELECT * FROM s3_catalog.default.extra_users """).show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  4|Dhiya| 20|
|  5|Elian| 50|
|  6|Freya| 50|
+---+-----+---+



In [27]:
# extra_users lives in s3_catalog.default (see the queries above)
catalog = "s3_catalog"
schema = "default"
spark.sql(f"""\
CALL {catalog}.system.add_files(
table => '{schema}.extra_users',
source_table => '`parquet`.`s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/extra-parquet-files/`'
)
""")

DataFrame[added_files_count: bigint, changed_partition_count: bigint]

In [29]:
spark.sql(""" SELECT * FROM s3_catalog.default.extra_users """).show()

25/01/17 16:51:03 ERROR BaseReader: Error reading file(s): s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00000-4-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet
org.apache.iceberg.exceptions.NotFoundException: File does not exist: s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00000-4-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet
Py4JJavaError: An error occurred while calling o141.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 1 times, most recent failure: Lost task 0.0 in stage 64.0 (TID 1642) (192.168.68.104 executor driver): org.apache.iceberg.exceptions.NotFoundException: File does not exist: s3a://bjorn-test-bucket-902439737514/s3-warehouse/iceberg/default/extra_users/data/00000-4-4a6dfa15-615f-4814-b6fd-0b52a1f640ed-0-00001.parquet


In [5]:
# Create a regular parquet table in a regular catalog

# I had to do this manually....

# Create a regular Parquet table
# Specify the catalog name
catalog_name = "spark_catalog"
# Specify the database_name aka schema_name
database_name = "iceberg_test_db"

# spark_catalog is not Iceberg-enabled, so USING PARQUET creates a genuine Parquet table here
# (in an Iceberg-enabled catalog it would create an Iceberg table anyway).
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {catalog_name}.{database_name}.users_parquet (
    id INT,
    name STRING,
    age INT
  ) 
  USING PARQUET
""")


# Insert some sample data
spark.sql(f"""
  INSERT INTO {catalog_name}.{database_name}.users_parquet VALUES
    (1, 'Alice', 30),
    (2, 'Bob', 25),
    (3, 'Charlie', 35)
""")

spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.users_parquet").show()

                                                                                

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 35|
|  1|  Alice| 30|
|  2|    Bob| 25|
+---+-------+---+



In [6]:
# Test the Iceberg "snapshot" procedure

in_catalog_name = "spark_catalog"
out_catalog_name = "glue_iceberg_catalog"

database_name = "iceberg_test_db"

# First create an empty Glue Iceberg table
spark.sql(f"""\
CREATE TABLE IF NOT EXISTS {out_catalog_name}.{database_name}.users_from_parquet
USING ICEBERG
AS SELECT * FROM {in_catalog_name}.{database_name}.users_parquet WHERE 1=0
""")

spark.sql(f"""SELECT * FROM {out_catalog_name}.{database_name}.users_from_parquet""").show()

                                                                                

+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+


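Because `CREATE TABLE ... LIKE ...` isn't supported, the cell above copies only the schema with `CREATE TABLE ... AS SELECT ... WHERE 1=0`. A minimal sketch of a helper that builds that statement for any source/target pair; `empty_iceberg_ctas` is a hypothetical name, and it only formats the SQL string (it doesn't validate identifiers or run anything against Spark).

```python
def empty_iceberg_ctas(target: str, source: str) -> str:
    """Build a CTAS statement that copies only the schema of `source`.

    `target` and `source` are fully qualified names such as
    "glue_iceberg_catalog.iceberg_test_db.users_from_parquet".
    WHERE 1=0 matches no rows, so the new Iceberg table gets the
    source schema but no data.
    """
    return (
        f"CREATE TABLE IF NOT EXISTS {target}\n"
        "USING iceberg\n"
        f"AS SELECT * FROM {source} WHERE 1=0"
    )


# Example (hypothetical usage, pass the result to spark.sql()):
sql = empty_iceberg_ctas(
    "glue_iceberg_catalog.iceberg_test_db.users_from_parquet",
    "spark_catalog.iceberg_test_db.users_parquet",
)
```

The resulting string can be passed straight to `spark.sql(...)`; keeping it as a plain string makes it easy to print and inspect before running.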

In [10]:
# spark.sql(f"""
# CALL local_iceberg_catalog.system.snapshot('spark_catalog.iceberg_test_db.users', 'iceberg_test_db.users_snap')
# """).show()

# IllegalArgumentException: Cannot create Iceberg table in non-Iceberg Catalog. Catalog 'spark_catalog' was of class 'org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog' but 'org.apache.iceberg.spark.SparkSessionCatalog' or 'org.apache.iceberg.spark.SparkCatalog' are required

# However, maybe it works if the regular table was created in an Iceberg-enabled catalog?


IllegalArgumentException: Cannot create Iceberg table in non-Iceberg Catalog. Catalog 'spark_catalog' was of class 'org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog' but 'org.apache.iceberg.spark.SparkSessionCatalog' or 'org.apache.iceberg.spark.SparkCatalog' are required

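The error says `spark_catalog` has to be `org.apache.iceberg.spark.SparkSessionCatalog` (or `SparkCatalog`) rather than Spark's default `V2SessionCatalog`. A sketch of the session settings that, as I understand the Iceberg Spark docs, swap the session catalog for Iceberg's wrapper. It assumes a Hive-compatible metastore backs `spark_catalog` (`type=hive`), which this notebook's setup may not have. `iceberg_session_catalog_conf` and `to_conf_args` are hypothetical names; the helper only flattens the dict into `--conf` arguments for `spark-submit`/`pyspark`.

```python
# Assumed settings: replace Spark's built-in session catalog with Iceberg's
# SparkSessionCatalog, which handles Iceberg tables itself and delegates
# non-Iceberg tables (e.g. plain Parquet) to the built-in catalog.
iceberg_session_catalog_conf = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    # Assumption: a Hive-compatible metastore backs the session catalog.
    "spark.sql.catalog.spark_catalog.type": "hive",
}


def to_conf_args(conf: dict[str, str]) -> list[str]:
    """Flatten a config dict into ["--conf", "key=value", ...] arguments."""
    args: list[str] = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args
```

The same dict can also be applied with `SparkSession.builder.config(key, value)` in a loop before `getOrCreate()`. These settings need to be in place when the session starts; `spark.sql.extensions` in particular is a static setting that is only read at session creation.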
In [None]:
# spark.sql(f"""
# CALL {catalog_name}.system.snapshot('{database_name}.users_parquet', '{database_name}.users_parquet_snap')
# """).show()

# org.apache.iceberg.exceptions.NoSuchTableException: Cannot not find source table 'iceberg_test_db.users_parquet'

# spark.sql(f"""
# CALL glue_iceberg_catalog.system.snapshot('{database_name}.manual_glue_table', '{database_name}.manual_glue_table_snap')
# """).show()
# IllegalArgumentException: Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). Found source catalog: glue_iceberg_catalog
# Apparently this error occurs because you can't snapshot an Iceberg table, but my table isn't an Iceberg table...
# (The message itself says the source must be in the session catalog, i.e. spark_catalog.)

# 
# spark.sql("SELECT * FROM glue_iceberg_catalog.iceberg_test_db.manual_glue_table").show()
# UnresolvedRelation [glue_iceberg_catalog, iceberg_test_db, manual_glue_table], [], false


AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `glue_iceberg_catalog`.`iceberg_test_db`.`manual_glue_table` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [glue_iceberg_catalog, iceberg_test_db, manual_glue_table], [], false

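The commented-out attempts above fail for two separate reasons: the source table can't be found, or it isn't in the session catalog ("Cannot snapshot a table that isn't in the session catalog"). A minimal sketch that builds the `CALL ... system.snapshot(...)` statement with named arguments `source_table` and `table`, and rejects up front a source qualified with any catalog other than `spark_catalog`, mirroring that error. `snapshot_sql` is a hypothetical helper: it only builds the string and cannot detect the `NoSuchTableException` case.

```python
def snapshot_sql(catalog: str, source_table: str, target_table: str) -> str:
    """Build an Iceberg snapshot CALL invoked through `catalog`.

    `source_table` is "db.table" or "spark_catalog.db.table". A three-part
    name using any other catalog is rejected, because snapshot only reads
    its source from the session catalog.
    """
    parts = source_table.split(".")
    if len(parts) == 3 and parts[0] != "spark_catalog":
        raise ValueError(
            f"snapshot source must be in spark_catalog, got catalog {parts[0]!r}"
        )
    return (
        f"CALL {catalog}.system.snapshot("
        f"source_table => '{source_table}', "
        f"table => '{target_table}')"
    )
```

Named arguments make it explicit which table is read and which one is created, which is easy to mix up with the positional form used in the cells above.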

In [12]:
spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.users_parquet").show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 25|
|  3|Charlie| 35|
+---+-------+---+



In [None]:
# What about creating a regular table in a regular catalog, then making it an Iceberg catalog?

catalog_name = "local_catalog"
# Specify the database_name aka schema_name
database_name = "iceberg_test_db"

# Note: creating a regular Parquet table in an Iceberg-enabled catalog just creates an Iceberg table anyway.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {catalog_name}.{database_name}.users_parquet (
    id INT,
    name STRING,
    age INT
  ) 
  USING PARQUET
""")


# Insert some sample data
spark.sql(f"""
  INSERT INTO {catalog_name}.{database_name}.users_parquet VALUES
    (1, 'Alice', 30),
    (2, 'Bob', 25),
    (3, 'Charlie', 35)
""")

spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.users_parquet").show()

In [None]:
# Now that we have an empty Iceberg table and a populated Parquet table, let's try to bring the Parquet table's data into the Iceberg table

# Normal table
spark.sql(f"SELECT * FROM spark_catalog.{database_name}.users_parquet").show()
# Iceberg table
spark.sql(f"SELECT * FROM glue_iceberg_catalog.{database_name}.users_from_parquet").show()

# Use the ADD FILES command to add the data files to the Iceberg table
spark.sql(f"""\
CALL glue_iceberg_catalog.system.add_files(
table => '{database_name}.users_from_parquet',
source_table => '`parquet`.`s3a://bjorn-test-bucket-902439737514/tests/manual/`'
)
""")

spark.sql(f"SELECT * FROM glue_iceberg_catalog.{database_name}.users_from_parquet").show()

# After doing this, the Glue Table Overview panel in AWS just spun forever; it really doesn't seem to like this.
# It did load eventually, but it took a long time.
# At least Athena is able to query the added data.

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 35|
|  1|  Alice| 30|
|  2|    Bob| 25|
+---+-------+---+

+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+




+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 25|
|  3|Charlie| 35|
+---+-------+---+



                                                                                

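Per the notes at the top, `add_files` accepted a directory as `source_table` but not a `*.parquet` glob. A sketch of a helper that builds the `add_files` call used above for a directory of Parquet files, rejecting glob characters and adding a trailing slash for consistency (that normalisation is my choice, not something Iceberg is known to require). The `table`, `source_table` and `check_duplicate_files` argument names come from the call above and the duplicate-files error quoted earlier; `add_files_sql` itself is hypothetical.

```python
GLOB_CHARS = set("*?[]{}")


def add_files_sql(catalog: str, table: str, source_dir: str,
                  check_duplicate_files: bool = True) -> str:
    """Build an Iceberg add_files CALL importing the Parquet files in source_dir.

    A glob such as ".../*.parquet" did not work as source_table in these
    tests, so only a plain directory is accepted.
    """
    if GLOB_CHARS & set(source_dir):
        raise ValueError(f"pass a directory, not a glob: {source_dir}")
    if not source_dir.endswith("/"):
        source_dir += "/"
    args = [
        f"table => '{table}'",
        f"source_table => '`parquet`.`{source_dir}`'",
    ]
    if not check_duplicate_files:
        # Only disable the guard deliberately: Iceberg is not designed for
        # multiple references to the same file within one table.
        args.append("check_duplicate_files => false")
    return f"CALL {catalog}.system.add_files(" + ", ".join(args) + ")"
```

Leaving `check_duplicate_files` at its default keeps Iceberg's guard against importing the same files twice, which is what caught the repeated call on the `data` directory.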
In [None]:
# Let's try to create an Iceberg table on top of the manual table's data

spark.sql(f"""
CREATE TABLE IF NOT EXISTS glue_iceberg_catalog.iceberg_test_db.manual_glue_table_iceberg
USING iceberg
LOCATION 's3a://bjorn-test-bucket-902439737514/tests/manual'
AS SELECT * FROM {in_catalog_name}.{database_name}.users_parquet WHERE 1=0
""")

# AS SELECT * FROM glue_iceberg_catalog.iceberg_test_db.manual_glue_table WHERE 1=0
# Of course this^ won't work, because glue_iceberg_catalog is an Iceberg catalog and doesn't recognise the manual table.

# The CREATE statement above works as expected: it creates a metadata folder at the same location as the data.

# And of course there is no data until you run an "ADD FILES" command.
spark.sql("SELECT * FROM glue_iceberg_catalog.iceberg_test_db.manual_glue_table_iceberg").show()


+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+


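Once `add_files` has run against this table, the registered data files can be listed through Iceberg's `files` metadata table (`SELECT file_path FROM <table>.files`). Given the earlier note that paths are stored as absolute `s3a://` URIs, a small sketch that reports which registered files fall outside a prefix such as the table location, i.e. files that would not come along if only the table directory were copied. `files_outside_prefix` is hypothetical, and the commented Spark lines are an untested usage example.

```python
def files_outside_prefix(file_paths: list[str], prefix: str) -> list[str]:
    """Return the data file paths that do not live under `prefix`."""
    # Normalise to exactly one trailing slash so ".../manual" does not
    # accidentally match ".../manual2/...".
    normalized = prefix.rstrip("/") + "/"
    return [p for p in file_paths if not p.startswith(normalized)]


# Untested usage against the table created above:
# rows = spark.sql(
#     "SELECT file_path FROM glue_iceberg_catalog.iceberg_test_db.manual_glue_table_iceberg.files"
# ).collect()
# outside = files_outside_prefix(
#     [r.file_path for r in rows],
#     "s3a://bjorn-test-bucket-902439737514/tests/manual",
# )
```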