In [20]:
import os
import cv2
import torch
import time
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm import tqdm
from google.colab import drive
from PIL import Image
import tensorflow as tf
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler, EarlyStopping
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed, Flatten, Input, Bidirectional, Average, Attention, InputLayer, GlobalAveragePooling1D
from tensorflow.keras.regularizers import l2
from enum import Enum
import threading
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split, GridSearchCV
from scikeras.wrappers import KerasClassifier
from sklearn.metrics import accuracy_score

In [21]:
# Mount Google Drive
drive.mount('/content/drive')

# Define paths
img_folder = "/content/drive/MyDrive/idd_mm_primary_dataset/idd_multimodal/primary/d0/leftCamImgs"
results_dir = os.path.join(img_folder, 'detections')
os.makedirs(results_dir, exist_ok=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [32]:
# YOLO model path
yolo_weights_path = '/content/yolov5/yolov5s.pt'

# Define global variables
sequence_length = 15
time_step = 15

# Function definitions
def load_yolo_model(weights_path):
    model = torch.hub.load('ultralytics/yolov5', 'custom', path=weights_path, force_reload=True)
    return model.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu'))

def perform_inference(model, img_path):
    img = cv2.imread(img_path)
    results = model(img, size=640)
    return results, img

def focal_loss(gamma=2., alpha=.25):
    def focal_loss_fixed(y_true, y_pred):
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
        return -tf.reduce_sum(alpha * tf.pow(1. - pt_1, gamma) * tf.log(pt_1)) - tf.reduce_sum((1-alpha) * tf.pow( pt_0, gamma) * tf.log(1. - pt_0))
    return focal_loss_fixed

def lr_schedule(epoch):
    return 0.001 * (0.1 ** int(epoch / 10))

def visualize_attention(model, sequence):
    # Find the Attention layer
    attention_layer = [layer for layer in model.layers if isinstance(layer, Attention)][0]

    # Create a new model that outputs the attention weights
    attention_model = Model(inputs=model.input, outputs=attention_layer.output)

    # Get the attention weights
    sequence_input = np.expand_dims(sequence, axis=0)
    attention_weights = attention_model.predict(sequence_input)

    # The attention weights might be in different shapes depending on the exact Attention layer used
    # Let's flatten it to a 2D array
    if len(attention_weights.shape) == 4:
        attention_weights = attention_weights.squeeze()
    elif len(attention_weights.shape) == 3:
        attention_weights = attention_weights[0]
    # Plotting
    plt.figure(figsize=(10, 6))
    plt.imshow(attention_weights, aspect='auto', cmap='viridis')
    plt.title('Attention Weights')
    plt.xlabel('Sequence Steps')
    plt.ylabel('Attention Weight')
    plt.colorbar()
    plt.show()



def create_multitask_model(sequence_length, max_len, input_dim, lstm_units=128, dense_units=64):
    input_layer = Input(shape=(sequence_length, max_len, input_dim))
    x = TimeDistributed(Flatten())(input_layer)
    x = Bidirectional(LSTM(lstm_units, return_sequences=True, kernel_regularizer=l2(0.01)))(x)
    x = Attention()(x)
    shared = Dense(dense_units, activation='relu', kernel_regularizer=l2(0.01))(x)

    output1 = Dense(1, activation='sigmoid', name='cut_in')(shared)
    output2 = Dense(3, activation='softmax', name='direction')(shared)

    model = Model(inputs=input_layer, outputs=[output1, output2])
    model.compile(optimizer='adam',
                  loss={'cut_in': focal_loss(), 'direction': 'categorical_crossentropy'},
                  loss_weights={'cut_in': 1.0, 'direction': 0.5},
                  metrics={'cut_in': 'accuracy', 'direction': 'accuracy'})
    return model

def improved_cut_in_detection(detections, sequence_length=15, confidence_threshold=0.5, iou_threshold=0.5, movement_threshold=0.05):
    labels = []

    for i in range(sequence_length - 1, len(detections)):
        sequence = detections[i-sequence_length+1:i+1]
        cut_in_detected = False

        for j in range(1, len(sequence)):
            current_frame = sequence[j]
            previous_frame = sequence[j-1]

            current_detections = current_frame[current_frame[:, 4] > confidence_threshold]
            previous_detections = previous_frame[previous_frame[:, 4] > confidence_threshold]

            for detection in current_detections:
                if not any(calculate_iou(detection[:4], prev_det[:4]) > iou_threshold for prev_det in previous_detections):
                    if is_moving_towards_ego(detection, previous_detections) and is_in_cut_in_zone(detection):
                        cut_in_detected = True
                        break

        labels.append(1 if cut_in_detected else 0)

    return np.array([0] * (sequence_length - 1) + labels)

def calculate_iou(box1, box2):
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    intersection = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])

    iou = intersection / (area1 + area2 - intersection + 1e-6)
    return iou

def is_moving_towards_ego(detection, previous_detections, movement_threshold=0.05):
    center_x = (detection[0] + detection[2]) / 2
    previous_centers = [(det[0] + det[2]) / 2 for det in previous_detections]

    if previous_centers:
        avg_previous_center = sum(previous_centers) / len(previous_centers)
        return (center_x - avg_previous_center) > movement_threshold
    return False

def is_in_cut_in_zone(detection, image_width=640, cut_in_threshold=0.3):
    center_x = (detection[0] + detection[2]) / 2
    normalized_x = center_x / image_width
    return normalized_x < cut_in_threshold or normalized_x > (1 - cut_in_threshold)

def calculate_velocity(detections):
    velocities = []
    for i in range(1, len(detections)):
        prev_frame = detections[i-1]
        curr_frame = detections[i]
        frame_velocities = []
        for prev_det in prev_frame:
            for curr_det in curr_frame:
                if calculate_iou(prev_det[:4], curr_det[:4]) > 0.5:
                    velocity = (curr_det[:2] - prev_det[:2]) / time_step
                    frame_velocities.append(velocity)
        if frame_velocities:
            velocities.append(np.mean(frame_velocities, axis=0))
        else:
            velocities.append(np.zeros(2))
    velocities.insert(0, np.zeros(2))
    return np.array(velocities)

class WarningLevel(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3

class WarningSystem:
    def __init__(self, cooldown=3):
        self.last_warning_time = 0
        self.warning_cooldown = cooldown
        self.current_level = WarningLevel.LOW

    def send_warning_signal(self, level=WarningLevel.MEDIUM, message=""):
        current_time = time.time()
        if current_time - self.last_warning_time >= self.warning_cooldown:
            self.last_warning_time = current_time
            self.current_level = level
            print(f"Warning: {message} (Level: {level.name})")
    def _activate_warning(self, level, message):
        if level == WarningLevel.LOW:
            self._visual_warning(message)
        elif level == WarningLevel.MEDIUM:
            self._visual_warning(message)
            self._audio_warning()
        elif level == WarningLevel.HIGH:
            self._visual_warning(message)
            self._audio_warning()
            self._haptic_warning()
    def _visual_warning(self, message):
        print(f"Visual Warning: {message}")

    def _audio_warning(self):
        print("Audio Warning: Beep! Beep!")

    def _haptic_warning(self):
        print("Haptic Warning: Steering wheel vibration")

def create_sequences(features, labels, sequence_length):
    X, y = [], []
    for i in range(len(labels) - sequence_length + 1):
        if i + sequence_length <= len(features):
            X.append(features[i:i+sequence_length])
            y.append(labels[i+sequence_length-1])
    return np.array(X), np.array(y)

def generate_direction_labels(detections, image_width=640):
    directions = []
    for frame_detections in detections:
        if len(frame_detections) > 0:
            center_x = np.mean([(det[0] + det[2]) / 2 for det in frame_detections])
            if center_x < image_width / 3:
                directions.append([1, 0, 0])  # Left
            elif center_x > 2 * image_width / 3:
                directions.append([0, 0, 1])  # Right
            else:
                directions.append([0, 1, 0])  # Straight
        else:
            directions.append([0, 1, 0])  # No detection, assume straight
    return np.array(directions)

def create_model(lstm_units=128, dense_units=64):
    input_layer = Input(shape=(X_train.shape[1], X_train.shape[2], X_train.shape[3]))
    x = TimeDistributed(Flatten())(input_layer)
    x = LSTM(lstm_units, return_sequences=False)(x)
    x = Dense(dense_units, activation='relu')(x)
    output = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=input_layer, outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

def predict_and_warn(sequences, model, warning_system, threshold=0.5):
    for seq in sequences:
        start_time = time.time()
        seq_expanded = np.expand_dims(seq, axis=0)
        cut_in_prob, direction_prob = model.predict(seq_expanded)
        cut_in_prob = cut_in_prob[0][0]
        direction = np.argmax(direction_prob[0])

        if cut_in_prob > threshold:
            level = WarningLevel.HIGH if cut_in_prob > 0.8 else WarningLevel.MEDIUM
            direction_text = ['left', 'straight', 'right'][direction]
            warning_text = f"Cut-in detected with probability {cut_in_prob:.2f} from the {direction_text}"
            warning_system.send_warning_signal(level, warning_text)

        end_time = time.time()
        processing_time = end_time - start_time
        print(f"Processing time: {processing_time:.3f} seconds")

def predict_cut_in(model, sequence, threshold=0.5):
    cut_in_prob, direction_prob = model.predict(sequence)
    is_cut_in = cut_in_prob[0, 0] > threshold
    predicted_direction = np.argmax(direction_prob[0])
    return cut_in_prob[0, 0], is_cut_in, predicted_direction

def create_multitask_model(sequence_length, max_detections, feature_dim, lstm_units=128, dense_units=64):
    input_layer = Input(shape=(sequence_length, max_detections, feature_dim))
    x = TimeDistributed(Flatten())(input_layer)
    lstm_out = Bidirectional(LSTM(lstm_units, return_sequences=True, kernel_regularizer=l2(0.01)))(x)

    # Create query and value for attention
    query = Dense(lstm_units, use_bias=False)(lstm_out)
    value = Dense(lstm_units, use_bias=False)(lstm_out)

    # Apply attention
    attention_output = Attention()([query, value])

    # Global average pooling
    x = GlobalAveragePooling1D()(attention_output)

    shared = Dense(dense_units, activation='relu', kernel_regularizer=l2(0.01))(x)

    cut_in_output = Dense(1, activation='sigmoid', name='cut_in')(shared)
    direction_output = Dense(3, activation='softmax', name='direction')(shared)

    model = Model(inputs=input_layer, outputs=[cut_in_output, direction_output])
    model.compile(optimizer='adam',
                  loss={'cut_in': 'binary_crossentropy', 'direction': 'categorical_crossentropy'},
                  loss_weights={'cut_in': 1.0, 'direction': 0.5},
                  metrics={'cut_in': 'accuracy', 'direction': 'accuracy'})
    return model


def plot_training_history(history):
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['cut_in_accuracy'], label='Cut-in Train Accuracy')
    plt.plot(history.history['val_cut_in_accuracy'], label='Cut-in Validation Accuracy')
    plt.title('Model Cut-in Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['direction_accuracy'], label='Direction Train Accuracy')
    plt.plot(history.history['val_direction_accuracy'], label='Direction Validation Accuracy')
    plt.title('Model Direction Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


In [4]:
# Main process
model = load_yolo_model(yolo_weights_path)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

img_paths = sorted(Path(img_folder).glob('*.jpg'))
detections = []

for img_path in tqdm(img_paths):
    results, img = perform_inference(model, str(img_path))
    save_path = os.path.join(results_dir, img_path.stem + '_detection.jpg')
    results.render()
    cv2.imwrite(save_path, img)
    detections.append(results.xyxy[0].cpu().numpy())

print("Detection completed. Results saved in:", results_dir)


Downloading: "https://github.com/ultralytics/yolov5/zipball/master" to /root/.cache/torch/hub/master.zip
YOLOv5 🚀 2024-7-15 Python-3.10.12 torch-2.3.0+cu121 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 
100%|██████████| 4530/4530 [56:22<00:00,  1.34it/s]

Detection completed. Results saved in: /content/drive/MyDrive/idd_mm_primary_dataset/idd_multimodal/primary/d0/leftCamImgs/detections





In [23]:
# Calculate velocities
velocities = calculate_velocity(detections)

# Combine detections and velocities to create features
features = []
for det, vel in zip(detections, velocities):
    frame_features = []
    for d in det:
        frame_features.append(np.concatenate([d, vel]))
    features.append(np.array(frame_features))

# Generate labels
cut_in_labels = improved_cut_in_detection(detections)
direction_labels = generate_direction_labels(detections)

# Ensure labels have the same length as features
cut_in_labels = cut_in_labels[:len(features)]
direction_labels = direction_labels[:len(features)]

# Combine detections and velocities
features = []
for det, vel in zip(detections, velocities):
    frame_features = []
    for d in det:
        frame_features.append(np.concatenate([d, vel]))
    features.append(np.array(frame_features))


In [24]:
# Create sequences
max_len = max(len(f) for f in features)
padded_features = [np.pad(f, ((0, max_len - len(f)), (0, 0)), mode='constant') for f in features]
sequences, labels = create_sequences(padded_features, cut_in_labels, sequence_length)
input_dim = sequences.shape[-1]
max_detections = sequences.shape[2]

direction_labels = generate_direction_labels(detections)


In [25]:
#Matching sequence length with that of cut_in and direction_label length
print("sequences length:", len(sequences))
print("cut_in_labels length:", len(cut_in_labels))
print("direction_labels length:", len(direction_labels))
min_length = min(len(sequences), len(cut_in_labels), len(direction_labels))

sequences = sequences[:min_length]
cut_in_labels = cut_in_labels[:min_length]
direction_labels = direction_labels[:min_length]

print("After trimming:")
print("sequences length:", len(sequences))
print("cut_in_labels length:", len(cut_in_labels))
print("direction_labels length:", len(direction_labels))

sequences length: 4516
cut_in_labels length: 4530
direction_labels length: 4530
After trimming:
sequences length: 4516
cut_in_labels length: 4516
direction_labels length: 4516


In [26]:
# Split into train+val and test sets
X_train_val, X_test, y_train_val_cut_in, y_test_cut_in, y_train_val_direction, y_test_direction = train_test_split(
    sequences, cut_in_labels, direction_labels, test_size=0.2, random_state=42)

# Now split the train+val into train and validation sets
X_train, X_val, y_train_cut_in, y_val_cut_in, y_train_direction, y_val_direction = train_test_split(
    X_train_val, y_train_val_cut_in, y_train_val_direction, test_size=0.2, random_state=42)

# Hyperparameter tuning
model = KerasClassifier(
    model=create_model,
    lstm_units=128,
    dense_units=64,
    epochs=10,
    batch_size=32,
    verbose=0
)
param_grid = {
    'lstm_units': [64, 128],
    'dense_units': [32, 64],
    'batch_size': [32],
    'epochs': [10]
}
grid = GridSearchCV(estimator=model, param_grid=param_grid, cv=2, error_score='raise')

try:
    grid_result = grid.fit(X_train, y_train_cut_in)
    print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
except Exception as e:
    print("Error during grid search:", str(e))


Best: 0.916930 using {'batch_size': 32, 'dense_units': 64, 'epochs': 10, 'lstm_units': 128}


In [27]:
# Use the best parameters to create your final model
best_params = grid_result.best_params_
final_model = create_multitask_model(sequence_length, max_detections, input_dim,
                                     lstm_units=best_params['lstm_units'],
                                     dense_units=best_params['dense_units'])

# Callbacks
lr_scheduler = LearningRateScheduler(lambda epoch: 0.001 * (0.1 ** (epoch // 10)))
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

# Train the model
history = final_model.fit(
    X_train,
    {'cut_in': y_train_cut_in, 'direction': y_train_direction},
    validation_data=(X_val, {'cut_in': y_val_cut_in, 'direction': y_val_direction}),
    epochs=best_params['epochs'],
    batch_size=best_params['batch_size'],
    callbacks=[lr_scheduler, early_stopping]
)


Epoch 1/10
[1m91/91[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 99ms/step - cut_in_accuracy: 0.9016 - direction_accuracy: 0.9862 - loss: 4.7919 - val_cut_in_accuracy: 0.9295 - val_direction_accuracy: 0.9959 - val_loss: 1.1580 - learning_rate: 0.0010
Epoch 2/10
[1m91/91[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 106ms/step - cut_in_accuracy: 0.9236 - direction_accuracy: 0.9965 - loss: 1.0074 - val_cut_in_accuracy: 0.9115 - val_direction_accuracy: 0.9959 - val_loss: 0.7812 - learning_rate: 0.0010
Epoch 3/10
[1m91/91[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 107ms/step - cut_in_accuracy: 0.9109 - direction_accuracy: 0.9953 - loss: 0.7254 - val_cut_in_accuracy: 0.9115 - val_direction_accuracy: 0.9959 - val_loss: 0.6255 - learning_rate: 0.0010
Epoch 4/10
[1m91/91[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 78ms/step - cut_in_accuracy: 0.9104 - direction_accuracy: 0.9945 - loss: 0.6148 - val_cut_in_accuracy: 0.9142 - val_direction_accuracy: 0

In [28]:
# Evaluate the model on the test set
test_results = final_model.evaluate(
    X_test,
    {'cut_in': y_test_cut_in, 'direction': y_test_direction},
    verbose=0
)

# Print returned metrics
print("Test results:")
for metric_name, metric_value in zip(final_model.metrics_names, test_results):
    print(f"{metric_name}: {metric_value}")

# Calculate accuracies manually
y_pred = final_model.predict(X_test)
y_pred_cut_in = (y_pred[0] > 0.5).astype(int).flatten()
y_pred_direction = np.argmax(y_pred[1], axis=1)

cut_in_acc = accuracy_score(y_test_cut_in, y_pred_cut_in)
direction_acc = accuracy_score(np.argmax(y_test_direction, axis=1), y_pred_direction)

print(f"\nManually calculated accuracies:")
print(f"Cut-in test accuracy: {cut_in_acc}")
print(f"Direction test accuracy: {direction_acc}")

Test results:
loss: 0.39590978622436523
compile_metrics: 0.9181416034698486
[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 39ms/step

Manually calculated accuracies:
Cut-in test accuracy: 0.918141592920354
Direction test accuracy: 0.9988938053097345


In [33]:
# Visualize attention for a sample sequence
sample_sequence = X_test[0]
visualize_attention(final_model, sample_sequence)

# Save the model
final_model.save('/content/drive/MyDrive/vehicle_cut_in_multitask_model.h5')

# Initialize warning system
warning_system = WarningSystem()

# Predict and send warning signals
predict_and_warn(X_test, final_model, warning_system)

print("Model training and evaluation completed.")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 620ms/step




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
Processing time: 0.099 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
Processing time: 0.094 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
Processing time: 0.084 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
Processing time: 0.101 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
Processing time: 0.097 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
Processing time: 0.099 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
Processing time: 0.095 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
Processing time: 0.084 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
Processing time: 0.091 seconds
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
P

In [31]:
# Call this function after training
plot_training_history(history)