In [None]:
%env USERNAME=<username>

**Warning:** To interact with the EDI Big Data Stack, you must first authenticate with the `kinit` command. For more information, read the documentation at [Authenticating with Kerberos](https://docs.edincubator.eu/big-data-stack/basic-concepts.html#authenticating-with-kerberos).

In [None]:
kinit -kt ~/work/$USERNAME.service.keytab $USERNAME@EDINCUBATOR.EU

**Note:** Before executing this notebook, you must create the HBase table by following the instructions at [Loading data into HBase](https://edincubator.eu/big-data-stack/tools/hbase.html#loading-data-into-hbase).

# HBase
## Loading data into HBase

*HBaseLoadExample.java* contains the only class of this MapReduce job, which is also its main class. The *HBaseLoadExample* class contains only the nested *HBaseWriterMapper* class, as this job doesn’t need a reducer.

### HBaseWriterMapper

```java
public static class HBaseWriterMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put> {

      private long checkpoint = 100;
      private long count = 0;

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
          // Extract state using opencsv library
          CSVReader reader = new CSVReader(new StringReader(value.toString()));
          String[] line;

          while ((line = reader.readNext()) != null) {
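              // Skip the CSV header. line is a String[], so a single field must be
              // compared. Assumption: state is the sixth column (index 5), as the
              // column order used below suggests.
              if (!"state".equals(line[5])) {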
                  context.setStatus("Creating row");
                  byte [] row = Bytes.toBytes(line[0]);
                  Put put = new Put(row);

                  // Insert info
                  byte [] family = Bytes.toBytes("info");

                  // name
                  byte [] qualifier = Bytes.toBytes("name");
                  byte [] hvalue = Bytes.toBytes(line[1]);
                  put.addColumn(family, qualifier, hvalue);

                  // neighborhood
                  qualifier = Bytes.toBytes("neighborhood");
                  hvalue = Bytes.toBytes(line[2]);
                  put.addColumn(family, qualifier, hvalue);

                  // Same with address, city, state, postal_code, latitude,
                  // longitude, is_open and categories
                  [...]

                  // Insert stats
                  family = Bytes.toBytes("stats");

                  // stars
                  qualifier = Bytes.toBytes("stars");
                  hvalue = Bytes.toBytes(line[9]);
                  put.addColumn(family, qualifier, hvalue);

                  // review_count
                  qualifier = Bytes.toBytes("review_count");
                  hvalue = Bytes.toBytes(line[10]);
                  put.addColumn(family, qualifier, hvalue);

                  context.write(new ImmutableBytesWritable(row), put);

                  // Set status every checkpoint lines for avoiding AM timeout
                  if(++count % checkpoint == 0) {
                      context.setStatus("Emitting Put " + count);
                  }
              }
          }
      }
  }
```

The *HBaseWriterMapper* class is the mapper of our job. It extends the *Mapper* class, receiving as input a tuple formed by a key of type *Object* and a value of type *Text*, and generating as output a tuple formed by a key of type *ImmutableBytesWritable* and a value of type *Put*.

The *map* method processes the input and generates the output. Because this job has no reducer, that output is written directly to the HBase table. In this method, we take the value, which represents a single CSV line, and create an object of type *org.apache.hadoop.hbase.client.Put*. The *Put* class represents a "put" action on the HBase table. Each cell added to a *Put* must have a column family, a qualifier and a value.
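The way one CSV record is laid out as cells can be sketched without the HBase client. The plain-Java sketch below is only an illustration: `CellLayoutSketch`, `isHeader` and `toCells` are hypothetical names, the `family:qualifier` string keys stand in for the byte arrays passed to `put.addColumn`, the sample record is made up, and only the four columns that appear in the mapper listing (indices 1, 2, 9 and 10) are covered. The header check takes the column index as a parameter because the position of `state` is an assumption here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CellLayoutSketch {

    // A String[] never equals a String, so the header test must look at one field.
    static boolean isHeader(String[] line, int column, String headerName) {
        return line.length > column && headerName.equals(line[column]);
    }

    // Maps "family:qualifier" to its value, mirroring the put.addColumn calls
    // of the mapper (only the four columns shown there).
    static Map<String, String> toCells(String[] line) {
        Map<String, String> cells = new LinkedHashMap<>();
        cells.put("info:name", line[1]);
        cells.put("info:neighborhood", line[2]);
        cells.put("stats:stars", line[9]);
        cells.put("stats:review_count", line[10]);
        return cells;
    }

    public static void main(String[] args) {
        // Made-up record; field 0 plays the role of the row key.
        String[] line = {"id-1", "Some Cafe", "Downtown", "", "", "AZ", "", "", "", "4.5", "12"};
        if (!isHeader(line, 5, "state")) {
            System.out.println("row key = " + line[0]);
            toCells(line).forEach((column, value) -> System.out.println(column + " = " + value));
        }
    }
}
```

In the real mapper each of these entries becomes one `put.addColumn(family, qualifier, value)` call, and the row key is the argument of the `Put` constructor.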

### main & run

Finally, look at the *main* and *run* methods of the *HBaseLoadExample* class.

```java
public int run(String[] otherArgs) throws Exception {
      Configuration conf = getConf();

      Job job = Job.getInstance(conf, "HBase load example");
      job.setJarByClass(HBaseLoadExample.class);

      FileInputFormat.setInputPaths(job, otherArgs[0]);
      job.setInputFormatClass(TextInputFormat.class);
      job.setMapperClass(HBaseWriterMapper.class);

      TableMapReduceUtil.initTableReducerJob(
              otherArgs[1],
              null,
              job
      );
      job.setNumReduceTasks(0);

      return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String [] args) throws Exception {
      int status = ToolRunner.run(HBaseConfiguration.create(), new HBaseLoadExample(), args);
      System.exit(status);
  }
```

The *run* method configures the MapReduce job. In this example, it sets the mapper class, the input path and the output table; the last two are taken from the command line when launching the job.
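Note that `run` reads `otherArgs[0]` and `otherArgs[1]` without checking that both were supplied, so a missing argument surfaces as an `ArrayIndexOutOfBoundsException`. A small guard at the top of `run` could report the problem more clearly. The following plain-Java sketch shows one possible shape; `LoadArgsSketch`, `validate` and the usage text are hypothetical and not part of the example project.

```java
final class LoadArgsSketch {

    // Returns null when the arguments are usable, or a usage message otherwise.
    static String validate(String[] otherArgs) {
        if (otherArgs == null || otherArgs.length != 2) {
            return "Usage: HBaseLoadExample <input path> <output table>";
        }
        if (otherArgs[0].isEmpty() || otherArgs[1].isEmpty()) {
            return "Input path and output table must not be empty";
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up argument lists: one incomplete, one complete.
        String[] missingTable = {"/some/input.csv"};
        String[] complete = {"/some/input.csv", "some_table"};
        System.out.println(validate(missingTable));
        System.out.println(validate(complete));
    }
}
```

Inside `run`, a non-null result would typically be printed to `System.err` before returning a non-zero status.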

### pom.xml

The *pom.xml* file tells Maven how to compile the project and generate the jar that we need to submit to the EDI Big Data Stack.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>eu.edincubator.stack.examples</groupId>
  <artifactId>hbaseexample</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>com.opencsv</groupId>
          <artifactId>opencsv</artifactId>
          <version>4.1</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-common</artifactId>
          <version>${hbase.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>${hbase.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-protocol</artifactId>
          <version>${hbase.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>${hbase.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-thrift</artifactId>
          <version>${hbase.version}</version>
      </dependency>
  </dependencies>

  <properties>
      <hadoop.version>3.0.0</hadoop.version>
      <hbase.version>2.0.0</hbase.version>
  </properties>
</project>
```

Unlike the pom.xml presented at MapReduce & YARN, this one doesn’t generate a "fat jar", so we have to supply third-party libraries (*com.opencsv*) when submitting the job.

### Compiling and submitting the job

First, you must build the Java package:

In [None]:
cd ~/work/examples/hbaseexample
mvn clean package

Before launching the job, we must download the required third-party library:

In [None]:
mkdir libjars
cd libjars
wget https://repo1.maven.org/maven2/com/opencsv/opencsv/4.1/opencsv-4.1.jar

Next, at the stack-client Docker container, we can submit the job using the *yarn jar* command. Notice the *-libjars* parameter:

In [None]:
cd ..
yarn jar target/hbaseexample-1.0-SNAPSHOT.jar eu.edincubator.stack.examples.hbase.HBaseLoadExample -libjars=libjars/opencsv-4.1.jar /samples/yelp/yelp_business/yelp_business.csv $USERNAME.yelp_business

If we return to the HBase shell, we can check that the table has been filled with data:

```
hbase(main):004:0> scan '<username>.yelp_business', {'LIMIT' => 5}
```

## Reading data from HBase

In this example, we read the data previously loaded into the HBase *yelp_business* table, process it and write the result into an HDFS folder. To do so, we reproduce the example shown at [MapReduce & YARN](map-reduce-yarn.ipynb), but reading data from HBase instead of a CSV file.

This example is implemented in *HBaseReadExample.java*. Its structure is similar to the previous examples, and the reducer is the same one explained at [MapReduce & YARN](map-reduce-yarn.ipynb). The mapper is coded as follows:

```java
public static class HBaseReadMapper extends TableMapper<Text, IntWritable> {

     private final static IntWritable one = new IntWritable(1);

     public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
         byte[] cell = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("state"));
         context.write(new Text(Bytes.toString(cell)), one);
     }
 }
```

As you can see, *HBaseReadMapper* extends *org.apache.hadoop.hbase.mapreduce.TableMapper* instead of *org.apache.hadoop.mapreduce.Mapper*. With *TableMapper* we only have to define the output key and value types of the mapper, because the input key and value types are fixed: they are read from HBase. The *map* method receives a row id of type *org.apache.hadoop.hbase.io.ImmutableBytesWritable* and a value of type *org.apache.hadoop.hbase.client.Result*. As in the example shown at [MapReduce & YARN](map-reduce-yarn.ipynb), we emit the value stored at column family *info* and qualifier *state* as the output key, and the constant one as the output value. The reducer class is a copy of the *StateSumReducer* that we coded at [MapReduce & YARN](map-reduce-yarn.ipynb), which aggregates all values for each key (state).
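The combined effect of the mapper and the reducer can be simulated in memory. The sketch below is a plain-Java stand-in, not the way Hadoop executes the job: `StateCountSketch` and `countByState` are hypothetical names and the list of states is made up. Each element plays the role of one `info:state` cell, adding 1 per element corresponds to the `(state, one)` pairs emitted by the mapper, and the per-key sum corresponds to the aggregation done by the reducer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StateCountSketch {

    // Map step: one (state, 1) pair per row. Reduce step: sum the ones per state.
    static Map<String, Integer> countByState(List<String> states) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String state : states) {
            counts.merge(state, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up values standing in for the info:state cell of six rows.
        List<String> states = Arrays.asList("AZ", "NV", "AZ", "ON", "NV", "AZ");
        System.out.println(countByState(states)); // prints {AZ=3, NV=2, ON=1}
    }
}
```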

### main & run

```java
public int run(String[] otherArgs) throws Exception {
        Configuration conf = getConf();

        Job job = Job.getInstance(conf, "HBase read example");
        job.setJarByClass(HBaseReadExample.class);

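        Scan scan = new Scan();
        // Fetch up to 500 rows per scanner request to reduce round trips
        scan.setCaching(500);
        // Don't fill the region servers' block cache during a full-table scan
        scan.setCacheBlocks(false);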

        TableMapReduceUtil.initTableMapperJob(
                otherArgs[0],
                scan,
                HBaseReadMapper.class,
                Text.class,
                IntWritable.class,
                job
        );

        job.setReducerClass(StateSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String [] args) throws Exception {
        int status = ToolRunner.run(HBaseConfiguration.create(), new HBaseReadExample(), args);
        System.exit(status);
  }
```

As can be seen, the *run* method differs from the previous example. In this case, an instance of the *org.apache.hadoop.hbase.client.Scan* class must be created to read the table. The mapper is set using the *initTableMapperJob* method of *org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil*, which also receives the table name, the scan and the mapper's output key and value types. The reducer class is set in the same way as in the other examples.

### Compiling and submitting the job

The package is compiled as we saw in the previous example:

In [None]:
cd ~/work/examples/hbaseexample
mvn clean package

Next, at the stack-client Docker container, we can submit the job using the *yarn jar* command.

In [None]:
yarn jar target/hbaseexample-1.0-SNAPSHOT.jar eu.edincubator.stack.examples.hbase.HBaseReadExample $USERNAME.yelp_business /user/$USERNAME/hbase-output

We can see the output in HDFS:

In [None]:
hdfs dfs -ls /user/$USERNAME/hbase-output

In [None]:
hdfs dfs -cat /user/$USERNAME/hbase-output/part-r-00000

As you can see, these results are the same as those obtained in the [MapReduce & YARN example](map-reduce-yarn.ipynb).