# Tarea 1
## IIC2440 - Procesamiento de Datos Masivos

Integrantes: 
- Rodrigo Nahum
- Fernando Quintana

Notebook 1: LSH y obtención de tweets similares.

# Importar datos

Primero, importamos algunas librerías a usar.

In [39]:
import pandas as pd
import numpy as np
import csv
from tqdm import tqdm
import pickle
import sys
import random
import time

Luego, importamos esta funcion que permite estimar la cantidad de memoria total que utiliza un objeto de Python. Obtenida de https://code.activestate.com/recipes/577504/

In [2]:
from __future__ import print_function
from sys import getsizeof, stderr
from itertools import chain
from collections import deque
try:
    from reprlib import repr
except ImportError:
    pass

def total_size(o, handlers={}, verbose=False):
    """ Returns the approximate memory footprint an object and all of its contents.

    Automatically finds the contents of the following builtin containers and
    their subclasses:  tuple, list, deque, dict, set and frozenset.
    To search other containers, add handlers to iterate over their contents:

        handlers = {SomeContainerClass: iter,
                    OtherContainerClass: OtherContainerClass.get_elements}

    """
    dict_handler = lambda d: chain.from_iterable(d.items())
    all_handlers = {tuple: iter,
                    list: iter,
                    deque: iter,
                    dict: dict_handler,
                    set: iter,
                    frozenset: iter,
                   }
    all_handlers.update(handlers)     # user handlers take precedence
    seen = set()                      # track which object id's have already been seen
    default_size = getsizeof(0)       # estimate sizeof object without __sizeof__

    def sizeof(o):
        if id(o) in seen:       # do not double count the same object
            return 0
        seen.add(id(o))
        s = getsizeof(o, default_size)

        if verbose:
            print(s, type(o), repr(o), file=stderr)

        for typ, handler in all_handlers.items():
            if isinstance(o, typ):
                s += sum(map(sizeof, handler(o)))
                break
        return s

    return sizeof(o)

Luego, importamos el dataset como un dataframe de pandas.

In [3]:
data = pd.read_csv('tweets_2022_abril_junio.csv')

Mostramos algunas filas para entender el formato del dataset.

In [4]:
data.head()

Unnamed: 0,id,created_at,screen_name,text,favorite_count,retweet_count
0,1512186166438637582,2022-04-07 21:50:51 UTC,h0l4d4ni3l4,RT @ValeMirandaCC: Tras casi 50 años del golpe...,0,0
1,1512186202367045642,2022-04-07 21:51:00 UTC,Claudio70932894,RT @UTDTrabajoDigno: Mañana jueves a las 18hrs...,0,0
2,1512186287284924418,2022-04-07 21:51:20 UTC,Cesar_A_RR,RT @JaimeGuajardoR: Aquí está el aporte de @te...,0,0
3,1512186335754301446,2022-04-07 21:51:32 UTC,rosmarieher,RT @melnicksergio: la pelotudez no tiene limit...,0,0
4,1512186407841767424,2022-04-07 21:51:49 UTC,GQuelluen,RT @BSepulvedaHales: Ante la circulación de no...,0,0


# FIlter inicial

Partimos filtrando datos que no importan, como el created_at, o el favorite_count, retweet_count. Con esto, reducimos el tamaño del dataset en 77MB.

In [5]:
filtered_data_raw = data.filter(items=['id', 'screen_name', 'text'])

In [6]:
filtered_data = filtered_data_raw.astype({'screen_name': 'string', 'text': 'string'})

In [7]:
filtered_data.dtypes

id                      int64
screen_name    string[python]
text           string[python]
dtype: object

Tambien, vemos que existen algunos ids duplicados, por lo que tambien los removemos

In [8]:
filtered_data.drop_duplicates(subset=['id'])

Unnamed: 0,id,screen_name,text
0,1512186166438637582,h0l4d4ni3l4,RT @ValeMirandaCC: Tras casi 50 años del golpe...
1,1512186202367045642,Claudio70932894,RT @UTDTrabajoDigno: Mañana jueves a las 18hrs...
2,1512186287284924418,Cesar_A_RR,RT @JaimeGuajardoR: Aquí está el aporte de @te...
3,1512186335754301446,rosmarieher,RT @melnicksergio: la pelotudez no tiene limit...
4,1512186407841767424,GQuelluen,RT @BSepulvedaHales: Ante la circulación de no...
...,...,...,...
4594975,1526652300709679104,Alebarrera74,RT @DanielAbelLope1: @tere_marinovic 😡🤮😡🤮 VIEJ...
4594976,1526641118460334080,gigita29bq,RT @DanielAbelLope1: @tere_marinovic 😡🤮😡🤮 VIEJ...
4594977,1526738292011462657,Elizabe81480339,RT @Gonz1Gorjeperez: @tere_marinovic https://t...
4594978,1526855280151056386,CastilloNafla,RT @Gonz1Gorjeperez: @tere_marinovic https://t...


Luego, guardamos este dataset para no tener que hacer el filtro de nuevo.

In [9]:
filtered_data.to_csv('filtered_tweets_2022_abril_junio.csv', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)

# Primer intento: Fuerza bruta.

Intentamos hacer un doble for en el dataframe. Primero, vemos cuánto demora hacer una iteración completa por el dataset.

In [13]:
for item1 in filtered_data.itertuples():
    pass


Obtenemos que 1 iteracion tomaba como 3.2 seg. Entonces, el doble for iba a tomar la cantidad de filas * 3.2 seg, o sea, ~14.400.000 segundos ~ 166 dias. Obviamente no tenemos este tiempo. Como segundo intento, podemos pasar el dataframe a lista, porque estas son más rápidas para iterar.

In [21]:
dataframe_list = filtered_data.values.tolist()

In [23]:
for item1 in dataframe_list:
    pass

Esta vez, 1 iteración toma como 80ms. De nuevo, el doble for tomaría la cantidad de filas por 0.08s, o sea, 4 días. Si bien es menos, sigue siendo mucho tiempo. Entonces, intentamos algo más inteligente.

# Segundo intento: Shingling y LSH

Nuestra segunda estrategia corresponde a hacer Shingling de los textos de los tweets, y luego usar LSH para obtener tweets similares.

## Shingling

Primero, cargamos el dataset ya filtrado.

In [25]:
filtered_data = pd.read_csv('filtered_tweets_2022_abril_junio.csv', index_col=0, quotechar='"')

Luego, definimos una función para obtener los shingles de los textos, dado un k. Luego de probar distintos valores, decidimos usar k=5.

In [26]:
def k_shingle(text, k=5):
    shingles = set()
    for idx in range(len(text) - k + 1):
        curr_slice = text[idx : idx + k]
        shingles.add(curr_slice)
    return shingles

k = 5

Una vez definido el k de los shingles, procedemos a borrar todos los tweets con menos de k caracteres, ya que a estos no le podemos obtener los shingles.

In [27]:
reduced_data = filtered_data[filtered_data['text'].apply(len) >= k]

Luego, nos damos cuenta que existen muchos tweets repetidos. Entonces, para evitar tratar con textos duplicados, guardamos una asociacion entre el texto, y el ID de los tweets que tienen ese texto.

In [28]:
tweets = {}

for row in reduced_data.itertuples():
    try:
        tweets[row.text].append(row.id)
    except KeyError:
        tweets[row.text] = [row.id]

print(len(tweets))

1548053


Esta tabla muestra la cantidad de tweets unicos para distintos k (el valor es distinto por el filtrado que hacemos, lo de remover los tweets de tamaño menor a k).

| k | tweets  | 
|---|---------|
| 5 | 1548053 |
| 6 | 1547594 |
| 7 | 1547038 |
| 8 | 1546393 |

Vemos que existen solo ~1500000 de tweets unicos, o sea, un tercio de los originales. Entonces, tenemos que procesar un tercio de los documentos. Luego, usamos una lista para definir un orden inicial (arbitrario) de los tweets.

In [29]:
unique_tweets = list(tweets.keys())

In [83]:
import pickle

with open(f"unique_tweets_k={k}.pkl", 'wb') as f:
    pickle.dump(unique_tweets, f)

In [30]:
total_size(unique_tweets)

316745930

Vemos que la cantidad de memoria que utiliza el texto de todos los tweets es de 316MB, un tamaño perfectamente razonable.

### Primera forma: Obtener todos los Shingles y mantenerlos en memoria.

Intentamos computar todos los shingles al inicio, y luego usarlos para hacer LSH más rápido.

In [33]:
tweet_shingles = {}

i = 1
for row in filtered_data.itertuples():
    if i % 100000 == 0:
        print(f"i: {i}")
        ## Ponemos un break para solo computar 100000, y no
        ## todos los shingles, porque esto no se puede.
        break
    tweet_shingles[row.id] = k_shingle(row.text, k)
    i += 1

i: 100000


Primero, obtenemos 100000 conjuntos de shingles, para ver el uso de memoria.

In [35]:
total_size(tweet_shingles)

1252849614

Vemos que tener 100000 shingles usa 1.25GB de memoria. Entonces, computar los ~1500000 conjuntos de shingles utilizaría 18.75GB de memoria. Este valor ya es muy alto, asi que decidimos buscar otra opción.

### Segunda forma: Obtener los shingles de los textos on-the-fly

Para solucionar el problema anterior, calculamos primero todos los shingles existentes (o sea, todos los shingles presentes en algun documento). Notemos que esto usa mucha menos memoria que lo anterior, que computaba un conjunto por cada shingle, y podíamos tener shingles repetidos. Ahora, no tenemos shingles repetidos. Luego, generamos algún ordenamiento (permutación) de estos shingles. Después, para cada texto, computamos sus shingles, vemos el shingle que antes aparece segun esa permutacion, y ese es el valor de LSH(text). Entonces, guardamos solamente {bucket: text_id}, y no tenemos que guardar todos los shingles simultaneamente en memoria.

Entonces, primero computamos todos los shingles existentes

In [36]:
all_shingles = set()

i = 0
for row in tweets.keys():
    if i % 100000 == 0:
        print(f"i: {i}")
    current_shingles = k_shingle(row, k)
    all_shingles.update(current_shingles)
    i += 1

i: 0
i: 100000
i: 200000
i: 300000
i: 400000
i: 500000
i: 600000
i: 700000
i: 800000
i: 900000
i: 1000000
i: 1100000
i: 1200000
i: 1300000
i: 1400000
i: 1500000


In [38]:
print(f"Shingles amount: {len(all_shingles)}")
print(f"Total size     : {total_size(all_shingles)}")

Shingles amount: 4139037
Total size     : 381308486


Vemos que tenemos 4.139.037 shingles en total, y que guardar todos estos usa solamente 381MB de RAM. De nuevo, es un valor razonable.

Luego, usamos una lista para definir un orden inicial (arbitrario) de los shingles.

In [41]:
shingles_list = list(all_shingles)

Luego, creamos un diccionario que, dado un shingle, retorne su posición en el ordenamiento inicial. De esta forma, podemos saber la posición de un shingle en O(1), y no en O(log(n)) usando, por ejemplo, búsqueda binaria.

In [42]:
shingles_dict = {shingle: idx for idx, shingle in enumerate(shingles_list)}

Ahora que tenemos todos los shingles, definimos funciones para computar el LSH de cada texto. Primero, definimos una forma de obtener funciones de hash, para simular una permutación de la lista original.

Luego, definimos una función para obtener el LSH de los textos. Para esto, primero definimos una cantidad de funciones de hash. Luego, para cada función de hash (permutación), obtenemos la posición de cada shingle según esta permutación, y nos quedamos con el mínimo. Hacemos esto para cada función de hash, y el vector resultante será el LSH del texto.

In [45]:
## Forma de definir funciones de hash adaptada de la actividad 6 del curso
## (https://github.com/IIC2440/Syllabus-2023-1/blob/main/Actividades/06%20-%20LocallySensitiveHashing/Shingling_jaccard_universalhashes.ipynb)
def create_hash(n):
    ## Primo de 9 digitos elegido al azar con https://bigprimes.org/
    p = 791639819
    a = random.randint(1, p - 1)
    b = random.randint(1, p - 1)
    return lambda x: ((a * x + b) % p) % n

## Funcion basada en hashing en vez de generar una permutacion.
## Puede tener colisiones.
def compute_LSH_signatures(data, hash_funcs=10):
    ## Primero, obtenemos la cantidad de funciones de hash pedida.
    hashes = [create_hash(len(data)) for _ in range(hash_funcs)]

    ## Luego, inicializamos nuestra matriz de LSH. Cada fila corresponde
    ## a una funcion de hash, y cada columna corresponde a un texto.
    ## Entonces, una columna tendrá el vector correspondiente al valor
    ## de cada funcion de hash para ese texto.
    LSH_signature = np.zeros((hash_funcs, len(data)), dtype=np.int32)

    i = 0
    ## Para cada texto
    for row in tqdm(data):

        ## Obtenemos el conjunto de shingles de ese texto.
        row_shingles = list(k_shingle(row, k))
        
        ## Luego, obtenemos las posiciones de estos shingles en el ordenamiento
        ## original.
        row_shingles_idx = [shingles_dict[shingle] for shingle in row_shingles]

        ## Para cada funcion de hash (permutacion), obtenemos la posicion de cada
        ## shingle en la permutacion, y nos quedamos con el minimo (o sea, el primer
        ## shingle del texto segun la permutacion).
        ## Notamos que esta linea calcula el vector completo.
        row_lsh = [min(map(hash_func, row_shingles_idx)) for hash_func in hashes]

        ## Actualizamos nuestra matriz con este vector.
        LSH_signature[:, i] = row_lsh
        i += 1
    
    return LSH_signature

Luego, usamos 200 funciones de hash, y computamos el LSH de los textos.

In [8]:
hash_funcs = 200

hash_lsh_signature = compute_LSH_signatures(unique_tweets, hash_funcs)

In [96]:
np.save(f"hash_signatures_k={k}_hash_funcs={hash_funcs}.npy", hash_lsh_signature)

## Computo de tweets similares

Una vez que computamos las signatures, procedemos a obtener pares de tweets similares. Para esto, primero definimos b y r, tal que b * r ~ 200 (porque usamos 200 funciones de hash). Luego, diremos que 2 tweets $t_1$ y $t_2$ son similares, si para alguna banda *b*, los *r* hashes de esa banda de $t_1$ y $t_2$ son iguales. Notemos que esta definición nos permite procesar cada banda por separado, o sea, a una banda posterior no le importa el resultado de las bandas anteriores.

In [47]:
r = 10
b = 20

## Set de tweets similares. Cada elemento es una tupla
## (t1, t2), donde t1 y t2 son tweets similares.
similar_items = set()

## Para cada banda
for band in tqdm(range(b)):
    band_hash = {}
    ## Para cada tweet
    for tweet in range(hash_lsh_signature.shape[1]):
        ## Obtenemos el vector asociado a ese tweet
        tweet_vector = tuple(hash_lsh_signature[band * r:(band + 1)*r, tweet])
        ## Lo agregamos a una lista que contiene todos los tweets con ese mismo
        ## vector
        try:
            band_hash[tweet_vector].append(tweet)
        except KeyError:
            band_hash[tweet_vector] = [tweet]
    ## Luego de obtener todas las listas similares, reportamos cada par en
    ## una misma lista como similar.
    for key in band_hash:
        for item1 in band_hash[key]:
            for item2 in band_hash[key]:
                ## Agregamos items similares a nuestro set de 
                ## tweets similares.
                if item1 != item2:
                    ## Agregamos el elemento mas grande como primer
                    ## elemento de la tupla siempre, para evitar
                    ## duplicados. Si (a, b) es una tupla similar, 
                    ## entonces (b, a) no lo sera.
                    if item1 > item2:
                        similar_items.add((item1, item2))
                    else:
                        similar_items.add((item2, item1))
        

100%|██████████| 20/20 [01:33<00:00,  4.68s/it]


In [68]:
with open(f"similar_items_k={k}_hash_funcs={hash_funcs}_b={b}_r={r}.pkl", 'wb') as f:
    pickle.dump(similar_items, f)

In [48]:
len(similar_items)

4716440

In [49]:
getsizeof(similar_items)

134217944

De esta forma, obtenemos 4.716.440 pares de tweets similares. Si bien esto suena como mucho, posteriormente, cuando encontremos personas que escriben similar, este número se reducirá mucho. Además, los items similares pesan 134MB, un tamaño razonable.