<p align="center">
  <a href="" rel="noopener">
 <img width=500px height=100px src="https://docs.delta.io/latest/_static/delta-lake-logo.png" alt="Project logo"></a>
</p>

<h6 align="center">Delta Lake is an open-source project that enables building a Lakehouse architecture. It provides features such as ACID transactions, scalable metadata handling, and unified batch and streaming data processing on top of existing data lakes such as S3, ADLS, GCS, and HDFS.</h6>


<h6 align="center">
Delta Lake is a storage layer that uses the Parquet format by default and adds ACID-compliant transactions and other benefits to data lakes. It can help solve several problems, including:
</h6>
    
- Difficulty appending data
- Very expensive jobs that fail mid-run
- Modifying stored data is difficult
- Real-time operations
- Keeping historical versions of data is expensive
- Handling large metadata is difficult
- “Too many files” problems
- Achieving good performance is difficult
- Data quality problems


## Jars for Delta

```python
from pyspark.sql import SparkSession

# Delta Lake 1.0.0 targets Spark 3.1 with Scala 2.12 (hence the _2.12 artifact suffix).
spark = (
    SparkSession
    .builder
    .appName("0.a3lab - Delta Lake")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
```

```
21/11/15 23:40:12 WARN Utils: Your hostname, Carloss-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.15.8 instead (on interface en0)
21/11/15 23:40:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/carlosbarbosa/opt/miniconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/carlosbarbosa/.ivy2/cache
The jars for the packages stored in: /Users/carlosbarbosa/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f71eb7e1-c25d-42f5-bb25-4fc8249025de;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 553ms :: artifacts dl 28ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org
```

```python
from delta.tables import DeltaTable
```

## Read Raw Data

```python
df_user = (
    spark
    .read
    .format("json")
    .load("../landing-zone/user")
    .select("user_id", "email")
)

df_user.show(n=5, truncate=False)
print(f"AMOUNT {df_user.count()}")
```

```
+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
|8809   |carla.hansen@email.com |
|4606   |tomas.ledner@email.com |
|1      |alyse.ortiz@email.com  |
+-------+-----------------------+
only showing top 5 rows

AMOUNT 600
```


## Write Delta Table

```python
(
    df_user
    .write
    .format("delta")
    .mode("overwrite") # overwrite | append
    .save("../bronze/delta/user")
)
```

## Delta Object Instance

```python
deltaTable = DeltaTable.forPath(spark, "../bronze/delta/user")
deltaTable
```
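`DeltaTable.forPath` can open this path because the write stored, next to the Parquet files, a `_delta_log` directory holding one JSON commit file per table version, where each line is a single action (`add`, `remove`, `metaData`, `commitInfo`, ...). The standard-library sketch below lists the commit versions and parses one commit; `commit_versions`, `read_commit` and the other helpers are hypothetical read-only illustrations, not part of the Delta API, and they ignore checkpoint files, so commits already removed by log cleanup simply do not appear.

```python
import json
import os
import re

# Commit files are named after the table version, zero-padded to 20 digits,
# e.g. 00000000000000000000.json for version 0.
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def parse_commit_versions(file_names):
    """Keep only commit files (skipping checkpoints, .crc files, etc.) and return sorted versions."""
    versions = []
    for name in file_names:
        match = COMMIT_FILE.match(name)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)


def parse_actions(commit_text):
    """Each non-empty line of a commit file is one JSON action."""
    return [json.loads(line) for line in commit_text.splitlines() if line.strip()]


def commit_versions(table_path):
    """List the commit versions stored under <table_path>/_delta_log."""
    return parse_commit_versions(os.listdir(os.path.join(table_path, "_delta_log")))


def read_commit(table_path, version):
    """Read and parse the commit file for one version."""
    path = os.path.join(table_path, "_delta_log", f"{version:020d}.json")
    with open(path) as f:
        return parse_actions(f.read())
```

In this notebook it could be called as `commit_versions("../bronze/delta/user")` followed by `read_commit("../bronze/delta/user", 0)` to inspect the first write.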

## Delete

```python
print("BEFORE DELETE")
snapshot = deltaTable.toDF()
snapshot.where("user_id == 1").show(truncate=False)

print("AFTER DELETE")
# Deletes every row matching the SQL predicate and commits a new table version.
deltaTable.delete("user_id == 1")

snapshot = deltaTable.toDF()
snapshot.where("user_id == 1").show(truncate=False)
```
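Delta does not edit Parquet files in place. Roughly, a delete rewrites only the files that contain matching rows, committing `remove` actions for the old files and `add` actions for their rewritten versions; the old files stay on disk, which is what makes time travel possible until `vacuum` removes them. The pure-Python model below illustrates that copy-on-write idea with in-memory "files"; the `copy_on_write_delete` helper and its `rewritten-` naming scheme are hypothetical simplifications, not Delta's implementation.

```python
def copy_on_write_delete(files, predicate):
    """Simplified model of a Delta DELETE.

    files: dict mapping a data-file name to its list of rows (dicts).
    predicate: function row -> bool selecting the rows to delete.
    Returns (new_files, actions). Files without matching rows are kept as-is
    and produce no log action; each affected file is retired with "remove",
    and its surviving rows (if any) go to a new file recorded with "add".
    """
    new_files = {}
    actions = []
    for name, rows in files.items():
        kept = [row for row in rows if not predicate(row)]
        if len(kept) == len(rows):
            new_files[name] = rows  # untouched file: no rewrite needed
            continue
        actions.append({"remove": {"path": name}})
        if kept:
            new_name = f"rewritten-{name}"  # hypothetical naming scheme
            new_files[new_name] = kept
            actions.append({"add": {"path": new_name}})
    return new_files, actions
```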

## Update

```python
print("BEFORE UPDATE")
snapshot = deltaTable.toDF()
snapshot.where("email == 'marcos.collier@email.com'").show(truncate=False)

print("AFTER UPDATE")
# The values in `set` are SQL expressions, so a string literal needs its own quotes.
deltaTable.update(
    condition = "email = 'marcos.collier@email.com'",
    set = { "email": "'a3lab@gmail.com'" }
)

snapshot = deltaTable.toDF()
snapshot.where("email == 'a3lab@gmail.com'").show(truncate=False)
```
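Because the values in `set` are SQL expressions rather than Python values, the new email is wrapped in single quotes inside the Python string. When the replacement comes from a variable, a small quoting helper avoids broken expressions; `sql_string_literal` and `update_set_clause` below are hypothetical helpers that assume Spark SQL's default backslash escaping in string literals. Passing a Column such as `pyspark.sql.functions.lit(value)` as the dictionary value is an alternative that avoids quoting altogether.

```python
def sql_string_literal(value):
    """Quote a Python string as a Spark SQL string literal.

    Backslashes and single quotes are escaped with a backslash, assuming the
    default parser setting where escapes in string literals are processed.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def update_set_clause(new_values):
    """Build a `set` dict for DeltaTable.update from plain Python strings."""
    return {column: sql_string_literal(value) for column, value in new_values.items()}
```

With it, the call above could be written as `deltaTable.update(condition="email = 'marcos.collier@email.com'", set=update_set_clause({"email": "a3lab@gmail.com"}))`.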

## Old Data

```python
df_old = (
    spark.read.format("delta").load("../bronze/delta/user")
)

df_old.where("user_id in (1703, 3650)").show(truncate=False)
```

## New Data

```python
items = [("1703","carlao@gmail.com"), ("3650", "leozao@a3data.com.br")]
cols = ["user_id", "email"]
```

```python
df_new = spark.createDataFrame(items, cols)
df_new.show()
```

## Upserts

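```python
# Upsert: update rows whose user_id already exists in the table, insert the rest.
(
    deltaTable.alias("old")
    .merge(
        df_new.alias("new"),
        "old.user_id = new.user_id"
    )
    .whenMatchedUpdateAll()  # the merge condition already identifies the matching rows
    .whenNotMatchedInsertAll()
    .execute()
)
```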
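The merge pairs each source row with the target row that has the same `user_id`: matched pairs are updated from the source, unmatched source rows are inserted, and unmatched target rows are left alone. The in-memory model below, a hypothetical `upsert` helper over dicts keyed by `user_id`, mirrors those `whenMatchedUpdateAll` / `whenNotMatchedInsertAll` semantics. Keying the source by `user_id` also sidesteps a case where a real merge can fail: several source rows matching, and trying to update, the same target row.

```python
def upsert(target, source):
    """Model of MERGE ... WHEN MATCHED UPDATE * WHEN NOT MATCHED INSERT *.

    target, source: dicts keyed by the merge key, with row dicts as values.
    Returns (merged, stats) without mutating either input.
    """
    merged = dict(target)
    stats = {"updated": 0, "inserted": 0}
    for key, row in source.items():
        if key in merged:
            merged[key] = {**merged[key], **row}  # overwrite columns from the source row
            stats["updated"] += 1
        else:
            merged[key] = dict(row)
            stats["inserted"] += 1
    return merged, stats
```

When every source key already exists in the target, as with the two ids in `df_new`, an upsert updates rows without changing the row count; only keys missing from the target add rows.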

## After Upsert

```python
final_delta = deltaTable.toDF()
final_delta.where("user_id in (1703, 3650)").show(truncate=False)

print(f"AMOUNT: {final_delta.count()}")
```

```python
df_old.count() == final_delta.count()
```

## Time Travel

```python
history_delta = deltaTable.history()
history_delta.show(vertical=True, n=10, truncate=False)
```

```python
(
    history_delta
    .select("version","operation","timestamp")
    .where("operation == 'WRITE'")
    .show(truncate=False)
)
```

```python
time_travel_version_0 = (
    spark
    .read
    .format("delta")
    .option("versionAsOf", "0")
    .load("../bronze/delta/user")
)

(
    time_travel_version_0
    .where("user_id in (1703, 3650)")
    .show(truncate=False)
)
```

```python
(
    history_delta
    .select("version","operation","timestamp")
    .where("operation == 'MERGE'")
    .show(truncate=False)
)
```

```python
time_travel_timestamp = (
    spark
    .read
    .format("delta")
    # A commit timestamp from this table's history (e.g. the MERGE row above);
    # replace it with a timestamp from your own history.
    .option("timestampAsOf", "2021-11-09 17:25:33.472")
    .load("../bronze/delta/user")
)

time_travel_timestamp.where("user_id in (1703, 3650)").show(truncate=False)
```
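`versionAsOf` selects a commit by number, while `timestampAsOf` resolves to the latest version committed at or before the given timestamp; a timestamp earlier than the oldest available commit is an error. The sketch below models only that resolution rule with a hypothetical `version_as_of_timestamp` helper, assuming commit timestamps increase with the version number.

```python
import bisect
from datetime import datetime


def version_as_of_timestamp(commits, ts):
    """Return the version a timestampAsOf read would resolve to.

    commits: list of (version, commit_timestamp) pairs sorted by version,
    with timestamps increasing alongside the version.
    """
    timestamps = [commit_ts for _, commit_ts in commits]
    idx = bisect.bisect_right(timestamps, ts)  # commits at or before ts
    if idx == 0:
        raise ValueError("timestamp is before the earliest available commit")
    return commits[idx - 1][0]
```

For example, with commits at 10:00, 11:00 and 12:00 on the same day, a timestamp of 10:30 resolves to version 0 and 12:00 resolves to version 2.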

## Verify Changes Between Two Delta Versions

```python
time_travel_timestamp.exceptAll(time_travel_version_0).show()
```
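`exceptAll` keeps the rows of the left DataFrame that have no counterpart in the right one, cancelling duplicates one for one (SQL `EXCEPT ALL`), whereas `subtract` behaves like `EXCEPT DISTINCT`. Here it therefore shows rows added or changed after version 0; rows deleted since then only appear when the arguments are swapped. The pure-Python `except_all` below is a hypothetical model of that multiset difference on tuples.

```python
from collections import Counter


def except_all(left, right):
    """Multiset difference: each row of `right` cancels at most one equal row of `left`.

    Rows are tuples; the order of the surviving `left` rows is preserved.
    """
    remaining = Counter(right)
    result = []
    for row in left:
        if remaining[row] > 0:
            remaining[row] -= 1  # consumed by one matching right-hand row
        else:
            result.append(row)
    return result
```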

## Merge Schema (Schema Evolution)

```python
df_user = (
    spark
    .read
    .format("json")
    .load("../landing-zone/user")
    .select("user_id", "email")  # gender column left out
)

(
    df_user
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .save("../bronze/delta/user")
)
```

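```python
df_merge_schema = (
    spark
    .read
    .format("delta")
    .load("../bronze/delta/user")
)
# show() prints rows and returns None, so it is called separately from the assignment.
df_merge_schema.show(n=5, truncate=False)
```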
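On write, Delta enforces the table schema: a DataFrame may omit table columns (they are written as null), but a new column or an incompatible type is rejected, and `mergeSchema` relaxes the first of those rules by appending the new columns to the table schema. The hypothetical `resolve_write_schema` helper below models these checks on plain dicts of column name to type name; it is a simplification that ignores case-insensitive name matching and the few implicit type upcasts Delta accepts.

```python
def resolve_write_schema(table_schema, df_schema, merge_schema=False):
    """Simplified model of Delta schema enforcement on write.

    Schemas are insertion-ordered dicts of column name -> type name.
    Returns the table schema after the write, or raises ValueError.
    """
    for name, dtype in df_schema.items():
        if name in table_schema and table_schema[name] != dtype:
            raise ValueError(f"type mismatch for column {name!r}")
    new_columns = [name for name in df_schema if name not in table_schema]
    if new_columns and not merge_schema:
        raise ValueError(f"columns not in the table schema: {new_columns}")
    merged = dict(table_schema)  # columns missing from the DataFrame stay (as nulls)
    for name in new_columns:
        merged[name] = df_schema[name]  # mergeSchema appends new columns at the end
    return merged
```

With `overwriteSchema` and `mode("overwrite")`, as in the next section, the DataFrame's schema instead replaces the table schema entirely, so columns can be dropped or retyped.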

## Overwrite Schema (Schema Evolution)

```python
df_user = (
    spark
    .read
    .format("json")
    .load("../landing-zone/user")
    .select("user_id", "email")  # gender column left out
)

df_user.show(truncate=False, n=5)

(
    df_user
    .repartition(10)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .save("../bronze/delta/user")
)
```

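```python
df_overwrite_schema = (
    spark
    .read
    .format("delta")
    .load("../bronze/delta/user")
)
# show() prints rows and returns None, so it is called separately from the assignment.
df_overwrite_schema.show(n=5, truncate=False)
```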

## Symlink Format Manifest

```python
deltaTable = DeltaTable.forPath(spark, "../bronze/delta/user")
deltaTable.generate("symlink_format_manifest")
```
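`generate("symlink_format_manifest")` writes text files under `_symlink_format_manifest/` listing the data files of the current table version, so engines that read plain file lists (for example Presto or Athena) can query a consistent snapshot; the manifest must be regenerated after later writes unless the `delta.compatibility.symlinkFormatManifest.enabled` table property turns on automatic generation. Finding "the current files" means replaying the log's `add` and `remove` actions. The hypothetical `active_files` and `manifest_text` helpers below sketch that for an unpartitioned table; they are a simplified model, not the manifest writer itself.

```python
def active_files(commits):
    """Replay commits in version order: "add" makes a file live, "remove" retires it.

    commits: list of per-version action lists, oldest first.
    Other actions (commitInfo, metaData, ...) are ignored.
    """
    live = set()
    for actions in commits:
        for action in actions:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


def manifest_text(table_root, commits):
    """Model of an unpartitioned manifest: one absolute data-file path per line."""
    root = table_root.rstrip("/")
    lines = [f"{root}/{path}" for path in sorted(active_files(commits))]
    return "".join(line + "\n" for line in lines)
```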

## Remove Old Files (Vacuum)

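```python
# vacuum() deletes data files no longer referenced by the table and older than
# the default retention threshold (7 days); vacuum(200) uses 200 hours instead.
# deltaTable.vacuum()
# deltaTable.vacuum(200)
```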
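`vacuum` does not compact anything: it deletes data files that the current table version no longer references and that are older than the retention threshold, 7 days (168 hours) by default, so `vacuum(200)` keeps 200 hours of history. Delta refuses thresholds shorter than the configured retention unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`, and time travel to versions whose files were vacuumed stops working. The hypothetical `vacuum_candidates` helper below models which files would go, using file modification times as a simplification.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION_HOURS = 168  # 7 days, Delta's default retention threshold


def vacuum_candidates(files_on_disk, live_files, now, retention_hours=DEFAULT_RETENTION_HOURS):
    """Files a vacuum would delete in this simplified model.

    files_on_disk: dict of data-file name -> last-modified datetime.
    live_files: set of files referenced by the current table version.
    A file qualifies only if it is unreferenced AND older than the threshold.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        name
        for name, modified in files_on_disk.items()
        if name not in live_files and modified < cutoff
    )
```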

## Convert a Parquet Data Lake to Delta

```python
# Converts the Parquet directory in place by writing a Delta transaction log for its files.
# deltaTable = DeltaTable.convertToDelta(spark, "parquet.`../datalake-in-parquet/user`")
```
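`convertToDelta` works in place: it lists the Parquet files under the path, writes a Delta transaction log that tracks them, and infers the schema from the Parquet footers, without rewriting data. For a partitioned directory the partition schema goes in a third argument (for example `"country STRING"`, where `country` is a hypothetical partition column). The hypothetical helpers below sketch only the listing step: turning relative file paths into simplified `add` actions, reading partition values from `key=value` directories, and skipping hidden or metadata entries.

```python
import os


def list_relative_files(table_dir):
    """Walk table_dir and yield every file path relative to it, with '/' separators."""
    for root, _dirs, names in os.walk(table_dir):
        for name in names:
            full = os.path.join(root, name)
            yield os.path.relpath(full, table_dir).replace(os.sep, "/")


def add_actions_for(relative_paths):
    """Build one simplified "add" action per Parquet data file.

    Non-Parquet files are ignored, paths with a segment starting with "_" or "."
    (such as _delta_log or _temporary) are skipped, and key=value directories
    become partition values.
    """
    actions = []
    for path in sorted(relative_paths):
        parts = path.split("/")
        if not parts[-1].endswith(".parquet"):
            continue
        if any(part.startswith(("_", ".")) for part in parts):
            continue
        partition_values = dict(part.split("=", 1) for part in parts[:-1] if "=" in part)
        actions.append({"add": {"path": path, "partitionValues": partition_values}})
    return actions
```

Against this notebook's layout it could be called as `add_actions_for(list_relative_files("../datalake-in-parquet/user"))`.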