In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

In [2]:
# Connection settings for the Hive Metastore and the MinIO (S3A) object store.
# Credentials are read from the environment so they need not live in the notebook.
# The fallbacks are the values previously hardcoded here, kept so existing setups
# still run. TODO: drop the secret fallbacks once the env vars are provisioned.
HIVE_METASTORE_URI = os.environ.get("HIVE_METASTORE_URI", "thrift://hive-metastore:9083")
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "dp-root-user")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "dp-root-password")

spark = (
    SparkSession.builder
    .appName("IcebergWithHiveMetastore")
    .config("hive.metastore.uris", HIVE_METASTORE_URI)
    # Configure the session catalog (spark_catalog) as an Iceberg SparkCatalog
    # backed by the same Hive Metastore.
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalog.spark_catalog.uri", HIVE_METASTORE_URI)
    # S3A filesystem settings for MinIO: static keys, path-style addressing, plain HTTP.
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.sql.catalogImplementation", "hive")
    # Iceberg runtime for Spark 3.2 / Scala 2.12, plus the Hadoop S3A connector.
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0,org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .enableHiveSupport()
    .getOrCreate()
)


In [3]:
# Schema for the sample DataFrame. `age` is LongType so it matches the table's
# BIGINT column in the CREATE TABLE statement, instead of relying on an implicit
# int -> bigint cast at write time.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
])

In [4]:
# S3A location that backs the Iceberg table: s3a://<bucket>/people/
bucket_name = "testing-iceberg"
file_path = "s3a://" + bucket_name + "/people/"

In [5]:
# Register the Iceberg table `default.people` at the S3A location above.
# IF NOT EXISTS makes the cell safe to re-run; an existing table is not modified.
create_people_ddl = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS default.people (
    name STRING,
    age BIGINT
) USING iceberg
LOCATION '{file_path}'
"""
spark.sql(create_people_ddl)


DataFrame[]

In [6]:
# List tables in the current namespace to confirm `people` is registered.
tables_listing = spark.sql("SHOW TABLES;")
tables_listing.show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|   people|      false|
+---------+---------+-----------+



In [7]:
# Sample (name, age) rows, laid out to match `schema`.
data = [
    ("John", 30), ("Steve", 25), ("Bill", 40), ("Donald", 45), ("Jenny", 23),
    ("Lucas", 27), ("Emma", 35), ("Grace", 28), ("Liam", 32), ("Claire", 29),
]

df = spark.createDataFrame(data, schema=schema)

In [8]:
# Append the sample rows to the Iceberg table created above, then read it back.
# DataFrameWriterV2 (`writeTo`) targets the fully qualified table and raises if it
# does not exist. `saveAsTable("people")` in append mode would instead resolve the
# name against the current database and create a new table there if none exists.
# NOTE: not idempotent — every run of this cell appends another copy of `df`.
df.writeTo("default.people").append()

spark.sql("SELECT * FROM default.people").show()

+------+---+
|  name|age|
+------+---+
|  John| 30|
| Steve| 25|
|  Bill| 40|
|Donald| 45|
| Jenny| 23|
| Lucas| 27|
|  Emma| 35|
| Grace| 28|
|  Liam| 32|
|Claire| 29|
+------+---+



In [11]:
# Read the table back through the catalog by its fully qualified name, matching the
# `default.people` identifier used in the CREATE TABLE cell.
# NOTE(review): this rebinds `df` (previously the input DataFrame) to the table
# contents; the name is kept in case later cells rely on it.
df = spark.table("default.people")

df.show()

+------+---+
|  name|age|
+------+---+
|  John| 30|
| Steve| 25|
|  Bill| 40|
|Donald| 45|
| Jenny| 23|
| Lucas| 27|
|  Emma| 35|
| Grace| 28|
|  Liam| 32|
|Claire| 29|
+------+---+

