# Memory-Efficient Shallow Learning for Full Dataset

This notebook uses a streaming approach to process the full 12,870-image dataset in small batches, so the images never have to be held in memory all at once.

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import cv2
import os
import sys
import pickle
from pathlib import Path
import gc
import tempfile
import joblib
from typing import Generator, List, Optional, Tuple

# Scikit-learn imports
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier
from sklearn.decomposition import IncrementalPCA

# Add parent directory to path
sys.path.append('../..')
from ml_models_core.src.base_classifier import BaseImageClassifier
from ml_models_core.src.model_registry import ModelRegistry, ModelMetadata
from ml_models_core.src.utils import ModelUtils

# Global defaults: matplotlib style and NumPy's legacy global RNG seed.
plt.style.use('default')
np.random.seed(42)

print("Setup complete - streaming approach ready")

2025-06-27 12:18:34.629251: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-06-27 12:18:34.648507: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Setup complete - streaming approach ready


2025-06-27 12:18:35.426439: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [2]:
# Locate the dataset without loading anything. An IMAGE_DATASET_PATH environment
# variable, when set, is tried before the hard-coded fallback locations below.
dataset_paths = [
    "/home/brandond/Projects/pvt/personal/image_game/data/downloads/combined_unified_classification",
    "/home/brandond/Projects/pvt/personal/image_game/image-classifier-shallow/notebooks/data/downloads/combined_unified_classification"
]
env_dataset_path = os.environ.get("IMAGE_DATASET_PATH")
if env_dataset_path:
    dataset_paths.insert(0, env_dataset_path)

# isdir (not exists): the location must be a directory of class folders.
dataset_path = next((path for path in dataset_paths if os.path.isdir(path)), None)

if dataset_path is None:
    raise FileNotFoundError(
        "Dataset not found in any of the expected locations: "
        + ", ".join(dataset_paths)
        + " (set IMAGE_DATASET_PATH to override)"
    )

print(f"Using dataset: {dataset_path}")

# Quick scan to get basic info: each non-hidden sub-directory is one class.
class_dirs = [d for d in Path(dataset_path).iterdir() if d.is_dir() and not d.name.startswith('.')]
class_names = sorted(d.name for d in class_dirs)
print(f"Found {len(class_names)} classes")
print(f"First 10 classes: {class_names[:10]}")
print(f"Last 10 classes: {class_names[-10:]}")

class StreamingDatasetProcessor:
    """Process large image datasets without loading everything into memory.

    Expects the layout ``<dataset_path>/<class_name>/<image files>``. Class
    directories whose names start with ``.`` are ignored, and class indices are
    assigned in sorted order of directory name.
    """

    # File suffixes (compared case-insensitively) that are treated as images.
    VALID_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.bmp', '.tiff'})

    def __init__(self, dataset_path: str, image_size: Tuple[int, int] = (64, 64)):
        """
        Args:
            dataset_path: Root directory containing one sub-directory per class.
            image_size: Target size passed to ``PIL.Image.resize``, i.e. ``(width, height)``.
        """
        self.dataset_path = Path(dataset_path)
        self.image_size = image_size
        self.class_names = None
        self.class_to_idx = None
        self._scan_classes()

    def _scan_classes(self):
        """Record the sorted class directory names and map each to an integer index."""
        class_dirs = [d for d in self.dataset_path.iterdir()
                      if d.is_dir() and not d.name.startswith('.')]
        self.class_names = sorted(d.name for d in class_dirs)
        self.class_to_idx = {name: idx for idx, name in enumerate(self.class_names)}
        print(f"Found {len(self.class_names)} classes")

    def get_file_paths_generator(self) -> Generator[Tuple[str, int], None, None]:
        """Yield ``(image_path, label)`` pairs without opening any image.

        Classes are visited in label order and files within a class in sorted
        name order. ``Path.iterdir()`` order is file-system dependent, so sorting
        makes labels and batch composition reproducible across machines.
        """
        for class_name in self.class_names:
            class_idx = self.class_to_idx[class_name]
            for img_path in sorted((self.dataset_path / class_name).iterdir()):
                # is_file(): skip sub-directories that merely have an image-like suffix.
                if img_path.is_file() and img_path.suffix.lower() in self.VALID_EXTENSIONS:
                    yield str(img_path), class_idx

    def count_total_images(self) -> int:
        """Count image files without loading them."""
        return sum(1 for _ in self.get_file_paths_generator())

    def load_image_batch(self, paths_and_labels: List[Tuple[str, int]]) -> Tuple[np.ndarray, np.ndarray]:
        """Load, convert to RGB and resize a batch of images.

        Unreadable or corrupt images are reported and skipped, so the result can
        be shorter than the input.

        Returns:
            ``(images, labels)``: ``images`` is a ``uint8`` array of shape
            ``(n, height, width, 3)`` and ``labels`` an integer array of shape
            ``(n,)``. Both keep these shapes when ``n == 0``.
        """
        images = []
        labels = []

        for path, label in paths_and_labels:
            try:
                # Context manager releases the file handle as soon as the pixels are read.
                with Image.open(path) as img:
                    rgb = img.convert('RGB').resize(self.image_size, Image.Resampling.LANCZOS)
                    images.append(np.array(rgb, dtype=np.uint8))
            except Exception as e:
                # Deliberate best effort: skip corrupted images instead of aborting the run.
                print(f"Error loading {path}: {e}")
                continue
            labels.append(label)

        if not images:
            width, height = self.image_size
            return np.empty((0, height, width, 3), dtype=np.uint8), np.empty((0,), dtype=int)
        return np.array(images), np.array(labels)

    def get_batch_generator(self, batch_size: int = 100) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
        """Yield ``(images, labels)`` batches built from up to ``batch_size`` files each.

        Batches in which no image could be loaded are skipped. Files are visited
        class by class, so consecutive batches are usually dominated by a single
        class; stopping early therefore gives an unrepresentative subset.
        """
        batch_paths_labels = []

        for path, label in self.get_file_paths_generator():
            batch_paths_labels.append((path, label))

            if len(batch_paths_labels) >= batch_size:
                images, labels = self.load_image_batch(batch_paths_labels)
                if len(images) > 0:  # Only yield if we have valid images
                    yield images, labels
                batch_paths_labels = []
                gc.collect()  # Clean up after each batch

        # Handle the final, partially filled batch.
        if batch_paths_labels:
            images, labels = self.load_image_batch(batch_paths_labels)
            if len(images) > 0:
                yield images, labels

# Create the processor (this scans the class folders) and report what it found.
processor = StreamingDatasetProcessor(dataset_path=dataset_path)
print(f"Classes: {processor.class_names}")

# Size the dataset by walking file paths only; no image is decoded here.
print("Counting total images...")
total_images = processor.count_total_images()
print(f"Total images: {total_images}")

FileNotFoundError: Dataset not found in any of the expected locations

In [None]:
class StreamingFeatureExtractor:
    """Extract hand-crafted image features and fit a scaler/PCA over streamed batches.

    Each image yields a 50-value feature vector (see ``extract_features_batch``).
    ``fit_transform_streaming`` standardizes those vectors with ``StandardScaler``
    and, when there are enough samples, reduces them with ``IncrementalPCA``.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.pca = None  # IncrementalPCA created during fitting; stays None if PCA is skipped
        self.is_fitted = False
        self.feature_dim = None  # raw features per image, recorded during fitting

    def extract_features_batch(self, images: np.ndarray) -> np.ndarray:
        """Extract a feature vector from every image in ``images``.

        Assumes each image is an RGB ``uint8`` array of shape ``(H, W, 3)``
        (``cv2.COLOR_RGB2GRAY`` requires three channels) — TODO confirm for new callers.

        Returns:
            Array of shape ``(len(images), 50)``.
        """
        batch_features = []

        for img in images:
            features = []

            # Convert to grayscale
            gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

            # Basic statistics for each RGB channel (6 features x 3 channels = 18)
            for channel in range(3):
                channel_data = img[:, :, channel].flatten()
                features.extend([
                    np.mean(channel_data),
                    np.std(channel_data),
                    np.min(channel_data),
                    np.max(channel_data),
                    np.percentile(channel_data, 25),
                    np.percentile(channel_data, 75)
                ])

            # Grayscale statistics (3 features)
            gray_flat = gray.flatten()
            features.extend([
                np.mean(gray_flat),
                np.std(gray_flat),
                np.var(gray_flat)
            ])

            # Color histogram features (8 bins x 3 channels = 24)
            for channel in range(3):
                hist, _ = np.histogram(img[:, :, channel], bins=8, range=(0, 256))
                hist = hist / (np.sum(hist) + 1e-8)  # Normalize with small epsilon
                features.extend(hist)

            # Edge features from Canny (3 features)
            edges = cv2.Canny(gray, 50, 150)
            features.extend([
                np.sum(edges > 0) / edges.size,  # Edge density
                np.mean(edges),
                np.std(edges)
            ])

            # Simple texture measures from Sobel gradient magnitude (2 features)
            grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
            grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
            gradient_mag = np.sqrt(grad_x**2 + grad_y**2)

            features.extend([
                np.mean(gradient_mag),
                np.std(gradient_mag)
            ])

            batch_features.append(features)

        return np.array(batch_features)

    def fit_transform_streaming(self, processor: StreamingDatasetProcessor,
                                batch_size: int = 100,
                                max_batches: Optional[int] = None,
                                max_components: int = 20) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Fit the scaler and PCA on the streamed dataset and return transformed features.

        Only one batch of images is in memory at a time. The compact per-image
        feature vectors are kept, so every batch can be transformed with the final
        fitted scaler/PCA. Transforming with partially fitted statistics would make
        early and late batches inconsistent with each other and with ``transform_batch``.

        Args:
            processor: Source of ``(images, labels)`` batches.
            batch_size: Number of image files per streamed batch.
            max_batches: Optional cap on the number of batches, for quick debugging
                runs. ``None`` processes the whole dataset.
            max_components: Upper bound on the number of PCA components.

        Returns:
            ``(features, labels)``: lists with one array per batch. All feature
            arrays have the same number of columns.

        Raises:
            ValueError: If no image could be loaded.
        """
        raw_features = []
        all_labels = []
        batch_count = 0

        print("Phase 1: Extracting features and fitting scaler incrementally...")
        for images, labels in processor.get_batch_generator(batch_size):
            batch_count += 1
            print(f"Processing batch {batch_count} ({len(images)} images)")

            features = self.extract_features_batch(images)
            self.scaler.partial_fit(features)
            raw_features.append(features)
            all_labels.append(labels)

            # Release the image batch before the next one is loaded.
            del images, features
            gc.collect()

            if max_batches is not None and batch_count >= max_batches:
                print(f"Stopping after {batch_count} batches (max_batches={max_batches})")
                break

        if not raw_features:
            raise ValueError("No images could be loaded from the dataset")

        self.feature_dim = raw_features[0].shape[1]
        n_samples = sum(len(f) for f in raw_features)
        print(f"Feature dimensions: {self.feature_dim}, samples: {n_samples}")

        # Scale every batch with the final (complete) scaler statistics.
        scaled_batches = [self.scaler.transform(f) for f in raw_features]
        del raw_features

        n_components = min(max_components, self.feature_dim - 1, n_samples - 1)
        if n_components >= 1:
            print(f"Phase 2: Fitting IncrementalPCA with {n_components} components...")
            self.pca = IncrementalPCA(n_components=n_components)
            # The stacked matrix is only n_samples x feature_dim floats. IncrementalPCA.fit
            # batches internally and merges a short trailing chunk, so no partial_fit
            # call sees fewer than n_components rows (the old per-batch path could skip
            # PCA for a small final batch and emit mismatched column counts).
            self.pca.fit(np.vstack(scaled_batches))
            all_features = [self.pca.transform(batch) for batch in scaled_batches]
            print(f"PCA explained variance ratio: {self.pca.explained_variance_ratio_.sum():.3f}")
        else:
            self.pca = None
            all_features = scaled_batches
            print("No PCA applied - using scaled features")

        self.is_fitted = True
        print(f"Fitted on {batch_count} batches")
        return all_features, all_labels

    def transform_batch(self, images: np.ndarray) -> np.ndarray:
        """Extract features and apply the fitted scaler and, if fitted, the PCA.

        Raises:
            ValueError: If called before ``fit_transform_streaming``.
        """
        if not self.is_fitted:
            raise ValueError("Must fit the transformer first")

        features_scaled = self.scaler.transform(self.extract_features_batch(images))

        if (self.pca is not None and
                hasattr(self.pca, 'components_') and
                features_scaled.shape[1] == self.pca.n_features_in_):
            return self.pca.transform(features_scaled)
        return features_scaled

# Smoke-test feature extraction on one small batch before the full run.
print("Testing feature extraction...")
test_processor = StreamingDatasetProcessor(dataset_path)

# next() pulls only the first batch. Unlike a for/break loop, it leaves no stray
# module-level `images`/`labels` names keeping that batch alive after cleanup.
test_batch = next(test_processor.get_batch_generator(batch_size=5), None)

if test_batch is not None:
    test_images, test_labels = test_batch
    print(f"Test batch: {test_images.shape}, labels: {test_labels.shape}")

    # Test feature extraction
    test_extractor = StreamingFeatureExtractor()
    test_features = test_extractor.extract_features_batch(test_images)
    expected_feature_count = 18 + 3 + 24 + 3 + 2
    print(f"Test features shape: {test_features.shape}")
    print(f"Feature count breakdown:")
    print(f"- RGB stats: 18 (6 per channel)")
    print(f"- Grayscale stats: 3")
    print(f"- Histograms: 24 (8 bins x 3 channels)")
    print(f"- Edge features: 3")
    print(f"- Texture features: 2")
    print(f"- Expected total: {expected_feature_count}")
    print(f"- Actual total: {test_features.shape[1]}")
    if test_features.shape[1] != expected_feature_count:
        print("WARNING: feature count does not match the documented breakdown")

    # Clean up test data
    del test_images, test_labels, test_features, test_extractor, test_batch
    gc.collect()
else:
    print("No loadable images found - skipping feature extraction test")

# Initialize feature extractor
feature_extractor = StreamingFeatureExtractor()
print("Feature extractor initialized")

In [None]:
# Stream the dataset through the extractor; only compact feature vectors are retained.
print("Starting streaming feature extraction on full dataset...")
all_features, all_labels = feature_extractor.fit_transform_streaming(processor, batch_size=50)

# Stack the per-batch arrays into a single design matrix and label vector.
print("Combining extracted features...")
X, y = np.vstack(all_features), np.concatenate(all_labels)

print(f"Final feature matrix shape: {X.shape}")
print(f"Final labels shape: {y.shape}")
print(f"Number of classes: {np.unique(y).size}")

# The per-batch lists are no longer needed once stacked.
del all_features, all_labels
gc.collect()

# Resident set size of this process; `process` is reused by later cells.
import psutil
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / (1024 * 1024)
print(f"Current memory usage: {memory_mb:.1f} MB")

In [None]:
# 60/20/20 split: hold out 20% for test, then 25% of the remainder for validation.
# Both splits are stratified by label.
print("Splitting data for training...")
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, stratify=y, test_size=0.2, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, stratify=y_temp, test_size=0.25, random_state=42
)

print(f"Training set: {len(X_train)} samples")
print(f"Validation set: {len(X_val)} samples")
print(f"Test set: {len(X_test)} samples")

# Drop the un-split arrays before training to lower peak memory.
del X, y, X_temp, y_temp
gc.collect()

print("\nTraining memory-efficient models...")

# Linear model fitted with SGD (fit() returns the estimator, so predict can be chained).
sgd_model = SGDClassifier(max_iter=1000, random_state=42)
sgd_val_acc = accuracy_score(y_val, sgd_model.fit(X_train, y_train).predict(X_val))
print(f"SGD Validation Accuracy: {sgd_val_acc:.4f}")

# A small, depth-limited forest keeps memory bounded.
rf_model = RandomForestClassifier(n_estimators=50, max_depth=10, n_jobs=-1, random_state=42)
rf_val_acc = accuracy_score(y_val, rf_model.fit(X_train, y_train).predict(X_val))
print(f"Random Forest Validation Accuracy: {rf_val_acc:.4f}")

# Keep the model with the higher validation accuracy; ties go to SGD.
best_name, best_model, best_val_acc = (
    ("Random Forest", rf_model, rf_val_acc)
    if rf_val_acc > sgd_val_acc
    else ("SGD Classifier", sgd_model, sgd_val_acc)
)

print(f"\nBest model: {best_name} with validation accuracy: {best_val_acc:.4f}")

# One-time evaluation on the held-out test split.
test_predictions = best_model.predict(X_test)
test_accuracy = accuracy_score(y_test, test_predictions)
print(f"Test accuracy: {test_accuracy:.4f}")

# `process` comes from the feature-extraction cell.
memory_mb = process.memory_info().rss / (1024 * 1024)
print(f"Final memory usage: {memory_mb:.1f} MB")

In [None]:
# Save the trained model
class StreamingShallowClassifier(BaseImageClassifier):
    """Image classifier that wraps a shallow sklearn model and a fitted feature extractor.

    The artifact written by ``save_model`` is a pickled dict with the keys
    ``'model'``, ``'feature_extractor'`` and ``'class_names'``. Unpickling it
    requires the ``StreamingFeatureExtractor`` class to be importable under the
    same module path it had when saved.
    """

    def __init__(self, model_name="streaming-shallow-classifier", version="1.0.0"):
        super().__init__(model_name, version)
        self.model = None
        self.feature_extractor = None
        self.class_names = None

    def load_model(self, model_path: str) -> None:
        """Load an artifact produced by ``save_model``.

        Security: ``pickle.load`` can execute arbitrary code, so only load files
        from a trusted source.
        """
        with open(model_path, 'rb') as f:
            model_data = pickle.load(f)  # trusted input only (see docstring)
        self.model = model_data['model']
        self.feature_extractor = model_data['feature_extractor']
        self.class_names = model_data['class_names']
        self._is_loaded = True

    def preprocess(self, image: np.ndarray) -> np.ndarray:
        """Resize to 64x64 and return a 3-channel ``uint8`` image in [0, 255].

        Alpha is removed via ``ModelUtils.convert_to_rgb``. 2-D grayscale input
        is replicated to three channels, because the feature extractor converts
        RGB to grayscale. Float images with max <= 1.0 are treated as [0, 1]
        and rescaled. Other non-uint8 inputs are clipped to [0, 255].
        """
        image_resized = ModelUtils.resize_image(image, (64, 64))
        if image_resized.ndim == 3 and image_resized.shape[2] == 4:
            image_resized = ModelUtils.convert_to_rgb(image_resized)
        elif image_resized.ndim == 2:
            image_resized = np.repeat(image_resized[:, :, np.newaxis], 3, axis=2)

        # Only non-uint8 data needs conversion. A dark uint8 image (max <= 1) must
        # not be rescaled, and cv2 rejects float64 input.
        if image_resized.dtype != np.uint8:
            if np.issubdtype(image_resized.dtype, np.floating) and image_resized.max() <= 1.0:
                image_resized = image_resized * 255
            image_resized = np.clip(image_resized, 0, 255).astype(np.uint8)
        return image_resized

    def predict(self, image: np.ndarray) -> dict:
        """Return a ``{class_name: probability}`` mapping for a single image.

        Models without ``predict_proba`` (e.g. hinge-loss SGD) get a one-hot
        distribution on the predicted class.

        Raises:
            ValueError: If no model has been loaded.
        """
        if not self.is_loaded:
            raise ValueError("Model not loaded")

        processed_image = self.preprocess(image)
        features = self.feature_extractor.transform_batch(np.array([processed_image]))

        probabilities = np.zeros(len(self.class_names))
        if hasattr(self.model, 'predict_proba'):
            # predict_proba columns follow model.classes_, which lists only labels
            # seen during training. Scatter them to their class-index positions.
            class_proba = self.model.predict_proba(features)[0]
            probabilities[np.asarray(self.model.classes_, dtype=int)] = class_proba
        else:
            # For SGD, create pseudo-probabilities
            prediction = self.model.predict(features)[0]
            probabilities[int(prediction)] = 1.0

        return {name: float(prob) for name, prob in zip(self.class_names, probabilities)}

    def get_metadata(self) -> dict:
        """Describe the loaded model. ``feature_dimensions`` is the PCA output size
        or, when no PCA was fitted, the raw feature count."""
        pca = getattr(self.feature_extractor, 'pca', None)
        if pca is not None and hasattr(pca, 'n_components_'):
            feature_dimensions = pca.n_components_
        else:
            feature_dimensions = getattr(self.feature_extractor, 'feature_dim', None)
        return {
            "model_type": "streaming_shallow_learning",
            "algorithm": type(self.model).__name__,
            "feature_dimensions": feature_dimensions,
            "classes": self.class_names,
            "version": self.version
        }

    def save_model(self, model_path: str, model, feature_extractor, class_names):
        """Pickle ``model``, ``feature_extractor`` and ``class_names`` to ``model_path``."""
        model_data = {
            'model': model,
            'feature_extractor': feature_extractor,
            'class_names': class_names
        }
        with open(model_path, 'wb') as f:
            pickle.dump(model_data, f)
        print(f"Model saved to {model_path}")

# Persist the selected model together with its fitted feature pipeline.
model_path = "../models/streaming_shallow_classifier.pkl"
os.makedirs(os.path.dirname(model_path), exist_ok=True)

classifier = StreamingShallowClassifier()
classifier.save_model(model_path, best_model, feature_extractor, processor.class_names)

# Run summary.
print("\nModel training complete!")
print(f"Best model: {best_name}")
print(f"Validation accuracy: {best_val_acc:.4f}")
print(f"Test accuracy: {test_accuracy:.4f}")
print(f"Model saved to: {model_path}")
print(f"Total classes: {len(processor.class_names)}")
print(f"Total images processed: {sum(map(len, (X_train, X_val, X_test)))}")