# Ejemplos con Hive

## Descripción de las variables

El dataset, obtenido de <a target = "_blank" href="https://www.transtats.bts.gov/Fields.asp?Table_ID=236">este link</a> está compuesto por las siguientes variables referidas siempre al año 2018:

1. **Month** 1-4
2. **DayofMonth** 1-31
3. **DayOfWeek** 1 (Monday) - 7 (Sunday)
4. **FlightDate** fecha del vuelo
5. **Origin** código IATA del aeropuerto de origen
6. **OriginCity** ciudad donde está el aeropuerto de origen
7. **Dest** código IATA del aeropuerto de destino
8. **DestCity** ciudad donde está el aeropuerto de destino  
9. **DepTime** hora real de salida (local, hhmm)
10. **DepDelay** retraso a la salida, en minutos
11. **ArrTime** hora real de llegada (local, hhmm)
12. **ArrDelay** retraso a la llegada, en minutos: se considera que un vuelo ha llegado "on time" si aterrizó menos de 15 minutos más tarde de la hora prevista en el Computerized Reservations Systems (CRS).
13. **Cancelled** si el vuelo fue cancelado (1 = sí, 0 = no)
14. **CancellationCode** razón de cancelación (A = aparato, B = tiempo atmosférico, C = NAS, D = seguridad)
15. **Diverted** si el vuelo ha sido desviado (1 = sí, 0 = no)
16. **ActualElapsedTime** tiempo real invertido en el vuelo
17. **AirTime** en minutos
18. **Distance** en millas
19. **CarrierDelay** en minutos: El retraso del transportista está bajo el control del transportista aéreo. Ejemplos de sucesos que pueden determinar el retraso del transportista son: limpieza de la aeronave, daño de la aeronave, espera de la llegada de los pasajeros o la tripulación de conexión, equipaje, impacto de un pájaro, carga de equipaje, servicio de comidas, computadora, equipo del transportista, problemas legales de la tripulación (descanso del piloto o acompañante) , daños por mercancías peligrosas, inspección de ingeniería, abastecimiento de combustible, pasajeros discapacitados, tripulación retrasada, servicio de inodoros, mantenimiento, ventas excesivas, servicio de agua potable, denegación de viaje a pasajeros en mal estado, proceso de embarque muy lento, equipaje de mano no válido, retrasos de peso y equilibrio.
20. **WeatherDelay** en minutos: causado por condiciones atmosféricas extremas o peligrosas, previstas o que se han manifestado antes del despegue, durante el viaje, o a la llegada.
21. **NASDelay** en minutos: retraso causado por el National Airspace System (NAS) por motivos como condiciones meteorológicas (perjudiciales pero no extremas), operaciones del aeropuerto, mucho tráfico aéreo, problemas con los controladores aéreos, etc.
22. **SecurityDelay** en minutos: causado por la evacuación de una terminal, re-embarque de un avión debido a brechas en la seguridad, fallos en dispositivos del control de seguridad, colas demasiado largas en el control de seguridad, etc.
23. **LateAircraftDelay** en minutos: debido al propio retraso del avión al llegar, problemas para conseguir aterrizar en un aeropuerto a una hora más tardía de la que estaba prevista.

Leemos el fichero CSV utilizando el delimitador por defecto de Spark (","). La primera línea contiene encabezados (nombres de columnas) por lo que no es parte de los datos y debemos indicarlo con la opción header.

## 1. Leemos los datos desde el CSV que hay en Google Cloud Storage

In [1]:
# Esto no hace nada: la lectura es lazy así que no se lee en realidad hasta que ejecutemos una acción sobre flightsDF
# Solamente se comprueba que exista el fichero en esa ruta, y se leen los nombres de columnas
flightsDF = spark.read.option("header", "true")\
                 .csv("gs://ucm__bucket/data/flights-jan-apr-2018.csv")

## 2. Vamos a mostrar el contenido del metastore de Hive, que ahora mismo no tiene nada

In [2]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



## 3. Creamos una vista temporal de un DF que tiene solo dos columnas

Esto solamente añade metadatos al metastore de Hive, que además se borrarán cuando cerremos el notebook. No guarda datos físicos de la tabla en ningún lado, puesto que el DF está en memoria. O mejor dicho, el DF del que proviene esta vista temporal "no está en ningún lado", porque no hemos cacheado weatherDistanceDF así que cualquier consulta SQL que hagamos sobre la tabla `weatherDistanceTable` provoca que se tenga que re-calcular el DF `weatherDistanceDF` sobre el cual está creada dicha tabla temporal.

In [3]:
weatherDistanceDF = flightsDF.select("WeatherDelay", "Distance")
weatherDistanceDF.createOrReplaceTempView("weatherDistanceTable")

Ahora vemos que la tabla se ha creado como tabla temporal

In [4]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|        |weatherdistancetable|       true|
+--------+--------------------+-----------+



Podemos hacer consultas sobre ella

In [5]:
flightsLejosDF = spark.sql("select * from weatherDistanceTable where Distance > 200 limit 5")
flightsLejosDF.show()

+------------+--------+
|WeatherDelay|Distance|
+------------+--------+
|        null|  374.00|
|        null|  207.00|
|        null|  395.00|
|        null|  395.00|
|        null|  395.00|
+------------+--------+



In [6]:
spark.sql("CACHE TABLE weatherDistanceTable")

DataFrame[]

## 4. Creamos una tabla persistente manejada, guardando como tabla el resultado de una operación con el DF

In [7]:
flightsJFK = flightsDF.where("Origin = 'JFK'").cache()
flightsJFK.write.saveAsTable("flightsjfk") # es una acción: se guardan físicamente los datos en algún sitio de HDFS

Si volvemos a mostrar las tablas que existen, veremos la nueva. Vemos que **no** es temporal, pero no sabemos si es manejada o es externa.

In [8]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|          flightsjfk|      false|
|        |weatherdistancetable|       true|
+--------+--------------------+-----------+



In [9]:
spark.sql("describe formatted flightsjfk").show(50, truncate = False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|Month                       |string                                                        |null   |
|DayofMonth                  |string                                                        |null   |
|DayOfWeek                   |string                                                        |null   |
|FlightDate                  |string                                                        |null   |
|Origin                      |string                                                        |null   |
|OriginCity                  |string                                                        |null   |
|Dest                        |string                                              

### Como vemos, el campo Location indica la carpeta donde se están guardando las tablas *manejadas*, que es la carpeta /user/warehouse/<nombretabla> de HDFS

In [10]:
spark.sql("describe formatted flightsjfk").where("col_name = 'Type'").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    Type|  MANAGED|       |
+--------+---------+-------+



## 5. Guardamos flightsDF como fichero parquet en HDFS (nada de tablas aún)

Lo vamos a guardar particionado por la columna Origin. Como este DF sólo tiene dos aeropuertos distintos porque hemos retenido solamente los vuelos que salen de SFO o de LAX, Spark creará dos subcarpetas. Dentro de cada subcarpeta habrá tantos ficheros como particiones tiene el DF, que actualmente son 3. Se puede comprobar con `flightsSFO.rdd.getNumPartitions()`

In [11]:
flightsSFO = flightsDF.where("Origin = 'SFO' or Origin = 'LAX'")\
                      .select("FlightDate", "Origin", "Dest", "Distance")
flightsSFO.write.mode("overwrite").partitionBy("Origin").parquet("/flightsSFO.parquet")

In [12]:
!hdfs dfs -ls /flightsSFO.parquet

Found 3 items
drwxr-xr-x   - root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/Origin=LAX
drwxr-xr-x   - root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/Origin=SFO
-rw-r--r--   2 root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/_SUCCESS


In [13]:
!hdfs dfs -ls /flightsSFO.parquet/Origin=LAX

Found 3 items
-rw-r--r--   2 root hadoop      25790 2022-03-08 21:50 /flightsSFO.parquet/Origin=LAX/part-00000-392c83ec-1db3-4192-9c4c-9b008aa7cbc7.c000.snappy.parquet
-rw-r--r--   2 root hadoop      27613 2022-03-08 21:51 /flightsSFO.parquet/Origin=LAX/part-00001-392c83ec-1db3-4192-9c4c-9b008aa7cbc7.c000.snappy.parquet
-rw-r--r--   2 root hadoop      24851 2022-03-08 21:51 /flightsSFO.parquet/Origin=LAX/part-00002-392c83ec-1db3-4192-9c4c-9b008aa7cbc7.c000.snappy.parquet


In [14]:
flightsSFO.rdd.getNumPartitions()

3

## 6. Ahora vamos a crear una tabla EXTERNA a partir del fichero existente /flightsSFO.parquet (que en realidad es una carpeta)

### Hay varias maneras de crear una tabla externa. Aquí vamos a crear una tabla externa a partir de un fichero (carpeta) que ya existe. Más adelante veremos otra manera.

Ojo: tenemos que especificar bien el esquema de la tabla que estamos creando a partir del fichero. El DF que hemos guardado en Parquet tenía 4 columnas, todas de tipo string

In [25]:
flightsSFO.printSchema()

root
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)



### 6.1. Primera manera: especificar `external table` e indicar el esquema de la nueva tabla

In [15]:
spark.sql("create external table flightsSFO(FlightDate string, Origin string, Dest string, Distance string)\
          stored as parquet location '/flightsSFO.parquet'")

DataFrame[]

### 6.2. Segunda manera (RECOMENDADA): no indicar el esquema pero indicar `using parquet location <ruta>` y la tabla automáticamente se creará con el esquema de ese fichero Parquet

In [16]:
spark.sql("create table flightssfo2 using parquet location '/flightsSFO.parquet'")

DataFrame[]

### Comprobamos que ahora tenemos dos tablas más, persistentes y con idéntico esquema, llamadas flightSFO y flightssfo2

In [17]:
spark.sql("show tables").show(truncate = False)

+--------+--------------------+-----------+
|database|tableName           |isTemporary|
+--------+--------------------+-----------+
|default |flightsjfk          |false      |
|default |flightssfo          |false      |
|default |flightssfo2         |false      |
|        |weatherdistancetable|true       |
+--------+--------------------+-----------+



### ¿La tabla flightsSFO es externa, o es manejada?

In [19]:
spark.sql("describe formatted flightssfo").show(50, truncate = False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|FlightDate                  |string                                                        |null   |
|Origin                      |string                                                        |null   |
|Dest                        |string                                                        |null   |
|Distance                    |string                                                        |null   |
|                            |                                                              |       |
|# Detailed Table Information|                                                              |       |
|Database                    |default                                             

Interesante: en el primer caso la tabla se creó con provider Hive como vemos más arriba (fue la que habíamos indicado el esquema) y en el segundo, se creó con provider Parquet y entonces nos da info de particionado - a pesar de que ambas tablas están particionadas porque tienen el mismo origen de datos: la carpeta en la que figuran las dos subcarpetas

In [20]:
spark.sql("describe formatted flightssfo2").show(50, truncate = False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|FlightDate                  |string                                                        |null   |
|Dest                        |string                                                        |null   |
|Distance                    |string                                                        |null   |
|Origin                      |string                                                        |null   |
|# Partition Information     |                                                              |       |
|# col_name                  |data_type                                                     |comment|
|Origin                      |string                                              

### Ambas tablas son externas y la ubicación física de los datos asociados a ella es la carpeta /flightsSFO.parquet tal como habíamos indicado al crear ambas

In [16]:
spark.sql("describe formatted flightssfo").where("col_name = 'Type'").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    Type| EXTERNAL|       |
+--------+---------+-------+



## 6.3 Tercera manera de crear una tabla externa: en una misma operación guardamos nuevo el DF en otra ubicación y creamos al mismo tiempo una tabla externa sobre dichos datos

### El detalle para que la tabla sea creada como EXTERNA es indicar `.option("path", "...")` antes de `saveAsTable`. Vamos a guardar el DF en /user/flights de HDFS

In [24]:
flightsJFK.select("FlightDate", "Origin", "Dest", "Distance")\
          .write.option("path", "/user/flights").saveAsTable("flightsjfk_externa")

In [25]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|          flightsjfk|      false|
| default|  flightsjfk_externa|      false|
| default|          flightssfo|      false|
| default|         flightssfo2|      false|
|        |weatherdistancetable|       true|
+--------+--------------------+-----------+



Comprobamos que efectivamente se ha creado como tabla externa:

In [27]:
spark.sql("describe formatted flightsjfk_externa").show(50, truncate = False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|FlightDate                  |string                                                        |null   |
|Origin                      |string                                                        |null   |
|Dest                        |string                                                        |null   |
|Distance                    |string                                                        |null   |
|                            |                                                              |       |
|# Detailed Table Information|                                                              |       |
|Database                    |default                                             

## Vamos a comprobar el efecto de guardar un DF en una carpeta ya existente, sobreescribiendo solamente las particiones de los valores que estén presentes en los datos que ahora estamos guardando

Como flightsJFK sólo tiene Origin el aeropuerto JFK, podríamos guardarlo en la misma ubicación que flightsSFO y pedirle `.mode("overwrite").partitionBy("Origin")` y esa operación NO borrará lo que ya había sino que solamente reemplazará las particiones que ya existieran. Como la única partición que ahora se va a crear es la de JFK y esa no existía, el resultado es que simplemente se añade una partición más (es decir una subcarpeta Origin=JFK) a la carpeta.

**IMPORTANTE**: el DataFrame que vamos a guardar particionado para sobreescribir ciertas particiones debe tener exactamente EL MISMO ESQUEMA que los datos de las particiones ya existentes.

**IMPORTANTE**: esta operación funciona habitualmente sin ninguna configuración adicional en cualquier entorno. Sin embargo en Dataproc es necesario fijar explícitamente el parámetro de Spark llamado `"spark.sql.sources.partitionOverwriteMethod"` al valor `"dynamic"`. 

El método habitual para dar valor a los parámetros es llamar a `spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")`. Podemos consultar qué valor tiene actualmente (es STATIC) poniendo
`spark.conf.get("spark.sql.sources.partitionOverwriteMode")`. 

Aunque en este ejemplo en realidad hemos "añadido" una nueva partición porque los datos que vamos a guardar la segunda vez no contienen ningún origen ya existente (solamente un Origin *nuevo* que es JFK), ocurriría exactamente lo mismo si los datos sí tuviesen algún aeropuerto de particiones ya existentes: se reemplazaría completa la partición de ese aeropuerto ya existente. Si además hubiese aeropuertos que todavía no existían y no tenían subcarpeta, se crearían nuevas particiones (nuevas subcarpetas) para dichos aeropuertos, sin tocar las subcarpetas existentes.

In [31]:
# IMPRESCINDIBLE para que sólo se reemplace en la carpeta las particiones que estemos escribiendo (recordar poner partitionBy porque en caso contrario se reemplaza la carpeta completa!)
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# IMPRESCINDIBLE para que el DataFrame que vamos a guardar tenga el mismo esquema que los datos ya existentes
flightsJFK.select("FlightDate", "Origin", "Dest", "Distance")\
          .write\
          .mode("overwrite")\
          .partitionBy("Origin")\
          .parquet("/flightsSFO.parquet")

### Comprobamos que ha añadido una subcarpeta `Origin=JFK` y no ha borrado nada de lo que había en esa carpeta

In [32]:
!hdfs dfs -ls /flightsSFO.parquet

Found 4 items
drwxr-xr-x   - root hadoop          0 2022-03-08 22:27 /flightsSFO.parquet/Origin=JFK
drwxr-xr-x   - root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/Origin=LAX
drwxr-xr-x   - root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/Origin=SFO
-rw-r--r--   2 root hadoop          0 2022-03-08 22:27 /flightsSFO.parquet/_SUCCESS


## 7. Borramos tabla externa y comprobamos que el fichero Parquet sigue ahí

In [38]:
spark.sql("drop table flightssfo")
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|          flightsjfk|      false|
| default|  flightsjfk_externa|      false|
| default|         flightssfo2|      false|
|        |weatherdistancetable|       true|
+--------+--------------------+-----------+



### Comprobemos que sigue existiendo la carpeta /flightsSFO.parquet

In [39]:
!hdfs dfs -ls /

Found 5 items
drwxr-xr-x   - root   hadoop          0 2022-03-08 22:27 /flightsSFO.parquet
drwxrwxrwt   - mapred hadoop          0 2022-03-08 21:45 /hadoop
drwxrwxrwt   - mapred hadoop          0 2022-03-08 21:45 /tmp
drwxrwxrwt   - hdfs   hadoop          0 2022-03-08 22:10 /user
drwx-wx-wx   - hive   hadoop          0 2022-03-08 21:45 /var


In [40]:
!hdfs dfs -ls /flightsSFO.parquet

Found 4 items
drwxr-xr-x   - root hadoop          0 2022-03-08 22:27 /flightsSFO.parquet/Origin=JFK
drwxr-xr-x   - root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/Origin=LAX
drwxr-xr-x   - root hadoop          0 2022-03-08 21:51 /flightsSFO.parquet/Origin=SFO
-rw-r--r--   2 root hadoop          0 2022-03-08 22:27 /flightsSFO.parquet/_SUCCESS


## 8. Comprobamos dónde están los datos de la tabla persistente manejada que habíamos guardado con saveAsTable

In [41]:
!hdfs dfs -ls /user/hive/warehouse

Found 1 items
drwxr-xr-x   - root hadoop          0 2022-03-08 21:50 /user/hive/warehouse/flightsjfk


In [42]:
!hdfs dfs -ls /user/hive/warehouse/flightsjfk

Found 4 items
-rw-r--r--   2 root hadoop          0 2022-03-08 21:50 /user/hive/warehouse/flightsjfk/_SUCCESS
-rw-r--r--   2 root hadoop     150918 2022-03-08 21:50 /user/hive/warehouse/flightsjfk/part-00000-3a083775-c2fc-43bf-afd2-20cb7c111df5-c000.snappy.parquet
-rw-r--r--   2 root hadoop     192483 2022-03-08 21:50 /user/hive/warehouse/flightsjfk/part-00001-3a083775-c2fc-43bf-afd2-20cb7c111df5-c000.snappy.parquet
-rw-r--r--   2 root hadoop     158835 2022-03-08 21:50 /user/hive/warehouse/flightsjfk/part-00002-3a083775-c2fc-43bf-afd2-20cb7c111df5-c000.snappy.parquet


## 9. Borramos la tabla manejada y comprobamos que, al ser manejada, Spark ha borrado físicamente esos datos al borrar la tabla

In [43]:
spark.sql("drop table flightsjfk")
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|  flightsjfk_externa|      false|
| default|         flightssfo2|      false|
|        |weatherdistancetable|       true|
+--------+--------------------+-----------+



In [44]:
!hdfs dfs -ls /user/hive/warehouse/flightsjfk

ls: `/user/hive/warehouse/flightsjfk': No such file or directory


## 10. Si borramos la tabla flightsjfk_externa, que se creó como externa directamente con saveAsTable, el borrado de la tabla no borrará los datos de HDFS

In [45]:
spark.sql("drop table flightsjfk_externa")
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|         flightssfo2|      false|
|        |weatherdistancetable|       true|
+--------+--------------------+-----------+



In [46]:
!hdfs dfs -ls /user/flights

Found 4 items
-rw-r--r--   2 root hadoop          0 2022-03-08 22:10 /user/flights/_SUCCESS
-rw-r--r--   2 root hadoop      11516 2022-03-08 22:10 /user/flights/part-00000-6c1f167e-47f4-4bec-93d8-d14677885e49-c000.snappy.parquet
-rw-r--r--   2 root hadoop      13120 2022-03-08 22:10 /user/flights/part-00001-6c1f167e-47f4-4bec-93d8-d14677885e49-c000.snappy.parquet
-rw-r--r--   2 root hadoop      16225 2022-03-08 22:10 /user/flights/part-00002-6c1f167e-47f4-4bec-93d8-d14677885e49-c000.snappy.parquet
