### 1. Importaciones

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, weekofyear, concat, lit, avg, max, min, count, countDistinct
import os

### 2. Configuración de rutas y carga de datos

In [0]:
# Crear sesión de Spark
spark = SparkSession.builder.appName("IoTDeviceModel").getOrCreate()

# Configuración de rutas de entrada y salida
input_data = "dbfs:/FileStore/tables"  
output_data = "/tmp/output/"

# Función para leer datos desde CSV
def read_csv(file_name, has_header=True):
    return spark.read.csv(os.path.join(input_data, file_name), header=has_header, inferSchema=True)

# Cargar las tablas de dispositivos
df_bot_devices = read_csv("bot_devices.csv")
df_iot_devices = read_csv("iot_devices.csv")

# Ver datos
df_bot_devices.show(2)
df_iot_devices.show(2)

+-------------------+--------------------+-------------+--------+-----------+-----+------+
|               date|                name|     location|humidity|temperature|sound|    id|
+-------------------+--------------------+-------------+--------+-----------+-----+------+
|2016-03-20 03:21:00|robo-sensor-pad-1...|United States|      26|         34|   41|188778|
|2016-03-20 03:21:00|robo-meter-gauge-...|     Malaysia|      96|         16|   73|188779|
+-------------------+--------------------+-------------+--------+-----------+-----+------+
only showing top 2 rows

+-------------+---------+----+----+-------------+---------+--------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|           ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+-------------+--------+---

### 3. Dimensiones y hechos

In [0]:
# Dimensión fecha
def create_dim_fecha(df):
    return df.select(
        col("date").alias("id_fecha"),
        year("date").alias("anio"),
        month("date").alias("mes"),
        dayofmonth("date").alias("dia"),
        weekofyear("date").alias("semana")
    ).distinct()

# Dimensión device
def create_dim_device(df):
    return df.select(
        col("device_id").alias("id_device"),
        col("device_name").alias("nombre_dispositivo"),
        col("scale").alias("escala"),
        col("battery_level").alias("nivel_bateria"),
        col("c02_level").alias("nivel_co2")
    ).distinct()

# Dimensión location (desde iot_devices para incluir latitud y longitud)
def create_dim_location(df):
    return df.select(
        col("device_id").alias("id_location"),
        col("latitude").alias("latitud"),
        col("longitude").alias("longitud"),
        col("cn").alias("pais")
    ).distinct()

# Tabla de hechos
def create_hechos_dispositivos(df_bot, df_iot):
    """
    Crea la tabla de hechos 'hechos_dispositivos' uniendo las tablas 'bot_devices' e 'iot_devices'
    en base solo a 'location' y 'humidity' para mejorar la cantidad de coincidencias.
    """
    hechos_dispositivos = df_bot.alias("bot").join(
        df_iot.alias("iot"),
        (col("bot.location") == col("iot.cn")) & (col("bot.humidity") == col("iot.humidity")),
        "inner"
    ).select(
        col("bot.id").alias("hecho_id"),
        col("iot.device_id").alias("id_device"),
        col("bot.location").alias("id_location"),
        col("bot.date").alias("id_fecha"),
        col("iot.ip"),
        col("iot.latitude"),
        col("iot.longitude"),
        col("iot.temp").alias("temperature"),
        col("bot.humidity"),
        col("iot.c02_level").alias("nivel_co2"),
        col("iot.battery_level").alias("nivel_bateria"),
        col("iot.timestamp").alias("created")
    )

    return hechos_dispositivos

# Crear la tabla de hechos
dim_fecha = create_dim_fecha(df_bot_devices)
dim_device = create_dim_device(df_iot_devices)
dim_location = create_dim_location(df_iot_devices)
hechos_dispositivos = create_hechos_dispositivos(df_bot_devices, df_iot_devices)

# Mostrar algunos registros de la tabla de hechos para verificar
dim_fecha.show(5)
dim_device.show(5)
dim_location.show(5)
hechos_dispositivos.show(5)

+-------------------+----+---+---+------+
|           id_fecha|anio|mes|dia|semana|
+-------------------+----+---+---+------+
|2016-03-20 03:21:01|2016|  3| 20|    11|
|2016-03-20 03:21:00|2016|  3| 20|    11|
+-------------------+----+---+---+------+

+---------+--------------------+-------+-------------+---------+
|id_device|  nombre_dispositivo| escala|nivel_bateria|nivel_co2|
+---------+--------------------+-------+-------------+---------+
|   158706|sensor-pad-158706...|Celsius|            3|     1108|
|   158708|sensor-pad-158708...|Celsius|            1|     1549|
|   158707|meter-gauge-15870...|Celsius|            9|     1534|
|   158710|sensor-pad-158710...|Celsius|            9|     1218|
|   158711|meter-gauge-15871...|Celsius|            5|      924|
+---------+--------------------+-------+-------------+---------+
only showing top 5 rows

+-----------+-------+--------+-----------------+
|id_location|latitud|longitud|             pais|
+-----------+-------+--------+---------

### 4. Reportes

In [0]:
# 1. Nivel promedio de CO2 y batería por ubicación
reporte1 = hechos_dispositivos \
    .groupBy("id_location") \
    .agg(avg("nivel_bateria").alias("promedio_bateria"), avg("nivel_co2").alias("promedio_co2"))
reporte1.show()

+-----------------+------------------+------------------+
|      id_location|  promedio_bateria|      promedio_co2|
+-----------------+------------------+------------------+
|           Russia| 4.608724388631857|1228.6093853271645|
|           Sweden| 4.598062953995157|1185.3268765133173|
|Republic of Korea| 4.426619588578446|1204.2156892846178|
|      Philippines| 4.636363636363637|1191.0454545454545|
|           Jersey|               2.0|            1485.0|
|         Malaysia| 4.151515151515151| 1181.060606060606|
|        Singapore|5.3544303797468356| 1191.392405063291|
|           Turkey| 4.153846153846154|1201.3846153846155|
|          Germany|  4.60968660968661|1196.8276353276353|
|      Afghanistan|               0.0|            1321.0|
|         Cambodia|               6.0|            1475.0|
|            Sudan|               8.0|            1292.0|
|           France| 4.236421725239617|1206.5183706070288|
|           Greece|             3.375|            1293.5|
|           Ta

In [0]:
# 2. Temperatura promedio y máxima por ubicación
reporte2 = hechos_dispositivos \
    .groupBy("id_location", "latitude", "longitude") \
    .agg(avg("temperature").alias("temp_promedio"), max("temperature").alias("max_temp"))
reporte2.show()

+-------------+--------+---------+------------------+--------+
|  id_location|latitude|longitude|     temp_promedio|max_temp|
+-------------+--------+---------+------------------+--------+
|        China|   43.88|   125.32| 21.56937799043062|      34|
|United States|   40.89|   -73.95|              31.0|      31|
|        China|   34.26|   108.93|22.276041666666668|      34|
|       Russia|   48.72|     44.5|            28.125|      30|
|United States|   34.05|   -81.11|              15.0|      15|
|United States|   33.31|   -87.59|              16.0|      16|
|      Germany|   51.25|     6.81|              15.0|      15|
|       Canada|   45.53|   -73.55|26.142857142857142|      29|
|United States|   38.85|  -105.32|              31.0|      31|
|       Poland|   50.27|    19.02|              24.0|      24|
|United States|   35.49|   -98.98|              14.0|      14|
|United States|    43.6|  -116.42|              24.0|      24|
|United States|   45.28|  -111.37|              27.0|  

In [0]:
# 3. Cantidad de dispositivos distintos por ubicación
reporte3 = hechos_dispositivos \
    .groupBy("id_location") \
    .agg(countDistinct("id_device").alias("cantidad_dispositivos"))
reporte3.show()

+-----------------+---------------------+
|      id_location|cantidad_dispositivos|
+-----------------+---------------------+
|           Russia|                  440|
|           Sweden|                  177|
|Republic of Korea|                  947|
|      Philippines|                   17|
|           Jersey|                    1|
|         Malaysia|                   26|
|        Singapore|                   52|
|           Turkey|                   13|
|          Germany|                  606|
|      Afghanistan|                    1|
|         Cambodia|                    1|
|            Sudan|                    1|
|           France|                  380|
|           Greece|                    6|
|           Taiwan|                  105|
|        Argentina|                   33|
|          Belgium|                   15|
|          Ecuador|                    1|
|          Finland|                   14|
|        Nicaragua|                    1|
+-----------------+---------------

In [0]:
# 4. Humedad promedio y mínima por ubicación y fecha
reporte4 = hechos_dispositivos \
    .groupBy("id_location", "id_fecha") \
    .agg(avg("humidity").alias("humedad_promedio"), min("humidity").alias("humedad_minima"))
reporte4.show()

+--------------------+-------------------+------------------+--------------+
|         id_location|           id_fecha|  humedad_promedio|humedad_minima|
+--------------------+-------------------+------------------+--------------+
|      Czech Republic|2016-03-20 03:21:00| 65.70588235294117|            30|
|              Taiwan|2016-03-20 03:21:00|63.445945945945944|            25|
|             Germany|2016-03-20 03:21:01|59.878846153846155|            25|
|              Serbia|2016-03-20 03:21:01|49.333333333333336|            27|
|            Bulgaria|2016-03-20 03:21:01|              60.0|            26|
|       United States|2016-03-20 03:21:00| 62.68962538917344|            25|
|             Belgium|2016-03-20 03:21:00| 68.22222222222223|            37|
|              Sweden|2016-03-20 03:21:00| 62.38068181818182|            26|
|         Puerto Rico|2016-03-20 03:21:00|              60.0|            60|
|         Switzerland|2016-03-20 03:21:00| 67.02564102564102|            36|

In [0]:
# 5. Días con mayor número de dispositivos activos
reporte5 = hechos_dispositivos \
    .groupBy("id_fecha") \
    .agg(countDistinct("id_device").alias("dispositivos_activos")) \
    .orderBy(col("dispositivos_activos").desc()) \
    .limit(5)
reporte5.show()

+-------------------+--------------------+
|           id_fecha|dispositivos_activos|
+-------------------+--------------------+
|2016-03-20 03:21:01|               11831|
|2016-03-20 03:21:00|               11476|
+-------------------+--------------------+



In [0]:
# 6. Promedio de nivel de CO2 por mes
reporte6 = hechos_dispositivos \
    .withColumn("mes", month("id_fecha")) \
    .groupBy("mes") \
    .agg(avg("nivel_co2").alias("promedio_co2"))
reporte6.show()

+---+------------------+
|mes|      promedio_co2|
+---+------------------+
|  3|1199.7321204897935|
+---+------------------+



In [0]:
# 7. Nivel de batería promedio por dispositivo y ubicación
reporte7 = hechos_dispositivos \
    .groupBy("id_device", "id_location") \
    .agg(avg("nivel_bateria").alias("bateria_promedio"))
reporte7.show()

+---------+-----------------+----------------+
|id_device|      id_location|bateria_promedio|
+---------+-----------------+----------------+
|   158790|    United States|             3.0|
|   158936|            Japan|             4.0|
|   159266|           Poland|             2.0|
|   159969|           Canada|             0.0|
|   160061|    United States|             5.0|
|   160618|    United States|             6.0|
|   160716|    United States|             5.0|
|   161264|    United States|             5.0|
|   161653|    United States|             1.0|
|   161692|            Japan|             4.0|
|   161870|           Russia|             6.0|
|   162050|Republic of Korea|             5.0|
|   162489|          Hungary|             3.0|
|   163308|    United States|             7.0|
|   163423|    United States|             5.0|
|   164260|          Germany|             9.0|
|   164969|        Australia|             5.0|
|   165203|           Poland|             8.0|
|   165373|  

In [0]:
# Finalizar sesión de Spark
spark.stop()