In [None]:
pip install tensorflow opencv-python matplotlib seaborn pandas numpy tqdm psutil

In [15]:
import os
import sys
import glob
import io
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.layers import Dropout, GlobalAveragePooling2D, Dense, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import seaborn as sns
from datetime import datetime
import time
import psutil
from tqdm import tqdm
import os
import sys
import glob
import shutil
from datetime import datetime

# Set random seeds for reproducibility
tf.random.set_seed(42)
np.random.seed(42)

In [16]:
# Define paths
BASE_PATH = './'
TRAIN_PATH = os.path.join(BASE_PATH, 'train')
VAL_PATH = os.path.join(BASE_PATH, 'validation')
TEST_PATH = os.path.join(BASE_PATH, 'test')

# Create directories for logs and checkpoints
LOGS_PATH = os.path.join(BASE_PATH, 'logs')
CHECKPOINTS_PATH = os.path.join(BASE_PATH, 'checkpoints')
os.makedirs(LOGS_PATH, exist_ok=True)
os.makedirs(CHECKPOINTS_PATH, exist_ok=True)

In [17]:
def detection_loss(y_true, y_pred):
    """Custom loss function combining classification and bounding box regression"""
    # Presence/absence loss (binary cross-entropy)
    presence_loss = tf.keras.losses.binary_crossentropy(y_true[:, 0], y_pred[:, 0])
    
    # Only compute box loss when license plate is present
    mask = y_true[:, 0]
    
    # Bounding box regression loss (smooth L1/Huber loss)
    box_loss = tf.keras.losses.huber(y_true[:, 1:], y_pred[:, 1:], delta=1.0)
    box_loss = tf.reduce_mean(box_loss, axis=-1)
    box_loss = tf.reduce_mean(box_loss * mask)
    
    # Combine losses with weighting
    total_loss = presence_loss + 5.0 * box_loss
    return total_loss


In [18]:
class CheckpointManager:
    def __init__(self, checkpoint_dir, max_checkpoints=2):
        self.checkpoint_dir = checkpoint_dir
        self.max_checkpoints = max_checkpoints

    def get_latest_checkpoint(self):
        checkpoints = glob.glob(os.path.join(self.checkpoint_dir, 'checkpoint_*.keras'))
        if not checkpoints:
            return None, 0

        checkpoints.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))
        latest_checkpoint = checkpoints[-1]
        latest_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])
        return latest_checkpoint, latest_epoch

    def cleanup_old_checkpoints(self):
        checkpoints = glob.glob(os.path.join(self.checkpoint_dir, 'checkpoint_*.keras'))
        if len(checkpoints) > self.max_checkpoints:
            checkpoints.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))
            os.remove(checkpoints[0])


In [19]:
def load_data(csv_file, img_folder, batch_size=32):
    """Load and preprocess image data with caching"""
    cache_path = os.path.join(os.path.dirname(csv_file), 'cached_dataset.npz')
    
    if os.path.exists(cache_path):
        print("Loading cached dataset...")
        data = np.load(cache_path, allow_pickle=True)
        images = data['images']
        labels = data['labels']
    else:
        print("Creating new dataset cache...")
        df = pd.read_csv(csv_file)
        images = []
        labels = []

        for idx, row in tqdm(df.iterrows(), total=len(df)):
            try:
                img_path = os.path.join(img_folder, row['filename'])
                if not os.path.exists(img_path):
                    continue

                img = load_img(img_path, target_size=(256, 256))
                img_array = img_to_array(img) / 255.0

                if row['license_plate_present'] == 1:
                    bbox = [1, row['xmin']/row['width'], row['ymin']/row['height'],
                           row['xmax']/row['width'], row['ymax']/row['height']]
                else:
                    bbox = [0, 0, 0, 0, 0]

                images.append(img_array)
                labels.append(bbox)

            except Exception as e:
                print(f"\nError processing {row['filename']}: {str(e)}")
                continue

        images = np.array(images)
        labels = np.array(labels)

        np.savez_compressed(cache_path, images=images, labels=labels)

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    return dataset


In [20]:
def detection_model(input_size=(256, 256, 3)):
    """Create detection model architecture with regularization"""
    inputs = Input(input_size)

    # Encoder
    x = Conv2D(64, 3, activation='relu', padding='same',
               kernel_regularizer=tf.keras.regularizers.l2(0.01))(inputs)
    x = BatchNormalization()(x)
    x = Dropout(0.2)(x)

    x = Conv2D(128, 3, activation='relu', padding='same',
               kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D()(x)
    x = Dropout(0.3)(x)

    x = Conv2D(256, 3, activation='relu', padding='same',
               kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D()(x)
    x = Dropout(0.4)(x)

    # Detection head
    x = GlobalAveragePooling2D()(x)
    x = Dense(256, activation='relu',
              kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = Dropout(0.5)(x)
    outputs = Dense(5, activation='sigmoid')(x)

    return Model(inputs=inputs, outputs=outputs)


In [21]:
class TrainingProgressCallback(tf.keras.callbacks.Callback):
    """Custom callback for detailed progress tracking"""
    def __init__(self):
        super().__init__()
        self.epoch_times = []

    def on_train_begin(self, logs=None):
        headers = ['Epoch', 'Train Loss', 'Train Acc', 'Val Loss', 'Val Acc', 'LR', 'Time', 'Memory(MB)']
        self.row_format = "{:^7} {:^12} {:^12} {:^12} {:^12} {:^10} {:^8} {:^12}"
        print("\n" + "="*88)
        print(self.row_format.format(*headers))
        print("="*88)

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        epoch_time = time.time() - self.epoch_start_time
        self.epoch_times.append(epoch_time)
        memory_used = psutil.Process().memory_info().rss / 1024 / 1024
        current_lr = float(tf.keras.backend.get_value(self.model.optimizer.learning_rate))

        values = [
            epoch + 1,
            f"{logs.get('loss'):.4f}",
            f"{logs.get('accuracy'):.4f}",
            f"{logs.get('val_loss'):.4f}",
            f"{logs.get('val_accuracy'):.4f}",
            f"{current_lr:.6f}",
            f"{epoch_time:.1f}s",
            f"{memory_used:.1f}"
        ]
        print(self.row_format.format(*values))


In [22]:
class RollingCheckpointCallback(tf.keras.callbacks.Callback):
    def __init__(self, checkpoint_dir, max_checkpoints=2):
        super().__init__()
        self.checkpoint_dir = checkpoint_dir
        self.max_checkpoints = max_checkpoints
        os.makedirs(checkpoint_dir, exist_ok=True)

    def on_epoch_end(self, epoch, logs=None):
        # Save current model state
        checkpoint_path = os.path.join(
            self.checkpoint_dir, 
            f'checkpoint_{epoch:03d}_loss_{logs["val_loss"]:.4f}.keras'
        )
        self.model.save(checkpoint_path)

        # Get list of checkpoints sorted by epoch number
        checkpoints = glob.glob(os.path.join(self.checkpoint_dir, 'checkpoint_*.keras'))
        checkpoints.sort(key=lambda x: int(x.split('_')[1]))

        # Remove oldest checkpoint if exceeding max_checkpoints
        if len(checkpoints) > self.max_checkpoints:
            os.remove(checkpoints[0])


In [23]:
def load_latest_checkpoint(checkpoint_dir):
    checkpoints = glob.glob(os.path.join(checkpoint_dir, 'checkpoint_*.keras'))
    if not checkpoints:
        return None, 0
    
    checkpoints.sort(key=lambda x: int(x.split('_')[1]))
    latest_checkpoint = checkpoints[-1]
    epoch = int(latest_checkpoint.split('_')[1])
    return latest_checkpoint, epoch

def train_model():
    latest_checkpoint, initial_epoch = load_latest_checkpoint(CHECKPOINTS_PATH)

    if latest_checkpoint:
        print(f"Resuming from checkpoint: {latest_checkpoint}")
        model = load_model(latest_checkpoint, custom_objects={'detection_loss': detection_loss})
    else:
        print("Starting new training")
        model = detection_model()
        model.compile(
            optimizer=Adam(learning_rate=1e-4),
            loss=detection_loss,
            metrics=['accuracy']
        )
        initial_epoch = 0

    train_data = load_data(os.path.join(TRAIN_PATH, 'annotations.csv'),
                          os.path.join(TRAIN_PATH, 'img'),
                          batch_size=16)

    val_data = load_data(os.path.join(VAL_PATH, 'annotations.csv'),
                        os.path.join(VAL_PATH, 'img'),
                        batch_size=16)

    callbacks = [
        RollingCheckpointCallback(CHECKPOINTS_PATH, max_checkpoints=2),
        EarlyStopping(patience=10, restore_best_weights=True),
        ReduceLROnPlateau(factor=0.1, patience=5, min_lr=1e-6),
        TrainingProgressCallback()
    ]

    history = model.fit(
        train_data,
        validation_data=val_data,
        epochs=100,
        initial_epoch=initial_epoch,
        callbacks=callbacks,
        verbose=1
    )

    return model, history

def evaluate_model(model, test_data):
    """Evaluate model and visualize results"""
    test_predictions = model.predict(test_data)
    
    # Calculate metrics
    presence_accuracy = np.mean(
        (test_predictions[:, 0] > 0.5) == (test_data.get_single_element()[1][:, 0] > 0.5)
    )
    
    print(f"\nTest Set Metrics:")
    print(f"Presence Accuracy: {presence_accuracy:.4f}")
    
    # Visualize results
    test_images, test_labels = next(iter(test_data.take(1)))
    predictions = model.predict(test_images)
    
    plt.figure(figsize=(15, 10))
    for i in range(min(5, len(test_images))):
        plt.subplot(1, 5, i+1)
        plt.imshow(test_images[i])
        
        # Ground truth box
        if test_labels[i][0] > 0.5:
            rect = patches.Rectangle(
                (test_labels[i][1] * 256, test_labels[i][2] * 256),
                (test_labels[i][3] - test_labels[i][1]) * 256,
                (test_labels[i][4] - test_labels[i][2]) * 256,
                linewidth=2, edgecolor='g', facecolor='none', label='Ground Truth'
            )
            plt.gca().add_patch(rect)
        
        # Predicted box
        if predictions[i][0] > 0.5:
            rect = patches.Rectangle(
                (predictions[i][1] * 256, predictions[i][2] * 256),
                (predictions[i][3] - predictions[i][1]) * 256,
                (predictions[i][4] - predictions[i][2]) * 256,
                linewidth=2, edgecolor='r', facecolor='none', label='Prediction'
            )
            plt.gca().add_patch(rect)
        
        plt.title(f'Conf: {predictions[i][0]:.2f}')
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()



In [24]:
if __name__ == "__main__":
    # Train model
    model, history = train_model()
    
    # Load and evaluate on test set
    test_data = load_data(
        os.path.join(TEST_PATH, 'annotations.csv'),
        os.path.join(TEST_PATH, 'img'),
        batch_size=16
    )
    
    evaluate_model(model, test_data)

[1m 19/360[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m50:52[0m 9s/step - accuracy: 0.0386 - loss: 6.0673

KeyboardInterrupt: 