# Big Data Aplicado - Práctica 2 - Hadoop+Flume+PIG

Se cargan datos con Flume a partir de la técnica de spooldir, para su salida transformarla y cargarla en fichero de HDFS con PIG.

## Instalación de herramientas

### Instalación de Hadoop

Descargamos Hadoop, descomprimimos y movemos a la carpeta de programas.

In [1]:
%%bash
wget -q https://downloads.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
tar -xzf hadoop-3.3.1.tar.gz
mv hadoop-3.3.1/ /usr/local/

Creamos las variables de entorno necesarias.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64/"
os.environ["PATH"] = os.environ["PATH"] + ":" + "/usr/local/hadoop-3.3.1/bin"

### Instalación de Flume

Descargamos Flume, descomprimimos y movemos a la carpeta de programas.

In [3]:
%%bash
wget -q https://dlcdn.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar xzf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin/ /usr/local/

Creamos las variables de entorno necesarias.

In [4]:
os.environ["FLUME_HOME"] = "/usr/local/apache-flume-1.9.0-bin"
os.environ["PATH"] = os.environ["PATH"] + ":" + "/usr/local/apache-flume-1.9.0-bin/bin"
os.environ["JAVA_OPTS"]= "-Xms400m -Xmx3000m -Dcom.sun.management.jmxremote"

### Instalación de PIG

Descargamos PIG, descomprimimos y movemos a la carpeta de programas.

In [5]:
%%bash
wget -q https://downloads.apache.org/pig/pig-0.17.0/pig-0.17.0.tar.gz
tar -xzf pig-0.17.0.tar.gz
mv pig-0.17.0/ /usr/local/

Creamos las variables de entorno necesarias.

In [6]:
os.environ["PIG_HOME"] = "/usr/local/pig-0.17.0"
os.environ["PATH"] = os.environ["PATH"] + ":" + "/usr/local/pig-0.17.0/bin"
os.environ["PIG_CLASSPATH"] = "/usr/local/hadoop-3.3.1/conf"

### Comprobación de instalación

Ejecutamos los comandos para ver las distintas versiones instaladas de las herramientas.

In [7]:
!hadoop version

Hadoop 3.3.1
Source code repository https://github.com/apache/hadoop.git -r a3b9c37a397ad4188041dd80621bdeefc46885f2
Compiled by ubuntu on 2021-06-15T05:13Z
Compiled with protoc 3.7.1
From source with checksum 88a4ddb2299aca054416d6b7f81ca55
This command was run using /usr/local/hadoop-3.3.1/share/hadoop/common/hadoop-common-3.3.1.jar


In [8]:
!flume-ng version

Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9


In [21]:
!pig -version

Apache Pig version 0.17.0 (r1797386) 
compiled Jun 02 2017, 15:41:58


## Ejecución de Flume por Spooling Dir

### Preparación de datos

Descargamos un CSV que contiene 'Indicadores clave de enfermedades cardíacas 2020' (utilizado en la práctica de Sistemas de Big Data), renombrándolo a *datos.csv*.

Enlace a la fuente original: https://www.kaggle.com/kamilpytlak/personal-key-indicators-of-heart-disease

In [22]:
!gdown -O datos.csv "1_nupX9-MOto1mTcM7RsHzM4PzOZXnnT6"

Downloading...
From: https://drive.google.com/uc?id=1_nupX9-MOto1mTcM7RsHzM4PzOZXnnT6
To: /content/datos.csv
100% 25.2M/25.2M [00:00<00:00, 63.2MB/s]


Debido a que la descarga de datos, a veces no es capaz de realizarla, por seguridad se incluyen los siguientes borrados de directorios que tienen que estar vacíos o no tienen que existir para un correcto funcionamiento de una posible re-ejecución.

In [23]:
%%bash
rm -rf entrada_datos/
rm -rf salida/
rm -rf results/

Creamos el directorio de entrada que monitorizará el agente de Flume para el *'spooling dir'*.

In [24]:
!mkdir -p entrada_datos


Como el fichero tiene más de 300 mil registros, vamos quedarnos solo con los 10 mil primeros, eliminando la primera línea de cabecera y generándose en la carpeta creada anteriormente.

In [25]:
!head -10001 datos.csv > /content/entrada_datos/datos.csv 
!sed -i 1d /content/entrada_datos/datos.csv 

Verificamos que el fichero se ha creado, listando el contenido del directorio.

In [26]:
!hdfs dfs -ls entrada_datos/

Found 1 items
-rw-r--r--   1 root root     797964 2022-05-05 11:47 entrada_datos/datos.csv


### Agente Flume

Creamos el fichero de configuración del agente de Flume, donde indicamos como entrada que es de tipo *spooldir* y el directorio que tiene que monitorizar. Como canal indicamos que será de tipo fichero. Y como sumidero ponemos que el tipo será HDFS, el directorio donde se guardará (lo crea Flume directamente si no existe), la escritura del fichero será en modo texto, y añadimos varias propiedades *roll* para que genere ficheros más grandes de lo definido por defecto.<br> 
Si no se definen estas últimas propiedades, se generará un fichero por cada 10-20 líneas de entrada, por lo que, a efectos de visualización, se complica mostrarlo todo.

In [27]:
%%writefile monitor_flume.conf
# Configuración del Agente para monitorización de un directorio

#Definición del agente: nombre y componentes
agente.sources  = s1  
agente.sinks    = k1  
agente.channels = c1  
   
#Configuración de propiedades Source
agente.sources.s1.type              = spooldir
agente.sources.s1.spoolDir          = /content/entrada_datos

# Configuración de propiedades del canal
agente.channels.c1.type = file

# Configuración de propiedades del sink
agente.sinks.k1.type = hdfs
agente.sinks.k1.hdfs.path = /content/salida
agente.sinks.k1.hdfs.fileType = DataStream
agente.sinks.k1.hdfs.writeFormat = Text
agente.sinks.k1.hdfs.rollInterval = 0
agente.sinks.k1.hdfs.rollSize = 81920
agente.sinks.k1.hdfs.rollCount = 0

#Vinculación de source y sink al canal creado
agente.sources.s1.channels = c1
agente.sinks.k1.channel    = c1

Overwriting monitor_flume.conf


Ejecución del agente Flume.

MATERIAL EXTRA PROPIO<br>
Como Flume se queda ejecutando "ad eternum" y hay que manualmente 'Interrumpir Ejecución' ya sea parándolo en la misma celda o por el menú, se ha creado un script para no tener que preocuparse por este motivo.<br>
Primero ejecutamos nuestro agente, pero enviando su ejecución a background, liberando así la sesión, y guardando su salida por pantalla en un fichero (*agente.out*).<br>
Hacemos que espere durante 60 segundos con el comando *sleep*, para después proceder a matar la ejecución del agente. Como este se ejecuta como una instancia de *java*, pues es lo que debemos matar.<br>
Por último, mostramos la salida del agente que habíamos guardado en fichero para comprobar que se han realizado todos los pasos correctamente.<br>
Durante las pruebas, entre 10 y 30 segundos se realiza el procesamiento de los datos, pero se ha puesto a 60 segundos solo por asegurar, por si la máquina estuviese algo más lenta.

In [28]:
%%bash
nohup flume-ng agent --conf ./ -f ./monitor_flume.conf -n agente -Dflume.root.logger=INFO -Xmx3000m > agente.out &
sleep 60
pkill java
cat agente.out

Info: Including Hadoop libraries found via (/usr/local/hadoop-3.3.1/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/lib/jvm/java-11-openjdk-amd64//bin/java -Xmx20m -Dflume.root.logger=INFO -Xmx3000m -cp '/content:/usr/local/apache-flume-1.9.0-bin/lib/*:/usr/local/hadoop-3.3.1/etc/hadoop:/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/common/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*:/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn:/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn/*:/lib/*' -Djava.library.path=:/usr/local/hadoop-3.3.1/lib/native org.apache.flume.node.Application -f ./monitor_flume.conf -n agente
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/apache-flume-1.9.0

Comprobamos que se han creado los ficheros en el directorio `salida`, listando en directorio y viendo el contenido de las primeras líneas de los ficheros.

In [29]:
!hdfs dfs -ls /content/salida | head

Found 10 items
-rw-r--r--   1 root root      83044 2022-05-05 11:47 /content/salida/FlumeData.1651751253759
-rw-r--r--   1 root root      83052 2022-05-05 11:47 /content/salida/FlumeData.1651751253760
-rw-r--r--   1 root root      83052 2022-05-05 11:47 /content/salida/FlumeData.1651751253761
-rw-r--r--   1 root root      82989 2022-05-05 11:47 /content/salida/FlumeData.1651751253762
-rw-r--r--   1 root root      82970 2022-05-05 11:47 /content/salida/FlumeData.1651751253763
-rw-r--r--   1 root root      82970 2022-05-05 11:47 /content/salida/FlumeData.1651751253764
-rw-r--r--   1 root root      82960 2022-05-05 11:47 /content/salida/FlumeData.1651751253765
-rw-r--r--   1 root root      82979 2022-05-05 11:47 /content/salida/FlumeData.1651751253766
-rw-r--r--   1 root root      83011 2022-05-05 11:47 /content/salida/FlumeData.1651751253767


In [30]:
!hdfs dfs -cat /content/salida/FlumeData.* | head

No,16.6,Yes,No,No,3.0,30.0,No,Female,55-59,White,Yes,Yes,Very good,5.0,Yes,No,Yes
No,20.34,No,No,Yes,0.0,0.0,No,Female,80 or older,White,No,Yes,Very good,7.0,No,No,No
No,26.58,Yes,No,No,20.0,30.0,No,Male,65-69,White,Yes,Yes,Fair,8.0,Yes,No,No
No,24.21,No,No,No,0.0,0.0,No,Female,75-79,White,No,No,Good,6.0,No,No,Yes
No,23.71,No,No,No,28.0,0.0,Yes,Female,40-44,White,No,Yes,Very good,8.0,No,No,No
Yes,28.87,Yes,No,No,6.0,0.0,Yes,Female,75-79,Black,No,No,Fair,12.0,No,No,No
No,21.63,No,No,No,15.0,0.0,No,Female,70-74,White,No,Yes,Fair,4.0,Yes,No,Yes
No,31.64,Yes,No,No,5.0,0.0,Yes,Female,80 or older,White,Yes,No,Good,9.0,Yes,No,No
No,26.45,No,No,No,0.0,0.0,No,Female,80 or older,White,"No, borderline diabetes",No,Fair,5.0,No,Yes,No
No,40.69,No,No,No,0.0,0.0,Yes,Male,65-69,White,No,Yes,Good,10.0,No,No,No
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to outpu

## Ejecución de PIG

### Consulta - Media y conteo del Índice de Masa Corporal (IMC) agrupado por Sexo y Edad

Generamos el fichero de comandos de PIG.

Primero obtenemos los datos de los ficheros del directorio *salida* que ha generado Flume en los pasos anteriores.<br>
Para esta carga en PIG, le indicamos los nombres de las columnas y el tipo de cada uno de los datos, que serán del tipo *chararray* o *float*.<br>
Continuamos aplicando un filtro *HeartDisease == 'Yes'* donde solo nos quedamos con aquellas personas que tienen detectada una anomalía coronaria.<br>
Los datos los vamos a agrupar por Sexo y Edad *(Sex, AgeCategory)*.<br>
Transformamos el contenido, recorriendo los datos aplicando el filtro y la agrupación y generando como salida solo 4 campos: el Sexo, campo *Sex*, la Edad, campo *AgeCategory*, y la media y el conteo del IMC, campo *BMI* (Body Mass Index - Índice de Masa Corporal en inglés).<br>
Por último, una vez transformado el contenido, se carga en fichero separado por comas en el directorio *result*.

In [31]:
%%writefile coronarias.pig
-- Cargar datos desde el csv
datosEntrada = LOAD '/content/salida/FlumeData.*' USING PigStorage(',') AS 
(HeartDisease:chararray, BMI:float, Smoking:chararray, AlcoholDrinking:chararray, Stroke:chararray,
 PhysicalHealth:float, MentalHealth:float, DiffWalking:chararray, Sex:chararray, AgeCategory:chararray, Race:chararray,
 Diabetic:chararray, PhysicalActivity:chararray, GenHealth:chararray, SleepTime:float,
 Asthma:chararray, KidneyDisease:chararray, SkinCancer:chararray);

-- filtro de datos
conEnfermedad = FILTER datosEntrada BY (HeartDisease == 'Yes');

-- agrupación por varias columnas
agrupadosSexoEdad = group conEnfermedad BY (Sex, AgeCategory);

-- recorrido del contenido
mediaBMI = FOREACH agrupadosSexoEdad GENERATE FLATTEN(group) AS (Sexo, Edad), AVG(conEnfermedad.BMI), COUNT(conEnfermedad.BMI);

-- almacenar (directorio no existente)
STORE mediaBMI INTO 'results/' USING PigStorage (',');

Overwriting coronarias.pig


Ejecutamos el fichero PIG creado.<br>
Durante la ejecución del fichero podemos y ir viendo como se van procesando los *map-reduce* correspondientes.

In [32]:
!pig coronarias.pig

2022-05-05 11:48:37,157 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
2022-05-05 11:48:37,159 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
2022-05-05 11:48:37,159 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2022-05-05 11:48:37,275 [main] INFO  org.apache.pig.Main - Apache Pig version 0.17.0 (r1797386) compiled Jun 02 2017, 15:41:58
2022-05-05 11:48:37,275 [main] INFO  org.apache.pig.Main - Logging error messages to: /content/pig_1651751317261.log
2022-05-05 11:48:37,738 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2022-05-05 11:48:37,828 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2022-05-05 11:48:37,828 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2022-05-05 11:48:37,869 [main] INFO  org.apache.pig.PigServer - Pig Script ID for the ses

Verificamos listando el contenido del fichero.

In [33]:
!hdfs dfs -cat /content/results/part*

Male,18-24,30.540000915527344,1
Male,25-29,27.963333129882812,3
Male,30-34,36.62833213806152,6
Male,35-39,32.24428612845285,7
Male,40-44,28.741666793823242,6
Male,45-49,30.614545475352894,11
Male,50-54,32.25333302815755,30
Male,55-59,30.192187130451202,32
Male,60-64,30.09371449606759,70
Male,65-69,29.114456508470617,92
Male,70-74,29.848672562995844,113
Male,75-79,28.38057465388857,87
Male,80 or older,26.992307607944195,104
Female,18-24,47.01499938964844,2
Female,30-34,29.130000114440918,2
Female,35-39,41.36000061035156,2
Female,40-44,31.240000009536743,8
Female,45-49,37.636250257492065,8
Female,50-54,33.87199981689453,25
Female,55-59,28.912926929753002,41
Female,60-64,29.297777864668106,36
Female,65-69,30.60471431187221,70
Female,70-74,29.403466567993163,75
Female,75-79,29.380645013624623,62
Female,80 or older,25.646559120506367,93


## Explicación de los datos resultantes

En la salida podemos ver como la primera columna es el sexo, la segunda la categoría de edad en la que se encuadra, la tercera es la media del IMC y la última el numero de personas que integran ese registro.<br>
Como explicación general de esta salida podemos indicar que todas las personas de la muestra con problemas coronarios tienen mínimo sobrepeso (IMC > 25). Si hiciesemos la media de forma global nos acercaríamos a 30, que es donde empieza la obesidad.<br>
Por tanto, se puede determinar que un IMC elevado puede provocar problemas cardíacos. 