<a href="https://colab.research.google.com/github/Ropebird/Foto/blob/main/Actividad2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Instalar PySpark y dependencias de Java si no están ya instaladas
!pip install pyspark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Establecer la variable de entorno para Java
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"



In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
# Descargar el conector de Hadoop para GCS
!wget -q https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar

# Configurar SparkSession con soporte para GCS
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GCS_Spark") \
    .config("spark.jars", "gcs-connector-hadoop3-latest.jar") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/content/key.json") \
    .getOrCreate()

In [None]:
# Descargar e instalar Kafka en Google Colab
!wget -qO - https://archive.apache.org/dist/kafka/3.5.1/kafka_2.12-3.5.1.tgz | tar -xz

# Definir el directorio de Kafka
KAFKA_DIR = "/content/kafka_2.12-3.5.1"

# Iniciar Zookeeper (necesario para Kafka)
!$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties

# Iniciar el servidor Kafka
!$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties

# Esperar unos segundos para que Kafka arranque
import time
time.sleep(10)

print("✅ Kafka y Zookeeper están en ejecución")

✅ Kafka y Zookeeper están en ejecución


In [None]:
from pyspark.sql import SparkSession

# Iniciar la sesión de Spark con el conector de Kafka
spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

print("✅ Sesión de Spark creada con Kafka")

✅ Sesión de Spark creada con Kafka


In [None]:
# Reemplaza la URL con la del archivo que quieres descargar
!wget -O flights.csv https://raw.githubusercontent.com/Ropebird/Tarea-Kafka/refs/heads/main/flights.csv

--2025-02-24 22:25:34--  https://raw.githubusercontent.com/Ropebird/Tarea-Kafka/refs/heads/main/flights.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10033769 (9.6M) [text/plain]
Saving to: ‘flights.csv’


2025-02-24 22:25:35 (25.8 MB/s) - ‘flights.csv’ saved [10033769/10033769]



In [150]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

# Crear sesión de Spark
spark = SparkSession.builder.appName("SimulacionStreaming").getOrCreate()

# Leer un CSV como si fuera un stream
df = spark.read.option("header", True).csv("/content/flights.csv")

# Simular un Streaming DataFrame con los datos necesarios
retrasosStreamingDF = df.select(col("dest"), col("arr_delay").cast("float"))

# Definir la función retrasoMedio
# Esta función calcula el retraso medio por destino
def retrasoMedio(df):
    return df.groupBy("dest").agg(mean("arr_delay").alias("retraso_medio"))

# Aplicar la función retrasoMedio
resultado = retrasoMedio(retrasosStreamingDF)

# Mostrar resultados
resultado.show()

+----+-------------------+
|dest|      retraso_medio|
+----+-------------------+
| MSY| -17.96039603960396|
| GEG|  2.731818181818182|
| SNA|-1.5201612903225807|
| BUR|-1.6365357311878845|
| EUG| 1.2041800643086817|
| OAK| 10.067460317460318|
| DCA| -4.000928505106778|
| RDM| 2.5788732394366196|
| KTN|   3.66692789968652|
| LIH| -6.059011164274322|
| IAH|-0.9413524835427888|
| HNL|  -1.56978289765175|
| CVG| -6.253164556962025|
| SJC|  4.642902408111533|
| AUS| -2.691588785046729|
| LGB|-1.7124856815578464|
| RNO|            5.55625|
| JAC| 1.7857142857142858|
| BOS| 0.5697230181470869|
| EWR|  1.042455006922012|
+----+-------------------+
only showing top 20 rows



In [151]:
# Mostrar el esquema del DataFrame
retrasosStreamingDF.printSchema()

root
 |-- dest: string (nullable = true)
 |-- arr_delay: float (nullable = true)



In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

# Asegurar que 'arr_delay' es de tipo float
retrasosFinalDF = retrasosStreamingDF \
    .withColumn("arr_delay", col("arr_delay").cast(FloatType())) \
    .select("dest", "arr_delay")

# Mostrar el esquema para verificar que todo esté correcto
retrasosFinalDF.printSchema()
retrasosFinalDF.show(5)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F

# Definir el esquema
esquema = StructType([
    StructField("dest", StringType(), True),
    StructField("arr_delay", DoubleType(), True)
])

# Transformar el DataFrame
parsedDF = retrasosStreamingDF \
    .withColumn("arr_delay", F.col("arr_delay").cast(DoubleType())) \
    .select("dest", "arr_delay")

# Mostrar el esquema y los primeros datos
parsedDF.printSchema()
parsedDF.show(5)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F

# Definir el esquema para la columna JSON "value"
esquema = StructType([
    StructField("dest", StringType(), True),
    StructField("arr_delay", DoubleType(), True)
])

# Simular la columna "value" convirtiendo el DataFrame a JSON en una nueva columna
parsedDF = retrasosStreamingDF \
    .withColumn("value", F.to_json(F.struct(F.col("dest"), F.col("arr_delay")))) \
    .withColumn("parejas", F.from_json(F.col("value"), esquema)) \
    .withColumn("dest", F.col("parejas.dest")) \
    .withColumn("arr_delay", F.col("parejas.arr_delay").cast(DoubleType()))

# Verificar el esquema final
parsedDF.printSchema()
parsedDF.show(5)

In [None]:
tipos = parsedDF.dtypes
assert(("value", "string") in tipos)
assert(('parejas', 'struct<dest:string,arr_delay:double>') in tipos)
assert(('dest', 'string') in tipos)
assert(('arr_delay', 'double') in tipos)

print("✅ Todas las aserciones pasaron correctamente.")

In [None]:
# Simular un flujo de datos en lotes (batch processing)
from pyspark.sql import DataFrame

def simulacionStreaming(df: DataFrame):
    print("📡 Simulando actualización de datos...\n")
    retrasoMedioDF = retrasoMedio(df)
    retrasoMedioDF.show()

# Ejecutar la simulación
simulacionStreaming(parsedDF)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import time

# Crear la sesión de Spark
spark = SparkSession.builder.appName("SimulacionStreaming").getOrCreate()

# Definir el esquema
esquema = StructType([
    StructField("dest", StringType(), True),
    StructField("arr_delay", DoubleType(), True)
])

# Datos simulados que llegarían desde Kafka
mensajes_kafka = [
    {"dest": "GRX", "arr_delay": 2.6},
    {"dest": "MAD", "arr_delay": 5.4},
    {"dest": "GRX", "arr_delay": 1.5},
    {"dest": "MAD", "arr_delay": 20.0}
]

# Crear un DataFrame vacío para ir agregando los datos
retrasosDF = spark.createDataFrame([], esquema)

# Simulación de llegada de datos en streaming
for mensaje in mensajes_kafka:
    print(f"\n📡 Procesando nuevo mensaje: {mensaje}")

    # Crear un DataFrame temporal con el nuevo mensaje
    nuevoDF = spark.createDataFrame([mensaje], esquema)

    # Agregar el nuevo mensaje al DataFrame acumulado
    retrasosDF = retrasosDF.union(nuevoDF)

    # Aplicar la función retrasoMedio
    retrasoMedioDF = retrasosDF.groupBy("dest").agg(mean("arr_delay").alias("retraso_medio"))

    # Mostrar resultados actualizados
    retrasoMedioDF.show()

    # Simular un pequeño retraso entre mensajes
    time.sleep(3)

In [None]:
# Simular la consulta a una vista en memoria
agregadosDF = retrasoMedioDF  # Usamos el DataFrame calculado en la simulación

# Mostrar los resultados simulados
agregadosDF.show()

In [None]:
# Verificar que el DataFrame tiene las columnas correctas
columnas = agregadosDF.columns

# Aserciones para confirmar la estructura
assert(len(columnas) == 2)
assert("dest" in columnas)
assert("retraso_medio" in columnas)

print("✅ Todas las aserciones pasaron correctamente.")

In [None]:
agregadosDF.show()

In [None]:
# Obtener el retraso medio de GRX después del primer mensaje
retraso_medio_GRX_primer_mensaje = agregadosDF.filter(agregadosDF.dest == "GRX") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

# Mostrar el resultado capturado
print(f"📌 Retraso medio para GRX después del primer mensaje: {retraso_medio_GRX_primer_mensaje}")

In [None]:
# Unir el nuevo mensaje con el DataFrame acumulado
retrasosDF = retrasosDF.union(nuevoDF)

In [None]:
# Recalcular el retraso medio después de agregar nuevos datos
agregadosDF = retrasosDF.groupBy("dest").agg(mean("arr_delay").alias("retraso_medio"))

In [None]:
# Agregar nuevos datos al DataFrame
retrasosDF = retrasosDF.union(nuevoDF)

# Recalcular agregados
agregadosDF = retrasosDF.groupBy("dest").agg(mean("arr_delay").alias("retraso_medio"))

# Mostrar los datos actualizados
agregadosDF.show()

In [None]:
# Obtener los retrasos medios de GRX y MAD después del segundo mensaje
retraso_medio_GRX_segundo_mensaje = agregadosDF.filter(agregadosDF.dest == "GRX") \
                                               .select("retraso_medio") \
                                               .collect()[0][0]

retraso_medio_MAD_segundo_mensaje = agregadosDF.filter(agregadosDF.dest == "MAD") \
                                               .select("retraso_medio") \
                                               .collect()[0][0]

print(f"📌 Retraso medio para GRX después del segundo mensaje: {retraso_medio_GRX_segundo_mensaje}")
print(f"📌 Retraso medio para MAD después del segundo mensaje: {retraso_medio_MAD_segundo_mensaje}")

In [None]:
# Obtener el retraso medio de GRX después del primer mensaje
retraso_medio_GRX_primer_mensaje = agregadosDF.filter(agregadosDF.dest == "GRX") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

# Mostrar el resultado capturado
print(f"📌 Retraso medio para GRX después del primer mensaje: {retraso_medio_GRX_primer_mensaje}")

In [None]:
# Unir el nuevo mensaje con el DataFrame acumulado
retrasosDF = retrasosDF.union(nuevoDF)

In [None]:
# Recalcular el retraso medio después de agregar nuevos datos
agregadosDF = retrasosDF.groupBy("dest").agg(mean("arr_delay").alias("retraso_medio"))

In [None]:
agregadosDF.show()

In [None]:
# Obtener los retrasos medios después del tercer mensaje
retraso_medio_GRX_tercer_mensaje = agregadosDF.filter(agregadosDF.dest == "GRX") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

retraso_medio_MAD_tercer_mensaje = agregadosDF.filter(agregadosDF.dest == "MAD") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

print(f"📌 Retraso medio para GRX después del tercer mensaje: {retraso_medio_GRX_tercer_mensaje}")
print(f"📌 Retraso medio para MAD después del tercer mensaje: {retraso_medio_MAD_tercer_mensaje}")

In [None]:
# Obtener el retraso medio de GRX después del primer mensaje
retraso_medio_GRX_primer_mensaje = agregadosDF.filter(agregadosDF.dest == "GRX") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

# Mostrar el resultado capturado
print(f"📌 Retraso medio para GRX después del primer mensaje: {retraso_medio_GRX_primer_mensaje}")

In [None]:
# Unir el nuevo mensaje con el DataFrame acumulado
retrasosDF = retrasosDF.union(nuevoDF)

In [None]:
# Recalcular el retraso medio después de agregar nuevos datos
agregadosDF = retrasosDF.groupBy("dest").agg(mean("arr_delay").alias("retraso_medio"))

In [None]:
# Obtener los retrasos medios después del tercer mensaje
retraso_medio_GRX_cuarto_mensaje = agregadosDF.filter(agregadosDF.dest == "GRX") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

retraso_medio_MAD_cuarto_mensaje = agregadosDF.filter(agregadosDF.dest == "MAD") \
                                              .select("retraso_medio") \
                                              .collect()[0][0]

print(f"📌 Retraso medio para GRX después del tercer mensaje: {retraso_medio_GRX_tercer_mensaje}")
print(f"📌 Retraso medio para MAD después del tercer mensaje: {retraso_medio_MAD_tercer_mensaje}")