In [0]:
from pyspark.sql.functions import from_json, col, from_unixtime, when, lit, hour, dayofmonth, month, to_timestamp, regexp_replace
from pyspark.sql.types import TimestampType, StringType, StructType, StructField, LongType, DoubleType, BooleanType

In [0]:
#%sql
#TRUNCATE TABLE flights_gold;

In [0]:
# Esquema del mensaje JSON
schema = StructType([
    StructField("icao24", StringType()),
    StructField("callsign", StringType()),
    StructField("origin_country", StringType()),
    StructField("time_position", LongType()),
    StructField("last_contact", LongType()),
    StructField("longitude", DoubleType()),
    StructField("latitude", DoubleType()),
    StructField("baro_altitude", DoubleType()),
    StructField("on_ground", BooleanType()),
    StructField("velocity", DoubleType()),
    StructField("heading", DoubleType()),
    StructField("timestamp_ingest", StringType())
])

In [0]:
# Leer desde Kafka
df_raw = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "52.205.209.139:9092")
        .option("subscribe", "flight_stream")
        .option("startingOffsets", "latest")
        .load()
)

# Convertir el value a String y luego a JSON
df_parsed = df_raw.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

df_parsed.printSchema()

# Transformaciones
df_transformed = df_parsed.withColumn(
    "FechaHora_Posicion", from_unixtime(col("time_position")).cast(TimestampType())
).withColumn(
    "FechaHora_Ingesta", to_timestamp(regexp_replace(col("timestamp_ingest"), "Z$", ""))
).withColumn(
    "Altitud_Pies", col("baro_altitude") * 3.28084
).withColumn(
    "Velocidad_Kmh", col("velocity") * 3.6
).withColumn(
    "estado_de_vuelo",
    when(col("on_ground") == True, "En Tierra")
    .when((col("on_ground") == False) & (col("Velocidad_Kmh") < 300), "Maniobra (Despegue/Aterrizaje)")
    .when((col("on_ground") == False) & (col("Velocidad_Kmh") >= 300) & (col("Altitud_Pies") <= 15000), "Patron de Espera / Ascenso / Descenso")
    .otherwise("En Vuelo de Crucero")
).withColumn(
    "Indicador_Congestion",
    when(
        (col("on_ground") == False) &
        (col("Velocidad_Kmh") >= 300) & (col("Velocidad_Kmh") <= 450) &
        (col("Altitud_Pies") >= 5000) & (col("Altitud_Pies") <= 15000),
        lit(True)
    ).otherwise(lit(False))
).withColumn(
    "Dia_Posicion", dayofmonth(col("FechaHora_Posicion"))
).withColumn(
    "Mes_Posicion", month(col("FechaHora_Posicion"))
).withColumn(
    "Hora_Posicion_UTC", hour(col("FechaHora_Posicion"))
).withColumn(
    "Dia_Ingesta", dayofmonth(col("FechaHora_Ingesta"))
).withColumn(
    "Mes_Ingesta", month(col("FechaHora_Ingesta"))
).withColumn(
    "Hora_Ingesta_UTC", hour(col("FechaHora_Ingesta"))
).withColumn(
    "Cuadrante",
    when(
        (col("latitude") >= -22.5) & (col("latitude") <= 15) & (col("longitude") >= -90) & (col("longitude") <= -60), "Cuadrante 0"
    ).when(
        (col("latitude") >= -22.5) & (col("latitude") <= 15) & (col("longitude") > -60) & (col("longitude") <= -30), "Cuadrante 1"
    ).when(
        (col("latitude") >= -60) & (col("latitude") < -22.5) & (col("longitude") >= -90) & (col("longitude") <= -60), "Cuadrante 2"
    ).when(
        (col("latitude") >= -60) & (col("latitude") < -22.5) & (col("longitude") > -60) & (col("longitude") <= -30), "Cuadrante 3"
    ).otherwise("Fuera de Cuadrantes")
)


# Nuevos datos limpios y transformados en la tabla GOLD
df_transformed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/flight_data_gold") \
    .option("mergeSchema", "true") \
    .start("/mnt/datalake/flight_data_gold")

root
 |-- icao24: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- time_position: long (nullable = true)
 |-- last_contact: long (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- baro_altitude: double (nullable = true)
 |-- on_ground: boolean (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- timestamp_ingest: string (nullable = true)

Out[34]: <pyspark.sql.streaming.query.StreamingQuery at 0x7ff1bca59970>

In [0]:
spark.sql("CREATE TABLE flight_table USING DELTA LOCATION '/mnt/datalake/flight_data_gold'")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2730206984380594>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mCREATE TABLE flight_table USING DELTA LOCATION [39;49m[38;5;124;43m'[39;49m[38;5;124;43m/mnt/datalake/flight_data_gold[39;49m[38;5;124;43m'[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m

In [0]:
%sql
CREATE TABLE IF NOT EXISTS flights_gold
USING DELTA
LOCATION '/mnt/datalake/flight_data_gold';

In [0]:
%sql
SELECT *
FROM delta.`/mnt/datalake/flight_data_gold`;

icao24,callsign,origin_country,time_position,last_contact,longitude,latitude,baro_altitude,on_ground,velocity,heading,timestamp_ingest,FechaHora_Posicion,Altitud_Pies,Velocidad_Kmh,estado_de_vuelo,Indicador_Congestion,Hora_Del_Dia,FechaHora_Ingesta,Dia_Posicion,Mes_Posicion,Hora_Posicion_UTC,Dia_Ingesta,Mes_Ingesta,Hora_Ingesta_UTC,Cuadrante
e0b1ce,FBZ5005,Argentina,1750556880,1750556881,-58.5664,-34.6243,2278.38,False,150.81,107.88,2025-06-22T01:48:03.846022+00:00Z,2025-06-22T01:48:00.000+0000,7475.000239200001,542.916,Patron de Espera / Ascenso / Descenso,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
e07581,ARG1793,Argentina,1750556881,1750556881,-58.6798,-33.9859,4899.66,False,177.08,175.17,2025-06-22T01:48:03.846399+00:00Z,2025-06-22T01:48:01.000+0000,16075.0005144,637.488,En Vuelo de Crucero,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
e49eec,GLO7475,Brazil,1750556881,1750556881,-57.322,-33.8387,8008.62,False,185.19,223.99,2025-06-22T01:48:03.846486+00:00Z,2025-06-22T01:48:01.000+0000,26275.0008408,666.684,En Vuelo de Crucero,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
ac21af,AAL908,United States,1750556880,1750556880,-59.1038,-33.5089,9563.1,False,237.57,344.04,2025-06-22T01:48:03.846566+00:00Z,2025-06-22T01:48:00.000+0000,31375.001004,855.252,En Vuelo de Crucero,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
e49f5b,AZU2891,Brazil,1750556881,1750556881,-47.9743,-27.3633,11277.6,False,234.99,45.18,2025-06-22T01:48:03.846815+00:00Z,2025-06-22T01:48:01.000+0000,37000.001184,845.964,En Vuelo de Crucero,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
e0b112,FBZ5245,Argentina,1750556880,1750556881,-58.3234,-34.8892,1165.86,False,110.01,7.52,2025-06-22T01:48:03.846889+00:00Z,2025-06-22T01:48:00.000+0000,3825.0001224,396.036,Patron de Espera / Ascenso / Descenso,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
e0b149,ARG1269,Argentina,1750556881,1750556881,-50.629,-29.5758,10972.8,False,216.77,228.17,2025-06-22T01:48:03.846962+00:00Z,2025-06-22T01:48:01.000+0000,36000.001152,780.3720000000001,En Vuelo de Crucero,False,,2025-06-22T01:48:03.846+0000,22,6,1,22,6,1,Cuadrante 3
e0b146,FBZ5212,Argentina,1750556881,1750556881,-59.7218,-32.8336,11582.4,False,202.8,327.64,2025-06-22T01:48:03.847034+00:00Z,2025-06-22T01:48:01.000+0000,38000.001216,730.08,En Vuelo de Crucero,False,,2025-06-22T01:48:03.847+0000,22,6,1,22,6,1,Cuadrante 3
e49d24,AZU4212,Brazil,1750556881,1750556881,-46.5389,-23.1834,3947.16,False,122.09,202.03,2025-06-22T01:48:03.847177+00:00Z,2025-06-22T01:48:01.000+0000,12950.0004144,439.524,Patron de Espera / Ascenso / Descenso,True,,2025-06-22T01:48:03.847+0000,22,6,1,22,6,1,Cuadrante 3
e80616,JES3056,Chile,1750556574,1750556580,-58.3942,-34.5713,144.78,False,64.07,304.2,2025-06-22T01:48:03.847253+00:00Z,2025-06-22T01:42:54.000+0000,475.0000152,230.652,Maniobra (Despegue/Aterrizaje),False,,2025-06-22T01:48:03.847+0000,22,6,1,22,6,1,Cuadrante 3
