In [None]:
import csv
import random
from datetime import datetime, timedelta

# Lista de productos y categorías para generar datos aleatorios
productos = [
    ("Laptop", "Electrónica"),
    ("Mouse", "Accesorios"),
    ("Teclado", "Accesorios"),
    ("Celular", "Electrónica"),
    ("Audífonos", "Accesorios"),
    ("Monitor", "Electrónica"),
    ("Impresora", "Oficina"),
    ("Silla de Oficina", "Muebles"),
    ("Mesa", "Muebles"),
    ("Cámara", "Fotografía")
]

# Función para generar una fecha aleatoria en el último año
def generar_fecha_aleatoria():
    inicio = datetime.now() - timedelta(days=365)
    return inicio + timedelta(days=random.randint(0, 365))

# Función para generar un registro de venta válido
def generar_registro_valido():
    producto, categoria = random.choice(productos)
    fecha = generar_fecha_aleatoria().strftime('%Y-%m-%d')
    cantidad = random.randint(1, 20)
    precio_unitario = round(random.uniform(10, 1000), 2)
    return [fecha, producto, categoria, cantidad, precio_unitario]

# Función para generar un registro con errores
def generar_registro_con_errores():
    producto, categoria = random.choice(productos)
    # Introducir errores aleatorios
    fecha = random.choice(["2024-13-01", "2024/01/15", "15-01-2024"])  # Formato de fecha incorrecto
    cantidad = random.choice([-5, 0, None])  # Cantidad inválida
    precio_unitario = random.choice([-100,0, None])  # Precio negativo o no numérico
    return [fecha, producto, categoria, cantidad, precio_unitario]

# Función para escribir un archivo CSV
def escribir_csv(nombre_archivo, registros):
    with open(nombre_archivo, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(["fecha", "producto", "categoria", "cantidad", "precio_unitario"])
        writer.writerows(registros)
    print(f"Archivo generado: {nombre_archivo}")

# Función principal para generar los tres archivos
def generar_archivos_prueba():
    # Archivo de datos válidos
    registros_validos = [generar_registro_valido() for _ in range(100)]
    escribir_csv('ventas_validas.csv', registros_validos)

    # Archivo con errores
    escribir_csv(nombre_archivo='ventas_errores.csv', registros=[generar_registro_con_errores() for _ in range(10)])

    # Archivo grande con más de 1000 registros
    registros_grandes = [generar_registro_valido() for _ in range(1000)]
    escribir_csv('ventas_grande.csv', registros_grandes)

if __name__ == "__main__":
    generar_archivos_prueba()

from functools import reduce
import csv
from typing import List, Dict
from statistics import mean
from collections import Counter
from datetime import datetime

# Leer archivo CSV con manejo de errores
def leer_archivo_csv(nombre_archivo: str) -> List[Dict]:
    try:
        with open(nombre_archivo, mode='r', encoding='utf-8') as file:
            lector = csv.DictReader(file)
            return [linea for linea in lector]
    except FileNotFoundError:
        print(f"Error: El archivo {nombre_archivo} no se encuentra.")
        return []
    except Exception as e:
        print(f"Error inesperado: {e}")
        return []

# ✔️ Validar que un registro sea válido antes de procesar
def es_registro_valido(item: Dict) -> bool:
    try:
        # Validar fecha en formato YYYY-MM-DD
        datetime.strptime(item['fecha'], '%Y-%m-%d')
        cantidad = int(item['cantidad'])
        precio = float(item['precio_unitario'])

        # Solo aceptar valores positivos
        if cantidad <= 0 or precio <= 0:
            return False
        return True
    except:
        return False

# Total de ventas por categoría
def total_ventas_por_categoria(datos: List[Dict]) -> Dict[str, float]:
    ventas_por_categoria = {}
    for item in datos:
        categoria = item['categoria']
        total = int(item['cantidad']) * float(item['precio_unitario'])
        ventas_por_categoria[categoria] = ventas_por_categoria.get(categoria, 0) + total
    return ventas_por_categoria

# Filtrar productos por umbral
def filtrar_productos_por_umbral(datos: List[Dict], umbral: float) -> List[Dict]:
    return list(filter(lambda x: int(x['cantidad']) * float(x['precio_unitario']) > umbral, datos))

# Promedio de ventas por día
def promedio_ventas_por_dia(datos: List[Dict]) -> float:
    ventas_diarias = {}
    for item in datos:
        fecha = item['fecha']
        total = int(item['cantidad']) * float(item['precio_unitario'])
        ventas_diarias[fecha] = ventas_diarias.get(fecha, 0) + total
    return mean(ventas_diarias.values()) if ventas_diarias else 0

# Producto más vendido
def producto_mas_vendido(datos: List[Dict]) -> str:
    productos = map(lambda x: x['producto'], datos)
    conteo = Counter(productos)
    return conteo.most_common(1)[0][0] if conteo else "Ninguno"

# Generar archivo CSV con el reporte
def generar_reporte(resultados: Dict, nombre_archivo: str):
    with open(nombre_archivo, mode='w', newline='', encoding='utf-8') as file:
        escritor = csv.writer(file)
        escritor.writerow(["Métrica", "Valor"])
        for clave, valor in resultados.items():
            escritor.writerow([clave, valor])
    print(f"\nReporte generado: {nombre_archivo}")

# Mostrar reporte en pantalla
def imprimir_reporte(resultados: Dict):
    print("\n--- Reporte de Ventas ---")
    for clave, valor in resultados.items():
        print(f"{clave}: {valor}")

# Función principal corregida
def main(nombre_archivo: str):
    datos = leer_archivo_csv(nombre_archivo)
    if not datos:
        return

    # Filtrar solo registros válidos
    datos_validos = list(filter(es_registro_valido, datos))
    if not datos_validos:
        print("\n---Ventas_errores.cvs: No hay datos válidos para procesar.---")
        return

    # Procesamiento de métricas
    total_por_categoria = total_ventas_por_categoria(datos_validos)
    productos_filtrados = filtrar_productos_por_umbral(datos_validos, umbral=1000)
    promedio_diario = promedio_ventas_por_dia(datos_validos)
    producto_top = producto_mas_vendido(datos_validos)

    # Resultados
    resultados = {
        "Total de Ventas por Categoría": total_por_categoria,
        "Promedio de Ventas Diarias": promedio_diario,
        "Producto Más Vendido": producto_top,
        "Productos con Ventas Mayores a 1000": len(productos_filtrados)
    }

    # Generar y mostrar reporte
    nombre_reporte = f"reporte_{nombre_archivo.split('.')[0]}.csv"
    generar_reporte(resultados, nombre_reporte)
    imprimir_reporte(resultados)

# Ejecutar
if __name__ == "__main__":
    main(nombre_archivo='ventas_grande.csv')  # Análisis de ventas grandes
    main(nombre_archivo='ventas_validas.csv')  # Análisis de ventas válidas
    main(nombre_archivo='ventas_errores.csv')  # Análisis de ventas con errores