<p align="center">
  <a href="" rel="noopener">
 <img width=500px height=100px src="https://docs.delta.io/latest/_static/delta-lake-logo.png" alt="Project logo"></a>
</p>

<h6 align="center">Delta Lake é um projeto de código aberto que permite a construção de uma arquitetura Lakehouse. Fornecendo features como, transações ACID, manipulação de metadados escalonáveis e unificação do modo de realizar processamento de dados em lote e streaming sobre os data lakes existentes, como S3, ADLS, GCS e HDFS.</h6>

<div align="center">
</div>

<h6 align="center">
Delta Lake é uma camada de armazenamento que utiliza o formato Parquet como padrão e que fornece transações compatíveis com ACID e benefícios adicionais para Data Lakes. O Delta Lake pode ajudar a resolver diversos problemas alguns deles são:
</h6>
    
- Dificuldade em anexar dados
- Jobs extremamente custosos que falham durante a execução
- Modificações de dados armazenados são difíceis
- Operações em tempo real
- É caro manter versões históricas de dados
- Difícil de lidar com metadados grandes
- Problemas de “muitos arquivos”
- É difícil obter um ótimo desempenho
- Problemas de qualidade de dados


In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("aula1 - mdw deltalake")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

spark

In [2]:
from delta.tables import DeltaTable

## Lendo dados brutos

In [3]:
path_landing = {
    "user":"/home/jovyan/work/datalake/s3/landing-zone/user"
}

In [4]:
df_user = (
    spark
    .read
    .format("json")
    .load(path_landing["user"])
    .select("user_id", "email")
)

df_user.show(n=5, truncate=False)
print(f"QTD REGISTROS: {df_user.count()}")

+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
|8809   |carla.hansen@email.com |
|4606   |tomas.ledner@email.com |
|1      |alyse.ortiz@email.com  |
+-------+-----------------------+
only showing top 5 rows

QTD REGISTROS: 800


## Escrevendo os dados em formato Delta

In [5]:
path_bronze = {
    "user":"/home/jovyan/work/datalake/s3/bronze/user"
}

In [21]:
(
    df_user
    .write
    .format("delta")
    .mode("overwrite") # overwrite | append
    .save(path_bronze["user"])
)

## Histórico da Delta Table

In [82]:
deltaTable = DeltaTable.forPath(spark, path_bronze["user"])
deltaTable.history().show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 25                                                                                                                                                                                                                                                                                
 timestamp           | 2022-02-11 00:13:07.326                                                                                                                                                                                                                                                           
 userId              | null                                                                               

## Delete

In [23]:
print("ANTES DE DELETAR")
history = deltaTable.toDF()
history.where("user_id == 1").show(truncate=False)

print("APÓS DELETAR")
deltaTable.delete(
    "user_id == 1"
)

history = deltaTable.toDF()
history.where("user_id == 1").show(truncate=False)

ANTES DE DELETAR
+-------+---------------------+
|user_id|email                |
+-------+---------------------+
|1      |alyse.ortiz@email.com|
+-------+---------------------+

APÓS DELETAR
+-------+-----+
|user_id|email|
+-------+-----+
+-------+-----+



## Update

In [26]:
print("ANTES DE ATUALIZAR")
history = deltaTable.toDF()
history.where("email == 'marcos.collier@email.com'").show(truncate=False)

print("APÓS ATUALIZAR")
deltaTable.update(
    condition = "email = 'marcos.collier@email.com'",
    set = { "email": "'pucminas@gmail.com'" } 
)

history = deltaTable.toDF()
history.where("email == 'pucminas@gmail.com'").show(truncate=False)

ANTES DE ATUALIZAR
+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|3395   |marcos.collier@email.com|
+-------+------------------------+

APÓS ATUALIZAR
+-------+------------------+
|user_id|email             |
+-------+------------------+
|3395   |pucminas@gmail.com|
+-------+------------------+



## Gerando novos dados para serem atualizados de forma Incremental

In [27]:
items = [
        ("3395", "pucminas3@gmail.com"), 
        ("3650", "pucminas4@gmail.com.br")
]

items

[('3395', 'pucminas3@gmail.com'), ('3650', 'pucminas4@gmail.com.br')]

In [28]:
cols = [
        "user_id",
        "email"
]

cols

['user_id', 'email']

In [34]:
print("NOVOS DADOS PARA ATUALIZAR")
df_new = spark.createDataFrame(items, cols)
df_new.show(truncate=False)
print(df_new.count())

print("DADOS DESATUALIZADOS")
df_user.where("user_id in (3395, 3650)").orderBy("user_id").show(truncate=False)
df_user.count()

NOVOS DADOS PARA ATUALIZAR
+-------+----------------------+
|user_id|email                 |
+-------+----------------------+
|3395   |pucminas3@gmail.com   |
|3650   |pucminas4@gmail.com.br|
+-------+----------------------+

2
DADOS DESATUALIZADOS
+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|3395   |marcos.collier@email.com|
|3650   |jonah.barrows@email.com |
+-------+------------------------+



800

## Upserts

In [None]:
# !rm -rf /home/jovyan/work/datalake/s3/bronze/user/

In [35]:
(
    deltaTable.alias("desatualizados")
    .merge(
        df_new.alias("atualizados"),"desatualizados.user_id = atualizados.user_id"
    )
    .whenMatchedUpdateAll(
        condition = "desatualizados.user_id = atualizados.user_id"
    )
    .whenNotMatchedInsertAll()
    .execute()
)

## Após Upserts

In [40]:
delta_atualizado = deltaTable.toDF()

(
    delta_atualizado
    .where(
        """
        user_id 
            in (3650, 3395)
        """)
    .show(truncate=False)
)

+-------+----------------------+
|user_id|email                 |
+-------+----------------------+
|3395   |pucminas3@gmail.com   |
|3650   |pucminas4@gmail.com.br|
+-------+----------------------+



## Viagem no Tempo

In [41]:
history_delta = deltaTable.history()

In [42]:
history_delta.show(
    vertical=True,  
    truncate=False,
    n=10
)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 21                                                                                                                                                                                                                                                                                
 timestamp           | 2022-02-10 23:55:00.826                                                                                                                                                                                                                                                           
 userId              | null                                                                               

In [50]:
(
    history_delta
    .select("version",
            "operation",
            "timestamp",
            "operationMetrics"
    )
    .where("operation == 'WRITE'")
    .show(truncate=False)
)

+-------+---------+-----------------------+--------------------------------------------------------------+
|version|operation|timestamp              |operationMetrics                                              |
+-------+---------+-----------------------+--------------------------------------------------------------+
|18     |WRITE    |2022-02-10 23:46:57.332|{numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915}|
|17     |WRITE    |2022-02-10 23:46:31.807|{numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915}|
|16     |WRITE    |2022-02-10 23:45:59.469|{numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915}|
|15     |WRITE    |2022-02-10 23:41:50.094|{numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915}|
|14     |WRITE    |2022-02-10 23:39:22.25 |{numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915}|
|13     |WRITE    |2022-02-10 23:39:14.852|{numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915}|
|12     |WRITE    |2022-02-10 23:38:0

In [51]:
time_travel_version_0 = (
    spark
    .read
    .format("delta")
    .option("versionAsOf", "0")
    .load(path_bronze["user"])
)

print("VIAJANDO NO TEMPO PARA A VERSÃO 0 DA INGESTÃO")
(
    time_travel_version_0
    .where("user_id in (3650, 3395)")
    .show(truncate=False)
)

VIAJANDO NO TEMPO PARA A VERSÃO 0 DA INGESTÃO
+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|3395   |marcos.collier@email.com|
|3650   |jonah.barrows@email.com |
+-------+------------------------+



In [54]:
(
    history_delta
    .select("version",
            "operation",
            "timestamp",
            "operationMetrics"
    )
    .where("operation == 'WRITE'")
    .show(vertical=True, 
          truncate=False
    )
)

-RECORD 0--------------------------------------------------------------------------
 version          | 18                                                             
 operation        | WRITE                                                          
 timestamp        | 2022-02-10 23:46:57.332                                        
 operationMetrics | {numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915} 
-RECORD 1--------------------------------------------------------------------------
 version          | 17                                                             
 operation        | WRITE                                                          
 timestamp        | 2022-02-10 23:46:31.807                                        
 operationMetrics | {numFiles -> 4, numOutputRows -> 800, numOutputBytes -> 18915} 
-RECORD 2--------------------------------------------------------------------------
 version          | 16                                                      

In [55]:
time_travel_timestamp = (
    spark
    .read
    .format("delta")
    .option("timestampAsOf", "2022-02-10 23:45:59.469") # register timestamp
    .load(path_bronze["user"])
)

print("VIAJANDO NO TEMPO PARA DATA/HORA QUE HOUVE O UPSERT DOS DADOS")

time_travel_timestamp.where("user_id in (3650, 3395)").show(truncate=False)

VIAJANDO NO TEMPO PARA DATA/HORA QUE HOUVE O UPSERT DOS DADOS
+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|3395   |marcos.collier@email.com|
|3650   |jonah.barrows@email.com |
+-------+------------------------+



## Comparando modificações entre diferentes versões do Delta

In [61]:
#time_travel_timestamp.exceptAll(time_travel_version_0).show()

## Evolution Schema (Evolução da estrutura da Tabela)

## Concistencia do DADO (ACID) - C

In [62]:
df_user = (
    spark
    .read
    .format("json")
    .load(path_landing["user"])
    .select("user_id", "email", "gender")
)

print("*"*20)
print("NOVA COLUNA A SER INSERIDA")
df_user.printSchema()
print("*"*20)
print("ÚLTIMO ESTADO DA TABELA DELTA")
delta_atualizado.printSchema()

********************
NOVA COLUNA A SER INSERIDA
root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)

********************
ÚLTIMO ESTADO DA TABELA DELTA
root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)



## Tentativa de Sobrescrever o dado sem evoluir o Schema (Concistência)

In [64]:
# (
#     df_user
#     .write
#     .format("delta")
#     .mode("overwrite")
#     .save(path_bronze["user"])
# )

## Realizando a evolução do Schema com a opção MergeSchema
- A opção mergeSchema vai substituir tudo que não existir no último estado com o valor padrão nulo.

In [65]:
(
    df_user
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .save(path_bronze["user"])
)

## Lendo os dados após o MergeSchema

In [66]:
df_user = (
    spark
    .read
    .format("delta")
    .load(path_bronze["user"])
    .select("user_id", "email", "gender")
)

print("*"*20)
print("ESTRUTURA DA TABELA DELTA")
df_user.printSchema()
print("*"*20)
print("VISUALIZANDO TABELA DELTA")
df_user.show()

********************
ESTRUTURA DA TABELA DELTA
root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)

********************
VISUALIZANDO TABELA DELTA
+-------+--------------------+-----------+
|user_id|               email|     gender|
+-------+--------------------+-----------+
|   9177|danilo.krajcik@em...|   Bigender|
|   6483|santos.nitzsche@e...|Genderfluid|
|   4060|dwain.bernier@ema...|       Male|
|   4493|felica.schroeder@...|   Bigender|
|   4069|jesica.nienow@ema...|Genderqueer|
|   9060|delores.ratke@ema...|     Female|
|   8413|dorian.macejkovic...|    Agender|
|   4567| roman.rau@email.com| Polygender|
|    327|heriberto.cronin@...| Non-binary|
|   8234|richie.bahringer@...|Genderqueer|
|   5541|lelia.quitzon@ema...|   Bigender|
|   7531|lianne.heathcote@...| Polygender|
|   2425|melani.turner@ema...|Genderqueer|
|   5042|kellee.parisian@e...|       Male|
|   5002|dannie.schroeder@...|    Agender|
|   9083|diana

## Removendo coluna e fazendo o MergeSchema novamente

In [67]:
df_user = df_user.drop("gender")
df_user.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)



In [68]:
(
    df_user
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .save(path_bronze["user"])
)

## Lendo os dados após o MergeSchema

In [69]:
df_user = (
    spark
    .read
    .format("delta")
    .load(path_bronze["user"])
    .select("user_id", "email", "gender")
)

print("*"*20)
print("ESTRUTURA DA TABELA DELTA")
df_user.printSchema()
print("*"*20)
print("VISUALIZANDO TABELA DELTA")
df_user.show()

********************
ESTRUTURA DA TABELA DELTA
root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)

********************
VISUALIZANDO TABELA DELTA
+-------+--------------------+------+
|user_id|               email|gender|
+-------+--------------------+------+
|   9177|danilo.krajcik@em...|  null|
|   6483|santos.nitzsche@e...|  null|
|   4060|dwain.bernier@ema...|  null|
|   4493|felica.schroeder@...|  null|
|   4069|jesica.nienow@ema...|  null|
|   9060|delores.ratke@ema...|  null|
|   8413|dorian.macejkovic...|  null|
|   4567| roman.rau@email.com|  null|
|    327|heriberto.cronin@...|  null|
|   8234|richie.bahringer@...|  null|
|   5541|lelia.quitzon@ema...|  null|
|   7531|lianne.heathcote@...|  null|
|   2425|melani.turner@ema...|  null|
|   5042|kellee.parisian@e...|  null|
|   5002|dannie.schroeder@...|  null|
|   9083|diana.hudson@emai...|  null|
|   7880|kenneth.connelly@...|  null|
|   3802|kristyn.dicki@ema..

## Realizando a evolução do Schema com a opção Overwrite Schema
- A opção Overwrite Schema vai gravar uma nova versão da tabela delta removendo a coluna de forma literal.

In [74]:
df_user = (
    spark
    .read
    .format("json")
    .load(path_landing["user"])
    .select("user_id","email","gender")
)

df_user = df_user.drop("gender")
print("*"*20)
print("ESTRUTURA DA TABELA DELTA")
df_user.printSchema()
print("*"*20)
print("VISUALIZANDO TABELA DELTA")
df_user.show()

********************
ESTRUTURA DA TABELA DELTA
root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)

********************
VISUALIZANDO TABELA DELTA
+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   1703|daron.bailey@emai...|
|   3650|jonah.barrows@ema...|
|   8809|carla.hansen@emai...|
|   4606|tomas.ledner@emai...|
|      1|alyse.ortiz@email...|
|   9245|russell.kulas@ema...|
|   3425|armida.lehner@ema...|
|   4264|tad.sanford@email...|
|   1668|rosia.jones@email...|
|    343|candy.conroy@emai...|
|   7393|dulcie.gottlieb@e...|
|   3909|rodrigo.reynolds@...|
|   9952|jenna.bode@email.com|
|   2364|dan.herman@email.com|
|   1611|stanley.witting@e...|
|   1723|clarinda.kilback@...|
|   7032|charley.carroll@e...|
|    549|cameron.harris@em...|
|   4161|reyes.stracke@ema...|
|    503|jolynn.schulist@e...|
+-------+--------------------+
only showing top 20 rows



## Sobrescrevendo a estrutura da Tabela

In [75]:
(
    df_user
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .save(path_bronze["user"])
)

## Lendo após sobrescrita da estrutura da Tabela

In [76]:
df_user = (
    spark
    .read
    .format("delta")
    .load(path_bronze["user"])
)

print("*"*20)
print("ESTRUTURA DA TABELA DELTA")
df_user.printSchema()
print("*"*20)
print("VISUALIZANDO TABELA DELTA")
df_user.show()

********************
ESTRUTURA DA TABELA DELTA
root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)

********************
VISUALIZANDO TABELA DELTA
+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   9177|danilo.krajcik@em...|
|   6483|santos.nitzsche@e...|
|   4060|dwain.bernier@ema...|
|   4493|felica.schroeder@...|
|   4069|jesica.nienow@ema...|
|   9060|delores.ratke@ema...|
|   8413|dorian.macejkovic...|
|   4567| roman.rau@email.com|
|    327|heriberto.cronin@...|
|   8234|richie.bahringer@...|
|   5541|lelia.quitzon@ema...|
|   7531|lianne.heathcote@...|
|   2425|melani.turner@ema...|
|   5042|kellee.parisian@e...|
|   5002|dannie.schroeder@...|
|   9083|diana.hudson@emai...|
|   7880|kenneth.connelly@...|
|   3802|kristyn.dicki@ema...|
|   7982| shyla.rau@email.com|
|   9925|keena.bahringer@e...|
+-------+--------------------+
only showing top 20 rows



## Forma fácil de realizar uma Migração de Data Lake para Delta

In [83]:
#deltaTable = DeltaTable.convertToDelta(spark, f"parquet.`/home/jovyan/work/datalake/legacy-parquet/user/`")
deltaTable = DeltaTable.forPath(spark, "/home/jovyan/work/datalake/legacy-parquet/user/")

In [88]:
deltaTable.history().show(truncate=False,vertical=True)

-RECORD 0--------------------------------------------------------------------------
 version             | 0                                                           
 timestamp           | 2022-02-11 00:19:01.977                                     
 userId              | null                                                        
 userName            | null                                                        
 operation           | CONVERT                                                     
 operationParameters | {numFiles -> 4, partitionedBy -> [], collectStats -> false} 
 job                 | null                                                        
 notebook            | null                                                        
 clusterId           | null                                                        
 readVersion         | -1                                                          
 isolationLevel      | Serializable                                         