In [1]:
import os

import pyspark
from pyspark.sql import SparkSession

In [2]:
## DEFINE SENSITIVE VARIABLES
NESSIE_URI = "http://catalog:19120/api/v1"
MINIO_URI = "http://storage:9000"
MINIO_ACCESS_KEY = "myaccesskey"
MINIO_SECRET_KEY = "mysecretkey"


In [3]:
conf = (
    pyspark.SparkConf()
    .setAppName("app_name")
    # packages
    .set(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.101.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178",
    )
    # SQL Extensions
    .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
    )
    # Configuring Catalog
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.nessie.uri", NESSIE_URI)
    .set("spark.sql.catalog.nessie.ref", "main")
    .set("spark.sql.catalog.nessie.authentication.type", "NONE")
    .set(
        "spark.sql.catalog.nessie.catalog-impl",
        "org.apache.iceberg.nessie.NessieCatalog",
    )
    .set("spark.sql.catalog.nessie.warehouse", "s3a://warehouse")
    .set("spark.sql.catalog.nessie.s3.endpoint", MINIO_URI)
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # MINIO CREDENTIALS
    .set("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
)

In [4]:
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dadcc0fa-7f90-4632-b5b5-4a500a229181;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.101.3 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found softw

Spark Running


In [5]:
## LOAD A CSV INTO AN SQL VIEW
csv_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("/workspace/datasets/df_open_2023.csv")
)
csv_df.createOrReplaceTempView("csv_open_2023")


                                                                                

In [6]:
## CREATE AN ICEBERG TABLE FROM THE SQL VIEW
spark.sql(
    "CREATE TABLE IF NOT EXISTS nessie.df_open_2023_lesson2 USING iceberg AS SELECT * FROM csv_open_2023"
).show()


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
                                                                                

++
||
++
++



In [7]:
## QUERY THE ICEBERG TABLE
spark.sql("SELECT * FROM nessie.df_open_2023_lesson2 limit 10").show()


+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|competitorId|   competitorName|firstName|   lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|        regionName|affiliateId|       affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|      469656|    Jeffrey Adler|  Jeffrey|      Adler|   ACT|     M|                 CA|             Canada|      35|North America East|      18059| CrossFit Wonderland| 29| 69 in|197 lb|          1|         107|       1|2023|
|      300638|   Tola Morakinyo|     Tola|  Morakinyo|   ACT|     M|                 US|    

In [8]:
## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.df_open_2023_lesson2").show()


+------+
| Total|
+------+
|302231|
+------+



In [9]:
## CREATE A BRANCH WITH NESSIE
spark.sql("CREATE BRANCH IF NOT EXISTS lesson2 IN nessie")


DataFrame[refType: string, name: string, hash: string]

In [10]:
## SWITCH TO THE NEW BRANCH
spark.sql("USE REFERENCE lesson2 IN nessie")


DataFrame[refType: string, name: string, hash: string]

In [11]:
## DELETE ALL RECORDS WHERE countryOfOriginCode = 'FR'
spark.sql("DELETE FROM nessie.df_open_2023_lesson2 WHERE countryOfOriginCode = 'FR'")


                                                                                

DataFrame[]

In [12]:
## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.df_open_2023_lesson2").show()


+------+
| Total|
+------+
|280003|
+------+



In [13]:
## SWITCH BACK TO MAIN BRANCH
spark.sql("USE REFERENCE main IN nessie")


DataFrame[refType: string, name: string, hash: string]

In [14]:
## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.df_open_2023_lesson2").show()


+------+
| Total|
+------+
|302231|
+------+



In [15]:
## MERGE THE CHANGES
spark.sql("MERGE BRANCH lesson2 INTO main IN nessie")


DataFrame[name: string, hash: string]

In [16]:
## QUERY THE COUNT OF ENTRIES
spark.sql("SELECT Count(*) as Total FROM nessie.df_open_2023_lesson2").show()


+------+
| Total|
+------+
|302231|
+------+

