# Manipulación de datos a través de jupyterhub con ayuda  de spark, sql y s3

En el siguiente documento haremos una análisis exploratorio rápido de la base de datos climate_db, valiendonos de las herramientas que nos proporciona **pyspark**.


### Inicializamos el ambiente spark

In [1]:
spark
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1646601584569_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=livy-session-1>

### Se importan las funciones de pyspark para uso en sql

In [2]:
from pyspark.sql.functions import *

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Podemos ver las bases que tenemos actualmente utilizando el siguiente comando

In [3]:
spark.sql("SHOW DATABASES").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+
|       namespace|
+----------------+
|      airlinesdb|
|      climate_db|
|       climatedb|
|         default|
|           onudb|
|      trickit_db|
|zip_db_bkproject|
+----------------+

La anterior lista mustra las diferentes bases creadas a partir de las practicas en el taller del curso y en las individuales. Nos centraremos en la llamada **climate_db**.

### Cargamos la base de nuestro interés

In [4]:
spark.sql("USE climate_db;").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

++
||
++
++

### Hacemos una lectura de la tabla de interés dentro de la base y la alojamos en una variable que llamaremos climate.

In [5]:
climate = spark.sql("SELECT * FROM climate;")
climate.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+---------------------+----------+-----------+------------+-----------------------+------------+--------------------+--------------------+----------------+--------------------+--------+--------+-------------+--------------------+
|               fecha|autoridad_ambiental|nombre_de_la_estaci_n|tecnolog_a|    latitud|    longitud|c_digo_del_departamento|departamento|c_digo_del_municipio|nombre_del_municipio|tipo_de_estaci_n|tiempo_de_exposici_n|variable|unidades|concentraci_n|     geocoded_column|
+--------------------+-------------------+---------------------+----------+-----------+------------+-----------------------+------------+--------------------+--------------------+----------------+--------------------+--------+--------+-------------+--------------------+
|               fecha|autoridad_ambiental| nombre_de_la_esta...|tecnolog_a|       null|        null|                   null|departamento|                null|nombre_del_municipio|tipo_de_

La anterior es guardada como un dataframe de spark.

### Encontramos ahora el número de registros en nuestra tabla

Lo haremos de dos maneras, usando un comando de sql y otro de pyspark

In [6]:
spark.sql("SELECT count(*) FROM climate;").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
|11426916|
+--------+

In [7]:
climate.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

11426916

Por lo tanto las dimensiones iniciales de nuestro dataframe **climate** son:

In [8]:
print(("{} filas y {} columnas").format(climate.count(),len(climate.columns)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

11426916 filas y 16 columnas

### A grandes rasgos nuestro dataframe tiene las siguientes características

In [11]:
climate.describe().select('fecha','nombre_de_la_estaci_n','departamento','nombre_del_municipio','concentraci_n').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------------------+------------+--------------------+------------------+
|               fecha|nombre_de_la_estaci_n|departamento|nombre_del_municipio|     concentraci_n|
+--------------------+---------------------+------------+--------------------+------------------+
|            11426916|             11426916|    11426916|            11426916|          11056074|
|                null|                 null|        null|                null|125.24522733592697|
|                null|                 null|        null|                null|496.00295032775153|
|01/01/2011 01:00:...|           Base Aérea|   ANTIOQUIA|        BARRANQUILLA|            -35.66|
|               fecha| nombre_de_la_esta...|departamento|nombre_del_municipio|          733378.0|
+--------------------+---------------------+------------+--------------------+------------------+

En el anterior caso se filtraron para algunas columnas debido a que no era muy legible para la totalidad de estas.

### Las columnas que tenemos en nuestro spark dataframe son 

In [12]:
climate.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['fecha', 'autoridad_ambiental', 'nombre_de_la_estaci_n', 'tecnolog_a', 'latitud', 'longitud', 'c_digo_del_departamento', 'departamento', 'c_digo_del_municipio', 'nombre_del_municipio', 'tipo_de_estaci_n', 'tiempo_de_exposici_n', 'variable', 'unidades', 'concentraci_n', 'geocoded_column']

### Para nuestro ínteres de análisis no son necesarias todas las columnas, por tal razón quitaremos algunas de nuestro DataFrame

In [14]:
climate = climate.select('fecha','nombre_de_la_estaci_n','departamento','nombre_del_municipio','concentraci_n')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Ahora tenemos un datafram de menor dimensión, lo que nos permitirá mayor agilidad en las operaciones con este. A partir de este momento nos podemos hacer algunas preguntas con respecto a los datos.

### ¿ De cuales municipios tenemos registro en nuestra base?

Para esto revisamos los valores únicos en la columna asociada a los municipios

In [15]:
climate.select('nombre_del_municipio').distinct().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|nombre_del_municipio|
+--------------------+
|            MEDELLÍN|
|nombre_del_municipio|
| CARTAGENA DE INDIAS|
|        BOGOTÁ. D.C.|
|                CALI|
|        BARRANQUILLA|
+--------------------+

### Como vemos tenemos dentro de la columna una opción que no corresponde al nombre de un municipio, la cual es "nombre_del_municipio". Veamos que registros tienen este valor:

In [16]:
climate.filter(climate.nombre_del_municipio =='nombre_del_municipio').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+---------------------+------------+--------------------+-------------+
|fecha|nombre_de_la_estaci_n|departamento|nombre_del_municipio|concentraci_n|
+-----+---------------------+------------+--------------------+-------------+
|fecha| nombre_de_la_esta...|departamento|nombre_del_municipio|         null|
|fecha| nombre_de_la_esta...|departamento|nombre_del_municipio|         null|
|fecha| nombre_de_la_esta...|departamento|nombre_del_municipio|         null|
|fecha| nombre_de_la_esta...|departamento|nombre_del_municipio|         null|
|fecha| nombre_de_la_esta...|departamento|nombre_del_municipio|         null|
+-----+---------------------+------------+--------------------+-------------+

### Vemos que para este caso no tenemos registros de información relevante, por lo que se puede hacer limpieza de estas entradas. Para esto creamos una nueva variable climate_ft, la cual alojará el dataframe sin dichos datos.

In [17]:
climate_ft = climate.filter(climate.nombre_del_municipio!= 'nombre_del_municipio')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El nuevo número de registros en nuestro dataframe es

In [18]:
climate_ft.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

11426911

A grandes rasgos en el nuevo dataframe es de la forma:

In [19]:
climate_ft.describe().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+---------------------+---------------+--------------------+------------------+
|summary|               fecha|nombre_de_la_estaci_n|   departamento|nombre_del_municipio|     concentraci_n|
+-------+--------------------+---------------------+---------------+--------------------+------------------+
|  count|            11426911|             11426911|       11426911|            11426911|          11056074|
|   mean|                null|                 null|           null|                null|125.24522733592701|
| stddev|                null|                 null|           null|                null|496.00295032775165|
|    min|01/01/2011 01:00:...|           Base Aérea|      ANTIOQUIA|        BARRANQUILLA|            -35.66|
|    max|    31/12/2018 23:00| Zona Franca La Ca...|VALLE DEL CAUCA|            MEDELLÍN|          733378.0|
+-------+--------------------+---------------------+---------------+--------------------+------------------+

### Comprobamos que ya no tengamos dichos registros

In [20]:
climate_ft.select('nombre_del_municipio').distinct().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|nombre_del_municipio|
+--------------------+
|            MEDELLÍN|
| CARTAGENA DE INDIAS|
|        BOGOTÁ. D.C.|
|                CALI|
|        BARRANQUILLA|
+--------------------+

Se evidencia que ya solo tenemos valores acordes para dicha entrada de municipios.

### Podemos preguntarnos por la concentración promedio por ciudad; antes de respondernos esta inquietud verificamos si tenemos valores nulos para alguna variable.

#### Nombre del municipio

In [21]:
climate_ft.filter(climate.nombre_del_municipio== 'null').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

#### Concentración

In [22]:
climate_ft.filter(climate.concentraci_n== 'null').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

#### Nombre de la estación

In [23]:
climate_ft.filter(climate.nombre_de_la_estaci_n== 'null').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

#### Departamento

In [24]:
climate_ft.filter(climate.departamento== 'null').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

De esta manera observamos que no tenemos valores nulos para las variables filtradas, lo que nos permite continuar con nuestro análisis. Antes de esto guardaremos nuestro dataframe como un **csv** en la carpeta trusted de nuestro bucket.

In [25]:
climate_ft.coalesce(1).write.format('csv').option("header","true").save("s3://bkprojectone/data/trusted/climate_filtered")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

De esta manera los datos quedan guardados en nuestra carpeta trusted.

### Ahora bien, podemos preguntarnos por el número de casos que tenemos asociados a cada municipio presente en nuestro dataframe, para esto hacemos lo siguiente

In [26]:
distri_registros = climate_ft.groupBy("nombre_del_municipio").count().orderBy(desc('count'))
distri_registros.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+
|nombre_del_municipio|  count|
+--------------------+-------+
|        BOGOTÁ. D.C.|6849391|
|            MEDELLÍN|2604177|
|                CALI|1614369|
| CARTAGENA DE INDIAS| 190485|
|        BARRANQUILLA| 168489|
+--------------------+-------+

Esta tabla la podemos guardar, ya que luego podría ser usada para mostrar un gráfico de la distribución de casos por municipio.

In [27]:
distri_registros.coalesce(1).write.format('csv').option("header","true").save("s3://bkprojectone/data/refined/climate_dist_mun")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

En este punto nos podemos responder la pregunta, plantead líneas atrás, acerca de la **concentración promedio por ciudad**, para esto ejecutamos las siguientes lineas

In [28]:
concen_mun = climate_ft.groupBy('nombre_del_municipio').agg({'concentraci_n':'mean'})
concen_mun.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+
|nombre_del_municipio|avg(concentraci_n)|
+--------------------+------------------+
|            MEDELLÍN| 151.8538019096056|
| CARTAGENA DE INDIAS|274.02173696134975|
|        BOGOTÁ. D.C.| 123.0516412479049|
|                CALI| 69.04594798798497|
|        BARRANQUILLA|177.57875409959706|
+--------------------+------------------+

Redondeando un poco la cifra obtenemos lo siguiente

In [29]:
concen_mun = concen_mun.withColumn("round_concentracion", round(col("avg(concentraci_n)"), 2)).select('nombre_del_municipio','round_concentracion')

concen_mun.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+
|nombre_del_municipio|round_concentracion|
+--------------------+-------------------+
|            MEDELLÍN|             151.85|
| CARTAGENA DE INDIAS|             274.02|
|        BOGOTÁ. D.C.|             123.05|
|                CALI|              69.05|
|        BARRANQUILLA|             177.58|
+--------------------+-------------------+

De nuevo guardamos los datos para futuras aplicaciones, tipo gráficos.

In [30]:
concen_mun.coalesce(1).write.format('csv').option("header","true").save("s3://bkprojectone/data/refined/concent_mun")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### De esta manera pudimos observar un proceso simple de manipulación de datos a través de jupyterhub con la ayuda del procesamiento de spark. De igual manera se hizo uso de la implementación de las diferentes zonas de trabajo dentro de nuestro datalake, este último un bucket de s3.