# NoSQL (HBase) (sesión 5)

![Image of HBase](http://hbase.apache.org/images/hbase_logo_with_orca_large.png)

Esta hoja muestra cómo acceder a bases de datos HBase y también a conectar la salida con Jupyter.

Se puede utilizar el *shell* propio de HBase en la máquina virtual llamando al programa:

    $ hbase/bin/hbase shell

La diferencia es que ese programa espera código Ruby y aquí trabajaremos con Python.

### Nota sobre la caída de RegionServers

En este entorno con poca memoria son frecuentes las caídas de RegionServers. Sería conveniente:

- dar a la memoria virtual al menos 3GB de memoria,
- aumentar el tamaño del HEAP de los procesos de HBase, y
- aumentar el tiempo de _timeout_ de Zookeeper.

En la [documentación de HBase](http://hbase.apache.org/book.html#trouble.rs) dan unas recomendaciones, sobre todo, para carga inicial, como he realizado estos días para cargar las bases de datos de ejemplo:

> Make sure you give plenty of RAM (in hbase-env.sh), the default of 1GB won’t be able to sustain long running imports.
>
> [...]
>
> If this is happening during an upload which only happens once (like initially loading all your data into HBase), consider bulk loading.

Aunque no usamos _bulk loading_ para mostrar cómo se añaden datos desde Python (el _bulk loading_ hay que hacerlo en Java).

Las caídas en los RegionServers pueden producirse por varias cuestiones: falta de memoria, timeout por la ejecución del GC de Java, etc. Estas caídas son aceptadas como normales por el sistema HBase, que continuará funcionando con el resto de RegionServers y aceptará un RegionServer que terminó abruptamente una vez reiniciado.

En nuestra VM sólo hay un RegionServer, y se puede iniciar si cayó con el comando:

    ~/hbase/bin/start-daemon.sh start regionserver

El siguiente _script_ comprueba cada 30 segundos la salida de depuración del Máster de HBase, y si ve que no hay RegionServers, llama al script de reinicio del único RegionServer. El cliente continuará sin problemas después de unos segundos de inicialización. Al cabo del tiempo, los RegionServer que no funcionan se eliminan por HBase.

In [None]:
%%writefile restart-regionserver.sh
#! /bin/sh
while true ; do
	sleep 30 # Sleep before to give time HBase to start
	ns=`curl -s http://localhost:60010/jmx | grep numRegionServers | tr -cd [0-9]`
	test -z "$ns" || test $ns -gt 0 || ~/hbase/bin/hbase-daemon.sh start regionserver 
done


### Memoria de intercambio

El tamaño de la memoria que requiere puntualmente HBase hace que tengamos que crear un fichero de intercambio si no existe, y activarla. Se usarán 4GB para permitir el uso de memoria. Esto hará el sistema lento en caso de que tenga que hacer uso del intercambio, pero al menos no morirán por falta de memoria los distintos servidores de HBase.

In [None]:
%%bash
if ! sudo grep /swap /proc/swaps 2>&1 >/dev/null
then
    sudo fallocate -l 4GiB /swap
    sudo chmod 0600 /swap
    sudo mkswap /swap
    sudo swapon /swap
fi

Iniciamos HBase. Esto lanza todos los demonios y el demonio de HDFS.

In [None]:
%%bash
~/start-hbase.sh

Iniciamos el script en segundo plano para que reinicie los regionservers que se caen:

In [None]:
%%bash --bg
sh restart-regionserver.sh

In [None]:
# Config
%env DIR=/vagrant

In [None]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

%matplotlib inline
matplotlib.style.use('ggplot')

Usaremos la librería `happybase` para python. La cargamos a continuación y hacemos la conexión.

In [None]:
import happybase

host = '127.0.0.1'
connection = happybase.Connection(host)
connection.tables()

In [None]:
%%bash
(test -e $DIR/Posts.csv && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/es.stackoverflow/Posts.csv.gz -O - 2>/dev/null | gunzip > $DIR/Posts.csv) \
  && echo OK)

In [None]:
%%bash
(test -e $DIR/Users.csv && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/es.stackoverflow/Users.csv.gz -O - 2>/dev/null | gunzip > $DIR/Users.csv) \
  && echo OK)

In [None]:
%%bash
(test -e $DIR/Tags.csv && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/es.stackoverflow/Tags.csv.gz -O - 2>/dev/null | gunzip > $DIR/Tags.csv) \
  && echo OK)

In [None]:
%%bash
(test -e $DIR/Comments.csv && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/es.stackoverflow/Comments.csv.gz -O - 2>/dev/null | gunzip > $DIR/Comments.csv) \
  && echo OK)

In [None]:
%%bash
(test -e $DIR/Votes.csv && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/es.stackoverflow/Votes.csv.gz -O - 2>/dev/null | gunzip > $DIR/Votes.csv) \
  && echo OK)

In [None]:
%%bash
(test -e $DIR/PostHistory.csv && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/es.stackoverflow/PostHistory.csv.gz -O - 2>/dev/null | gunzip > $DIR/PostHistory.csv) \
  && echo OK)

Para la carga inicial, vamos a crear todas las tablas con una única familia de columnas, `rawdata`, donde meteremos toda la información _raw_ comprimida. Después podremos hacer reorganizaciones de los datos para hacer el acceso más eficiente. Es una de las muchas ventajas de no tener un esquema.

In [None]:
# Create tables
tables = ['posts', 'votes', 'users', 'tags', 'comments']
for t in tables:
    try:
        connection.create_table(
            t,
            {
                'rawdata': dict(max_versions=1,compression='GZ')
            })
    except:
        print "Database already exists: {0}.".format(t)
        pass
connection.tables()

El código de importación es siempre el mismo, ya que se coge la primera fila del CSV que contiene el nombre de las columnas y se utiliza para generar nombres de columnas dentro de la familia de columnas dada como parámetro. La función `csv_to_hbase()` acepta un fichero CSV a abrir, un nombre de tabla y una familia de columnas donde agregar las columnas del fichero CSV. En nuestro caso siempre va a ser `rawdata`.

In [None]:
import csv

def csv_to_hbase(file, tablename, cf):
    table = connection.table(tablename)
    
    with open(file) as f:
        # La llamada csv.reader() crea un iterador sobre un fichero CSV
        reader = csv.reader(f, dialect='excel')
        
        # Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
        columns = reader.next()
        columns = [cf + ':' + c for c in columns]
        
        with table.batch(batch_size=50) as b:
            for row in reader:
                # La primera columna se usará como Row Key
                b.put(row[0], dict(zip(columns[1:], row[1:])))


In [None]:
import os

for t in tables:
    print "Importando tabla {0}...".format(t)
    %time csv_to_hbase(os.environ['DIR']+'/'+ t.capitalize() + '.csv', t, 'rawdata')

### Construcción de estructuras anidadas

Al igual que pasaba con MongoDB, las bases de datos NoSQL como en este caso HBase permiten almacenar estructuras de datos complejas. En nuestro caso vamos a agregar los comentarios de cada pregunta o respuesta (post) en columnas del mismo. Para ello, creamos una nueva familia de columnas `comments`.

HBase es bueno para añadir columnas sencillas, por ejemplo que contengan un valor. Sin embargo, si queremos añadir objetos complejos, tenemos que jugar con la codificación de la familia de columnas y columna.

Usaremos el shell porque `happybase` no permite alterar tablas ya creadas.

In [None]:
%%bash
cat <<EOF | ~/hbase/bin/hbase shell

disable 'posts'

alter 'posts', {NAME => 'comments', VERSIONS => 1}

enable 'posts'

EOF

Cada comentario que añadimos contiene, al menos:

- un id único
- un texto
- un autor
- etc.

¿Cómo se consigue meterlo en una única familia de columnas?

Hay varias formas. La que usaremos aquí, añadiremos el **id** de cada comentario como parte del nombre de la columna. Por ejemplo, el comentario con Id 2000, generará las columnas:

- `Id_2000` (valor 2000)
- `UserId_2000`
- `PostId_2000`
- `Text_2000`

con sus correspondientes valores. Así, todos los datos relativos al comentario con Id original 2000, estarán almacenados en todas las columnas que termienn en `_2000`. La base de datos permite implementar filtros que nos permiten buscar esto de forma muy sencilla. Los veremos después.

In [None]:
comments = connection.table('comments')
posts = connection.table('posts')

with posts.batch(batch_size=50) as b:
    # Hacer un scan de la tabla
    for key, data in comments.scan():
        comment = {'comments:' + d.split(':')[1] + "_" + str(key): data[d] for d in data.keys()}
        b.put(data['rawdata:PostId'], comment)

El siguiente código permite mostrar de forma amigable las tablas extraídas de la base de datos en forma de diccionario:

In [None]:
# http://stackoverflow.com/a/30525061/62365
class DictTable(dict):
    # Overridden dict class which takes a dict in the form {'a': 2, 'b': 3},
    # and renders an HTML Table in IPython Notebook.
    def _repr_html_(self):
        html = ["<table width=100%>"]
        for key, value in self.iteritems():
            html.append("<tr>")
            html.append("<td>{0}</td>".format(key))
            html.append("<td>{0}</td>".format(value))
            html.append("</tr>")
        html.append("</table>")
        return ''.join(html)

In [None]:
# Muestra cómo queda la fila del Id del Post 9997
DictTable(posts.row('9997'))

## EJERCICIO: ¿Cómo sería el código para saber qué usuarios han comentado un post en particular?

## Wikipedia

Como otro ejemplo de carga de datos y de organización en HBase, veremos de manera simplificada el ejemplo de la wikipedia visto en teoría.

A continuación se descarga una pequeña parte del fichero de la wikipedia en XML:

In [None]:
%%bash
FILE=eswiki.xml
(test -e $DIR/$FILE && echo "Ya descargado") || (\
(wget http://neuromancer.inf.um.es:8080/wikipedia/$FILE.gz -O - 2>/dev/null | gunzip > $DIR/$FILE) \
  && echo OK)

In [None]:
!head -200 $DIR/eswiki.xml

Se crea la tabla para albergar la `wikipedia`. Igual que la vista en teoría, pero aquí se usa `wikipedia` en vez de `wiki` para que no colisionen la versión completa con la reducida.

In [None]:
%%bash
cat <<EOF | ~/hbase/bin/hbase shell
create 'wikipedia' , 'text', 'revision'

disable 'wikipedia' # Para evitar su uso temporal

alter 'wikipedia' , { NAME => 'text', VERSIONS => org.apache.hadoop.hbase.HConstants::ALL_VERSIONS }

alter 'wikipedia' , { NAME => 'revision', VERSIONS => org.apache.hadoop.hbase.HConstants::ALL_VERSIONS }

alter 'wikipedia' , { NAME => 'text', COMPRESSION => 'GZ', BLOOMFILTER => 'ROW'}

enable 'wikipedia'

EOF

Este código, visto en teoría, recorre el árbol XML construyendo documentos y llamando a la función `callback` con cada uno. Los documentos son diccionarios con las claves encontradas dentro de los tags `<page>...</page>`.

In [None]:
import xml.sax
import re

class WikiHandler(xml.sax.handler.ContentHandler):

    def __init__(self):
        self._charBuffer = ''
        self.document = {}

    def _getCharacterData(self):
        data = self._charBuffer
        self._charBuffer = ''
        return data

    def parse(self, f, callback):
        self.callback = callback
        xml.sax.parse(f, self)

    def characters(self, data):
        self._charBuffer = self._charBuffer + data

    def startElement(self, name, attrs):
        if name == 'page':
        # print 'Start of page'
            self.document = {}
        if re.match(r'title|timestamp|username|comment|text', name):
            self._charBuffer = ''

    def endElement(self, name):
        if re.match(r'title|timestamp|username|comment|text', name):
            self.document[name] = self._getCharacterData()
            # print(name, ': ', self.document[name][:20])
        if 'revision' == name:
            self.callback(self.document)


El codigo a continuación, cada vez que el código anterior llama a la función `processdoc()` se añade un documento a la base de datos.

In [None]:
import time

class FillWikiTable():
    """Llena la tabla Wiki"""
    def __init__(self):
        # Conectar a la base de datos a través de Thrift
        self.table = connection.table('wikipedia')

    def run(_s):
        def processdoc(d):
            print "Callback called with", d['title']
            tuple_time = time.strptime(d['timestamp'], "%Y-%m-%dT%H:%M:%SZ")
            timestamp = int(time.mktime(tuple_time))
            _s.table.put(d['title'],
                         {'text:': d.get('text',''),
                          'revision:author': d.get('username',''),
                          'revision:comment': d.get('comment','')},
                         timestamp=timestamp)

        with open(os.environ['DIR']+'/'+'eswiki.xml','rb') as f:
            start = time.time()
            WikiHandler().parse(f, processdoc)
            end = time.time()
            print ("End adding documents. Time: %.5f" % (end - start))

In [None]:
FillWikiTable().run()

El código a continuación permite ver las diferentes versiones de una revisión. Como la versión reducida es muy pequeña no da lugar a que haya ninguna revisión, pero con este código se vería. Hace uso del _shell_ de HBase.

In [None]:
%%bash
cat <<EOF | ~/hbase/bin/hbase shell

get 'wikipedia', 'Commodore Amiga', {COLUMN => 'revision',VERSIONS=>10}

EOF

### Enlazado de documentos en la wikipedia

Los artículos de la wikipedia llevan enlaces entre sí, incluyendo referencias del tipo `[[artículo referenciado]]`. Se pueden extraer estos enlaces y se puede construir un grafo de conexiones. Para cada artículo, se anotarán qué enlaces hay que salen de él y hacia qué otros artículos enlazan y también qué enlaces llegan a él. Esto se hará con dos familias de columnas, `from` y `to`. 

En cada momento, se añadirá una columna `from:artículo` cuando un artículo nos apunte, y otras columnas `to:articulo` con los artículos que nosotros enlazamos.

In [None]:
import sys

class BuildLinks():
    """Llena la tabla de Links"""
    def __init__(self):
        # Create table
        try:
            connection.create_table(
                "wikilinks",
                {
                    'from': dict(bloom_filter_type='ROW',max_versions=1),
                    'to' : dict(bloom_filter_type='ROW',max_versions=1)
                })
        except:
            print ("Database wikilinks already exists.")
            pass

        self.table = connection.table('wikilinks')
        self.wikitable = connection.table('wikipedia')

    def run(self):
        print "run";
        linkpattern = r'\[\[([^\[\]\|\:\#][^\[\]\|:]*)(?:\|([^\[\]\|]+))?\]\]'
        # target, label

        with self.table.batch(batch_size=50) as b:
            for key, data in self.wikitable.scan():
                to_dict = {}
                doc = key.strip()
                print "\n", doc, ":"
                for mo in re.finditer(linkpattern, data['text:']):
                    (target, label) = mo.groups()

                    target = target.strip()

                    if target == '':
                        continue

                    label = '' if not label else label
                    label = label.strip()

                    to_dict['to:' + target] = label

                    sys.stdout.write(".")
                    #print "%s -> %s (%s)" % (doc, target, label)
                    
                    b.put(target, {'from:' + doc : label})

                if bool(to_dict):
                    b.put(doc, to_dict)


In [None]:
BuildLinks().run()

En la siguiente sesión veremos técnicas más sofisticadas de filtrado, pero por ahora se puede jugar con estas construcciones. Se puede seleccionar qué columnas se quiere mostrar e incluso filtros.

In [None]:
%%bash
cat <<EOF | ~/hbase/bin/hbase shell

scan 'wikilinks', {COLUMNS=>'to', FILTER => "ColumnPrefixFilter('A')", LIMIT => 300}

EOF

El proceso de `scan` recorre toda la tabla mostrando sólo las filas seleccionadas. HBase ofrece ciertas optimizaciones para que el escaneo sea eficiente, que veremos en la siguiente sesión.

Una introducción a los filtros y parámetros disponibles se puede ver [aquí](http://www.hadooptpoint.com/filters-in-hbase-shell/).

In [None]:
%%bash
cat <<EOF | ~/hbase/bin/hbase shell

scan 'wikipedia', {COLUMNS=>['revision'] , STARTROW => 'A', ENDROW=>'B'}

EOF

## EJERCICIO: Encontrar páginas que estén enlazadas y que ambas estén en la tabla `wikipedia`

(Ojo, no estarán todas porque es una versión reducida de la wikipedia)

## EJERCICIO: Probar diversas búsquedas sobre las tablas `wikipedia` y `wikilinks`

## EJERCICIO: Modificar la tabla `posts` para añadir una familia de columnas que guarde el histórico de ediciones guardado en `PostHistory.csv`. Usar como ejemplo la función csv_to_hbase