# Uso de tablas delta en Apache Spark


Las tablas de un lakehouse de Microsoft Fabric se basan en el formato Delta Lake de código abierto para Apache Spark. Delta Lake agrega compatibilidad con la semántica relacional para las operaciones de datos por lotes y de streaming, y permite la creación de una arquitectura de Lakehouse en la que se puede usar Apache Spark para procesar y consultar datos en tablas basadas en archivos subyacentes de un lago de datos.

- Crear un espacio de trabajo 
- Crear una casa de lago y cargar datos

**Exploración de datos en un marco de datos**

Cargar datos > Spark.

In [10]:
df = spark.read.format("csv").option("header","true").load("Files/products/products.csv")
# df now is a Spark DataFrame containing CSV data from "Files/products/products.csv".
display(df)

StatementMeta(, f5a450d2-4e1f-4f4f-90d3-75eb9f16c723, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 943db528-f47e-4882-8332-fbaa13c492b3)

**Creación de una tabla administrada**

In [11]:
df.write.format("delta").saveAsTable("managed_products")

StatementMeta(, f5a450d2-4e1f-4f4f-90d3-75eb9f16c723, 14, Finished, Available)

**Crear una tabla externa**

In [14]:
df.write.format("delta").saveAsTable("external_products", path="abfss://prueba@onelake.dfs.fabric.microsoft.com/prueba.Lakehouse/Files/external_products")

StatementMeta(, f5a450d2-4e1f-4f4f-90d3-75eb9f16c723, 17, Finished, Available)

1. En el panel Explorador de Lakehouse, en el menú ... de la carpeta Archivos, seleccione Copiar ruta de acceso ABFS.
La ruta ABFS es la ruta completa a la carpeta Archivos en el almacenamiento de OneLake para su lakehouse, similar a esta:
 abfss://workspace@tenant-onelake.dfs.fabric.microsoft.com/lakehousename.Lakehouse/Files
 
7. En el código que ha introducido en la celda de código, reemplace abfs_path por la ruta de acceso que copió en el portapapeles para que el código guarde el marco de datos como una tabla externa con archivos de datos en una carpeta denominada external_products en la ubicación de la carpeta Archivos. La ruta completa debería tener un aspecto similar al siguiente:
abfss://workspace@tenant-onelake.dfs.fabric.microsoft.com/lakehousename.Lakehouse/Files/external_products
 
11. En el panel Explorador de Lakehouse, en el menú ... de la carpeta Tablas, seleccione Actualizar. A continuación, expanda el nodo Tablas y compruebe que se ha creado la tabla external_products.
13. En el panel Explorador de Lakehouse, en el menú ... de la carpeta Archivos, seleccione Actualizar. A continuación, expanda el nodo Archivos y compruebe que se ha creado la carpeta external_products para los archivos de datos de la tabla.

**Comparación de tablas administradas y externas**

In [6]:
%%sql

DESCRIBE FORMATTED managed_products;

StatementMeta(, f5a450d2-4e1f-4f4f-90d3-75eb9f16c723, 8, Finished, Available)

<Spark SQL result set with 12 rows and 3 fields>

In [1]:
%%sql

DESCRIBE FORMATTED external_products;

StatementMeta(, 99f464d7-ec46-453b-acc7-160e74075f2d, 2, Finished, Available)

<Spark SQL result set with 12 rows and 3 fields>

 **Eliminar los metadatos de la tabla externa, pero afectar los archivos**

In [4]:
%%sql

DROP TABLE managed_products;
DROP TABLE external_products;

StatementMeta(, , -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

**Usar SQL para crear una tabla**

In [2]:
%%sql

CREATE TABLE products2
USING DELTA
LOCATION 'Files/external_products';

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 3, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

**Visualiza la tabla**

In [5]:
%%sql

SELECT * FROM products2;

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 7, Finished, Available)

<Spark SQL result set with 295 rows and 4 fields>

**Explorar el control de versiones de tablas**

In [8]:
%%sql

UPDATE products2
SET ListPrice = ListPrice * 0.9
WHERE Category = 'Mountain Bikes';

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 10, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

**historial de transacciones registradas para la tabla.**

In [10]:
%%sql

DESCRIBE HISTORY products2;

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 12, Finished, Available)

<Spark SQL result set with 2 rows and 15 fields>

In [11]:
delta_table_path = 'Files/external_products'

# Get the current data
current_data = spark.read.format("delta").load(delta_table_path)
display(current_data)

# Get the version 0 data
original_data = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
display(original_data)

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 785625e7-4e87-4eec-ba16-23ea42f81001)

SynapseWidget(Synapse.DataFrame, c479cadf-788e-4178-8e56-37f247beb389)

<mark>Los resultados muestran dos dataframes: uno que contiene los datos después de la reducción de precios y el otro que muestra la versión original de los datos.</mark>

# Uso de tablas delta para la transmisión de datos

**Delta Lake admite datos de streaming. Las tablas delta pueden ser un receptor o un origen de flujos de datos creados mediante la API de Spark Structured Streaming. En este ejemplo, usará una tabla delta como receptor para algunos datos de streaming en un escenario simulado de Internet de las cosas (IoT).**

In [12]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 15, Finished, Available)

Source stream created...


Este código escribe los datos del dispositivo de streaming en formato delta en una carpeta denominada iotdevicedata. Dado que la ruta de acceso de la ubicación de la carpeta se encuentra en la carpeta Tablas, se creará automáticamente una tabla para ella.

In [13]:
# Write the stream to a delta table
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 16, Finished, Available)

Streaming to delta sink...


**consulta la tabla IotDeviceData, que contiene los datos del dispositivo del origen de streaming.**

In [14]:
%%sql

SELECT * FROM IotDeviceData;

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 17, Finished, Available)

<Spark SQL result set with 9 rows and 2 fields>

**escribe más datos de dispositivo hipotéticos en el origen de streaming.**

In [15]:
# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 18, Finished, Available)

True

In [16]:
%%sql

SELECT * FROM IotDeviceData;

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 19, Finished, Available)

<Spark SQL result set with 16 rows and 2 fields>


**detiene la secuencia.**

In [17]:
deltastream.stop()

StatementMeta(, 7d5ea90d-2b7a-4e61-950c-14c0bbf13ea3, 20, Finished, Available)