In [1]:
import pyspark
import pyspark
from pyspark.sql import SparkSession
import os

In [2]:
# DEFINE SENSITIVE VARIABLES
NESSIE_URI = "http://nessie:19120/api/v1"
MINIO_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "admin")
MINIO_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")
SPARK_EXTRA_JARS = os.environ.get("SPARK_EXTRA_JARS", "")

if not SPARK_EXTRA_JARS:
    raise ValueError("SPARK_EXTRA_JARS env var is required. Rebuild the notebook container.")

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # packages staged in the image
        .set('spark.jars', SPARK_EXTRA_JARS)
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', 's3a://warehouse')
        .set('spark.sql.catalog.nessie.s3.endpoint', 'http://minio:9000')
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #MINIO CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")



Spark Running


In [5]:
## LOAD A CSV INTO AN SQL VIEW
csv_df = spark.read.format("csv").option("header", "true").load("../datasets/df_open_2023.csv")
csv_df.createOrReplaceTempView("csv_open_2023")

## CREATE AN ICEBERG TABLE FROM THE SQL VIEW
spark.sql("CREATE TABLE IF NOT EXISTS nessie.df_open_2023_lesson2 USING iceberg AS SELECT * FROM csv_open_2023").show()

## QUERY THE ICEBERG TABLE
spark.sql("SELECT * FROM nessie.df_open_2023_lesson2 limit 10").show()

## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.df_open_2023_lesson2").show()

++
||
++
++

+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|competitorId|   competitorName|firstName|   lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|        regionName|affiliateId|       affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|      469656|    Jeffrey Adler|  Jeffrey|      Adler|   ACT|     M|                 CA|             Canada|      35|North America East|      18059| CrossFit Wonderland| 29| 69 in|197 lb|          1|         107|       1|2023|
|      300638|   Tola Morakinyo|     Tola|  Morakinyo|   ACT|     M|           