## Código desenvolvido em Java para criação da aplicação que conta o número de hashtags e retorna a n quantidade de hashtags com maior frequência, onde n é um parâmetro especificado pelo usuário da aplicação, em uma base de dados do twitter. A aplicação também verifica se a pasta destino do Job já existe, se sim, a pasta é excluida. Projeto inicial chamado TopNHashTagParam contendo a estrutura-base das classes.

### 1-Desenvolvimento das 3 classes Mapper, Reducer e Driver para implementação da aplicação

In [None]:
//// Desenvolvimento da classe Driver TopNHashTagDriver.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class TopNHashTagDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration conf = new Configuration();
		//recebendo o parametro qtd do usuario
		conf.set("qtd", args[2]);
		
		//verica se a pasta destino já existe, se sim, a pasta será excluida
		conf.set("fs.defaultFS","hdfs://localhost:8020/");
		FileSystem dfs = FileSystem.get(conf);
		String nomeDir = args[1];
		Path caminhoDir = new Path(dfs.getWorkingDirectory()+"/"+nomeDir);
		
		if(dfs.exists(caminhoDir)){
			System.out.println("Pasta destino já existe e será excluida");
			dfs.delete(caminhoDir);
		}

		Job job = Job.getInstance(conf);
		job.setJarByClass(TopNHashTagDriver.class);
		job.setMapperClass(TopNHashTagMapper.class);
		job.setReducerClass(TopNHashTagReducer.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(1);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		

		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

In [None]:
// Desenvolvimento da classe Mapper TopNHashTagMapper.java

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNHashTagMapper extends Mapper<Object, Text, NullWritable, Text>{
	private TreeMap<Integer, Text> topN = new TreeMap<>();


	public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
		//capturando o parametro qtd informado pelo usuario
		Configuration conf = context.getConfiguration();
		int qtd = Integer.parseInt(conf.get("qtd"));

		String[] listaHashtags = value.toString().toLowerCase().split("\t") ;

		//Entada: hashtag - qtd de hashtags
		if (listaHashtags.length < 2) {
			return;
		}

		topN.put(Integer.parseInt(listaHashtags[1].toLowerCase()), new
				Text(value));
		if (topN.size() > qtd) {
			topN.remove(topN.firstKey());
		}
	}

	protected void cleanup(Context context) throws IOException, InterruptedException {
		for (Text t : topN.values()) {
			context.write(NullWritable.get(), t);
		}
	}

}

In [None]:
// Desesenvolvimento da classe Reducer TopNHashTagReducer.java

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNHashTagReducer extends Reducer<NullWritable, Text, NullWritable,Text> {
	private TreeMap<Integer, Text> topN = new TreeMap<>();
	
	public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		for (Text value : values) {
			 String[] hashtags = value.toString().toLowerCase().split("\t") ;

			 //capturando o parametro qtd informado pelo usuario
			 Configuration conf = context.getConfiguration();
			 int qtd = Integer.parseInt(conf.get("qtd"));
			 topN.put(Integer.parseInt(hashtags[1].toLowerCase()),
			 new Text(value));
			 if (topN.size() > qtd) {
			 topN.remove(topN.firstKey());
		}
	}
		for (Text word : topN.descendingMap().values()) {
			context.write(NullWritable.get(), word);
		}
	}
}

### 2-Geração do arquivo JAR da aplicação e definição do path:

Exemplo: /home/cloudera/Documents/topnhashtagparam.jar

### 3-Execução do job JAR no ambiente Hadoop, consumo do arquivo bases/base_tw.txt já armazenado no HDFS e passado o parametro 5 para retorno das 5 hashtags mais citadas:

hadoop jar ~/Documents/TopNHashTagParam.jar TopNHashTagDriver bases/base_tw.txt twitter/saida 5

### 4-Visualização do resultado da aplicação

hdfs dfs -text twitter/saida/part-r-00000