In [1]:
%load_ext sparksql_magic

In [2]:
import pyspark
from pyspark.sql import SparkSession, Row

from datetime import datetime,date

In [3]:
#https://iceberg.apache.org/spark-quickstart/#adding-a-catalog
#TODO for reference for aws config: https://www.dremio.com/blog/deep-dive-into-configuring-your-apache-iceberg-catalog-with-apache-spark/
conf = (
    pyspark.SparkConf()
        .setAppName('test')
        #Configure AWS (Minio)
        .set('spark.hadoop.fs.s3a.endpoint','http://localhost:9000')
        .set('spark.hadoop.fs.s3a.access.key','minioadmin')
        .set('spark.hadoop.fs.s3a.secret.key','minioadmin')
        .set('spark.hadoop.fs.s3a.path.style.access','true')
        .set('spark.hadoop.fs.s3a.path.style.access','org.apache.hadoop.fs.s3a.S3AFileSystem')
  		#packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', '../datasets/sample-datasets/iceberg/')
        .set('spark.sql.defaultCatalog', 'hdfs_catalog')
        
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In [4]:
sql_create = """
CREATE TABLE IF NOT EXISTS prod.employee (
    id bigint COMMENT 'unique id for employee',
    birth_date date COMMENT 'birth date of employee',
    country string COMMENT 'Country location of employee',
    name string)
    PARTITIONED BY (country)
"""
spark.sql(sql_create)

DataFrame[]

### Above Step will create employe table under prod catalog
- You can navigate to `../datasets/iceberg/prod/employee/`
- You'll see metadata under the folder
- - This has version-hint.text and v1.metadata.json
  - `version-hint.text` has version number of the current metadata file. This file exists as we are using hadoop as our catalog.[1]
  - `v1.metadata.json` has metdata on the table including its schema information and location of the data

In [5]:
!cat ../datasets/sample-datasets/iceberg/prod/employee/metadata/version-hint.text

4

In [6]:
!cat ../datasets/sample-datasets/iceberg/prod/employee/metadata/v1.metadata.json

{
  "format-version" : 2,
  "table-uuid" : "465bf768-a384-4b51-b587-d9e6cb957cf1",
  "location" : "../datasets/sample-datasets/iceberg/prod/employee",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1706687747043,
  "last-column-id" : 4,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long",
      "doc" : "unique id for employee"
    }, {
      "id" : 2,
      "name" : "birth_date",
      "required" : false,
      "type" : "date",
      "doc" : "birth date of employee"
    }, {
      "id" : 3,
      "name" : "country",
      "required" : false,
      "type" : "string",
      "doc" : "Country location of employee"
    }, {
      "id" : 4,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "country",
      "t

In [7]:
df_emp = spark.createDataFrame([
    Row(id=1, country='USA', birth_date=date(2000, 8, 1), name="A"),
   
    Row(id=2, country='IN', birth_date=date(2000, 6, 2), name="B"),
   
    Row(id=4, country='USA', birth_date=date(2000, 5, 3), name="C")
])
 

In [8]:
df_emp.show()

+---+-------+----------+----+
| id|country|birth_date|name|
+---+-------+----------+----+
|  1|    USA|2000-08-01|   A|
|  2|     IN|2000-06-02|   B|
|  4|    USA|2000-05-03|   C|
+---+-------+----------+----+



In [9]:
df_emp.writeTo("prod.employee")\
        .partitionedBy("country") \
        .append()

In [10]:
### The above command will add data and you can see v2.metadata.json add

In [11]:
!cat ../datasets/sample-datasets/iceberg/prod/employee/metadata/version-hint.text

5

In [12]:
!cat ../datasets/sample-datasets/iceberg/prod/employee/metadata/v1.metadata.json

{
  "format-version" : 2,
  "table-uuid" : "465bf768-a384-4b51-b587-d9e6cb957cf1",
  "location" : "../datasets/sample-datasets/iceberg/prod/employee",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1706687747043,
  "last-column-id" : 4,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long",
      "doc" : "unique id for employee"
    }, {
      "id" : 2,
      "name" : "birth_date",
      "required" : false,
      "type" : "date",
      "doc" : "birth date of employee"
    }, {
      "id" : 3,
      "name" : "country",
      "required" : false,
      "type" : "string",
      "doc" : "Country location of employee"
    }, {
      "id" : 4,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "country",
      "t

### Query Data

In [13]:
spark.sql("select * from prod.employee").collect()

[Row(id=3, birth_date=datetime.date(2000, 8, 1), country='IN', name='D'),
 Row(id=2, birth_date=datetime.date(2000, 6, 2), country='IN', name='B'),
 Row(id=3, birth_date=datetime.date(2000, 8, 1), country='IN', name='D'),
 Row(id=1, birth_date=datetime.date(2000, 8, 1), country='USA', name='A'),
 Row(id=4, birth_date=datetime.date(2000, 5, 3), country='USA', name='C'),
 Row(id=2, birth_date=datetime.date(2000, 6, 2), country='IN', name='B'),
 Row(id=1, birth_date=datetime.date(2000, 8, 1), country='USA', name='A'),
 Row(id=4, birth_date=datetime.date(2000, 5, 3), country='USA', name='C')]

In [14]:
%%sparksql
select * from prod.employee

0,1,2,3
id,birth_date,country,name
2,2000-06-02,IN,B
1,2000-08-01,USA,A
4,2000-05-03,USA,C
3,2000-08-01,IN,D
3,2000-08-01,IN,D
2,2000-06-02,IN,B
1,2000-08-01,USA,A
4,2000-05-03,USA,C


References:
1. https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/

### Insert Data

In [15]:
%%sparksql
Describe prod.employee

0,1,2
col_name,data_type,comment
id,bigint,unique id for employee
birth_date,date,birth date of employee
country,string,Country location of employee
name,string,
# Partition Information,,
# col_name,data_type,comment
country,string,Country location of employee


In [16]:
%%sparksql
Insert into prod.employee
Values (3, CAST('2000-08-01' AS date), 'IN', 'D')

In [17]:
%%sparksql
select * from prod.employee.files

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17
content,file_path,file_format,spec_id,partition,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,readable_metrics
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=IN/00000-22-de7cce31-22e9-4917-bd37-86ddaf2fa456-00001.parquet,PARQUET,0,Row(country='IN'),1,1196,"{1: 40, 2: 41, 3: 38, 4: 37}","{1: 1, 2: 1, 3: 1, 4: 1}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x03\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'D')}","{1: bytearray(b'\x03\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'D')}",,[4],,0,"Row(birth_date=Row(column_size=41, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 8, 1), upper_bound=datetime.date(2000, 8, 1)), country=Row(column_size=38, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='IN', upper_bound='IN'), id=Row(column_size=40, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=3, upper_bound=3), name=Row(column_size=37, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='D', upper_bound='D'))"
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=IN/00000-16-7cf8ba3a-c8a8-47cd-acb0-e654cc525c84-00002.parquet,PARQUET,0,Row(country='IN'),1,1220,"{1: 46, 2: 41, 3: 44, 4: 43}","{1: 1, 2: 1, 3: 1, 4: 1}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x02\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'f+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'B')}","{1: bytearray(b'\x02\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'f+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'B')}",,[4],,0,"Row(birth_date=Row(column_size=41, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 6, 2), upper_bound=datetime.date(2000, 6, 2)), country=Row(column_size=44, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='IN', upper_bound='IN'), id=Row(column_size=46, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=2, upper_bound=2), name=Row(column_size=43, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='B', upper_bound='B'))"
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=USA/00000-16-7cf8ba3a-c8a8-47cd-acb0-e654cc525c84-00001.parquet,PARQUET,0,Row(country='USA'),2,1280,"{1: 54, 2: 46, 3: 75, 4: 48}","{1: 2, 2: 2, 3: 2, 4: 2}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'H+\x00\x00'), 3: bytearray(b'USA'), 4: bytearray(b'A')}","{1: bytearray(b'\x04\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'USA'), 4: bytearray(b'C')}",,[4],,0,"Row(birth_date=Row(column_size=46, value_count=2, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 5, 3), upper_bound=datetime.date(2000, 8, 1)), country=Row(column_size=75, value_count=2, null_value_count=0, nan_value_count=None, lower_bound='USA', upper_bound='USA'), id=Row(column_size=54, value_count=2, null_value_count=0, nan_value_count=None, lower_bound=1, upper_bound=4), name=Row(column_size=48, value_count=2, null_value_count=0, nan_value_count=None, lower_bound='A', upper_bound='C'))"
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=IN/00000-20-1266f857-fc52-4d70-880b-5c86a1808492-00001.parquet,PARQUET,0,Row(country='IN'),1,1196,"{1: 40, 2: 41, 3: 38, 4: 37}","{1: 1, 2: 1, 3: 1, 4: 1}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x03\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'D')}","{1: bytearray(b'\x03\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'D')}",,[4],,0,"Row(birth_date=Row(column_size=41, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 8, 1), upper_bound=datetime.date(2000, 8, 1)), country=Row(column_size=38, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='IN', upper_bound='IN'), id=Row(column_size=40, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=3, upper_bound=3), name=Row(column_size=37, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='D', upper_bound='D'))"
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=IN/00000-16-ee33d2d1-60bd-41ca-8964-a8386fa2980b-00002.parquet,PARQUET,0,Row(country='IN'),1,1220,"{1: 46, 2: 41, 3: 44, 4: 43}","{1: 1, 2: 1, 3: 1, 4: 1}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x02\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'f+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'B')}","{1: bytearray(b'\x02\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'f+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'B')}",,[4],,0,"Row(birth_date=Row(column_size=41, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 6, 2), upper_bound=datetime.date(2000, 6, 2)), country=Row(column_size=44, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='IN', upper_bound='IN'), id=Row(column_size=46, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=2, upper_bound=2), name=Row(column_size=43, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='B', upper_bound='B'))"
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=USA/00000-16-ee33d2d1-60bd-41ca-8964-a8386fa2980b-00001.parquet,PARQUET,0,Row(country='USA'),2,1280,"{1: 54, 2: 46, 3: 75, 4: 48}","{1: 2, 2: 2, 3: 2, 4: 2}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'H+\x00\x00'), 3: bytearray(b'USA'), 4: bytearray(b'A')}","{1: bytearray(b'\x04\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'USA'), 4: bytearray(b'C')}",,[4],,0,"Row(birth_date=Row(column_size=46, value_count=2, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 5, 3), upper_bound=datetime.date(2000, 8, 1)), country=Row(column_size=75, value_count=2, null_value_count=0, nan_value_count=None, lower_bound='USA', upper_bound='USA'), id=Row(column_size=54, value_count=2, null_value_count=0, nan_value_count=None, lower_bound=1, upper_bound=4), name=Row(column_size=48, value_count=2, null_value_count=0, nan_value_count=None, lower_bound='A', upper_bound='C'))"
0,../datasets/sample-datasets/iceberg/prod/employee/data/country=IN/00000-1-bb0ac802-bb68-4811-829e-106d4078207d-00001.parquet,PARQUET,0,Row(country='IN'),1,1196,"{1: 40, 2: 41, 3: 38, 4: 37}","{1: 1, 2: 1, 3: 1, 4: 1}","{1: 0, 2: 0, 3: 0, 4: 0}",{},"{1: bytearray(b'\x03\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'D')}","{1: bytearray(b'\x03\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'\xa2+\x00\x00'), 3: bytearray(b'IN'), 4: bytearray(b'D')}",,[4],,0,"Row(birth_date=Row(column_size=41, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=datetime.date(2000, 8, 1), upper_bound=datetime.date(2000, 8, 1)), country=Row(column_size=38, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='IN', upper_bound='IN'), id=Row(column_size=40, value_count=1, null_value_count=0, nan_value_count=None, lower_bound=3, upper_bound=3), name=Row(column_size=37, value_count=1, null_value_count=0, nan_value_count=None, lower_bound='D', upper_bound='D'))"


In [18]:
%%sparksql
SELECT *
FROM prod.employee.history

0,1,2,3
made_current_at,snapshot_id,parent_id,is_current_ancestor
2024-02-09 13:06:56.525000,8629800571988728206,,True
2024-02-09 13:09:18.204000,1539503790352931415,8629800571988728206,True
2024-02-09 13:09:28.972000,5015095645526593413,1539503790352931415,True
2024-02-09 13:13:20.249000,5850161128086219275,5015095645526593413,True
2024-02-09 13:13:21.664000,4411926792244445815,5850161128086219275,True


In [19]:
%%sparksql
SELECT *
FROM prod.employee VERSION AS OF 8629800571988728206

0,1,2,3
id,birth_date,country,name
3,2000-08-01,IN,D
