# Parte 2
## Detección de Precios Fuera de lo Normal en Productos de Supermercado

**Objetivo del Proyecto**

Desarrollar un sistema que analice los precios de productos básicos (selecciona productos de uso cotidiano mínimo 20) entre los diferentes supermercados (Coral, SuperMaxi, Megamaxi y Gran AKI) para identificar precios fuera de lo normal. Este proyecto busca detectar anomalías que podrían indicar errores, promociones o estrategias de precios inconsistentes.

**Para hacer la detección se debe usar el Z-Score**

- Cálculo del puntaje Z (Z-Score):

    data = data.withColumn(

    "Z_Score", (col("Precio") - col("Precio_Promedio")) / col("Desviacion_Estandar"))

    Considera como anómalos los precios con |Z| > 3

- Etiquetar anomalías:

- Clasificar las anomalías como "Precio Alto" o "Promoción" según el valor del puntaje Z.

- Examina la frecuencia de cada tipo de anomalía por producto y supermercado.

**Visualización de Resultados**
**Exportar resultados a Pandas:**

- Convierte los resultados de PySpark a Pandas para crear gráficos

**Entregables**
* Código fuente: Notebook .ipynb.

* Informe: Documento con los hallazgos principales y gráficos de resultados.

* Presentación: Resumen del proyecto y sus implicaciones para exponer.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when, abs, mean, stddev
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

def limpiar_precio(df):
    """Limpiar columna de precio"""
    df = df.withColumn("precio", F.regexp_replace(col("precio"), "\\$", ""))
    df = df.withColumn("precio", F.regexp_replace(col("precio"), ",", "."))
    df = df.withColumn("precio", col("precio").cast(FloatType()))
    return df

def verificar_datos(df):
    """Verificar distribución de datos y cantidad de datos en cada grupo"""
    df.groupBy("nombre").agg(F.count("precio").alias("count"), F.mean("precio").alias("mean"), F.stddev("precio").alias("stddev")).show(10, truncate=False)

def detectar_anomalias_precios(df):
    """Detectar anomalías de precios usando Z-Score"""
    # Configurar ventana de análisis por nombre
    windowSpec = Window.partitionBy("nombre")

    # Calcular Z-Score
    df_anomalias = df.withColumn("precio_promedio", F.mean("precio").over(windowSpec)) \
        .withColumn("desviacion_estandar", F.stddev("precio").over(windowSpec)) \
        .withColumn("z_score",
            when(col("desviacion_estandar") != 0,
                (col("precio") - col("precio_promedio")) / col("desviacion_estandar"))
            .otherwise(0)
        )

    # Clasificar anomalías
    df_anomalias = df_anomalias.withColumn("tipo_anomalia",
        when(abs(col("z_score")) > 3,
            when(col("z_score") > 0, "Precio Alto")
            .otherwise("Promoción"))
        .otherwise("Normal")
    )

    return df_anomalias

def analizar_frecuencia_anomalias(df_anomalias):
    """Analizar frecuencia de anomalías"""
    anomalias_frecuencia = df_anomalias \
        .filter(col("tipo_anomalia") != "Normal") \
        .groupBy("tienda", "nombre", "tipo_anomalia") \
        .count() \
        .orderBy("count", ascending=False)

    return anomalias_frecuencia

def main():
    # Iniciar sesión de Spark
    spark = SparkSession.builder \
        .appName("Análisis Anomalías Precios") \
        .getOrCreate()

    try:
        # Cargar datos
        df = spark.read.csv('combined_products.csv', header=True, inferSchema=True, sep=',', quote='"', escape='"')

        # Mostrar los primeros registros originales
        df.show(10, truncate=False)

        # Limpiar precios
        df_limpio = limpiar_precio(df)

        # Mostrar los primeros registros después de la limpieza
        df_limpio.show(10, truncate=False)

        # Verificar distribución de datos y cantidad de datos en cada grupo
        verificar_datos(df_limpio)

        # Detectar anomalías
        df_anomalias = detectar_anomalias_precios(df_limpio)

        # Mostrar los primeros registros con anomalías
        df_anomalias.show(10, truncate=False)

        # Analizar frecuencia
        resultado_anomalias = analizar_frecuencia_anomalias(df_anomalias)

        # Mostrar resultados
        resultado_anomalias.show()

        # Exportar resultados
        resultado_anomalias.toPandas().to_csv('anomalias_supermercados.csv', index=False)

    except Exception as e:
        print(f"Error: {e}")

    finally:
        # Detener sesión de Spark
        spark.stop()

if __name__ == "__main__":
    main()

## Importar librerias necesarias


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs, mean, stddev, when, count, lit, coalesce
from pyspark.sql.types import FloatType, StringType
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from sentence_transformers import SentenceTransformer
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from pyspark.sql.window import Window


### Sesion Spark


In [2]:
# Inicialización de Spark
spark = SparkSession.builder.appName("ProductAnalysis") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()


### Limpieza de datos


In [3]:
def clean_price(price_str):
    if price_str is None:
        return None
    try:
        price_str = price_str.replace('$', '').strip().replace(',', '.')
        return float(price_str)
    except (ValueError, AttributeError):
        return None

clean_price_udf = spark.udf.register("clean_price", clean_price, FloatType())


## Detección de anomalías

**Caragar los datos y hacer limpieza de columnas**

In [4]:
import re
def preprocess_text(text):
    text = text.lower()  # Convertir a minúsculas
    text = re.sub(r'\W+', ' ', text)  # Eliminar caracteres especiales
    return text.strip()


In [5]:
# Cargar y preparar datos
def load_and_prepare_data(file_path):
    df = spark.read.csv(file_path, header=True)
    df = df.withColumn("precio_limpio", clean_price_udf(col("precio")))
    return df.filter(col("precio_limpio").isNotNull())

**calcular los embeddings y encontrar productos similares**

In [6]:
# Función para calcular embeddings
def calculate_embeddings(texts, model):
    return model.encode(texts, show_progress_bar=True)

# Función para encontrar productos similares con umbral
def find_similar_products(query_embedding, product_embeddings, threshold=0.3):
    similarities = cosine_similarity([query_embedding], product_embeddings)[0]
    similar_indices = np.where(similarities >= threshold)[0]
    return similar_indices, similarities[similar_indices]

**Cálculo del puntaje Z (Z-Score):**

data = data.withColumn(
"Z_Score", (col("Precio") - col("Precio_Promedio")) / col("Desviacion_Estandar"))

**Considera como anómalos los precios con |Z| > 3**
- Etiquetar anomalías:
- Clasificar las anomalías como "Precio Alto" o "Promoción" según el valor del puntaje Z.

In [7]:
def calculate_zscore_by_franchise(df):
    window_spec = Window.partitionBy("tienda")
    df_with_stats = df.withColumn(
        "precio_promedio", avg("precio_limpio").over(window_spec)
    ).withColumn(
        "desviacion_std",
        coalesce(stddev("precio_limpio").over(window_spec),
                lit(col("precio_promedio") * 0.1))
    ).withColumn(
        "z_score",
        (col("precio_limpio") - col("precio_promedio")) / col("desviacion_std")
    )

    # Clasificar anomalías basado en el Z-Score
    df_with_anomalies = df_with_stats.withColumn(
        "tipo_anomalia",
        when(col("z_score").between(-3, 3), "Normal")
        .when(col("z_score") > 3, "Precio Alto")
        .when(col("z_score") < -3, "Promoción")
        .otherwise("Normal")
    )

    # Agregar porcentaje de diferencia para análisis
    df_with_anomalies = df_with_anomalies.withColumn(
        "porcentaje_diferencia",
        round(((col("precio_limpio") - col("precio_promedio")) / col("precio_promedio") * 100), 2)
    )

    return df_with_anomalies


**Funciona principal**

In [None]:

    # Lista de productos a consultar
productos_consulta = [
        "pan", "leche", "arroz", "azúcar", "aceite",
        "huevos", "café", "pasta", "jabón", "papel higiénico",
        "detergente", "shampoo", "cepillo dental", "galletas", "refresco",
]
# Cargar modelo de embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')

# Cargar datos
df = load_and_prepare_data("./combined_products.csv")

# Convertir a pandas para procesamiento de embeddings
df_pandas = df.toPandas()

# Calcular embeddings para todos los productos
product_names = df_pandas["nombre"].tolist()
product_embeddings = calculate_embeddings(product_names, model)

# DataFrame para almacenar resultados
resultados_finales = pd.DataFrame()

# Procesar cada producto de consulta
for producto in productos_consulta:
        # Calcular embedding para el producto de consulta
        query_embedding = model.encode(preprocess_text(producto))

        # Encontrar productos similares
        similar_indices, similarities = find_similar_products(
            query_embedding,
            product_embeddings,
            threshold=0.5  # Ajustar el umbral de similitud
        )

        # Crear DataFrame con productos similares
        if len(similar_indices) > 0:
            df_similares = df_pandas.iloc[similar_indices].copy()
            df_similares["similarity_score"] = similarities
            df_similares["producto_consulta"] = producto

            # Convertir a Spark DataFrame para Z-Score
            spark_df = spark.createDataFrame(df_similares)
            spark_df = calculate_zscore_by_franchise(spark_df)

            # Convertir resultados a pandas y agregar al DataFrame final
            resultados_producto = spark_df.toPandas()
            resultados_finales = pd.concat(
                [resultados_finales, resultados_producto],
                ignore_index=True
            )


## Resultados

#### Resumen por producto y prueba

In [9]:
resumen = resultados_finales.groupby(['producto_consulta', 'tienda']).agg({
    'nombre': 'count',
    'similarity_score': 'mean',
    'z_score': ['mean', 'std']
}).round(3)
print("Resumen por producto y tienda:")
print(resumen)

Resumen por producto y tienda:
                                      nombre similarity_score z_score     
                                       count             mean    mean  std
producto_consulta tienda                                                  
aceite            Coral Hipermercados    145            0.587    -0.0  1.0
                  GRAN AKI                22            0.578    -0.0  1.0
                  MEGAMAXI                18            0.577     0.0  1.0
                  SUPERMAXI               15            0.562     0.0  1.0
arroz             Coral Hipermercados     38            0.574     0.0  1.0
                  GRAN AKI                 3            0.511     0.0  1.0
                  MEGAMAXI                 2            0.552     0.0  1.0
                  SUPERMAXI                1            0.556     0.0  NaN
azúcar            Coral Hipermercados     49            0.558    -0.0  1.0
                  GRAN AKI                18            0.625    -0.0

#### Análisis de anomalías


In [None]:
print("\n=== Análisis de Anomalías ===")
print("\nDistribución de anomalías por tipo:")
resultados_finales.groupby('tipo_anomalia')['nombre'].count().to_frame('cantidad').to_csv('distribucion_anomalias.csv')
print(resultados_finales.groupby('tipo_anomalia')['nombre'].count())

#### Resumen análisis por tienda y tipo de anomalía

In [None]:
print("\nAnálisis por tienda y tipo de anomalía:")
analisis_tienda = resultados_finales.groupby(['tienda', 'tipo_anomalia']).agg({
    'nombre': 'count',
    'porcentaje_diferencia': 'mean'
}).round(2)
print(analisis_tienda)

In [None]:
print("\nAnálisis por tienda y tipo de anomalía:")
analisis_tienda = resultados_finales.groupby(['tienda', 'tipo_anomalia']).agg({
    'nombre': 'count',
    'porcentaje_diferencia': 'mean'
}).round(2)
print(analisis_tienda)

#### Análisis por tienda


In [None]:
print("\n=== Análisis por Tienda ===")
analisis_tienda = resultados_finales.groupby('tienda').agg({
    'tipo_anomalia': lambda x: (x != 'Normal').sum(),  # Total anomalías
    'nombre': 'count',  # Total productos
    'porcentaje_diferencia': 'mean'  # Diferencia promedio
}).round(3)
print("\nAnálisis por tienda:")
print(analisis_tienda)


#### Exportar todos los csv

In [16]:
##Guardar Anomalias por tienda
analisis_tienda.to_csv('anomalias_por_tienda.csv')
# Guardar resultados completos
resultados_finales.to_csv('resultados_analisis_completo.csv', index=False)
# Guardar resultados detallados
resultados_finales.to_csv('resultados_analisis_completo.csv', index=False)
# Guardar solo anomalías
anomalias_df = resultados_finales[resultados_finales['tipo_anomalia'] != 'Normal']
anomalias_df.to_csv('anomalias_detectadas.csv', index=False)

## Visualización de Resultados

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Datos de ejemplo
data_tipo_anomalia = {
    'tipo_anomalia': ['Normal', 'Precio Alto'],
    'cantidad': [1669, 18]
}

data_tienda_anomalia = {
    'tienda': ['Coral Hipermercados', 'Coral Hipermercados', 'GRAN AKI', 'GRAN AKI', 'MEGAMAXI', 'MEGAMAXI', 'SUPERMAXI', 'SUPERMAXI'],
    'tipo_anomalia': ['Normal', 'Precio Alto', 'Normal', 'Precio Alto', 'Normal', 'Precio Alto', 'Normal', 'Precio Alto'],
    'nombre': [1120, 10, 170, 2, 191, 2, 188, 4],
    'porcentaje_diferencia': [-3.52, 394.76, -4.46, 378.86, -2.15, 205.41, -7.89, 370.93]
}

# Crear DataFrame
df_tipo_anomalia = pd.DataFrame(data_tipo_anomalia)
df_tienda_anomalia = pd.DataFrame(data_tienda_anomalia)

# Gráfico 1: Distribución de anomalías por tipo
plt.figure(figsize=(8, 6))
plt.bar(df_tipo_anomalia['tipo_anomalia'], df_tipo_anomalia['cantidad'], color=['blue', 'orange'])
plt.xlabel('Tipo de Anomalía')
plt.ylabel('Cantidad')
plt.title('Distribución de Anomalías por Tipo')
plt.show()

# Gráfico 2: Análisis por tienda y tipo de anomalía
fig, ax = plt.subplots(figsize=(10, 8))
df_tienda_anomalia.pivot(index='tienda', columns='tipo_anomalia', values='nombre').plot(kind='bar', stacked=True, ax=ax)
ax.set_xlabel('Tienda')
ax.set_ylabel('Cantidad de Anomalías')
ax.set_title('Análisis por Tienda y Tipo de Anomalía')
plt.show()