In [61]:
import pyspark
from pyspark.sql import SparkSession
import os

In [62]:
## DEFINE SENSITIVE VARIABLES
NESSIE_URI = "http://nessie:19120/api/v1"
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "password"

In [63]:
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
  		#packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.67.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', 's3a://warehouse')
        .set('spark.sql.catalog.nessie.s3.endpoint', 'http://minio:9000')
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
  		#MINIO CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_KEY)
)

In [64]:
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

Spark Running


In [65]:
## Test Run a Query
spark.sql("CREATE TABLE IF NOT EXISTS nessie.test1 (name string) USING iceberg;").show()

++
||
++
++



In [66]:
spark.sql("INSERT INTO nessie.test1 VALUES ('test2');").show()

++
||
++
++



In [67]:
spark.sql("SELECT * FROM nessie.test1;").show()

+-----+
| name|
+-----+
|test2|
|test2|
+-----+



In [68]:
spark.sql("DELETE FROM nessie.test1 WHERE name = 'test1'").show()

++
||
++
++



In [42]:
## LOAD A CSV INTO AN SQL VIEW
csv_df = spark.read.format("csv").option("header", "true").load("datasets/df_open_2023.csv")
csv_df.createOrReplaceTempView("csv_open_2023")

In [69]:
csv_df.show(3)

+------------+--------------+---------+---------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|competitorId|competitorName|firstName| lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|        regionName|affiliateId|       affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+--------------+---------+---------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|      469656| Jeffrey Adler|  Jeffrey|    Adler|   ACT|     M|                 CA|             Canada|      35|North America East|      18059| CrossFit Wonderland| 29| 69 in|197 lb|          1|         107|       1|2023|
|      300638|Tola Morakinyo|     Tola|Morakinyo|   ACT|     M|                 US|      United States|      35|

In [70]:
## CREATE AN ICEBERG TABLE FROM THE SQL VIEW
spark.sql("CREATE TABLE IF NOT EXISTS nessie.df_open_2023 USING iceberg AS SELECT * FROM csv_open_2023;").show()



++
||
++
++



                                                                                

In [71]:
## QUERY THE ICEBERG TABLE
spark.sql("SELECT * FROM nessie.df_open_2023 limit 10;").show()

+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|competitorId|   competitorName|firstName|   lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|        regionName|affiliateId|       affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|      469656|    Jeffrey Adler|  Jeffrey|      Adler|   ACT|     M|                 CA|             Canada|      35|North America East|      18059| CrossFit Wonderland| 29| 69 in|197 lb|          1|         107|       1|2023|
|      300638|   Tola Morakinyo|     Tola|  Morakinyo|   ACT|     M|                 US|    