### **Importacion de dataframe, preprocesamiento y seteo de indice en E.S**

**Librerias**

In [None]:
import os
import pandas as pd
from pathlib import Path
import glob
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import json


**¿De que trata el caso de estudio?**

El caso presenta datasets obtenidos a traves de scraping del sitio web de Amazon. En la fuente constan 142 archivos correspondientes a compras de productos separadas por cada categoria, así como el dataset combinado de todas las categorias con el nombre Amazon-Producs.csv,que contiene la información de ventas de 551 mil productos. 


In [3]:
df = pd.read_csv('../data/raw/Amazon-Products.csv', sep=',')

In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 551585 entries, 0 to 551584
Data columns (total 10 columns):
 #   Column          Non-Null Count   Dtype 
---  ------          --------------   ----- 
 0   Unnamed: 0      551585 non-null  int64 
 1   name            551585 non-null  object
 2   main_category   551585 non-null  object
 3   sub_category    551585 non-null  object
 4   image           551585 non-null  object
 5   link            551585 non-null  object
 6   ratings         375791 non-null  object
 7   no_of_ratings   375791 non-null  object
 8   discount_price  490422 non-null  object
 9   actual_price    533772 non-null  object
dtypes: int64(1), object(9)
memory usage: 42.1+ MB


**Primeras observaciones**

- Los precios estan expresados en Rupias
- Existen productos que no cuentan con información de ratings de compras
- Cerca de 40 mil productos no cuentan con precios de descuento
- La primera columna del ds funciona como un conteo de filas
- Existen productos que podemos considerar como duplicados


In [5]:
#  Transformacion de precios a dolares

def convertir_a_numero(valor):
    if isinstance(valor, str):  # Verificar si el valor es una cadena
        valor = valor.replace('₹', '').replace(',', '').strip()  # Eliminar símbolos y comas
        try:
            return float(valor)
        except ValueError:
            return None  # Devolver None si la conversión falla
    return None  # Devolver None si el valor no es una cadena

#  Aplicar la función a las columnas correspondientes
df['discount_price'] = df['discount_price'].apply(convertir_a_numero)
df['actual_price'] = df['actual_price'].apply(convertir_a_numero)

# Tasa de conversión de Rupias a Dólares (puedes actualizar esta tasa si lo deseas)
tasa_conversion = 0.012  # Aproximadamente 1 INR = 0.012 USD

#  Crear columnas con precios en dólares y redondear a 2 decimales
df['discount_price_dolares'] = df['discount_price'].apply(lambda x: round(x * tasa_conversion, 2) if x is not None else None)
df['actual_price_dolares'] = df['actual_price'].apply(lambda x: round(x * tasa_conversion, 2) if x is not None else None)

print(" Transformación de precios completada exitosamente.")



 Transformación de precios completada exitosamente.


**Eliminando columnas innecesarias y revisando valores perdidos**

In [7]:
## Eliminando columna con conteo de filas
df.drop('Unnamed: 0', axis=1, inplace=True)

In [8]:
#  Proporción de datos faltantes por columna (en %)
missing_percentage = (df.isnull().mean() * 100).round(2)
missing_percentage = missing_percentage[missing_percentage > 0]
if not missing_percentage.empty:
    print("Proporción de datos faltantes por columna:")
    print(missing_percentage)

Proporción de datos faltantes por columna:
ratings                   31.87
no_of_ratings             31.87
discount_price            11.09
actual_price               3.23
discount_price_dolares    11.09
actual_price_dolares       3.23
dtype: float64


Por el momento no se modificarán los valores perdidos, ya que se observa que la mayor parte vienen de los ratings. Posteriormente se le datá tratamiento a estos casos al momento de indexarlos a E.S

### **Manejo de productos duplicados**

Si consideramos a todos los campos disponibles en la base se observa que **NO** existen duplicados, pero si por ejemplo, tomamos en cuenta la combinación de nombre, categoria, rating, precio de descuento y precio actual se puede observar que los duplicados son más de 60 mil. En este caso, se decide **ELIMINAR** estos registros, ya que solo estarian diferenciados por caracteristicas como su link o su imagen, y pueden estar causados por **duplicidad de ciertos productos en más de una categoria** 

La eliminación de este tipo de registros tambien nos ayudará a obtener mejores resultados en pasos posteriores como con el trabajo con embeddings.

In [10]:
# Agrupar por combinación única de columnas
df= df.copy()
duplicados = df.duplicated(subset=["name", "main_category", "sub_category", "ratings","no_of_ratings",
                                           "discount_price","actual_price"
                                           ], keep=False)

# Mostrar número de duplicados
print(f" Productos duplicados detectados: {duplicados.sum()}")

 Productos duplicados detectados: 60620


In [11]:
# Filtrar para quedarnos solo con los productos NO DUPLICADOS
df = df[~duplicados]
print(f" Productos únicos después de quitar duplicados: {df.shape[0]}")

 Productos únicos después de quitar duplicados: 490965


In [12]:
##Guardando df procesado para reutilizarlo más adelante 

df.to_csv('../data/processed/df_pro.csv')

**Creación de Índice en E.S e ingesta de documentos usando Bulk**


**Como siguiente paso, definimos al indice amazon_productos con el mapeo:**

- name: text (para búsquedas y análisis).
- main_category: keyword (para filtros exactos).
- sub_category: keyword.
- image: keyword (no se necesita por ahora).
- link: keyword.
- ratings: float.
- no_of_ratings: integer.
- discount_price_dolares: float.
- actual_price_dolares: float.



In [None]:
# Coneccion a E.S
es = Elasticsearch("http://localhost:9200")
index_name = "amazon_products"

# Configuración y Mapping
index_config = {
    "settings": {
        "analysis": {
            "analyzer": {
                "standard_analyzer": {
                    "type": "standard",
                    "stopwords": "_english_"
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "name": {"type": "text", "analyzer": "standard_analyzer"},  # Búsquedas flexibles por nombre
            "main_category": {"type": "keyword"},                      # Filtros exactos por categoría principal
            "sub_category": {"type": "keyword"},                       # Filtros exactos por subcategoría
            "ratings": {"type": "float"},                              # Para ordenar por calificación
            "no_of_ratings": {"type": "integer"},                      # Para ordenar por popularidad
            "discount_price_dolares": {"type": "float"},               # Filtrado por precio con descuento
            "actual_price_dolares": {"type": "float"}                  # Filtrado por precio original
        }
    }
}


# Crear el índice
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
    print(f"Índice {index_name} eliminado.")

es.indices.create(index=index_name, body=index_config)
print(f" Índice {index_name} creado exitosamente.")


Índice amazon_products eliminado.
 Índice amazon_products creado exitosamente.


**Ingesta de datos en E.S**

In [None]:

def indexar_datos(es, df, index_name, chunk_size=5000):
    # Mostrar cantidad de filas antes de la limpieza
    print(f" Total de filas antes de limpiar: {len(df)}")
    
    # Limpiar el DataFrame y reemplazar NaNs por None
    df_cleaned = df[['name', 'main_category', 'sub_category', 'ratings', 'no_of_ratings', 
                     'discount_price_dolares', 'actual_price_dolares', 'image', 'link']].copy()

    #Al momento no existen nulos en este campo, pero se incluye el codigo por si la base de input cambia en un futuro
    #                 
    df_cleaned = df_cleaned.dropna(subset=['name'])
    df_cleaned = df_cleaned.where(pd.notna(df_cleaned), None)
    
    #  Mostrar cantidad de filas después de la limpieza
    print(f" Total de filas después de limpiar: {len(df_cleaned)}")
    
    #  Conversión explícita de columnas
    df_cleaned['ratings'] = pd.to_numeric(df_cleaned['ratings'], errors='coerce').fillna(0)
    df_cleaned['no_of_ratings'] = pd.to_numeric(df_cleaned['no_of_ratings'], errors='coerce').fillna(0)
    df_cleaned['discount_price_dolares'] = pd.to_numeric(df_cleaned['discount_price_dolares'], errors='coerce').fillna(0)
    df_cleaned['actual_price_dolares'] = pd.to_numeric(df_cleaned['actual_price_dolares'], errors='coerce').fillna(0)
    
    #  Índices de documentos fallidos
    failed_documents = []

    #  Indexar en lotes
    total_indexed = 0
    total_failed = 0

    for start_idx in range(0, len(df_cleaned), chunk_size):
        end_idx = min(start_idx + chunk_size, len(df_cleaned))
        batch = df_cleaned.iloc[start_idx:end_idx].to_dict(orient="records")
        
        # Crear el formato adecuado para la indexación
        bulk_data = [
            {
                "_index": index_name,
                "_source": record
            }
            for record in batch
        ]
        
        # Intentar indexar y capturar errores
        try:
            success, failed = helpers.bulk(es, bulk_data, stats_only=True)
            total_indexed += success
            total_failed += failed
            print(f" Lote {start_idx // chunk_size + 1} indexado correctamente: {success} documentos.")
        
        except Exception as e:
            print(f"Error al indexar el lote {start_idx // chunk_size + 1}: {e}")
            failed_documents.append({
                "lote": start_idx // chunk_size + 1,
                "error": str(e)
            })
    
    #  Mostrar resultados finales
    print(f"\n Indexación completada. Total indexados: {total_indexed}. Total fallidos: {total_failed}.")
    
    # Si se obtienen errores , se guardarán en un archivo JSON para revisión
    if failed_documents:
        with open('errores_indexacion.json', 'w') as f:
            json.dump(failed_documents, f, indent=4)
        print(" Los errores se han guardado en 'errores_indexacion.json'.")


In [15]:
# Conexión con Elasticsearch
es = Elasticsearch("http://localhost:9200")

# Nombre del índice
index_name = "amazon_products"

# Ejecutar la función de indexación
indexar_datos(es, df, index_name)

# Verificar cantidad de documentos indexados
doc_count = es.count(index=index_name)['count']
print(f" Documentos indexados: {doc_count}")


 Total de filas antes de limpiar: 490965
 Total de filas después de limpiar: 490965
 Lote 1 indexado correctamente: 5000 documentos.
 Lote 2 indexado correctamente: 5000 documentos.
 Lote 3 indexado correctamente: 5000 documentos.
 Lote 4 indexado correctamente: 5000 documentos.
 Lote 5 indexado correctamente: 5000 documentos.
 Lote 6 indexado correctamente: 5000 documentos.
 Lote 7 indexado correctamente: 5000 documentos.
 Lote 8 indexado correctamente: 5000 documentos.
 Lote 9 indexado correctamente: 5000 documentos.
 Lote 10 indexado correctamente: 5000 documentos.
 Lote 11 indexado correctamente: 5000 documentos.
 Lote 12 indexado correctamente: 5000 documentos.
 Lote 13 indexado correctamente: 5000 documentos.
 Lote 14 indexado correctamente: 5000 documentos.
 Lote 15 indexado correctamente: 5000 documentos.
 Lote 16 indexado correctamente: 5000 documentos.
 Lote 17 indexado correctamente: 5000 documentos.
 Lote 18 indexado correctamente: 5000 documentos.
 Lote 19 indexado correct