#Apache Spark - Deltalakes

<p><strong>Objetivo: </strong> El objetivo de este cuaderno es crear un delta lake y realizar algunas operaciones con él:</p>

## Carga de datos en una tabla de Delta Lake

In [0]:
# Configure source data path
sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
# Configure Delta Lake path
deltaPath = "/tmp/loans_delta3"

In [0]:
# Create the Delta Lake table with the same loans data
(spark.read.format("parquet").load(sourcePath).write.format("delta").save(deltaPath))

In [0]:
# Create a view on the data called loans_delta
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")

## Explorar los datos

In [0]:
# Loans row count
spark.sql("SELECT count(*) FROM loans_delta").show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [0]:
# First 5 rows of loans table
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+



## Imponer el esquema en la escritura para evitar la corrupción de datos

<p>Un problema común con la administración de datos con Spark usando formatos comunes como JSON, Parquet y ORC es la corrupción accidental de datos causada por escribir datos formateados incorrectamente.</p>
<p>El formato de Delta Lake registra el esquema como metadatos a nivel de tabla. Por lo tanto, todas las escrituras en una tabla de Delta Lake pueden verificar si los datos que se escriben tienen un esquema compatible con el de la tabla. Si no es compatible, Spark arrojará un error antes de que los datos se escriban y confirmen en la tabla, evitando así la corrupción accidental de datos.</p>

In [0]:
from pyspark.sql.functions import *
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [
(1111111, 1000, 1000.0, 'TX', True),
(2222222, 2000, 0.0, 'CA', False)
]

En la próxima línea es normal que obtengamos un error, pues se está intentando evoluacionar el esquema y el deltalake no lo tiene permitido:

In [0]:
loanUpdates = (spark.createDataFrame(items, cols).withColumn("funded_amnt", col("funded_amnt").cast("int")))
loanUpdates.write.format("delta").mode("append").save(deltaPath)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1364852366998769>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0mloanUpdates[0m [0;34m=[0m [0;34m([0m[0mspark[0m[0;34m.[0m[0mcreateDataFrame[0m[0;34m([0m[0mitems[0m[0;34m,[0m [0mcols[0m[0;34m)[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"funded_amnt"[0m[0;34m,[0m [0mcol[0m[0;34m([0m[0;34m"funded_amnt"[0m[0;34m)[0m[0;34m.[0m[0mcast[0m[0;34m([0m[0;34m"int"[0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mloanUpdates[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"delta"[0m[0;34m)[0m[0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"append"[0m[0;34m)[0m[0;34m.[0m[0msave[0m[0;34m([0m[0mdeltaPath[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/p

## Evolucionar el esquema

Con este comando, la columna "closed" se agregará al esquema de la tabla y se agregarán nuevos datos.

In [0]:
# In Python
(loanUpdates.write.format("delta").mode("append")
.option("mergeSchema", "true")
.save(deltaPath))

## Transformando los datos

<p><b>Actualizar los datos para corregir un error</b></p>
<p>Un caso de uso común cuando se administran datos es corregir errores en los datos. Supongamos que, al revisar los datos, nos dimos cuenta de que todos los préstamos asignados a addr_state = 'OR' deberían haber sido asignados a addr_state = 'WA'.</p>

In [0]:
# In Python
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})
#deltaTable.update("addr_state = 'OR'", {"closed": "False"})
#deltaTable.update("closed = Null", {"closed": False})

In [0]:
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+



<p><b>Eliminar datos relacionados con los usuarios</b></p>
<p>Digamos que es obligatorio eliminar los datos de todos usuarios cuyos préstamos se hayan cancelado en su totalidad.</p>

In [0]:
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("funded_amnt <= paid_amnt")

In [0]:
spark.sql("SELECT * FROM loans_delta LIMIT 10").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      4|       1000|   249.98|        PA|
|      5|       1000|    408.6|        CA|
|      7|       1000|   168.81|        OH|
|      8|       1000|   193.64|        TX|
|      9|       1000|   218.83|        CT|
|     10|       1000|   322.37|        NJ|
|     11|       1000|   400.61|        NY|
+-------+-----------+---------+----------+



<p><b>Actualizar e Insertar (Upserting) datos utilizando <code>merge()</code></b></p>
<p>Un caso de uso común es la captura de modificaciones en los datos, donde debe replicar los cambios de fila realizados en una tabla OLTP a otra tabla para cargas de trabajo OLAP.</p>

<p>Suponga que se tiene otra tabla de información sobre préstamos nuevos, algunos de los cuales son préstamos nuevos y otros son actualizaciones de préstamos existentes. Además, esta tabla de cambios tiene el mismo esquema que la tabla de préstamo_delta.</p>

<p> Se pueden insertar estos cambios en la tabla mediante la operación <code>DeltaTable.merge()</code>, que se basa en el comando MERGE SQL:</p>

In [0]:
(deltaTable
.alias("t")
.merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())

<p><b>Auditando los cambios</b></p>
<p>Todos los cambios en su tabla de Delta Lake se registran como "commits" en el registro de transacciones de la tabla. A medida que escribe en una tabla o directorio de Delta Lake, cada operación se versiona automáticamente.</p>

In [0]:
deltaTable.history().show()

+-------+-------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+
|version|          timestamp|          userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+
|      4|2021-12-07 17:11:55|8326243828948258|amonteroposada@gm...|    MERGE|{predicate -> (t....|null|{1364852366998755}|1207-154359-xa6j1i6q|          3|WriteSerializable|        false|{numTargetRowsCop...|        null|
|      3|2021-12-07 17:11:36|8326243828948258|amonteroposada@gm...|   DELETE|{predicate -> ["(...|null|{13648523

Si solo se desean ver las últimas transformaciones:

In [0]:
(deltaTable
.history(3)
.select("version", "timestamp", "operation", "operationParameters")
.show(truncate=False))

+-------+-------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp          |operation|operationParameters                                                                                                                          |
+-------+-------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------+
|4      |2021-12-07 17:11:55|MERGE    |{predicate -> (t.`loan_id` = s.`loan_id`), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]}|
|3      |2021-12-07 17:11:36|DELETE   |{predicate -> ["(CAST(`funded_amnt` AS DOUBLE) <= `paid_amnt`)"]}                                                                            |
|2      |2021-12-07 17:11:22|UPDATE   |{predicate -> (addr_state#4052 = OR)}              