In [1]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.


In [59]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import DataFrame

In [60]:
spark = pyspark.sql.SparkSession.builder.appName("Pyspark - Deltalake")\
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .config("spark.sql.repl.eagerEval.maxNumRows", 5)\
    .config("spark.sql.repl.eagerEval.enabled", True)\
    .getOrCreate()

In [61]:
def read_data(path: str, format: str) -> DataFrame:
        """[Read data on local path]

        Args:
            path (str): [data location path]

        Returns:
            DataFrame: [Dataframe with data]
        """        
        df = spark.read.format(format)\
            .option("inferSchema", "true")\
            .option("header", "true")\
            .load(path)

        return df

In [62]:
def write_data(df: DataFrame, path: str, write_delta_mode: str, format_file: str) -> str:
        """[Write data on format delta]

        Args:
            df (DataFrame): [Dataframe with data]
            path (str): [data location path]
            write_delta_mode (str): [append, overwrite]

        Returns:
            str: [Message after write]
        """        
        df.write.mode(write_delta_mode)\
        .format(format_file)\
        .save(path)

        return f"Data saved successfully on -> {path}"

## Lendo os dados da Landing [Users]

In [63]:
df = read_data(f"../landing/user/*.json", "json")

- Limitando as colunas para facilitar na visualização

In [64]:
df = df.select("user_id","email")

- Verificando se as colunas estão corretas

In [72]:
df

user_id,email
1703,daron.bailey@emai...
3650,jonah.barrows@ema...
8809,carla.hansen@emai...
4606,tomas.ledner@emai...
1,alyse.ortiz@email...


- Escrevendo os dados em formato delta com mode append

In [66]:
write_data(df, f"../delta/bronze/user/", "append", "delta")

'Data saved successfully on -> ../delta/bronze/user/'

## Cria novos dados para realização de Upserts

- Obs: os Ids 1703 e 3650, existem no DataFrame lido diretamente da landing. O objetivo disso é que esses são os user_id que vão ter o email modificado

In [68]:
items = [("1703","jpcarlos336@gmail.com"), ("3650", "carlos.barbosa@a3data.com.br")]

- Herdando schema do DataFrame chamado na landing

In [70]:
cols = df.columns
cols

['user_id', 'email']

- Criando DataFrame a partir dos items e cols

In [71]:
merge_table = spark.createDataFrame(items, cols)
merge_table

user_id,email
1703,jpcarlos336@gmail...
3650,carlos.barbosa@a3...


## Importando lib para acessar métodos do Delta

In [46]:
from delta.tables import *

- Seleciona usuários que irão sofrer Updates

In [76]:
# DataFrame que vai sofrer atualização nos user_id (1703, 3650)
df.where("user_id in (1703, 3650)").show(truncate=False)

+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
+-------+-----------------------+



In [77]:
# Dados que irão atualizar o DataFrame acima
merge_table.show(truncate=False)

+-------+----------------------------+
|user_id|email                       |
+-------+----------------------------+
|1703   |jpcarlos336@gmail.com       |
|3650   |carlos.barbosa@a3data.com.br|
+-------+----------------------------+



In [78]:
# Acessa a Tabela Delta Lake
deltaTable = DeltaTable.forPath(spark, "../delta/bronze/user")

In [80]:
# Acessando os metadados das operações realizados no Delta Lake
deltaTable.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,2021-07-20 13:41:22,,,WRITE,"{mode -> Append, ...",,,,,,True,"{numFiles -> 6, n...",


- Realização do Merge no qual atualizamos o campo email para os user_id (1703, 3650)

In [81]:
deltaTable.alias("deltaTable") \
    .merge(merge_table.alias("updates"),"deltaTable.user_id = updates.user_id") \
    .whenMatchedUpdateAll(condition = "deltaTable.user_id = updates.user_id" ) \
    .whenNotMatchedInsertAll() \
    .execute()

In [82]:
# Convertemos a Delta Table para Dataframe para visualizar a modificação
delta = deltaTable.toDF()

- Verificamos a mudança dos emails para os usuários.

In [83]:
delta.where("user_id in (1703, 3650)").show(truncate=False)

+-------+----------------------------+
|user_id|email                       |
+-------+----------------------------+
|1703   |jpcarlos336@gmail.com       |
|3650   |carlos.barbosa@a3data.com.br|
+-------+----------------------------+



In [85]:
# Exibindo dados completos
delta.show()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   3395|marcos.collier@em...|
|   1556|elina.hills@email...|
|   1879|enedina.schroeder...|
|   7805|colin.ryan@email.com|
|   3982|dallas.boyle@emai...|
|   7274|grover.towne@emai...|
|   3184|dexter.schmitt@em...|
|    550|novella.weber@ema...|
|   8365|lesley.mccullough...|
|   4942|marti.marks@email...|
|   8327|shawnna.keebler@e...|
|   9464|guillermo.beahan@...|
|   4123|sid.bechtelar@ema...|
|   2281|merrill.upton@ema...|
|   6998|felipe.ward@email...|
|   7440|willie.walsh@emai...|
|   8397|jae.krajcik@email...|
|   9437|wilfredo.bailey@e...|
|   3148|josefa.marvin@ema...|
|   2535|loyd.hintz@email.com|
+-------+--------------------+
only showing top 20 rows

