# 💾 Lesson 7: Data Processing and Storage

## 🎯 Objectives

- Clean and validate extracted data
- Implement transformation pipelines
- Integrate with SQL and NoSQL databases
- Export to multiple formats
- Build complete ETL systems
- Handle data at scale

In [None]:
import pandas as pd
import numpy as np
import sqlite3
import json
import csv
import re
from datetime import datetime, timedelta
import hashlib
from typing import Dict, List, Any, Optional
import logging

print("💾 DATA PROCESSING AND STORAGE")
print("=" * 50)
print("✅ Libraries imported successfully")

## 🧹 Data Cleaning and Validation

In [None]:
class DataCleaner:
    """Cleans and validates web-scraped data"""
    
    def __init__(self):
        self.cleaning_stats = {
            'records_processed': 0,
            'records_cleaned': 0,
            'records_invalid': 0,
            'fields_fixed': 0
        }
    
    def clean_text(self, text: str) -> str:
        """Clean general text"""
        if not text or not isinstance(text, str):
            return ""
        
        # Decode HTML entities first so that entities such as &nbsp; are
        # normalized by the whitespace and control-character steps below
        import html
        text = html.unescape(text)
        
        # Collapse runs of whitespace and trim both ends
        text = re.sub(r'\s+', ' ', text.strip())
        
        # Remove non-printable control characters
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)
        
        return text
    
    def clean_price(self, price_str: str) -> Optional[float]:
        """Clean a price string and convert it to a float"""
        if not price_str:
            return None
        
        # Keep only digits, dots, and commas
        price_clean = re.sub(r'[^\d.,]', '', str(price_str))
        
        if not price_clean:
            return None
        
        try:
            # Handle European (1.234,56) vs. US (1,234.56) formats
            if ',' in price_clean and '.' in price_clean:
                if price_clean.rfind('.') > price_clean.rfind(','):
                    # US format: 1,234.56
                    price_clean = price_clean.replace(',', '')
                else:
                    # European format: 1.234,56
                    price_clean = price_clean.replace('.', '').replace(',', '.')
            elif ',' in price_clean:
                # Commas only: could be a decimal or a thousands separator
                if len(price_clean.split(',')[-1]) <= 2:
                    # Probably a decimal comma: 123,45
                    price_clean = price_clean.replace(',', '.')
                else:
                    # Probably a thousands separator: 1,234
                    price_clean = price_clean.replace(',', '')
            
            return float(price_clean)
            
        except ValueError:
            return None
    
    def clean_rating(self, rating_str: str) -> Optional[float]:
        """Clean and normalize ratings"""
        if not rating_str:
            return None
        
        # Extract the numeric rating
        rating_match = re.search(r'([0-5](?:\.[0-9])?)', str(rating_str))
        if rating_match:
            rating = float(rating_match.group(1))
            # Clamp to the 0-5 scale
            return min(5.0, max(0.0, rating))
        
        # Otherwise count star characters (str() guards against non-string input)
        rating_text = str(rating_str)
        stars = rating_text.count('★') + rating_text.count('*')
        if stars > 0:
            return float(min(5, stars))
        
        return None
    
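    def clean_date(self, date_str: str) -> Optional[datetime]:
        """Clean and parse dates"""
        if not date_str:
            return None
        
        # Common date formats. Ambiguous dates such as 01/02/2024 resolve to
        # the first matching pattern (day/month is tried before month/day)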
        date_patterns = [
            '%Y-%m-%d',
            '%d/%m/%Y',
            '%m/%d/%Y',
            '%d-%m-%Y',
            '%Y-%m-%d %H:%M:%S',
            '%B %d, %Y',
            '%d %B %Y'
        ]
        
        date_clean = self.clean_text(date_str)
        
        for pattern in date_patterns:
            try:
                return datetime.strptime(date_clean, pattern)
            except ValueError:
                continue
        
        return None
    
    def validate_email(self, email: str) -> bool:
        """Validate email format"""
        if not email:
            return False
        
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
    
    def validate_url(self, url: str) -> bool:
        """Validate URL format"""
        if not url:
            return False
        
        pattern = r'^https?://.+\..+'
        return bool(re.match(pattern, url))
    
    def clean_product_data(self, product: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Clean a full product record; returns None if the record is invalid"""
        self.cleaning_stats['records_processed'] += 1
        cleaned = {}
        
        # Clean text fields
        text_fields = ['name', 'description', 'brand', 'category']
        for field in text_fields:
            if field in product:
                cleaned[field] = self.clean_text(product[field])
                if cleaned[field] != product.get(field):
                    self.cleaning_stats['fields_fixed'] += 1
        
        # Clean price
        if 'price' in product:
            cleaned['price'] = self.clean_price(product['price'])
        
        # Clean rating
        if 'rating' in product:
            cleaned['rating'] = self.clean_rating(product['rating'])
        
        # Validate URL
        if 'url' in product:
            if self.validate_url(product['url']):
                cleaned['url'] = product['url']
            else:
                cleaned['url'] = None
        
        # Parse dates
        if 'scraped_at' in product:
            cleaned['scraped_at'] = self.clean_date(product['scraped_at'])
        
        # Normalize availability
        if 'availability' in product:
            avail_text = self.clean_text(product['availability']).lower()
            if any(word in avail_text for word in ['in stock', 'available', 'disponible']):
                cleaned['availability'] = 'in_stock'
            elif any(word in avail_text for word in ['out of stock', 'sold out', 'agotado']):
                cleaned['availability'] = 'out_of_stock'
            else:
                cleaned['availability'] = 'unknown'
        
        # Generate a unique ID
        cleaned['id'] = self.generate_product_id(cleaned)
        
        # Validate the record
        if self.validate_product(cleaned):
            self.cleaning_stats['records_cleaned'] += 1
            return cleaned
        else:
            self.cleaning_stats['records_invalid'] += 1
            return None
    
    def validate_product(self, product: Dict[str, Any]) -> bool:
        """Check that the product has the essential fields"""
        required_fields = ['name', 'price']
        
        for field in required_fields:
            if not product.get(field):
                return False
        
        # Validate price range
        if product.get('price'):
            if not (0 < product['price'] < 1000000):  # Plausible price
                return False
        
        # Validate name length
        if product.get('name'):
            if len(product['name']) < 2 or len(product['name']) > 200:
                return False
        
        return True
    
    def generate_product_id(self, product: Dict[str, Any]) -> str:
        """Generate a deterministic ID for the product"""
        # Combine identifying fields
        unique_string = f"{product.get('name', '')}{product.get('url', '')}{product.get('brand', '')}"
        
        # Hash them (MD5 serves as a fingerprint here, not for security)
        return hashlib.md5(unique_string.encode()).hexdigest()[:12]
    
    def get_stats(self) -> Dict[str, int]:
        """Return cleaning statistics"""
        return self.cleaning_stats.copy()

# Usage example
cleaner = DataCleaner()

# Dirty sample data
dirty_products = [
    {
        'name': '  iPhone 15 Pro   ',
        'price': '$1,299.99',
        'rating': '4.5 stars',
        'availability': 'In Stock',
        'url': 'https://example.com/iphone-15',
        'scraped_at': '2024-01-15'
    },
    {
        'name': 'Samsung Galaxy S24\n\t',
        'price': '€899,50',
        'rating': '★★★★☆',
        'availability': 'Agotado',
        'url': 'invalid-url',
        'scraped_at': 'January 15, 2024'
    }
]

print("🧹 DATA CLEANING EXAMPLE")
print("=" * 35)

cleaned_products = []
for product in dirty_products:
    cleaned = cleaner.clean_product_data(product)
    if cleaned:
        cleaned_products.append(cleaned)

print("📊 Cleaning statistics:")
stats = cleaner.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")

print("\n✅ Cleaned products:")
for product in cleaned_products:
    print(f"  • {product['name']} - ${product['price']} - Rating: {product['rating']} - {product['availability']}")
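The separator heuristic inside `clean_price` is easier to check in isolation. The sketch below restates it as a standalone function; `parse_price` is a hypothetical name used only for this illustration, not part of the lesson's classes. It shares the original's limitation: a string with only dots, such as `1.234`, is read as a decimal number even if the source meant a European thousands separator.

```python
import re
from typing import Optional


def parse_price(raw: str) -> Optional[float]:
    """Hypothetical standalone version of the clean_price separator heuristic."""
    digits = re.sub(r"[^\d.,]", "", str(raw))
    if not digits:
        return None

    last_dot, last_comma = digits.rfind("."), digits.rfind(",")
    if last_dot != -1 and last_comma != -1:
        # Both separators present: whichever comes last is the decimal mark.
        if last_dot > last_comma:
            digits = digits.replace(",", "")
        else:
            digits = digits.replace(".", "").replace(",", ".")
    elif last_comma != -1:
        # Commas only: one or two trailing digits suggest a decimal comma.
        if len(digits) - last_comma - 1 <= 2:
            digits = digits.replace(",", ".")
        else:
            digits = digits.replace(",", "")

    try:
        return float(digits)
    except ValueError:
        return None


for sample in ["$1,299.99", "€899,50", "1.234,56", "1,234", "free"]:
    print(f"{sample!r:>12} -> {parse_price(sample)}")
```

When the source site's locale is known in advance, passing it explicitly is usually more reliable than guessing from the string.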

## 🔄 Transformation Pipelines

In [None]:
class DataTransformationPipeline:
    """Pipeline for transforming extracted data"""
    
    def __init__(self):
        self.transformations = []
        self.stats = {
            'items_processed': 0,
            'items_transformed': 0,
            'items_dropped': 0
        }
    
    def add_transformation(self, func, name: Optional[str] = None):
        """Add a transformation to the pipeline"""
        self.transformations.append({
            'func': func,
            'name': name or func.__name__
        })
    
    def process_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run one item through the pipeline"""
        self.stats['items_processed'] += 1
        
        current_item = item.copy()
        
        for transformation in self.transformations:
            try:
                result = transformation['func'](current_item)
                
                if result is None:
                    # Item was dropped
                    self.stats['items_dropped'] += 1
                    return None
                
                current_item = result
                
            except Exception as e:
                print(f"Error in transformation {transformation['name']}: {e}")
                self.stats['items_dropped'] += 1
                return None
        
        self.stats['items_transformed'] += 1
        return current_item
    
    def process_batch(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process a batch of items"""
        results = []
        
        for item in items:
            processed = self.process_item(item)
            if processed is not None:
                results.append(processed)
        
        return results

# Specific transformations
def add_price_category(item: Dict[str, Any]) -> Dict[str, Any]:
    """Add a price category"""
    price = item.get('price') or 0  # Treat a missing or None price as 0
    
    if price < 50:
        item['price_category'] = 'budget'
    elif price < 200:
        item['price_category'] = 'mid-range'
    elif price < 1000:
        item['price_category'] = 'premium'
    else:
        item['price_category'] = 'luxury'
    
    return item

def add_text_analysis(item: Dict[str, Any]) -> Dict[str, Any]:
    """Add text analysis"""
    description = item.get('description', '')
    
    if description:
        # Basic keyword-based sentiment analysis (simplified)
        positive_words = ['excellent', 'great', 'amazing', 'perfect', 'best']
        negative_words = ['bad', 'poor', 'terrible', 'awful', 'worst']
        
        desc_lower = description.lower()
        
        positive_count = sum(1 for word in positive_words if word in desc_lower)
        negative_count = sum(1 for word in negative_words if word in desc_lower)
        
        if positive_count > negative_count:
            item['sentiment'] = 'positive'
        elif negative_count > positive_count:
            item['sentiment'] = 'negative'
        else:
            item['sentiment'] = 'neutral'
        
        # Description length
        item['description_length'] = len(description)
        item['word_count'] = len(description.split())
    
    return item

def normalize_brand(item: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize brand names"""
    brand = item.get('brand', '')
    
    if brand:
        # Normalize known brands
        brand_map = {
            'apple inc': 'Apple',
            'apple computer': 'Apple',
            'samsung electronics': 'Samsung',
            'sony corporation': 'Sony',
            'microsoft corp': 'Microsoft'
        }
        
        brand_lower = brand.lower().strip()
        item['brand'] = brand_map.get(brand_lower, brand.strip().title())
    
    return item

def add_competition_score(item: Dict[str, Any]) -> Dict[str, Any]:
    """Compute a competitiveness score"""
    # Score based on rating, price, and availability
    score = 0
    
    # Rating component (0-50 points)
    rating = item.get('rating', 0)
    if rating:
        score += (rating / 5.0) * 50
    
    # Price component (0-30 points, inversely related)
    price = item.get('price', 0)
    if price:
        # Normalize price (assuming a 0-2000 range)
        price_norm = max(0, 1 - (price / 2000))
        score += price_norm * 30
    
    # Availability component (0-20 points)
    if item.get('availability') == 'in_stock':
        score += 20
    
    item['competition_score'] = round(score, 2)
    
    return item

def filter_quality_products(item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Keep only quality products"""
    # Quality criteria ("or 0" also covers fields that are present but None)
    if (item.get('rating') or 0) < 3.0:  # Minimum rating
        return None
    
    if not item.get('description') or len(item.get('description', '')) < 10:  # Minimum description length
        return None
    
    if (item.get('price') or 0) <= 0:  # Valid price
        return None
    
    return item

# Configure the pipeline
pipeline = DataTransformationPipeline()

# Add transformations in order
pipeline.add_transformation(add_price_category, "price_categorization")
pipeline.add_transformation(add_text_analysis, "text_analysis")
pipeline.add_transformation(normalize_brand, "brand_normalization")
pipeline.add_transformation(add_competition_score, "competition_scoring")
pipeline.add_transformation(filter_quality_products, "quality_filter")

# Test data
sample_data = [
    {
        'name': 'iPhone 15 Pro',
        'price': 1299.99,
        'rating': 4.5,
        'brand': 'apple inc',
        'description': 'The best iPhone with excellent camera and amazing performance',
        'availability': 'in_stock'
    },
    {
        'name': 'Budget Phone',
        'price': 99.99,
        'rating': 2.5,  # Low rating: will be filtered out
        'brand': 'Unknown',
        'description': 'Cheap phone',
        'availability': 'out_of_stock'
    },
    {
        'name': 'Samsung Galaxy S24',
        'price': 899.99,
        'rating': 4.2,
        'brand': 'samsung electronics',
        'description': 'Great Android phone with perfect display and good battery life',
        'availability': 'in_stock'
    }
]

print("🔄 TRANSFORMATION PIPELINE")
print("=" * 35)

# Process data
transformed_data = pipeline.process_batch(sample_data)

print("📊 Pipeline statistics:")
for key, value in pipeline.stats.items():
    print(f"  {key}: {value}")

print("\n✅ Transformed data:")
for item in transformed_data:
    print(f"\n  📱 {item['name']}")
    print(f"    💰 ${item['price']} ({item['price_category']})")
    print(f"    ⭐ {item['rating']} rating")
    print(f"    🏢 {item['brand']}")
    print(f"    📊 Competition Score: {item['competition_score']}")
    print(f"    💭 Sentiment: {item['sentiment']}")
    print(f"    📝 {item['word_count']} words in description")
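To make the arithmetic in `add_competition_score` concrete, here is a small worked sketch. The score is `50 * rating / 5 + 30 * max(0, 1 - price / 2000) + 20` when the item is in stock. The function name `competition_score` and its `max_price` parameter are hypothetical, introduced only for this illustration; the weights and the 0-2000 price range are the ones used in the cell above.

```python
def competition_score(rating: float, price: float, in_stock: bool,
                      max_price: float = 2000.0) -> float:
    """Hypothetical pure-function version of the scoring rule above."""
    # Rating contributes up to 50 points.
    rating_points = (rating / 5.0) * 50 if rating else 0.0
    # Price contributes up to 30 points; cheaper items score higher,
    # and anything at or above max_price scores 0.
    price_points = max(0.0, 1 - price / max_price) * 30 if price else 0.0
    # Availability contributes a flat 20 points.
    stock_points = 20 if in_stock else 0
    return round(rating_points + price_points + stock_points, 2)


# iPhone 15 Pro: 45 + 10.50 + 20
print(competition_score(4.5, 1299.99, True))   # 75.5
# Samsung Galaxy S24: 42 + 16.50 + 20
print(competition_score(4.2, 899.99, True))    # 78.5
# Above the assumed price range and out of stock: 50 + 0 + 0
print(competition_score(5.0, 2500.0, False))   # 50.0
```

Because the price term is clamped at zero, every product priced above 2000 receives the same price component, so the assumed range should match the catalog being scraped.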

## 🗄️ Database Integration

In [None]:
class DatabaseManager:
    """Database manager for storing products"""
    
    def __init__(self, db_path: str = 'products.db'):
        self.db_path = db_path
        self.setup_database()
    
    def setup_database(self):
        """Set up the database schema"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Main products table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    brand TEXT,
                    category TEXT,
                    price REAL,
                    price_category TEXT,
                    rating REAL,
                    description TEXT,
                    availability TEXT,
                    url TEXT,
                    competition_score REAL,
                    sentiment TEXT,
                    word_count INTEGER,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # Price history table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS price_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    product_id TEXT,
                    price REAL,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (product_id) REFERENCES products (id)
                )
            ''')
            
            # Indexes for faster lookups
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_products_brand ON products(brand)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_products_category ON products(category)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_products_price ON products(price)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_price_history_product ON price_history(product_id)')
            
            conn.commit()
    
    def insert_product(self, product: Dict[str, Any]) -> bool:
        """Insert or update a product"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # Check whether the product already exists
                cursor.execute('SELECT price FROM products WHERE id = ?', (product['id'],))
                existing = cursor.fetchone()
                
                if existing:
                    # Update the existing product
                    old_price = existing[0]
                    
                    cursor.execute('''
                        UPDATE products SET
                            name = ?, brand = ?, category = ?, price = ?,
                            price_category = ?, rating = ?, description = ?,
                            availability = ?, url = ?, competition_score = ?,
                            sentiment = ?, word_count = ?, updated_at = CURRENT_TIMESTAMP
                        WHERE id = ?
                    ''', (
                        product['name'], product.get('brand'), product.get('category'),
                        product['price'], product.get('price_category'), product.get('rating'),
                        product.get('description'), product.get('availability'), product.get('url'),
                        product.get('competition_score'), product.get('sentiment'),
                        product.get('word_count'), product['id']
                    ))
                    
                    # If the price changed, record it in the history
                    if old_price != product['price']:
                        cursor.execute('''
                            INSERT INTO price_history (product_id, price)
                            VALUES (?, ?)
                        ''', (product['id'], product['price']))
                
                else:
                    # Insert a new product
                    cursor.execute('''
                        INSERT INTO products (
                            id, name, brand, category, price, price_category,
                            rating, description, availability, url, competition_score,
                            sentiment, word_count
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        product['id'], product['name'], product.get('brand'),
                        product.get('category'), product['price'], product.get('price_category'),
                        product.get('rating'), product.get('description'), product.get('availability'),
                        product.get('url'), product.get('competition_score'), product.get('sentiment'),
                        product.get('word_count')
                    ))
                    
                    # Record the initial price
                    cursor.execute('''
                        INSERT INTO price_history (product_id, price)
                        VALUES (?, ?)
                    ''', (product['id'], product['price']))
                
                conn.commit()
                return True
                
        except Exception as e:
            print(f"Error inserting product: {e}")
            return False
    
    def get_products_by_category(self, category: str) -> List[Dict[str, Any]]:
        """Get products by category"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            
            cursor.execute('SELECT * FROM products WHERE category = ? ORDER BY competition_score DESC', (category,))
            return [dict(row) for row in cursor.fetchall()]
    
    def get_price_trends(self, product_id: str) -> List[Dict[str, Any]]:
        """Get a product's price history, newest first"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            
            cursor.execute('''
                SELECT price, recorded_at FROM price_history 
                WHERE product_id = ? 
                ORDER BY recorded_at DESC
            ''', (product_id,))
            
            return [dict(row) for row in cursor.fetchall()]
    
    def get_analytics(self) -> Dict[str, Any]:
        """Get overall analytics"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            analytics = {}
            
            # Overall counts
            cursor.execute('SELECT COUNT(*) FROM products')
            analytics['total_products'] = cursor.fetchone()[0]
            
            cursor.execute('SELECT COUNT(DISTINCT brand) FROM products WHERE brand IS NOT NULL')
            analytics['total_brands'] = cursor.fetchone()[0]
            
            cursor.execute('SELECT COUNT(DISTINCT category) FROM products WHERE category IS NOT NULL')
            analytics['total_categories'] = cursor.fetchone()[0]
            
            # Price statistics
            cursor.execute('SELECT AVG(price), MIN(price), MAX(price) FROM products WHERE price > 0')
            price_stats = cursor.fetchone()
            analytics['price_avg'] = round(price_stats[0], 2) if price_stats[0] else 0
            analytics['price_min'] = price_stats[1] if price_stats[1] else 0
            analytics['price_max'] = price_stats[2] if price_stats[2] else 0
            
            # Top brands
            cursor.execute('''
                SELECT brand, COUNT(*) as count FROM products 
                WHERE brand IS NOT NULL 
                GROUP BY brand 
                ORDER BY count DESC 
                LIMIT 5
            ''')
            analytics['top_brands'] = [{'brand': row[0], 'count': row[1]} for row in cursor.fetchall()]
            
            # Distribution of price categories
            cursor.execute('''
                SELECT price_category, COUNT(*) as count FROM products 
                WHERE price_category IS NOT NULL 
                GROUP BY price_category
            ''')
            analytics['price_distribution'] = [{'category': row[0], 'count': row[1]} for row in cursor.fetchall()]
            
            return analytics
    
    def export_to_csv(self, filename: str) -> bool:
        """Export products to CSV"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                df = pd.read_sql_query('SELECT * FROM products', conn)
                df.to_csv(filename, index=False)
                return True
        except Exception as e:
            print(f"Error exporting CSV: {e}")
            return False

# Usage example
print("🗄️ DATABASE INTEGRATION")
print("=" * 40)

# Create the database manager
db = DatabaseManager('example_products.db')

# Insert the transformed products
for product in transformed_data:
    # The pipeline's sample data carries no 'id', which insert_product requires,
    # so derive one with the DataCleaner instance created earlier
    product.setdefault('id', cleaner.generate_product_id(product))
    success = db.insert_product(product)
    print(f"{'✅' if success else '❌'} Inserting: {product['name']}")

# Get analytics
analytics = db.get_analytics()
print("\n📊 DATABASE ANALYTICS:")
print(f"  📦 Total products: {analytics['total_products']}")
print(f"  🏢 Total brands: {analytics['total_brands']}")
print(f"  📂 Total categories: {analytics['total_categories']}")
print(f"  💰 Average price: ${analytics['price_avg']}")
print(f"  📈 Price range: ${analytics['price_min']} - ${analytics['price_max']}")

print("\n🏆 Top brands:")
for brand_info in analytics['top_brands']:
    print(f"  • {brand_info['brand']}: {brand_info['count']} products")

print("\n💎 Price distribution:")
for price_info in analytics['price_distribution']:
    print(f"  • {price_info['category']}: {price_info['count']} products")

# Export data
if db.export_to_csv('products_export.csv'):
    print("\n📤 Data exported to 'products_export.csv'")
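The select-then-insert-or-update flow in `insert_product` can also be written with SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` clause. The following is a minimal sketch, assuming the bundled SQLite is version 3.24 or newer; the reduced schema, the `upsert_product` helper, and the demo product are hypothetical and exist only for this illustration. It also wraps the connection in `contextlib.closing`, because `with sqlite3.connect(...)` on its own commits or rolls back but does not close the connection.

```python
import sqlite3
from contextlib import closing

# Reduced, hypothetical schema: just enough columns to show the pattern.
SCHEMA = """
CREATE TABLE products (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    price REAL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE price_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    product_id TEXT REFERENCES products (id),
    price REAL,
    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""


def upsert_product(conn: sqlite3.Connection, product: dict) -> None:
    """Insert or update a product and log the price only when it changes."""
    row = conn.execute(
        "SELECT price FROM products WHERE id = ?", (product["id"],)
    ).fetchone()
    conn.execute(
        """
        INSERT INTO products (id, name, price)
        VALUES (:id, :name, :price)
        ON CONFLICT(id) DO UPDATE SET
            name = excluded.name,
            price = excluded.price,
            updated_at = CURRENT_TIMESTAMP
        """,
        product,
    )
    # New product, or an existing one whose price differs: add a history row.
    if row is None or row[0] != product["price"]:
        conn.execute(
            "INSERT INTO price_history (product_id, price) VALUES (?, ?)",
            (product["id"], product["price"]),
        )


with closing(sqlite3.connect(":memory:")) as conn:
    conn.executescript(SCHEMA)
    with conn:  # commits on success, rolls back on an exception
        upsert_product(conn, {"id": "abc123", "name": "Demo Phone", "price": 499.0})
        upsert_product(conn, {"id": "abc123", "name": "Demo Phone", "price": 499.0})
        upsert_product(conn, {"id": "abc123", "name": "Demo Phone", "price": 449.0})
    history = [r[0] for r in conn.execute(
        "SELECT price FROM price_history ORDER BY id")]
    product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]

print(history, product_count)  # [499.0, 449.0] 1
```

The repeated upsert at the same price adds no history row, which keeps `price_history` limited to actual price changes.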

## 📊 Monitoring and Alerting System

In [None]:
class DataMonitor:
    """Monitoring system for scraped data"""
    
    def __init__(self, db_manager: DatabaseManager):
        self.db = db_manager
        self.alerts = []
        
        # Alert configuration
        self.alert_config = {
            'price_change_threshold': 0.1,  # 10% price change
            'stock_alert': True,
            'new_product_threshold': 5,  # Alert when more than 5 new products appear
            'quality_threshold': 0.9  # 90% of products should be good quality
        }
    
    def check_price_changes(self) -> List[Dict[str, Any]]:
        """Detect significant price changes"""
        price_alerts = []
        
        with sqlite3.connect(self.db.db_path) as conn:
            cursor = conn.cursor()
            
            # Compare each product's current price with the entry that precedes
            # the latest one in price_history. insert_product logs every new
            # price, so the latest entry always equals the current price.
            cursor.execute('''
                SELECT p.id, p.name, p.price,
                       prev.price AS previous_price,
                       (SELECT MAX(recorded_at) FROM price_history
                        WHERE product_id = p.id) AS recorded_at
                FROM products p
                JOIN price_history prev ON prev.product_id = p.id
                WHERE prev.id = (
                        SELECT ph.id FROM price_history ph
                        WHERE ph.product_id = p.id
                        ORDER BY ph.id DESC LIMIT 1 OFFSET 1
                    )
                    AND prev.price > 0
                    AND ABS(p.price - prev.price) / prev.price > ?
            ''', (self.alert_config['price_change_threshold'],))
            
            for row in cursor.fetchall():
                product_id, name, current_price, previous_price, change_date = row
                
                change_pct = ((current_price - previous_price) / previous_price) * 100
                
                price_alerts.append({
                    'type': 'price_change',
                    'product_id': product_id,
                    'product_name': name,
                    'current_price': current_price,
                    'previous_price': previous_price,
                    'change_percentage': round(change_pct, 2),
                    'change_date': change_date,
                    'severity': 'high' if abs(change_pct) > 20 else 'medium'
                })
        
        return price_alerts
    
    def check_stock_changes(self) -> List[Dict[str, Any]]:
        """Detect availability changes"""
        stock_alerts = []
        
        with sqlite3.connect(self.db.db_path) as conn:
            cursor = conn.cursor()
            
            # Products currently out of stock that were updated in the last 24 hours
            cursor.execute('''
                SELECT id, name, brand, price 
                FROM products 
                WHERE availability = 'out_of_stock'
                    AND updated_at > datetime('now', '-24 hours')
            ''')
            
            for row in cursor.fetchall():
                stock_alerts.append({
                    'type': 'stock_out',
                    'product_id': row[0],
                    'product_name': row[1],
                    'brand': row[2],
                    'price': row[3],
                    'severity': 'medium'
                })
            
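            # Products currently in stock that were updated in the last 24 hours
            # (no availability history is stored, so a restock cannot be confirmed)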
            cursor.execute('''
                SELECT id, name, brand, price 
                FROM products 
                WHERE availability = 'in_stock'
                    AND updated_at > datetime('now', '-24 hours')
            ''')
            
            for row in cursor.fetchall():
                stock_alerts.append({
                    'type': 'stock_in',
                    'product_id': row[0],
                    'product_name': row[1],
                    'brand': row[2],
                    'price': row[3],
                    'severity': 'low'
                })
        
        return stock_alerts
    
    def check_data_quality(self) -> Dict[str, Any]:
        """Check data quality"""
        with sqlite3.connect(self.db.db_path) as conn:
            cursor = conn.cursor()
            
            quality_report = {}
            
            # Total products
            cursor.execute('SELECT COUNT(*) FROM products')
            total_products = cursor.fetchone()[0]
            
            if total_products == 0:
                return {'error': 'No products in database'}
            
            # Products without a name (SQL string literals take single quotes)
            cursor.execute("SELECT COUNT(*) FROM products WHERE name IS NULL OR name = ''")
            missing_names = cursor.fetchone()[0]
            
            # Products without a price
            cursor.execute('SELECT COUNT(*) FROM products WHERE price IS NULL OR price <= 0')
            missing_prices = cursor.fetchone()[0]
            
            # Products without a description
            cursor.execute("SELECT COUNT(*) FROM products WHERE description IS NULL OR description = ''")
            missing_descriptions = cursor.fetchone()[0]
            
            # Products with a low rating
            cursor.execute('SELECT COUNT(*) FROM products WHERE rating < 3.0')
            low_rated = cursor.fetchone()[0]
            
            quality_report = {
                'total_products': total_products,
                'missing_names_pct': (missing_names / total_products) * 100,
                'missing_prices_pct': (missing_prices / total_products) * 100,
                'missing_descriptions_pct': (missing_descriptions / total_products) * 100,
                'low_rated_pct': (low_rated / total_products) * 100,
                'quality_score': max(0, 100 - (missing_names / total_products * 50) - 
                                    (missing_prices / total_products * 30) - 
                                    (missing_descriptions / total_products * 20))
            }
            
            return quality_report
    
    def generate_daily_report(self) -> Dict[str, Any]:
        """Generate the daily report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'alerts': {
                'price_changes': self.check_price_changes(),
                'stock_changes': self.check_stock_changes()
            },
            'data_quality': self.check_data_quality(),
            'analytics': self.db.get_analytics()
        }
        
        # Merge all alerts so they can be counted by severity
        all_alerts = report['alerts']['price_changes'] + report['alerts']['stock_changes']
        
        report['summary'] = {
            'total_alerts': len(all_alerts),
            'high_severity': len([a for a in all_alerts if a.get('severity') == 'high']),
            'medium_severity': len([a for a in all_alerts if a.get('severity') == 'medium']),
            'low_severity': len([a for a in all_alerts if a.get('severity') == 'low'])
        }
        
        return report
    
    def save_report(self, report: Dict[str, Any], filename: Optional[str] = None) -> str:
        """Save the report to a JSON file and return its filename"""
        if not filename:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'daily_report_{timestamp}.json'
        
        with open(filename, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps accented product names readable in the UTF-8 file
            json.dump(report, f, indent=2, default=str, ensure_ascii=False)
        
        return filename

# Monitoring example
print("📊 MONITORING SYSTEM")
print("=" * 30)

# Create the monitor
monitor = DataMonitor(db)

# Generate the report
daily_report = monitor.generate_daily_report()

print("📋 DAILY REPORT:")
print(f"  🕐 Timestamp: {daily_report['timestamp']}")
print(f"  🚨 Total alerts: {daily_report['summary']['total_alerts']}")
print(f"    • High priority: {daily_report['summary']['high_severity']}")
print(f"    • Medium priority: {daily_report['summary']['medium_severity']}")
print(f"    • Low priority: {daily_report['summary']['low_severity']}")

quality = daily_report['data_quality']
if 'quality_score' in quality:
    print(f"\n📊 Data quality: {quality['quality_score']:.1f}%")
    print(f"  📦 Total products: {quality['total_products']}")
    print(f"  ❌ Missing name: {quality['missing_names_pct']:.1f}%")
    print(f"  💰 Missing price: {quality['missing_prices_pct']:.1f}%")
    print(f"  📝 Missing description: {quality['missing_descriptions_pct']:.1f}%")

# Show price alerts
price_alerts = daily_report['alerts']['price_changes']
if price_alerts:
    print("\n💰 PRICE ALERTS:")
    for alert in price_alerts[:3]:  # Show only the first 3
        print(f"  📱 {alert['product_name']}")
        print(f"    {alert['change_percentage']:+.1f}% (${alert['previous_price']} → ${alert['current_price']})")

# Save the report
report_file = monitor.save_report(daily_report)
print(f"\n💾 Report saved to: {report_file}")
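
The `quality_score` computed in `check_data_quality` is a weighted penalty: each missing field subtracts its share of products multiplied by a weight (50 for names, 30 for prices, 20 for descriptions), and the result is floored at 0. The sketch below reproduces only that arithmetic so the weights are easier to reason about. The standalone function `quality_score` and the sample counts are illustrative assumptions and are not part of `DataMonitor`.

```python
def quality_score(total: int, missing_names: int, missing_prices: int,
                  missing_descriptions: int) -> float:
    """Recompute the weighted score from raw counts (hypothetical helper)."""
    if total <= 0:
        raise ValueError("total must be positive")
    penalty = (
        missing_names / total * 50            # names carry the largest weight
        + missing_prices / total * 30
        + missing_descriptions / total * 20
    )
    return max(0, 100 - penalty)


# 200 products: 10 without a name, 40 without a price, 100 without a description
# penalty = 2.5 + 6.0 + 10.0, so the score is 81.5
print(quality_score(200, 10, 40, 100))
```

Because the three weights add up to 100, the score reaches 0 only when every product is missing all three fields.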

## 📈 Data Visualization and Analysis

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

class DataVisualizer:
    """Class for visualizing scraped data"""
    
    def __init__(self, db_manager: DatabaseManager):
        self.db = db_manager
        
        # Configure the plot style
        plt.style.use('default')
        sns.set_palette("husl")
    
    def plot_price_distribution(self, save_path: Optional[str] = None):
        """Plot the price distribution"""
        with sqlite3.connect(self.db.db_path) as conn:
            df = pd.read_sql_query('SELECT price, price_category FROM products WHERE price > 0', conn)
        
        if df.empty:
            print("No price data to visualize")
            return
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Price histogram
        ax1.hist(df['price'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        ax1.set_xlabel('Price ($)')
        ax1.set_ylabel('Frequency')
        ax1.set_title('Price Distribution')
        ax1.grid(True, alpha=0.3)
        
        # Pie chart of price categories
        if 'price_category' in df.columns and df['price_category'].notna().any():
            category_counts = df['price_category'].value_counts()
            ax2.pie(category_counts.values, labels=category_counts.index, autopct='%1.1f%%')
            ax2.set_title('Distribution by Price Category')
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        
        plt.show()
    
    def plot_brand_analysis(self, top_n: int = 10, save_path: Optional[str] = None):
        """Brand analysis"""
        with sqlite3.connect(self.db.db_path) as conn:
            # Bind top_n as a query parameter instead of formatting it into the SQL string
            df = pd.read_sql_query('''
                SELECT brand, COUNT(*) as product_count, 
                       AVG(price) as avg_price, 
                       AVG(rating) as avg_rating,
                       AVG(competition_score) as avg_score
                FROM products 
                WHERE brand IS NOT NULL AND brand != ''
                GROUP BY brand
                ORDER BY product_count DESC
                LIMIT ?
            ''', conn, params=(top_n,))
        
        if df.empty:
            print("No brand data to visualize")
            return
        
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
        
        # Number of products per brand
        bars1 = ax1.bar(df['brand'], df['product_count'], color='lightblue')
        ax1.set_xlabel('Brand')
        ax1.set_ylabel('Number of Products')
        ax1.set_title(f'Top {top_n} Brands by Number of Products')
        ax1.tick_params(axis='x', rotation=45)
        
        # Label each bar with its value
        for bar in bars1:
            height = bar.get_height()
            ax1.text(bar.get_x() + bar.get_width()/2., height,
                    f'{int(height)}', ha='center', va='bottom')
        
        # Average price per brand
        bars2 = ax2.bar(df['brand'], df['avg_price'], color='lightgreen')
        ax2.set_xlabel('Brand')
        ax2.set_ylabel('Average Price ($)')
        ax2.set_title('Average Price by Brand')
        ax2.tick_params(axis='x', rotation=45)
        
        # Average rating per brand
        if df['avg_rating'].notna().any():
            bars3 = ax3.bar(df['brand'], df['avg_rating'], color='orange')
            ax3.set_xlabel('Brand')
            ax3.set_ylabel('Average Rating')
            ax3.set_title('Average Rating by Brand')
            ax3.tick_params(axis='x', rotation=45)
            ax3.set_ylim(0, 5)
        
        # Competitiveness score
        if df['avg_score'].notna().any():
            bars4 = ax4.bar(df['brand'], df['avg_score'], color='coral')
            ax4.set_xlabel('Brand')
            ax4.set_ylabel('Competitiveness Score')
            ax4.set_title('Average Competitiveness Score')
            ax4.tick_params(axis='x', rotation=45)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        
        plt.show()
    
    def generate_summary_dashboard(self, save_path: Optional[str] = None):
        """Summary dashboard"""
        with sqlite3.connect(self.db.db_path) as conn:
            # General statistics
            general_df = pd.read_sql_query('''
                SELECT 
                    COUNT(*) as total_products,
                    AVG(price) as avg_price,
                    MIN(price) as min_price,
                    MAX(price) as max_price,
                    AVG(rating) as avg_rating,
                    COUNT(DISTINCT brand) as unique_brands
                FROM products WHERE price > 0
            ''', conn)
            
            # Availability
            availability_df = pd.read_sql_query('''
                SELECT availability, COUNT(*) as count 
                FROM products 
                GROUP BY availability
            ''', conn)
        
        fig = plt.figure(figsize=(16, 10))
        gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)
        
        # Headline metrics
        ax_metrics = fig.add_subplot(gs[0, :])
        ax_metrics.axis('off')
        
        # The aggregate query always returns one row, so test the count rather than df.empty
        row = general_df.iloc[0]
        if row['total_products'] > 0:
            # AVG(rating) is NULL when no product has a rating
            rating_text = f"{row['avg_rating']:.2f}/5" if pd.notna(row['avg_rating']) else "N/A"
            metrics_text = f"""
            PRODUCT DASHBOARD - {datetime.now().strftime('%Y-%m-%d %H:%M')}
            
            📦 Total Products: {int(row['total_products']):,}
            💰 Average Price: ${row['avg_price']:.2f}
            📊 Range: ${row['min_price']:.2f} - ${row['max_price']:.2f}
            ⭐ Average Rating: {rating_text}
            🏢 Unique Brands: {int(row['unique_brands'])}
            """
            
            ax_metrics.text(0.5, 0.5, metrics_text, 
                          horizontalalignment='center', verticalalignment='center',
                          fontsize=14, bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
        
        # Availability (pie chart)
        if not availability_df.empty:
            ax_availability = fig.add_subplot(gs[1, 0])
            ax_availability.pie(availability_df['count'], 
                              labels=availability_df['availability'].fillna('Unknown'), 
                              autopct='%1.1f%%')
            ax_availability.set_title('Product Availability')
        
        # Summary text
        ax_summary = fig.add_subplot(gs[1:, 1:])
        ax_summary.axis('off')
        
        summary_text = f"""
        📈 DATA ANALYSIS
        
        This dashboard summarizes the data
        extracted through web scraping.
        
        🔍 Key Metrics:
        • Product coverage
        • Up-to-date price data
        • Availability information
        • Competitiveness analysis
        
        📊 Data Quality:
        • Automatic validation
        • Data cleaning
        • Format normalization
        
        🚨 Configured Alerts:
        • Price changes > 10%
        • Out-of-stock products
        • Newly detected products
        
        Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        """
        
        ax_summary.text(0.05, 0.95, summary_text,
                       verticalalignment='top', fontsize=11,
                       bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8))
        
        plt.suptitle('Product Analysis Dashboard', fontsize=16, fontweight='bold')
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        
        plt.show()

# Visualization example
print("📈 DATA VISUALIZATION")
print("=" * 30)

# Create the visualizer
visualizer = DataVisualizer(db)

# Generate the dashboard
print("Generating dashboard...")
visualizer.generate_summary_dashboard('dashboard.png')

print("✅ Dashboard generated and saved as 'dashboard.png'")
print("📊 Available visualizations:")
print("  • plot_price_distribution() - Price distribution")
print("  • plot_brand_analysis() - Brand analysis")
print("  • generate_summary_dashboard() - Full dashboard")

## 📖 Lesson Summary

### 🎯 What We Have Mastered

1. **Data Cleaning and Validation**:
   - Normalizing text and formats
   - Cleaning prices and ratings
   - Validating URLs and emails
   - Handling missing data
   - Generating unique IDs

2. **Transformation Pipelines**:
   - Modular pipeline architecture
   - Chained transformations
   - Basic sentiment analysis
   - Automatic categorization
   - Quality-based filtering

3. **Database Integration**:
   - Relational schema design
   - SQLite for local storage
   - Duplicate handling
   - Change history
   - Analytical queries

4. **Monitoring System**:
   - Price change detection
   - Availability alerts
   - Automatic quality checks
   - Daily reports
   - Severity levels

5. **Visualization and Analysis**:
   - Static dashboards with Matplotlib
   - Price distribution analysis
   - Competitive brand analysis
   - Performance metrics
   - Visual reports
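
As a reminder of how price change detection and severity levels fit together, here is a minimal sketch. It reuses the alert keys seen in the monitoring example (`change_percentage`, `previous_price`, `current_price`, `severity`), but the function name `classify_price_change` and the 10% and 25% thresholds are assumptions chosen for illustration and may differ from the values used in `DataMonitor`.

```python
from typing import Any, Dict, Optional


def classify_price_change(previous_price: float, current_price: float,
                          medium_pct: float = 10.0,
                          high_pct: float = 25.0) -> Optional[Dict[str, Any]]:
    """Build an alert dict for a price change (hypothetical helper)."""
    if previous_price <= 0:
        return None  # no meaningful baseline to compare against

    change_pct = (current_price - previous_price) / previous_price * 100
    magnitude = abs(change_pct)

    # Assumed thresholds: >= 25% is high, >= 10% is medium, anything else is low
    if magnitude >= high_pct:
        severity = 'high'
    elif magnitude >= medium_pct:
        severity = 'medium'
    else:
        severity = 'low'

    return {
        'previous_price': previous_price,
        'current_price': current_price,
        'change_percentage': change_pct,
        'severity': severity,
    }


alert = classify_price_change(100.0, 130.0)
print(f"{alert['change_percentage']:+.1f}% -> {alert['severity']}")
```

Using the absolute value of the change means that a sharp drop is flagged with the same severity as a sharp increase.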

### 🚀 Next Lesson: Ethics and Best Practices

In the final lesson we will cover:
- Legal and ethical aspects of web scraping
- Robots.txt and usage policies
- Rate limiting and courtesy
- Responsible handling of anti-scraping detection
- Industry best practices

### 💡 Data Architecture Covered

```python
# Full pipeline:
# Raw Data → Cleaning → Validation → Transformation → Storage → Monitoring → Visualization

# Key components
DataCleaner()            # Cleaning and normalization
TransformationPipeline() # Data enrichment
DatabaseManager()        # Persistence and queries
DataMonitor()            # Alerts and quality
DataVisualizer()         # Visual analysis
```

### 🛠️ Key Tools

- **Pandas**: Data manipulation
- **SQLite**: Lightweight database
- **Matplotlib/Seaborn**: Visualization
- **JSON**: Data interchange
- **Regex**: Text cleaning
- **Hashlib**: ID generation
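
As a quick illustration of ID generation with `hashlib`, the sketch below derives a stable identifier from a product's name and URL. The function name `make_record_id`, the choice of fields, and the 16-character truncation are assumptions for this example; the ID scheme used earlier in the lesson may differ.

```python
import hashlib


def make_record_id(name: str, url: str, length: int = 16) -> str:
    """Derive a deterministic ID from normalized fields (hypothetical helper)."""
    # Collapse whitespace and ignore case so cosmetic differences map to the same ID
    normalized_name = " ".join(name.split()).casefold()
    key = f"{normalized_name}|{url.strip()}"
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return digest[:length]


id_a = make_record_id(" Laptop  Pro ", "https://example.com/p/1")
id_b = make_record_id("laptop pro", "https://example.com/p/1")
print(id_a == id_b)  # the same product yields the same ID
```

Because the ID depends only on the record's content, scraping the same product again produces the same key, which is what makes duplicate handling in the database straightforward.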

### 📊 Quality Metrics

- **Completeness**: % of fields that are filled in
- **Validity**: % of values in the correct format
- **Consistency**: Whether normalization succeeded
- **Timeliness**: How recent the data is
- **Accuracy**: Cross-validation against other sources
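
The first two metrics are simple to compute on a list of scraped records. The following is a minimal sketch in plain Python; the helper names `is_filled`, `completeness`, and `validity`, as well as the sample records, are illustrative assumptions rather than part of the lesson's classes.

```python
from typing import Any, Callable, Dict, List


def is_filled(value: Any) -> bool:
    """Treat None and blank strings as missing."""
    if value is None:
        return False
    if isinstance(value, str) and not value.strip():
        return False
    return True


def completeness(records: List[Dict[str, Any]], fields: List[str]) -> Dict[str, float]:
    """Percentage of records with a filled value, per field."""
    if not records:
        return {field: 0.0 for field in fields}
    return {
        field: 100.0 * sum(is_filled(r.get(field)) for r in records) / len(records)
        for field in fields
    }


def validity(records: List[Dict[str, Any]], field: str,
             is_valid: Callable[[Any], bool]) -> float:
    """Percentage of filled values that pass the given check."""
    values = [r.get(field) for r in records if is_filled(r.get(field))]
    if not values:
        return 0.0
    return 100.0 * sum(1 for v in values if is_valid(v)) / len(values)


sample = [
    {"name": "Laptop", "price": 999.0},
    {"name": "", "price": 25.0},
    {"name": "Phone", "price": -1.0},
    {"name": None, "price": None},
]
print(completeness(sample, ["name", "price"]))
print(validity(sample, "price", lambda p: p > 0))
```

Validity is measured only over filled values, so a missing price lowers completeness but does not count as an invalid price.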

---

Excellent! 🎉 You now have a complete data processing system that takes information extracted by web scraping from cleaning all the way through to visualization.