# ETL Process

This notebook runs the ETL processes for the different raw IDEAM tables.
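For orientation, the Glue source tables and trusted S3 destinations used in the three sections below can be gathered into one mapping. This is a plain-Python summary, not part of the original pipeline: the names `ETL_STEPS` and `trusted_prefix` are hypothetical, while the table names and S3 paths are copied from the notebook's cells.

```python
# Hypothetical summary of this notebook's ETL steps:
# Glue source table(s) -> trusted S3 prefix (values copied from the cells below).
ETL_STEPS = {
    "stations": {
        "sources": ["climaticchange.ideamstations_catalog"],
        "target": "s3://climaticchange-datalake/datasets_ideam/trusted/aux_data/stations_catalog/",
    },
    "variables": {
        "sources": [
            "climaticchange.ideamcatalogo_catalogovariables_ideam_csv",
            "climaticchange.ideamcatalogovariables_derivadas_h_ideam_csv",
            "climaticchange.ideamcatalogovariables_derivadas_m_ideam_csv",
        ],
        "target": "s3://climaticchange-datalake/datasets_ideam/trusted/aux_data/variables_catalog/",
    },
    "hist_data": {
        "sources": ["climaticchange.ideamhist_data"],
        "target": "s3://climaticchange-datalake/datasets_ideam/trusted/hist_data/",
    },
}


def trusted_prefix(step: str) -> str:
    """Return the trusted S3 prefix that a given ETL step writes to."""
    return ETL_STEPS[step]["target"]


if __name__ == "__main__":
    for name, step in ETL_STEPS.items():
        print(f"{name}: {len(step['sources'])} source(s) -> {step['target']}")
```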

## 1. Stations Table

In [1]:
df_stations = spark.sql("SELECT * FROM climaticchange.ideamstations_catalog") # Read the table from the Glue database
df_stations.printSchema() # Inspect the DataFrame schema
print(df_stations.count()) # Check the number of records
df_stations.show(2)

SparkSession available as 'spark'.

root
 |-- objectid: long (nullable = true)
 |-- codigo: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- tecnologia: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- fecha_instalacion: string (nullable = true)
 |-- altitud: long (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- area_operativa: string (nullable = true)
 |-- area_hidrografica: string (nullable = true)
 |-- zona_hidrografica: string (nullable = true)
 |-- observacion: string (nullable = true)
 |-- corriente: string (nullable = true)
 |-- fecha_suspension: string (nullable = true)
 |-- subzona_hidrografica: string (nullable = true)
 |-- entidad: string (nullable = true)
 |-- subred: string (nullable = true)

9236
+--------+--------+--------------------+-------------------+--------------------+----------+---------

In [2]:
# Inspecting the columns
df_stations.columns

['objectid', 'codigo', 'nombre', 'categoria', 'tecnologia', 'estado', 'fecha_instalacion', 'altitud', 'latitud', 'longitud', 'departamento', 'municipio', 'area_operativa', 'area_hidrografica', 'zona_hidrografica', 'observacion', 'corriente', 'fecha_suspension', 'subzona_hidrografica', 'entidad', 'subred']

In [3]:
# Dropping unnecessary columns and rows whose station code is null
df_stations_filtered = df_stations.drop('subred')
df_stations_filtered = df_stations_filtered.filter(df_stations_filtered['codigo'].isNotNull())
df_stations_filtered.show(5)

+--------+--------+--------------------+-------------------+--------------------+----------------+-----------------+-------+-----------+------------+------------+-------------------+--------------------+-----------------+-----------------+--------------------+---------+----------------+--------------------+--------------------+
|objectid|  codigo|              nombre|          categoria|          tecnologia|          estado|fecha_instalacion|altitud|    latitud|    longitud|departamento|          municipio|      area_operativa|area_hidrografica|zona_hidrografica|         observacion|corriente|fecha_suspension|subzona_hidrografica|             entidad|
+--------+--------+--------------------+-------------------+--------------------+----------------+-----------------+-------+-----------+------------+------------+-------------------+--------------------+-----------------+-----------------+--------------------+---------+----------------+--------------------+--------------------+
|       1|
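The two transformations in this cell, dropping `subred` and discarding rows whose `codigo` is null, can be mirrored in plain Python to check the logic on a handful of rows without a Spark session. This is a sketch only: the function name `clean_stations` and the sample rows are hypothetical, and a missing key is treated like a null value.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[object]]


def clean_stations(rows: List[Row], drop_col: str = "subred", key: str = "codigo") -> List[Row]:
    """Plain-Python analogue of df.drop(drop_col).filter(df[key].isNotNull())."""
    cleaned = []
    for row in rows:
        if row.get(key) is None:
            continue  # same effect as filtering out null station codes
        # Copy every column except the dropped one
        cleaned.append({k: v for k, v in row.items() if k != drop_col})
    return cleaned


# Hypothetical sample rows (not taken from the IDEAM catalog)
sample = [
    {"codigo": 1, "nombre": "A", "subred": "x"},
    {"codigo": None, "nombre": "B", "subred": "y"},
    {"codigo": 3, "nombre": "C", "subred": None},
]
result = clean_stations(sample)
print(result)  # [{'codigo': 1, 'nombre': 'A'}, {'codigo': 3, 'nombre': 'C'}]
```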

In [6]:
# Saving the result
url_result_stations='s3://climaticchange-datalake/datasets_ideam/trusted/aux_data/stations_catalog/'
df_stations_filtered.write.format("csv").option("header","true").mode("overwrite").save(url_result_stations)

## 2. Variables Table

To build the trusted variables table, three different tables must be read, transformed, and consolidated.

In [4]:
# Table 1: general variable catalog
df_variables_gen = spark.sql("SELECT `Tipo Red`,Parametro, Etiqueta,Unidad,Periodo,Descripcion FROM climaticchange.ideamcatalogo_catalogovariables_ideam_csv")
df_variables_gen.printSchema()
# Drop the first row (presumably the CSV header exposed as data) by keeping the last count()-1 rows
df_variables_gen = spark.createDataFrame(df_variables_gen.tail(df_variables_gen.count()-1), df_variables_gen.schema)
print(df_variables_gen.count())
df_variables_gen.show(5)

root
 |-- Tipo Red: string (nullable = true)
 |-- Parametro: string (nullable = true)
 |-- Etiqueta: string (nullable = true)
 |-- Unidad: string (nullable = true)
 |-- Periodo: string (nullable = true)
 |-- Descripcion: string (nullable = true)

61
+--------------+---------+-----------+------+--------+--------------------+
|      Tipo Red|Parametro|   Etiqueta|Unidad| Periodo|         Descripcion|
+--------------+---------+-----------+------+--------+--------------------+
|Convencionales|    Nivel|   NVLG_CON|    cm| Horario|Nivel del rio hor...|
|Convencionales|    Nivel|   NVLM_CON|    cm|12 horas|Nivel del rio a l...|
|   Automáticas|    Nivel|  NV_AUT_60|    cm| Horario|Nivel horario  po...|
|   Automáticas|    Nivel|NVMN_AUT_60|    cm| Horario|Nivel mínimo horario|
|   Automáticas|    Nivel|NVMX_AUT_60|    cm| Horario|Nivel máximo horario|
+--------------+---------+-----------+------+--------+--------------------+
only showing top 5 rows
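`tail(count() - 1)` keeps every row except the first, which here is assumed to be the CSV header row that the Glue table exposes as data. It collects the rows to the driver (acceptable for catalogs of a few hundred rows) and relies on Spark returning rows in file order. An order-independent alternative, swapped in here and sketched in plain Python with a hypothetical `drop_header_rows` helper and made-up rows, removes any row whose values equal the column names, which is what a header read as data looks like.

```python
from typing import List, Sequence, Tuple


def drop_header_rows(columns: Sequence[str], rows: List[Tuple[str, ...]]) -> List[Tuple[str, ...]]:
    """Remove rows that repeat the header, regardless of their position."""
    header = tuple(columns)
    return [row for row in rows if tuple(row) != header]


columns = ["Tipo Red", "Parametro", "Etiqueta", "Unidad", "Periodo", "Descripcion"]
# Hypothetical raw rows: the header appears as the first data row
raw = [
    tuple(columns),
    ("Convencionales", "Nivel", "NVLG_CON", "cm", "Horario", "sample description"),
    ("Convencionales", "Nivel", "NVLM_CON", "cm", "12 horas", "sample description"),
]
cleaned = drop_header_rows(columns, raw)
print(len(cleaned))  # 2
```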

In [5]:
# Table 2: derived variables catalog (derivadas_h)
df_variables_h = spark.sql("SELECT `Tipo Red`,Parametro, Etiqueta,Unidad,Periodo,Descripcion FROM climaticchange.ideamcatalogovariables_derivadas_h_ideam_csv")
df_variables_h.printSchema()
# Drop the first row (presumably the CSV header exposed as data)
df_variables_h = spark.createDataFrame(df_variables_h.tail(df_variables_h.count()-1), df_variables_h.schema)
print(df_variables_h.count())
df_variables_h.show(5)

root
 |-- Tipo Red: string (nullable = true)
 |-- Parametro: string (nullable = true)
 |-- Etiqueta: string (nullable = true)
 |-- Unidad: string (nullable = true)
 |-- Periodo: string (nullable = true)
 |-- Descripcion: string (nullable = true)

52
+------------+---------+----------+------+-------+--------------------+
|    Tipo Red|Parametro|  Etiqueta|Unidad|Periodo|         Descripcion|
+------------+---------+----------+------+-------+--------------------+
|Convencional|    Nivel|   NIVEL_H|    cm|horaria|Nivel horario val...|
|Convencional|    Nivel|NV_MEDIA_D|    cm| Diaria|  Nivel medio diario|
|Convencional|    Nivel|   NV_MX_D|    cm| Diaria| Nivel máximo diario|
|Convencional|    Nivel|   NV_MN_D|    cm| Diaria| Nivel mínimo diario|
|Convencional|    Nivel|NV_MEDIA_M|    cm|Mensual| Nivel medio mensual|
+------------+---------+----------+------+-------+--------------------+
only showing top 5 rows

In [6]:
# Table 3: derived variables catalog (derivadas_m)
df_variables_m = spark.sql("SELECT `Tipo Red`,Parametro, Etiqueta,Unidad,Periodo,Descripcion FROM climaticchange.ideamcatalogovariables_derivadas_m_ideam_csv")
df_variables_m.printSchema()
# Drop the first row (presumably the CSV header exposed as data)
df_variables_m = spark.createDataFrame(df_variables_m.tail(df_variables_m.count()-1), df_variables_m.schema)
print(df_variables_m.count())
df_variables_m.show(3)

root
 |-- Tipo Red: string (nullable = true)
 |-- Parametro: string (nullable = true)
 |-- Etiqueta: string (nullable = true)
 |-- Unidad: string (nullable = true)
 |-- Periodo: string (nullable = true)
 |-- Descripcion: string (nullable = true)

351
+--------------+------------+---------+---------+-------+--------------------+
|      Tipo Red|   Parametro| Etiqueta|   Unidad|Periodo|         Descripcion|
+--------------+------------+---------+---------+-------+--------------------+
|Convencionales|BRILLO SOLAR|BSHG_TT_D|horas/sol| Diario|Brillo solar tota...|
|Convencionales|BRILLO SOLAR|BSHG_TT_M|horas/sol|Mensual|Brillo solar tota...|
|Convencionales|BRILLO SOLAR|BSHG_TT_A|horas/sol|  Anual|Brillo solar tota...|
+--------------+------------+---------+---------+-------+--------------------+
only showing top 3 rows

In [7]:
# Consolidated table: stack the three catalogs (union matches columns by position and keeps duplicates)
df_consolidated = df_variables_gen.union(df_variables_h).union(df_variables_m)
print(df_consolidated.count())
df_consolidated.show(3)

464
+--------------+---------+---------+------+--------+--------------------+
|      Tipo Red|Parametro| Etiqueta|Unidad| Periodo|         Descripcion|
+--------------+---------+---------+------+--------+--------------------+
|Convencionales|    Nivel| NVLG_CON|    cm| Horario|Nivel del rio hor...|
|Convencionales|    Nivel| NVLM_CON|    cm|12 horas|Nivel del rio a l...|
|   Automáticas|    Nivel|NV_AUT_60|    cm| Horario|Nivel horario  po...|
+--------------+---------+---------+------+--------+--------------------+
only showing top 3 rows
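The consolidated count is the sum of the three inputs, 61 + 52 + 351 = 464, because `union` keeps duplicates (like SQL `UNION ALL`) and matches columns by position. Since the three SELECTs list the same columns in the same order, positional matching is safe here; `unionByName` would match by column name instead. That name-based alignment is sketched below in plain Python; the helper `union_by_name` and the sample rows are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple


def union_by_name(columns: Sequence[str], *tables: List[Dict[str, str]]) -> List[Tuple[str, ...]]:
    """Concatenate tables (keeping duplicates, like UNION ALL), aligning values by column name."""
    out: List[Tuple[str, ...]] = []
    for table in tables:
        for row in table:
            out.append(tuple(row[c] for c in columns))  # KeyError if a column is missing
    return out


cols = ["Tipo Red", "Etiqueta"]
gen = [{"Tipo Red": "Convencionales", "Etiqueta": "NVLG_CON"}]
# Same columns in a different key order: still aligned correctly by name
h = [{"Etiqueta": "NIVEL_H", "Tipo Red": "Convencional"}]
m = [
    {"Tipo Red": "Convencionales", "Etiqueta": "BSHG_TT_D"},
    {"Tipo Red": "Convencionales", "Etiqueta": "BSHG_TT_D"},
]
merged = union_by_name(cols, gen, h, m)
print(len(merged))  # 4 (duplicates are kept)
```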

In [9]:
# Saving the consolidated table
url_result_variables='s3://climaticchange-datalake/datasets_ideam/trusted/aux_data/variables_catalog/'
df_consolidated.write.format("csv").option("header","true").mode("overwrite").save(url_result_variables)

## 3. Climate Variables Table

In [15]:
df_hist = spark.sql("SELECT codigoestacion,entidad,etiqueta,fecha,valor FROM climaticchange.ideamhist_data") # Read the table from the Glue database
print("Before removing nulls:", df_hist.count())
df_hist = df_hist.filter(df_hist['codigoestacion'].isNotNull())
df_hist.printSchema() # Inspect the DataFrame schema
print("After removing nulls:", df_hist.count()) # Check the number of records
df_hist.show(5)

Before removing nulls: 41534
root
 |-- codigoestacion: long (nullable = true)
 |-- entidad: string (nullable = true)
 |-- etiqueta: string (nullable = true)
 |-- fecha: string (nullable = true)
 |-- valor: double (nullable = true)

After removing nulls: 41529
+--------------+--------------------+-----------+----------------+-----+
|codigoestacion|             entidad|   etiqueta|           fecha|valor|
+--------------+--------------------+-----------+----------------+-----+
|      26205080|INSTITUTO DE HIDR...|HR_CAL_MX_D|2020-03-29 00:00| 88.0|
|      26205080|INSTITUTO DE HIDR...|HR_CAL_MX_D|2020-03-30 00:00| 96.0|
|      26205080|INSTITUTO DE HIDR...|HR_CAL_MX_D|2020-03-31 00:00| 94.0|
|      26205080|INSTITUTO DE HIDR...|HR_CAL_MX_D|2020-04-01 00:00| 96.0|
|      26205080|INSTITUTO DE HIDR...|HR_CAL_MX_D|2020-04-02 00:00| 90.0|
+--------------+--------------------+-----------+----------------+-----+
only showing top 5 rows
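The two counts printed above differ by 41534 - 41529 = 5, the number of rows without a station code. A plain-Python sketch of the same before/after check, using a hypothetical `filter_null_key` helper and made-up rows shaped like `ideamhist_data`, returns both the kept rows and the number dropped so the difference can be asserted rather than read off by eye.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[object]]


def filter_null_key(rows: List[Row], key: str) -> Tuple[List[Row], int]:
    """Keep rows whose `key` is not None; also return how many rows were dropped."""
    kept = [r for r in rows if r.get(key) is not None]
    return kept, len(rows) - len(kept)


# Hypothetical sample rows shaped like ideamhist_data
rows = [
    {"codigoestacion": 26205080, "etiqueta": "HR_CAL_MX_D", "fecha": "2020-03-29 00:00", "valor": 88.0},
    {"codigoestacion": None, "etiqueta": "HR_CAL_MX_D", "fecha": "2020-03-30 00:00", "valor": 96.0},
    {"codigoestacion": 26205080, "etiqueta": "HR_CAL_MX_D", "fecha": "2020-03-31 00:00", "valor": 94.0},
]
kept, dropped = filter_null_key(rows, "codigoestacion")
print(len(rows), len(kept), dropped)  # 3 2 1
```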

In [16]:
# Saving the filtered table
url_result_hist = 's3://climaticchange-datalake/datasets_ideam/trusted/hist_data/'
df_hist.write.format("csv").option("header","true").mode("overwrite").save(url_result_hist)
