### DWH en la capa golden en formato .parquet
Se realiza el modelado de los datos utilizando un Esquema Estrella.

<p align="center">
  <img src="DER_Modelo_Covid19.png" width="500">
</p>

Se obtienen los datos desde la capa Silver para modelarlos.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, year, month, dayofmonth, date_format

# Crear la sesión de Spark
spark = SparkSession.builder \
    .appName("dwh_covid") \
    .master("local[*]") \
    .getOrCreate()

df_spark = spark.read.parquet("/opt/spark/scripts/data/silver/casos_diarios")
df_spark.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/22 19:38:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/22 19:38:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/07/22 19:38:35 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

+--------------------+-------------------+---------+----------+----------+-----------+-------+-----------+-------+--------------------+
|    provincia_estado|               pais|      lat|      long|     fecha|confirmados|muertes|recuperados|activos|          region_who|
+--------------------+-------------------+---------+----------+----------+-----------+-------+-----------+-------+--------------------+
|         Desconocido|        Afghanistan| 33.93911| 67.709953|2020-01-22|          0|      0|          0|      0|Eastern Mediterra...|
|         Desconocido|            Albania|  41.1533|   20.1683|2020-01-22|          0|      0|          0|      0|              Europe|
|         Desconocido|            Algeria|  28.0339|    1.6596|2020-01-22|          0|      0|          0|      0|              Africa|
|         Desconocido|            Andorra|  42.5063|    1.5218|2020-01-22|          0|      0|          0|      0|              Europe|
|         Desconocido|             Angola| -11.2

Se crea la **dimension "ubicacion"** a partir de la tabla con un distinct. Para el id de la dimension también podria ser utilizando un hash para generar la clave subrogada, en este caso, se utiliza una columna autonumérica.

In [2]:
dim_ubicacion = df_spark.select(
    col("pais").alias("pais"), 
    col("provincia_estado").alias("provincia_estado"),
    col("lat").cast("double").alias("latitud"),
    col("long").cast("double").alias("longitud"),
    col("region_who").alias("region_oms")
).distinct().withColumn("id_ubicacion", monotonically_increasing_id())

Se crea la **dimension "fecha"** a partir de la tabla con un distinct. La dimension fecha normalmente es una dimension estatica y se podria generar una unica vez, en este caso se genera a partir de los datos de la tabla.

In [3]:
df_con_fecha = df_spark.withColumn("fecha", col("fecha").cast("date"))
dim_fecha = df_con_fecha.select("fecha").distinct() \
    .withColumn("id_fecha", date_format("fecha", "yyyyMMdd").cast("int")) \
    .withColumn("anio", year("fecha")) \
    .withColumn("mes", month("fecha")) \
    .withColumn("dia", dayofmonth("fecha")) \
    .withColumn("dia_semana", date_format("fecha", "EEEE"))

Se crea la **tabla de hechos fact_covid_casos**. Normalmente una tabla de hechos contiene mayormente datos numericos (medidas) y los ids de las dimensiones. 

In [4]:
# Primero unir con dimension ubicación para obtener id_ubicacion
df_hechos = df_con_fecha.join(dim_ubicacion,
    (df_con_fecha["pais"] == dim_ubicacion["pais"]) &
    (df_con_fecha["provincia_estado"] == dim_ubicacion["provincia_estado"])
)

# Luego unir con dimensión fecha
df_fact_covid = df_hechos.join(dim_fecha, df_con_fecha["fecha"] == dim_fecha["fecha"])

# Seleccionar columnas finales
fact_covid_casos = df_fact_covid.select(
    "id_fecha",
    "id_ubicacion",
    col("confirmados").cast("int").alias("confirmados"),
    col("muertes").cast("int").alias("muertes"),
    col("recuperados").cast("int").alias("recuperados"),
    col("activos").cast("int").alias("activos")
)

fact_covid_casos.show(truncate  = False)

                                                                                

+--------+------------+-----------+-------+-----------+-------+
|id_fecha|id_ubicacion|confirmados|muertes|recuperados|activos|
+--------+------------+-----------+-------+-----------+-------+
|20200122|48          |0          |0      |0          |0      |
|20200122|13          |0          |0      |0          |0      |
|20200122|58          |0          |0      |0          |0      |
|20200122|188         |0          |0      |0          |0      |
|20200122|8           |0          |0      |0          |0      |
|20200122|251         |0          |0      |0          |0      |
|20200122|153         |0          |0      |0          |0      |
|20200122|77          |0          |0      |0          |0      |
|20200122|180         |0          |0      |0          |0      |
|20200122|46          |0          |0      |0          |0      |
|20200122|42          |0          |0      |0          |0      |
|20200122|258         |0          |0      |0          |0      |
|20200122|85          |0          |0    

Se guardan las dimensiones y la tabla de hechos en parquet en la capa Golden

In [5]:
dim_ubicacion.write.mode("overwrite").parquet("/opt/spark/scripts/data/golden/dim_ubicacion")
dim_fecha.write.mode("overwrite").parquet("/opt/spark/scripts/data/golden/dim_fecha")
fact_covid_casos.write.mode("overwrite").parquet("/opt/spark/scripts/data/golden/fact_covid_casos")

                                                                                

Se verifican las dimensiones y tablas de hechos

In [6]:
dim_ubicacion_loaded = spark.read.parquet("/opt/spark/scripts/data/golden/dim_ubicacion")
dim_fecha_loaded = spark.read.parquet("/opt/spark/scripts/data/golden/dim_fecha")
fact_covid_casos_loaded = spark.read.parquet("/opt/spark/scripts/data/golden/fact_covid_casos")

dim_ubicacion_loaded.show()
dim_fecha_loaded.show()
fact_covid_casos_loaded.show()

+--------------------+--------------------+---------+-------------------+--------------------+------------+
|                pais|    provincia_estado|  latitud|           longitud|          region_oms|id_ubicacion|
+--------------------+--------------------+---------+-------------------+--------------------+------------+
|   Equatorial Guinea|         Desconocido|   1.6508|            10.2679|              Africa|           0|
|               Malta|         Desconocido|  35.9375|            14.3754|              Europe|           1|
|             Moldova|         Desconocido|  47.4116|            28.3699|              Europe|           2|
|              Kosovo|         Desconocido|42.602636|          20.902977|              Europe|           3|
|      United Kingdom|            Anguilla|  18.2206|           -63.0686|              Europe|           4|
|      Western Sahara|         Desconocido|  24.2155|           -12.8858|              Africa|           5|
|          Azerbaijan|      