In [2]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=4f90233fe38c45d0ed13fc8697cca260b37cc942538a78c28bc7c694f623e76b
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
!pip install geopy



In [30]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from datetime import datetime
from geopy.geocoders import Nominatim
from pyspark.sql.functions import lit

# Lista de lugares con sus coordenadas
lugares = [
    {"nombre": "Amazonas/Peru", "lat": -4.999999999682269, "lon": -78},
    {"nombre": "Ancash/Peru", "lat": -9.499999999871093, "lon": -77.75},
    # Agrega más lugares si es necesario
]

# Crear una sesión Spark en Colab
spark = SparkSession.builder.master("local[*]").appName("WeatherAnalysis").getOrCreate()

# Definir el esquema para el DataFrame de Spark
schema = StructType([
    # Define tus estructuras aquí según tus necesidades
])

# Crear una lista para almacenar los DataFrames individuales
dataframes = []

# Iterar sobre la lista de lugares
for lugar in lugares:
    # Obtener los datos de la API
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lugar['lat']}&lon={lugar['lon']}&exclude=hourly,minutely&appid=107298a9f77f36886d2b680a82c145cf"
    response = requests.get(url)
    data = response.json()

    # Extraer la información diaria de la respuesta
    daily_data = data.get("daily", [])

    # Convertir datos a un DataFrame de pandas
    df = pd.json_normalize(daily_data)

    # Convertir la columna "dt" a formato datetime
    df["dt"] = pd.to_datetime(df["dt"], unit="s")

    # Convertir el DataFrame de pandas en un DataFrame de Spark
    spark_df = spark.createDataFrame(df)

    # Obtener la información de la ubicación (región y país)
    geolocator = Nominatim(user_agent="weather_analysis")
    location = geolocator.reverse((lugar["lat"], lugar["lon"]), language="en")

    # Extraer información de ubicación
    region = location.raw['address']['state']
    country = location.raw['address']['country']

    # Agregar las columnas de ubicación al DataFrame de Spark
    spark_df = spark_df.withColumn("region", lit(region))
    spark_df = spark_df.withColumn("country", lit(country))

    # Agregar una columna con el nombre del lugar
    spark_df = spark_df.withColumn("nombre_lugar", lit(lugar["nombre"]))

    # Agregar el DataFrame actual a la lista
    dataframes.append(spark_df)

# Unir todos los DataFrames en uno solo
consolidated_df = dataframes[0]
for df in dataframes[1:]:
    consolidated_df = consolidated_df.union(df)

# Mostrar el esquema y los primeros registros del DataFrame consolidado
print("Información consolidada:")
consolidated_df.printSchema()
consolidated_df.show()


Información consolidada:
root
 |-- dt: timestamp (nullable = true)
 |-- sunrise: long (nullable = true)
 |-- sunset: long (nullable = true)
 |-- moonrise: long (nullable = true)
 |-- moonset: long (nullable = true)
 |-- moon_phase: double (nullable = true)
 |-- summary: string (nullable = true)
 |-- pressure: long (nullable = true)
 |-- humidity: long (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_deg: long (nullable = true)
 |-- wind_gust: double (nullable = true)
 |-- weather: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)
 |-- clouds: long (nullable = true)
 |-- pop: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- uvi: double (nullable = true)
 |-- temp.day: double (nullable = true)
 |-- temp.min: double (nullable = true)
 |-- temp.max: double (nullable = true)
 |-- temp.night: double (nullable = true

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import monotonically_increasing_id, col, from_unixtime

# Crear una sesión Spark en Colab
spark = SparkSession.builder.master("local[*]").appName("WeatherAnalysis").getOrCreate()

# Definir el esquema para la DimTiempo
schema_dim_tiempo = StructType([
    StructField("idTiempo", StringType(), False),  # Clave primaria
    StructField("dt", TimestampType(), True),
    StructField("sunrise", TimestampType(), True),
    StructField("sunset", TimestampType(), True),
    StructField("nombre_lugar", StringType(), False)  # Nueva columna para el nombre del lugar
])

# Convertir la columna sunrise de Unix a formato de fecha
consolidated_df = consolidated_df.withColumn("sunrise", from_unixtime("sunrise"))

# Convertir la columna sunset de Unix a formato de fecha
consolidated_df = consolidated_df.withColumn("sunset", from_unixtime("sunset"))

# Crear DataFrame para la DimTiempo y ordenar por la columna dt
data_dim_tiempo = consolidated_df.select("dt", "sunrise", "sunset", "nombre_lugar").distinct().orderBy("dt")
data_dim_tiempo = data_dim_tiempo.withColumn("idTiempo", monotonically_increasing_id())
df_dim_tiempo = data_dim_tiempo.select("idTiempo", "dt", "sunrise", "sunset", "nombre_lugar")

# Mostrar el esquema y los primeros registros del DataFrame DimTiempo
print("DimTiempo después de la conversión:")
df_dim_tiempo.printSchema()
df_dim_tiempo.show()


DimTiempo después de la conversión:
root
 |-- idTiempo: long (nullable = false)
 |-- dt: timestamp (nullable = true)
 |-- sunrise: string (nullable = true)
 |-- sunset: string (nullable = true)
 |-- nombre_lugar: string (nullable = false)

+--------+-------------------+-------------------+-------------------+-------------+
|idTiempo|                 dt|            sunrise|             sunset| nombre_lugar|
+--------+-------------------+-------------------+-------------------+-------------+
|       0|2023-11-25 16:00:00|2023-11-25 10:47:58|2023-11-25 23:10:17|Amazonas/Peru|
|       1|2023-11-25 16:00:00|2023-11-25 10:39:59|2023-11-25 23:16:15|  Ancash/Peru|
|       2|2023-11-26 16:00:00|2023-11-26 10:48:12|2023-11-26 23:10:40|Amazonas/Peru|
|       3|2023-11-26 16:00:00|2023-11-26 10:40:09|2023-11-26 23:16:43|  Ancash/Peru|
|       4|2023-11-27 16:00:00|2023-11-27 10:48:26|2023-11-27 23:11:04|Amazonas/Peru|
|       5|2023-11-27 16:00:00|2023-11-27 10:40:19|2023-11-27 23:17:11|  Ancash/P

In [None]:
lugares = [
    {"nombre": "Amazonas/Peru", "lat": -4.999999999682269, "lon": -78},
    {"nombre": "Ancash/Peru", "lat": -9.499999999871093, "lon": -77.75},
    # Agrega más lugares si es necesario
]

In [None]:
from pyspark.sql.functions import row_number, lit, when
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType


# Definir el esquema para la DimUbicacion
schema_dim_ubicacion = StructType([
    StructField("idUbicacion", IntegerType(), False),  # Clave primaria
    StructField("region", StringType(), True),
    StructField("country", StringType(), True),
    StructField("nombre_lugar", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
])

# Obtener datos de ubicación únicos
data_dim_ubicacion = consolidated_df.select("region", "country", "nombre_lugar").distinct()

# Asignar identificadores únicos a través de row_number y Window
window = Window.orderBy("region", "country", "nombre_lugar")
data_dim_ubicacion = data_dim_ubicacion.withColumn("idUbicacion", row_number().over(window))

# Crear DataFrame para la DimUbicacion
df_dim_ubicacion = data_dim_ubicacion.select("idUbicacion", "region", "country", "nombre_lugar")

# Agregar las columnas lat y lon con los datos del array lugares
df_dim_ubicacion = df_dim_ubicacion.withColumn("lat", lit(None).cast(DoubleType()))
df_dim_ubicacion = df_dim_ubicacion.withColumn("lon", lit(None).cast(DoubleType()))

# Actualizar las columnas lat y lon con los valores del array lugares
for lugar_info in lugares:
    condition = (df_dim_ubicacion["nombre_lugar"] == lugar_info["nombre"])
    df_dim_ubicacion = df_dim_ubicacion.withColumn("lat", when(condition, lugar_info["lat"]).otherwise(df_dim_ubicacion["lat"]))
    df_dim_ubicacion = df_dim_ubicacion.withColumn("lon", when(condition, lugar_info["lon"]).otherwise(df_dim_ubicacion["lon"]))

# Mostrar el esquema y los primeros registros del DataFrame DimUbicacion
print("DimUbicacion:")
df_dim_ubicacion.printSchema()
df_dim_ubicacion.show()


DimUbicacion:
root
 |-- idUbicacion: integer (nullable = false)
 |-- region: string (nullable = false)
 |-- country: string (nullable = false)
 |-- nombre_lugar: string (nullable = false)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

+-----------+--------+-------+-------------+------------------+------+
|idUbicacion|  region|country| nombre_lugar|               lat|   lon|
+-----------+--------+-------+-------------+------------------+------+
|          1|Amazonas|   Peru|Amazonas/Peru|-4.999999999682269| -78.0|
|          2|  Ancash|   Peru|  Ancash/Peru|-9.499999999871093|-77.75|
+-----------+--------+-------+-------------+------------------+------+



In [None]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Definir el esquema para la DimClima
schema_dim_clima = StructType([
    StructField("idClima", IntegerType(), False),  # Clave primaria
    StructField("summary", StringType(), True),

])

# Obtener datos de clima únicos
data_dim_clima = consolidated_df.select("summary")

# Asignar identificadores únicos a través de row_number y Window
window = Window.orderBy("summary")
data_dim_clima = data_dim_clima.withColumn("idClima", row_number().over(window))

# Crear DataFrame para la DimClima
df_dim_clima = data_dim_clima.select("idClima", "summary")

# Mostrar el esquema y los primeros registros del DataFrame DimClima
print("DimClima:")
df_dim_clima.printSchema()
df_dim_clima.show()


DimClima:
root
 |-- idClima: integer (nullable = false)
 |-- summary: string (nullable = true)

+-------+--------------------+
|idClima|             summary|
+-------+--------------------+
|      1|Expect a day of p...|
|      2|Expect a day of p...|
|      3|Expect a day of p...|
|      4|Expect a day of p...|
|      5|Expect a day of p...|
|      6|Expect a day of p...|
|      7|Expect a day of p...|
|      8|Expect a day of p...|
|      9|Expect a day of p...|
|     10|Expect a day of p...|
|     11|Expect a day of p...|
|     12|The day will star...|
|     13|There will be rai...|
|     14|There will be rai...|
|     15|You can expect pa...|
|     16|You can expect ra...|
+-------+--------------------+



In [None]:
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Renombrar las columnas
consolidated_df = consolidated_df.withColumnRenamed("temp.day", "temp_day")\
                                 .withColumnRenamed("temp.min", "temp_min")\
                                 .withColumnRenamed("temp.max", "temp_max")\
                                 .withColumnRenamed("temp.night", "temp_night")\
                                 .withColumnRenamed("temp.eve", "temp_eve")\
                                 .withColumnRenamed("temp.morn", "temp_morn")

# Definir el esquema para la DimTemperatura
schema_dim_Temperatura = StructType([
    StructField("idTemperatura", IntegerType(), False),  # Clave primaria
    StructField("temp_day", DoubleType(), True),
    StructField("temp_min", DoubleType(), True),
    StructField("temp_max", DoubleType(), True),
    StructField("temp_night", DoubleType(), True),
    StructField("temp_eve", DoubleType(), True),
    StructField("temp_morn", DoubleType(), True),
])

# Obtener datos de temperatura únicos
data_dim_Temperatura = consolidated_df.select(
    col("temp_day"),
    col("temp_min"),
    col("temp_max"),
    col("temp_night"),
    col("temp_eve"),
    col("temp_morn")
).distinct()

# Asignar identificadores únicos a través de row_number y Window
window = Window.orderBy("temp_day", "temp_min", "temp_max", "temp_night", "temp_eve", "temp_morn")
data_dim_Temperatura = data_dim_Temperatura.withColumn("idTemperatura", row_number().over(window))

# Crear DataFrame para la DimTemperatura
df_dim_Temperatura = data_dim_Temperatura.select(
    "idTemperatura", "temp_day", "temp_min", "temp_max", "temp_night", "temp_eve", "temp_morn"
)

# Mostrar el esquema y los primeros registros del DataFrame DimTemperatura
print("DimTemperatura:")
df_dim_Temperatura.printSchema()
df_dim_Temperatura.show()


DimTemperatura:
root
 |-- idTemperatura: integer (nullable = false)
 |-- temp_day: double (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- temp_max: double (nullable = true)
 |-- temp_night: double (nullable = true)
 |-- temp_eve: double (nullable = true)
 |-- temp_morn: double (nullable = true)

+-------------+--------+--------+--------+----------+--------+---------+
|idTemperatura|temp_day|temp_min|temp_max|temp_night|temp_eve|temp_morn|
+-------------+--------+--------+--------+----------+--------+---------+
|            1|  284.35|  281.63|  285.13|    281.63|  282.36|   282.55|
|            2|  284.74|  281.55|  284.83|    282.29|  282.88|   282.09|
|            3|  285.42|  281.53|  285.42|    281.53|  282.56|   281.94|
|            4|  285.68|  281.97|  285.73|    282.78|  282.76|   281.97|
|            5|  286.22|  282.41|  286.22|    282.45|   283.5|   282.41|
|            6|  286.54|  281.93|  286.54|    283.07|  283.44|   282.35|
|            7|  287.03|  282.4

In [None]:
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

# Renombrar las columnas si es necesario
# consolidated_df = consolidated_df.withColumnRenamed(...)

# Definir el esquema para la DimLuna
schema_dim_Luna = StructType([
    StructField("idLuna", IntegerType(), False),  # Clave primaria
    StructField("moonrise", TimestampType(), True),
    StructField("moonset", TimestampType(), True),
    StructField("moon_phase", DoubleType(), True),
])

# Obtener datos de la Luna únicos
data_dim_Luna = consolidated_df.select(
    col("moonrise"),
    col("moonset"),
    col("moon_phase")
).distinct()

# Asignar identificadores únicos a través de row_number y Window
window = Window.orderBy("moonrise", "moonset", "moon_phase")
data_dim_Luna = data_dim_Luna.withColumn("idLuna", row_number().over(window))

# Crear DataFrame para la DimLuna
df_dim_Luna = data_dim_Luna.select(
    "idLuna", "moonrise", "moonset", "moon_phase"
)

# Mostrar el esquema y los primeros registros del DataFrame DimLuna
print("DimLuna:")
df_dim_Luna.printSchema()
df_dim_Luna.show()


DimLuna:
root
 |-- idLuna: integer (nullable = false)
 |-- moonrise: long (nullable = true)
 |-- moonset: long (nullable = true)
 |-- moon_phase: double (nullable = true)

+------+----------+----------+----------+
|idLuna|  moonrise|   moonset|moon_phase|
+------+----------+----------+----------+
|     1|1700859240|1700814060|      0.41|
|     2|1700859420|1700813820|      0.41|
|     3|1700948820|1700903280|      0.44|
|     4|1700949120|1700902920|      0.44|
|     5|1701038580|1700992680|      0.48|
|     6|1701039000|1700992200|      0.48|
|     7|1701128340|1701082260|       0.5|
|     8|1701128880|1701081660|       0.5|
|     9|1701218280|1701171960|      0.55|
|    10|1701218820|1701171300|      0.55|
|    11|1701307980|1701261720|      0.58|
|    12|1701308580|1701261060|      0.58|
|    13|1701397620|1701351480|      0.61|
|    14|1701398160|1701350820|      0.61|
|    15|1701486960|1701441120|      0.64|
|    16|1701487380|1701440520|      0.64|
+------+----------+----------+

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Unir df_dim_tiempo con consolidated_df y eliminar duplicados basados en columnas que no son de tipo MAP
fact_table_temp = consolidated_df.join(df_dim_tiempo, "dt", 'inner').select(consolidated_df["*"], df_dim_tiempo["idTiempo"]).dropDuplicates([c for c in consolidated_df.columns if c != 'weather'])

# Unir df_dim_ubicacion con fact_table_temp y eliminar duplicados
fact_table = fact_table_temp.join(df_dim_ubicacion, ["region", "country"], 'inner').select(fact_table_temp["*"], df_dim_ubicacion["idUbicacion"]).dropDuplicates([c for c in consolidated_df.columns if c != 'weather'])

# Unir df_dim_clima con fact_table_temp
fact_table = fact_table.join(df_dim_clima, "summary", 'inner').select(fact_table["*"], df_dim_clima["idClima"]).dropDuplicates([c for c in consolidated_df.columns if c != 'weather'])

# Unir df_dim_Temperatura con fact_table utilizando las columnas de temperatura
fact_table = fact_table.join(df_dim_Temperatura, ["temp_day", "temp_min", "temp_max", "temp_night", "temp_eve", "temp_morn"], 'inner').select(fact_table["*"], df_dim_Temperatura["idTemperatura"])

# Unir df_dim_Luna con fact_table utilizando las columnas relacionadas con la Luna
fact_table = fact_table.join(df_dim_Luna, ["moonrise", "moonset", "moon_phase"], 'inner').select(fact_table["*"], df_dim_Luna["idLuna"])


# Seleccionar solo las columnas de ID para crear la tabla de hechos
fact_table = fact_table.select('idTiempo', 'idUbicacion', 'idTemperatura', 'idLuna','idClima' )
# Agregar una columna 'id' a fact_table
fact_table = fact_table.withColumn('id', monotonically_increasing_id())

# Mover la columna 'id' al principio del DataFrame
columns = ['id'] + [col for col in fact_table.columns if col != 'id']
fact_table = fact_table.select(columns)

print("FactTable:")
fact_table.printSchema()
fact_table.show()

FactTable:
root
 |-- id: long (nullable = false)
 |-- idTiempo: long (nullable = false)
 |-- idUbicacion: integer (nullable = false)
 |-- idTemperatura: integer (nullable = false)
 |-- idLuna: integer (nullable = false)
 |-- idClima: integer (nullable = false)

+---+--------+-----------+-------------+------+-------+
| id|idTiempo|idUbicacion|idTemperatura|idLuna|idClima|
+---+--------+-----------+-------------+------+-------+
|  0|       3|          1|           10|     3|     11|
|  1|       5|          1|           12|     5|     14|
|  2|       7|          1|            9|     7|     16|
|  3|       1|          1|           13|     1|     11|
|  4|      11|          1|           16|    11|     15|
|  5|       9|          1|           15|     9|     11|
|  6|      15|          1|           11|    15|     11|
|  7|      13|          1|           14|    13|     11|
|  8|       3|          2|            8|     4|     11|
|  9|       5|          2|            3|     6|     12|
| 10|     

In [None]:
openweatherHive = SparkSession.builder.appName("DIMENSIONES-HIVE").enableHiveSupport().getOrCreate()

In [None]:
openweatherHive.sql("SHOW DATABASES").show()
openweatherHive.sql("CREATE DATABASE IF NOT EXISTS dimensiones")


+---------+
|namespace|
+---------+
|  default|
+---------+



DataFrame[]

In [None]:
df_dim_tiempo.write.saveAsTable("dimensiones.df_dim_tiempo")
df_dim_ubicacion.write.saveAsTable("dimensiones.df_dim_ubicacion")
df_dim_clima.write.saveAsTable("dimensiones.df_dim_clima")
df_dim_Temperatura.write.saveAsTable("dimensiones.df_dim_Temperatura")
df_dim_Luna.write.saveAsTable("dimensiones.df_dim_Luna")

In [None]:
df_dim_tiempo.show()

+--------+-------------------+----------+----------+-------------+
|idTiempo|                 dt|   sunrise|    sunset| nombre_lugar|
+--------+-------------------+----------+----------+-------------+
|       0|2023-11-24 16:00:00|1700822865|1700867393|Amazonas/Peru|
|       1|2023-11-24 16:00:00|1700822391|1700867748|  Ancash/Peru|
|       2|2023-11-25 16:00:00|1700909278|1700953817|Amazonas/Peru|
|       3|2023-11-25 16:00:00|1700908799|1700954175|  Ancash/Peru|
|       4|2023-11-26 16:00:00|1700995692|1701040240|Amazonas/Peru|
|       5|2023-11-26 16:00:00|1700995209|1701040603|  Ancash/Peru|
|       6|2023-11-27 16:00:00|1701082106|1701126664|Amazonas/Peru|
|       7|2023-11-27 16:00:00|1701081619|1701127031|  Ancash/Peru|
|       8|2023-11-28 16:00:00|1701168031|1701213460|  Ancash/Peru|
|       9|2023-11-28 17:00:00|1701168522|1701213089|Amazonas/Peru|
|      10|2023-11-29 16:00:00|1701254443|1701299889|  Ancash/Peru|
|      11|2023-11-29 17:00:00|1701254938|1701299514|Amazonas/P

In [None]:
#Si se usa colab, ejecutar esta celda
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
df_dim_tiempo.write.csv('/cont ent/drive/MyDrive/BigData/Tab las/df_dim_tiempo.csv',header=True,mode='overwrite')
df_dim_ubicacion.write.csv('/content/drive/MyDrive/BigData/Tablas/df_dim_ubicacion.csv', header=True, mode='overwrite')
df_dim_clima.write.csv('/content/drive/MyDrive/BigData/Tablas/df_dim_clima.csv', header=True, mode='overwrite')
df_dim_Temperatura.write.csv('/content/drive/MyDrive/BigData/Tablas/df_dim_Temperatura.csv', header=True, mode='overwrite')
df_dim_Luna.write.csv('/content/drive/MyDrive/BigData/Tablas/df_dim_Luna.csv', header=True, mode='overwrite')
fact_table.write.csv('/content/drive/MyDrive/BigData/Tablas/fact_table.csv', header=True, mode='overwrite')
