# Riak

In [1]:
import riak
from pprintpp import pprint as pp
import json
import pandas as pd

import uuid
import hashlib

## Lectura de información en Pandas

Partimos del dataset normalizado y lo desnormalizamos para guardarlo todo junto en RIAK

In [2]:
df_mov = pd.read_excel("../../data/black.xlsx", sheetname= "Movimientos")
df_miembros = pd.read_excel("../../data/black.xlsx", sheetname= "Miembros")
df = pd.merge(df_mov, df_miembros, on = ['id_miembro'], how = 'inner')
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7624 entries, 0 to 7623
Data columns (total 11 columns):
id_miembro            7624 non-null int64
fecha                 7624 non-null datetime64[ns]
minuto                7624 non-null int64
hora                  7624 non-null int64
importe               7624 non-null float64
comercio              6889 non-null object
actividad_completa    7623 non-null object
actividad             7623 non-null object
nombre                7624 non-null object
funcion               7624 non-null object
organizacion          6041 non-null object
dtypes: datetime64[ns](1), float64(1), int64(3), object(6)
memory usage: 714.8+ KB


In [3]:
df.head(5)

Unnamed: 0,id_miembro,fecha,minuto,hora,importe,comercio,actividad_completa,actividad,nombre,funcion,organizacion
0,30,2009-09-14,52,9,19.26,FARMACIA M JESUS MARTINEZ,FARMACIAS,SALUD,Jesús Pedroche Nieto,concejal,Partido Popular
1,30,2009-06-04,22,16,55.0,EL CORTE INGLES,EL CORTE INGLES,COMPRA BIENES,Jesús Pedroche Nieto,concejal,Partido Popular
2,30,2011-08-26,3,14,10.05,"IBERPISTAS, S.A.C.E.",AUTOPISTAS (TERMINALES),COCHE,Jesús Pedroche Nieto,concejal,Partido Popular
3,30,2011-09-21,44,23,19.9,JUTECO,BODYBELL JUTECO,ESTETICA,Jesús Pedroche Nieto,concejal,Partido Popular
4,30,2008-12-19,37,11,153.0,DELSA NEBLI,"LIBRERIAS, PAPELERIAS Y DISCOS",LIBRERIA,Jesús Pedroche Nieto,concejal,Partido Popular


## Riak ...

In [4]:
# connect to database
cliente = riak.RiakClient()
cliente.ping()

True

Utilizamos un bucket para guardar los movimientos en formato JSON y otro para guardar el importe acumulado por miembro.

En el bucket de movimientos utilizamos una clave autogenerada (no vamos a poder localizar los movimientos por clave) y en el bucket con el importe acumulado la clave es el nombre de la persona en formato HASH, para no tener problemas ni con los espacios del nombre ni con los acentos

Se van a crear los siguientes buckets:

|bucket|Clave|Contenido|
|-|-|-|
|movimientos|Marca temporal|Todos los datos del dataset en formnato JSON y desnormalizados|
|acum_importes|Código HASH de la persona que realiza el movimiento|Mapa con el nombre de la persona y el importe acumulado|

El bucket de tweets tendrá los siguientes índices:

|Nombre del índice|Contenido|¿Que busquedas permite hacer?|
|-|-|-|
|idx_miembro_bin|Nombre de la persona|Localizar movimientos de una persona concreta|

In [5]:
BUCKET_MOVIMIENTOS = 'movimientos'
BUCKET_ACUM_IMPORTES = 'acum_importes'

bucket_mov = cliente.bucket(BUCKET_MOVIMIENTOS)
bucket_acum_importes = cliente.bucket_type('maps').bucket(BUCKET_ACUM_IMPORTES)

Función para eliminar los datos de un BUCKET de Riak

In [6]:
def drop_keys(bucket):
    for keys in bucket.stream_keys():
        # print(keys)
        for keys in bucket.stream_keys():
            for key in keys:
                bucket.delete(key)

In [7]:
# Borramos los datos ..
drop_keys(bucket_mov)
drop_keys(bucket_acum_importes)

## Inserción de información en Riak

Para generar los datos en formato JSON partimos del DataFrame de Pandas y lo exportamos a JSON (en formato String) para luego cargarlo en un diccionario de Python

Observa que cuando se genera el dato en formato JSON, los campos fechas se almacenan como un [TIMESTAMP de unix](http://www.unixtimestamp.com), por lo que habría que volver a convertirlo a fecha según el caso

In [8]:
json_string = df.to_json(orient = 'records')
json_list = json.loads(json_string)

In [9]:
print(len(json_list))

7624


In [10]:
pp(json_list[0])

{
    u'actividad': u'SALUD',
    u'actividad_completa': u'FARMACIAS',
    u'comercio': u'FARMACIA M JESUS MARTINEZ',
    u'fecha': 1252886400000,
    u'funcion': u'concejal',
    u'hora': 9,
    u'id_miembro': 30,
    u'importe': 19.26,
    u'minuto': 52,
    u'nombre': u'Jesús Pedroche Nieto',
    u'organizacion': u'Partido Popular',
}


Vamos a utilizar el nombre de la persona como clave, por lo que necesitamos convertirla previamente a un código que no lleve ni espacios ni acentos.

Este es precisamente el objetivo de la siguiente función ...

In [11]:
def hash_string(s):
    s_utf8 = s.encode('utf-8')
    return hashlib.md5(s_utf8).hexdigest()

Recorremos el dataset de movimientos, almacenando la información tanto en el bucket de movimientos como en el bucket que acumula los movimientos por cliente.

En este último utilizamos una funcionalidad que nos da Riak para guardar la información tipificada. Utilizamos un mapa que tiene dos elementos: Un registro donde guardamos el nombre del cliente y un contador donde guardamos el importe (sin decimales)

In [13]:
# Bucle de carga de datos ...
for movimiento_json in json_list:
    key = uuid.uuid1().hex
    hash_nombre =  hash_string(movimiento_json["nombre"])
    
    movimiento = bucket_mov.new(key, movimiento_json)
    movimiento.add_index('idx_miembro_bin', hash_nombre)
    movimiento.store()
    
    map_nombre = bucket_acum_importes.new(hash_nombre)
    map_nombre.registers['nombre'].assign(movimiento_json["nombre"].encode('utf-8'))
    map_nombre.counters['importe'].increment(int(round(movimiento_json["importe"] * 100)))
    map_nombre.store()
    

# Lectura de información en Riak

En la siguiente función partimos del objeto JSON que nos devuelve RIAK y generamos un DataFrame Pandas.

En el caso de la fecha, lo convertimos desde un UNIX Timestamp al formato correcto para Pandas

In [14]:
def json_to_pandas(rows):
    # Convertimos el objeto JSON en un objeto pandas 
    df = pd.read_json(json.dumps(rows))
    
    if 'fecha' in df.columns:
        # Las fechas están en formato UNIX TIMESTAMP. Las volvemos a convertir a formato Date...
        df = df.assign(fecha = pd.to_datetime(df.fecha, unit = 'ms'))

    return df

### Lectura del bucket de movimientos

Cargamos todos los movimientos de la base de datos y lo guardamos en un objeto Pandas, ya que hay ciertas preguntas que no pueden resolverse directamente por esta base de datos

Recuerda que RIAK permite obtener información de un clave, pero no le es posible devolver la información ordenada ...

In [15]:
rows = []
for keys in bucket_mov.stream_keys():
    for key in keys:
        # print('Key %s' % key )
        rows.append(bucket_mov.get(key).data)
        
# Convertimos el objeto json en un objeto pandas 
df_movimientos = json_to_pandas(rows)

In [16]:
df_movimientos.head()

Unnamed: 0,actividad,actividad_completa,comercio,fecha,funcion,hora,id_miembro,importe,minuto,nombre,organizacion
0,SUPERMERCADO,HIPERCOR SUPERMERCADOS EL CORTE INGLES,EL CORTE INGLES,2004-01-22,directivo,14,28,236.41,10,Ildefonso José Sánchez Barcoj,
1,COCHE,GASOLINERAS,,2010-08-13,directivo,14,28,73.51,34,Ildefonso José Sánchez Barcoj,
2,SALIDAS,ESPECTACULOS Y DEPORTES,TICKETMASTER ENTRADAS WEB,2007-03-08,directivo,18,28,15.3,18,Ildefonso José Sánchez Barcoj,
3,COCHE,AUTOPISTAS (TERMINALES),AUTOPISTAS AUMAR SA,2003-11-03,directivo,15,28,4.5,49,Ildefonso José Sánchez Barcoj,
4,ROPA,CONFECCION TEXTIL EN GENERAL,TIENDAS NIKE - ARONA,2009-12-05,directivo,1,28,162.8,58,Ildefonso José Sánchez Barcoj,


### Los 10 movimientos mas caros

Esta consulta no la podemos contestar directamente con la Base de Datos por lo que nos apoyamos en un proceso en el cliente

In [17]:
df_movimientos.sort_values('importe', ascending=False)[['nombre', 'fecha', 'actividad_completa', 'importe']].head(10)

Unnamed: 0,nombre,fecha,actividad_completa,importe
3281,Ricardo Romero de Tejada y Picatoste,2007-11-25,AGENCIAS BANCARIAS(ANTICIPO VENTANILLA),11930.0
4151,Carlos Vela García,2006-11-01,VIAJES MARSANS-INTERNACIONAL EXPRESSO,9825.0
4299,Ildefonso José Sánchez Barcoj,2009-03-30,EL CORTE INGLES,9804.15
1093,Carmen Contreras Gómez,2010-08-15,"HOTELES 4 Y 5 ESTRELLAS,BALNEARIOS,CAMPI",9076.76
6751,Enrique de la Torre Martínez,2006-11-29,AGENCIAS BANCARIAS(ANTICIPO VENTANILLA),8000.0
782,Ildefonso José Sánchez Barcoj,2005-07-19,AGENCIAS BANCARIAS(ANTICIPO VENTANILLA),7200.0
6817,Miguel Blesa de la Parra,2007-08-02,EL CORTE INGLES,6990.87
5627,María Elena Gil García,2004-03-30,JOYERIAS Y RELOJERIAS,6905.72
3842,Enrique de la Torre Martínez,2005-11-24,AGENCIAS BANCARIAS(ANTICIPO VENTANILLA),6000.0
3757,María Elena Gil García,2003-08-28,JOYERIAS Y RELOJERIAS,6000.0


### Lectura a través de un índice

In [18]:
rows = []
keys = bucket_mov.stream_index("idx_miembro_bin", hash_string(u"Javier de Miguel Sánchez"))
for keys in keys.results:
    for movimiento_key in keys:
        rows.append(bucket_mov.get(movimiento_key).data)
    
df = json_to_pandas(rows)

### Los movimientos de una persona concreta

Para obtener los movimientos de una persona en concreto si podemos utilizar el índice que habíamos creado Ad-hoc, aunque la ordenación se realiza en Pandas

In [19]:
df.sort_values('importe', ascending=False)[['nombre', 'fecha', 'actividad_completa', 'importe']].head(10)

Unnamed: 0,nombre,fecha,actividad_completa,importe
145,Javier de Miguel Sánchez,2008-06-12,"ELECTRODOMESTICOS,EQUIPOS ELECTRICOS",572.0
163,Javier de Miguel Sánchez,2006-05-01,"HOTELES,MOTELES,BALNEARIOS,CAMPINGS REST",571.65
22,Javier de Miguel Sánchez,2003-09-30,AUTOM.Y MOTOCICLETAS ( VENTAS Y REPARAC),417.88
87,Javier de Miguel Sánchez,2010-01-12,"MATERIALES CONSTRUCCION,FONTANERIA,SANEA",383.0
106,Javier de Miguel Sánchez,2006-12-08,EL CORTE INGLES,304.66
147,Javier de Miguel Sánchez,2008-03-16,"HOTELES 4 Y 5 ESTRELLAS,BALNEARIOS,CAMPI",287.07
90,Javier de Miguel Sánchez,2009-06-19,HIPERCOR SUPERMERCADOS EL CORTE INGLES,283.0
85,Javier de Miguel Sánchez,2009-12-22,"MATERIALES CONSTRUCCION,FONTANERIA,SANEA",272.85
41,Javier de Miguel Sánchez,2003-08-13,"HOTELES,MOTELES,BALNEARIOS,CAMPINGS REST",268.0
155,Javier de Miguel Sánchez,2009-08-16,PARADORES NACIONALES,250.38


### Lectura de una información agregada

La información agregada está en un mapa, por lo que tenemos que procesarla y convertirla a formato JSON.

In [20]:
rows = []
for keys in bucket_acum_importes.stream_keys():
    #print(keys)
    for key in keys:
        map_nombre = bucket_acum_importes.get(key)
        rows.append({'nombre' : map_nombre.registers['nombre'].value,
                     "importe" : float(map_nombre.counters['importe'].value) / 100})
        
# Convertimos el objeto json en un objeto pandas 
df = json_to_pandas(rows)

### Las 10 personas que mas han gastado

Fácil si disponemos de un agregado. Utilizamos Pandas para ordenar la información ..,

In [21]:
df.sort_values('importe', ascending=False).head(10)

Unnamed: 0,importe,nombre
79,66444.15,Ildefonso José Sánchez Barcoj
18,40356.29,José Antonio Moral Santín
7,37639.57,Carlos Vela García
8,37608.99,Miguel Blesa de la Parra
28,37472.01,Enrique de la Torre Martínez
40,36590.68,Matías Amat Roca
69,36086.89,Maria Mercedes de la Merced Monge
41,35901.41,Ricardo Romero de Tejada y Picatoste
34,35574.91,Ricardo Morado Iglesias
62,35136.97,Ramón Ferraz Ricarte
