In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook. Hive support is
# switched on so that tables can be registered in and read from the metastore.
spark = (
    SparkSession.builder
    .appName("Testando Operações DeltaLake")
    .enableHiveSupport()
    .getOrCreate()
)

# Bare last expression: the notebook renders the session summary.
spark

## Criando dados artificiais

Vamos criar uma tabela com dados artificiais a partir de uma lista de tuplas Python (a variável `dataDict`)

In [2]:
# Sample employee records, one tuple per employee in the column order
# (empId, firstName, lastName, job). Despite the name, this is a list of
# tuples rather than a dict.
dataDict = [
    (200001, "Michael", "Scott", "Regional Manager"),
    (200002, "Dwight", "Schrute", "Sales"),
    (200003, "Jim", "Halpert", "Sales"),
    (200004, "Phyllis", "Lapin", "Sales"),
    (200005, "Stanley", "Hudson", "Sales"),
    (200006, "Angela", "Martin", "Accounting"),
    (200007, "Kevin", "Malone", "Accounting"),
    (200008, "Oscar", "Martinez", "Accounting"),
    (200009, "Creed", "Bratton", "Quality Assurance"),
    (200010, "Meredith", "Palmer", "Supplier Relations"),
    (200011, "Pamela", "Beesly", "Recepctionist"),
    (200012, "Kelly", "Kapoor", "Customer Service"),
    (200013, "Ryan", "Howard", "Temp"),
    (200014, "Toby", "Flenderson", "Human Resources"),
    (200015, "Darryl", "Philbin", "Warehouse Foreman"),
]

In [3]:
# Turn the list of tuples into a DataFrame. Only column names are supplied,
# so Spark infers the column types from the data.
df1 = spark.createDataFrame(
    data=dataDict,
    schema=["empId", "firstName", "lastName", "job"],
)
df1.show()

                                                                                

+------+---------+----------+------------------+
| empId|firstName|  lastName|               job|
+------+---------+----------+------------------+
|200001|  Michael|     Scott|  Regional Manager|
|200002|   Dwight|   Schrute|             Sales|
|200003|      Jim|   Halpert|             Sales|
|200004|  Phyllis|     Lapin|             Sales|
|200005|  Stanley|    Hudson|             Sales|
|200006|   Angela|    Martin|        Accounting|
|200007|    Kevin|    Malone|        Accounting|
|200008|    Oscar|  Martinez|        Accounting|
|200009|    Creed|   Bratton| Quality Assurance|
|200010| Meredith|    Palmer|Supplier Relations|
|200011|   Pamela|    Beesly|     Recepctionist|
|200012|    Kelly|    Kapoor|  Customer Service|
|200013|     Ryan|    Howard|              Temp|
|200014|     Toby|Flenderson|   Human Resources|
|200015|   Darryl|   Philbin| Warehouse Foreman|
+------+---------+----------+------------------+



## Criando Tabelas Delta no Data Lake + Metastore

Vamos criar o database dundermifflin, caso ele ainda não exista em nosso metastore, e em seguida criar a tabela Delta employees com os dados que acabamos de gerar

In [4]:
# Create the database (if it does not exist yet)

spark.sql("CREATE DATABASE IF NOT EXISTS dundermifflin")

# Drop the tables in case they were already created in another session
spark.sql("DROP TABLE IF EXISTS dundermifflin.employees")
spark.sql("DROP TABLE IF EXISTS dundermifflin.employees_updated")

23/02/24 00:16:10 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/02/24 00:16:10 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/02/24 00:16:12 WARN ObjectStore: Failed to get database dundermifflin, returning NoSuchObjectException
23/02/24 00:16:12 WARN ObjectStore: Failed to get database dundermifflin, returning NoSuchObjectException
23/02/24 00:16:12 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
23/02/24 00:16:12 WARN ObjectStore: Failed to get database dundermifflin, returning NoSuchObjectException


DataFrame[]

In [5]:
# Write the contents of df1 to the Delta table dundermifflin.employees.
# mode("overwrite") replaces any existing data, and overwriteSchema allows
# the overwrite to replace the table schema as well.
(
    df1.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dundermifflin.employees")
)

                                                                                

23/02/24 00:16:41 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `dundermifflin`.`employees` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
23/02/24 00:16:41 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/02/24 00:16:41 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/02/24 00:16:41 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/02/24 00:16:41 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


É possível listar as tabelas dentro de um determinado database através da função abaixo

In [6]:
# Catalog API: returns the tables registered in the given database.
spark.catalog.listTables(dbName="dundermifflin")

[Table(name='employees', database='dundermifflin', description=None, tableType='MANAGED', isTemporary=False)]

Ou através do Spark SQL:

In [7]:
# SQL equivalent of the catalog call: list the tables in the dundermifflin database.
spark.sql("SHOW TABLES IN dundermifflin").show()

+-------------+---------+-----------+
|    namespace|tableName|isTemporary|
+-------------+---------+-----------+
|dundermifflin|employees|      false|
+-------------+---------+-----------+



# Lendo Tabelas Delta diretamente do S3

In [8]:
# Load the Delta table directly from its storage location instead of
# resolving it by name through the metastore.
df1 = (
    spark.read
    .format("delta")
    .load("s3a://warehouse/dundermifflin.db/employees")
)
df1.show()

+------+---------+----------+------------------+
| empId|firstName|  lastName|               job|
+------+---------+----------+------------------+
|200010| Meredith|    Palmer|Supplier Relations|
|200015|   Darryl|   Philbin| Warehouse Foreman|
|200014|     Toby|Flenderson|   Human Resources|
|200009|    Creed|   Bratton| Quality Assurance|
|200001|  Michael|     Scott|  Regional Manager|
|200012|    Kelly|    Kapoor|  Customer Service|
|200011|   Pamela|    Beesly|     Recepctionist|
|200008|    Oscar|  Martinez|        Accounting|
|200006|   Angela|    Martin|        Accounting|
|200007|    Kevin|    Malone|        Accounting|
|200002|   Dwight|   Schrute|             Sales|
|200005|  Stanley|    Hudson|             Sales|
|200004|  Phyllis|     Lapin|             Sales|
|200003|      Jim|   Halpert|             Sales|
|200013|     Ryan|    Howard|              Temp|
+------+---------+----------+------------------+



# Lendo Tabelas Delta a partir do Metastore

### • Utilizando Spark SQL + Metastore

In [9]:
# Query the table by its database-qualified name; no storage path is needed.
df1 = spark.sql("SELECT * FROM dundermifflin.employees")
df1.show()

+------+---------+----------+------------------+
| empId|firstName|  lastName|               job|
+------+---------+----------+------------------+
|200010| Meredith|    Palmer|Supplier Relations|
|200015|   Darryl|   Philbin| Warehouse Foreman|
|200014|     Toby|Flenderson|   Human Resources|
|200009|    Creed|   Bratton| Quality Assurance|
|200001|  Michael|     Scott|  Regional Manager|
|200012|    Kelly|    Kapoor|  Customer Service|
|200011|   Pamela|    Beesly|     Recepctionist|
|200008|    Oscar|  Martinez|        Accounting|
|200006|   Angela|    Martin|        Accounting|
|200007|    Kevin|    Malone|        Accounting|
|200002|   Dwight|   Schrute|             Sales|
|200005|  Stanley|    Hudson|             Sales|
|200004|  Phyllis|     Lapin|             Sales|
|200003|      Jim|   Halpert|             Sales|
|200013|     Ryan|    Howard|              Temp|
+------+---------+----------+------------------+



### • Utilizando PySpark + Metastore

In [10]:
# DataFrame-API way of reading the same table by name.
# df1 is reused by later cells to look up employee ids.
df1 = spark.read.table("dundermifflin.employees")
df1.show()

+------+---------+----------+------------------+
| empId|firstName|  lastName|               job|
+------+---------+----------+------------------+
|200010| Meredith|    Palmer|Supplier Relations|
|200015|   Darryl|   Philbin| Warehouse Foreman|
|200014|     Toby|Flenderson|   Human Resources|
|200009|    Creed|   Bratton| Quality Assurance|
|200001|  Michael|     Scott|  Regional Manager|
|200012|    Kelly|    Kapoor|  Customer Service|
|200011|   Pamela|    Beesly|     Recepctionist|
|200008|    Oscar|  Martinez|        Accounting|
|200006|   Angela|    Martin|        Accounting|
|200007|    Kevin|    Malone|        Accounting|
|200002|   Dwight|   Schrute|             Sales|
|200005|  Stanley|    Hudson|             Sales|
|200004|  Phyllis|     Lapin|             Sales|
|200003|      Jim|   Halpert|             Sales|
|200013|     Ryan|    Howard|              Temp|
+------+---------+----------+------------------+



## Atualizando registros na tabela

Com tabelas Delta podemos atualizar dados diretamente em uma tabela gravada no seu data lake.

Vamos atualizar o campo __job__ do empregado __Dwight Schrute__ para __'Assistant To The Regional Manager'__

In [11]:
# Explicit imports instead of wildcards: `from pyspark.sql.functions import *`
# shadows Python built-ins such as sum, max, min, abs and round, and hides
# where each name comes from. These four names are the ones the notebook uses.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, when

Podemos utilizar PySpark para obter o __empId__ de Dwight e utilizá-lo como chave para alteração do registro

In [12]:
# Look up Dwight's empId so it can be used as the key for the update.
# Filtering before projecting keeps firstName in scope for the predicate;
# first() fetches a single Row instead of collecting every match, and the
# Row is subscripted directly rather than through an explicit __getitem__ call.
dwight_id = (
    df1
    .where(col("firstName") == "Dwight")
    .select("empId")
    .first()["empId"]
)
dwight_id

200002

Podemos atualizar o dado diretamente na tabela utilizando as classes do pacote delta.tables

• Utilizando uma string SQL formatada

In [13]:
# Handle to the Delta table, resolved by name.
# Path-based alternative:
#   DeltaTable.forPath(spark, "s3a://warehouse/dundermifflin.db/employees")
deltaTable = DeltaTable.forName(spark, "dundermifflin.employees")

# Both arguments are SQL text here: the condition is a predicate string and
# the new value is a SQL expression, hence the inner single quotes around
# the job title.
deltaTable.update(
    condition=f"empId = {dwight_id}",
    set={"job": "'Assistant To The Regional Manager'"},
)

                                                                                

• Utilizando funções de Spark SQL

In [14]:
# Handle to the Delta table, resolved by name.
# Path-based alternative:
#   DeltaTable.forPath(spark, "s3a://warehouse/dundermifflin.db/employees")
deltaTable = DeltaTable.forName(spark, "dundermifflin.employees")

# Same update expressed with Column objects: col() builds the predicate and
# lit() wraps the new job title as a literal value.
deltaTable.update(
    condition=(col("empId") == dwight_id),
    set={"job": lit("Assistant To The Regional Manager")},
)

                                                                                

In [15]:
# Re-read the table to confirm the update; truncate=False prints full cell values.
spark.read.table("dundermifflin.employees").show(truncate=False)

+------+---------+----------+---------------------------------+
|empId |firstName|lastName  |job                              |
+------+---------+----------+---------------------------------+
|200002|Dwight   |Schrute   |Assistant To The Regional Manager|
|200010|Meredith |Palmer    |Supplier Relations               |
|200015|Darryl   |Philbin   |Warehouse Foreman                |
|200014|Toby     |Flenderson|Human Resources                  |
|200009|Creed    |Bratton   |Quality Assurance                |
|200001|Michael  |Scott     |Regional Manager                 |
|200012|Kelly    |Kapoor    |Customer Service                 |
|200011|Pamela   |Beesly    |Recepctionist                    |
|200008|Oscar    |Martinez  |Accounting                       |
|200006|Angela   |Martin    |Accounting                       |
|200007|Kevin    |Malone    |Accounting                       |
|200005|Stanley  |Hudson    |Sales                            |
|200004|Phyllis  |Lapin     |Sales      

## Deletando Registro na tabela

Com tabelas Delta podemos deletar dados diretamente em uma tabela gravada no seu data lake.

Vamos deletar o empregado __Ryan Howard__ da tabela __Employees__

Podemos utilizar PySpark para obter o __empId__ de Ryan e utilizá-lo como chave para a deleção do registro

In [16]:
# Look up Ryan Howard's empId so it can be used as the key for the delete.
# The column is referenced as "lastName", matching the schema exactly; the
# previous "lastname" spelling only resolved because Spark's column
# resolution is case-insensitive by default.
# first() fetches a single Row, which is subscripted directly rather than
# through an explicit __getitem__ call.
ryan_id = (
    df1
    .where((col("firstName") == "Ryan") & (col("lastName") == "Howard"))
    .select("empId")
    .first()["empId"]
)
ryan_id

200013

In [18]:
# Handle to the Delta table, resolved by name.
# Path-based alternative:
#   DeltaTable.forPath(spark, "s3a://warehouse/dundermifflin.db/employees")
deltaTable = DeltaTable.forName(spark, "dundermifflin.employees")

# Delete the rows matching the predicate.
# The same delete can also be written with a formatted SQL string:
#   deltaTable.delete(f"empId = {ryan_id}")
deltaTable.delete(col("empId") == ryan_id)

# Re-read the table to confirm the row is gone.
spark.read.table("dundermifflin.employees").show(truncate=False)

+------+---------+----------+---------------------------------+
|empId |firstName|lastName  |job                              |
+------+---------+----------+---------------------------------+
|200002|Dwight   |Schrute   |Assistant To The Regional Manager|
|200010|Meredith |Palmer    |Supplier Relations               |
|200015|Darryl   |Philbin   |Warehouse Foreman                |
|200014|Toby     |Flenderson|Human Resources                  |
|200009|Creed    |Bratton   |Quality Assurance                |
|200001|Michael  |Scott     |Regional Manager                 |
|200012|Kelly    |Kapoor    |Customer Service                 |
|200011|Pamela   |Beesly    |Recepctionist                    |
|200008|Oscar    |Martinez  |Accounting                       |
|200006|Angela   |Martin    |Accounting                       |
|200007|Kevin    |Malone    |Accounting                       |
|200005|Stanley  |Hudson    |Sales                            |
|200004|Phyllis  |Lapin     |Sales      

## Executando UPSERTS (Merges)

UPSERTS são operações que permitem a escrita e alteração de poucos registros de uma tabela, sem a necessidade de sobrescrever a tabela inteira

### UPSERTS = UPDATES + INSERTS. 

Ao aplicar um UPSERT você insere novos registros e atualiza os registros já existentes em uma tabela alvo, processando apenas os registros necessários.

É possível também provocar a deleção de alguns registros ao utilizar essas operações.

Vamos resetar nossa tabela __employees__ para executar todas as alterações novamente usando um UPSERT 

In [19]:
# Rebuild the original sample data, one tuple per employee in the column
# order (empId, firstName, lastName, job).
dataDict = [
    (200001, "Michael", "Scott", "Regional Manager"),
    (200002, "Dwight", "Schrute", "Sales"),
    (200003, "Jim", "Halpert", "Sales"),
    (200004, "Phyllis", "Lapin", "Sales"),
    (200005, "Stanley", "Hudson", "Sales"),
    (200006, "Angela", "Martin", "Accounting"),
    (200007, "Kevin", "Malone", "Accounting"),
    (200008, "Oscar", "Martinez", "Accounting"),
    (200009, "Creed", "Bratton", "Quality Assurance"),
    (200010, "Meredith", "Palmer", "Supplier Relations"),
    (200011, "Pamela", "Beesly", "Recepctionist"),
    (200012, "Kelly", "Kapoor", "Customer Service"),
    (200013, "Ryan", "Howard", "Temp"),
    (200014, "Toby", "Flenderson", "Human Resources"),
    (200015, "Darryl", "Philbin", "Warehouse Foreman"),
]

df1 = spark.createDataFrame(
    data=dataDict,
    schema=["empId", "firstName", "lastName", "job"],
)

# Overwrite dundermifflin.employees so the table is back in its initial
# state before the merge is demonstrated.
(
    df1.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dundermifflin.employees")
)

                                                                                

Um UPSERT necessita de dois conjuntos de dados: um __original__ (a tabela alvo) e outro com as __atualizações__

Já temos o __original__ armazenado em nosso metastore (tabela employees); agora precisamos criar um DataFrame com as atualizações que queremos, que chamaremos de dfUpdates

In [20]:
# Start the batch of changes from the current contents of the employees table.
# Path-based alternative:
#   DeltaTable.forPath(spark, "s3a://warehouse/dundermifflin.db/employees")
deltaTableUpdates = DeltaTable.forName(spark, "dundermifflin.employees")
dfUpdates = deltaTableUpdates.toDF()

dfUpdates.show()

+------+---------+----------+------------------+
| empId|firstName|  lastName|               job|
+------+---------+----------+------------------+
|200010| Meredith|    Palmer|Supplier Relations|
|200015|   Darryl|   Philbin| Warehouse Foreman|
|200009|    Creed|   Bratton| Quality Assurance|
|200014|     Toby|Flenderson|   Human Resources|
|200001|  Michael|     Scott|  Regional Manager|
|200012|    Kelly|    Kapoor|  Customer Service|
|200011|   Pamela|    Beesly|     Recepctionist|
|200008|    Oscar|  Martinez|        Accounting|
|200006|   Angela|    Martin|        Accounting|
|200007|    Kevin|    Malone|        Accounting|
|200002|   Dwight|   Schrute|             Sales|
|200005|  Stanley|    Hudson|             Sales|
|200004|  Phyllis|     Lapin|             Sales|
|200003|      Jim|   Halpert|             Sales|
|200013|     Ryan|    Howard|              Temp|
+------+---------+----------+------------------+



## As alterações que faremos serão:

• Alterar os sobrenomes de alguns funcionários

• Alterar o cargo de Dwight Schrute para Assistant to the Regional Manager

• Criar uma coluna __markedTermination__ que será utilizada como marcador para demissão

• Adicionar um funcionário novo na lista


In [21]:
# Apply the demo changes on top of the current table contents. Every change
# is keyed on empId (the same key the merge uses) rather than on firstName,
# which is not guaranteed to be unique.
dfUpdates = (
    dfUpdates
    # Change last names: Pamela Beesly (200011) and Phyllis Lapin (200004).
    .withColumn(
        "lastName",
        when(col("empId") == 200011, "Beesly Halpert")
        .when(col("empId") == 200004, "Lapin Vance")
        .otherwise(col("lastName")),
    )
    # Change job title: Dwight Schrute (200002).
    .withColumn(
        "job",
        when(col("empId") == 200002, "Assistant to the Regional Manager")
        .otherwise(col("job")),
    )
    # New flag column: only Kevin Malone (200007) is marked for termination.
    .withColumn(
        "markedTermination",
        when(col("empId") == 200007, True).otherwise(False),
    )
)

dfUpdates.show(truncate=False)

+------+---------+--------------+---------------------------------+-----------------+
|empId |firstName|lastName      |job                              |markedTermination|
+------+---------+--------------+---------------------------------+-----------------+
|200010|Meredith |Palmer        |Supplier Relations               |false            |
|200015|Darryl   |Philbin       |Warehouse Foreman                |false            |
|200009|Creed    |Bratton       |Quality Assurance                |false            |
|200014|Toby     |Flenderson    |Human Resources                  |false            |
|200001|Michael  |Scott         |Regional Manager                 |false            |
|200012|Kelly    |Kapoor        |Customer Service                 |false            |
|200011|Pamela   |Beesly Halpert|Recepctionist                    |false            |
|200008|Oscar    |Martinez      |Accounting                       |false            |
|200006|Angela   |Martin        |Accounting           

In [22]:
# A single new employee, with the same five columns as dfUpdates
# (markedTermination included).
newEmp = [(200016, "Andy", "Bernard", "Sales", False)]

dfnewEmp = spark.createDataFrame(
    data=newEmp,
    schema=["empId", "firstName", "lastName", "job", "markedTermination"],
)

dfnewEmp.show()

+------+---------+--------+-----+-----------------+
| empId|firstName|lastName|  job|markedTermination|
+------+---------+--------+-----+-----------------+
|200016|     Andy| Bernard|Sales|            false|
+------+---------+--------+-----+-----------------+



In [23]:
# Append the new employee to the batch of changes.
# unionByName matches columns by name; union() matches them by position and
# would silently misalign values if the two column orders ever differed.
dfUpdates = dfUpdates.unionByName(dfnewEmp)

dfUpdates.show(truncate=False)

+------+---------+--------------+---------------------------------+-----------------+
|empId |firstName|lastName      |job                              |markedTermination|
+------+---------+--------------+---------------------------------+-----------------+
|200010|Meredith |Palmer        |Supplier Relations               |false            |
|200015|Darryl   |Philbin       |Warehouse Foreman                |false            |
|200009|Creed    |Bratton       |Quality Assurance                |false            |
|200014|Toby     |Flenderson    |Human Resources                  |false            |
|200001|Michael  |Scott         |Regional Manager                 |false            |
|200012|Kelly    |Kapoor        |Customer Service                 |false            |
|200011|Pamela   |Beesly Halpert|Recepctionist                    |false            |
|200008|Oscar    |Martinez      |Accounting                       |false            |
|200006|Angela   |Martin        |Accounting           

Essas alterações foram feitas apenas para fins didáticos. Na vida real provavelmente você vai ter mais um batch de alterações armazenado em algum bucket ou tabela e que será necessário processar

In [24]:
# Persist the batch of changes as its own Delta table,
# dundermifflin.employees_updated, overwriting any earlier version.
(
    dfUpdates.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dundermifflin.employees_updated")
)

                                                                                

23/02/24 00:19:07 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `dundermifflin`.`employees_updated` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [25]:
# Handle to the table that will receive the changes.
deltaTable = DeltaTable.forName(spark, "dundermifflin.employees")

# Handle to the table holding the persisted batch of changes.
deltaTableUpdates = DeltaTable.forName(spark, "dundermifflin.employees_updated")

In [26]:
# Upsert the persisted batch of changes (dundermifflin.employees_updated)
# into dundermifflin.employees, matching rows on empId.
# The source is read from the persisted table through deltaTableUpdates
# rather than from the in-memory dfUpdates, which was itself derived from
# the target table being modified.
(
    deltaTable.alias("original")
    .merge(
        deltaTableUpdates.toDF().alias("updates"),
        "original.empId = updates.empId",
    )
    # Matched rows flagged for termination are removed from the target.
    .whenMatchedDelete(condition="updates.markedTermination = true")
    # The remaining matched rows take their values from the source.
    .whenMatchedUpdate(
        condition="updates.markedTermination = false",
        set={
            "empId": "updates.empId",
            "firstName": "updates.firstName",
            "lastName": "updates.lastName",
            "job": "updates.job",
        },
    )
    # Unmatched source rows are inserted as new employees. markedTermination
    # is left out because the target table has no such column.
    .whenNotMatchedInsert(
        values={
            "empId": "updates.empId",
            "firstName": "updates.firstName",
            "lastName": "updates.lastName",
            "job": "updates.job",
        }
    )
    .execute()
)

spark.read.table("dundermifflin.employees").show(truncate=False)

                                                                                

+------+---------+--------------+---------------------------------+
|empId |firstName|lastName      |job                              |
+------+---------+--------------+---------------------------------+
|200001|Michael  |Scott         |Regional Manager                 |
|200002|Dwight   |Schrute       |Assistant to the Regional Manager|
|200003|Jim      |Halpert       |Sales                            |
|200004|Phyllis  |Lapin Vance   |Sales                            |
|200005|Stanley  |Hudson        |Sales                            |
|200006|Angela   |Martin        |Accounting                       |
|200008|Oscar    |Martinez      |Accounting                       |
|200009|Creed    |Bratton       |Quality Assurance                |
|200010|Meredith |Palmer        |Supplier Relations               |
|200011|Pamela   |Beesly Halpert|Recepctionist                    |
|200012|Kelly    |Kapoor        |Customer Service                 |
|200013|Ryan     |Howard        |Temp           

In [27]:
# Show the table's commit history (one row per version, newest first in the output).
spark.sql("DESCRIBE HISTORY dundermifflin.employees").show()

+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      5|2023-02-24 00:19:17|  null|    null|               MERGE|{predicate -> (or...|null|    null|     null|          4|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|      4|2023-02-24 00:18:13|  null|    null|CREATE OR REPLACE...|{isManaged -> tru...|null|    null|     null|          3|  Serializable|        false|{numFiles -> 16,

In [29]:
# https://github.com/delta-io/delta/pull/1255 <- awaiting resolution
#spark.sql('SHOW CREATE TABLE dundermifflin.employees').show()