### Importamos ciertas librerias necesarias para realizar scraping con respecto a los productos en la RD que se encuentran en el dataset de Open Food Facts

In [1]:
import requests
import pandas as pd
import os
import time

### Realizamos el scraping

## Prueba del modelo sin entrenamiento previo

### Importaciones y configuración global


In [52]:
import requests
import base64
from PIL import Image
import io
import json
import numpy as np
from sklearn.metrics import confusion_matrix, recall_score, precision_score, f1_score, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt
from typing import List, Dict, Union, Tuple
from IPython.display import display, Image as IPImage

# Configuraciones globales
BASE_URL = "http://localhost:11434/api/generate"
MODEL_NAME = "llama3.2-vision"
VALID_CLASSES = ['agua', 'leche', 'galletas']
BRANDS = {
    'agua': ['planeta azul', 'dasani', 'cascada'],
    'leche': ['carnation', 'rica', 'nestlé', 'milex'],
    'galletas': ['club social', 'oreo', 'club extra', 'guarina']
}
ERROR_CLASS = 'desconocido'

### Funciones de procesamiento de imágenes

In [53]:
def encode_image(image_path):
    """Codifica una imagen a base64 con manejo mejorado de formatos."""
    try:
        if isinstance(image_path, str):
            with Image.open(image_path) as img:
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                buffer = io.BytesIO()
                img.save(buffer, format='JPEG')
                img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
        else:
            img = image_path
            if img.mode != 'RGB':
                img = img.convert('RGB')
            buffer = io.BytesIO()
            img.save(buffer, format='JPEG')
            img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
        
        return img_str
    except Exception as e:
        print(f"Error al codificar la imagen: {str(e)}")
        return None

def clean_prediction(prediction: str) -> Tuple[str, str]:
    """Limpia la respuesta del modelo para extraer la categoría y la marca."""
    prediction_lower = prediction.lower()
    
    # Primero identificamos la categoría
    category_scores = {clase: 0 for clase in VALID_CLASSES}
    
    # Palabras clave para categorías
    keywords = {
        'agua': ['agua', 'botella', 'bebida', 'refresco', 'líquido'],
        'leche': ['leche', 'lácteo', 'dairy', 'milk', 'evaporada', 'condensada'],
        'galletas': ['galleta', 'cookie', 'oreo', 'snack', 'dulce', 'club social', 'cracker', 'sandwich']
    }
    
    # Evaluar categoría
    for categoria, palabras in keywords.items():
        for palabra in palabras:
            if palabra in prediction_lower:
                category_scores[categoria] += 1
    
    # Determinar categoría
    category = max(category_scores.items(), key=lambda x: x[1])[0] if max(category_scores.values()) > 0 else VALID_CLASSES[0]
    
    # Buscar marca
    brand = "marca desconocida"
    for marca in BRANDS[category]:
        if marca.lower() in prediction_lower:
            brand = marca
            break
            
    if brand == "marca desconocida":
        common_prefixes = ["marca", "producto", "de la marca", "elaborado por"]
        for prefix in common_prefixes:
            if prefix in prediction_lower:
                start_idx = prediction_lower.find(prefix) + len(prefix)
                end_idx = prediction_lower.find(" ", start_idx + 15) if " " in prediction_lower[start_idx:] else len(prediction_lower)
                potential_brand = prediction_lower[start_idx:end_idx].strip()
                if len(potential_brand) > 2:
                    brand = potential_brand
                    break
    
    return category, brand

def analyze_image(image_path: str, prompt: str) -> str:
    """Analiza una imagen usando el modelo de visión."""
    base64_image = encode_image(image_path)
    if not base64_image:
        return f"{ERROR_CLASS} (marca desconocida)"
    
    payload = {
        "model": MODEL_NAME,
        "prompt": prompt,
        "stream": False,
        "images": [base64_image]
    }
    
    try:
        print("Enviando petición a Ollama...")
        response = requests.post(BASE_URL, json=payload)
        print(f"Código de estado: {response.status_code}")
        
        if response.status_code == 200:
            result = response.json()
            prediction = result.get('response', 'No se pudo obtener una respuesta')
            category, brand = clean_prediction(prediction)
            print(f"Predicción original: {prediction}")
            print(f"Categoría: {category}")
            print(f"Marca: {brand}")
            return f"{category} ({brand})"
        else:
            print(f"Respuesta completa del servidor: {response.text}")
            return f"{ERROR_CLASS} (marca desconocida)"
        
    except requests.exceptions.RequestException as e:
        print(f"Error detallado: {str(e)}")
        return f"{ERROR_CLASS} (marca desconocida)"

### Funciones de métricas y visualización

In [54]:
def calculate_metrics(predictions: List[str], ground_truth: List[str]) -> Dict[str, float]:
    """Calcula métricas de evaluación."""
    from sklearn.preprocessing import LabelEncoder
    le = LabelEncoder()
    
    # Limpiar predicciones para obtener solo la categoría (antes del paréntesis)
    cleaned_predictions = [pred.split('(')[0].strip() for pred in predictions]
    
    y_true = le.fit_transform(ground_truth)
    y_pred = le.transform(cleaned_predictions)
    
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision_macro': precision_score(y_true, y_pred, average='macro'),
        'recall_macro': recall_score(y_true, y_pred, average='macro'),
        'f1_macro': f1_score(y_true, y_pred, average='macro')
    }
    
    classes = le.classes_
    for i, class_name in enumerate(classes):
        y_true_binary = (y_true == i)
        y_pred_binary = (y_pred == i)
        
        metrics[f'precision_{class_name}'] = precision_score(y_true_binary, y_pred_binary)
        metrics[f'recall_{class_name}'] = recall_score(y_true_binary, y_pred_binary)
        metrics[f'f1_{class_name}'] = f1_score(y_true_binary, y_pred_binary)
        
    return metrics

def plot_confusion_matrix(predictions: List[str], ground_truth: List[str]) -> None:
    """Genera y muestra la matriz de confusión."""
    from sklearn.preprocessing import LabelEncoder
    le = LabelEncoder()
    
    # Limpiar predicciones para obtener solo la categoría
    cleaned_predictions = [pred.split('(')[0].strip() for pred in predictions]
    
    y_true = le.fit_transform(ground_truth)
    y_pred = le.transform(cleaned_predictions)
    
    cm = confusion_matrix(y_true, y_pred)
    
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, 
               annot=True, 
               fmt='d', 
               cmap='Blues',
               xticklabels=le.classes_,
               yticklabels=le.classes_)
    plt.title('Matriz de Confusión')
    plt.ylabel('Etiqueta Verdadera')
    plt.xlabel('Predicción')
    plt.show()

### Función principal de prueba

In [55]:
def test_vision_with_metrics(image_paths: List[str], 
                           ground_truth: List[str] = None, 
                           prompt: str = None) -> Dict[str, float]:
    """Función de prueba que evalúa múltiples imágenes y muestra métricas."""
    
    if ground_truth is None:
        ground_truth = ['galletas'] * len(image_paths)
    
    if len(image_paths) != len(ground_truth):
        print("ADVERTENCIA: El número de imágenes no coincide con el número de etiquetas ground truth")
        if len(image_paths) > len(ground_truth):
            ground_truth.extend([ground_truth[-1]] * (len(image_paths) - len(ground_truth)))
        else:
            ground_truth = ground_truth[:len(image_paths)]
    
    if prompt is None:
        prompt = f"Identifica qué tipo de producto ves en esta imagen y su marca. El producto debe ser uno de estos: {', '.join(VALID_CLASSES)}. Menciona explícitamente tanto el tipo de producto como la marca, especialmente si es una marca dominicana como Planeta Azul, Club Social, Rica, etc."
    
    print("Ejemplos de imágenes a analizar:")
    for i, path in enumerate(image_paths):
        try:
            display(IPImage(filename=path))
            print(f"Ground truth: {ground_truth[i]}\n")
        except Exception as e:
            print(f"Error al mostrar la imagen {path}: {str(e)}")
    
    print("\nProcesando imágenes...")
    predictions = []
    
    for i, image_path in enumerate(image_paths):
        print(f"\nProcesando imagen {i+1}/{len(image_paths)}: {image_path}")
        prediction = analyze_image(image_path, prompt)
        predictions.append(prediction)
    
    print("\nResumen de predicciones:")
    for i, (pred, true) in enumerate(zip(predictions, ground_truth)):
        print(f"Imagen {i+1}: Predicción = {pred}, Ground Truth = {true}")
    
    metrics = calculate_metrics(predictions, ground_truth)
    plot_confusion_matrix(predictions, ground_truth)
    
    print("\nMétricas de evaluación:")
    for metric_name, value in metrics.items():
        print(f"{metric_name}: {value:.4f}")
    
    return metrics


# Celda 5: Ejecución de prueba
image_paths = ['37_Planeta Azul.jpg', '197_leche.jpg', '4_Club Social Integral Tradicion.jpg', '7_Oreo Original.jpg', '127_Carnation Evaporated Milk.jpg']
ground_truth = ['agua', 'leche', 'galletas', 'galletas', 'leche']
metrics = test_vision_with_metrics(image_paths, ground_truth)

In [26]:
ls

'Agua dasani.jpg'  'Club Soda.jpg'   lemon.jpg  'Planeta Azul.jpg'


In [27]:
cd dataset/

[Errno 2] No such file or directory: 'dataset/'
/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/water


In [4]:
cd train/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train


In [6]:
cd images/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images


In [8]:
cd biscuits/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/biscuits


In [13]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images


In [11]:
cd canned/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/canned


In [14]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train


In [16]:
cd cereals/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/cereals


In [18]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images


In [19]:
cd dried-foods/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/dried-foods


In [21]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images


In [22]:
cd milk-powder/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/milk-powder


In [24]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images


In [25]:
cd water/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images/water


In [60]:
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import os
import numpy as np

class ProductClassifier:
    def __init__(self, base_path):
        self.base_path = base_path
        self.img_size = (224, 224)
        # Detectar categorías automáticamente del directorio
        self.categories = self._detect_categories()
        print(f"Categorías detectadas: {self.categories}")
        
        # Modelo para clasificación de categorías
        self.category_model = self._build_category_model()
        
    def _detect_categories(self):
        """Detecta automáticamente las categorías presentes en el directorio"""
        categories = [d for d in os.listdir(self.base_path) 
                     if os.path.isdir(os.path.join(self.base_path, d))]
        return sorted(categories)  # Ordenar para consistencia
            
    def _build_category_model(self):
        """Construye el modelo base usando EfficientNetB0 pre-entrenado"""
        base_model = EfficientNetB0(weights='imagenet', include_top=False)
        
        # Congelar capas base
        for layer in base_model.layers:
            layer.trainable = False
            
        # Agregar capas de clasificación
        x = GlobalAveragePooling2D()(base_model.output)
        x = Dense(256, activation='relu')(x)
        outputs = Dense(len(self.categories), activation='softmax')(x)
        
        model = Model(inputs=base_model.input, outputs=outputs)
        model.compile(optimizer='adam',
                     loss='categorical_crossentropy',
                     metrics=['accuracy'])
        
        return model
    
    def prepare_data(self):
        """Prepara los generadores de datos para entrenamiento y validación"""
        train_datagen = ImageDataGenerator(
            rescale=1./255,
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            horizontal_flip=True,
            validation_split=0.2
        )
        
        print(f"Preparando datos desde: {self.base_path}")
        
        train_generator = train_datagen.flow_from_directory(
            self.base_path,
            target_size=self.img_size,
            batch_size=32,
            class_mode='categorical',
            subset='training',
            classes=self.categories,
            shuffle=True
        )
        
        validation_generator = train_datagen.flow_from_directory(
            self.base_path,
            target_size=self.img_size,
            batch_size=32,
            class_mode='categorical',
            subset='validation',
            classes=self.categories,
            shuffle=True
        )
        
        return train_generator, validation_generator
    
    def train(self, epochs=10):
        """Entrena el modelo con los datos preparados"""
        print("Iniciando entrenamiento...")
        train_generator, validation_generator = self.prepare_data()
        
        # Imprimir información sobre los generadores
        print(f"Clases en el generador de entrenamiento: {train_generator.class_indices}")
        print(f"Número de clases: {len(train_generator.class_indices)}")
        print(f"Tamaño del batch: {train_generator.batch_size}")
        print(f"Número de muestras de entrenamiento: {train_generator.samples}")
        print(f"Número de muestras de validación: {validation_generator.samples}")
        
        history = self.category_model.fit(
            train_generator,
            validation_data=validation_generator,
            epochs=epochs,
            callbacks=[
                tf.keras.callbacks.EarlyStopping(
                    monitor='val_loss',
                    patience=3,
                    restore_best_weights=True
                )
            ]
        )
        
        return history
    
    def fine_tune(self, epochs=5):
        """Fine-tune el modelo desbloqueando algunas capas"""
        print("Iniciando fine-tuning...")
        # Desbloquear últimas capas del modelo base
        for layer in self.category_model.layers[-20:]:
            layer.trainable = True
            
        self.category_model.compile(
            optimizer=tf.keras.optimizers.Adam(1e-5),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        
        return self.train(epochs)
    
    def predict_category(self, image_path):
        """Predice la categoría de una imagen"""
        img = tf.keras.preprocessing.image.load_img(
            image_path, target_size=self.img_size
        )
        img_array = tf.keras.preprocessing.image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array /= 255.
        
        predictions = self.category_model.predict(img_array)
        predicted_category = self.categories[np.argmax(predictions[0])]
        confidence = np.max(predictions[0])
        
        return predicted_category, confidence
    
    def save_models(self, path):
        """Guarda los modelos entrenados"""
        if not os.path.exists(path):
            os.makedirs(path)
        self.category_model.save(os.path.join(path, 'category_model.h5'))
    
    def load_models(self, path):
        """Carga modelos previamente entrenados"""
        self.category_model = tf.keras.models.load_model(
            os.path.join(path, 'category_model.h5')
        )

# Ejemplo de uso
if __name__ == "__main__":
    base_path = "dataset"
    classifier = ProductClassifier(base_path)
    
    # Entrenamiento inicial
    print("Iniciando entrenamiento inicial...")
    history = classifier.train(epochs=10)
    
    # Fine-tuning
    print("Iniciando fine-tuning...")
    history_ft = classifier.fine_tune(epochs=5)
    
    # Guardar modelos
    classifier.save_models("./models")

Categorías detectadas: ['entrenamiento', 'imagenes_productos', 'train', 'train (Copy)']
Iniciando entrenamiento inicial...
Iniciando entrenamiento...
Preparando datos desde: dataset
Found 333 images belonging to 4 classes.
Found 82 images belonging to 4 classes.
Clases en el generador de entrenamiento: {'entrenamiento': 0, 'imagenes_productos': 1, 'train': 2, 'train (Copy)': 3}
Número de clases: 4
Tamaño del batch: 32
Número de muestras de entrenamiento: 333
Número de muestras de validación: 82
Epoch 1/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 1s/step - accuracy: 0.2218 - loss: 1.4767 - val_accuracy: 0.3537 - val_loss: 1.4046
Epoch 2/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 895ms/step - accuracy: 0.3344 - loss: 1.3724 - val_accuracy: 0.3537 - val_loss: 1.3820
Epoch 3/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 990ms/step - accuracy: 0.3220 - loss: 1.4059 - val_accuracy: 0.2317 - val_loss: 1.3979
Epoch 4/10
[1



In [62]:
    # Ejemplo de predicción
    test_image = "7_Oreo Original.jpg"
    category, confidence = classifier.predict_category(test_image)
    print(f"Categoría predicha: {category} (Confianza: {confidence:.2f})")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
Categoría predicha: imagenes_productos (Confianza: 0.35)


In [53]:
ls

'=0.26.0'                                ej3.jpeg
'127_Carnation Evaporated Milk.jpg'      ej4.jpeg
 197_leche.jpg                           forClaudeContex.ipynb
'37_Planeta Azul.jpg'                    [0m[01;34mimagenes_productos[0m/
'4_Club Social Integral Tradicion.jpg'   [01;34mmodels[0m/
'7_Oreo Original.jpg'                    output.jpg
 catalog_metadata.json                   productos_rd.csv
 [01;34mdataset[0m/                                Untitled.ipynb
 [01;34mdataset2.0[0m/                             yolov8n.pt
 ej1.jpeg                                yolov8x.pt


In [42]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook


In [35]:
cd dataset/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset


In [55]:

import os
import pandas as pd
import shutil
from PIL import Image
import json

class DataProcessor:
    def __init__(self, base_path):
        self.base_path = base_path
        self.categories = ['biscuits', 'canned', 'cereals', 'dried-foods', 'milk-powder', 'water']
        self.metadata = {}
        
    def process_dataset(self):
        """Procesa el dataset y genera metadata"""
        for category in self.categories:
            category_path = os.path.join(self.base_path, category)
            if not os.path.exists(category_path):
                continue
                
            self.metadata[category] = {
                'products': [],
                'total_images': 0,
                'brands': set()
            }
            
            for image_name in os.listdir(category_path):
                if image_name.endswith(('.jpg', '.jpeg', '.png')):
                    product_name = self._clean_product_name(image_name)
                    brand = self._extract_brand(product_name)
                    
                    self.metadata[category]['products'].append({
                        'filename': image_name,
                        'product_name': product_name,
                        'brand': brand
                    })
                    self.metadata[category]['total_images'] += 1
                    self.metadata[category]['brands'].add(brand)
            
            # Convertir set a lista para serialización JSON
            self.metadata[category]['brands'] = list(self.metadata[category]['brands'])
    
    def _clean_product_name(self, filename):
        """Limpia el nombre del producto del nombre del archivo"""
        name = os.path.splitext(filename)[0]
        # Eliminar prefijos comunes como "WhatsApp Image" etc.
        if "WhatsApp Image" in name:
            name = "Unknown Product"
        return name.replace('.', ' ').strip()
    
    def _extract_brand(self, product_name):
        """Extrae la marca del nombre del producto"""
        # Lista de marcas conocidas
        known_brands = {
            'biscuits': ['Casino', 'Club Social', 'Oreo', 'Ritz', 'Costa', 'Emperador'],
            'water': ['Planeta Azul', 'Dasani'],
            'milk-powder': ['Milex', 'Nido'],
            'cereals': ['Nesquik', 'Corn Flakes', 'Trix'],
            'canned': ['Goya', 'La Famosa'],
            'dried-foods': ['Nesquik', 'Sustagen']
        }
        
        for category, brands in known_brands.items():
            for brand in brands:
                if brand.lower() in product_name.lower():
                    return brand
        return "Unknown Brand"
    
    def generate_dataset_summary(self):
        """Genera un resumen del dataset"""
        summary = {
            'total_categories': len(self.categories),
            'total_images': sum(meta['total_images'] for meta in self.metadata.values()),
            'categories': self.metadata
        }
        
        return summary
    
    def save_metadata(self, output_path):
        """Guarda la metadata en formato JSON"""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, indent=4, ensure_ascii=False)
    
    def validate_images(self):
        """Valida la integridad de las imágenes"""
        invalid_images = []
        
        for category in self.categories:
            category_path = os.path.join(self.base_path, category)
            if not os.path.exists(category_path):
                continue
                
            for image_name in os.listdir(category_path):
                image_path = os.path.join(category_path, image_name)
                try:
                    with Image.open(image_path) as img:
                        img.verify()
                except:
                    invalid_images.append(image_path)
        
        return invalid_images

In [57]:
# Ejemplo de uso
if __name__ == "__main__":
    base_path = "dataset/train/images"
    processor = DataProcessor(base_path)
    
    # Procesar dataset
    processor.process_dataset()
    
    # Generar y guardar metadata
    summary = processor.generate_dataset_summary()
    processor.save_metadata("dataset_metadata.json")
    
    # Validar imágenes
    invalid_images = processor.validate_images()
    if invalid_images:
        print("Imágenes inválidas encontradas:", invalid_images)
    
    # Imprimir resumen
    print(json.dumps(summary, indent=2))

{
  "total_categories": 6,
  "total_images": 95,
  "categories": {
    "biscuits": {
      "products": [
        {
          "filename": "FRAC Cl\u00e1sica.jpg",
          "product_name": "FRAC Cl\u00e1sica",
          "brand": "Unknown Brand"
        },
        {
          "filename": "Gretel Chocolate.jpg",
          "product_name": "Gretel Chocolate",
          "brand": "Unknown Brand"
        },
        {
          "filename": "Ritz sabor a Queso.jpg",
          "product_name": "Ritz sabor a Queso",
          "brand": "Ritz"
        },
        {
          "filename": "Costa Choco Chips.jpg",
          "product_name": "Costa Choco Chips",
          "brand": "Costa"
        },
        {
          "filename": "WhatsApp Image 2025-01-28 at 7.33.56 PM (4).jpeg",
          "product_name": "Unknown Product",
          "brand": "Unknown Brand"
        },
        {
          "filename": "WhatsApp Image 2025-01-28 at 7.33.56 PM.jpeg",
          "product_name": "Unknown Product",
          "b

In [56]:
ls

'=0.26.0'                                ej3.jpeg
'127_Carnation Evaporated Milk.jpg'      ej4.jpeg
 197_leche.jpg                           forClaudeContex.ipynb
'37_Planeta Azul.jpg'                    [0m[01;34mimagenes_productos[0m/
'4_Club Social Integral Tradicion.jpg'   [01;34mmodels[0m/
'7_Oreo Original.jpg'                    output.jpg
 catalog_metadata.json                   productos_rd.csv
 [01;34mdataset[0m/                                Untitled.ipynb
 [01;34mdataset2.0[0m/                             yolov8n.pt
 ej1.jpeg                                yolov8x.pt


In [58]:
processor = DataProcessor("dataset")
processor.process_dataset()
summary = processor.generate_dataset_summary()

In [63]:
classifier = ProductClassifier("dataset")
classifier.train()
classifier.fine_tune()

Categorías detectadas: ['entrenamiento', 'imagenes_productos', 'train', 'train (Copy)']
Iniciando entrenamiento...
Preparando datos desde: dataset
Found 333 images belonging to 4 classes.
Found 82 images belonging to 4 classes.
Clases en el generador de entrenamiento: {'entrenamiento': 0, 'imagenes_productos': 1, 'train': 2, 'train (Copy)': 3}
Número de clases: 4
Tamaño del batch: 32
Número de muestras de entrenamiento: 333
Número de muestras de validación: 82
Epoch 1/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 1s/step - accuracy: 0.3218 - loss: 1.4463 - val_accuracy: 0.2317 - val_loss: 1.4747
Epoch 2/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 882ms/step - accuracy: 0.2784 - loss: 1.4203 - val_accuracy: 0.3537 - val_loss: 1.3767
Epoch 3/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 880ms/step - accuracy: 0.3745 - loss: 1.3467 - val_accuracy: 0.3537 - val_loss: 1.4057
Epoch 4/10
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x764970775ac0>

In [107]:
import os
import json
from typing import Dict, List, Tuple
import requests
import base64
from PIL import Image
import io

class EnhancedVisionClassifier:
    def __init__(self):
        self.BASE_URL = "http://localhost:11434/api/generate"
        self.MODEL_NAME = "llava"  #llama3.2-vision
        self.categories = {
            'biscuits': ['Casino', 'Club Social', 'Oreo', 'Ritz', 'Costa', 'Emperador', 'FRAC', 'Guarina'],
            'canned': ['La Famosa', 'Goya', 'Del Monte'],
            'cereals': ['Nesquik', 'Corn Flakes', 'Trix', 'Gran Cereal'],
            'dried-foods': ['Nesquik', 'Sustagen'],
            'milk-powder': ['Milex', 'Nido', 'Carnation'],
            'water': ['Planeta Azul', 'Dasani', 'Cascada']
        }
        self.dataset_knowledge = {}
        
    def verify_ollama(self):
        """Verifica que Ollama esté funcionando"""
        try:
            response = requests.get("http://localhost:11434/api/tags")
            if response.status_code == 200:
                print("Ollama está funcionando correctamente")
                return True
            else:
                print(f"Ollama respondió con código de estado: {response.status_code}")
                return False
        except requests.exceptions.ConnectionError:
            print("No se pudo conectar a Ollama. ¿Está corriendo el servicio?")
            return False
        
    def load_dataset_knowledge(self, dataset_path: str):
        """Carga conocimiento del dataset para mejorar las predicciones"""
        print(f"\nCargando dataset desde: {os.path.abspath(dataset_path)}")
        
        if not os.path.exists(dataset_path):
            print(f"Error: El directorio {dataset_path} no existe")
            return
            
        for category in self.categories.keys():
            category_path = os.path.join(dataset_path, category)
            print(f"Buscando categoría: {category} en {category_path}")
            
            if os.path.exists(category_path):
                self.dataset_knowledge[category] = []
                files = os.listdir(category_path)
                print(f"Encontrados {len(files)} archivos en {category}")
                
                for img_name in files:
                    if "WhatsApp Image" not in img_name:
                        product_name = os.path.splitext(img_name)[0]
                        self.dataset_knowledge[category].append(product_name)
            else:
                print(f"No se encontró el directorio para la categoría: {category}")

    def generate_enhanced_prompt(self, image_path: str) -> str:
        """Genera un prompt mejorado basado en el conocimiento del dataset"""
        base_prompt = (
            f"Analiza esta imagen '{os.path.basename(image_path)}' y clasifícala en una de las siguientes categorías: "
            f"{', '.join(self.categories.keys())}. "
            "Identifica también la marca del producto. "
            "Las marcas comunes para cada categoría son:\n"
        )
        
        for cat, brands in self.categories.items():
            base_prompt += f"- {cat}: {', '.join(brands)}\n"
        
        if self.dataset_knowledge:
            base_prompt += "\nEjemplos de productos conocidos:\n"
            for cat, products in self.dataset_knowledge.items():
                if products:
                    base_prompt += f"- {cat}: {', '.join(products[:3])}\n"
        
        base_prompt += "\nPor favor, responde SOLO con el formato 'categoría (marca)'"
        print(f"\nPrompt generado:\n{base_prompt}\n")
        return base_prompt

    def encode_image(self, image_path: str) -> str:
        """Codifica la imagen a base64"""
        if not os.path.exists(image_path):
            print(f"Error: La imagen {image_path} no existe")
            return None
            
        try:
            print(f"Codificando imagen: {image_path}")
            with Image.open(image_path) as img:
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                buffer = io.BytesIO()
                img.save(buffer, format='JPEG')
                return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            print(f"Error al codificar la imagen: {str(e)}")
            return None

    def classify_image(self, image_path: str) -> Tuple[str, str]:
        """Clasifica una imagen usando el modelo llama3.2-vision mejorado"""
        # Verificar Ollama
        if not self.verify_ollama():
            return "error", "error (Ollama no está disponible)"
            
        # Verificar y codificar imagen
        base64_image = self.encode_image(image_path)
        if not base64_image:
            return "error", "error (imagen no válida)"

        prompt = self.generate_enhanced_prompt(image_path)
        
        payload = {
            "model": self.MODEL_NAME,
            "prompt": prompt,
            "stream": False,
            "images": [base64_image]
        }

        try:
            print("Enviando solicitud a Ollama...")
            response = requests.post(self.BASE_URL, json=payload)
            print(f"Código de respuesta: {response.status_code}")
            
            if response.status_code == 200:
                result = response.json()
                prediction = result.get('response', '').lower()
                print(f"Predicción recibida: {prediction}")
                
                # Extraer categoría y marca
                for category in self.categories.keys():
                    if category in prediction:
                        # Buscar marca
                        for brand in self.categories[category]:
                            if brand.lower() in prediction:
                                return category, brand
                        return category, "marca desconocida"
                
                return "desconocido", "marca desconocida"
            else:
                print(f"Error en la respuesta: {response.text}")
                return "error", f"error (código {response.status_code})"
                
        except Exception as e:
            print(f"Error en la clasificación: {str(e)}")
            return "error", f"error ({str(e)})"

    def validate_dataset(self, dataset_path: str) -> Dict:
        """Valida el dataset y genera estadísticas"""
        stats = {
            'total_images': 0,
            'categories': {},
            'invalid_images': []
        }
        
        print(f"\nValidando dataset en: {os.path.abspath(dataset_path)}")
        
        if not os.path.exists(dataset_path):
            print(f"Error: El directorio del dataset no existe: {dataset_path}")
            return stats
        
        for category in self.categories.keys():
            category_path = os.path.join(dataset_path, category)
            print(f"Verificando categoría: {category}")
            
            if os.path.exists(category_path):
                stats['categories'][category] = {
                    'count': 0,
                    'valid_images': 0,
                    'invalid_images': 0
                }
                
                files = os.listdir(category_path)
                print(f"Encontrados {len(files)} archivos en {category}")
                
                for img_name in files:
                    img_path = os.path.join(category_path, img_name)
                    stats['categories'][category]['count'] += 1
                    stats['total_images'] += 1
                    
                    try:
                        with Image.open(img_path) as img:
                            img.verify()
                            stats['categories'][category]['valid_images'] += 1
                    except Exception as e:
                        print(f"Imagen inválida encontrada: {img_path}")
                        print(f"Error: {str(e)}")
                        stats['categories'][category]['invalid_images'] += 1
                        stats['invalid_images'].append(img_path)
            else:
                print(f"No se encontró el directorio para la categoría: {category}")
        
        return stats

# Ejemplo de uso
if __name__ == "__main__":
    # Inicializar el clasificador
    print("Inicializando clasificador...")
    classifier = EnhancedVisionClassifier()
    
    # Verificar Ollama
    if not classifier.verify_ollama():
        print("Error: No se puede continuar sin Ollama funcionando")
        exit(1)
    
    # Construir la ruta completa al dataset
    base_path = "dataset"
    dataset_path = os.path.join(base_path, "train", "images")
    
    # Verificar que el directorio existe
    if not os.path.exists(dataset_path):
        print(f"Error: No se encontró el directorio del dataset en: {dataset_path}")
        # Intentar con ruta absoluta como fallback
        dataset_path = "/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images"
        if not os.path.exists(dataset_path):
            print(f"Error: Tampoco se encontró el dataset en la ruta absoluta: {dataset_path}")
            exit(1)
    
    print(f"Usando dataset en: {dataset_path}")
    
    # Cargar conocimiento del dataset
    classifier.load_dataset_knowledge(dataset_path)
    
    # Validar dataset
    stats = classifier.validate_dataset(dataset_path)
    print("\nEstadísticas del dataset:")
    print(json.dumps(stats, indent=2))
    
    # Ejemplo de clasificación
   # test_image = "37_Planeta Azul.jpg"  # Asumiendo que la imagen está en el directorio actual
    test_image = "197_leche.jpg"
    print(f"\nClasificando imagen: {test_image}")
    category, brand = classifier.classify_image(test_image)
    print(f"\nResultado de clasificación:")
    print(f"Categoría: {category}")
    print(f"Marca: {brand}")

Inicializando clasificador...
Ollama está funcionando correctamente
Usando dataset en: dataset/train/images

Cargando dataset desde: /home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images
Buscando categoría: biscuits en dataset/train/images/biscuits
Encontrados 42 archivos en biscuits
Buscando categoría: canned en dataset/train/images/canned
Encontrados 8 archivos en canned
Buscando categoría: cereals en dataset/train/images/cereals
Encontrados 26 archivos en cereals
Buscando categoría: dried-foods en dataset/train/images/dried-foods
Encontrados 3 archivos en dried-foods
Buscando categoría: milk-powder en dataset/train/images/milk-powder
Encontrados 12 archivos en milk-powder
Buscando categoría: water en dataset/train/images/water
Encontrados 4 archivos en water

Validando dataset en: /home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images
Verificando categoría: biscuits
Encontrados 42 archivos en biscuits
Verificando c

In [91]:
ls

[0m[01;34mbiscuits[0m/  [01;34mcanned[0m/  [01;34mcereals[0m/  [01;34mdried-foods[0m/  [01;34mmilk-powder[0m/  [01;34mwater[0m/


In [85]:
cd dataset/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset


In [96]:
ls

'=0.26.0'                                ej3.jpeg
'127_Carnation Evaporated Milk.jpg'      ej4.jpeg
 197_leche.jpg                           forClaudeContex.ipynb
'37_Planeta Azul.jpg'                    [0m[01;34mimagenes_productos[0m/
'4_Club Social Integral Tradicion.jpg'   [01;34mmodels[0m/
'7_Oreo Original.jpg'                    output.jpg
 catalog_metadata.json                   productos_rd.csv
 [01;34mdataset[0m/                                Untitled.ipynb
 [01;34mdataset2.0[0m/                             yolov8n.pt
 dataset_metadata.json                   yolov8x.pt
 ej1.jpeg


In [87]:
cd train/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train


In [90]:
cd images/

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook/dataset/train/images


In [95]:
cd ..

/home/friasluna/Tesis_Proyect/proyecto_tesis/modelo/jupyer_notebook


In [113]:
import os
import json
from typing import Dict, List, Tuple
import requests
import base64
from PIL import Image
import io
from pathlib import Path
import numpy as np
import cv2

class EnhancedVisionClassifier:
    def __init__(self):
        self.BASE_URL = "http://localhost:11434/api/generate"
        self.MODEL_NAME = "llava"
        self.categories = {
            'biscuits': {
                'brands': ['Casino', 'Club Social', 'Oreo', 'Ritz', 'Costa', 'Emperador', 'FRAC', 'Guarina', 'Max'],
                'keywords': ['galleta', 'cookie', 'cracker', 'wafer', 'biscuit', 'snack']
            },
            'canned': {
                'brands': ['La Famosa', 'Goya', 'Del Monte'],
                'keywords': ['enlatado', 'conserva', 'lata', 'atún', 'sardina']
            },
            'cereals': {
                'brands': ['Nesquik', 'Corn Flakes', 'Trix', 'Gran Cereal'],
                'keywords': ['cereal', 'avena', 'corn', 'granola', 'harina']
            },
            'dried-foods': {
                'brands': ['Nesquik', 'Sustagen'],
                'keywords': ['polvo', 'deshidratado', 'instantáneo']
            },
            'milk-powder': {
                'brands': ['Milex', 'Nido', 'Carnation', 'Leche Entera'],
                'keywords': ['leche', 'milk', 'lácteo', 'fórmula']
            },
            'water': {
                'brands': ['Planeta Azul', 'Dasani', 'Cascada'],
                'keywords': ['agua', 'water', 'bebida', 'botella']
            }
        }
        self.dataset_knowledge = {}
        self.product_features = {}
        
    def load_and_process_dataset(self, dataset_path: str):
        """Carga y procesa el dataset, extrayendo características visuales de las imágenes"""
        print(f"\nProcesando dataset en: {dataset_path}")
        
        for category in self.categories.keys():
            category_path = Path(dataset_path) / category
            if not category_path.exists():
                continue
                
            self.product_features[category] = []
            for img_file in category_path.glob('*.[jJ][pP][gG]'):
                if 'WhatsApp' not in img_file.name:
                    try:
                        # Extraer características visuales
                        img = cv2.imread(str(img_file))
                        if img is None:
                            continue
                            
                        # Convertir a RGB y redimensionar
                        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                        img = cv2.resize(img, (224, 224))
                        
                        # Extraer características básicas
                        features = {
                            'name': img_file.stem,
                            'hist': cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]).flatten(),
                            'dominant_colors': self._get_dominant_colors(img),
                            'edges': self._get_edge_density(img)
                        }
                        
                        self.product_features[category].append(features)
                        
                    except Exception as e:
                        print(f"Error procesando {img_file}: {e}")
                        
        print("Dataset procesado y características extraídas")

    def _get_dominant_colors(self, img, n_colors=5):
        """Extrae los colores dominantes de una imagen"""
        pixels = img.reshape(-1, 3)
        pixels = np.float32(pixels)
        
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 200, .1)
        flags = cv2.KMEANS_RANDOM_CENTERS
        
        _, labels, centers = cv2.kmeans(pixels, n_colors, None, criteria, 10, flags)
        
        _, counts = np.unique(labels, return_counts=True)
        return centers[np.argsort(-counts)]
        
    def _get_edge_density(self, img):
        """Calcula la densidad de bordes en la imagen"""
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 100, 200)
        return np.mean(edges > 0)

    def generate_enhanced_prompt(self, image_path: str, image_features: Dict) -> str:
        """Genera un prompt mejorado basado en características de la imagen"""
        base_prompt = (
            f"Analiza esta imagen y clasifícala. La imagen muestra un producto que podría ser:\n\n"
        )
        
        # Agregar información específica de cada categoría
        for category, info in self.categories.items():
            base_prompt += f"- {category.upper()}: {', '.join(info['brands'])}\n"
            base_prompt += f"  Características típicas: {', '.join(info['keywords'])}\n"
        
        # Agregar información sobre características visuales detectadas
        base_prompt += "\nCaracterísticas detectadas en la imagen:\n"
        base_prompt += f"- Densidad de bordes: {'alta' if image_features['edges'] > 0.1 else 'baja'}\n"
        base_prompt += f"- Colores dominantes: {len(image_features['dominant_colors'])} colores principales\n"
        
        base_prompt += "\nResponde SOLO con el formato: 'categoría (marca)'"
        return base_prompt

    def classify_image(self, image_path: str) -> Tuple[str, str]:
        """Clasifica una imagen usando el modelo mejorado"""
        try:
            # Cargar y procesar la imagen
            img = cv2.imread(image_path)
            if img is None:
                return "error", "error (imagen no válida)"
                
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (224, 224))
            
            # Extraer características
            image_features = {
                'hist': cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]).flatten(),
                'dominant_colors': self._get_dominant_colors(img),
                'edges': self._get_edge_density(img)
            }
            
            # Generar prompt mejorado
            prompt = self.generate_enhanced_prompt(image_path, image_features)
            
            # Codificar imagen
            with open(image_path, 'rb') as img_file:
                base64_image = base64.b64encode(img_file.read()).decode('utf-8')
            
            # Preparar payload
            payload = {
                "model": self.MODEL_NAME,
                "prompt": prompt,
                "stream": False,
                "images": [base64_image]
            }
            
            # Realizar petición
            response = requests.post(self.BASE_URL, json=payload)
            if response.status_code != 200:
                return "error", f"error (código {response.status_code})"
                
            result = response.json()
            prediction = result.get('response', '').lower()
            
            # Procesar predicción
            best_category = None
            best_brand = None
            max_score = 0
            
            for category, info in self.categories.items():
                # Verificar keywords
                keyword_score = sum(1 for keyword in info['keywords'] if keyword in prediction)
                
                # Verificar marcas
                brand_score = 0
                detected_brand = None
                for brand in info['brands']:
                    if brand.lower() in prediction:
                        brand_score = 1
                        detected_brand = brand
                        break
                
                total_score = keyword_score + (brand_score * 2)
                if total_score > max_score:
                    max_score = total_score
                    best_category = category
                    best_brand = detected_brand
            
            if best_category:
                return best_category, best_brand if best_brand else "marca desconocida"
            
            return "desconocido", "marca desconocida"
            
        except Exception as e:
            print(f"Error en clasificación: {e}")
            return "error", f"error ({str(e)})"

    def batch_classify(self, image_paths: List[str]) -> List[Tuple[str, str]]:
        """Clasifica un lote de imágenes"""
        results = []
        for path in image_paths:
            category, brand = self.classify_image(path)
            results.append((path, category, brand))
        return results



In [115]:
# Ejemplo de uso
if __name__ == "__main__":
    classifier = EnhancedVisionClassifier()
    
    # Configurar y cargar dataset
    dataset_path = "dataset/train/images"
    classifier.load_and_process_dataset(dataset_path)
    
    # Ejemplo de clasificación
    test_image = "ej4.jpeg"
    category, brand = classifier.classify_image(test_image)
    print(f"\nResultado de clasificación:")
    print(f"Imagen: {test_image}")
    print(f"Categoría: {category}")
    print(f"Marca: {brand}")


Procesando dataset en: dataset/train/images
Dataset procesado y características extraídas

Resultado de clasificación:
Imagen: ej4.jpeg
Categoría: milk-powder
Marca: Nido


In [130]:
import os
import json
from typing import Dict, List, Tuple, Union
import requests
import base64
from PIL import Image
import io
import re
from pathlib import Path
import numpy as np
import cv2

class EnhancedVisionClassifier:
    def __init__(self):
        self.BASE_URL = "http://localhost:11434/api/generate"
        self.MODEL_NAME = "llava:13"
        self.categories = {
            'biscuits': {
                'brands': ['Casino', 'Club Social', 'Oreo', 'Ritz', 'Costa', 'Emperador', 'FRAC', 'Guarina'],
                'keywords': ['galleta', 'cookie', 'cracker', 'wafer', 'biscuit', 'snack']
            },
            'canned': {
                'brands': ['La Famosa', 'Goya', 'Del Monte'],
                'keywords': ['enlatado', 'conserva', 'lata', 'atún', 'sardina']
            },
            'cereals': {
                'brands': ['Nesquik', 'Corn Flakes', 'Trix', 'Gran Cereal'],
                'keywords': ['cereal', 'avena', 'corn', 'granola', 'harina']
            },
            'dried-foods': {
                'brands': ['Nesquik', 'Sustagen'],
                'keywords': ['polvo', 'deshidratado', 'instantáneo']
            },
            'milk-powder': {
                'brands': ['Milex', 'Nido', 'Carnation'],
                'keywords': ['leche', 'milk', 'lácteo', 'fórmula']
            },
            'water': {
                'brands': ['Planeta Azul', 'Dasani', 'Cascada'],
                'keywords': ['agua', 'water', 'bebida', 'botella']
            }
        }
        self.dataset_knowledge = {}
        self.product_features = {}
        
    def load_and_process_dataset(self, dataset_path: str):
        """Carga y procesa el dataset, extrayendo características visuales de las imágenes"""
        print(f"\nProcesando dataset en: {dataset_path}")
        
        for category in self.categories.keys():
            category_path = Path(dataset_path) / category
            if not category_path.exists():
                continue
                
            self.product_features[category] = []
            for img_file in category_path.glob('*.[jJ][pP][gG]'):
                if 'WhatsApp' not in img_file.name:
                    try:
                        # Extraer características visuales
                        img = cv2.imread(str(img_file))
                        if img is None:
                            continue
                            
                        # Convertir a RGB y redimensionar
                        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                        img = cv2.resize(img, (224, 224))
                        
                        # Extraer características básicas
                        features = {
                            'name': img_file.stem,
                            'hist': cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]).flatten(),
                            'dominant_colors': self._get_dominant_colors(img),
                            'edges': self._get_edge_density(img)
                        }
                        
                        self.product_features[category].append(features)
                        
                    except Exception as e:
                        print(f"Error procesando {img_file}: {e}")
                        
        print("Dataset procesado y características extraídas")

    def _get_dominant_colors(self, img, n_colors=5):
        """Extrae los colores dominantes de una imagen"""
        pixels = img.reshape(-1, 3)
        pixels = np.float32(pixels)
        
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 200, .1)
        flags = cv2.KMEANS_RANDOM_CENTERS
        
        _, labels, centers = cv2.kmeans(pixels, n_colors, None, criteria, 10, flags)
        
        _, counts = np.unique(labels, return_counts=True)
        return centers[np.argsort(-counts)]
        
    def _get_edge_density(self, img):
        """Calcula la densidad de bordes en la imagen"""
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 100, 200)
        return np.mean(edges > 0)

    def generate_enhanced_prompt(self, image_path: str, image_features: Dict) -> str:
        """Genera un prompt mejorado basado en características de la imagen"""
        base_prompt = (
            f"Analiza detalladamente esta imagen y enumera TODOS los productos visibles. "
            f"IMPORTANTE: Si hay múltiples unidades del mismo producto, cuéntalas. "
            f"Si hay productos diferentes en la misma imagen, enuméralos por separado.\n\n"
            f"Para cada producto que veas, necesito:\n"
            f"1. Categoría exacta (elige de la lista proporcionada)\n"
            f"2. Marca específica\n"
            f"3. Cantidad precisa de unidades\n\n"
            f"Para la categoría milk-powder, busca específicamente:\n"
            f"- Cajas o envases de leche\n"
            f"- Leche en polvo o líquida\n"
            f"- Marcas como Milex, Nido, Carnation o Leche Entera\n\n"
            f"Los productos deben clasificarse en una de estas categorías:\n\n"
        )
        
        # Agregar información específica de cada categoría
        for category, info in self.categories.items():
            base_prompt += f"- {category.upper()}: {', '.join(info['brands'])}\n"
            base_prompt += f"  Características típicas: {', '.join(info['keywords'])}\n"
        
        # Agregar información sobre características visuales detectadas
        base_prompt += "\nCaracterísticas detectadas en la imagen:\n"
        base_prompt += f"- Densidad de bordes: {'alta' if image_features['edges'] > 0.1 else 'baja'}\n"
        base_prompt += f"- Colores dominantes: {len(image_features['dominant_colors'])} colores principales\n"
        
        base_prompt += "\nResponde en formato JSON como este ejemplo:\n"
        base_prompt += """[
            {"category": "milk-powder", "brand": "Leche Entera", "quantity": 4},
            {"category": "cereals", "brand": "Corn Flakes", "quantity": 1}
        ]"""
        return base_prompt

    def classify_image(self, image_path: str) -> List[Dict[str, Union[str, int]]]:
        """Clasifica múltiples productos en una imagen y cuenta cantidades
        
        Returns:
            List[Dict]: Lista de productos detectados con su categoría, marca y cantidad
        """
        try:
            # Cargar y procesar la imagen
            img = cv2.imread(image_path)
            if img is None:
                return "error", "error (imagen no válida)"
                
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (224, 224))
            
            # Extraer características
            image_features = {
                'hist': cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]).flatten(),
                'dominant_colors': self._get_dominant_colors(img),
                'edges': self._get_edge_density(img)
            }
            
            # Generar prompt mejorado
            prompt = self.generate_enhanced_prompt(image_path, image_features)
            
            # Codificar imagen
            with open(image_path, 'rb') as img_file:
                base64_image = base64.b64encode(img_file.read()).decode('utf-8')
            
            # Preparar payload
            payload = {
                "model": self.MODEL_NAME,
                "prompt": prompt,
                "stream": False,
                "images": [base64_image]
            }
            
            # Realizar petición
            response = requests.post(self.BASE_URL, json=payload)
            if response.status_code != 200:
                return "error", f"error (código {response.status_code})"
                
            result = response.json()
            prediction = result.get('response', '').lower()
            
            # Procesar predicción
            products = []
            
            # Primero intentar encontrar un formato JSON en la respuesta
            json_matches = re.findall(r'\[.*\]', prediction.replace('\n', ' '))
            if json_matches:
                try:
                    products = json.loads(json_matches[0])
                    if isinstance(products, list):
                        return products
                except json.JSONDecodeError:
                    pass
                    
            # Si no se encontró JSON válido, procesar el texto
            print("Procesando respuesta en texto plano:", prediction)
            # Procesamiento mejorado del texto
            found_products = {}
            
            # Patrones de cantidad más específicos
            quantity_patterns = [
                r'(\d+)\s*(?:unidades?|cajas?|paquetes?|botellas?|envases?)',
                r'hay\s*(\d+)',
                r'se ven\s*(\d+)',
                r'(\d+)\s*productos?',
                r'cantidad[:\s]+(\d+)',
                r'(\d+)\s*(?:milk|leche|agua|galletas?|cereales?)'
            ]
            
            # Procesar por categoría
            for category, info in self.categories.items():
                category_keywords = set(info['keywords'])
                category_brands = set(brand.lower() for brand in info['brands'])
                
                # Buscar coincidencias de categoría
                if any(keyword in prediction.lower() for keyword in category_keywords):
                    # Buscar marcas específicas
                    for brand in info['brands']:
                        brand_lower = brand.lower()
                        if brand_lower in prediction.lower():
                            product_key = f"{category}-{brand}"
                            if product_key not in found_products:
                                # Buscar cantidad usando todos los patrones
                                quantities = []
                                for pattern in quantity_patterns:
                                    matches = re.findall(pattern, prediction.lower())
                                    quantities.extend(matches)
                                
                                # Si encontramos cantidades, usar la más grande
                                quantity = 1
                                if quantities:
                                    quantity = max(int(q) for q in quantities if q.isdigit())
                                
                                products.append({
                                    "category": category,
                                    "brand": brand,
                                    "quantity": quantity
                                })
                                found_products[product_key] = True
                                
                                print(f"Encontrado: {category} - {brand} - {quantity} unidades")
            
            if not products:
                products.append({
                    "category": "desconocido",
                    "brand": "marca desconocida",
                    "quantity": 1
                })
            
            return products
            
        except Exception as e:
            print(f"Error en clasificación: {e}")
            return "error", f"error ({str(e)})"

    def batch_classify(self, image_paths: List[str]) -> List[Tuple[str, str]]:
        """Clasifica un lote de imágenes"""
        results = []
        for path in image_paths:
            category, brand = self.classify_image(path)
            results.append((path, category, brand))
        return results

# Ejemplo de uso
if __name__ == "__main__":
    classifier = EnhancedVisionClassifier()
    
    # Configurar y cargar dataset
    dataset_path = "dataset/train/images"
    classifier.load_and_process_dataset(dataset_path)
    
    # Ejemplo de clasificación
    test_image = "ej4.jpeg"
    category, brand = classifier.classify_image(test_image)
    print(f"\nResultado de clasificación:")
    print(f"Imagen: {test_image}")
    print(f"Categoría: {category}")
    print(f"Marca: {brand}")


Procesando dataset en: dataset/train/images
Dataset procesado y características extraídas

Resultado de clasificación:
Imagen: ej4.jpeg
Categoría: {'category': 'milk-powder', 'brand': 'leche entera', 'quantity': 4}
Marca: {'category': 'cereals', 'brand': 'corn flakes', 'quantity': 1}


In [145]:
import os
import json
from typing import Dict, List, Tuple, Union
import requests
import base64
from PIL import Image
import io
import re
from pathlib import Path
import numpy as np
import cv2

class EnhancedVisionClassifier:
    def __init__(self):
        self.BASE_URL = "http://localhost:11434/api/generate"
        self.MODEL_NAME = "llava"
        self.categories = {
            'biscuits': {
                'brands': ['Casino', 'Club Social', 'Oreo', 'Ritz', 'Costa', 'Emperador', 'FRAC', 'Guarina'],
                'keywords': ['galleta', 'cookie', 'cracker', 'wafer', 'biscuit', 'snack']
            },
            'canned': {
                'brands': ['La Famosa', 'Goya', 'Del Monte'],
                'keywords': ['enlatado', 'conserva', 'lata', 'atún', 'sardina']
            },
            'cereals': {
                'brands': ['Nesquik', 'Corn Flakes', 'Trix', 'Gran Cereal'],
                'keywords': ['cereal', 'avena', 'corn', 'granola', 'harina']
            },
            'dried-foods': {
                'brands': ['Nesquik', 'Sustagen'],
                'keywords': ['polvo', 'deshidratado', 'instantáneo']
            },
            'milk-powder': {
                'brands': ['Milex', 'Nido', 'Carnation'],
                'keywords': ['leche', 'milk', 'lácteo', 'fórmula']
            },
            'water': {
                'brands': ['Planeta Azul', 'Dasani', 'Cascada'],
                'keywords': ['agua', 'water', 'bebida', 'botella']
            }
        }
        self.dataset_knowledge = {}
        self.product_features = {}
        
    def load_and_process_dataset(self, dataset_path: str):
        """Carga y procesa el dataset, extrayendo características visuales de las imágenes"""
        print(f"\nProcesando dataset en: {dataset_path}")
        
        for category in self.categories.keys():
            category_path = Path(dataset_path) / category
            if not category_path.exists():
                continue
                
            self.product_features[category] = []
            for img_file in category_path.glob('*.[jJ][pP][gG]'):
                if 'WhatsApp' not in img_file.name:
                    try:
                        # Extraer características visuales
                        img = cv2.imread(str(img_file))
                        if img is None:
                            continue
                            
                        # Convertir a RGB y redimensionar
                        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                        img = cv2.resize(img, (224, 224))
                        
                        # Extraer características básicas
                        features = {
                            'name': img_file.stem,
                            'hist': cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]).flatten(),
                            'dominant_colors': self._get_dominant_colors(img),
                            'edges': self._get_edge_density(img)
                        }
                        
                        self.product_features[category].append(features)
                        
                    except Exception as e:
                        print(f"Error procesando {img_file}: {e}")
                        
        print("Dataset procesado y características extraídas")

    def _get_dominant_colors(self, img, n_colors=5):
        """Extrae los colores dominantes de una imagen"""
        pixels = img.reshape(-1, 3)
        pixels = np.float32(pixels)
        
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 200, .1)
        flags = cv2.KMEANS_RANDOM_CENTERS
        
        _, labels, centers = cv2.kmeans(pixels, n_colors, None, criteria, 10, flags)
        
        _, counts = np.unique(labels, return_counts=True)
        return centers[np.argsort(-counts)]
        
    def _get_edge_density(self, img):
        """Calcula la densidad de bordes en la imagen"""
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 100, 200)
        return np.mean(edges > 0)

    def generate_enhanced_prompt(self, image_path: str, image_features: Dict) -> str:
        """Genera un prompt mejorado basado en características de la imagen"""
        base_prompt = (
            f"Analiza detalladamente esta imagen y enumera TODOS los productos visibles. "
            f"IMPORTANTE: Si hay múltiples unidades del mismo producto, cuéntalas. "
            f"Si hay productos diferentes en la misma imagen, enuméralos por separado.\n\n"
            f"Para cada producto que veas, necesito:\n"
            f"1. Categoría exacta (elige de la lista proporcionada)\n"
            f"2. Marca específica\n"
            f"3. Cantidad precisa de unidades\n\n"
            f"Para la categoría milk-powder, busca específicamente:\n"
            f"- Cajas o envases de leche\n"
            f"- Leche en polvo o líquida\n"
            f"- Marcas como Milex, Nido, Carnation o Leche Entera\n\n"
            f"Los productos deben clasificarse en una de estas categorías:\n\n"
        )
        
        # Agregar información específica de cada categoría
        for category, info in self.categories.items():
            base_prompt += f"- {category.upper()}: {', '.join(info['brands'])}\n"
            base_prompt += f"  Características típicas: {', '.join(info['keywords'])}\n"
        
        # Agregar información sobre características visuales detectadas
        base_prompt += "\nCaracterísticas detectadas en la imagen:\n"
        base_prompt += f"- Densidad de bordes: {'alta' if image_features['edges'] > 0.1 else 'baja'}\n"
        base_prompt += f"- Colores dominantes: {len(image_features['dominant_colors'])} colores principales\n"
        
        base_prompt += "\nResponde en formato JSON como este ejemplo:\n"
        base_prompt += """[
            {"category": "milk-powder", "brand": "Leche Entera", "quantity": 4},
            {"category": "cereals", "brand": "Corn Flakes", "quantity": 1}
        ]"""
        return base_prompt

    def classify_image(self, image_path: str) -> List[Dict[str, Union[str, int]]]:
        """Clasifica múltiples productos en una imagen y cuenta cantidades
        
        Returns:
            List[Dict]: Lista de productos detectados con su categoría, marca y cantidad
        """
        try:
            # Cargar y procesar la imagen
            img = cv2.imread(image_path)
            if img is None:
                return "error", "error (imagen no válida)"
                
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (224, 224))
            
            # Extraer características
            image_features = {
                'hist': cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]).flatten(),
                'dominant_colors': self._get_dominant_colors(img),
                'edges': self._get_edge_density(img)
            }
            
            # Generar prompt mejorado
            prompt = self.generate_enhanced_prompt(image_path, image_features)
            
            # Codificar imagen
            with open(image_path, 'rb') as img_file:
                base64_image = base64.b64encode(img_file.read()).decode('utf-8')
            
            # Preparar payload
            payload = {
                "model": self.MODEL_NAME,
                "prompt": prompt,
                "stream": False,
                "images": [base64_image]
            }
            
            # Realizar petición
            response = requests.post(self.BASE_URL, json=payload)
            if response.status_code != 200:
                return "error", f"error (código {response.status_code})"
                
            result = response.json()
            prediction = result.get('response', '').lower()
            
            # Procesar predicción
            products = []
            
            # Primero intentar encontrar un formato JSON en la respuesta
            json_matches = re.findall(r'\[.*\]', prediction.replace('\n', ' '))
            if json_matches:
                try:
                    products = json.loads(json_matches[0])
                    if isinstance(products, list):
                        return products
                except json.JSONDecodeError:
                    pass
                    
            # Si no se encontró JSON válido, procesar el texto
            print("Procesando respuesta en texto plano:", prediction)
            # Procesamiento mejorado del texto
            found_products = {}
            
            # Patrones de cantidad más específicos
            quantity_patterns = [
                r'(\d+)\s*(?:unidades?|cajas?|paquetes?|botellas?|envases?)',
                r'hay\s*(\d+)',
                r'se ven\s*(\d+)',
                r'(\d+)\s*productos?',
                r'cantidad[:\s]+(\d+)',
                r'(\d+)\s*(?:milk|leche|agua|galletas?|cereales?)'
            ]
            
            # Procesar por categoría
            for category, info in self.categories.items():
                category_keywords = set(info['keywords'])
                category_brands = set(brand.lower() for brand in info['brands'])
                
                # Buscar coincidencias de categoría
                if any(keyword in prediction.lower() for keyword in category_keywords):
                    # Buscar marcas específicas
                    for brand in info['brands']:
                        brand_lower = brand.lower()
                        if brand_lower in prediction.lower():
                            product_key = f"{category}-{brand}"
                            if product_key not in found_products:
                                # Buscar cantidad usando todos los patrones
                                quantities = []
                                for pattern in quantity_patterns:
                                    matches = re.findall(pattern, prediction.lower())
                                    quantities.extend(matches)
                                
                                # Si encontramos cantidades, usar la más grande
                                quantity = 1
                                if quantities:
                                    quantity = max(int(q) for q in quantities if q.isdigit())
                                
                                products.append({
                                    "category": category,
                                    "brand": brand,
                                    "quantity": quantity
                                })
                                found_products[product_key] = True
                                
                                print(f"Encontrado: {category} - {brand} - {quantity} unidades")
            
            if not products:
                products.append({
                    "category": "desconocido",
                    "brand": "marca desconocida",
                    "quantity": 1
                })
            
            return products
            
        except Exception as e:
            print(f"Error en clasificación: {e}")
            return "error", f"error ({str(e)})"

    def batch_classify(self, image_paths: List[str]) -> List[Tuple[str, str]]:
        """Clasifica un lote de imágenes"""
        results = []
        for path in image_paths:
            category, brand = self.classify_image(path)
            results.append((path, category, brand))
        return results

# Ejemplo de uso
if __name__ == "__main__":
    classifier = EnhancedVisionClassifier()
    
    try:
        # Verificar que Ollama está funcionando
        response = requests.get("http://localhost:11434/api/tags")
        if response.status_code != 200:
            print("Error: No se puede conectar con Ollama. Asegúrate de que esté corriendo.")
            exit(1)
            
        # Intentar descargar llava:13b si no está disponible
        print("\nVerificando/descargando llava:13b...")
        download_response = requests.post("http://localhost:11434/api/pull", json={"name": "llava:13b"})
        if download_response.status_code != 200:
            print("Error descargando llava:13b. Por favor, ejecuta 'ollama pull llava:13b' manualmente")
            exit(1)
    except requests.exceptions.ConnectionError:
        print("Error: No se puede conectar con Ollama. ¿Está corriendo el servicio?")
        exit(1)
    
    # Configurar y cargar dataset
    dataset_path = "dataset/train/images"
    classifier.load_and_process_dataset(dataset_path)
    
    # Ejemplo de clasificación
    test_image = "ej4.jpeg"
    products = classifier.classify_image(test_image)
    
    print(f"\nResultado de clasificación para {test_image}:")
    print("\nProductos detectados:")
    
    if isinstance(products, list):
        for product in products:
            if isinstance(product, dict):
                print(f"\nCategoría: {product.get('category', 'desconocido')}")
                print(f"Marca: {product.get('brand', 'desconocida')}")
                print(f"Cantidad: {product.get('quantity', 1)} unidades")
            else:
                print(f"Error: Formato de producto inesperado - {product}")
    else:
        print("Error en la clasificación:", products)


Verificando/descargando llava:13b...

Procesando dataset en: dataset/train/images
Dataset procesado y características extraídas

Resultado de clasificación para ej4.jpeg:

Productos detectados:

Categoría: milk-powder
Marca: leche entera
Cantidad: 4 unidades

Categoría: cereals
Marca: corn flakes
Cantidad: 1 unidades
