# Databricks Delta Lake
In this notebook we will learn about Delta Lake and how to use it.

### Source data for this notebook
The data used is a modified version of the public [Lending Club](https://www.kaggle.com/wendykan/lending-club-loan-data) data. It includes all loans funded from 2012 to 2017. Each loan includes the applicant's information, the loan's current status (Current, Late, Fully Paid, etc.) and the most recent payment information.

## Setup

In [0]:
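db = "deltadb"

# Create the demo database if needed and make it the current database
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

# Disable Databricks' check that blocks reading a Delta directory as plain Parquet
spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
# New Delta tables default to optimized writes (fewer, larger files)
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")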

Out[31]: DataFrame[key: string, value: string]

In [0]:
import random
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *


def my_checkpoint_dir(): 
  return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state
@udf(returnType=StringType())
def random_state():
  return str(random.choice(["CA", "TX", "NY", "WA"]))


# Function to start a streaming query that appends a stream of randomly generated loan data to the given table
def generate_and_append_data_stream(table_format, table_name, schema_ok=False, type="batch"):
  
  stream_data = (spark.readStream.format("rate").option("rowsPerSecond", 500).load()
    .withColumn("loan_id", 10000 + col("value"))
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer"))
    .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000))
    .withColumn("addr_state", random_state())
    .withColumn("type", lit(type)))
    
  if schema_ok:
    stream_data = stream_data.select("loan_id", "funded_amnt", "paid_amnt", "addr_state", "type", "timestamp")
      
  query = (stream_data.writeStream
    .format(table_format)
    .option("checkpointLocation", my_checkpoint_dir())
    .trigger(processingTime = "5 seconds")
    .table(table_name))

  return query

In [0]:
# Function to stop all streaming queries 
def stop_all_streams():
    print("Stopping all streams")
    for s in spark.streams.active:
        try:
            s.stop()
        except:
            pass
    print("Stopped all streams")
    dbutils.fs.rm("/tmp/delta_demo/chkpt/", True)


def cleanup_paths_and_tables():
    dbutils.fs.rm("/tmp/delta_demo/", True)
    dbutils.fs.rm("file:/dbfs/tmp/delta_demo/loans_parquet/", True)
        
    for table in ["deltadb.loans_parquet", "deltadb.loans_delta", "deltadb.loans_delta2"]:
        spark.sql(f"DROP TABLE IF EXISTS {table}")
    
cleanup_paths_and_tables()

In [0]:
%sh mkdir -p /dbfs/tmp/delta_demo/loans_parquet/; wget -O /dbfs/tmp/delta_demo/loans_parquet/loans.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet

--2021-11-14 20:13:26--  https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet
Resolving pages.databricks.com (pages.databricks.com)... 104.17.74.206, 104.17.70.206, 104.17.71.206, ...
Connecting to pages.databricks.com (pages.databricks.com)|104.17.74.206|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 164631 (161K) [text/plain]
Saving to: ‘/dbfs/tmp/delta_demo/loans_parquet/loans.parquet’

     0K .......... .......... .......... .......... .......... 31% 9.11M 0s
    50K .......... .......... .......... .......... .......... 62% 4.76M 0s
   100K .......... .......... .......... .......... .......... 93% 16.7M 0s
   150K ..........                                            100% 66.9M=0.02s

2021-11-14 20:13:26 (8.40 MB/s) - ‘/dbfs/tmp/delta_demo/loans_parquet/loans.parquet’ saved [164631/164631]



# Getting started with <img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

An open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.

* **ACID transactions**: guarantee data integrity and read consistency, even with complex, concurrent reads and writes.
* **Unified streaming and batch**: streaming ingest, historical batch backfill and interactive queries all work out of the box on the same table.
* **Schema Enforcement and Evolution**: enforces the structure of the data and makes it easy to update it on demand.
* **Data recovery (time travel)**: query earlier versions of a table by timestamp or version number.
* **Deletes and upserts**: supports deleting from and upserting into tables through APIs.
* **Open format**: data is stored as Parquet files in blob storage.
* **Audit history**: a history of every operation performed on the table.
* **Scalable metadata handling**: handles millions of files by scaling metadata operations with Spark.

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Convert to Delta Lake format

Delta Lake is 100% compatible with Apache Spark™, so getting started is easy if you already use Spark for your big data workflows.
Delta Lake has APIs for **SQL**, **Python** and **Scala**, so you can use whichever language you are most comfortable with.

<img src="https://databricks.com/wp-content/uploads/2020/12/simplysaydelta.png" width=600/>

In **Python**: read your data into a Spark DataFrame, then write it out directly in Delta Lake format, with no up-front schema definition needed.

In [0]:
parquet_path = "file:/dbfs/tmp/delta_demo/loans_parquet/"

df = (spark.read.format("parquet").load(parquet_path)
      .withColumn("type", lit("batch"))
      .withColumn("timestamp", current_timestamp()))

df.write.format("delta").mode("overwrite").saveAsTable("loans_delta")

**SQL:** Use a `CREATE TABLE ... AS SELECT` statement (no up-front schema definition needed).

In [0]:
%sql
CREATE TABLE loans_delta2
USING delta
AS SELECT * FROM parquet.`/tmp/delta_demo/loans_parquet/loans.parquet`


**SQL:** Use `CONVERT TO DELTA` to convert Parquet files to Delta Lake format in place.

In [0]:
%sql CONVERT TO DELTA parquet.`/tmp/delta_demo/loans_parquet`

### View the data in the Delta Lake table
**How many records are there, and what does the data look like?**

In [0]:
spark.sql("select count(*) from loans_delta").show()
spark.sql("select * from loans_delta").show(3)

+--------+
|count(1)|
+--------+
|   14705|
+--------+

+-------+-----------+---------+----------+-----+--------------------+
|loan_id|funded_amnt|paid_amnt|addr_state| type|           timestamp|
+-------+-----------+---------+----------+-----+--------------------+
|      0|       1000|   182.22|        CA|batch|2021-11-14 20:13:...|
|      1|       1000|   361.19|        WA|batch|2021-11-14 20:13:...|
|      2|       1000|   176.26|        TX|batch|2021-11-14 20:13:...|
+-------+-----------+---------+----------+-----+--------------------+
only showing top 3 rows



## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Unify batch + streaming data processing with multiple concurrent readers and writers

##### Write 2 different data streams into our Delta Lake table at the same time.

In [0]:
# Set up 2 streaming writes to our table
stream_query_A = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream A')
stream_query_B = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream B')
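Both streams commit to the same table at the same time. Delta Lake coordinates concurrent writers with optimistic concurrency control. Each writer records the table version it read, then tries to atomically create the next numbered commit. If another writer claimed that number first, it checks for logical conflicts and, for appends like these, simply retries. The `readVersion` and `isBlindAppend` columns of `DESCRIBE HISTORY` refer to this mechanism.

The pure-Python sketch below is a toy model, not Delta Lake's code. `ToyDeltaLog`, `try_commit` and `blind_append` are hypothetical names, and a lock stands in for the storage layer's atomic put-if-absent.

```python
import threading


class ToyDeltaLog:
    """Hypothetical in-memory commit log mapping version -> list of actions.

    Real Delta Lake writes each commit as a numbered JSON file under the
    table's _delta_log/ directory; here a dict plays that role.
    """

    def __init__(self):
        self._commits = {}
        self._lock = threading.Lock()  # stands in for atomic put-if-absent

    def latest_version(self):
        with self._lock:
            return max(self._commits, default=-1)

    def try_commit(self, version, actions):
        """Create commit `version` only if no other writer has created it."""
        with self._lock:
            if version in self._commits:
                return False
            self._commits[version] = list(actions)
            return True

    def versions(self):
        with self._lock:
            return sorted(self._commits)


def blind_append(log, actions, max_retries=100):
    """Optimistically commit an append, retrying if another writer wins.

    A blind append reads no table data, so a concurrent commit cannot
    invalidate it; after a lost race we just retry at the next version.
    """
    for _ in range(max_retries):
        read_version = log.latest_version()
        if log.try_commit(read_version + 1, actions):
            return read_version + 1
    raise RuntimeError("gave up after too many conflicting commits")
```

For example, two threads that each call `blind_append` 50 times on the same log end up with 100 commits numbered 0 to 99, with no gaps and no lost writes.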

##### Create 2 continuous streaming readers of our Delta Lake table to illustrate streaming progress.

In [0]:
# Streaming read #1
display(spark.readStream.format("delta").table("loans_delta").groupBy("type").count().orderBy("type"))

type,count
batch,14705
stream A,363000
stream B,361500


In [0]:
# Streaming read #2
display(spark.readStream.format("delta").table("loans_delta").groupBy("type", window("timestamp", "10 seconds")).count().orderBy("window"))

type,window,count
batch,"List(2021-11-14T20:13:30.000+0000, 2021-11-14T20:13:40.000+0000)",14705
stream A,"List(2021-11-14T20:13:50.000+0000, 2021-11-14T20:14:00.000+0000)",4243
stream B,"List(2021-11-14T20:13:50.000+0000, 2021-11-14T20:14:00.000+0000)",3596
stream B,"List(2021-11-14T20:14:00.000+0000, 2021-11-14T20:14:10.000+0000)",5000
stream A,"List(2021-11-14T20:14:00.000+0000, 2021-11-14T20:14:10.000+0000)",5000
stream B,"List(2021-11-14T20:14:10.000+0000, 2021-11-14T20:14:20.000+0000)",5000
stream A,"List(2021-11-14T20:14:10.000+0000, 2021-11-14T20:14:20.000+0000)",5000
stream B,"List(2021-11-14T20:14:20.000+0000, 2021-11-14T20:14:30.000+0000)",5000
stream A,"List(2021-11-14T20:14:20.000+0000, 2021-11-14T20:14:30.000+0000)",5000
stream A,"List(2021-11-14T20:14:30.000+0000, 2021-11-14T20:14:40.000+0000)",5000


##### Add a batch query to check that everything works

In [0]:
%sql
SELECT addr_state, COUNT(*)
FROM loans_delta
GROUP BY addr_state

addr_state,count(1)
AZ,329
SC,174
LA,167
MN,256
NJ,541
DC,38
OR,178
VA,413
RI,66
WY,31


In [0]:
dbutils.notebook.exit("stop")

stop

In [0]:
stop_all_streams()

Stopping all streams
Stopped all streams


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) ACID transactions

View the Delta Lake transaction log

In [0]:
%sql DESCRIBE HISTORY loans_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
3,2021-11-14T19:01:50.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 2126ee26-9894-4516-bdb9-ea8a2c2d3ce0, epochId -> 0)",,List(15965762905323),1114-185530-wz9qwk33,1.0,SnapshotIsolation,True,"Map(numRemovedFiles -> 0, numOutputRows -> 0, numOutputBytes -> 0, numAddedFiles -> 0)",
2,2021-11-14T19:01:48.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> b199f7d3-d1ee-46a5-8ac8-075b0ad05c28, epochId -> 0)",,List(15965762905323),1114-185530-wz9qwk33,1.0,SnapshotIsolation,True,"Map(numRemovedFiles -> 0, numOutputRows -> 0, numOutputBytes -> 0, numAddedFiles -> 0)",
1,2021-11-14T18:59:13.000+0000,4719036200183107,databootcamp2021@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {""delta.autoOptimize.optimizeWrite"":""true""})",,List(15965762905323),1114-185530-wz9qwk33,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 165377, numOutputRows -> 14705)",
0,2021-11-14T10:57:04.000+0000,4719036200183107,databootcamp2021@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {""delta.autoOptimize.optimizeWrite"":""true""})",,List(15965762905323),1114-102120-r1yeuw6n,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 165377, numOutputRows -> 14705)",
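Each row of `DESCRIBE HISTORY` is one commit in the table's transaction log, the `_delta_log/` directory stored alongside the data files. A reader reconstructs any version of the table by replaying, in order, the `add` and `remove` file actions of the commits up to that version.

The sketch below is a simplified, pure-Python model of that replay. It assumes each commit is just a list of `("add", path)` / `("remove", path)` pairs, and `snapshot_files` is a hypothetical helper. The real log also records metadata, protocol and commit-info actions, and it is periodically checkpointed to Parquet.

```python
def snapshot_files(commits, version=None):
    """Return the set of data files that make up the table at `version`.

    `commits[v]` holds the actions of version v as ("add", path) or
    ("remove", path) pairs, a simplified stand-in for the JSON actions in
    _delta_log/. `version=None` means the latest version.
    """
    if version is None:
        version = len(commits) - 1
    if not 0 <= version < len(commits):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for actions in commits[: version + 1]:  # replay commits in order
        for op, path in actions:
            if op == "add":
                live.add(path)
            elif op == "remove":
                live.discard(path)
            else:
                raise ValueError(f"unknown action {op!r}")
    return live


# Hypothetical three-commit history
commits = [
    [("add", "part-000.parquet")],                                  # v0: CTAS
    [("add", "part-001.parquet")],                                  # v1: streaming append
    [("remove", "part-000.parquet"), ("add", "part-002.parquet")],  # v2: a file rewritten
]
print(sorted(snapshot_files(commits)))     # ['part-001.parquet', 'part-002.parquet']
print(sorted(snapshot_files(commits, 0)))  # ['part-000.parquet']
```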


<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=1012/>

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use Schema Enforcement to protect data quality

To show how schema enforcement works, let's create a new DataFrame with an extra column, `credit_score`, that doesn't match our existing Delta Lake table schema.

###### Write a DataFrame with an extra column, `credit_score`, to the Delta Lake table

In [0]:
# Generate `new_data` with additional column
new_column = [StructField("credit_score", IntegerType(), True)]

new_schema = StructType(spark.table("loans_delta").schema.fields + new_column)
data = [(99997, 10000, 1338.55, "CA", "batch", datetime.now(), 649),
        (99998, 20000, 1442.55, "NY", "batch", datetime.now(), 702)]

new_data = spark.createDataFrame(data, new_schema)
new_data.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- credit_score: integer (nullable = true)



In [0]:
# Uncommenting this cell will lead to an error because the schemas don't match.
# Attempt to write data with new column to Delta Lake table
new_data.write.format("delta").mode("append").saveAsTable("loans_delta")

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-15965762905360> in <module>
      1 # Uncommenting this cell will lead to an error because the schemas don't match.
      2 # Attempt to write data with new column to Delta Lake table
----> 3 new_data.write.format("delta").mode("append").saveAsTable("loans_delta")

/databricks/spark/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
   1183

**Schema enforcement helps keep our tables clean and tidy, so that we can trust the data we have stored in Delta Lake.** The write above was blocked because the schema of the new data did not match the table's schema.
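The check Delta Lake performed here can be pictured as a comparison of the incoming schema against the table schema. The write is rejected if the data has a column the table lacks, or a column whose type differs. Columns that the data omits are allowed and are written as null.

The sketch below models just those two rules with a hypothetical `check_append_schema` function over plain dicts. Delta Lake's real validation covers more cases.

```python
def check_append_schema(table_schema, data_schema):
    """Raise ValueError if data with `data_schema` may not be appended.

    Schemas are dicts of column name -> type name. Two rules are modeled:
    no columns that the table lacks, and no changed column types. Columns
    the data omits are allowed (they would be stored as null).
    """
    extra = [c for c in data_schema if c not in table_schema]
    if extra:
        raise ValueError(f"columns not in table schema: {extra}")
    mismatched = [c for c, t in data_schema.items() if table_schema[c] != t]
    if mismatched:
        raise ValueError(f"type mismatch for columns: {mismatched}")


# Table schema as printed by new_data.printSchema() above, minus credit_score
loans_schema = {
    "loan_id": "long", "funded_amnt": "integer", "paid_amnt": "double",
    "addr_state": "string", "type": "string", "timestamp": "timestamp",
}
check_append_schema(loans_schema, {"loan_id": "long", "addr_state": "string"})  # ok
try:
    check_append_schema(loans_schema, {**loans_schema, "credit_score": "integer"})
except ValueError as err:
    print(err)  # columns not in table schema: ['credit_score']
```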

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use Schema Evolution to add new columns to the schema

If we *want* to update our Delta Lake table to match this data source's schema, we can do so with schema evolution. Simply add the following to the Spark write command: `.option("mergeSchema", "true")`

In [0]:
new_data.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("loans_delta")

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id IN (99997, 99998)

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp,credit_score
99997,10000,1338.55,CA,batch,2021-11-14T19:02:11.299+0000,649
99998,20000,1442.55,NY,batch,2021-11-14T19:02:11.299+0000,702
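With `mergeSchema`, Delta Lake adds the new `credit_score` column to the end of the table schema. Rows written before the change read `null` for that column.

The sketch below models those two effects with hypothetical helpers, `merge_schemas` and `read_rows`, over dict-based schemas and rows. It treats any type change as an error. Real Delta Lake also allows a small set of safe type upcasts during schema evolution.

```python
def merge_schemas(table_schema, data_schema):
    """Return the evolved schema for a mergeSchema-style append.

    Existing columns keep their order and types; columns that only the
    data has are appended at the end. Any type change raises ValueError.
    """
    merged = dict(table_schema)
    for col, typ in data_schema.items():
        if col in merged and merged[col] != typ:
            raise ValueError(f"cannot change type of column {col!r}")
        merged.setdefault(col, typ)  # new column goes to the end
    return merged


def read_rows(rows, schema):
    """Project stored rows onto `schema`, filling columns they lack with None."""
    return [{col: row.get(col) for col in schema} for row in rows]


table = {"loan_id": "long", "paid_amnt": "double"}
evolved = merge_schemas(table, {"loan_id": "long", "credit_score": "integer"})
print(list(evolved))  # ['loan_id', 'paid_amnt', 'credit_score']
print(read_rows([{"loan_id": 0, "paid_amnt": 182.22}], evolved))
# [{'loan_id': 0, 'paid_amnt': 182.22, 'credit_score': None}]
```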


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Time Travel: recovering previous versions

Delta Lake's time travel capabilities simplify building data pipelines for use cases such as:

* Auditing data changes
* Reproducing experiments and reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

<img src="https://github.com/risan4841/img/blob/master/transactionallogs.png?raw=true" width=250/>

You can query snapshots of your tables by:
1. **Version number**
2. **Timestamp**

using Python, Scala and/or SQL syntax; for these examples we will use SQL syntax.
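In SQL, these two options are written `VERSION AS OF <n>` and `TIMESTAMP AS OF <ts>`. A timestamp is resolved to the latest version committed at or before it.

The sketch below shows that resolution with a binary search. It uses a hypothetical helper, `version_as_of_timestamp`, and the commit times of versions 0-3 from the first `DESCRIBE HISTORY` output above. It does not model how Delta Lake treats timestamps later than the newest commit.

```python
from bisect import bisect_right
from datetime import datetime


def version_as_of_timestamp(commit_times, ts):
    """Return the latest version whose commit time is at or before `ts`.

    `commit_times[v]` is the commit timestamp of version v, in ascending
    order. Raises ValueError if `ts` is earlier than the first commit.
    """
    idx = bisect_right(commit_times, ts)  # number of commits at or before ts
    if idx == 0:
        raise ValueError("timestamp is before the earliest table version")
    return idx - 1


# Commit times of versions 0-3 from the first DESCRIBE HISTORY output
commit_times = [
    datetime(2021, 11, 14, 10, 57, 4),
    datetime(2021, 11, 14, 18, 59, 13),
    datetime(2021, 11, 14, 19, 1, 48),
    datetime(2021, 11, 14, 19, 1, 50),
]
print(version_as_of_timestamp(commit_times, datetime(2021, 11, 14, 19, 0)))  # 1
```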

#### Review the Delta Lake table history for auditing and governance
Every transaction on this table is recorded in the table's own transaction log. This includes the initial set of inserts, as well as later updates, deletes, merges and inserts with schema changes.

In [0]:
%sql
DESCRIBE HISTORY loans_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
35,2021-11-14T20:28:12.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 72f040c1-7444-4aec-88bc-fde249085f0d, epochId -> 16)",,List(15965762905323),1114-185530-wz9qwk33,33.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 50500, numOutputBytes -> 1124098, numAddedFiles -> 1)",
34,2021-11-14T20:27:40.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7533dd04-063f-4836-a990-4fba3ce7ceb2, epochId -> 17)",,List(15965762905323),1114-185530-wz9qwk33,33.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 35000, numOutputBytes -> 787907, numAddedFiles -> 1)",
33,2021-11-14T20:27:05.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 72f040c1-7444-4aec-88bc-fde249085f0d, epochId -> 15)",,List(15965762905323),1114-185530-wz9qwk33,31.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 38500, numOutputBytes -> 863871, numAddedFiles -> 1)",
32,2021-11-14T20:26:01.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7533dd04-063f-4836-a990-4fba3ce7ceb2, epochId -> 16)",,List(15965762905323),1114-185530-wz9qwk33,31.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 19000, numOutputBytes -> 438755, numAddedFiles -> 1)",
31,2021-11-14T20:25:22.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7533dd04-063f-4836-a990-4fba3ce7ceb2, epochId -> 15)",,List(15965762905323),1114-185530-wz9qwk33,29.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 26000, numOutputBytes -> 589531, numAddedFiles -> 1)",
30,2021-11-14T20:25:20.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 72f040c1-7444-4aec-88bc-fde249085f0d, epochId -> 14)",,List(15965762905323),1114-185530-wz9qwk33,29.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 27000, numOutputBytes -> 611787, numAddedFiles -> 1)",
29,2021-11-14T20:24:42.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7533dd04-063f-4836-a990-4fba3ce7ceb2, epochId -> 14)",,List(15965762905323),1114-185530-wz9qwk33,28.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 20000, numOutputBytes -> 461070, numAddedFiles -> 1)",
28,2021-11-14T20:23:52.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 72f040c1-7444-4aec-88bc-fde249085f0d, epochId -> 13)",,List(15965762905323),1114-185530-wz9qwk33,26.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 25500, numOutputBytes -> 578990, numAddedFiles -> 1)",
27,2021-11-14T20:23:51.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7533dd04-063f-4836-a990-4fba3ce7ceb2, epochId -> 13)",,List(15965762905323),1114-185530-wz9qwk33,26.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 25000, numOutputBytes -> 565520, numAddedFiles -> 1)",
26,2021-11-14T20:23:02.000+0000,4719036200183107,databootcamp2021@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 72f040c1-7444-4aec-88bc-fde249085f0d, epochId -> 12)",,List(15965762905323),1114-185530-wz9qwk33,24.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 417043, numAddedFiles -> 1)",


###### Use time travel to select and view the original version of our table (Version 0).
As you can see, this version contains the original 14,705 records.

In [0]:
spark.sql("SELECT * FROM loans_delta VERSION AS OF 0").show(3)
spark.sql("SELECT COUNT(*) FROM loans_delta VERSION AS OF 0").show()

+-------+-----------+---------+----------+-----+--------------------+
|loan_id|funded_amnt|paid_amnt|addr_state| type|           timestamp|
+-------+-----------+---------+----------+-----+--------------------+
|      0|       1000|   182.22|        CA|batch|2021-11-14 20:13:...|
|      1|       1000|   361.19|        WA|batch|2021-11-14 20:13:...|
|      2|       1000|   176.26|        TX|batch|2021-11-14 20:13:...|
+-------+-----------+---------+----------+-----+--------------------+
only showing top 3 rows

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [0]:
%sql SELECT COUNT(*) FROM loans_delta

count(1)
824705


###### Roll back a table to a specific version using `RESTORE`

In [0]:
%sql RESTORE loans_delta VERSION AS OF 0

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
165377,1,33,0,18412217,0


In [0]:
%sql SELECT COUNT(*) FROM loans_delta

count(1)
14705
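`RESTORE` does not rewrite history. Instead, it adds a new commit whose file set matches the target version. That commit removes the files added since the target version and re-adds any files removed since then. The `num_removed_files` and `num_restored_files` metrics above count these two kinds of change.

The sketch below models this on a toy log of `("add", path)` / `("remove", path)` actions. `live_files` and `restore` are hypothetical helpers, not Delta Lake APIs.

```python
def live_files(commits, version):
    """Replay the add/remove actions of versions 0..version (inclusive)."""
    live = set()
    for actions in commits[: version + 1]:
        for op, path in actions:
            if op == "add":
                live.add(path)
            else:  # "remove"
                live.discard(path)
    return live


def restore(commits, target_version):
    """Append a commit that makes the table look like `target_version`.

    Earlier commits are left untouched; the restore itself becomes the
    newest version, so it can be audited or undone like any other commit.
    """
    current = live_files(commits, len(commits) - 1)
    wanted = live_files(commits, target_version)
    actions = [("remove", p) for p in sorted(current - wanted)]
    actions += [("add", p) for p in sorted(wanted - current)]  # restored files
    commits.append(actions)
    return len(commits) - 1


commits = [[("add", "a.parquet")], [("add", "b.parquet")], [("add", "c.parquet")]]
new_version = restore(commits, 0)
print(new_version, sorted(live_files(commits, new_version)))  # 3 ['a.parquet']
```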


## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Full DML support: `DELETE`, `UPDATE`, `MERGE INTO`

Delta Lake brings ACID transactions and full DML support to data lakes.

> Parquet does **not** support these commands; they are unique to Delta Lake.

### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) `DELETE`: Handle GDPR or CCPA requests on your data lake

Imagine that we are responding to a GDPR data deletion request. The user with loan ID #4420 wants us to delete their data. Here's how easy it is.

In [0]:
%sql
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp,credit_score


**Delete the individual user's data with a single `DELETE` command using Delta Lake.**

Note: the `DELETE` command is not supported on Parquet.
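Delta Lake data files are immutable, so a `DELETE` is typically executed copy-on-write. Every file containing a matching row is logically removed in the transaction log and replaced by a rewritten file that holds the remaining rows. Files with no matches are left alone.

The old file stays in storage until `VACUUM` removes it. That is why time travel can still recover the deleted row in the next step. For a GDPR or CCPA request, the data is only physically gone once those files have been vacuumed.

The sketch below is a hypothetical, in-memory model of the rewrite, not Delta Lake's implementation. File contents are lists of dict rows, and the `.rewritten` naming is made up for illustration.

```python
def delete_where(files, predicate):
    """Copy-on-write DELETE over a dict of file name -> list of rows.

    Returns (actions, new_files). Files without matching rows get no
    action; each file with a match is marked removed and, if any rows
    survive, replaced by a new file holding only those rows. The input
    dict is not modified, just as old data files stay in storage.
    """
    actions, new_files = [], {}
    for name, rows in sorted(files.items()):
        if not any(predicate(r) for r in rows):
            continue  # untouched file: nothing to rewrite
        actions.append(("remove", name))
        kept = [r for r in rows if not predicate(r)]
        if kept:
            new_name = name + ".rewritten"  # hypothetical naming scheme
            new_files[new_name] = kept
            actions.append(("add", new_name))
    return actions, new_files


files = {
    "part-0": [{"loan_id": 4419}, {"loan_id": 4420}],
    "part-1": [{"loan_id": 5000}],
}
actions, new_files = delete_where(files, lambda r: r["loan_id"] == 4420)
print(actions)    # [('remove', 'part-0'), ('add', 'part-0.rewritten')]
print(new_files)  # {'part-0.rewritten': [{'loan_id': 4419}]}
```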

In [0]:
%sql
DELETE FROM loans_delta WHERE loan_id=4420;
-- Confirm the user's data was deleted
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp,credit_score


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use time travel and `INSERT INTO` to add the user back into our table

In [0]:
%sql
INSERT INTO loans_delta
SELECT * FROM loans_delta VERSION AS OF 0
WHERE loan_id=4420

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2021-11-14T20:13:39.587+0000
4420,22000,1050.94,TX,batch,2021-11-14T20:13:39.587+0000


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) `UPDATE`: Modify existing records in a table with one command

In [0]:
%sql UPDATE loans_delta SET funded_amnt = 22000 WHERE loan_id = 4420

num_affected_rows
2


In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id = 4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2021-11-14T20:13:39.587+0000
4420,22000,1050.94,TX,batch,2021-11-14T20:13:39.587+0000


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Support change data capture workflows and other ingest use cases via `MERGE INTO`

With a legacy data pipeline, to insert into or update a table you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not affected by the insert or update
4. Create a new temp table based on all three insert statements
5. Delete the original table (and all of its associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

<img src="https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif" alt='Merge process' width=600/>


#### INSERT or UPDATE with Delta Lake

A 2-step process:
1. Identify the rows to insert or update
2. Use `MERGE`
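The `MERGE` below uses `WHEN MATCHED THEN UPDATE SET *` and `WHEN NOT MATCHED THEN INSERT *`. It replaces every matched target row with all columns of its source row and inserts every unmatched source row.

The sketch below models those semantics on lists of dict rows with a hypothetical `merge_upsert` helper. It rejects duplicate source keys outright, which is stricter than Delta Lake. Delta Lake only fails when several source rows match the same target row.

```python
def merge_upsert(target, source, key):
    """Sketch of MERGE ... WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT *.

    `target` and `source` are lists of dict rows joined on column `key`.
    Returns (new_target, num_updated_rows, num_inserted_rows).
    """
    by_key = {}
    for row in source:
        if row[key] in by_key:
            raise ValueError(f"duplicate source key {row[key]!r}")
        by_key[row[key]] = row
    matched, new_target, updated = set(), [], 0
    for row in target:
        src = by_key.get(row[key])
        if src is None:
            new_target.append(row)        # not affected by the merge
        else:
            new_target.append(dict(src))  # UPDATE SET *: take every source column
            matched.add(row[key])
            updated += 1
    inserts = [dict(r) for k, r in by_key.items() if k not in matched]  # INSERT *
    return new_target + inserts, updated, len(inserts)


target = [
    {"loan_id": 4420, "paid_amnt": 1050.94, "addr_state": "TX", "type": "batch"},
    {"loan_id": 1, "paid_amnt": 361.19, "addr_state": "WA", "type": "batch"},
]
source = [  # same shape as merge_table below: 1 update, 1 insert
    {"loan_id": 4420, "paid_amnt": 21500.0, "addr_state": "NY", "type": "update"},
    {"loan_id": 99999, "paid_amnt": 1338.55, "addr_state": "CA", "type": "insert"},
]
merged, num_updated, num_inserted = merge_upsert(target, source, "loan_id")
print(num_updated, num_inserted)  # 1 1
```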

In [0]:
# Create merge table with 1 row update, 1 insertion
data = [(4420, 22000, 21500.00, "NY", "update", datetime.now()),  # record to update
        (99999, 10000, 1338.55, "CA", "insert", datetime.now())]  # record to insert
schema = spark.table("loans_delta").schema
spark.createDataFrame(data, schema).createOrReplaceTempView("merge_table")
spark.sql("SELECT * FROM merge_table").show()

+-------+-----------+---------+----------+------+--------------------+
|loan_id|funded_amnt|paid_amnt|addr_state|  type|           timestamp|
+-------+-----------+---------+----------+------+--------------------+
|   4420|      22000|  21500.0|        NY|update|2021-11-14 19:15:...|
|  99999|      10000|  1338.55|        CA|insert|2021-11-14 19:15:...|
+-------+-----------+---------+----------+------+--------------------+



In [0]:
%sql
MERGE INTO loans_delta AS l
USING merge_table AS m
ON l.loan_id = m.loan_id
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id IN (4420, 99999)

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,21500.0,NY,update,2021-11-14T19:15:03.656+0000
99999,10000,1338.55,CA,insert,2021-11-14T19:15:03.656+0000


## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Performance optimizations and file compaction = faster queries

In [0]:
%sql
-- VACUUM deletes data files that are no longer referenced by the current table version and are older than the retention threshold (7 days by default).
VACUUM loans_delta

path
dbfs:/user/hive/warehouse/deltadb.db/loans_delta
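The retention threshold is 7 days (168 hours) by default and can be set explicitly with `RETAIN <n> HOURS`. Adding `DRY RUN` lists the candidate files without deleting them. Once a version's files have been vacuumed, you can no longer time travel to that version.

The sketch below models just the eligibility check. `tombstones` is a hypothetical mapping from file path to the time the file was removed from the table. Keying on removal time is a simplification of what Delta Lake actually tracks.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION = timedelta(days=7)  # Delta Lake's default retention threshold


def files_to_vacuum(tombstones, now, retention=DEFAULT_RETENTION):
    """Return, sorted, the files a VACUUM at `now` would physically delete.

    `tombstones` maps file path -> time the file stopped being part of the
    current table version. Only files removed before `now - retention` are
    eligible; newer ones are kept so that recent versions stay readable.
    """
    cutoff = now - retention
    return sorted(p for p, removed_at in tombstones.items() if removed_at < cutoff)


tombstones = {
    "part-old.parquet": datetime(2021, 11, 14, 19, 0),
    "part-new.parquet": datetime(2021, 11, 20, 9, 0),
}
print(files_to_vacuum(tombstones, now=datetime(2021, 11, 21, 20, 30)))
# ['part-old.parquet']
```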


### <img src="https://pages.databricks.com/rs/094-YMS-629/images/dbsquare.png" width=30/> Cache table data with `CACHE SELECT` (Databricks Delta Lake only)

In [0]:
%sql CACHE SELECT * FROM loans_delta

### <img src="https://pages.databricks.com/rs/094-YMS-629/images/dbsquare.png" width=30/> Z-order optimization (Databricks Delta Lake only)

In [0]:
%sql OPTIMIZE loans_delta ZORDER BY addr_state

path,metrics
dbfs:/user/hive/warehouse/deltadb.db/loans_delta,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 165450), 0, List(0, 0), 0, null), 0, 1, 1, false)"
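`ZORDER BY` rewrites files so that rows with similar values in the chosen columns land in the same files. This lets data skipping prune more files when a query filters on those columns. For several columns, the classic way to get such a locality-preserving order is a Z-order (Morton) curve, which interleaves the bits of the column values. With a single column, as in `ZORDER BY addr_state`, the ordering amounts to clustering rows by that column.

The sketch below shows the bit interleaving for two non-negative integer columns. It illustrates the general technique, not Databricks' implementation.

```python
def interleave_bits(x, y, bits=16):
    """Return the Z-order (Morton) value of two non-negative ints < 2**bits.

    Bit i of x is placed at position 2*i and bit i of y at 2*i + 1, so
    points that are close in both coordinates tend to get nearby values.
    """
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Sort the cells of a 4x4 grid along the Z-order curve
grid = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(grid, key=lambda p: interleave_bits(*p))
print(z_sorted[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```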


In [0]:
cleanup_paths_and_tables()

<img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>