In [47]:
%pip install segmentation-models-pytorch pytorch-lightning albumentations
%pip install --upgrade numpy
%pip install --upgrade matplotlib
%pip install tensorflow opencv-python scikit-learn pillow


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting numpy
  Using cached numpy-2.3.4-cp311-cp311-macosx_14_0_arm64.whl.metadata (62 kB)
Using cached numpy-2.3.4-cp311-cp311-macosx_14_0_arm64.whl (5.4 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 2.1.3
    Uninstalling numpy-2.1.3:
      Successfully uninstalled numpy-2.1.3
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
opencv-python 4.12.0.88 requires numpy<2.3.0,>=2; python_version >= "3.9", but you have numpy 2.3.4 which is incompatible.
opencv-python-head

##### Step 1. Convert YOLOv8 Data to U-Net Segmentation Masks

Summary: Convert the YOLO bounding-box annotations into pixel-wise segmentation masks for U-Net (integer-encoded grayscale images).

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import cv2

# Try importing matplotlib with error handling
try:
    import matplotlib.pyplot as plt
    MATPLOTLIB_AVAILABLE = True
except ImportError as e:
    print(f"Matplotlib import failed: {e}")
    print("Continuing without matplotlib for visualization")
    MATPLOTLIB_AVAILABLE = False

from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import precision_score, recall_score, f1_score
import glob

# Set random seeds for reproducibility
# Seeds TensorFlow's global generator and NumPy's legacy global RNG with the
# same fixed value; Python's built-in `random` module is not seeded here.
tf.random.set_seed(42)
np.random.seed(42)

# Report the library versions actually loaded by this kernel, which makes
# environment mismatches easy to spot in the cell output.
print("Libraries imported successfully!")
print(f"TensorFlow version: {tf.__version__}")
print(f"NumPy version: {np.__version__}")

In [49]:
class TrafficSignDataset:
    """Load traffic-sign images and rasterise their YOLO boxes into binary masks.

    Expected layout: an ``images`` directory with ``*.jpg`` / ``*.png`` files and
    a parallel ``labels`` directory with one YOLO ``.txt`` file per image. Each
    label line reads ``class x_center y_center width height`` with coordinates
    normalised to the image size. Every box is painted as foreground (1); the
    class id is not encoded in the mask.
    """

    # Glob patterns of the image files collected by ``load_data``.
    IMAGE_PATTERNS = ('*.jpg', '*.png')

    def __init__(self, data_path, img_size=(256, 256)):
        """Store the configuration; nothing is read from disk yet.

        Args:
            data_path: Dataset root, or a directory that itself contains an
                ``images`` folder.
            img_size: Target size passed to ``cv2.resize`` as ``dsize``, which
                OpenCV interprets as ``(width, height)``.
        """
        self.data_path = data_path
        self.img_size = img_size
        # Empty lists until load_data() succeeds, NumPy arrays afterwards.
        self.images = []
        self.masks = []

    def find_data_directories(self):
        """Return the first candidate directory that contains an ``images`` folder.

        Candidates are tried in this order: ``<data_path>/train``,
        ``<data_path>/Train`` and ``data_path`` itself.

        Raises:
            FileNotFoundError: If no candidate has an ``images`` sub-directory.
        """
        candidates = (
            os.path.join(self.data_path, 'train'),
            os.path.join(self.data_path, 'Train'),
            self.data_path,  # data stored directly in the root directory
        )
        for candidate in candidates:
            if os.path.exists(os.path.join(candidate, 'images')):
                return candidate
        raise FileNotFoundError(f"Could not find data directories in {self.data_path}")

    @staticmethod
    def _annotation_path(img_path):
        """Map an image path to the path of its YOLO label file.

        The nearest directory component named exactly ``images`` is swapped for
        ``labels`` and the file extension (whatever it is) becomes ``.txt``.
        Working on path components avoids rewriting unrelated parts of the path
        that merely contain the text "images", and handles ``.png`` files.
        """
        image_dir, image_name = os.path.split(img_path)
        label_name = os.path.splitext(image_name)[0] + '.txt'

        # Walk upwards until a directory literally named 'images' is found,
        # remembering the components below it so they can be re-appended.
        head, tail = os.path.split(image_dir)
        below = []
        while tail and tail != 'images':
            below.append(tail)
            head, tail = os.path.split(head)

        if tail == 'images':
            label_dir = os.path.join(head, 'labels', *reversed(below))
        else:
            label_dir = image_dir  # no 'images' component: look next to the image
        candidate = os.path.join(label_dir, label_name)
        if os.path.exists(candidate):
            return candidate

        # Backwards-compatible fallback: the earlier substring-based mapping,
        # used only when it points at a file that actually exists.
        legacy = os.path.splitext(img_path.replace('images', 'labels'))[0] + '.txt'
        return legacy if os.path.exists(legacy) else candidate

    def load_yolo_annotations(self, img_path, img_shape):
        """Convert the YOLO boxes of one image into a binary segmentation mask.

        Args:
            img_path: Path of the image file; the label path is derived from it.
            img_shape: Shape of the image the boxes refer to; only the first two
                entries (height, width) are used.

        Returns:
            ``uint8`` array of shape ``img_shape[:2]`` with 1 inside every valid
            box and 0 elsewhere. A missing label file yields an all-zero mask.
            If reading or parsing fails, the error is printed and the mask built
            so far is returned.
        """
        mask = np.zeros(img_shape[:2], dtype=np.uint8)

        annotation_path = self._annotation_path(img_path)
        if not os.path.exists(annotation_path):
            return mask

        h, w = img_shape[:2]
        try:
            with open(annotation_path, 'r') as f:
                lines = f.readlines()

            for line in lines:
                data = line.strip().split()
                if len(data) < 5:  # YOLO format: class x_center y_center width height
                    continue

                int(data[0])  # class id is unused, but a non-integer id still counts as an error
                x_center, y_center, width, height = map(float, data[1:5])

                # Convert normalized coordinates to pixel coordinates
                x_center *= w
                y_center *= h
                width *= w
                height *= h

                # Bounding box corners, clipped to the image
                x1 = max(0, int(x_center - width / 2))
                y1 = max(0, int(y_center - height / 2))
                x2 = min(w, int(x_center + width / 2))
                y2 = min(h, int(y_center + height / 2))

                # Skip boxes that collapse to zero area after truncation/clipping
                if x2 > x1 and y2 > y1:
                    mask[y1:y2, x1:x2] = 1
        except Exception as e:
            print(f"Error processing annotation {annotation_path}: {e}")

        return mask

    def load_data(self, max_samples=None):
        """Load images and build the matching masks.

        Args:
            max_samples: If truthy, only the first ``max_samples`` files (in
                sorted path order) are loaded; otherwise all files are used.

        Returns:
            ``(images, masks)``: ``float32`` RGB images scaled to [0, 1] and
            ``float32`` masks with a trailing channel axis. Both are also stored
            on ``self.images`` / ``self.masks``.

        Raises:
            FileNotFoundError: Propagated from ``find_data_directories``.
            ValueError: If not a single image could be loaded.
        """
        base_path = self.find_data_directories()
        images_dir = os.path.join(base_path, 'images')
        # Fall back to image files stored directly in base_path
        search_dir = images_dir if os.path.exists(images_dir) else base_path

        # Sorted so that a max_samples subset is the same on every run;
        # glob itself returns files in arbitrary order.
        image_files = sorted(
            path
            for pattern in self.IMAGE_PATTERNS
            for path in glob.glob(os.path.join(search_dir, pattern))
        )

        if max_samples:
            image_files = image_files[:max_samples]

        print(f"Found {len(image_files)} image files")

        # Collect into local lists so that calling load_data() again starts
        # from scratch instead of appending to the arrays of a previous call.
        images, masks = [], []
        for i, img_file in enumerate(image_files):
            if i % 500 == 0:
                print(f"Processing image {i}/{len(image_files)}")

            try:
                # Decode once; the original shape is needed for the mask below
                original = cv2.imread(img_file)
                if original is None:
                    print(f"Warning: Could not load image {img_file}")
                    continue

                image = cv2.cvtColor(original, cv2.COLOR_BGR2RGB)
                image = cv2.resize(image, self.img_size)
                image = image.astype(np.float32) / 255.0

                # Build the mask at the original resolution, then resize it with
                # nearest-neighbour interpolation so it stays strictly 0/1.
                mask = self.load_yolo_annotations(img_file, original.shape)
                mask = cv2.resize(mask, self.img_size, interpolation=cv2.INTER_NEAREST)
                mask = mask.astype(np.float32)
            except Exception as e:
                print(f"Error processing image {img_file}: {e}")
                continue

            images.append(image)
            masks.append(mask)

        if not images:
            raise ValueError("No images were successfully loaded!")

        self.images = np.array(images)
        self.masks = np.expand_dims(np.array(masks), -1)  # Add channel dimension

        print(f"Successfully loaded {len(self.images)} images and masks")
        print(f"Images shape: {self.images.shape}, Masks shape: {self.masks.shape}")

        return self.images, self.masks

In [50]:
def unet_model(input_size=(256, 256, 3)):
    """Build a three-level U-Net that outputs a one-channel sigmoid mask.

    The encoder runs two 3x3 ReLU convolutions followed by 2x2 max pooling at
    64, 128 and 256 filters. The bottleneck uses 512 filters. The decoder
    mirrors the encoder with stride-2 transposed convolutions, each concatenated
    with the encoder output of the same level.

    Args:
        input_size: Shape of one input sample, forwarded to ``keras.Input``.

    Returns:
        An uncompiled ``keras.Model`` named ``'U-Net'``.
    """
    def conv_pair(tensor, filters):
        # Two stacked 3x3 'same' convolutions with ReLU
        for _ in range(2):
            tensor = layers.Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        return tensor

    inputs = keras.Input(input_size)

    # Contracting path: keep each pre-pooling feature map for the skip links
    skips = []
    x = inputs
    for filters in (64, 128, 256):
        x = conv_pair(x, filters)
        skips.append(x)
        x = layers.MaxPooling2D(pool_size=(2, 2))(x)

    # Bottleneck
    x = conv_pair(x, 512)

    # Expansive path: upsample, merge with the deepest unused skip, refine
    for filters in (256, 128, 64):
        x = layers.Conv2DTranspose(filters, 2, strides=(2, 2), padding='same')(x)
        x = layers.concatenate([x, skips.pop()])
        x = conv_pair(x, filters)

    # 1x1 convolution producing the per-pixel foreground probability
    outputs = layers.Conv2D(1, 1, activation='sigmoid')(x)

    return keras.Model(inputs=inputs, outputs=outputs, name='U-Net')

In [51]:
def calculate_iou(y_true, y_pred):
    """Return the Intersection over Union of a mask and thresholded predictions.

    ``y_pred`` is binarised at 0.5 (strictly greater counts as foreground).
    A small constant in the denominator keeps the result finite, so two empty
    masks score 0.
    """
    binarised = (y_pred > 0.5).astype(np.float32)
    overlap = np.sum(y_true * binarised)
    truth_area = np.sum(y_true)
    predicted_area = np.sum(binarised)
    combined = truth_area + predicted_area - overlap
    return overlap / (combined + 1e-7)

def calculate_metrics(y_true, y_pred):
    """Return pixel-wise ``(precision, recall, f1)`` for thresholded predictions.

    Both arrays are flattened and ``y_pred`` is binarised at 0.5. Each score
    falls back to 0 when its denominator would be zero.
    """
    truth = y_true.flatten()
    predicted = (y_pred.flatten() > 0.5).astype(np.float32)

    scorers = (precision_score, recall_score, f1_score)
    precision, recall, f1 = (
        scorer(truth, predicted, zero_division=0) for scorer in scorers
    )
    return precision, recall, f1

In [52]:
def train_model(data_path, epochs=30, batch_size=8, max_samples=1000):
    """Fit the U-Net on a single 80/20 train/validation split and score it.

    Args:
        data_path: Dataset location handed to ``TrafficSignDataset``.
        epochs: Maximum number of training epochs (early stopping may end sooner).
        batch_size: Mini-batch size used for fitting.
        max_samples: Cap on the number of images loaded; ``None`` loads all.

    Returns:
        ``(model, history, (iou, precision, recall, f1))`` where the metrics are
        computed on the validation split.
    """
    print("Loading dataset...")
    images, masks = TrafficSignDataset(data_path).load_data(max_samples=max_samples)

    # A fixed random_state keeps the held-out set identical between runs
    train_images, val_images, train_masks, val_masks = train_test_split(
        images, masks, test_size=0.2, random_state=42, shuffle=True
    )
    print(f"Training samples: {train_images.shape[0]}")
    print(f"Validation samples: {val_images.shape[0]}")

    print("Creating U-Net model...")
    net = unet_model()
    adam = keras.optimizers.Adam(learning_rate=1e-4)
    net.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])

    print("Model summary:")
    net.summary()

    # Stop once validation loss stalls for 10 epochs (restoring the best
    # weights) and halve the learning rate after 5 stalled epochs.
    stop_early = keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=10, restore_best_weights=True, verbose=1
    )
    shrink_lr = keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss', factor=0.5, patience=5, verbose=1
    )

    print("Starting training...")
    history = net.fit(
        train_images,
        train_masks,
        validation_data=(val_images, val_masks),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[stop_early, shrink_lr],
        verbose=1,
    )

    print("Evaluating model...")
    val_probabilities = net.predict(val_images)
    scores = (
        calculate_iou(val_masks, val_probabilities),
        *calculate_metrics(val_masks, val_probabilities),
    )

    rule = "=" * 50
    print("\n" + rule)
    print("FINAL RESULTS")
    print(rule)
    for label, value in zip(("IoU", "Precision", "Recall", "F1-score"), scores):
        print(f"{label}: {value:.4f}")

    return net, history, scores

In [53]:
def main(data_path="/path/to/your/dataset", test_mode=True,
         model_path='traffic_sign_unet_model.h5'):
    """Train the U-Net and save it to disk.

    All arguments are optional and default to the values that used to be
    hard-coded, so ``main()`` behaves as before. Pass ``data_path`` to point at
    a real dataset instead of editing the function body.

    Args:
        data_path: Dataset location forwarded to ``train_model``. The default is
            a placeholder and must be replaced with a real path.
        test_mode: If true, run a short smoke-test training (5 epochs, batch
            size 4, 100 samples); otherwise run the full training (50 epochs,
            batch size 8, all samples).
        model_path: File the trained model is saved to.

    Returns:
        ``(model, metrics)`` where ``metrics`` is the tuple returned by
        ``train_model``.
    """
    if test_mode:
        print("Running in TEST MODE with limited samples...")
        run_config = dict(epochs=5, batch_size=4, max_samples=100)
    else:
        print("Running in FULL TRAINING MODE...")
        run_config = dict(epochs=50, batch_size=8, max_samples=None)

    # The training history is returned by train_model but not used here
    model, _history, metrics = train_model(data_path, **run_config)

    # Save the model
    model.save(model_path)
    print(f"\nModel saved as '{model_path}'")

    return model, metrics

# Run the main function
# Any exception raised by main() is reported as a short message instead of a
# traceback, so this cell never fails; in that case `model` and `metrics` are
# not assigned by this cell.
if __name__ == "__main__":
    try:
        model, metrics = main()
    except Exception as e:
        print(f"Error occurred: {e}")
        print("Please check your data path and dataset structure")

Running in TEST MODE with limited samples...
Loading dataset...
Error occurred: Could not find data directories in /path/to/your/dataset
Please check your data path and dataset structure


In [54]:
def quick_test():
    """Smoke-test the model builder and the metric helpers on random data.

    Builds an untrained U-Net, runs one random image through it and evaluates
    the metric functions on random masks. The function asserts the properties
    it reports on, so a broken model or metric fails loudly instead of
    printing "passed".

    Raises:
        AssertionError: If the output shape or range, or any metric value, is
            not what is expected.
    """
    print("Running quick test...")

    # Create a simple test
    test_input = np.random.random((1, 256, 256, 3)).astype(np.float32)

    # Test model creation
    model = unet_model()
    output = model.predict(test_input)

    print(f"Input shape: {test_input.shape}")
    print(f"Output shape: {output.shape}")
    # One probability per input pixel: same spatial size, a single channel
    assert output.shape == test_input.shape[:3] + (1,), f"unexpected output shape {output.shape}"
    # Sigmoid outputs must lie in [0, 1]
    assert np.all((output >= 0.0) & (output <= 1.0)), "model output outside [0, 1]"
    print("Model test passed!")

    # Test metrics
    y_true = np.random.randint(0, 2, (10, 256, 256, 1)).astype(np.float32)
    y_pred = np.random.random((10, 256, 256, 1)).astype(np.float32)

    iou = calculate_iou(y_true, y_pred)
    precision, recall, f1 = calculate_metrics(y_true, y_pred)

    # Every metric is a ratio and must stay within [0, 1]
    for metric_name, value in (("IoU", iou), ("precision", precision),
                               ("recall", recall), ("F1", f1)):
        assert 0.0 <= value <= 1.0, f"{metric_name} out of range: {value}"

    print(f"Metrics test - IoU: {iou:.4f}, Precision: {precision:.4f}")

# Run quick test
quick_test()

Running quick test...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 433ms/step
Input shape: (1, 256, 256, 3)
Output shape: (1, 256, 256, 1)
Model test passed!
Metrics test - IoU: 0.3341, Precision: 0.5007


In [55]:
# import os
# import numpy as np
# from PIL import Image, ImageDraw
# import shutil # New import for file operations

# # --- Configuration ---
# IMG_SIZE = 416 
# YOLO_CLASSES = ['Green Light', 'Red Light', 'Speed Limit 10', 'Speed Limit 100', 'Speed Limit 110', 'Speed Limit 120', 'Speed Limit 20', 'Speed Limit 30', 'Speed Limit 40', 'Speed Limit 50', 'Speed Limit 60', 'Speed Limit 70', 'Speed Limit 80', 'Speed Limit 90', 'Stop'] 
# YOLO_TO_UNET_MAP = {str(i): i + 1 for i in range(len(YOLO_CLASSES))}

# # IMPORTANT: Ensure these paths are correct for YOUR system
# BASE_DIR = './data/car'       # Source of original images and YOLO labels
# OUT_DIR = './data_processed'  # Destination for new /images and /masks folders

# for split in ['train', 'valid', 'test']:
#     image_source_dir = os.path.join(BASE_DIR, split, 'images')
#     label_dir = os.path.join(BASE_DIR, split, 'labels')
    
#     # Destination directories
#     image_output_dir = os.path.join(OUT_DIR, split, 'images') # New path for images
#     mask_output_dir = os.path.join(OUT_DIR, split, 'masks')
    
#     # Create all required destination folders
#     os.makedirs(image_output_dir, exist_ok=True)
#     os.makedirs(mask_output_dir, exist_ok=True)

#     if not os.path.exists(label_dir) or not os.path.exists(image_source_dir):
#         print(f"Skipping {split}: Source directories not found.")
#         continue
    
#     print(f"Processing {split} split...")
    
#     for label_file in os.listdir(label_dir):
#         if label_file.endswith('.txt'):
#             base_name = label_file.replace('.txt', '')
#             image_name = base_name + '.jpg' # Assuming image format is .jpg
            
#             # --- ACTION 1: Copy Image ---
#             source_image_path = os.path.join(image_source_dir, image_name)
#             destination_image_path = os.path.join(image_output_dir, image_name)
            
#             if os.path.exists(source_image_path):
#                 shutil.copyfile(source_image_path, destination_image_path)
            
#             # --- ACTION 2: Create Mask ---
#             mask = Image.new('L', (IMG_SIZE, IMG_SIZE), 0)
#             draw = ImageDraw.Draw(mask)
            
#             with open(os.path.join(label_dir, label_file), 'r') as f:
#                 for line in f:
#                     parts = line.strip().split()
#                     if len(parts) != 5: continue
                        
#                     yolo_id, x_c, y_c, w, h = parts[0], *map(float, parts[1:])
#                     unet_id = YOLO_TO_UNET_MAP.get(yolo_id)

#                     x_min = int((x_c - w/2) * IMG_SIZE)
#                     y_min = int((y_c - h/2) * IMG_SIZE)
#                     x_max = int((x_c + w/2) * IMG_SIZE)
#                     y_max = int((y_c + h/2) * IMG_SIZE)

#                     if unet_id is not None:
#                         draw.rectangle([x_min, y_min, x_max, y_max], fill=unet_id)

#             output_path = os.path.join(mask_output_dir, base_name + '.png')
#             mask.save(output_path)

# print("\n✅ Data preparation complete. The 'data_processed' directory now has both /images and /masks.")

##### Step 2. Class Weight Calculation