# Delta Lake House

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import  StructType, StructField, FloatType, StringType

from delta import *

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")  
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)


your 131072x1 screen size is bogus. expect trouble
24/07/29 11:36:12 WARN Utils: Your hostname, Home-PC resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/07/29 11:36:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/amslimaa/miniconda3/envs/PySparkLab/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/amslimaa/.ivy2/cache
The jars for the packages stored in: /home/amslimaa/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8968428f-4d75-4eb5-b427-ab10d341cdae;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 195ms :: artifacts dl 8ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0

In [3]:
data = [
    ("ID001", "CLIENTE_X","SP","ATIVO",   250000.00),
    ("ID002", "CLIENTE_Y","SC","INATIVO", 400000.00),
    ("ID003", "CLIENTE_Z","DF","ATIVO",   1000000.00)
]

schema = (
    StructType([
        StructField("ID_CLIENTE",     StringType(),True),
        StructField("NOME_CLIENTE",   StringType(),True),
        StructField("UF",             StringType(),True),
        StructField("STATUS",         StringType(),True),
        StructField("LIMITE_CREDITO", FloatType(), True)
    ])
)

df = spark.createDataFrame(data=data,schema=schema)

df.show(truncate=False)

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE|UF |STATUS |LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|ID001     |CLIENTE_X   |SP |ATIVO  |250000.0      |
|ID002     |CLIENTE_Y   |SC |INATIVO|400000.0      |
|ID003     |CLIENTE_Z   |DF |ATIVO  |1000000.0     |
+----------+------------+---+-------+--------------+



# Gravar delta table

In [4]:
(
    df
    .write
    .format("delta")
    .mode("overwrite")
    .save("./RAW/CLIENTES")
)

24/07/29 11:36:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# NOVOS DADOS

In [5]:
new_data = [
    ("ID001","CLIENTE_X","SP","INATIVO", 0.00),
    ("ID002","CLIENTE_Y","SC","ATIVO",   400000.00),
    ("ID004","CLIENTE_Z","DF","ATIVO",   5000000.00)
]

df_new = spark.createDataFrame(data=new_data, schema=schema)

df_new.show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
+----------+------------+---+-------+--------------+



# UPSERT / MERGE

In [6]:
deltaTable = DeltaTable.forPath(spark, "./RAW/CLIENTES")

In [7]:
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID002|   CLIENTE_Y| SC|INATIVO|      400000.0|
|     ID001|   CLIENTE_X| SP|  ATIVO|      250000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



In [8]:
(
    deltaTable.alias("dados_atuais")
    .merge(
        df_new.alias("novos_dados"),
        "dados_atuais.ID_CLIENTE = novos_dados.ID_CLIENTE"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [9]:
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



# Delete dados

In [10]:
deltaTable.delete("LIMITE_CREDITO < 400000.0")


In [11]:
deltaTable.toDF().show()

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID002|   CLIENTE_Y| SC| ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
+----------+------------+---+------+--------------+



# HISTORY

In [12]:
(
    deltaTable
    .history()
)

DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

In [13]:
df_delta = deltaTable.history()

(
    df_delta
    .select("version", "timestamp", "operation", "operationMetrics")
    .show(truncate=False, vertical=True)
)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version          | 10                                                                                                                                                                                                                                                                                                                                                                                                                                                     

# Time Travel

In [14]:
(
    spark
    .read
    .format("delta")
    .option("versionAsOf", 2) # voltando para a versao inicial  de carga dessa delta table
    .load("./RAW/CLIENTES")
    .show()
)

AnalysisException: Cannot time travel Delta table to version 2. Available versions: [7, 10].

In [None]:
deltaTable.restoreToVersion(1) #Restaurando para a versao desejada... existe um comando chamado vacum que faz a remocao fisica dos arquivos parquet.

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]