# Purpose of This Document
- This document guides AI engineers in using Apache Spark and Apache Iceberg from Python.
- Its main goals are as follows.

#### Understanding the Technology
- Apache Spark: A distributed processing framework for large-scale data processing and analysis, accessible from Python through its PySpark API.
- Apache Iceberg: A data lake table format that supports schema evolution and ACID transactions, which makes large datasets easier to manage and analyze.

#### Efficient Data Processing
- Performance Optimization: Covers how to optimize Spark performance on large datasets and how to use Iceberg to improve data storage and query performance.
- Maintaining Data Integrity: Describes how to maintain data integrity using Iceberg's ACID transactions and how to leverage schema evolution features to manage data models flexibly.

---

# Introduction to Spark

TIP: This guide uses Apache Spark 3.5.1.

#### [What is Apache Spark?](https://spark.apache.org/)

- Apache Spark is an open-source cluster computing framework for large-scale data processing.
- It supports fast data processing through in-memory operations and is widely used as a platform to implement various data processing tasks.

#### __Contents__ 
- DataFrame
- Basic Operations on DataFrame
- Aggregate Operations
- SQL
- Supported file formats
- Reading a JSON File into a DataFrame in PySpark

---

#### __DataFrame__ 
This section shows how to create a Spark DataFrame and run simple tasks with it.

In [1]:
from pyspark.sql import SparkSession  # Entry point for DataFrame and SQL functionality in PySpark

# Build a SparkSession, or reuse the one that is already running
spark = SparkSession.builder \
    .appName("Elice Spark Example") \
    .getOrCreate()  # Returns the existing session if there is one; otherwise creates a new one

24/08/01 05:13:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Data
data = [("Rabbit", 34), ("Elice", 45), ("Helpy", 29)]

# Column Name
columns = ["Name", "Age"]

# Creating a DataFrame
df = spark.createDataFrame(data, columns)

In [3]:
# Displaying the contents of a DataFrame
df.show()


+------+---+
|  Name|Age|
+------+---+
|Rabbit| 34|
| Elice| 45|
| Helpy| 29|
+------+---+



#### __Basic Operations on DataFrame__

In [4]:
# Keep only people who are 30 years old or older
df_filtered = df.filter(df.Age >= 30)

# Filtered Data Output
df_filtered.show()

+------+---+
|  Name|Age|
+------+---+
|Rabbit| 34|
| Elice| 45|
+------+---+



#### __Aggregate Operations__

In [5]:
# Calculate the average age across all rows (groupBy() with no columns treats the whole DataFrame as one group)
df_avg_age = df.groupBy().avg("Age")

# Print average age
df_avg_age.show()

+--------+
|avg(Age)|
+--------+
|    36.0|
+--------+



In [6]:
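# Average age per name using SQL; {df} refers to the DataFrame passed as df=df
# (each name appears once here, so each average equals that person's age)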
spark.sql("select name, avg(age) from {df} group by name", df=df).show()



+------+--------+
|  name|avg(age)|
+------+--------+
|Rabbit|    34.0|
| Elice|    45.0|
| Helpy|    29.0|
+------+--------+




#### __SQL__
Spark SQL is the Spark module for running SQL queries on structured data.

In [8]:
# Save the DataFrame as a table (Parquet format by default) so it can be queried by name
df.write.mode('overwrite').saveAsTable("elice_people")

In [9]:
# Let's check if we can access the table using the table name
spark.sql("select * from elice_people").show()

+------+---+
|  Name|Age|
+------+---+
|Rabbit| 34|
| Elice| 45|
| Helpy| 29|
+------+---+



In [10]:
# Use SQL to insert a row into the table
spark.sql("INSERT INTO elice_people VALUES ('AI', 4)")

DataFrame[]

In [11]:
# To check if a row has been inserted, let's look at the table contents
spark.sql("select * from elice_people").show()

+------+---+
|  Name|Age|
+------+---+
|Rabbit| 34|
| Elice| 45|
| Helpy| 29|
|    AI|  4|
+------+---+



In [12]:
# Only show people who are 30 years old or older
df = spark.sql("SELECT * FROM elice_people WHERE Age >= 30")
df.show()

+------+---+
|  Name|Age|
+------+---+
|Rabbit| 34|
| Elice| 45|
+------+---+



In [13]:
# Calculating Average Age
df = spark.sql("SELECT AVG(Age) AS AverageAge FROM elice_people")
df.show()

+----------+
|AverageAge|
+----------+
|      28.0|
+----------+



---

#### __Supported file formats__
Apache Spark supports a variety of file formats for both reading and writing data.

* __TXT Files (.txt)__
``` python
df = spark.read.text("/home/elicer/workload/file.txt")
```

* __CSV Files (.csv)__
``` python
df = spark.read.csv("/home/elicer/workload/file.csv", header=True, inferSchema=True)
```

* __JSON Files (.json)__
``` python
df = spark.read.json("/home/elicer/workload/file.json")
```

* __Parquet (.parquet)__
``` python
df = spark.read.parquet("/home/elicer/workload/file.parquet")
```

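Since every format above goes through the same `spark.read` entry point, a script that handles several file types can choose the reader from the file extension. The sketch below is a hypothetical helper: `reader_for` and the `READERS` table are our own names, not a Spark API; only the format names and the CSV options come from the examples above. Option values are given as strings, which Spark's reader accepts for these settings.

```python
import os

# Hypothetical mapping from file extension to a Spark data source name and the
# reader options used in the examples above (an assumption, not a Spark API).
READERS = {
    ".txt": ("text", {}),
    ".csv": ("csv", {"header": "true", "inferSchema": "true"}),
    ".json": ("json", {}),
    ".parquet": ("parquet", {}),
}


def reader_for(path):
    """Return (format_name, options) for a path, chosen by its file extension."""
    ext = os.path.splitext(path)[1].lower()
    if ext not in READERS:
        raise ValueError(f"unsupported file extension: {ext!r}")
    fmt, options = READERS[ext]
    # Return a copy so callers can add options without mutating READERS.
    return fmt, dict(options)


fmt, opts = reader_for("/home/elicer/workload/file.csv")
print(fmt, opts)
```

The returned pair plugs into the generic reader, for example `df = spark.read.format(fmt).options(**opts).load(path)`.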
---

#### __Reading a JSON File into a DataFrame in PySpark__

In [14]:
pwd

'/home/elicer/workload'

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

spark = SparkSession.builder \
    .appName("JSON File Example") \
    .getOrCreate()

# Schema Definition
schema = StructType([
    StructField("conversations", ArrayType(StructType([
        StructField("from", StringType(), True),
        StructField("value", StringType(), True)
    ])), True),
    StructField("chosen", StructType([
        StructField("from", StringType(), True),
        StructField("value", StringType(), True)
    ]), True),
    StructField("rejected", StructType([
        StructField("from", StringType(), True),
        StructField("value", StringType(), True)
    ]), True)
])

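# Read the JSON file into a DataFrame.
# The schema is inferred here; to enforce the schema defined above instead,
# chain .schema(schema) before .json(file_path).
file_path = "/home/elicer/workload/file.json"
df = spark.read.option("multiLine", True).json(file_path)

df.show()

# If there are corrupt records, print them.
# Spark rejects queries on raw JSON that reference only _corrupt_record,
# so cache the parsed DataFrame before filtering on that column.
if "_corrupt_record" in df.columns:
    df.cache()
    corrupt_records = df.filter(df["_corrupt_record"].isNotNull())
    if corrupt_records.count() > 0:
        print("Corrupt records found:")
        corrupt_records.show(truncate=False)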

spark.stop()

24/08/01 05:13:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------------------+--------------------+--------------------+
|              chosen|       conversations|            rejected|
+--------------------+--------------------+--------------------+
|{gpt, That sounds...|[{human, Hi! I'd ...|{gpt, Hello! I'd ...|
|{gpt, In this tas...|[{system, You are...|{gpt, Sure, I'd b...|
+--------------------+--------------------+--------------------+



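The reader above sets `multiLine`, which Spark needs when one JSON document spans several lines; by default Spark expects JSON Lines (one complete JSON value per line). Before reading a new file, a quick local check can tell you which layout it uses. The `json_layout` helper below is a hypothetical sketch built only on Python's standard `json` module; it reads the whole file into memory, so it is meant for sample files rather than large datasets.

```python
import json


def json_layout(path):
    """Classify a JSON file as "lines", "document", or "invalid".

    "lines":    every non-empty line is a complete JSON value (JSON Lines),
                which Spark's JSON reader handles by default.
    "document": the whole file is one JSON value spread over several lines;
                read it with .option("multiLine", True).
    "invalid":  neither layout parses cleanly.
    """
    with open(path, encoding="utf-8") as f:
        text = f.read()
    # First try: parse each non-empty line on its own.
    try:
        for line in text.splitlines():
            if line.strip():
                json.loads(line)
        return "lines"
    except json.JSONDecodeError:
        pass
    # Second try: parse the whole file as a single JSON value.
    try:
        json.loads(text)
        return "document"
    except json.JSONDecodeError:
        return "invalid"
```

For `"invalid"` files, expect Spark's default PERMISSIVE mode to place malformed input in the `_corrupt_record` column checked in the cell above.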
---

# Introduction to Iceberg

TIP: This guide uses Apache Iceberg 1.5.2.

#### [What is Apache Iceberg?](https://iceberg.apache.org/)

- Apache Iceberg is an open-source table format and management library that works with Apache Spark.
- It lets you efficiently manage and query structured data stored in data lakes.

![Screenshot](iceberg1.png)

![Screenshot](iceberg4.png)

![Screenshot](iceberg3.png)

---

#### Contents 
- Load to Iceberg Table
- Check Iceberg Table Schema
- Time travel
- Iceberg Table Writing
- Iceberg Table Reading
- Look up snapshot list
- Look up specific versions of the Iceberg table (snapshot)
- How to use snapshots for time travel
- Spark Data Management and Query Example with Iceberg
- Explanation of Rollback Functionality
- Iceberg Schema Evolution
- Create Table
- Update Table
- Average, Aggregate, and Group
- Verifying Data That Meets Specific Conditions
- Querying Bulk Data
- Delete Table (ID 2, delete the table)
- Iceberg table integration and JSON data processing with PySpark

---

#### __Load to Iceberg Table__

In [10]:
from pyspark.sql import SparkSession

# JAR file paths and S3 credentials
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create Spark Session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Read a CSV file stored on the data hub
csv_path = "/mnt/elice/datahub/iceberg-poc/iceberg-tables/default/netflix_titles 2.csv"
df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)

# Iceberg table name
table_name = "netflix_titles"

# Append the DataFrame to the Iceberg table (the table is created on the first write)
df.write \
    .format("iceberg") \
    .mode("append") \
    .saveAsTable(f"spark_catalog.default.{table_name}")

# Query data from the Iceberg table
spark.table(f"spark_catalog.default.{table_name}").show()

24/07/10 05:19:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                NULL|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                NULL|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

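Every Iceberg cell in this guide rebuilds the same session settings and hardcodes the S3 keys in the notebook. A minimal sketch of one alternative, assuming the keys are exported as environment variables named `S3_ACCESS_KEY` and `S3_SECRET_KEY` (hypothetical names), collects the settings in one function. It passes the S3A options with the `spark.hadoop.` prefix, which is how Spark forwards arbitrary Hadoop properties; every other value is copied from the cells in this guide.

```python
import os

# Values copied from the cells in this guide; adjust for your environment.
JARS = [
    "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar",
    "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar",
    "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar",
    "/home/elicer/spark/jars/hadoop-common-3.3.4.jar",
]
WAREHOUSE = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"
ENDPOINT = "https://datahub-central-01.elice.io"


def iceberg_s3_conf(env=os.environ):
    """Build the Iceberg + S3A Spark settings, reading the S3 keys from `env`.

    S3_ACCESS_KEY and S3_SECRET_KEY are hypothetical variable names.
    """
    try:
        access_key = env["S3_ACCESS_KEY"]
        secret_key = env["S3_SECRET_KEY"]
    except KeyError as missing:
        raise RuntimeError(f"environment variable {missing} is not set") from None
    return {
        "spark.jars": ",".join(JARS),
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.spark_catalog.type": "hadoop",
        "spark.sql.catalog.spark_catalog.warehouse": WAREHOUSE,
        # The spark.hadoop. prefix forwards these to the Hadoop configuration.
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.endpoint": ENDPOINT,
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "true",
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }
```

A session could then be built with `SparkSession.builder.appName("IcebergIntegration").config(map=iceberg_s3_conf()).getOrCreate()` (the `map` argument exists in PySpark 3.4 and later). As the WARN lines in this guide point out, when `getOrCreate()` reuses an existing session only runtime SQL configurations take effect, so static settings such as `spark.jars` and `spark.sql.extensions` are applied reliably only in a fresh session, for example after restarting the notebook kernel.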
---

#### __Check Iceberg Table Schema__

In [12]:
from pyspark.sql import SparkSession

# JAR file paths and S3 credentials
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create Spark Session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Name of the Iceberg table
table_name = "netflix_titles"

# Schema lookup for an Iceberg table
df = spark.table(f"spark_catalog.default.{table_name}")
df.printSchema()


24/07/10 05:21:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



---

#### __Iceberg Time travel__ 
Spark 3.3 and later supports time travel in SQL queries using TIMESTAMP AS OF or VERSION AS OF clauses. The VERSION AS OF clause can contain a long snapshot ID or a string branch or tag name.

In [None]:
-- time travel to October 26, 2024 at 12:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '2024-10-26 12:21:00';

-- time travel to snapshot with id 10963874102873L
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head snapshot of the branch ai-helpy
SELECT * FROM prod.db.table VERSION AS OF 'ai-helpy';

-- time travel to the snapshot referenced by the tag ai-snapshot
SELECT * FROM prod.db.table VERSION AS OF 'ai-snapshot';

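When the target of a time-travel query is chosen at runtime (for example, a snapshot ID read from the snapshots table), it can help to build the SQL in one place. The `time_travel_sql` function below is a hypothetical helper that produces the three forms shown above; it assumes `table` is a trusted, fully qualified name and, as a deliberate restriction, only accepts branch and tag names made of letters, digits, `_`, `.`, and `-`.

```python
import re
from datetime import datetime

# Branch and tag names accepted by this sketch (a deliberate restriction).
_REF_NAME = re.compile(r"[A-Za-z0-9_.-]+")


def time_travel_sql(table, snapshot_id=None, as_of=None, ref=None):
    """Build a Spark SQL time-travel query against an Iceberg table.

    Pass exactly one of:
      snapshot_id: int snapshot ID          -> VERSION AS OF <id>
      as_of:       datetime                 -> TIMESTAMP AS OF '<timestamp>'
      ref:         branch or tag name (str) -> VERSION AS OF '<name>'
    """
    chosen = [arg is not None for arg in (snapshot_id, as_of, ref)]
    if sum(chosen) != 1:
        raise ValueError("pass exactly one of snapshot_id, as_of, ref")
    if snapshot_id is not None:
        clause = f"VERSION AS OF {int(snapshot_id)}"
    elif as_of is not None:
        clause = f"TIMESTAMP AS OF '{as_of.strftime('%Y-%m-%d %H:%M:%S.%f')}'"
    else:
        if not _REF_NAME.fullmatch(ref):
            raise ValueError(f"unsupported branch/tag name: {ref!r}")
        clause = f"VERSION AS OF '{ref}'"
    return f"SELECT * FROM {table} {clause}"


print(time_travel_sql("prod.db.table", snapshot_id=10963874102873))
print(time_travel_sql("prod.db.table", as_of=datetime(2024, 10, 26, 12, 21)))
print(time_travel_sql("prod.db.table", ref="ai-helpy"))
```

In PySpark the result is passed straight to `spark.sql`, e.g. `spark.sql(time_travel_sql("spark_catalog.default.test", snapshot_id=6046816094557856117)).show()`.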
---

#### __Iceberg Table Writing__ 
Once your table is created, insert data using INSERT INTO.

In [None]:
INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;

Iceberg also adds row-level SQL commands to Spark: MERGE INTO and DELETE FROM.

In [None]:
MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
WHEN NOT MATCHED THEN INSERT *;

DELETE FROM local.db.target WHERE id = 1;

---

#### __Iceberg Table Reading__
To read with SQL, use the Iceberg table's name in a SELECT query.

In [None]:
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data;

SQL is also the recommended way to inspect tables. To view all snapshots in a table, use the snapshots metadata table.

In [None]:
SELECT * FROM local.db.table.snapshots;

---

#### __Look up snapshot list__

In [13]:
from pyspark.sql import SparkSession

hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create Spark Session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Look up snapshot list
snapshot_list_query = """
    SELECT *
    FROM spark_catalog.default.test.snapshots
"""
snapshot_list = spark.sql(snapshot_list_query)
snapshot_list.show(truncate=False)

24/07/10 04:16:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-----------------------+-------------------+---------+---------+------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id|operation|manifest_list                                                                                                                       |summary                                                                                                                                                                                                                                                                                         |
+-----------------------

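Each row of the snapshots metadata table carries a `summary` map with per-snapshot statistics. Assuming the standard Iceberg summary keys `added-records` and `total-records` are present (they can be missing for some operations), a small helper can turn collected rows into record counts per snapshot. `snapshot_record_counts` is a hypothetical name; it works with PySpark `Row` objects, which support `row["column"]` lookups, as well as with plain dicts.

```python
def _as_int(value):
    """Convert a summary value (Iceberg stores them as strings) to int, or None if absent."""
    return None if value is None else int(value)


def snapshot_record_counts(rows):
    """Summarize rows collected from a table's `snapshots` metadata table.

    Each row must provide snapshot_id, operation, and summary. PySpark returns
    the summary map column as a Python dict (or None).
    """
    counts = []
    for row in rows:
        summary = row["summary"] or {}
        counts.append({
            "snapshot_id": row["snapshot_id"],
            "operation": row["operation"],
            "added_records": _as_int(summary.get("added-records")),
            "total_records": _as_int(summary.get("total-records")),
        })
    return counts
```

With a live session, the input could come from `spark.sql("SELECT snapshot_id, operation, summary FROM spark_catalog.default.test.snapshots ORDER BY committed_at").collect()`.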
---

#### __Look up specific versions of the Iceberg table (snapshot)__

In [1]:
from pyspark.sql import SparkSession

hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create Spark Session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Read the table as of snapshot 6046816094557856117 (ID taken from the snapshots metadata table)
snapshot_query = """
    SELECT * FROM spark_catalog.default.test VERSION AS OF 6046816094557856117
"""
snapshot_df = spark.sql(snapshot_query)
snapshot_df.show(truncate=False)

24/07/15 15:22:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/07/15 15:22:35 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

+---+----+
|id |data|
+---+----+
|1  |a   |
|2  |b   |
|3  |c   |
+---+----+



---

#### __How to use snapshots for time travel__

In [6]:
from pyspark.sql import SparkSession
from datetime import datetime

# Required JAR File Path
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS S3 Access Key Settings
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"

# Iceberg Metadata Location
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create Spark Session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

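# Date and time string for time travel (e.g. a committed_at value from the snapshots table)
committed_at_string = "2024-07-10 03:07:36.823"

# Parse the string into a datetime, which also validates its format.
# In the f-string below it renders as '2024-07-10 03:07:36.823000', which Spark casts to a TIMESTAMP.
committed_at = datetime.strptime(committed_at_string, "%Y-%m-%d %H:%M:%S.%f")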

# SQL query for time travel in the Iceberg table
snapshot_query = f"""
    SELECT * FROM spark_catalog.default.test TIMESTAMP AS OF TIMESTAMP '{committed_at}'
"""

# Time Travel Run
snapshot_df = spark.sql(snapshot_query)

# Output Results
snapshot_df.show(truncate=False)

24/07/15 15:33:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+----+
|id |data|
+---+----+
|1  |a   |
|2  |b   |
|3  |c   |
+---+----+




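To see which snapshot a `TIMESTAMP AS OF` query will read, you can resolve the timestamp yourself from the table's `history` metadata table (columns `made_current_at` and `snapshot_id`). The sketch below reflects our understanding of Iceberg's rule: the snapshot made current most recently at or before the given time wins. `snapshot_as_of` is a hypothetical helper, not an Iceberg API, and the snapshot IDs in the example are made up.

```python
from datetime import datetime


def snapshot_as_of(history, as_of):
    """Return the ID of the snapshot that was current at `as_of`.

    history: iterable of (made_current_at, snapshot_id) pairs, e.g. collected
    from the table's `history` metadata table. The entry made current most
    recently at or before `as_of` wins.
    """
    candidates = [entry for entry in history if entry[0] <= as_of]
    if not candidates:
        raise ValueError(f"no snapshot was current at or before {as_of}")
    return max(candidates, key=lambda entry: entry[0])[1]


history = [
    (datetime(2024, 7, 10, 3, 7, 36, 823000), 111),  # hypothetical snapshot IDs
    (datetime(2024, 7, 10, 4, 0, 0), 222),
]
print(snapshot_as_of(history, datetime(2024, 7, 10, 3, 30)))  # 111
```

With a live session, `rows = spark.sql("SELECT made_current_at, snapshot_id FROM spark_catalog.default.test.history").collect()` followed by `snapshot_as_of([(r.made_current_at, r.snapshot_id) for r in rows], committed_at)` gives the snapshot the query above should read. PySpark returns timestamps as naive `datetime` values, so make sure both sides are expressed in the same time zone.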
---

#### __Spark Data Management and Query Example with Iceberg__

In [10]:
from pyspark.sql import SparkSession

# Set JAR file paths
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS S3 credentials and Iceberg metadata location
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Define function to query Iceberg table
def query_iceberg_table(table_name):
    return spark.sql(f"SELECT * FROM spark_catalog.default.{table_name}")

# Example usage: Query the 'sample' table
sample_df = query_iceberg_table("sample")
sample_df.show()

# Define function to drop Iceberg table
def drop_iceberg_table(table_name):
    spark.sql(f"DROP TABLE IF EXISTS spark_catalog.default.{table_name}")

# Example usage: Drop the 'sample' table
drop_iceberg_table("sample")

# Define function to list all tables in Iceberg catalog
def list_iceberg_tables():
    return spark.sql("SHOW TABLES IN spark_catalog.default").collect()

# Example usage: List all tables in the Iceberg catalog
tables = list_iceberg_tables()
print("Tables in Iceberg Catalog:")
for table in tables:
    print(table)

# Stop Spark session
spark.stop()

24/07/15 16:43:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+---+----+
| id|data|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+

Tables in Iceberg Catalog:
Row(namespace='default', tableName='iceberg_table', isTemporary=False)
Row(namespace='default', tableName='sample7', isTemporary=False)
Row(namespace='default', tableName='sample6', isTemporary=False)
Row(namespace='default', tableName='netflix_titles', isTemporary=False)
Row(namespace='default', tableName='sample3', isTemporary=False)
Row(namespace='default', tableName='users', isTemporary=False)
Row(namespace='default', tableName='sample2', isTemporary=False)
Row(namespace='default', tableName='sample11', isTemporary=False)
Row(namespace='default', tableName='test_tables', isTemporary=False)
Row(namespace='default', tableName='sample_schema', isTemporary=False)
Row(namespace='default', tableName='sample9', isTemporary=False)
Row(namespace='default', tableName='test', isTemporary=False)


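The helper functions above splice `table_name` directly into the SQL text, so a name containing a dot, a space, or a backtick could change which object a statement refers to. One way to guard against this, sketched below with hypothetical helper names, is to backtick-quote each part of the identifier; Spark SQL escapes a literal backtick inside a quoted identifier by doubling it.

```python
def quote_ident(name):
    """Backtick-quote one Spark SQL identifier, doubling any embedded backtick."""
    if not name:
        raise ValueError("identifier must be non-empty")
    return "`" + name.replace("`", "``") + "`"


def qualified_table(table_name, catalog="spark_catalog", namespace="default"):
    """Return catalog.namespace.table with every part quoted separately."""
    return ".".join(quote_ident(part) for part in (catalog, namespace, table_name))


print(qualified_table("sample"))  # `spark_catalog`.`default`.`sample`
```

The drop helper could then read `spark.sql(f"DROP TABLE IF EXISTS {qualified_table(table_name)}")`, and the query and list helpers can be changed the same way.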
---

#### __Explanation of Rollback Functionality__

In [2]:
from pyspark.sql import SparkSession

# Define your configuration and jar paths
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# Define S3 credentials and Iceberg metadata location
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Initialize SparkSession with Iceberg integration
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Create table if not exists and insert initial data
spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.default.sample (id bigint, data string) USING iceberg")
spark.sql("INSERT INTO spark_catalog.default.sample VALUES (1, 'a'), (2, 'b'), (3, 'c')")

# Capture the initial snapshot ID
initial_snapshot = spark.sql("SELECT snapshot_id FROM spark_catalog.default.sample.snapshots ORDER BY committed_at DESC LIMIT 1").collect()[0][0]

try:
    # Perform data operations
    spark.sql("INSERT INTO spark_catalog.default.sample VALUES (4, 'd'), (5, 'e')")
    print("Data inserted successfully.")
except Exception as e:
    # Roll back to the initial snapshot if the insert fails
    spark.sql(f"CALL spark_catalog.system.rollback_to_snapshot('default.sample', {initial_snapshot})")
    print(f"Table rolled back to snapshot {initial_snapshot} due to error: {e}")
finally:
    # Query the table to verify the data
    result = spark.sql("SELECT * FROM spark_catalog.default.sample")
    result.show()

# Stop SparkSession
spark.stop()


24/07/15 17:02:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

Data inserted successfully.



+---+----+
| id|data|
+---+----+
|  1|   a|
|  4|   d|
|  2|   b|
|  3|   c|
|  5|   e|
+---+----+



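In the cell above the second insert succeeds, so the rollback branch never runs. Besides `rollback_to_snapshot`, Iceberg provides a `rollback_to_timestamp` procedure, and both accept named arguments (`table => ...`). The hypothetical `rollback_call` helper below builds either statement; it assumes a simple `namespace.table` style name and the `spark_catalog` catalog used throughout this guide.

```python
import re

# Simple "namespace.table" style names only (a deliberate restriction of this sketch).
_TABLE_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def rollback_call(table, snapshot_id=None, as_of=None, catalog="spark_catalog"):
    """Build a CALL statement for Iceberg's rollback_to_snapshot or rollback_to_timestamp."""
    if not _TABLE_NAME.fullmatch(table):
        raise ValueError(f"unexpected table name: {table!r}")
    if (snapshot_id is None) == (as_of is None):
        raise ValueError("pass exactly one of snapshot_id or as_of")
    if snapshot_id is not None:
        return (f"CALL {catalog}.system.rollback_to_snapshot("
                f"table => '{table}', snapshot_id => {int(snapshot_id)})")
    ts = as_of.strftime("%Y-%m-%d %H:%M:%S.%f")
    return (f"CALL {catalog}.system.rollback_to_timestamp("
            f"table => '{table}', timestamp => TIMESTAMP '{ts}')")
```

In the except branch above, `spark.sql(rollback_call("default.sample", snapshot_id=initial_snapshot))` would issue the same rollback. Rolling back moves the table's current-snapshot pointer; the newer snapshots stay in the table metadata until they are expired, for example with the `expire_snapshots` procedure.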
---

#### __Iceberg Schema Evolution__

In [34]:
from pyspark.sql import SparkSession

# Paths to necessary JAR files
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# S3 connection details
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create Spark session
spark = SparkSession.builder \
    .appName("IcebergSchemaManagement") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Create Iceberg table (initial schema)
spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.default.sample (id bigint, data string) USING iceberg")

# Evolve the Iceberg table schema by adding a new column (this fails if the column already exists)
spark.sql("ALTER TABLE spark_catalog.default.sample ADD COLUMN new_column string")

# Insert data
spark.sql("INSERT INTO spark_catalog.default.sample VALUES (1, 'a', 'foo'), (2, 'b', 'bar'), (3, 'c', 'baz')")

# Query data
result = spark.sql("SELECT * FROM spark_catalog.default.sample")
result.show()

# Stop Spark session
spark.stop()

24/07/15 16:19:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+---+----+----------+
| id|data|new_column|
+---+----+----------+
|  1|   a|       foo|
|  2|   b|       bar|
|  3|   c|       baz|
+---+----+----------+



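Adding a column, as above, is one kind of schema change; changing a column's type with `ALTER TABLE ... ALTER COLUMN ... TYPE` is limited to safe widenings. Per the Iceberg table spec (format versions 1 and 2), these are `int` to `long`, `float` to `double`, and increasing a decimal's precision while keeping its scale. The sketch below checks a proposed change against those rules using Spark SQL type names; `is_allowed_promotion` is a hypothetical helper, and Iceberg itself performs the authoritative check when the DDL runs.

```python
import re

# Spark SQL accepts several spellings for the same type; normalize them first.
_ALIASES = {"integer": "int", "long": "bigint", "real": "float"}

# Widening promotions from the Iceberg table spec (format v1/v2),
# written with Spark SQL type names.
_SIMPLE_PROMOTIONS = {("int", "bigint"), ("float", "double")}

_DECIMAL = re.compile(r"decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)")


def _normalize(type_name):
    name = type_name.strip().lower()
    return _ALIASES.get(name, name)


def is_allowed_promotion(old_type, new_type):
    """Return True if changing a column from old_type to new_type is a permitted widening."""
    old, new = _normalize(old_type), _normalize(new_type)
    if old == new or (old, new) in _SIMPLE_PROMOTIONS:
        return True
    old_dec, new_dec = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        old_p, old_s = (int(g) for g in old_dec.groups())
        new_p, new_s = (int(g) for g in new_dec.groups())
        # Precision may grow (equal means no change); scale must stay the same.
        return new_s == old_s and new_p >= old_p
    return False
```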
---

#### __Create Table__

In [7]:
from pyspark.sql import SparkSession

# Paths to the required JAR files for Hadoop, AWS SDK, and Iceberg
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS credentials and Iceberg metadata location on S3
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create a Spark session with configurations for Iceberg and S3 integration
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Create a table named 'sample' in the Iceberg catalog if it does not already exist
spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.default.sample (id bigint, data string) USING iceberg")

# Insert some sample data into the 'sample' table
spark.sql("INSERT INTO spark_catalog.default.sample VALUES (1, 'a'), (2, 'b'), (3, 'c')")

# Query the 'sample' table and display the results
result = spark.sql("SELECT * FROM spark_catalog.default.sample")
result.show()

# Stop the Spark session
spark.stop()

24/07/10 04:51:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+---+----+
| id|data|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+
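Every cell in this section rebuilds the same session settings. The WARN line in each output means that `getOrCreate()` reused a session that already existed, so only the runtime SQL settings from the builder were applied. Static settings such as `spark.jars` and `spark.sql.extensions` take effect only when a session is first created.

The sketch below gathers the settings in one place. `iceberg_s3_conf` and `build_session` are hypothetical helpers, and their defaults are the values used above. The S3A keys use the `spark.hadoop.` prefix, which Spark documents for passing Hadoop properties. The unprefixed keys above also appear to have worked in this environment.

```python
from typing import Dict

# Values copied from the cells in this section.
JAR_DIR = "/home/elicer/spark/jars"
JARS = [
    "hadoop-aws-3.3.4.jar",
    "aws-java-sdk-bundle-1.12.262.jar",
    "iceberg-spark-runtime-3.5_2.12-1.5.2.jar",
    "hadoop-common-3.3.4.jar",
]


def iceberg_s3_conf(access_key: str, secret_key: str,
                    warehouse: str = "s3a://iceberg-poc-4l0g37aq/iceberg-tables",
                    endpoint: str = "https://datahub-central-01.elice.io") -> Dict[str, str]:
    """Return Spark settings for an Iceberg Hadoop catalog on S3-compatible storage."""
    return {
        "spark.jars": ",".join(f"{JAR_DIR}/{jar}" for jar in JARS),
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.spark_catalog.type": "hadoop",
        "spark.sql.catalog.spark_catalog.warehouse": warehouse,
        # spark.hadoop.<key> is copied into the Hadoop configuration as <key>.
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "true",
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }


def build_session(conf: Dict[str, str], app_name: str = "IcebergIntegration"):
    """Create (or reuse) a SparkSession with the given settings; requires pyspark."""
    from pyspark.sql import SparkSession  # imported lazily so iceberg_s3_conf works without Spark

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

# Usage (needs a Spark installation):
# spark = build_session(iceberg_s3_conf("<access key>", "<secret key>"))
```

Keeping the credentials as function arguments also makes it easy to supply them from the environment instead of hard-coding them in the notebook.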



---

#### __Update Table__

In [14]:
from pyspark.sql import SparkSession
import time

# Paths to JAR files required for Hadoop, AWS SDK, and Iceberg
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS credentials and Iceberg metadata location on S3
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create a Spark session with configurations for Iceberg and S3 integration
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# SQL query to create a table named 'test_tables' in the Iceberg catalog if it does not already exist
create_table_query = """
    CREATE TABLE IF NOT EXISTS spark_catalog.default.test_tables (
        id INTEGER,
        backend_id VARCHAR(255),
        function_api_url VARCHAR(255),
        type VARCHAR(50),
        text STRING,
        log_time TIMESTAMP
    ) USING iceberg
"""

# Measure time taken to create the table
start_time_create = time.time()
spark.sql(create_table_query)
end_time_create = time.time()
create_duration = end_time_create - start_time_create
print(f"Table creation took {create_duration:.2f} seconds")

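# SQL query to insert multiple rows of data into the 'test_tables' table
# NOTE: the rows between id 3 and id 20 are elided ('...') below; fill them in before running,
# otherwise the statement is not valid SQL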
insert_data_query = """
    INSERT INTO spark_catalog.default.test_tables
    VALUES 
        (1, 'backend1', 'http://api.example.com/1', 'type1', 'Sample text 1', TIMESTAMP '2024-07-03 10:00:00'),
        (2, 'backend2', 'http://api.example.com/2', 'type2', 'Sample text 2', TIMESTAMP '2024-07-03 11:00:00'),
        (3, 'backend3', 'http://api.example.com/3', 'type3', 'Sample text 3', TIMESTAMP '2024-07-03 12:00:00'),
        ...
        (20, 'backend2', 'http://api.example.com/2', 'type2', 'Sample text 20', TIMESTAMP '2024-07-04 05:00:00')
"""

# Measure time taken to insert the data
start_time_insert = time.time()
spark.sql(insert_data_query)
end_time_insert = time.time()
insert_duration = end_time_insert - start_time_insert
print(f"Data insertion took {insert_duration:.2f} seconds")

# Query and display all records from the 'test_tables' table
result = spark.sql("SELECT * FROM spark_catalog.default.test_tables")
result.show()

# SQL query to update the 'text' field of the record with id = 1
update_data_query = """
    UPDATE spark_catalog.default.test_tables
    SET text = 'Updated text'
    WHERE id = 1
"""

# Measure time taken to update the data
start_time_update = time.time()
spark.sql(update_data_query)
end_time_update = time.time()
update_duration = end_time_update - start_time_update
print(f"Data update took {update_duration:.2f} seconds")

# Query and display the updated record with id = 1
updated_result = spark.sql("SELECT * FROM spark_catalog.default.test_tables WHERE id = 1")
updated_result.show()

# Stop the Spark session
spark.stop()

24/07/10 04:19:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Table creation took 3.02 seconds



Data insertion took 3.94 seconds



+---+----------+--------------------+-----+--------------+-------------------+
| id|backend_id|    function_api_url| type|          text|           log_time|
+---+----------+--------------------+-----+--------------+-------------------+
|  1|  backend1|http://api.exampl...|type1| Sample text 1|2024-07-03 10:00:00|
|  2|  backend2|http://api.exampl...|type2| Sample text 2|2024-07-03 11:00:00|
|  3|  backend3|http://api.exampl...|type3| Sample text 3|2024-07-03 12:00:00|
|  4|  backend1|http://api.exampl...|type1| Sample text 4|2024-07-03 13:00:00|
|  5|  backend2|http://api.exampl...|type2| Sample text 5|2024-07-03 14:00:00|
|  6|  backend3|http://api.exampl...|type3| Sample text 6|2024-07-03 15:00:00|
|  7|  backend1|http://api.exampl...|type1| Sample text 7|2024-07-03 16:00:00|
|  8|  backend2|http://api.exampl...|type2| Sample text 8|2024-07-03 17:00:00|
|  9|  backend3|http://api.exampl...|type3| Sample text 9|2024-07-03 18:00:00|
| 10|  backend1|http://api.exampl...|type1|Sample te
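The `...` in the insert statement above stands for rows that are not shown. The query outputs suggest a regular pattern: backend, URL, and type cycle through 1, 2, 3, and `log_time` advances one hour per id starting at 2024-07-03 10:00:00. Assuming that pattern holds, the sketch below generates the `VALUES` list instead of typing it by hand. `make_row` and `build_insert_query` are hypothetical helpers.

```python
from datetime import datetime, timedelta
from typing import Tuple

# Assumption: the pattern inferred from the outputs (ids 1..30 follow it).
BASE_TIME = datetime(2024, 7, 3, 10, 0, 0)


def make_row(i: int) -> Tuple[int, str, str, str, str, str]:
    """Build one test_tables row for id i following the inferred pattern."""
    k = (i - 1) % 3 + 1                       # cycles 1, 2, 3
    ts = BASE_TIME + timedelta(hours=i - 1)   # one hour per id
    return (i, f"backend{k}", f"http://api.example.com/{k}", f"type{k}",
            f"Sample text {i}", ts.strftime("%Y-%m-%d %H:%M:%S"))


def build_insert_query(table: str, n_rows: int) -> str:
    """Return an INSERT statement with rows 1..n_rows as SQL literals."""
    values = ",\n        ".join(
        "({}, '{}', '{}', '{}', '{}', TIMESTAMP '{}')".format(*make_row(i))
        for i in range(1, n_rows + 1)
    )
    return f"INSERT INTO {table}\n    VALUES\n        {values}"


query = build_insert_query("spark_catalog.default.test_tables", 20)
print(query.splitlines()[-1])  # matches the id-20 row of the original statement
```

The generated string can be passed to `spark.sql(query)`. Building SQL from strings is only safe for trusted literals like these. For real data, creating a DataFrame and writing it to the table avoids hand-quoting values.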

---

#### __Average, Aggregate, and Group__

In [3]:
from pyspark.sql import SparkSession
import time

# Paths to JAR files required for Hadoop, AWS SDK, and Iceberg
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS credentials and Iceberg metadata location on S3
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create a Spark session with configurations for Iceberg and S3 integration
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()


# Count the rows and average the id for each type
complex_query = """
    SELECT type, COUNT(*), AVG(id)
    FROM spark_catalog.default.test_tables
    GROUP BY type
"""

complex_result = spark.sql(complex_query)
complex_result.show()

# Stop the Spark session
spark.stop()

24/07/10 04:26:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+-----+--------+-------+
| type|count(1)|avg(id)|
+-----+--------+-------+
|type3|      10|   16.5|
|type1|      10|   14.5|
|type2|      10|   15.5|
+-----+--------+-------+
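The aggregate can be cross-checked by hand. Assume the table holds ids 1 to 30 with `type` cycling through type1, type2, type3, which is the pattern in the outputs. Under that assumption, a plain-Python version of the same `GROUP BY` reproduces the counts and averages shown.

The sketch also includes the equivalent DataFrame API call, which needs a live Spark session. `count_and_avg_by_type` and `grouped_stats` are hypothetical names. `GROUP BY` does not guarantee row order, which is why Spark printed type3 first; add `ORDER BY type` if the order matters.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def count_and_avg_by_type(rows: Iterable[Tuple[int, str]]) -> Dict[str, Tuple[int, float]]:
    """Plain-Python equivalent of SELECT type, COUNT(*), AVG(id) ... GROUP BY type."""
    totals: Dict[str, list] = defaultdict(lambda: [0, 0])  # type -> [row count, sum of ids]
    for row_id, row_type in rows:
        totals[row_type][0] += 1
        totals[row_type][1] += row_id
    return {t: (c, s / c) for t, (c, s) in totals.items()}


def grouped_stats(df):
    """Same aggregation with the PySpark DataFrame API; requires pyspark and a DataFrame."""
    from pyspark.sql import functions as F
    return df.groupBy("type").agg(F.count("*").alias("count"), F.avg("id").alias("avg_id"))


# Assumption: ids 1..30 with type cycling type1, type2, type3.
rows = [(i, f"type{(i - 1) % 3 + 1}") for i in range(1, 31)]
result = count_and_avg_by_type(rows)
for t in sorted(result):
    print(t, result[t])
# type1 (10, 14.5)
# type2 (10, 15.5)
# type3 (10, 16.5)
```

With a session open, `grouped_stats(spark.table("spark_catalog.default.test_tables")).orderBy("type").show()` should print the same numbers in a fixed order.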




---

#### __Alter Table Schema (Add a Column)__

In [5]:
from pyspark.sql import SparkSession
import time

# Paths to JAR files required for Hadoop, AWS SDK, and Iceberg
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS credentials and Iceberg metadata location on S3
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create a Spark session with configurations for Iceberg and S3 integration
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# SQL query to alter the table by adding a new column named 'new_column' of type STRING
alter_table_query = """
    ALTER TABLE spark_catalog.default.test_tables
    ADD COLUMNS (new_column STRING)
"""

# Measure time taken to execute the schema alteration
start_time_alter = time.time()
spark.sql(alter_table_query)
end_time_alter = time.time()
alter_duration = end_time_alter - start_time_alter
print(f"Schema alteration took {alter_duration:.2f} seconds")

# Query to describe the table schema and display the result
altered_result = spark.sql("DESCRIBE spark_catalog.default.test_tables")
altered_result.show()

# Stop the Spark session
spark.stop()

24/07/10 04:27:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Schema alteration took 4.04 seconds
+----------------+---------+-------+
|        col_name|data_type|comment|
+----------------+---------+-------+
|              id|      int|   NULL|
|      backend_id|   string|   NULL|
|function_api_url|   string|   NULL|
|            type|   string|   NULL|
|            text|   string|   NULL|
|        log_time|timestamp|   NULL|
|      new_column|   string|   NULL|
+----------------+---------+-------+
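Each cell repeats the same `start_time` / `end_time` bookkeeping. The sketch below moves it into a small context manager with the hypothetical name `timed`. It uses `time.perf_counter()`, which is intended for measuring intervals.

Where you put the timer matters. For DDL and DML statements such as `ALTER TABLE`, `spark.sql(...)` runs the statement immediately, so wrapping the call measures the statement itself. For a `SELECT`, the timed block must also contain the action (`show()`, `collect()`, ...) that actually runs the query.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed(label: str, results: Dict[str, float]) -> Iterator[None]:
    """Store the seconds spent inside the block in results[label] and print them."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start
        print(f"{label} took {results[label]:.2f} seconds")


durations: Dict[str, float] = {}
with timed("Schema alteration", durations):
    time.sleep(0.1)  # stand-in for spark.sql(alter_table_query)
```

Collecting the timings in a dictionary makes it easy to compare create, insert, update, and delete durations after a run.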



---

#### __Query Bulk Data__

`performance_result.show(n=1000, truncate=False)` prints up to 1,000 rows of the result. Because `truncate=False` is set, each column value is shown in full instead of being cut off at 20 characters.

In [6]:
from pyspark.sql import SparkSession
import time

# Paths to JAR files required for Hadoop, AWS SDK, and Iceberg
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS credentials and Iceberg metadata location on S3
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create a Spark session with configurations for Iceberg and S3 integration
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Time the query end to end (planning, execution, and printing the rows)
start_time_performance = time.time()

# Define the SQL query to select the top 1000 rows from 'test_tables', ordered by 'log_time' in descending order
performance_query = """
    SELECT *
    FROM spark_catalog.default.test_tables
    ORDER BY log_time DESC
    LIMIT 1000
"""

# Build the result DataFrame; SELECT queries are evaluated lazily, so the query actually runs at show()
performance_result = spark.sql(performance_query)

# Display the top 1000 rows of the result without truncating the output
performance_result.show(n=1000, truncate=False)

# Measure and print the duration of the query execution
end_time_performance = time.time()
performance_duration = end_time_performance - start_time_performance
print(f"Query execution took {performance_duration:.2f} seconds")

# Stop the Spark session to release resources
spark.stop()

24/07/10 04:30:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+---+----------+------------------------+-----+--------------+-------------------+----------+
|id |backend_id|function_api_url        |type |text          |log_time           |new_column|
+---+----------+------------------------+-----+--------------+-------------------+----------+
|30 |backend3  |http://api.example.com/3|type3|Sample text 30|2024-07-04 15:00:00|NULL      |
|29 |backend2  |http://api.example.com/2|type2|Sample text 29|2024-07-04 14:00:00|NULL      |
|28 |backend1  |http://api.example.com/1|type1|Sample text 28|2024-07-04 13:00:00|NULL      |
|27 |backend3  |http://api.example.com/3|type3|Sample text 27|2024-07-04 12:00:00|NULL      |
|26 |backend2  |http://api.example.com/2|type2|Sample text 26|2024-07-04 11:00:00|NULL      |
|25 |backend1  |http://api.example.com/1|type1|Sample text 25|2024-07-04 10:00:00|NULL      |
|24 |backend3  |http://api.example.com/3|type3|Sample text 24|2024-07-04 09:00:00|NULL      |
|23 |backend2  |http://api.example.com/2|type2|Sample text 2
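Spark usually plans `ORDER BY ... LIMIT n` as a top-N operation (`TakeOrderedAndProject` in the physical plan) rather than as a full sort. Calling `performance_result.explain()` should show which plan it picked.

The plain-Python sketch below illustrates the same top-N idea with `heapq.nlargest`. The rows are shaped like the table, with ids and timestamps following the pattern in the output, which is an assumption. `top_n_by_time` is a hypothetical helper.

```python
import heapq
from datetime import datetime, timedelta
from typing import List, Tuple

Row = Tuple[int, datetime]  # (id, log_time)


def top_n_by_time(rows: List[Row], n: int) -> List[Row]:
    """Return the n rows with the latest log_time, newest first.

    heapq.nlargest keeps a heap of at most n items, costing O(len(rows) * log n)
    instead of sorting every row.
    """
    return heapq.nlargest(n, rows, key=lambda row: row[1])


base = datetime(2024, 7, 3, 10, 0, 0)
rows = [(i, base + timedelta(hours=i - 1)) for i in range(1, 31)]  # assumed table contents
top = top_n_by_time(rows, 3)
print([row_id for row_id, _ in top])  # [30, 29, 28]
```

With only 30 rows the difference is negligible. The top-N strategy pays off when the table is large and `n` is small.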


---

#### __Delete Rows (Delete the Row with ID 2)__

In [13]:
from pyspark.sql import SparkSession
import time

# Paths to JAR files needed for Hadoop, AWS SDK, and Iceberg integration
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

# AWS credentials and location for Iceberg metadata on S3
s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/iceberg-tables"

# Create a Spark session with configurations for Iceberg and S3
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# SQL query to delete data from 'test_tables' where the id is 2
delete_data_query = """
    DELETE FROM spark_catalog.default.test_tables
    WHERE id = 2
"""

# Measure the time taken to execute the data deletion
start_time_delete = time.time()

# Execute the deletion query
spark.sql(delete_data_query)

end_time_delete = time.time()
delete_duration = end_time_delete - start_time_delete
print(f"Data deletion took {delete_duration:.2f} seconds")

# Verify that the data has been deleted by trying to select rows with id = 2
deleted_result = spark.sql("SELECT * FROM spark_catalog.default.test_tables WHERE id = 2")
deleted_result.show(truncate=False)

# Fetch and display all rows from the table to confirm the current state
result = spark.sql("SELECT * FROM spark_catalog.default.test_tables")
result.show(truncate=False)

# Stop the Spark session to release resources
spark.stop()

24/07/10 05:40:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

Data deletion took 5.81 seconds
+---+----------+------------------------+-----+--------------+-------------------+----------+
|id |backend_id|function_api_url        |type |text          |log_time           |new_column|
+---+----------+------------------------+-----+--------------+-------------------+----------+
|1  |backend1  |http://api.example.com/1|type1|Updated text  |2024-07-03 10:00:00|NULL      |
|3  |backend3  |http://api.example.com/3|type3|Sample text 3 |2024-07-03 12:00:00|NULL      |
|4  |backend1  |http://api.example.com/1|type1|Sample text 4 |2024-07-03 13:00:00|NULL      |
|5  |backend2  |http://api.example.com/2|type2|Sample text 5 |2024-07-03 14:00:00|NULL      |
|6  |backend3  |http://api.example.com/3|type3|Sample text 6 |2024-07-03 15:00:00|NULL      |
|7  |backend1  |http://api.example.com/1|type1|Sample text 7 |2024-07-03 16:00:00|NULL      |
|8  |backend2  |http://api.example.com/2|type2|Sample text 8 |2024-07-03 17:00:00|NULL      |
|9  |backend3  |http://api.e
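An Iceberg `DELETE` commits a new snapshot rather than erasing history. Until old snapshots are expired, the row with id 2 can still be read from an earlier snapshot.

The sketch below assumes Iceberg's `snapshots` metadata table and the `VERSION AS OF` time-travel syntax available with Spark 3.3 and later. It lists the snapshots, takes the parent of the most recent one, and queries it. That parent is the state just before the delete, provided the delete was the last commit. The helper names and the snapshot IDs in the example are hypothetical.

```python
from typing import List, Optional, Tuple

TABLE = "spark_catalog.default.test_tables"


def snapshots_query(table: str) -> str:
    """SQL listing the table's snapshots through Iceberg's snapshots metadata table."""
    return (f"SELECT snapshot_id, parent_id, operation, committed_at "
            f"FROM {table}.snapshots ORDER BY committed_at")


def parent_of_latest(snapshots: List[Tuple[int, Optional[int]]]) -> Optional[int]:
    """Given (snapshot_id, parent_id) pairs ordered by commit time, return the latest one's parent."""
    return snapshots[-1][1] if snapshots else None


def time_travel_query(table: str, snapshot_id: int, predicate: str) -> str:
    """SQL reading the table as of an older snapshot."""
    return f"SELECT * FROM {table} VERSION AS OF {snapshot_id} WHERE {predicate}"


def rows_before_latest_commit(spark, table: str, predicate: str):
    """Return a DataFrame of matching rows from the snapshot before the latest commit."""
    pairs = [(r["snapshot_id"], r["parent_id"]) for r in spark.sql(snapshots_query(table)).collect()]
    parent = parent_of_latest(pairs)
    if parent is None:
        raise ValueError("the table has no earlier snapshot")
    return spark.sql(time_travel_query(table, parent, predicate))

# Usage (needs a live Iceberg-enabled session):
# rows_before_latest_commit(spark, TABLE, "id = 2").show(truncate=False)
```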

---

#### __Iceberg table integration and JSON data processing with PySpark__

In [1]:
from pyspark.sql import SparkSession

# Paths and credentials
hadoop_aws_jar = "/home/elicer/spark/jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "/home/elicer/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
iceberg_spark_jar = "/home/elicer/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
hadoop_common_jar = "/home/elicer/spark/jars/hadoop-common-3.3.4.jar"

s3_access_key = "Add your own access_key value"
s3_secret_key = "Add your own secret_key value"
iceberg_metadata_location = "s3a://iceberg-poc-4l0g37aq/helpy_table"

# Build the Spark session
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_jar},{iceberg_spark_jar},{hadoop_common_jar}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_metadata_location) \
    .config("fs.s3a.access.key", s3_access_key) \
    .config("fs.s3a.secret.key", s3_secret_key) \
    .config("fs.s3a.endpoint", "https://datahub-central-01.elice.io") \
    .config("fs.s3a.connection.ssl.enabled", "true") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

try:
    # Create Iceberg Table
    spark.sql("""
    CREATE TABLE IF NOT EXISTS spark_catalog.default.helpy (
        conversations ARRAY<STRUCT<from: STRING, value: STRING>>,
        chosen STRUCT<from: STRING, value: STRING>,
        rejected STRUCT<from: STRING, value: STRING>
    ) 
    USING iceberg
    """)

    # Read the JSON file into a Spark DataFrame
    file_path = "/home/elicer/workload/file.json"
    df = spark.read.option("multiLine", True).json(file_path)

    # Spark stores records it cannot parse in a _corrupt_record column; print any such records.
    # Cache first: Spark rejects queries on raw JSON files that reference only _corrupt_record.
    if "_corrupt_record" in df.columns:
        df.cache()
        corrupt_records = df.filter(df["_corrupt_record"].isNotNull())
        if corrupt_records.count() > 0:
            print("Corrupt records found:")
            corrupt_records.show(truncate=False)

    # Store data in the Iceberg table
    df.write \
        .format("iceberg") \
        .mode("overwrite") \
        .saveAsTable("spark_catalog.default.helpy")

    df.show()
    
    print("Data saved to Iceberg table successfully.")

finally:
    # Stop the Spark session
    spark.stop()

24/08/01 05:46:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/08/01 05:46:46 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

+--------------------+--------------------+--------------------+
|              chosen|       conversations|            rejected|
+--------------------+--------------------+--------------------+
|{gpt, That sounds...|[{human, Hi! I'd ...|{gpt, Hello! I'd ...|
|{gpt, In this tas...|[{system, You are...|{gpt, Sure, I'd b...|
+--------------------+--------------------+--------------------+

Data saved to Iceberg table successfully.
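When Spark infers the JSON schema, it orders the columns alphabetically (chosen, conversations, rejected in the output above). As far as we can tell, `mode("overwrite")` with `saveAsTable` then replaces the definition from `CREATE TABLE` with the DataFrame's schema.

The sketch below takes a different approach. It passes the table's schema as a DDL string to `spark.read.schema(...)` and appends with the `DataFrameWriterV2` API (`writeTo(...).append()`), which matches columns by name. It also includes a plain-Python shape check for a single record. The sample record and the helper names are hypothetical.

```python
import json
from typing import Any, Dict

# Same columns as the CREATE TABLE statement above; `from` is back-quoted to be safe.
HELPY_DDL = (
    "conversations ARRAY<STRUCT<`from`: STRING, value: STRING>>, "
    "chosen STRUCT<`from`: STRING, value: STRING>, "
    "rejected STRUCT<`from`: STRING, value: STRING>"
)


def is_message(obj: Any) -> bool:
    """True if obj looks like {"from": str, "value": str}."""
    return (isinstance(obj, dict)
            and isinstance(obj.get("from"), str)
            and isinstance(obj.get("value"), str))


def is_helpy_record(record: Dict[str, Any]) -> bool:
    """Check one JSON record against the helpy table's shape before loading it."""
    convs = record.get("conversations")
    return (isinstance(convs, list) and all(is_message(m) for m in convs)
            and is_message(record.get("chosen")) and is_message(record.get("rejected")))


def load_helpy(spark, path: str, table: str = "spark_catalog.default.helpy") -> None:
    """Read with an explicit schema and append, keeping the existing table definition."""
    df = spark.read.schema(HELPY_DDL).option("multiLine", True).json(path)
    df.writeTo(table).append()


sample = json.loads('{"conversations": [{"from": "human", "value": "Hi!"}],'
                    ' "chosen": {"from": "gpt", "value": "Hello."},'
                    ' "rejected": {"from": "gpt", "value": "Go away."}}')
print(is_helpy_record(sample))  # True
```

Supplying the schema also skips the extra pass over the file that inference needs.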


In [13]:
ls /mnt/elice/datahub/iceberg-poc/helpy_table/default/helpy

[0m[01;34mdata[0m/  [01;34mmetadata[0m/

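The `data/` directory holds the table's data files, and `metadata/` holds its metadata. With a Hadoop catalog, as far as we know, each commit writes a new `v<N>.metadata.json`, and the latest N is recorded in `metadata/version-hint.text`.

The sketch below finds the current metadata file in a table directory. It reads the hint when it is present and otherwise picks the highest version number. The demo runs against a temporary directory with made-up files, not against the real table. `current_metadata_file` is a hypothetical helper.

```python
import re
import tempfile
from pathlib import Path
from typing import Optional

METADATA_NAME = re.compile(r"v(\d+)\.metadata\.json")


def current_metadata_file(table_dir: Path) -> Optional[Path]:
    """Return the current vN.metadata.json, preferring version-hint.text when present."""
    meta = table_dir / "metadata"
    hint = meta / "version-hint.text"
    if hint.is_file():
        candidate = meta / f"v{hint.read_text().strip()}.metadata.json"
        if candidate.is_file():
            return candidate
    # Fall back to the highest version number (compared numerically, so v10 > v2).
    versions = []
    if meta.is_dir():
        for path in meta.iterdir():
            match = METADATA_NAME.fullmatch(path.name)
            if match:
                versions.append((int(match.group(1)), path))
    return max(versions)[1] if versions else None


# Demo on a throwaway directory with made-up files (not the real table).
with tempfile.TemporaryDirectory() as tmp:
    table = Path(tmp)
    (table / "data").mkdir()
    (table / "metadata").mkdir()
    for n in (1, 2, 10):
        (table / "metadata" / f"v{n}.metadata.json").write_text("{}")
    with_fallback = current_metadata_file(table).name  # no hint file yet
    (table / "metadata" / "version-hint.text").write_text("2")
    with_hint = current_metadata_file(table).name

print(with_fallback, with_hint)  # v10.metadata.json v2.metadata.json
# On the mounted table: current_metadata_file(Path("/mnt/elice/datahub/iceberg-poc/helpy_table/default/helpy"))
```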