# Proyecto Big Data

In [36]:
!pip install -r requirements.txt
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

Looking in indexes: https://download.pytorch.org/whl/cpu


In [60]:
import json
from pymongo import MongoClient
import os
import requests
import gzip
import shutil
import time
import torch
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
import re
from html import unescape
import codecs
import dask.dataframe as dd
import pandas as pd
from tqdm import tqdm
from dask.diagnostics import ProgressBar

#### 1. Bajamos la data correspondiente

In [2]:
def download_data(link, carpeta_destino):
    if not os.path.exists(carpeta_destino):
        os.makedirs(carpeta_destino)
    
    nombre_archivo = os.path.basename(link)
    ruta_comprimido = os.path.join(carpeta_destino, nombre_archivo)
    ruta_descomprimido = os.path.splitext(ruta_comprimido)[0] # Eliminar la extensión .gz

    response = requests.get(link, stream=True)
    if response.status_code == 200:
        with open(ruta_comprimido, 'wb') as f:
            f.write(response.content)
            print(f'Archivo descargado: {ruta_comprimido}')
    else:
        print(f'Error al descargar el archivo: {ruta_comprimido}')
        return None

    with gzip.open(ruta_comprimido, 'rb') as archivo_comprimido:
        with open(ruta_descomprimido, 'wb') as archivo_descomprimido:
            shutil.copyfileobj(archivo_comprimido, archivo_descomprimido)
    print(f'Archivo descomprimido: {ruta_descomprimido}')

    return ruta_descomprimido

link_gift_cards = "https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_2023/raw/review_categories/Gift_Cards.jsonl.gz"
link_meta_gift_cards = "https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_2023/raw/meta_categories/meta_Gift_Cards.jsonl.gz"
carpeta_destino = "data/raw"
ruta_final_gift_cards = download_data(link_gift_cards, carpeta_destino)
ruta_final_meta_gift_cards = download_data(link_meta_gift_cards, carpeta_destino)

Archivo descargado: data/raw\Gift_Cards.jsonl.gz
Archivo descomprimido: data/raw\Gift_Cards.jsonl
Archivo descargado: data/raw\meta_Gift_Cards.jsonl.gz
Archivo descomprimido: data/raw\meta_Gift_Cards.jsonl


#### 2. Incorporamos la data en Mongo DB

Importante: Crear una conexion en MongoDB con los siguientes nombres de coleccion (gift-cards & meta-gift-cards)

In [38]:
# Conectar a MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["Amazon"]
collection = db["gift-cards"]
collection_meta = db["meta-gift-cards"]

2.1 Gift Cards

In [4]:
# Borramos toda la data antes de volver a insertarla
collection.delete_many({})
print("Todos los documentos de la colección han sido eliminados.")

# Leer el archivo JSON Lines
with open("data/raw/Gift_Cards.jsonl", "r", encoding="utf-8") as file:
    for line in file:
        try:
            data = json.loads(line)
            # Insertamos la data
            collection.insert_one(data)
        except json.JSONDecodeError as e:
            print(f"Error al decodificar la línea: {e}")

ServerSelectionTimeoutError: localhost:27017: [WinError 10061] No connection could be made because the target machine actively refused it (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 30s, Topology Description: <TopologyDescription id: 674aa5f45232a9d255180b95, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [WinError 10061] No connection could be made because the target machine actively refused it (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>

2.2 Meta Gift Cards

In [7]:
# Borramos toda la data antes de volver a insertarla
collection_meta.delete_many({})
print("Todos los documentos de la colección han sido eliminados.")

# Leer el archivo JSON Lines
with open("data/raw/meta_Gift_Cards.jsonl", "r", encoding="utf-8") as file:
    for line in file:
        try:
            data = json.loads(line)
            # Insertamos la data
            collection_meta.insert_one(data)
        except json.JSONDecodeError as e:
            print(f"Error al decodificar la línea: {e}")

Todos los documentos de la colección han sido eliminados.


#### 3. Querys MongoDB

3.1 Consulta para actualizar el campo "main_category" a "Gift Card" de la coleccion de Meta Gift Card

In [8]:
result = collection_meta.update_many({}, {"$set": {"main_category": "Gift Cards"}})
print("Documentos actualizados")

Documentos actualizados


3.2 Consulta en donde se busquen los comentarios que tienen un verified_purchase con valor True, ademas que tengan un rating mayor o igual a 1 y menor o igual a 5, y un helpful_vote mayor o igual a 0

In [39]:
query = {
    "verified_purchase": True,
    "rating": {"$gte": 1, "$lte": 5},
    "helpful_vote": {"$gte": 0},
}

results = collection.find(query)
df = pd.DataFrame(list(results))

#### 4. Dask y Pandas

4.1 Limpieza de data

In [40]:
def clean_text(text):
    if not isinstance(text, str):
        return text
    text = unescape(text) # html entities?

    # text = codecs.decode(text, 'unicode_escape') #decode emojis
    text = re.sub(r'\\u[0-9a-fA-F]{4}', '', text) #remove emojis

    text = re.sub(r'!{2,}', '!', text) #Multiple exclamation

    text = re.sub(r'\.{2,}', ' ', text) # Dots

    text = re.sub(r'<br\s*/?>', ' ', text)  # Para br

    text = re.sub(r'\b([A-Z])(?:\s+([A-Z]))+\b', lambda m: m.group(0).replace(" ", "").lower(), text) # t h i s to this

    text = re.sub(r'[+-]', '', text) #omit some chars

    text = re.sub(r'\u2019', "'", text) # this ´ to this '

    text = re.sub(r'(^|\.\s+)([a-z])', lambda m: m.group(1) + m.group(2).upper(), text) #Capital 
    
    text = re.sub(r'\b[A-Z]{2,}\b', lambda m: m.group().lower(), text) #Words normalized
    
    return text

4.2 Medicion de tiempos

In [41]:
def process_and_filter_pandas(df, max_length=512):
    start_time = time.time()

    df["title"] = df["title"].apply(clean_text)
    df["text"] = df["text"].apply(clean_text)

    filtered_df = df[df["text"].str.len() <= max_length].copy()
    
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Tiempo transcurrido: {elapsed_time:.2f} segundos")
    
    return filtered_df

def process_and_filter_dask(df, max_length=512):
    start_time = time.time()

    dask_df = dd.from_pandas(df, npartitions=4)
    
    dask_df["title"] = dask_df["title"].apply(clean_text, meta=('title', 'str'))
    dask_df["text"] = dask_df["text"].apply(clean_text, meta=('text', 'str'))
    
    filtered_dask_df = dask_df[dask_df["text"].str.len() <= max_length]
    
    filtered_df = filtered_dask_df.compute()
    
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Tiempo transcurrido: {elapsed_time:.2f} segundos")
    
    return filtered_df

In [62]:
df_pandas = process_and_filter_pandas(df)
df_dask = process_and_filter_dask(df)

Tiempo transcurrido: 3.45 segundos
Tiempo transcurrido: 7.71 segundos


#### 5. Seteamos LLM

In [43]:
tqdm.pandas()

tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")

In [44]:
# Funcion para clasificar el texto
def classify_text(row, model):
    input_data = tokenizer(row['text'], return_tensors="pt")
    with torch.no_grad():
        logits = model(**input_data).logits
    predicted_class_id = logits.argmax().item()
    if model.config.id2label[predicted_class_id] == 'POSITIVE':
        return 1
    else:
        return 0

5.1 Generamos el Dataset completo

In [45]:
start_time = time.time()

df_pandas['label'] = df_pandas.progress_apply(
    lambda row: classify_text(row, model), axis=1
)

end_time = time.time()
elapsed_time = end_time - start_time

df_pandas_classify = df_pandas.copy()

print(df_pandas_classify)
print(f"Tiempo transcurrido: {elapsed_time:.2f} segundos")


100%|██████████| 140481/140481 [1:01:49<00:00, 37.87it/s]


                             _id  rating  \
0       6748bafe19c0f99fe2ba20cc     5.0   
1       6748bafe19c0f99fe2ba20ce     5.0   
3       6748bafe19c0f99fe2ba20d1     5.0   
4       6748bafe19c0f99fe2ba20d2     5.0   
5       6748bafe19c0f99fe2ba20d3     5.0   
...                          ...     ...   
141956  6748bb1f19c0f99fe2bc7421     5.0   
141957  6748bb1f19c0f99fe2bc7422     5.0   
141958  6748bb1f19c0f99fe2bc7423     1.0   
141959  6748bb1f19c0f99fe2bc7424     5.0   
141960  6748bb1f19c0f99fe2bc7425     4.0   

                                                    title  \
0                                              Great gift   
1                                            Perfect gift   
3                                                   Cute!   
4                                               Easy gift   
5                                              Great gift   
...                                                   ...   
141956  I gave it to my sister and she immed

In [65]:
df_dask = dd.from_pandas(df_pandas, npartitions=4)

start_time = time.time()

with ProgressBar():
    df_dask['label'] = df_dask.map_partitions(
        lambda partition: partition.apply(
            lambda row: classify_text(row, model), axis=1
        ),
        meta=('label', 'int64')
    )

    df_dask_classify = df_dask.compute()

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Tiempo transcurrido: {elapsed_time:.2f} segundos")
print(df_dask_classify)

[########################################] | 100% Completed | 62m 48s
Tiempo transcurrido: 3772.64 segundos
                             _id  rating  \
0       6748bafe19c0f99fe2ba20cc     5.0   
1       6748bafe19c0f99fe2ba20ce     5.0   
3       6748bafe19c0f99fe2ba20d1     5.0   
4       6748bafe19c0f99fe2ba20d2     5.0   
5       6748bafe19c0f99fe2ba20d3     5.0   
...                          ...     ...   
141956  6748bb1f19c0f99fe2bc7421     5.0   
141957  6748bb1f19c0f99fe2bc7422     5.0   
141958  6748bb1f19c0f99fe2bc7423     1.0   
141959  6748bb1f19c0f99fe2bc7424     5.0   
141960  6748bb1f19c0f99fe2bc7425     4.0   

                                                    title  \
0                                              Great gift   
1                                            Perfect gift   
3                                                   Cute!   
4                                               Easy gift   
5                                              Great gift 

Guardamos en JSON el Dataframe de pandas

In [None]:
# Convertir todos los valores a texto
df_pandas_classify = df_pandas_classify.astype(str)

In [58]:
df_pandas_classify.to_json(
    "data/processed/gift_cards_classify_pandas.json",
    orient="records",
    lines=True,
    force_ascii=False
)

Guardamos en JSON el Dataframe de Dask

In [66]:
df_dask_classify.to_json(
    "data/processed/gift_cards_classify_dask.json",
    orient="records",
    lines=True,
    force_ascii=False
)