# Demo and Comparison of Big Data File Formats

### CSV and JSON
Older data formats that were not designed for big data or for scaling  
**Typical feature:** human-readable
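
The following sketch uses only the Python standard library (not Spark) to show what the missing type information means in practice. It is an illustration under simple assumptions: the record mirrors the sample data used later, and the names `row`, `csv_back` and `json_back` are made up for this example.

```python
import csv
import io
import json

# One record shaped like the sample data used later (id, account, date, balance)
row = {"id": 1, "account": "alex", "dt_transaction": "2019-01-01", "balance": 1000}

# CSV round trip: every value comes back as a string
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(row))
writer.writeheader()
writer.writerow(row)
buf.seek(0)
csv_back = next(csv.DictReader(buf))

# JSON round trip: numbers survive, but the date is still only a string
json_back = json.loads(json.dumps(row))

print("CSV balance type: ", type(csv_back["balance"]).__name__)
print("JSON balance type:", type(json_back["balance"]).__name__)
print("JSON date type:   ", type(json_back["dt_transaction"]).__name__)
```

Both formats are easy to inspect by eye, but neither carries a full schema: a reader has to guess or be told that `dt_transaction` is a date.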

### Avro, ORC, Parquet
First generation of special big data formats that allow fast writes, fast reads or both  
**Typical features:** splittable, compressible, data skipping and predicate pushdown, data schema included
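
Data skipping can be pictured with a toy model in plain Python. This is a simplified sketch, not how ORC or Parquet are implemented: it assumes each chunk of rows ("row group") stores the minimum and maximum of one column, and the names `RowGroup` and `scan_balance_greater_than` are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroup:
    """A chunk of rows plus the min/max statistics a columnar file would store for it."""
    rows: List[dict]
    min_balance: int
    max_balance: int


def make_row_group(rows: List[dict]) -> RowGroup:
    balances = [r["balance"] for r in rows]
    return RowGroup(rows, min(balances), max(balances))


def scan_balance_greater_than(groups: List[RowGroup], threshold: int):
    """Return the matching rows and the number of row groups that had to be read."""
    matches, groups_read = [], 0
    for g in groups:
        if g.max_balance <= threshold:
            continue  # the statistics prove that no row can match: skip without reading
        groups_read += 1
        matches.extend(r for r in g.rows if r["balance"] > threshold)
    return matches, groups_read


groups = [
    make_row_group([{"id": 1, "balance": 1000}, {"id": 2, "balance": 1500}]),
    make_row_group([{"id": 3, "balance": 1700}, {"id": 4, "balance": 5000}]),
]
matches, groups_read = scan_balance_greater_than(groups, 1600)
print("matching ids:", [r["id"] for r in matches])
print("row groups read:", groups_read, "of", len(groups))
```

Predicate pushdown means that the filter (`balance > 1600` here) is handed to the file reader, which can then apply this kind of skipping instead of returning every row to the engine.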

### Delta, Iceberg, Hudi
Latest generation of big data formats that support ACID transactions, audit-safe transaction logs and time travel  
**Typical features:** enhance the first-generation formats with additional metadata and read/write procedures

## 1. Import

#### Import the necessary libraries for data processing using PySpark. Some of the important imports include:
- SparkContext and SparkConf from pyspark: these classes are used to connect to the Spark cluster and to set up its configuration.
- SparkSession and SQLContext from pyspark.sql: these classes are used for creating and interacting with Spark SQL contexts.


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import col

from delta import *

# import the classes directly; a plain "import datetime" would be shadowed by this line anyway
from datetime import datetime, timedelta

import json
import csv

# use the full width of the screen for Jupyter cells
from IPython.display import display, HTML
display(HTML("<style>.container {width:100% !important; }</style>"))

## 2. Launch Spark Jupyter and Configuration

#### Configure a Spark session for Kubernetes cluster with S3 support
### CLUSTER MANAGER
- set the Kubernetes master URL as the cluster manager (“k8s://https://” is NOT a typo: the “k8s://” prefix tells Spark which cluster manager type to use)
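
To make the doubled scheme easier to read, here is a small sketch that splits such a master string into its two parts. It only illustrates the URL layout and is not Spark's own parsing code; the function name `split_k8s_master` is hypothetical.

```python
from urllib.parse import urlparse


def split_k8s_master(master: str):
    """Split a Spark master string for Kubernetes into (cluster manager, API server URL)."""
    prefix = "k8s://"
    if not master.startswith(prefix):
        raise ValueError("not a Kubernetes master URL")
    api_server = master[len(prefix):]
    # Spark assumes https when the API server URL has no scheme of its own
    if "://" not in api_server:
        api_server = "https://" + api_server
    return "k8s", urlparse(api_server)


manager, api = split_k8s_master("k8s://https://kubernetes.default.svc.cluster.local:443")
print("cluster manager:", manager)
print("API server:", api.scheme, api.hostname, api.port)
```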

### KUBERNETES
- set the namespace that will be used for running the driver and executor pods
- set the Docker image from which the worker/executor pods are created
- set the Kubernetes service account name and provide the authentication details for the service account (required to create worker pods)

### SPARK
- set the driver host and the driver port (find name of the driver service with 'kubectl get services' or in the helm chart configuration)
- enable Delta Lake, Iceberg, and Hudi support by setting the spark.sql.extensions
- configure Hive catalog for Iceberg
- enable S3 connector
- set the number of worker pods, their memory and cores (HINT: number of possible tasks = cores * executors)
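
The hint about the number of possible tasks can be worked through with the values used in this notebook (1 executor, 2 cores, DataFrames repartitioned into 3 partitions). The helper names `task_slots` and `waves` are made up for this sketch, and it assumes Spark's default of one core per task.

```python
import math


def task_slots(executor_instances: int, executor_cores: int) -> int:
    """Tasks that can run at the same time (one task per core, the Spark default)."""
    return executor_instances * executor_cores


def waves(num_partitions: int, slots: int) -> int:
    """Scheduling rounds needed to process all partitions of one stage."""
    return math.ceil(num_partitions / slots)


slots = task_slots(executor_instances=1, executor_cores=2)
print("parallel tasks:", slots)
print("waves for 3 partitions:", waves(3, slots))
```

With these settings the three partitions cannot all be processed at once; the third one has to wait for a free core.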

### SPARK SESSION
- create the Spark session using the SparkSession.builder object
- get the Spark context from the created session and set the log level to "ERROR".


In [None]:
if "spark" in globals():
    spark.stop()  # stop a session left over from an earlier run; skipped on a fresh kernel

In [None]:
appName="jupyter-spark"

conf = SparkConf()

# CLUSTER MANAGER

conf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")

# CONFIGURE KUBERNETES

conf.set("spark.kubernetes.namespace","frontend")
conf.set("spark.kubernetes.container.image", "thinkportgmbh/workshops:spark-3.3.2")
conf.set("spark.kubernetes.container.image.pullPolicy", "Always")

conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
conf.set("spark.kubernetes.authenticate.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
conf.set("spark.kubernetes.authenticate.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")

# CONFIGURE SPARK

conf.set("spark.sql.session.timeZone", "Europe/Berlin")
conf.set("spark.driver.host", "jupyter-spark-driver.frontend.svc.cluster.local")
conf.set("spark.driver.port", "29413")

conf.set("spark.jars", "/opt/spark/jars/spark-avro_2.12-3.3.2.jar")
conf.set("spark.driver.extraClassPath","/opt/spark/jars/spark-avro_2.12-3.3.2.jar")
conf.set("spark.executor.extraClassPath","/opt/spark/jars/spark-avro_2.12-3.3.2.jar")

conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension, org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions, org.apache.spark.sql.hudi.HoodieSparkSessionExtension")

######## Use Hive as metastore
conf.set("hive.metastore.uris", "thrift://hive-metastore.hive.svc.cluster.local:9083") 

######## Iceberg configs
conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
conf.set("spark.sql.catalog.ice","org.apache.iceberg.spark.SparkCatalog") 
conf.set("spark.sql.catalog.ice.type","hive") 
conf.set("spark.sql.catalog.ice.uri","thrift://hive-metastore.hive.svc.cluster.local:9083") 


####### Hudi configs
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# CONFIGURE S3 CONNECTOR
conf.set("spark.hadoop.fs.s3a.endpoint", "minio.minio.svc.cluster.local:9000")
conf.set("spark.hadoop.fs.s3a.access.key", "trainadm")
conf.set("spark.hadoop.fs.s3a.secret.key", "train@thinkport")
conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")

# CONFIGURE WORKER (Customize based on workload)

conf.set("spark.executor.instances", "1")
conf.set("spark.executor.memory", "1G")
conf.set("spark.executor.cores", "2")

# SPARK SESSION

spark = SparkSession\
    .builder\
    .config(conf=conf) \
    .config('spark.sql.session.timeZone', 'Europe/Berlin') \
    .appName(appName)\
    .enableHiveSupport() \
    .getOrCreate()


sc=spark.sparkContext
sc.setLogLevel("ERROR")

# print selected settings to check the configuration the session was started with
for entry in sc.getConf().getAll():
    if entry[0] in ["spark.app.name","spark.kubernetes.namespace","spark.executor.memory","spark.executor.cores","spark.driver.host","spark.master","spark.sql.extensions"]:
        print(entry[0],"=",entry[1])

## Create sample data

In [None]:
# initial data
account_data1 = [
    (1,"alex","2019-01-01",1000),
    (2,"alex","2019-02-01",1500),
    (3,"alex","2019-03-01",1700),
    (4,"maria","2020-01-01",5000)
    ]

# update with a changed record and a new record
account_data2 = [
    (1,"alex","2019-03-01",3300),
    (2,"peter","2021-01-01",100)
    ]

# update with a new column
account_data3 = [
    (1,"otto","2019-10-01",4444,"neue Spalte 1")
]

schema = ["id","account","dt_transaction","balance"]
schema3 = ["id","account","dt_transaction","balance","new"]

df1 = spark.createDataFrame(data=account_data1, schema = schema).withColumn("dt_transaction",col("dt_transaction").cast("date")).repartition(3)
df2 = spark.createDataFrame(data=account_data2, schema = schema).withColumn("dt_transaction",col("dt_transaction").cast("date")).repartition(3)
df3 = spark.createDataFrame(data=account_data3, schema = schema3).withColumn("dt_transaction",col("dt_transaction").cast("date")).repartition(3)

print("++ create new dataframe and show schema and data")
print("################################################")

# df1.printSchema()
df1.show(truncate=False)
df3.show(truncate=False)

## Configure boto3

In [None]:
import boto3
from botocore.client import Config

options = {
    'endpoint_url': 'http://minio.minio.svc.cluster.local:9000',
    'aws_access_key_id': 'trainadm',
    'aws_secret_access_key': 'train@thinkport',
    'config': Config(signature_version='s3v4'),
    'verify': False}

s3_resource = boto3.resource('s3', **options)  

s3_client = boto3.client('s3', **options)

bucket = "fileformats"
bucket_path="s3://"+bucket

In [None]:
def ls(bucket,prefix):
    '''List objects from bucket/prefix'''
    try:
        for obj in s3_resource.Bucket(bucket).objects.filter(Prefix=prefix):
            print(obj.key)
    except Exception as e: 
        print(e)
    
    
def cat(bucket,prefix,binary=False):
    '''Show content of one or several files with same prefix/wildcard'''
    try:
        for obj in s3_resource.Bucket(bucket).objects.filter(Prefix=prefix):
            print("File:",obj.key)
            print("----------------------")
            if binary==True:
                print(obj.get()['Body'].read())
            else: 
                print(obj.get()['Body'].read().decode())
            print("######################")
    except Exception as e: 
        print(e)
            
def rm(bucket,prefix):
    '''Delete everything from bucket/prefix'''
    for obj in s3_resource.Bucket(bucket).objects.filter(Prefix=prefix):
        print(obj.key)
        s3_client.delete_object(Bucket=bucket, Key=obj.key)
    print(f"Deleted files from {bucket}/{prefix}*")


In [None]:
# show everything in bucket
ls(bucket,"")
print("#############################")
# show folder
ls(bucket,"csv")
print("#############################")
# show subfolder
ls(bucket,"delta/_delta_log/")
print("#############################")
print("")
# show content of one or several files with same prefix/wildcard
cat(bucket,'csv/part')

In [None]:
rm("fileformats", "csv")

## CSV

In [None]:
print("Number of Partitions:", df1.rdd.getNumPartitions())

write_csv=(df1
           .write
           .format("csv")
           .mode("overwrite") # append
           .save(f"s3://{bucket}/csv")
          )


In [None]:
ls(bucket,"csv")

In [None]:
cat(bucket,"csv/part")

In [None]:
read_csv=spark.read.format("csv").load(f"s3://{bucket}/csv")

read_csv.printSchema()
read_csv.show()

In [None]:
write_csv=(df3
           .write
           .format("csv")
           .mode("append") # append
           .save(f"s3://{bucket}/csv")
          )

In [None]:
ls(bucket,"csv")

In [None]:
cat(bucket,"csv/part")

In [None]:
read_csv=spark.read.format("csv").load(f"s3://{bucket}/csv")

read_csv.printSchema()
read_csv.show()

* No schema (no types)
* No support for appending new columns
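
The effect of appending a wider DataFrame to header-less CSV files can be reproduced with the standard library alone. The two strings below stand in for the part files written above; their exact content is an assumption made for this sketch.

```python
import csv
import io

# Two header-less part files: the first with four columns, the appended one with five
part_1 = "1,alex,2019-01-01,1000\n4,maria,2020-01-01,5000\n"
part_2 = "1,otto,2019-10-01,4444,neue Spalte 1\n"

rows = list(csv.reader(io.StringIO(part_1 + part_2)))
widths = sorted({len(r) for r in rows})

print("column counts found:", widths)       # the files disagree on the number of columns
print("first balance value:", repr(rows[0][3]))  # and every value is a plain string
```

Nothing in the files says which layout is the right one, so a reader has to pick a schema and either drop or null out whatever does not fit.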

## JSON

In [None]:
print("Number of Partitions:", df1.rdd.getNumPartitions())

write_json=(df1
           .write
           .format("json")
           .mode("overwrite") # append
           .save(f"s3://{bucket}/json")
          )


In [None]:
ls(bucket,"json")

In [None]:
cat(bucket,"json/part")

In [None]:
write_json=(df3
           .write
           .format("json")
           .mode("append") # append
           .save(f"s3://{bucket}/json")
          )

In [None]:
read_json=spark.read.format("json").load(f"s3://{bucket}/json")

read_json.printSchema()
read_json.show()

* No schema
* New columns are added as a new attribute
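
Why the new column shows up as an extra attribute can be sketched in plain Python: a reader can infer the schema as the union of all attribute names and fill the gaps with nulls. This is a simplified model with made-up records; Spark's own inference also derives types and orders the columns differently.

```python
import json

# JSON Lines records: the last one carries the extra attribute "new"
lines = [
    '{"id": 1, "account": "alex", "balance": 1000}',
    '{"id": 4, "account": "maria", "balance": 5000}',
    '{"id": 1, "account": "otto", "balance": 4444, "new": "neue Spalte 1"}',
]
records = [json.loads(line) for line in lines]

# Schema inference sketch: the union of all attribute names, in first-seen order
columns = list(dict.fromkeys(key for rec in records for key in rec))

# Records that lack an attribute get None (shown as NULL by Spark)
table = [[rec.get(c) for c in columns] for rec in records]

print(columns)
for r in table:
    print(r)
```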

## Avro

In [None]:
print("Number of Partitions:", df1.rdd.getNumPartitions())

write_avro=(df1
           .write
           .format("avro")
           .mode("overwrite") # append
           .save(f"s3://{bucket}/avro")
          )

In [None]:
ls(bucket,"avro")

In [None]:
cat(bucket,"avro/part",True)

In [None]:
read_avro=spark.read.format("avro").load(f"s3://{bucket}/avro")
read_avro.printSchema()
read_avro.show()

In [None]:
write_avro=(df3
           .write
           .format("avro")
           .mode("append") # append
           .save(f"s3://{bucket}/avro")
          )

In [None]:
read_avro=spark.read.format("avro").load(f"s3://{bucket}/avro")
read_avro.printSchema()
read_avro.show()

* Schema is preserved
* Schema evolution
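
Schema evolution in Avro rests on resolving the writer's data against a reader schema: a field the writer did not know is filled from the reader's default. The sketch below imitates that rule with plain dictionaries. It does not use an Avro library, and the schema and the `resolve` helper are assumptions for illustration.

```python
# Reader schema: the original fields plus a new, optional field with a default
reader_schema = {
    "type": "record",
    "name": "Account",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "account", "type": "string"},
        {"name": "balance", "type": "long"},
        {"name": "new", "type": ["null", "string"], "default": None},
    ],
}


def resolve(record: dict, schema: dict) -> dict:
    """Project a decoded record onto the reader schema, filling in defaults."""
    out = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]  # field unknown to the writer
        else:
            raise ValueError(f"no value and no default for field {name!r}")
    return out


old_record = {"id": 1, "account": "alex", "balance": 1000}
new_record = {"id": 1, "account": "otto", "balance": 4444, "new": "neue Spalte 1"}

print(resolve(old_record, reader_schema))
print(resolve(new_record, reader_schema))
```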

## Parquet

In [None]:
print("Number of Partitions:", df1.rdd.getNumPartitions())

write_parquet=(df1
           .write
           .partitionBy("account")
           .format("parquet")
           .mode("overwrite") # append
           .save(f"s3://{bucket}/parquet")
          )


In [None]:
ls(bucket,"parquet")

In [None]:
cat(bucket,"parquet/account=maria",True)

In [None]:
read_parquet=(spark
              .read.format("parquet")
              .load(f"s3://{bucket}/parquet")
              .filter(col("account")=="alex")
             )

read_parquet.printSchema()
read_parquet.show()

## Delta

- a **storage layer** that runs on top of existing data lakes
- supports ACID transactions and data versioning
- allows data lineage tracking
- provides optimization for streaming workloads
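
Versioning and time travel follow from the transaction log: each commit in `_delta_log` records which data files were added and which were removed, and a reader replays the commits up to the version it wants. The toy model below shows the idea in plain Python. The action layout is heavily simplified compared to the real log entries, and the file names are made up.

```python
from typing import Dict, List, Optional, Set

# Each commit is a list of actions; file names are made up for illustration
log: Dict[int, List[dict]] = {
    0: [{"add": "part-000.parquet"}],            # initial write
    1: [{"add": "part-001.parquet"}],            # append
    2: [{"remove": "part-000.parquet"},          # overwrite: old files are only
        {"remove": "part-001.parquet"},          # marked as removed in the log
        {"add": "part-002.parquet"}],
}


def live_files(log: Dict[int, List[dict]], version: Optional[int] = None) -> Set[str]:
    """Replay commits 0..version and return the data files that make up the table."""
    if version is None:
        version = max(log)
    files: Set[str] = set()
    for v in range(version + 1):
        for action in log[v]:
            if "add" in action:
                files.add(action["add"])
            if "remove" in action:
                files.discard(action["remove"])
    return files


print("latest: ", sorted(live_files(log)))
print("version 1:", sorted(live_files(log, version=1)))
```

Because removed files stay in storage until they are cleaned up, an older version can still be read by replaying fewer commits.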

In [None]:
write_delta=(df1
           .write
           .format("delta")
           .option("mergeSchema", "false")
           .mode("overwrite") # append
           .save(f"s3://{bucket}/delta")
          )

In [None]:
ls(bucket,"delta")

In [None]:
ls(bucket,"delta/_delta_log")

In [None]:
cat(bucket,"delta/_delta_log")

In [None]:
write_delta=(df2
           .write
           .format("delta")
           .mode("append") # append
           .save(f"s3://{bucket}/delta")
          )

In [None]:
write_delta=(df3
           .write
           .format("delta")
           .option("mergeSchema", "true")
           .mode("overwrite") # append
           .save(f"s3://{bucket}/delta")
          )

In [None]:
cat(bucket,"delta/_delta_log")

In [None]:
deltaTable = DeltaTable.forPath(spark, f"s3://{bucket}/delta")
# note: if this fails, the Delta version probably does not match the Spark version
fullHistoryDF = deltaTable.history()    # get the full history of the table

In [None]:
fullHistoryDF.select("version","readVersion","timestamp","userId","operation","operationParameters","operationMetrics","userMetadata").show(truncate=True)

In [None]:
spark.read.format("delta").load(f"s3://{bucket}/delta").show()

## Delta: Time travel

In [None]:
spark.read.format("delta").option("versionAsOf", "1").load(f"s3://{bucket}/delta").show()


## Delta: Merge

In [None]:
deltaTable2 = DeltaTable.forPath(spark, f"s3://{bucket}/delta")


df2a=df2.withColumn("new",f.lit("test"))
df2a.show()
deltaTable2.toDF().show()

In [None]:
dt3=(deltaTable2.alias("oldData")
      .merge(df2a.alias("newData"),
            "oldData.account = newData.account AND oldData.dt_transaction = newData.dt_transaction")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
      .execute()
    )

deltaTable2.toDF().show()

In [None]:
result=(deltaTable2
        .toDF()
        .withColumn("month",f.month(col("dt_transaction")))
        .groupBy("account","month").agg(f.sum("balance"))
        .sort("account","month")
       )
result.show()

In [None]:
result=(spark.read
        .format("delta")
        .option("versionAsOf", "1")
        .load(f"s3://{bucket}/delta")
        .withColumn("month",f.month(col("dt_transaction")))
        .groupBy("account","month").agg(f.sum("balance"))
        .sort("account","month")
       )
result.show()

* Schema
* Schema evolution
* Transaction Log
* Time Travel

## Iceberg

- a **table format**
- supports schema evolution and provides a portable table metadata format
- best suited for analytical workloads
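
Iceberg tracks every column by a unique field ID kept in the table metadata, which is what makes renaming or adding columns a metadata-only change. The following plain-Python sketch illustrates the principle; the schemas, the rename to `amount` and the `read` helper are hypothetical.

```python
# Data files store values keyed by field ID, never by column name
data_file_rows = [{1: 1, 2: "alex", 3: 1000}, {1: 4, 2: "maria", 3: 5000}]

# Two schema versions from the table metadata: column 3 renamed, column 4 added
schema_v1 = {1: "id", 2: "account", 3: "balance"}
schema_v2 = {1: "id", 2: "account", 3: "amount", 4: "new"}


def read(rows, schema):
    """Resolve field IDs to the column names of the given schema; missing IDs read as None."""
    return [{name: row.get(field_id) for field_id, name in schema.items()} for row in rows]


print(read(data_file_rows, schema_v1))
print(read(data_file_rows, schema_v2))
```

The data files are never rewritten: the same rows are simply interpreted through whichever schema version the query uses.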

In [None]:
print("Current database:",spark.catalog.currentDatabase())
print("List of databases:",spark.catalog.listDatabases())
print("List of tables in the current database:",spark.catalog.listTables())

In [None]:
# create a Database(name=<db_name>, locationUri='s3a://<bucket>/')
spark.sql(f"CREATE DATABASE iceberg_db LOCATION 's3a://{bucket}/'")

In [None]:
# show databases and tables in the iceberg catalog (it only sees Iceberg-formatted tables)
# all databases from Hive are shown
spark.sql("SHOW databases from ice").show()

In [None]:
spark.sql("show tables from iceberg_db").show()

In [None]:
#### Delete Iceberg tables: first drop the table 
#spark.sql("drop table iceberg_db.iceberg_table")
#delete_objects("aleks-test", "iceberg_table")

In [None]:
write_iceberg=(df1
                  .write
                  .format("iceberg")
                  .mode("overwrite")
                  .saveAsTable("iceberg_db.iceberg")
               )


In [None]:
ls(bucket,"iceberg")

In [None]:

write_iceberg=(df2
                   .write
                   .format("iceberg")
                   .mode("append") # append
                   .saveAsTable("iceberg_db.iceberg")
                 )

In [None]:
ls(bucket,"iceberg")

In [None]:
cat(bucket,"iceberg/metadata/00000-2346cea3-3db3-46a4-bb55-419ae993156b.metadata.json",False)

In [None]:
# df3 has one more column than the existing table, so the table schema is extended first (pattern: ALTER TABLE myTable ADD COLUMNS (address VARCHAR))
spark.sql("ALTER TABLE iceberg_db.iceberg ADD COLUMNS (new VARCHAR(50))")

In [None]:
write_iceberg=(df3
                  .write
                  .format("iceberg")
                  .mode("append") # append
                  .option("schema", schema3)
                  .saveAsTable("iceberg_db.iceberg")
                  )

In [None]:
## Read Iceberg table:

iceberg_df = spark.read.table("iceberg_db.iceberg")
iceberg_df.printSchema()

In [None]:

spark.sql("SELECT * FROM iceberg_db.iceberg.history;").show()
spark.sql("SELECT * FROM iceberg_db.iceberg.files;").show()
spark.sql("SELECT * FROM iceberg_db.iceberg.snapshots;").show()

## alternative syntax example:
# spark.read.format("iceberg").load("iceberg_db.iceberg_table.files").show()


### Iceberg: Time Travel
- ```snapshot-id``` selects a specific table snapshot
- ```as-of-timestamp``` selects the current snapshot at a timestamp, in milliseconds
- ```branch``` selects the head snapshot of the specified branch. Note that currently branch cannot be combined with as-of-timestamp.
- ```tag``` selects the snapshot associated with the specified tag. Tags cannot be combined with as-of-timestamp.
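
Since `as-of-timestamp` expects milliseconds since the Unix epoch, a small conversion helper is handy. The sketch below also models how a timestamp maps to a snapshot: the latest one committed at or before it. Helper names, snapshot IDs and commit times are made up for illustration.

```python
from datetime import datetime, timezone


def to_epoch_millis(ts: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    return int(ts.timestamp() * 1000)


def snapshot_as_of(history, as_of_millis: int):
    """Return the ID of the latest snapshot committed at or before the timestamp, else None."""
    candidates = [(made_at, snap_id) for made_at, snap_id in history if made_at <= as_of_millis]
    return max(candidates)[1] if candidates else None


# (committed-at in ms, snapshot ID); the values are made up for illustration
history = [
    (to_epoch_millis(datetime(2023, 5, 15, 10, 0, tzinfo=timezone.utc)), 111),
    (to_epoch_millis(datetime(2023, 5, 15, 11, 0, tzinfo=timezone.utc)), 222),
]
as_of = to_epoch_millis(datetime(2023, 5, 15, 10, 30, tzinfo=timezone.utc))

print("as-of-timestamp value:", as_of)
print("selected snapshot:", snapshot_as_of(history, as_of))
```

A value computed this way can be passed to the reader as a string, for example `.option("as-of-timestamp", str(as_of))`.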

In [None]:
# take the snapshot IDs from the results of the iceberg.snapshots query above
snapshot1 = (spark.read
                  .option("snapshot-id", "2282180466624073266")
                  .format("iceberg")
                  .load("iceberg_db.iceberg"))
snapshot1.show()  # show() returns None, so it is called separately from the assignment

In [None]:
snapshot2 = (spark.read
                  .option("snapshot-id", "4263160168885610306")
                  .format("iceberg")
                  .load("iceberg_db.iceberg"))
snapshot2.show()

In [None]:
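# timestamp before which snapshots should be expired (a plain Python datetime, not a Spark column)
tsToExpire = datetime.now() - timedelta(minutes=10)
print(tsToExpire)

In [None]:
# table.expireSnapshots().expireOlderThan(tsToExpire).commit() belongs to the Iceberg Java API and needs a Table object,
# so it does not work from PySpark; assumed replacement: the expire_snapshots procedure of the Iceberg SQL extensions
spark.sql(f"CALL spark_catalog.system.expire_snapshots(table => 'iceberg_db.iceberg', older_than => TIMESTAMP '{tsToExpire}')").show()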

## Hudi

- a **storage abstraction layer** 
- enables data ingestion and query capability on large-scale, evolving datasets
- well-suited for real-time streaming workloads and batch processing
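
The write options used in this section configure an upsert on a record key together with a precombine field. What that means for records sharing a key can be sketched in plain Python. This is a toy model, not Hudi's implementation; the function names and the integer `ts` values are assumptions.

```python
def precombine(batch, record_key="id", ordering_field="ts"):
    """Deduplicate a batch: per key, keep the record with the largest ordering value."""
    latest = {}
    for rec in batch:
        key = rec[record_key]
        if key not in latest or rec[ordering_field] > latest[key][ordering_field]:
            latest[key] = rec
    return latest


def upsert(table, batch):
    """Update rows whose key already exists and insert the rest."""
    merged = dict(table)
    merged.update(precombine(batch))
    return merged


table = upsert({}, [
    {"id": 1, "account": "alex", "balance": 1000, "ts": 1},
    {"id": 2, "account": "alex", "balance": 1500, "ts": 1},
])
table = upsert(table, [
    {"id": 1, "account": "alex", "balance": 3300, "ts": 2},
    {"id": 1, "account": "alex", "balance": 9999, "ts": 1},  # same key, older ts: dropped
    {"id": 3, "account": "peter", "balance": 100, "ts": 2},
])

for key in sorted(table):
    print(table[key])
```

The table ends up with exactly one row per record key, which is the main difference from the plain appends shown for the other formats.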

In [None]:
# adjust the partition path if needed, e.g. "id/dt_transaction"
record_key = "id"
partition_path = "id"
table_name = "hudi"  # assumed name (matches the target folder); the table name must be a string, not a DataFrame

hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.recordkey.field": record_key,
    "hoodie.datasource.write.partitionpath.field": partition_path,
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",  # used by Hudi to resolve conflicts between records with the same key (here: id)
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2
}

In [None]:
write_hudi=(df1.withColumn("ts", f.current_timestamp()).write.format("hudi") # "ts" is the precombine field set in hudi_options; the sample data has no such column, so it is added with current_timestamp()
               .options(**hudi_options)
               .mode("overwrite")
               .save(f"s3://{bucket}/hudi")
               )

In [None]:
ls(bucket,"hudi")

In [None]:
write_hudi=(df2.withColumn("ts", f.current_timestamp()).write.format("hudi") # add the "ts" precombine field as in the first write
               .options(**hudi_options)
               .mode("append")
               .save(f"s3://{bucket}/hudi")
               )

In [None]:
write_hudi=(df3.withColumn("ts", f.current_timestamp()).write.format("hudi") # add the "ts" precombine field as in the first write
               .options(**hudi_options)
               .mode("append")
               .save(f"s3://{bucket}/hudi")
               )

In [None]:
df_hudi = spark.read.format("hudi").load(f"s3://{bucket}/hudi")
df_hudi.show()

In [None]:
ls(bucket,"hudi")

### Hudi: Time Travel

In [None]:
# "as.of.instant" takes a commit time of the Hudi table; replace the value below with a commit time of your own table

spark.read.format("hudi")\
     .option("as.of.instant", "20230515122339203")\
     .load(f"s3://{bucket}/hudi").show()

In [None]:
account_data4 = [
    (5,"anna","2020-11-01",2000,"neue Spalte 1")
]
df4 = spark.createDataFrame(data=account_data4, schema = schema3).withColumn("dt_transaction",col("dt_transaction").cast("date")).repartition(3)

write_hudi=(df4.withColumn("ts", f.current_timestamp()).write.format("hudi")
               .options(**hudi_options)
               .mode("append")
               .save(f"s3://{bucket}/hudi")
               )

In [None]:
## Incremental query:

spark.read.format("hudi"). \
  load(f"s3://{bucket}/hudi"). \
  createOrReplaceTempView("hudi_snapshots")

In [None]:
commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_snapshots order by commitTime").limit(10).collect()))
print(commits)

beginTime = commits[len(commits) - 4] # commit time we are interested in


In [None]:
# incrementally query data
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
}

hudiIncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(f"s3://{bucket}/hudi")
hudiIncrementalDF.createOrReplaceTempView("hudi_incremental")

spark.sql("select `_hoodie_commit_time`, account, balance, dt_transaction, ts from hudi_incremental").show(truncate=False)

### Hudi: Table maintenance
Hudi can run its table services (cleaning, compaction and clustering) inline or asynchronously while a Structured Streaming query is running, so there is no operational overhead for the user.
For CoW (copy-on-write) tables, table services work in inline mode by default.
For MoR (merge-on-read) tables, some async services are enabled by default.
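
As a rough sketch, table services are controlled through additional write options. The option names below are taken from the Hudi configuration reference as far as known, but they should be checked against the Hudi version in use; the chosen values and the `base_options` dictionary are assumptions for illustration only.

```python
# Stand-in for the write options used in this notebook
base_options = {
    "hoodie.table.name": "hudi",
    "hoodie.datasource.write.operation": "upsert",
}

# Copy-on-write: services run inline, as part of each write
cow_services = {
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.clean.automatic": "true",      # clean up old file versions after commits
    "hoodie.clustering.inline": "true",    # assumed choice: cluster data during writes
}

# Merge-on-read: compaction merges log files into base files
mor_services = {
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.compact.inline": "false",                # leave compaction to an async service
    "hoodie.compact.inline.max.delta.commits": "5",  # delta commits before a compaction is due
    "hoodie.clean.async": "true",
}

# Combine without mutating the originals; later keys win on conflict
cow_write_options = {**base_options, **cow_services}
mor_write_options = {**base_options, **mor_services}

for name, opts in (("CoW", cow_write_options), ("MoR", mor_write_options)):
    print(name, "->", opts["hoodie.datasource.write.table.type"], f"({len(opts)} options)")
```

Such a dictionary would be passed to the writer in the same way as `hudi_options`, via `.options(**mor_write_options)`.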

In [None]:
spark.stop()