# Práctica - Redis y MongoDB

0. Inicial en un terminal los Jupyter Notebooks, algo que ya has hecho si estás leyendo esto.
1. Necesitaremos una conexión a redis, quizás la que ya hemos creado en redislabs
2. También mongo, quizás la conexión a Atlas vista en una de las prácticas voluntarias anteriores (o la conexión en local, en este caso recordar arrancar el servidor mongod -dbpath datos)
3. Bajar los ficheros tweet.json y user.json
4. Desde otro terminal de linux teclear en la carpeta donde se han descargado los ficheros

```
mongoimport --db test --collection user  --drop --file user.json
mongoimport --db test --collection tweet --drop --file tweet.json
```
o importarlos desde Atlas. 

Se importaran 141 y 17834 documentos, respectivamente. Los datos corresponden a una muestra de tweets obtenidos durante las últimas elecciones USA, separados en una colección con los datos de los usuarios y otra colección con los datos de los tweets.

Por ejemplo, un tweet (no es el formato original, han sido preprocesados).

```
{
	"_id" : NumberLong("795892189377286144"),
	"text" : "#ImVotingBecause @realDonaldTrump supports EVERY American citizen.\n\n#MAGA #ElectionNight #Election2016 https://t.co/iqnVh1o2YH",
	"created_at" : ISODate("2016-11-08T07:34:19Z"),
	"user" : {
		"_id" : "721446222549147648",
		"verified" : false,
		"screen" : "FacMagnaAmerica" // nombre en twitter
	},
	"RT" : false,            // no es retweet
	"clinton" : false,       // no habla de clinton
	"trump" : true,          // sí habla de trump
	"mentions" : [           // gente a la que menciona
		"realDonaldTrump"
	],
	"hlabel" : 0,            // opinión sobre clinton
	"tlabel" : 1,            // opinión sobre trump
	"opinion" : 1            // si el tweet contiene opinión
}
```

user:

```
{
	"_id" : "246939630",  // id del user
	"lang" : "en",        // idioma preferido
	"verified" : true,    // cuenta verificada
	"screen_name" : "shannonrwatts",
	"url" : "http://www.momsdemandaction.org",
	"created_at" : ISODate("2011-02-03T19:32:59Z"),
	"time_zone" : "Indiana (East)",
	"tweets" : {
		"all" : 41602,   // total de tweets en esta cuenta
		"total" : 103,   // total de tweets en este dataset
		"RT" : 59,       // de los cuales son RTs
		"original" : 44  // de los cuales son originales
	},
	"followers" : 19677,
	"location" : null,
	"following" : 1950,
	"favourites_count" : 31228,
	"geo_enabled" : false,
	"id" : "246939630",
	"clinton" : true, // si ha hablado de clinton
	"trump" : true,   // si ha hablado de trump
	"mentions" : 510, // num. menciones realizadas en dataset 
	"RTin" : 4779, // número de RTs recibidos en dataset
	"rank" : {  // datos normalizados
		"followers" : 17689.5,
		"mentions" : 450.5,
		"RTin" : 122
	},
	"norm" : {
		"mentions" : 11.590909090909092,
		"RTin" : 108.61363636363636
	},
	"normrank" : {
		"followers" : 17689.5,
		"mentions" : 10483.25,
		"RTin" : 731
	}
}
```
Los valores hlabel y tlabel pueden ser -1,0 y +1.


Empezamos importando librerías y conectando con mongo y redis:


In [14]:
import json
import pymongo
from pprint import pprint
import redis
import time

# iniciamos mongo, en mi caso en local
from pymongo import MongoClient
client = MongoClient('mongodb://127.0.0.1:27017/')
print("Bases de datos en MongoDB",list(client.list_databases()))
db = client.test

# iniciamos redis, en mi caso en red:

redisconexion = "redis-12588.c135.eu-central-1-1.ec2.cloud.redislabs.com"
redispuerto = 12588
passwd = "YNUBRlaFh6kJnTKfDxsXur44M3jOkNqy"

r = redis.Redis(host=redisconexion, password=passwd, port=redispuerto, charset="utf-8", decode_responses=True, db=0)
print("Conexión con Redis: ",r.ping())

Bases de datos en MongoDB [{'name': 'ABD', 'sizeOnDisk': 73728.0, 'empty': False}, {'name': 'Product_Catalogue', 'sizeOnDisk': 77824.0, 'empty': False}, {'name': 'admin', 'sizeOnDisk': 32768.0, 'empty': False}, {'name': 'astronomia', 'sizeOnDisk': 61440.0, 'empty': False}, {'name': 'config', 'sizeOnDisk': 110592.0, 'empty': False}, {'name': 'local', 'sizeOnDisk': 77824.0, 'empty': False}, {'name': 'movilmongo', 'sizeOnDisk': 53248.0, 'empty': False}, {'name': 'practica', 'sizeOnDisk': 6250496.0, 'empty': False}, {'name': 'test', 'sizeOnDisk': 73728.0, 'empty': False}, {'name': 'videoclub', 'sizeOnDisk': 122880.0, 'empty': False}]
Conexión con Redis:  True


Comenzamos mostrando una función que tenemos que modificar. 
Sustituirá a aggregate, pero utilizando Redis como cache.
De momento es una versión básica, que usaremos como base para el ejercio 1. Esta función recibe 3 parámetros.
- db: la conexión con MongoDB
- coleccion: la colección a la que se quiere consultar (como string)
- pipeline: las consulta de agregación a realizar.

Devuelve un máximo de 10 documentos (una limitación que asumimos para simplificar)

In [15]:
def aggregate(db, coleccion, pipeline):
    pipe = pipeline.copy() # para no modificar lo que nos da el usuario
    pipe.append({"$limit":10}) # añadimos una etapa que limita a 10
    res = db[coleccion].aggregate(pipe) # a mongo!
    salida = [doc for doc in res]
    return salida

Para probarla podemos ejecutar esta consulta simple

In [16]:
# en lugar de 
#res = db.tweet.aggregate([
#{'$project':{"user.screen":1, "text":1, "RT":1, "_id":0}}
#])
res = aggregate(db,'tweet',[
{'$project':{"user.screen":1, "text":1, "RT":1, "_id":0}}
])
pprint(res)



[{'RT': False,
  'text': "I've lived in the USA for 7 years. I have a green card but I can't "
          'vote. If you are able to then please vote @HillaryClinton NOT the '
          'buffoon',
  'user': {'screen': 'JosephMorgan'}},
 {'RT': True,
  'text': 'RT @jpm05880: @realDonaldTrump latinos storming precinct 10 sw '
          'Miami to vote for our one and only president Trump!! '
          'https://t.co/46GC1n8LU6',
  'user': {'screen': 'FacMagnaAmerica'}},
 {'RT': True,
  'text': 'RT @Chairmnoomowmow: @realDonaldTrump @CNN @MSNBC @NBCNews '
          '@ABCPolitics @CBSNews \n'
          '\n'
          '#HillaryClinton #wikileaks #LeftistHate https://t.co/mA8…',
  'user': {'screen': 'FacMagnaAmerica'}},
 {'RT': False,
  'text': '@GVkacha @TheLastWord @realDonaldTrump \n\nhttps://t.co/zaluOXL8N0',
  'user': {'screen': 'FacMagnaAmerica'}},
 {'RT': True,
  'text': 'RT @DanScavino: Thank you Pensacola, Florida!\n'
          '@realDonaldTrump @mike_pence will #DrainTheSwamp with you-

## Pregunta 1. [2 puntos]

Escribir una versión de aggregate con la misma cabecera tal
que utilice Redis como cache:
    
- Si la consulta está en cache, devolverá la respuesta almacenada por Redis sin llamar a Mongo,  y le actualizará el tiempo de vida a 5 segundos
- Si no lo está hará la consulta a mongo tal y como muestra la versión anterior de aggregate, pero antes de devolver el resultado pondrá en cache el resultado, con un tiempo de vida de 5 segundos

En ambos casos, la función mostrará por pantalla un mensaje 'recuperado de cache' (primer caso) o 'acceso a Mongo' (segundo caso).

El valor que se devolverá será un array de respuestas, no un string.

Idea:
La forma de hacerlo es representar la propia consulta como clave y su respuesta como valor. 

- Para convertir el pipeline en un string que será la clave en redis podemos utilizar json.dumps(pipeline), que devuelve un string a partir de un pipeline, ese string será la clave

- Igualmente json.dumps(l) nos convertirá la respuesta obtenido por mongo en un string que podemos almacenar como valor de la clave. En este caso `l`es la lista con los 10 primeros documentos devueltos por la consulta.

- Finalmente, si la consulta ya existe, tenemos que convertir el valor de una consulta que ya se ha hecho a mongo para no devolver un string podemos usar ` json.loads(valor)`


     

In [93]:
def aggregate(db,coleccion,pipeline):
    pipe = pipeline.copy() # para no modificar lo que nos da el usuario
    pipe.append({"$limit":10}) # añadimos una etapa que limita a 10
    
    queryString = json.dumps(pipe)
    if r.exists(queryString):
        queryResult = r.get(queryString)
        r.set(queryString, queryResult, ex=5)
        print("Acceso a cache")
        return json.loads(queryResult)
    else:
        res = db[coleccion].aggregate(pipe) # a mongo!
        salida = [doc for doc in res]
        r.set(queryString, json.dumps(salida), ex=5)
        print("Acceso a Mongo")
        return salida

In [122]:
### Tests de la pregunta 1. No cambiar
r.flushall()
res = aggregate(db,'tweet',[{'$sortByCount':'$hlabel'}])
pprint(res)
time.sleep(3)
res = aggregate(db,'tweet',[{'$sortByCount':'$hlabel'}])
pprint(res)
time.sleep(6)
res = aggregate(db,'tweet',[{'$sortByCount':'$hlabel'}])

# salida esperada (sin # al principio, claro)
# acceso a Mongo
#[{'_id': 0, 'count': 11338},
# {'_id': -1, 'count': 4645},
# {'_id': 1, 'count': 1851}]
#acceso a cache
#[{'_id': 0, 'count': 11338},
# {'_id': -1, 'count': 4645},
# {'_id': 1, 'count': 1851}]
#acceso a Mongo


Acceso a Mongo
[{'_id': 0, 'count': 11338},
 {'_id': -1, 'count': 4645},
 {'_id': 1, 'count': 1851}]
Acceso a cache
[{'_id': 0, 'count': 11338},
 {'_id': -1, 'count': 4645},
 {'_id': 1, 'count': 1851}]
Acceso a cache


### Pregunta 2

Usando la colección tweet, para cada usuario con al menos 10 tweets, indicar el número total de tweets. El usuario corresponde al campo "user.screen".  La salida será de la forma:

```
[{'_id': 'FoxNews', 'ntweets': 471},
 {'_id': 'DanScavino', 'ntweets': 51},
 {'_id': 'KellyannePolls', 'ntweets': 18},
 {'_id': 'LouDobbs', 'ntweets': 34},
 {'_id': 'FoxBusiness', 'ntweets': 271},
 {'_id': 'GenFlynn', 'ntweets': 21},
 {'_id': 'Darren32895836', 'ntweets': 454},
 {'_id': 'VigilanteArtist', 'ntweets': 31},
 {'_id': 'Harlan', 'ntweets': 49},
 {'_id': 'DiamondandSilk', 'ntweets': 13}]
```

Nota: si no se ha podido hacer el ejercicio 1 utilizar el aggregate definido al principio de la práctica, o si no simplemente db.aggregate

In [111]:
# solución
salida = aggregate(db,"tweet",[
    {"$group":{"_id":"$user.screen", "ntweets":{"$sum": 1}}},
    {"$match": {"ntweets": {"$gte": 10}}}
])
pprint(salida)


Acceso a cache
[{'_id': 'TeamTrump', 'ntweets': 302},
 {'_id': 'DanScavino', 'ntweets': 51},
 {'_id': 'IngrahamAngle', 'ntweets': 66},
 {'_id': 'JamesOKeefeIII', 'ntweets': 24},
 {'_id': 'LouDobbs', 'ntweets': 34},
 {'_id': 'GenFlynn', 'ntweets': 21},
 {'_id': 'VigilanteArtist', 'ntweets': 31},
 {'_id': 'DiamondandSilk', 'ntweets': 13},
 {'_id': 'DonaldJTrumpJr', 'ntweets': 75},
 {'_id': 'WDFx2EU7', 'ntweets': 104}]


### Pregunta 3
La misma consulta que en la pregunta 2, pero mostrando solo al usuario con más tweets. 

Salida esperada:
```    
acceso a Mongo
[{'_id': 'TrumpDynastyUSA', 'ntweets': 2000}]
```

In [113]:
salida = aggregate(db,"tweet",[
    {"$group":{"_id":"$user.screen", "ntweets":{"$sum": 1}}},
    {"$match": {"ntweets": {"$gte": 10}}},
    {"$sort": {"ntweets":-1}},
    {"$limit": 1}
])
pprint(salida)

Acceso a cache
[{'_id': 'TrumpDynastyUSA', 'ntweets': 2000}]


### Pregunta 4
En tweet, queremos saber qué usuarios han sido los más mencionados (campo mentions). Hacer que se muestre por pantalla una lista con cada usuario y su número de menciones, ordenado de más a menos menciones.

Salida esperada:
```
acceso a Mongo
[{'_id': 'HillaryClinton', 'total': 4832},
 {'_id': 'realDonaldTrump', 'total': 3976},
 {'_id': 'USFreedomArmy', 'total': 664},
 {'_id': 'DonaldTrumpVote', 'total': 629},
 {'_id': 'Karennola719', 'total': 623},
 {'_id': 'Jnbarke', 'total': 507},
 {'_id': 'POTUS', 'total': 324},
 {'_id': 'hillaryclinton', 'total': 179},
 {'_id': 'TeamTrump', 'total': 141},
 {'_id': 'FoxNews', 'total': 140}]
 ```

In [115]:
salida = aggregate(db,"tweet",[
    {"$unwind": "$mentions"},
    {"$group":{"_id":"$mentions", "total":{"$sum": 1}}},
    {"$sort": {"total":-1}}
])
pprint(salida)

Acceso a cache
[{'_id': 'HillaryClinton', 'total': 4832},
 {'_id': 'realDonaldTrump', 'total': 3976},
 {'_id': 'USFreedomArmy', 'total': 664},
 {'_id': 'DonaldTrumpVote', 'total': 629},
 {'_id': 'Karennola719', 'total': 623},
 {'_id': 'Jnbarke', 'total': 507},
 {'_id': 'POTUS', 'total': 324},
 {'_id': 'hillaryclinton', 'total': 179},
 {'_id': 'TeamTrump', 'total': 141},
 {'_id': 'FoxNews', 'total': 140}]


### Pregunta 5
Número medio de seguidores (followers) de todos los usuarios en el dataset

Salida esperada:
```
acceso a Mongo
[{'_id': 0, 'media': 1561724.7375886524}]
```

In [117]:
salida = aggregate(db,"user",[
    {"$group": {"_id":0, "media":{"$avg":"$followers"}}}
])
pprint(salida)

Acceso a cache
[{'_id': 0, 'media': 1561724.7375886524}]


### Pregunta 6
Para agrupar, por ejemplo por el año de creación del usuario podemos usar ```'_id'```:{'$year':"$created_at"}. Utilizar esta pista para mostrar un listado de usuarios ordenado por año de creación de su cuenta. También debe mostrarse el número medio de tweets totals emitidos por los usuarios con ese año de creación (clave *all* de la clave *tweets*). Debe salir algo como:

```
acceso a Mongo
[{'_id': 2007, 'media': 113450.85714285714, 'total': 7},
 {'_id': 2008, 'media': 25509.18181818182, 'total': 11},
 {'_id': 2009, 'media': 20826.975, 'total': 40},
 {'_id': 2010, 'media': 32492.53846153846, 'total': 13},
 {'_id': 2011, 'media': 39527.6, 'total': 15},
 {'_id': 2012, 'media': 43158.769230769234, 'total': 13},
 {'_id': 2013, 'media': 78925.16666666667, 'total': 6},
 {'_id': 2014, 'media': 44603.166666666664, 'total': 6},
 {'_id': 2015, 'media': 13929.0, 'total': 13},
 {'_id': 2016, 'media': 10040.235294117647, 'total': 17}]
```

In [119]:
salida = aggregate(db,"user",[
    {"$group": {"_id":{"$year": "$created_at"}, "media":{"$avg":"$tweets.all"}, "total":{"$sum":1}}},
    {"$sort": {"_id": 1}}
])
pprint(salida)

Acceso a cache
[{'_id': 2007, 'media': 113450.85714285714, 'total': 7},
 {'_id': 2008, 'media': 25509.18181818182, 'total': 11},
 {'_id': 2009, 'media': 20826.975, 'total': 40},
 {'_id': 2010, 'media': 32492.53846153846, 'total': 13},
 {'_id': 2011, 'media': 39527.6, 'total': 15},
 {'_id': 2012, 'media': 43158.769230769234, 'total': 13},
 {'_id': 2013, 'media': 78925.16666666667, 'total': 6},
 {'_id': 2014, 'media': 44603.166666666664, 'total': 6},
 {'_id': 2015, 'media': 13929.0, 'total': 13},
 {'_id': 2016, 'media': 10040.235294117647, 'total': 17}]


### Pregunta 7 
Repetir de nuevo la consulta de la pregunta 2, pero mostrando además para cada usuario qué proporción de los tweets son retweets (campo RT a True). 

Salida:

```
acceso a Mongo
[{'_id': 'FoxNews', 'ntweets': 471, 'ratio': 0.040339702760084924},
 {'_id': 'DanScavino', 'ntweets': 51, 'ratio': 0.058823529411764705},
 {'_id': 'KellyannePolls', 'ntweets': 18, 'ratio': 0.1111111111111111},
 {'_id': 'LouDobbs', 'ntweets': 34, 'ratio': 0.11764705882352941},
 {'_id': 'FoxBusiness', 'ntweets': 271, 'ratio': 0.02952029520295203},
 {'_id': 'GenFlynn', 'ntweets': 21, 'ratio': 0.0},
 {'_id': 'Darren32895836', 'ntweets': 454, 'ratio': 0.7687224669603524},
 {'_id': 'VigilanteArtist', 'ntweets': 31, 'ratio': 0.8387096774193549},
 {'_id': 'Harlan', 'ntweets': 49, 'ratio': 0.5918367346938775},
 {'_id': 'DiamondandSilk', 'ntweets': 13, 'ratio': 0.46153846153846156}]
``` 

In [121]:
salida = aggregate(db,"tweet",[
    {"$project": {
        "_id":0,
        "user":1,
        "trues": {"$cond":[{"$eq":['$RT', True]}, 1, 0]}
    }},
    {"$group":{"_id":"$user.screen", "ntweets":{"$sum": 1}, "rtTrue":{"$sum":"$trues"} }},
    {"$project": {
        "_id":1,
        "ntweets":1,
        "ratio":{"$divide":["$rtTrue", "$ntweets"]}
    }}
])
pprint(salida)

Acceso a cache
[{'_id': 'TeamTrump', 'ntweets': 302, 'ratio': 0.4304635761589404},
 {'_id': 'DanScavino', 'ntweets': 51, 'ratio': 0.058823529411764705},
 {'_id': 'IngrahamAngle', 'ntweets': 66, 'ratio': 0.10606060606060606},
 {'_id': 'JamesOKeefeIII', 'ntweets': 24, 'ratio': 0.5},
 {'_id': 'LouDobbs', 'ntweets': 34, 'ratio': 0.11764705882352941},
 {'_id': 'GenFlynn', 'ntweets': 21, 'ratio': 0.0},
 {'_id': 'VigilanteArtist', 'ntweets': 31, 'ratio': 0.8387096774193549},
 {'_id': 'DiamondandSilk', 'ntweets': 13, 'ratio': 0.46153846153846156},
 {'_id': 'DonaldJTrumpJr', 'ntweets': 75, 'ratio': 0.8933333333333333},
 {'_id': 'WDFx2EU7', 'ntweets': 104, 'ratio': 0.3942307692307692}]


### Pregunta 8 (2 puntos)

Vamos a escribir una nueva versión de aggregate con las siguientes ideas:

Ahora ya no queremos que la consulta expire del caché a los 5 segundos, en lugar de eso queremos guardar las últimas 6 consultas. Es decir cuando llega una nueva consulta diferente de las que ya hay se añade si todavía no hay 6 almacenadas. Si sí que hay 6 eliminará la más antigua y almacenará la nueva.

Habrá que usar alguna estructura (listas, hash, set, lo que queráis).

Nota: no hace falta hacer que funcione en un entorno concurrente, no preocuparse por condiciones de carrera, atomicidad, etc.


In [107]:
def aggregate(db,coleccion,pipeline):
    pipe = pipeline.copy() # para no modificar lo que nos da el usuario
    pipe.append({"$limit":10}) # añadimos una etapa que limita a 10
    
    queryString = json.dumps(pipe)
    if r.hexists("consultas", queryString):
        queryResult = r.hget("consultas", queryString)
        print("Acceso a cache")
        return json.loads(queryResult)
    else:
        res = db[coleccion].aggregate(pipe) # a mongo!
        salida = [doc for doc in res]
        
        if r.llen("antiguedad") is 6:
            oldQuery = r.rpop("antiguedad")
            r.hdel("consultas", oldQuery)
            
        r.hset("consultas", queryString, json.dumps(salida))
        r.lpush("antiguedad", queryString)
            
        print("Acceso a Mongo")
        return salida