<a href="https://colab.research.google.com/github/andrea-rockt/colab-notebooks/blob/main/project_nessie_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Nessie lab

We need to build our own environment to test things out and learn about Git-like file systems.

* As data engineers, we want to try things out so that we properly understand the systems we are building.

* As data scientists, we want an environment that supports our experiments.

* As developers, we want reproducible environments to validate our code against.

Let's begin by configuring our environment: nobody has ever been fired for defining a bit of infrastructure.

We are going to create an environment based on:

* Apache Spark: our distributed execution engine. This is the compute layer of our lab environment; it shuffles data around the cluster and crunches the numbers.
* The local filesystem: the actual data needs to live on a distributed filesystem, but since we only use one node we will use the local filesystem, viewing it as a *special* case of a more general distributed filesystem.
* Project Nessie: our metadata management solution. Table formats (such as Apache Iceberg, used below) describe plain files as a collection of related content by attaching metadata to those files; we will store this metadata inside Project Nessie to get Git-like versioning and time travel on metadata.
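To make "time travel on metadata" concrete, here is a toy model in plain Python. It is a minimal sketch under our own assumptions, not how Nessie is implemented: each commit snapshots a mapping from table name to a metadata file location, and branches and tags are just names pointing at commits. The class `ToyMetadataRepo`, the `c0`/`c1` commit ids and the metadata paths are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Commit:
    """A commit snapshots the full mapping: table name -> metadata file location."""
    hash: str
    parent: Optional["Commit"]
    tables: Dict[str, str]
    message: str


class ToyMetadataRepo:
    """Toy model of Git-like versioning of table metadata pointers (not Nessie's implementation).

    Branches and tags are both plain names pointing at a commit; unlike Nessie,
    this toy does not stop you from committing on a tag.
    """

    def __init__(self) -> None:
        self._counter = 0
        self.refs: Dict[str, Commit] = {
            "main": Commit(hash="c0", parent=None, tables={}, message="root")
        }

    def commit(self, ref: str, table: str, metadata_location: str, message: str) -> Commit:
        head = self.refs[ref]
        self._counter += 1
        tables = dict(head.tables)  # copy, so older commits keep their snapshot
        tables[table] = metadata_location
        new = Commit(hash=f"c{self._counter}", parent=head, tables=tables, message=message)
        self.refs[ref] = new  # only this ref moves; other refs still see the old commit
        return new

    def create_ref(self, name: str, from_ref: str) -> None:
        # Creating a branch or tag only records another name for an existing commit
        self.refs[name] = self.refs[from_ref]

    def read(self, ref: str, table: str) -> Optional[str]:
        return self.refs[ref].tables.get(table)

    def log(self, ref: str) -> List[str]:
        hashes: List[str] = []
        commit: Optional[Commit] = self.refs[ref]
        while commit is not None:
            hashes.append(commit.hash)
            commit = commit.parent
        return hashes


# Usage, mirroring the workflow of this notebook (paths are made up)
repo = ToyMetadataRepo()
repo.commit("main", "nba.player", "/warehouse/nba/player/v1.metadata.json", "load players")
repo.create_ref("initial_state", "main")
repo.create_ref("fix_null_row", "main")
repo.commit("fix_null_row", "nba.player", "/warehouse/nba/player/v2.metadata.json", "delete null row")
print(repo.read("main", "nba.player"))          # /warehouse/nba/player/v1.metadata.json
print(repo.read("fix_null_row", "nba.player"))  # /warehouse/nba/player/v2.metadata.json
print(repo.log("fix_null_row"))                 # ['c2', 'c1', 'c0']
```

Committing on `fix_null_row` only moves that branch's pointer, so `main` and `initial_state` keep resolving to the old metadata file; no data is copied to create a branch.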

# Installing prerequisites

We are going to configure this Colab instance by:

* downloading `spark-3.1.2`
* installing the `findspark` and `pynessie` Python packages
* downloading a binary distribution of `nessie`
* downloading `ngrok`

We are going to access web UIs via tunnels provided by `ngrok`. Register on `ngrok.com` with your GitHub or Google account and get your auth token.

Replace `THE_AUTH_TOKEN_FOR_NGROK` with your actual auth token.

In [None]:
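%%shell
mkdir -p build
cd build
echo "Installing SPARK"
# Spark 3.1.2 is no longer on downloads.apache.org; old releases are kept on archive.apache.org
wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xf spark-3.1.2-bin-hadoop3.2.tgz
echo "Installing FINDSPARK"
# findspark locates the Spark installation from Python; pynessie is the Nessie Python client
pip -q install findspark
pip -q install pynessie
echo "Installing NESSIE"
# Nessie server binary (Quarkus build, version 0.9.2) mirrored in the notebook's repository
wget -q https://github.com/andrea-rockt/colab-notebooks/raw/main/data/nessie-quarkus-0.9.2.tar.gz
tar xf nessie-quarkus-0.9.2.tar.gz
chmod +x nessie-quarkus-0.9.2.bin
# ngrok client, used to tunnel the web UIs out of Colab
wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.tgz
tar xf ngrok-stable-linux-amd64.tgz

./ngrok authtoken THE_AUTH_TOKEN_FOR_NGROK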

Installing SPARK
Installing FINDSPARK
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
datascience 0.10.6 requires folium==0.2.1, but you have folium 0.8.3 which is incompatible.
Installing NESSIE
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml
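Pasting the token into the install cell leaves it in the notebook source. A minimal alternative sketch, assuming the install cell has already extracted `ngrok` into `/content/build`: read the token from an environment variable (the name `NGROK_AUTH_TOKEN` is our own choice) or prompt for it with `getpass`, then run the same `ngrok authtoken` command used above. The helpers `ngrok_authtoken_command` and `save_ngrok_token` are hypothetical.

```python
import getpass
import os
import subprocess
from typing import List

# Assumed location: the install cell runs `cd build` under /content and extracts ngrok there
NGROK_BIN = "/content/build/ngrok"


def ngrok_authtoken_command(token: str, ngrok_bin: str = NGROK_BIN) -> List[str]:
    """Build the argument list for `ngrok authtoken <token>`, the command used in the install cell."""
    token = token.strip()
    if not token:
        raise ValueError("ngrok auth token is empty")
    return [ngrok_bin, "authtoken", token]


def save_ngrok_token(env_var: str = "NGROK_AUTH_TOKEN") -> None:
    """Take the token from an environment variable, or prompt for it, and register it with ngrok."""
    token = os.environ.get(env_var) or getpass.getpass("ngrok auth token: ")
    # An argument list (no shell) avoids any quoting problems with the token
    subprocess.run(ngrok_authtoken_command(token), check=True)
```

In Colab you would call `save_ngrok_token()` in its own cell after the install cell, and drop the `./ngrok authtoken` line from the install cell.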




We will start Nessie as a background process; it serves its web UI at `localhost:19120`.

Nessie uses in-memory persistence, so everything we do is ephemeral.

In [None]:
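import os
# Start the Nessie server in the background and send both stdout and stderr to nessie.log.
# The order matters: "> nessie.log 2>&1" redirects stderr into the file as well.
os.system("/content/build/nessie-quarkus-0.9.2.bin -Xmx512m > nessie.log 2>&1 &")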

0
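Since Nessie is started in the background, Spark may try to reach it before it is listening. A minimal readiness check, using only the Python standard library, polls a URL until it returns a 2xx response. The helper `wait_for_http` is hypothetical, and polling the web UI root `http://localhost:19120/` is an assumption: any URL that Nessie serves would do.

```python
import time
import urllib.request

# Ignore proxy environment variables: the server we are waiting for is local
_NO_PROXY_OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def wait_for_http(url: str, timeout_s: float = 60.0, interval_s: float = 1.0) -> bool:
    """Poll `url` until it answers with HTTP 2xx; return False after `timeout_s` seconds."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with _NO_PROXY_OPENER.open(url, timeout=interval_s) as resp:
                if 200 <= resp.status < 300:
                    return True
        except OSError:
            # Connection refused, timeout, or an HTTP error status (URLError/HTTPError
            # are OSError subclasses): the server is not ready yet, so retry
            pass
        time.sleep(interval_s)
    return False
```

For example, `assert wait_for_http("http://localhost:19120/"), "Nessie did not start, check nessie.log"` can run right before the Spark session is created.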

# Firing up spark

Let's create a PySpark session.

In [None]:
import findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/build/spark-3.1.2-bin-hadoop3.2"

# Full URL of the Nessie REST API endpoint
url = "http://localhost:19120/api/v1"
# Warehouse directory where Nessie tables are stored
full_path_to_warehouse = '/warehouse/'
# The ref (context) Nessie will operate on, if different from the default branch.
# Can be the name of a Nessie branch or tag, or a Nessie commit SHA.
ref = "main"
# Nessie authentication type (BASIC, NONE or AWS)
auth_type = "NONE"

findspark.init()
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("spark-nessie-training")
    # Iceberg runtime plus the Nessie SQL extensions (CREATE BRANCH, MERGE BRANCH, SHOW LOG, ...)
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark3-runtime:0.12.0,"
            "org.projectnessie:nessie-spark-extensions:0.18.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    # Register a catalog named "nessie": an Iceberg SparkCatalog whose metadata lives in Nessie
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.uri", url)
    .config("spark.sql.catalog.nessie.ref", ref)
    .config("spark.sql.catalog.nessie.authentication.type", auth_type)
    .config("spark.sql.catalog.nessie.warehouse", full_path_to_warehouse)
    .config("spark.sql.catalog.nessie.cache-enabled", "false")
    .getOrCreate()
)
spark
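All the catalog settings above share the `spark.sql.catalog.nessie` prefix: Spark 3 treats `spark.sql.catalog.<name>` as the class of a catalog plugin and passes every `spark.sql.catalog.<name>.<option>` to it as an option, and `<name>` becomes the first part of table identifiers such as `nessie.nba.player`. The hypothetical helper below, a sketch rather than part of any library, builds the same settings for an arbitrary catalog name; the values are copied from the cell above.

```python
from typing import Dict


def nessie_catalog_conf(catalog: str, uri: str, ref: str, warehouse: str,
                        auth_type: str = "NONE") -> Dict[str, str]:
    """Spark settings registering an Iceberg catalog named `catalog` backed by Nessie."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"{prefix}.uri": uri,
        f"{prefix}.ref": ref,
        f"{prefix}.authentication.type": auth_type,
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.cache-enabled": "false",
    }


conf = nessie_catalog_conf("nessie", "http://localhost:19120/api/v1", "main", "/warehouse/")
for key, value in sorted(conf.items()):
    print(f"{key} = {value}")
```

To use it, apply each entry to the builder (`for key, value in conf.items(): builder = builder.config(key, value)`); `spark.jars.packages` and `spark.sql.extensions` still have to be set separately.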

# Input dataset preparation

We are going to prepare the data directly in the `main` branch to simulate the starting state of our data pipeline.

In [None]:
# Download a dataset of NBA players
!wget -q https://github.com/sivabalanb/Data-Analysis-with-Pandas-and-Python/raw/master/nba.csv

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DecimalType
from pyspark.sql.functions import mean

# Number and Age are read as strings here and cast to integers in the next cell
playersSchema = StructType([
  StructField("Name", StringType(), False),
  StructField("Team", StringType(), True),
  StructField("Number", StringType(), True),
  StructField("Position", StringType(), True),
  StructField("Age", StringType(), True),
  StructField("Height", StringType(), True),
  StructField("Weight", DoubleType(), True),
  StructField("College", StringType(), True),
  StructField("Salary", DecimalType(14, 2), True)
])


In [None]:
playersDfRaw = spark.read.csv('nba.csv', header=True, schema=playersSchema)
playersDf = playersDfRaw.select(playersDfRaw.Name,
                                playersDfRaw.Team,
                                playersDfRaw.Number.cast(IntegerType()),
                                playersDfRaw.Position,
                                playersDfRaw.Age.cast(IntegerType()),
                                playersDfRaw.Height,
                                playersDfRaw.Weight,
                                playersDfRaw.College,
                                playersDfRaw.Salary)
createPlayersTableStatement = """
CREATE TABLE if not exists nessie.nba.player (
  Name STRING,
  Team STRING,
  Number INTEGER,
  Position STRING,
  Age INTEGER,
  Height STRING,
  Weight DOUBLE,
  College STRING,
  Salary DECIMAL(14,2)
) USING iceberg
"""

createSalaryTableStatement = """
CREATE TABLE if not exists nessie.nba.salary (
  Position STRING,
  MeanSalary DECIMAL(14,2)
) USING iceberg
"""

spark.sql(createPlayersTableStatement)
spark.sql(createSalaryTableStatement)

# Load the players into main, then derive the mean salary per position
playersDf.write.format('iceberg').mode('overwrite').save('nessie.nba.player')
(playersDf.groupBy('Position')
    .agg(mean('Salary').alias('MeanSalary'))
    .write.format('iceberg').mode('overwrite').save('nessie.nba.salary'))

In [None]:
# Inspect the commits on main, then tag the current state so we can always go back to it
spark.sql('SHOW LOG main IN nessie').selectExpr('author', 'message', 'hash').show()
spark.sql('CREATE TAG initial_state IN nessie')

+------+--------------+--------------------+
|author|       message|                hash|
+------+--------------+--------------------+
|  root|iceberg commit|5404b96ae155513a7...|
|  root|iceberg commit|235c36f81f563bf25...|
|  root|iceberg commit|2bab29394448aa07a...|
|  root|iceberg commit|ae530ac6021d7a974...|
+------+--------------+--------------------+



DataFrame[refType: string, name: string, hash: string]

In [None]:
spark.sql('USE REFERENCE initial_state IN nessie')

spark.sql("""
SELECT 
  SUM(CAST (Salary   is NULL as INTEGER)) ,
  SUM(CAST (College  is NULL as INTEGER)) ,
  SUM(CAST (Weight   is NULL as INTEGER)) ,
  SUM(CAST (Height   is NULL as INTEGER)) ,
  SUM(CAST (Age      is NULL as INTEGER)) ,
  SUM(CAST (Position is NULL as INTEGER)) ,
  SUM(CAST (Number   is NULL as INTEGER)) ,
  SUM(CAST (Team     is NULL as INTEGER)) ,
  SUM(CAST (Name     is NULL as INTEGER))
FROM 
  nessie.nba.player 
""").show()


# One branch per fix, both starting from main
spark.sql('CREATE BRANCH fix_null_row IN nessie FROM main')
spark.sql('CREATE BRANCH fix_null_salaries IN nessie FROM main')


+----------------------------------+-----------------------------------+----------------------------------+----------------------------------+-------------------------------+------------------------------------+----------------------------------+--------------------------------+--------------------------------+
|sum(CAST((Salary IS NULL) AS INT))|sum(CAST((College IS NULL) AS INT))|sum(CAST((Weight IS NULL) AS INT))|sum(CAST((Height IS NULL) AS INT))|sum(CAST((Age IS NULL) AS INT))|sum(CAST((Position IS NULL) AS INT))|sum(CAST((Number IS NULL) AS INT))|sum(CAST((Team IS NULL) AS INT))|sum(CAST((Name IS NULL) AS INT))|
+----------------------------------+-----------------------------------+----------------------------------+----------------------------------+-------------------------------+------------------------------------+----------------------------------+--------------------------------+--------------------------------+
|                                12|                         

DataFrame[refType: string, name: string, hash: string]
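The null-count query relies on a small trick: `CAST(x IS NULL AS INTEGER)` is 1 for a NULL and 0 otherwise, so summing it counts the NULLs in that column. Spark names each result column after its expression, which produces the very wide header above. A sketch of a hypothetical helper that generates the same query from a list of column names, with short aliases:

```python
from typing import Iterable


def null_count_query(table: str, columns: Iterable[str]) -> str:
    """Spark SQL counting NULLs per column: SUM of a 0/1 flag, aliased as <column>_nulls."""
    exprs = [f"SUM(CAST(`{c}` IS NULL AS INTEGER)) AS `{c}_nulls`" for c in columns]
    if not exprs:
        raise ValueError("at least one column is required")
    return "SELECT\n  " + ",\n  ".join(exprs) + f"\nFROM {table}"


print(null_count_query("nessie.nba.player", ["Salary", "College"]))
```

In the notebook it could be called as `spark.sql(null_count_query("nessie.nba.player", spark.table("nessie.nba.player").columns)).show()`, so the column list never drifts from the table schema.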

In [None]:
# Fix 1: on its own branch, delete the row in which every column is NULL
spark.sql('USE REFERENCE fix_null_row IN nessie')


spark.sql(
"""
DELETE FROM nessie.nba.player
WHERE
Salary is NULL AND
College is NULL AND
Weight is NULL AND
Height is NULL AND
Age is NULL AND
Position is NULL AND
Number is NULL AND
Team is NULL AND
Name is NULL 
""")

DataFrame[]

In [None]:
# Fix 2: on a separate branch, replace NULL salaries with a default value
spark.sql('USE REFERENCE fix_null_salaries IN nessie')

# Players whose salary is missing
spark.read.format('iceberg').load('nessie.nba.player').where('Salary is NULL').show()

spark.sql("""
UPDATE nessie.nba.player
SET Salary = 100000.00
WHERE Salary is NULL
""")

# Recompute the mean salary per position from the updated players table on this branch
meanSalaryDf = spark.table('nessie.nba.player').groupBy('Position').agg(mean('Salary').alias('MeanSalary'))
meanSalaryDf.write.format('iceberg').mode('overwrite').save('nessie.nba.salary')
meanSalaryDf.show()

+--------------+--------------------+------+--------+----+------+------+--------------------+------+
|          Name|                Team|Number|Position| Age|Height|Weight|             College|Salary|
+--------------+--------------------+------+--------+----+------+------+--------------------+------+
|  John Holland|      Boston Celtics|    30|      SG|  27|   6-5| 205.0|   Boston University|  null|
|   Elton Brand|  Philadelphia 76ers|    42|      PF|  37|   6-9| 254.0|                Duke|  null|
| Dahntay Jones| Cleveland Cavaliers|    30|      SG|  35|   6-6| 225.0|                Duke|  null|
| Jordan Farmar|   Memphis Grizzlies|     4|      PG|  29|   6-2| 180.0|                UCLA|  null|
|  Ray McCallum|   Memphis Grizzlies|     5|      PG|  24|   6-3| 190.0|             Detroit|  null|
|Xavier Munford|   Memphis Grizzlies|    14|      PG|  24|   6-3| 180.0|        Rhode Island|  null|
|Alex Stepheson|   Memphis Grizzlies|    35|      PF|  28|  6-10| 270.0|                 US
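Why does the `fix_null_salaries` branch rewrite `nessie.nba.salary` as well? `mean` ignores NULLs, so once the missing salaries are set to 100000.00 they start counting towards each position's average. A plain-Python sketch of that effect, with made-up rows (not taken from `nba.csv`) and hypothetical helper names:

```python
from decimal import Decimal
from typing import Dict, Iterable, List, Optional, Tuple

# (Position, Salary) pairs; None plays the role of a SQL NULL
Row = Tuple[str, Optional[Decimal]]


def mean_salary_by_position(rows: Iterable[Row]) -> Dict[str, Optional[Decimal]]:
    """Like groupBy('Position').agg(mean('Salary')): NULL salaries are skipped,
    and a position with no non-NULL salary gets a NULL (None) mean."""
    groups: Dict[str, List[Decimal]] = {}
    for position, salary in rows:
        salaries = groups.setdefault(position, [])  # keep all-NULL groups too
        if salary is not None:
            salaries.append(salary)
    return {p: (sum(s) / len(s) if s else None) for p, s in groups.items()}


def fill_null_salaries(rows: Iterable[Row], default: Decimal = Decimal("100000.00")) -> List[Row]:
    """Like UPDATE ... SET Salary = 100000.00 WHERE Salary IS NULL."""
    return [(p, default if s is None else s) for p, s in rows]


# Made-up example rows
rows = [
    ("PG", Decimal("2000000.00")),
    ("PG", None),
    ("SG", Decimal("1000000.00")),
    ("SG", Decimal("3000000.00")),
]
print(mean_salary_by_position(rows))
print(mean_salary_by_position(fill_null_salaries(rows)))
```

Here the PG mean drops from 2,000,000 to 1,050,000 after the fill, while SG, which had no missing salaries, stays at 2,000,000.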

In [None]:
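# Log of the current reference (fix_null_salaries, selected in the previous cell)
spark.sql('SHOW LOG IN nessie').show()
# Full outer join of two logs on the commit hash: `main` is 1 for rows from main's log,
# 0 for rows from fix_null_row's log, and NULL on the side where a commit is missing
mainLog = spark.sql('SHOW LOG main IN nessie').selectExpr('1 as main', '*')
fixNullRowLog = spark.sql('SHOW LOG fix_null_row IN nessie').selectExpr('0 as main', '*')
mainLog.join(fixNullRowLog, 'hash', 'fullouter').sort('hash').show(100000)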


+------+---------+--------------------+--------------+-----------+--------------------+--------------------+--------------------+
|author|committer|                hash|       message|signedOffBy|          authorTime|       committerTime|          properties|
+------+---------+--------------------+--------------+-----------+--------------------+--------------------+--------------------+
|  root|         |a1da59817c66cbf8e...|iceberg commit|           |2022-01-20 12:05:...|2022-01-20 12:05:...|{application-type...|
|  root|         |e490defc27af52a10...|iceberg commit|           |2022-01-20 12:05:...|2022-01-20 12:05:...|{application-type...|
|  root|         |5404b96ae155513a7...|iceberg commit|           |2022-01-20 12:05:...|2022-01-20 12:05:...|{application-type...|
|  root|         |235c36f81f563bf25...|iceberg commit|           |2022-01-20 12:05:...|2022-01-20 12:05:...|{application-type...|
|  root|         |2bab29394448aa07a...|iceberg commit|           |2022-01-20 12:05:...|202
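The full outer join on `hash` sorts commits into three groups: present in both logs, only in main's log, or only in the branch's log. The same classification in plain Python, as a sketch with a hypothetical `compare_logs` helper and made-up short hashes in place of real Nessie commit hashes:

```python
from typing import Dict, List, Sequence


def compare_logs(left: Sequence[str], right: Sequence[str]) -> Dict[str, List[str]]:
    """Classify commit hashes as the full outer join on `hash` does: in both logs,
    only in the left log, or only in the right log. Each bucket keeps log order."""
    left_set, right_set = set(left), set(right)
    return {
        "both": [h for h in left if h in right_set],
        "only_left": [h for h in left if h not in right_set],
        "only_right": [h for h in right if h not in left_set],
    }


# Made-up logs, newest commit first: the branch has one extra commit on top of main
main_log = ["c4", "c3", "c2", "c1"]
fix_null_row_log = ["r1", "c4", "c3", "c2", "c1"]
print(compare_logs(main_log, fix_null_row_log))
```

Commits that show up only on the branch side are the ones a `MERGE BRANCH` into `main` would bring over.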

In [None]:
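# Merge both fixes back into main
spark.sql('MERGE BRANCH fix_null_row INTO main IN nessie').collect()
spark.sql('MERGE BRANCH fix_null_salaries INTO main IN nessie').collect()
# Compare main's log with fix_null_salaries' log after the merges (full outer join on hash)
mainLog = spark.sql('SHOW LOG main IN nessie').selectExpr('1 as main', '*')
fixNullSalariesLog = spark.sql('SHOW LOG fix_null_salaries IN nessie').selectExpr('0 as main', '*')
mainLog.join(fixNullSalariesLog, 'hash', 'fullouter').show(100000)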

+--------------------+----+------+---------+--------------+-----------+--------------------+--------------------+--------------------+----+------+---------+--------------+-----------+--------------------+--------------------+--------------------+
|                hash|main|author|committer|       message|signedOffBy|          authorTime|       committerTime|          properties|main|author|committer|       message|signedOffBy|          authorTime|       committerTime|          properties|
+--------------------+----+------+---------+--------------+-----------+--------------------+--------------------+--------------------+----+------+---------+--------------+-----------+--------------------+--------------------+--------------------+
|235c36f81f563bf25...|   1|  root|         |iceberg commit|           |2022-01-20 12:05:...|2022-01-20 12:05:...|{application-type...|   0|  root|         |iceberg commit|           |2022-01-20 12:05:...|2022-01-20 12:05:...|{application-type...|
|c8a5e53946d