# Apache Spark com Delta Lake

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

In [2]:
# Build a local SparkSession wired up for Delta Lake.
# Each setting is applied to the builder in turn, then the session is
# created (or the already-running one is reused) via getOrCreate().
delta_settings = {
    # Maven coordinates of the Delta Lake package fetched at startup.
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.2.0",
    # Delta's SQL session extension.
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    # Use Delta's catalog implementation as the session catalog.
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = SparkSession.builder.master("local[*]")
for setting_name, setting_value in delta_settings.items():
    builder = builder.config(setting_name, setting_value)

spark = builder.getOrCreate()

:: loading settings :: url = jar:file:/home/julia/Downloads/spark-delta-iceberg/spark-delta/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/julia/.ivy2/cache
The jars for the packages stored in: /home/julia/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-df824b2d-6949-4e7b-840e-5a75c0fcd710;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 1498ms :: artifacts dl 24ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0 

In [3]:
# Echo the session object as the cell's last expression so the notebook
# renders its representation, confirming the session is available.
spark

25/04/21 13:38:17 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Criação das Tabelas Delta

In [4]:
# Create the two Delta tables used throughout the notebook.
# IF NOT EXISTS makes the cell safe to re-run: a plain CREATE TABLE raises
# a "table already exists" error the second time it is executed in the
# same session.

# Customer table.
spark.sql("""
  CREATE TABLE IF NOT EXISTS cliente_delta (
    id INT,
    nome STRING,
    email STRING,
    telefone STRING
  ) USING delta
""")

# Car table; cliente_id holds the id of the owning row in cliente_delta
# (no constraint is declared here, the link is by convention only).
spark.sql("""
  CREATE TABLE IF NOT EXISTS carro_delta (
    id INT,
    placa STRING,
    cliente_id INT
  ) USING delta
""")

                                                                                

DataFrame[]

In [5]:
# Print the current contents of cliente_delta.
(
    spark
    .sql("SELECT * FROM cliente_delta")
    .show()
)

25/04/21 13:39:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---+----+-----+--------+
| id|nome|email|telefone|
+---+----+-----+--------+
+---+----+-----+--------+



In [6]:
# Print the current contents of carro_delta.
(
    spark
    .sql("SELECT * FROM carro_delta")
    .show()
)

                                                                                

+---+-----+----------+
| id|placa|cliente_id|
+---+-----+----------+
+---+-----+----------+



In [7]:
from delta.tables import DeltaTable

# Obtain DeltaTable handles for the two tables so their transaction
# history can be inspected later.
# The tables are looked up by name through the session catalog rather than
# by a hard-coded "./spark-warehouse/..." path: the relative path only
# resolves when the kernel's working directory and the warehouse location
# are both the defaults, whereas the catalog always knows where the table
# it created actually lives.
cliente = DeltaTable.forName(spark, "cliente_delta")

carro = DeltaTable.forName(spark, "carro_delta")

In [8]:
# Print the commit history recorded for the cliente table
# (default show() settings, so long cells are truncated).
(
    cliente
    .history()
    .show()
)

                                                                                

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+
|      0|2025-04-21 13:38:...|  NULL|    NULL|CREATE TABLE|{partitionBy -> [...|NULL|    NULL|     NULL|       NULL|  Serializable|         true|              {}|        NULL|Apache-Spark/3.5....|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+



In [9]:
# Print the commit history recorded for the carro table
# (default show() settings, so long cells are truncated).
(
    carro
    .history()
    .show()
)

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+
|      0|2025-04-21 13:39:...|  NULL|    NULL|CREATE TABLE|{partitionBy -> [...|NULL|    NULL|     NULL|       NULL|  Serializable|         true|              {}|        NULL|Apache-Spark/3.5....|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+



## Inserção de Dados nas Tabelas Delta

In [10]:
# Seed both tables with sample rows.
# The statements are named first and executed afterwards, customers before
# cars, so the cell's displayed result is still that of the last INSERT.
insert_clientes_sql = """
  INSERT INTO cliente_delta VALUES
  (1, 'Ana Souza', 'ana@email.com', '(11) 91234-5678'),
  (2, 'Bruno Lima', 'bruno@email.com', '(21) 99876-5432'),
  (3, 'Carla Mendes', 'carla@email.com', '(31) 98765-4321')
"""

insert_carros_sql = """
  INSERT INTO carro_delta VALUES
  (1, 'XYZ1J34', 1),
  (2, 'RLC5B93', 2),
  (3, 'ABC9Z88', 1)
"""

spark.sql(insert_clientes_sql)

spark.sql(insert_carros_sql)

                                                                                

DataFrame[]

In [11]:
# Print the contents of each table, customers first and cars second.
for table_name in ("cliente_delta", "carro_delta"):
    spark.sql(f"SELECT * FROM {table_name}").show()

                                                                                

+---+------------+---------------+---------------+
| id|        nome|          email|       telefone|
+---+------------+---------------+---------------+
|  3|Carla Mendes|carla@email.com|(31) 98765-4321|
|  2|  Bruno Lima|bruno@email.com|(21) 99876-5432|
|  1|   Ana Souza|  ana@email.com|(11) 91234-5678|
+---+------------+---------------+---------------+



                                                                                

+---+-------+----------+
| id|  placa|cliente_id|
+---+-------+----------+
|  2|RLC5B93|         2|
|  3|ABC9Z88|         1|
|  1|XYZ1J34|         1|
+---+-------+----------+



## Comandos de Manipulação: UPDATE e DELETE

In [12]:
# Demonstrate row-level changes on the Delta tables:
# change the phone number of customer 1, then remove car 2.
# The DELETE runs last, so its result is what the cell displays.
update_telefone_sql = """
  UPDATE cliente_delta
  SET telefone = '(11) 90000-0000'
  WHERE id = 1
"""

delete_carro_sql = """
  DELETE FROM carro_delta
  WHERE id = 2
"""

spark.sql(update_telefone_sql)

spark.sql(delete_carro_sql)

                                                                                

DataFrame[num_affected_rows: bigint]

## Visualização dos Dados e Histórico Delta Lake

In [13]:
# Print the current rows of both tables after the UPDATE/DELETE ...
for table_name in ("cliente_delta", "carro_delta"):
    spark.sql(f"SELECT * FROM {table_name}").show()

# ... then print each table's full commit history without truncating
# the columns, in the same order (cliente, then carro).
for delta_table in (cliente, carro):
    delta_table.history().show(truncate=False)

                                                                                

+---+------------+---------------+---------------+
| id|        nome|          email|       telefone|
+---+------------+---------------+---------------+
|  3|Carla Mendes|carla@email.com|(31) 98765-4321|
|  2|  Bruno Lima|bruno@email.com|(21) 99876-5432|
|  1|   Ana Souza|  ana@email.com|(11) 90000-0000|
+---+------------+---------------+---------------+



                                                                                

+---+-------+----------+
| id|  placa|cliente_id|
+---+-------+----------+
|  3|ABC9Z88|         1|
|  1|XYZ1J34|         1|
+---+-------+----------+

+-------+-----------------------+------+--------+------------+----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                           |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                   