In [1]:
import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Root logging: timestamp, logger name, level and message at INFO and above.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

# Named logger for this notebook's own messages.
logger = logging.getLogger("MinIOSparkJob")

In [2]:
# Create the Spark session for this notebook, or attach to the one already running.
spark = (
    SparkSession.builder
    .appName("parquet-load")
    .getOrCreate()
)

24/12/04 14:33:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Print every key/value pair of the active SparkContext configuration.
for conf_key, conf_value in spark.sparkContext.getConf().getAll():
    print(f"{conf_key}: {conf_value}")

spark.app.id: local-1733322832494
spark.eventLog.enabled: true
spark.app.startTime: 1733322832075
spark.history.fs.logDirectory: /home/iceberg/spark-events
spark.sql.warehouse.dir: file:/home/iceberg/notebooks/notebooks/spark-warehouse
spark.sql.catalog.demo.s3.endpoint: http://minio:9000
spark.eventLog.dir: /home/iceberg/spark-events
spark.serializer.objectStreamReset: 100
spark.master: local[*]
spark.submit.deployMode: client
spark.driver.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.inter

In [4]:
# Iceberg configuration for the "demo" catalog, stored in MinIO.
# NOTE: this conf is handed to getOrCreate() after a session already exists
# (see the "Using an existing Spark session" warning printed below), so only
# runtime SQL options from it take effect on that session.
conf = (
    SparkConf()
    # Enable Iceberg's SQL extensions in Spark.
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://warehouse")
    .set("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
    .set("spark.sql.defaultCatalog", "demo")  # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    # Iceberg catalog type. The key must be scoped to the same catalog name
    # ("demo") as the settings above; it was previously written under "data",
    # a catalog that is never defined.
    .set("spark.sql.catalog.demo.type", "hive")
    # NOTE(review): both values are unitless. Spark appears to read
    # heartbeatInterval in milliseconds but network.timeout in seconds --
    # confirm the intent and add explicit suffixes (e.g. "300s" / "400s").
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
    # Maven coordinates must be groupId:artifactId:version; the AWS SDK bundle
    # was missing its "com.amazonaws" groupId.
    .set("spark.jars.packages",
         "org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.12.756")
)

In [5]:
# Point the S3A filesystem of the running context at the MinIO endpoint.
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()

# Credentials and endpoint are taken from the environment, matching
# load_config(); the fallbacks are the local demo values and should be
# overridden anywhere else.
hadoop_conf.set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "admin"))
hadoop_conf.set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY", "password"))
hadoop_conf.set("fs.s3a.endpoint", os.getenv("ENDPOINT", "http://minio:9000"))
hadoop_conf.set("fs.s3a.path.style.access", "true")
# Set once (the original set this same key/value twice).
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

In [6]:
# Request the session again, this time with the Iceberg conf attached.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

24/12/04 14:34:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
# Set the SparkContext log level to ERROR.
# Disable (comment out) the line below to see INFO logs.
spark.sparkContext.setLogLevel("ERROR")

In [8]:
def load_config(spark_context: "SparkContext"):
    """Apply S3A connection settings to the context's Hadoop configuration.

    Credentials and endpoint are read from the environment, falling back to
    the local demo values when a variable is unset:

    * ``AWS_ACCESS_KEY_ID``     (default ``"admin"``)
    * ``AWS_SECRET_ACCESS_KEY`` (default ``"password"``)
    * ``ENDPOINT``              (default ``"http://minio:9000"``)

    Args:
        spark_context: Context whose Hadoop configuration is updated in place.

    Returns:
        None.
    """
    hadoop_conf = spark_context._jsc.hadoopConfiguration()
    endpoint = os.getenv("ENDPOINT", "http://minio:9000")

    # Keep the SSL flag consistent with the endpoint: a plain "http://"
    # endpoint (the default) must not claim SSL. Any other endpoint keeps the
    # previous value of "true".
    ssl_enabled = "false" if endpoint.lower().startswith("http://") else "true"

    s3a_settings = {
        # NOTE: the fallbacks are demo-only credentials; set the environment
        # variables for anything beyond the local setup.
        "fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", "admin"),
        "fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", "password"),
        "fs.s3a.endpoint": endpoint,
        "fs.s3a.connection.ssl.enabled": ssl_enabled,
        "fs.s3a.path.style.access": "true",
        # A single attempt with short timeouts (values as originally configured).
        "fs.s3a.attempts.maximum": "1",
        "fs.s3a.connection.establish.timeout": "5000",
        "fs.s3a.connection.timeout": "10000",
    }
    for setting_name, setting_value in s3a_settings.items():
        hadoop_conf.set(setting_name, setting_value)

In [9]:
# Apply the environment-driven S3A settings to the running context.
load_config(spark_context=spark.sparkContext)

In [10]:
# Look up the URI configured for the "demo" catalog and display it.
catalog_uri = spark.conf.get("spark.sql.catalog.demo.uri")
catalog_uri

'http://metastore:9083'

In [11]:
# List the namespaces that exist before anything is created.
namespaces_df = spark.sql("SHOW DATABASES")
namespaces_df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [12]:
# List the tables in the "default" namespace before writing.
default_tables_df = spark.sql("SHOW TABLES IN default")
default_tables_df.show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [15]:
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType, IntegerType

In [16]:
# Sample rows, one tuple per person: (fn, mid, lastname, id, gender, salary).
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Explicit schema for the sample rows: five nullable string columns followed
# by a nullable integer salary.
schema = StructType([
    StructField("fn", StringType(), True),
    StructField("mid", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

In [17]:
# Build the DataFrame from the sample rows and print the schema that was applied.
df = spark.createDataFrame(data, schema)
df.printSchema()

root
 |-- fn: string (nullable = true)
 |-- mid: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [18]:
# Preview up to five rows of the DataFrame.
df.show(n=5)

                                                                                

+-------+----+--------+-----+------+------+
|     fn| mid|lastname|   id|gender|salary|
+-------+----+--------+-----+------+------+
|  James|    |   Smith|36636|     M|  3000|
|Michael|Rose|        |40288|     M|  4000|
| Robert|    |Williams|42114|     M|  4000|
|  Maria|Anne|   Jones|39192|     F|  4000|
|    Jen|Mary|   Brown|     |     F|    -1|
+-------+----+--------+-----+------+------+



In [19]:
# Write the DataFrame as Parquet, replacing whatever is already at the target path.
parquet_target = "s3a://warehouse/test"
(
    df.write
    .mode("overwrite")
    .format("parquet")
    .save(parquet_target)
)

                                                                                

In [None]:
# spark.sql("""
#     CREATE TABLE IF NOT EXISTS demo.default.my_iceberg (
#         fn STRING,
#         mid STRING,
#         lastname STRING,
#         id STRING,
#         gender STRING,
#         salary INT
#     )
#     USING iceberg
#     PARTITIONED BY (id)
# """)

In [20]:
# Create the table "default.my_iceberg" from the DataFrame, replacing it if present.
default_table_writer = df.writeTo("default.my_iceberg")
default_table_writer.createOrReplace()

                                                                                

In [30]:
# Confirm the new table is now listed in the "default" namespace.
tables_after_write_df = spark.sql("SHOW TABLES IN default")
tables_after_write_df.show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|my_iceberg|      false|
+---------+----------+-----------+



In [21]:
%%sql
SELECT * FROM demo.default.my_iceberg;

fn,mid,lastname,id,gender,salary
James,,Smith,36636.0,M,3000
Michael,Rose,,40288.0,M,4000
Robert,,Williams,42114.0,M,4000
Maria,Anne,Jones,39192.0,F,4000
Jen,Mary,Brown,,F,-1


In [22]:
%%sql
DESCRIBE EXTENDED demo.default.my_iceberg;

col_name,data_type,comment
fn,string,
mid,string,
lastname,string,
id,string,
gender,string,
salary,int,
,,
# Metadata Columns,,
_spec_id,int,
_partition,struct<>,


In [25]:
# IF NOT EXISTS keeps this cell re-runnable: the namespace persists in the
# catalog, so a plain CREATE DATABASE fails on the second run.
spark.sql("CREATE DATABASE IF NOT EXISTS my_iceberg")

DataFrame[]

In [29]:
# List namespaces again to confirm "my_iceberg" was created.
namespaces_after_create_df = spark.sql("SHOW DATABASES")
namespaces_after_create_df.show()

+----------+
| namespace|
+----------+
|   default|
|my_iceberg|
+----------+



In [31]:
# IF NOT EXISTS keeps this cell re-runnable: the namespace persists in the
# catalog, so a plain CREATE DATABASE fails on the second run.
spark.sql("CREATE DATABASE IF NOT EXISTS hello_world_iceberg")

DataFrame[]

In [32]:
# List namespaces again to confirm "hello_world_iceberg" was created.
namespaces_with_hello_world_df = spark.sql("SHOW DATABASES")
namespaces_with_hello_world_df.show()

+-------------------+
|          namespace|
+-------------------+
|            default|
|hello_world_iceberg|
|         my_iceberg|
+-------------------+



In [33]:
# Create the table "hello_world_iceberg.iceberg_test" from the DataFrame,
# replacing it if present.
hello_world_writer = df.writeTo("hello_world_iceberg.iceberg_test")
hello_world_writer.createOrReplace()

In [34]:
# IF NOT EXISTS keeps this cell re-runnable: the namespace persists in the
# catalog, so a plain CREATE NAMESPACE fails on the second run.
spark.sql("CREATE NAMESPACE IF NOT EXISTS my_namespace_test")

DataFrame[]