Hadoop/MapReduce -- WordCount en Java (Modo Pseudo)
===

**Juan David Velásquez Henao**  
jdvelasq@unal.edu.co   
Universidad Nacional de Colombia, Sede Medellín  
Facultad de Minas  
Medellín, Colombia

---

Haga click [aquí](https://github.com/jdvelasq/apache-hadoop-course/tree/master/) para acceder al repositorio online.

Haga click [aquí](http://nbviewer.jupyter.org/github/jdvelasq/apache-hadoop-course/tree/master/) para explorar el repositorio usando `nbviewer`.

# Definición del problema

Se desea contar la frecuencia de ocurrencia de palabras en un conjunto de documentos. Debido a los requerimientos de diseño (gran volúmen de datos y tiempos rápidos de respuesta) se desea implementar una arquitectura Big Data. **El código debe ser escrito en Java y correrse en el ámbiente distribuido.**

A continuación se generarán tres archivos de prueba para probar el sistema.

In [1]:
## Preparación del directorio de trabajo
!rm -rf input output
!mkdir input

In [2]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns 
in data. Especially valuable in areas rich with recorded information, analytics relies 
on the simultaneous application of statistics, computer programming and operations research 
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business 
performance. Specifically, areas within analytics include predictive analytics, prescriptive 
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big 
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, 
marketing optimization and marketing mix modeling, web analytics, call analytics, speech 
analytics, sales force sizing and optimization, price and promotion modeling, predictive 
science, credit risk analysis, and fraud analytics. Since analytics can require extensive 
computation (see big data), the algorithms and software used for analytics harness the most 
current methods in computer science, statistics, and mathematics.

Writing input/text0.txt


In [3]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to 
research potential trends, to analyze the effects of certain decisions or events, or to 
evaluate the performance of a given tool or scenario. The goal of analytics is to improve 
the business by gaining knowledge which can be used to make improvements or changes.

Writing input/text1.txt


In [4]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions 
about the information they contain, increasingly with the aid of specialized systems 
and software. Data analytics technologies and techniques are widely used in commercial 
industries to enable organizations to make more-informed business decisions and by 
scientists and researchers to verify or disprove scientific models, theories and 
hypotheses.

Writing input/text2.txt


# Solución

En este tutorial se utiliza la misma implementación del tutorial anterior.

### Paso 1.

**Este paso es idéntico en el tutorial anterior.** Se implementa el algoritmo de conteo de palabras y se guarda en el archivo `WordCount.java`.

In [5]:
%%writefile WordCount.java

import java.io.IOException;

/*
 * Esta clase permite separar una frase (texto)
 * en las palabras que lo conforman. La lista
 * resultante puede ser iterada en un ciclo for
 */
import java.util.StringTokenizer;

/*
 *
 * Librerias requeridas para ejecutar Hadoop
 *
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Esta clase implementa el mapper y el reducer
 */
public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
       
    private final static IntWritable one = new IntWritable(1);

    /* 
     * en esta variable se guarda cada palabra leida        
     * del flujo de entrada
     */     
    private Text word = new Text();

    /* 
     * Este es el mapper. Para cada palabra 
     * leída, emite el par <word, 1>
     */
    public void map(Object key,       // Clave
                    Text value,       // La linea de texto
                    Context context   // Aplicación que se esta ejecutando
                    ) throws IOException, InterruptedException {
                              
      // Convierte la línea de texto en una lista de strings
      StringTokenizer itr = new StringTokenizer(value.toString());
                              
      // Ejecuta el ciclo para cada palabra 
      // de la lista de strings
      while (itr.hasMoreTokens()) {
        // obtiene la palabra
        word.set(itr.nextToken());

        // escribe la pareja <word, 1> 
        // al flujo de salida
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
           
    // Clase para imprimir un entero al flujo de salida       
    private IntWritable result = new IntWritable();

    // Esta función es llamada para reducir 
    // una lista de valores que tienen la misma clave
    public void reduce(Text key,                      // clave
                       Iterable<IntWritable> values,  // lista de valores
                       Context context                // Aplicación que se esta ejecutando
                       ) throws IOException, InterruptedException {
        
      // itera sobre la lista de valores, sumandolos
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
        
      // escribe la pareja <word, valor> al flujo
      // de salida
      context.write(key, result);
    }
  }

    
  /*
   * Se crea la aplicación en Hadoop y se ejecuta
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    
    /*
     * El job corresponde a la aplicacion
     */
    Job job = Job.getInstance(conf, "word count");
      
    /*
     * La clase que contiene el mapper y el reducer
     */
    job.setJarByClass(WordCount.class);
      
    /* 
     * Clase que implementa el mapper  
     */ 
    job.setMapperClass(TokenizerMapper.class);
      
    /*
     * El combiner es un reducer que se coloca a la salida
     * del mapper para agilizar el computo
     */
    job.setCombinerClass(IntSumReducer.class);
    
    /*
     * Clase que implementa el reducer
     */
    job.setReducerClass(IntSumReducer.class);
      
    /*
     * Salida
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    /*
     * Formatos de entrada y salida
     */
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
   
    // resultado de la ejecución.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



Overwriting WordCount.java


### Paso 2

**Este paso es idéntico en el tutorial anterior.** Se realiza la compilación del programa. Para que el programa se ejecute correctamente, se debió definir la variable de entorno `$HADOOP_HOME`, la cual apunta al directorio donde se encuentra Hadoop.

In [6]:
## compila el programa
!$HADOOP_HOME/bin/hadoop com.sun.tools.javac.Main WordCount.java

### Paso 3

**Este paso es idéntico en el tutorial anterior.** Se genera el archivo de aplicación de Java, para luego ejecutarlo usando Hadoop.

In [7]:
## genera el archivo *.jar para ejecutarlo en hadoop
!jar cf wc.jar WordCount*.class

El archivo `wc.jar` debe aparecer en el directorio actua.

In [8]:
!ls *.jar

wc.jar


### Paso 4

Se verifica que la instalación este configurada para correr en modo pseudo-distribuido, permitiendo varios hilos de procesamiento. Este modo es similar a la ejecución en un cluster. En este caso, el usuario debe verificar el contenido de los archivos `etc/hadoop/core-site.xml`  y `etc/hadoop/hdfs-site.xml` que se encuentran en el directorio de instalación de Hadoop. Véase http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html

In [9]:
## Por facilidad, este directorio contiene una copia del archivo
## la cual se copiará al directorio de Hadoop para evitar errores
## en la ejecución del programa
!cp core-site.xml.pseudo  $HADOOP_HOME/etc/hadoop/core-site.xml
!cp hdfs-site.xml.pseudo  $HADOOP_HOME/etc/hadoop/hdfs-site.xml

In [10]:
!cat $HADOOP_HOME/etc/hadoop/core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

In [11]:
!cat $HADOOP_HOME/etc/hadoop/hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

 
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

### Paso 5

Para ejecutar Hadoop en modo pseudo-distribuido, debe formatear el sistema de archivos (si aún no lo ha hecho). Abra el Terminal y ejecute

    $HADOOP_HOME/bin/hdfs namenode -format
    
    
Seguidamente, debe iniciar el servicio. Para ello, en la consola de comandos (no se puede realizar directamente desde Jupyter) ejecute:

    $HADOOP_HOME/sbin/start-dfs.sh
    
Una vez se haya iniciado el servicio, verifique que la interface a NameNode funcione correctamente. Abra la dirección http://localhost:9870. La interfaz debe ser similar a la que aparece en la siguiente figura.

![namenode](images/mac-namenode.png)    
   

### Paso 6

Copie los archivos de entrada al HDFS para que puedan ser ejecutados por Hadoop.

La gestión de archivos entre el sistema local y el HDFS se realiza mediante comandos similares a los del sistema operativo Unix en Terminal. A continuación se resumen los principales comandos.

* `hadoop fs -help`:  Imprime la ayuda en pantalla para todos los comandos.


**Gestion de directorios y archivos.**


* `hadoop fs -ls <path>`


* `hadoop fs -mkdir <path>`


* `hadoop fs -rmdir <path>`


* `hadoop fs -cp <src> <dest>`


* `hadoop fs -mv <src> <dest>`


* `hadoop fs -rm <path>`


* `hadoop fs -cat <path>`


* `hadoop fs -head <path>`


* `hadoop fs -tail <path>`


* `hadoop fs -text <path>`. Imprime el arachivo en `<path>` y lo imprime en formato texto. Soporta archivos zip, TextRecordInputStream y Avro.


* `hadoop fs -stat <path>`: Imprime estadísticos de `<path>`.


**Transferencia de información entre el sistema local y el HDFS**.


* `hadoop fs -get <src> <localdest>` / `hadoop fs -copyToLocal <src> <localdest>`. Copia el contenido de `<src>` en el HDFS en `<localdest>` en el sistema local.


* `hadoop fs -put <localsrc> <dest>` / `hadoop fs -copyFromLocal <src> <localdest>`. Copia el contenido de `<localsrc>` en el sistema local a `<dest>` en el HDFS.


* `hadoop fs -count <path>`. Cuenta el número de directorios, archivos y bytes en `<path>`.


* `hadoop fs -appendToFile <localsrc> <dest>`: pega al final de `<dest>` el contenido de los archivos en `<localsrc>`.



In [15]:
## crea el directorio para los archivos de entrada
!$HADOOP_HOME/bin/hadoop fs -mkdir /input

In [17]:
!$HADOOP_HOME/bin/hadoop fs -ls  /

Found 1 items
drwxr-xr-x   - jdvelasq supergroup          0 2018-09-15 06:01 /input


In [19]:
## Copia el contenido de la carpeta input a la carpeta input en el HDFS
!$HADOOP_HOME/bin/hadoop fs -put  input/* /input

In [20]:
## verifica el contenido del directorio
!$HADOOP_HOME/bin/hadoop fs -ls /input/*

-rw-r--r--   1 jdvelasq supergroup       1092 2018-09-15 06:02 /input/text0.txt
-rw-r--r--   1 jdvelasq supergroup        351 2018-09-15 06:02 /input/text1.txt
-rw-r--r--   1 jdvelasq supergroup        439 2018-09-15 06:02 /input/text2.txt


### Paso 7

Ejecuta el proceso.

In [24]:
!$HADOOP_HOME/bin/hadoop jar wc.jar WordCount /input /output

### Paso 8

Se visualiza la salida del programa.

In [25]:
## Contenido del directorio output
!$HADOOP_HOME/bin/hadoop fs -ls /output

Found 2 items
-rw-r--r--   1 jdvelasq supergroup          0 2018-09-15 06:04 /output/_SUCCESS
-rw-r--r--   1 jdvelasq supergroup       1649 2018-09-15 06:04 /output/part-r-00000


In [26]:
## imprime el resultado en pantalla
!$HADOOP_HOME/bin/hadoop fs -cat /output/part-r-00000 | head

(DA)	1
(see	1
Analytics	2
Analytics,	1
Big	1
Data	3
Especially	1
Organizations	1
Since	1
Specifically,	1


---
**Ejercicio.--** Copie el resultado de la corrida al directorio actual.


**Ejercicio.--** Cómo podría mejorar el código anterior? realice una implementación.

---

In [27]:
## se limpia el directoroio de trabajo
!rm WordCount*.* *.jar
!rm -rf input output
!$HADOOP_HOME/bin/hadoop fs -rm /input/* /output/*
!$HADOOP_HOME/bin/hadoop fs -rmdir /input /output

Deleted /input/text0.txt
Deleted /input/text1.txt
Deleted /input/text2.txt
Deleted /output/_SUCCESS
Deleted /output/part-r-00000


---

Hadoop/MapReduce -- WordCount en Java (Modo Pseudo)
===

**Juan David Velásquez Henao**  
jdvelasq@unal.edu.co   
Universidad Nacional de Colombia, Sede Medellín  
Facultad de Minas  
Medellín, Colombia

---

Haga click [aquí](https://github.com/jdvelasq/apache-hadoop-course/tree/master/) para acceder al repositorio online.

Haga click [aquí](http://nbviewer.jupyter.org/github/jdvelasq/apache-hadoop-course/tree/master/) para explorar el repositorio usando `nbviewer`.