In [None]:
# Testing code for Model 3

import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import pandas as pd
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

def create_improved_model(dataset_path, save_dir):
    # Create save directory if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)
    
    # Configuration settings
    IMG_SIZE = 224
    BATCH_SIZE = 16  # Reduced to avoid memory issues
    VALIDATION_SPLIT = 0.2
    INITIAL_EPOCHS = 15
    FINE_TUNE_EPOCHS = 10
    
    # 1. Analyze dataset
    print("Analyzing dataset distribution...")
    class_counts = {}
    for class_name in os.listdir(dataset_path):
        class_path = os.path.join(dataset_path, class_name)
        if os.path.isdir(class_path):
            num_images = len([f for f in os.listdir(class_path) 
                            if os.path.isfile(os.path.join(class_path, f)) and
                            f.lower().endswith(('.png', '.jpg', '.jpeg'))])
            class_counts[class_name] = num_images
            print(f"Class '{class_name}': {num_images} images")
    
    # Calculate class weights to handle imbalance
    total_samples = sum(class_counts.values())
    num_classes = len(class_counts)
    
    if num_classes == 0:
        raise ValueError(f"No valid class directories found in {dataset_path}")
    
    class_weights = {}
    for i, (class_name, count) in enumerate(class_counts.items()):
        # Avoid division by zero
        if count > 0:
            class_weights[i] = (1 / count) * (total_samples / num_classes)
        else:
            class_weights[i] = 1.0
    
    print("Class weights:", class_weights)
    
    # 2. Data augmentation for training
    data_augmentation = keras.Sequential([
        keras.layers.RandomFlip("horizontal"),
        keras.layers.RandomRotation(0.1),  # Reduced for stability
        keras.layers.RandomZoom(0.1),      # Reduced for stability
        keras.layers.RandomBrightness(0.1) # Reduced for stability
    ])
    
    # 3. Create data generators with error handling
    print("Loading and preparing dataset...")
    try:
        train_ds = keras.preprocessing.image_dataset_from_directory(
            dataset_path,
            validation_split=VALIDATION_SPLIT,
            subset="training",
            seed=42,
            image_size=(IMG_SIZE, IMG_SIZE),
            batch_size=BATCH_SIZE,
            label_mode='int',  # Explicitly set label mode
            color_mode='rgb'   # Explicitly set color mode
        )
        
        val_ds = keras.preprocessing.image_dataset_from_directory(
            dataset_path,
            validation_split=VALIDATION_SPLIT,
            subset="validation",
            seed=42,
            image_size=(IMG_SIZE, IMG_SIZE),
            batch_size=BATCH_SIZE,
            label_mode='int',  # Explicitly set label mode
            color_mode='rgb'   # Explicitly set color mode
        )
    except Exception as e:
        raise Exception(f"Error loading the dataset: {str(e)}")
    
    # Save class names
    class_names = train_ds.class_names
    print(f"Classes found: {class_names}")
    
    # Save class names to a text file
    with open(os.path.join(save_dir, 'class_names.txt'), 'w') as f:
        for name in class_names:
            f.write(name + '\n')
    
    # 4. Optimize dataset loading performance
    AUTOTUNE = tf.data.AUTOTUNE
    
    # Handle potential memory issues with smaller prefetch size
    train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
    val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
    
    # 5. Create preprocessing function (normalization)
    preprocessing_layer = tf.keras.applications.mobilenet_v2.preprocess_input
    
    # Apply augmentation and preprocessing to training data
    def prepare_train(image, label):
        image = tf.cast(image, tf.float32)  # Ensure image is float32
        image = data_augmentation(image)
        image = preprocessing_layer(image)
        return image, label
    
    def prepare_val(image, label):
        image = tf.cast(image, tf.float32)  # Ensure image is float32
        image = preprocessing_layer(image)
        return image, label
    
    # Apply preprocessing
    train_ds = train_ds.map(prepare_train, num_parallel_calls=AUTOTUNE)
    val_ds = val_ds.map(prepare_val, num_parallel_calls=AUTOTUNE)
    
    # 6. Create model function with proper error handling
    def build_model(num_classes, fine_tune=False):
        print(f"Building model with {num_classes} output classes...")
        try:
            # Use a reliable base model 
            base_model = keras.applications.MobileNetV2(
                weights="imagenet",
                include_top=False,
                input_shape=(IMG_SIZE, IMG_SIZE, 3)
            )
            
            # Freeze base model layers initially
            base_model.trainable = False
            
            # Create model with reliable architecture
            inputs = keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
            x = base_model(inputs, training=False)
            x = keras.layers.GlobalAveragePooling2D()(x)
            x = keras.layers.BatchNormalization()(x)
            
            # Add dropout and dense layers (smaller than before to avoid overfitting)
            x = keras.layers.Dense(256, activation='relu')(x)
            x = keras.layers.Dropout(0.4)(x)
            x = keras.layers.Dense(128, activation='relu')(x)
            x = keras.layers.Dropout(0.3)(x)
            
            # Output layer with softmax activation
            outputs = keras.layers.Dense(num_classes, activation='softmax')(x)
            
            model = keras.Model(inputs, outputs)
            
            # Fine-tune the model if requested
            if fine_tune:
                # Unfreeze the top layers of the base model
                for layer in base_model.layers[-20:]:  # Reduced number of unfrozen layers
                    layer.trainable = True
                print("Fine-tuning last 20 layers of base model")
            
            return model
            
        except Exception as e:
            raise Exception(f"Error building the model: {str(e)}")
    
    # 7. Create and compile initial model
    model = build_model(len(class_names), fine_tune=False)
    
    # Compile with appropriate optimizer and learning rate
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # 8. Set up callbacks for better training
    callbacks = [
        ModelCheckpoint(
            filepath=os.path.join(save_dir, 'checkpoint_best.h5'),
            save_best_only=True,
            monitor='val_accuracy',
            mode='max',
            verbose=1
        ),
        EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # 9. Train the initial model (feature extraction phase)
    print("\nStarting initial training phase...")
    try:
        history = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=INITIAL_EPOCHS,
            callbacks=callbacks,
            class_weight=class_weights,
            verbose=1
        )
        
    except Exception as e:
        raise Exception(f"Error during initial training: {str(e)}")
    
    # 10. Fine-tuning phase
    print("\nStarting fine-tuning phase...")
    try:
        # Create fine-tuned model
        model = build_model(len(class_names), fine_tune=True)
        
        # Compile with lower learning rate for fine-tuning
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.0001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        # Train with fine-tuning
        fine_tune_history = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=FINE_TUNE_EPOCHS,
            callbacks=callbacks,
            class_weight=class_weights,
            verbose=1
        )
        
    except Exception as e:
        print(f"Error during fine-tuning: {str(e)}")
        print("Continuing with the best model from initial training phase...")
    
    
    # 12. Evaluate model on validation set
    try:
        # Build full validation dataset (without batching) for evaluation
        unbatched_val = val_ds.unbatch()
        val_images = []
        val_labels = []
        
        # This might be slow but is more reliable than using the batched dataset
        print("Preparing validation data for final evaluation...")
        for image, label in unbatched_val.take(1000):  # Limit to 1000 samples to avoid memory issues
            val_images.append(image)
            val_labels.append(label)
        
        if val_images:
            val_images = tf.stack(val_images)
            val_labels = tf.stack(val_labels)
            
            # Generate predictions
            predictions = model.predict(val_images)
            predicted_classes = np.argmax(predictions, axis=1)
            
    except Exception as e:
        print(f"Warning: Could not complete full evaluation: {str(e)}")
    
    # 13. Save the final model in multiple formats
    print("Saving final model...")
    try:
        # Save full model
        model.save(os.path.join(save_dir, 'final_model.h5'))
        
        # Save as TensorFlow Lite model with quantization
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        
        # The following line ensures better compatibility
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
        
        tflite_model = converter.convert()
        
        with open(os.path.join(save_dir, 'model.tflite'), 'wb') as f:
            f.write(tflite_model)
        
        print(f"Model successfully saved to {save_dir}")
        print("Files created:")
        print(f"- {os.path.join(save_dir, 'final_model.h5')}")
        print(f"- {os.path.join(save_dir, 'model.tflite')}")
        print(f"- {os.path.join(save_dir, 'class_names.txt')}")
  
        
        return model, class_names
        
    except Exception as e:
        raise Exception(f"Error saving the model: {str(e)}")

if __name__ == "__main__":
    # Add your path for the dataset 
    dataset_path = r"D:\Test\rail_madad_model\model\model dataset"
    save_dir = r"D:\Test\rail_madad_model\Arin\models"
    
    try:
        model, class_names = create_improved_model(dataset_path, save_dir)
        print("Model creation completed successfully!")
    except Exception as e:
        print(f"ERROR: {str(e)}")

In [None]:
# Testing code for Model 3

import tensorflow.lite as tflite
import numpy as np
from tensorflow.keras.preprocessing import image
import os

# Load the proper class names from your saved file
class_names_path = r"D:\Test\rail_madad_model\Arin\models\class_names.txt"
with open(class_names_path, "r") as f:
    class_names = [line.strip() for line in f.readlines()]

print("Using these class names:", class_names)

# Load the TFLite model
interpreter = tflite.Interpreter(model_path=r"D:\Test\rail_madad_model\Arin\models\model.tflite")
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Added: Dictionary to map categories to their respective classes
CATEGORY_MAP = {
    "Security": ["fire smoke", "trackdefective"],
    "Medical": [ "medical assist"],
    "Cleanliness": ["unclean_coach","torn bed"],
    "Electrical": ["defective switch" ],
    "General": ["overcrowding", "broken glass"],
    "Not a Complaint":["tracknotdefective","clean coach","normal","non defective switch"],
    # "Coach Maintenance":["","broken door"]
}

# function to general model output into categories
def general_output(predicted_class):
    predicted_class = predicted_class.lower()
    for category, class_list in CATEGORY_MAP.items():
        if predicted_class in class_list:
            return {"category": category, "subcategory": predicted_class}

def predict_image(img_path):
    # Load and preprocess image exactly as in training
    img = image.load_img(img_path, target_size=(224, 224))
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    
    # Apply the EXACT SAME preprocessing as during training
    img_array = img_array / 127.5 - 1  # MobileNetV2 preprocessing
    
    # Run inference
    interpreter.set_tensor(input_details[0]['index'], img_array)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details[0]['index'])
    
    # Get predicted class and confidence
    predicted_class = np.argmax(output[0])
    confidence = output[0][predicted_class]
    
    # Print all class probabilities for debugging
    for i, prob in enumerate(output[0]):
        print(f"  {class_names[i]}: {prob:.4f}")
    
    return predicted_class, confidence

# Test multiple images
test_images = [
    r"C:\Users\Arin Thamke\Downloads\WhatsApp Image 2025-04-04 at 00.22.38_9af2ac5e.jpg",
    r"C:\Users\Arin Thamke\Downloads\test\OIP (6).jpg",
    r"C:\Users\Arin Thamke\Downloads\test\OIP.jpg",
    r"C:\Users\Arin Thamke\Downloads\test\OIP (7).jpg"
]

for img_path in test_images:
    print(f"\nPredicting: {os.path.basename(img_path)}")
    predicted_class, confidence = predict_image(img_path)
    predicted_class_name = class_names[predicted_class]
    print(f"Predicted class: {predicted_class_name}")
    print(f"Confidence: {confidence:.4f}")
    
    # Get the generalized category
    result = general_output(predicted_class_name)
    print(f"Category: {result['category']}")
    print(f"Subcategory: {result['subcategory']}")