# PROCESAMIENTO DE LOS DATOS EN STREAMING

## 1.- Conexión a EventHubs

Lo primero que debemos hacer, es establecer la conexión a EventHubs para poder procesar los datos que nos llegan desde ahí, por lo que deberemos de proporcionar en el código tanto la cadena de conexión como el nombre del eventhubs.

In [0]:
connectionString = "<your_connection_string>"
eventHubName = "<your_event_hub_name>"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}

Una vez establecida la conexión con EventHubs, tenemos que crear un dataframe que va a empezar a escuchar los eventos que recibe el servicio antes mencionado

In [0]:

# Reading stream: Load data from Azure Event Hub into DataFrame 'df' using the previously configured settings
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load() \

# Displaying stream: Show the incoming streaming data for visualization and debugging purposes
df.display()

## 2.- Capas de información

Para poder procesar los datos correctamente y tener todos los registros de ellos, vamos a crear 3 capas, que desempeñarán distintos roles:
- Bronze layer: datos crudos (tal cual llegan del EventHubs)
- Silver layer: datos limpios (con estructura definida)
- Gold layer: datos definitivos (sacando nuevos valores, cribando los datos...)

### 2.1- Bronze layer

Para la Bronze layer vamos a crear una base de datos *streaming_bronze* para alojar allí los datos, y guardaremos la información que nos llega directa desde el EventHubs

In [0]:
try:
    # Crear la base de datos/ esquema 'streaming_bronze' en el catálogo predeterminado
    spark.sql("CREATE DATABASE IF NOT EXISTS streaming_bronze")
    print("Database 'streaming_bronze' created successfully or already exists.")
except Exception as e:
    print(f"Error while creating database: {e}")



Database 'streaming_bronze' created successfully or already exists.


In [0]:
df.writeStream \
    .option("checkpointLocation", "/mnt/streaming/bronze_prueba/taxi_bronze") \
    .outputMode("append") \
    .format("delta") \
    .toTable("streaming_bronze.taxi")

<pyspark.sql.streaming.query.StreamingQuery at 0x7fba3fbf9bd0>

### 2.2- Silver layer

Para la creación de la silver layer, debemos crear el esquema que queremos para nuestros datos, y una vez definido deberemos limpiar el dataframe para escribir en la base de datos

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

json_schema = StructType([
    StructField("car_id", StringType(), True),
    StructField("elevation", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("distance_km", DoubleType(), True),
    StructField("timestamp", StringType(), True),
])

In [0]:
try:
    # Crear la base de datos/ esquema 'streaming_bronze' en el catálogo predeterminado
    spark.sql("CREATE DATABASE IF NOT EXISTS streaming_silver")
    print("Database 'streaming_silver' created successfully or already exists.")
except Exception as e:
    print(f"Error while creating database: {e}")

df = spark.readStream\
    .format("delta")\
    .table("streaming_bronze.taxi")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select("body.*")

# Displaying stream: Visualize the transformed data in the DataFrame for verification and analysis
df.display()

# Writing stream: Save the transformed data to the 'streaming.silver.weather' Delta table in 'append' mode with checkpointing for data reliability
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/silver_prueba/taxi_silver")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming_silver.taxi")

### 2.3- Gold Layer

Para la capa gold, hemos incluido algunas transformaciones y hemos añadido ciertas columnas. Estos son nuestros requisitos:
- Añadir el campo de **velocidad**: calculamos la velocidad en km/h según la distancia recorrida en un segundo
- Añadir el campo de **consumo**: calculamos el consumo en cada tramo, estableciendo la constante del consumo del coche (en nuestro caso es 8 l/km), y teniendo en cuenta la distancia recorrida en cada tramo
- Eliminar el campo elevation: decidimos esto ya que no tenía ninguna relevancia en nuestros datos
- Filtrar velocidades: al ver que algunas velocidades eran irreales (datos malos), hemos filtrado el dataframe para cambiar el dato de velocidad, cuando este supera los 200 km/h, por un dato estándar

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

try:
    # Crear la base de datos/ esquema 'streaming_bronze' en el catálogo predeterminado
    spark.sql("CREATE DATABASE IF NOT EXISTS streaming_gold")
    print("Database 'streaming_gold' created successfully or already exists.")
except Exception as e:
    print(f"Error while creating database: {e}")

df = df.where(F.col("distance_km") > 0)

df = df.withColumn("speed_kmh", F.round(df.distance_km * 3600, 2))

df = df.withColumn("consumo", F.round(df.distance_km * 8 / 100, 5))

df = df.drop("elevation")


# Aplicar la función de ventana
df = df.withColumn(
    "speed_kmh",
    F.when(
        df.speed_kmh > 200,
        F.lit(102.0)
    ).otherwise(df.speed_kmh)
)

df.display()

df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/gold_prueba1/taxi_gold")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming_gold.taxi")

## 3.- Envío de datos a Power Bi

Finalmente, enviamos el dataframe final a un streaming dataset creado en Power Bi (en la nube), para poder hacer visualizaciones en tiempo real de nuestros datos

In [0]:

import requests
import json
from pyspark.sql import functions as F
 
# Endpoint de la API de Power BI para el streaming dataset
url = f"<your_powerbi_streaming_dataset_url>"
 
# Crear una función para enviar los datos a Power BI
def send_to_powerbi(data):
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, headers=headers, data=json.dumps(data))
    if response.status_code == 200:
        print("Datos enviados correctamente a Power BI.")
    else:
        print(f"Error al enviar los datos: {response.status_code}")
 
# Transformar el dataframe de Spark a una lista de diccionarios para enviar a Power BI
def transform_and_send_to_powerbi(batch_df, batch_id):
    # Convierte el dataframe a un formato adecuado para la API de Power BI
    data = batch_df.select("car_id","latitude", "longitude", "elevation", "timestamp", "distance_km",
                           "speed_kmh", "consumo").toPandas().to_dict(orient='records')
   
    # Enviar los datos a Power BI
    send_to_powerbi(data)
 
# Enviar los datos de manera continua usando writeStream
df.writeStream \
    .foreachBatch(transform_and_send_to_powerbi) \
    .outputMode("append") \
    .start()
 
df.display()