# Data preparation script

## Creating database, uncomment for first run, can be skipped after initial creation

In [None]:
# spark.sql(
#     "CREATE DATABASE IF NOT EXISTS tpch_iceberg LOCATION 's3://<your_bucket>/<path_to_save_files>'"
# )

## Setting default database for scripts, optional

In [None]:
# Make tpch_iceberg the session's current database so the unqualified table
# names used in later cells (e.g. `customer`) are looked up there.
# The database must already exist; see the commented-out CREATE DATABASE cell.
spark.sql("use tpch_iceberg")

## Drop table. Uncomment if you are recreating the dataset.

In [None]:
# %%sql
# -- Destructive: removes the customer table. Uncomment the whole cell only when recreating the dataset.
# DROP TABLE IF EXISTS customer;

## Creating the table

Setting the Iceberg table format version to 2, the Parquet compression codec to snappy, and the default write format to Parquet.

The partitioning is on c_nationkey.

Feel free to play with different formats, partitioning and settings and see how they can affect performance.

In [None]:
# DDL for the Iceberg `customer` table, partitioned by c_nationkey.
# IF NOT EXISTS makes this statement a no-op when the table is already present.
customer_table_ddl = """
CREATE TABLE IF NOT EXISTS customer (
    c_custkey BIGINT,
    c_name STRING,
    c_address STRING,
    c_nationkey BIGINT,
    c_phone STRING,
    c_acctbal DECIMAL(15,2),
    c_mktsegment STRING,
    c_comment STRING
) USING iceberg
PARTITIONED BY (c_nationkey)
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'snappy',
    'format-version' = '2'
)"""
spark.sql(customer_table_ddl)


## Defining the schema for the table and loading the data from S3

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, DateType, LongType

# Customer schema: one (column name, Spark type, nullable) triple per column,
# listed in the same column order as the CREATE TABLE statement for `customer`.
customer_schema = StructType([
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in [
        ("c_custkey", LongType(), False),
        ("c_name", StringType(), True),
        ("c_address", StringType(), True),
        ("c_nationkey", LongType(), True),
        ("c_phone", StringType(), True),
        ("c_acctbal", DecimalType(15,2), True),
        ("c_mktsegment", StringType(), True),
        ("c_comment", StringType(), True),
    ]
])

In [None]:
# Read the pipe-delimited customer files from S3 as CSV, applying the explicit
# customer_schema instead of inferring column types.
customer_df = (
    spark.read
    .schema(customer_schema)
    .option("delimiter", "|")
    .format("csv")
    .load('s3://redshift-downloads/TPC-H/2.18/3TB/customer/')
)
# Preview the first 10 rows as a sanity check on the parsed columns.
customer_df.show(10)

## Saving the data frame in the customer table

In [None]:
# Append the loaded rows to the Iceberg `customer` table.
# Append mode adds to whatever the table already holds, so running this cell
# again without recreating the table writes the same rows a second time.
(
    customer_df.write
    .mode("append")
    .format("iceberg")
    .saveAsTable("customer")
)

## Validating the data, should be 450,000,000 rows

In [None]:
%%sql
-- Row-count check on the loaded customer table.
SELECT COUNT(*) FROM spark_catalog.tpch_iceberg.customer

## Create staging table for merge example

### Uncomment for recreating staging table

In [None]:
# %%sql
# drop table customer_stg

### Create staging table

In [None]:
# %%sql
# create table customer_stg
# USING iceberg
# PARTITIONED BY (c_nationkey)
# TBLPROPERTIES (
#     'write.format.default' = 'parquet',
#     'write.parquet.compression-codec' = 'snappy',
#     'format-version' = '2')
# as select * from customer limit 1000;



### Verify data

In [None]:
# %%sql
# select * from customer_stg limit 10;

### Changing data in staging table - to create new keys and update data for existing ones

In [None]:
# %%sql
# update customer_stg
# set c_custkey = c_custkey * 1000
# where c_custkey % 3 = 0;

In [None]:
# %%sql
# update customer_stg
# set c_nationkey = 18
# where c_custkey % 5 = 0

## Example for expiring snapshots via Spark

In [None]:
# %%sql
# CALL spark_catalog.system.expire_snapshots(table => 'tpch_iceberg.customer',older_than => TIMESTAMP '2025-05-05 00:00:00.000', retain_last => 1)

## Example for altering table properties setting updates to MoR (should speed up writes, but can potentially slow down reads)

In [None]:
# %%sql
# alter table spark_catalog.tpch_iceberg.customer 
# SET TBLPROPERTIES ('write.update.mode' = 'merge-on-read');