In [8]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, Model, losses, optimizers
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import albumentations as A  # Augmentations
from ultralytics import YOLO  # YOLOv8
import logging

# Configure logging
# Root logging is set to INFO so that the `logger.info(...)` progress messages
# and `logger.error(...)` failure messages emitted by the classes below are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by every class/function defined in this notebook.
logger = logging.getLogger(__name__)

In [9]:
class KITTIDataLoader:
    """Loads KITTI object-detection images and their 2D bounding-box labels.

    Expects ``data_path`` to contain an ``image_2`` directory with the images
    and a ``label_2`` directory with one ``<stem>.txt`` label file per image.

    Args:
        data_path: Directory that holds ``image_2`` and ``label_2``.
        img_size: Target ``(width, height)`` passed to ``cv2.resize``.
    """

    def __init__(self, data_path, img_size=(416, 416)):
        self.data_path = data_path
        self.img_size = img_size
        self.classes = ["car", "pedestrian", "cyclist"]

    def load_data(self):
        """Loads images and labels from KITTI dataset.

        Images are resized to ``self.img_size`` and the bounding boxes are
        rescaled by the same factors so they keep describing the resized image.

        Returns:
            A tuple ``(images, labels)`` where ``images`` is an array stacking
            the resized images and ``labels`` is a 1-D object array holding,
            per image, a list of ``[class_idx, x_min, y_min, x_max, y_max]``.

        Raises:
            Exception: Any error hit while listing/reading files is logged and
                re-raised unchanged (e.g. a missing label file).
        """
        images, labels = [], []
        try:
            img_dir = os.path.join(self.data_path, "image_2")
            label_dir = os.path.join(self.data_path, "label_2")
            target_w, target_h = self.img_size  # cv2.resize takes (width, height)

            # Sort so that the sample order is reproducible across runs/platforms.
            for img_file in sorted(os.listdir(img_dir)):
                img_path = os.path.join(img_dir, img_file)

                img = cv2.imread(img_path)
                if img is None:
                    # cv2.imread signals failure with None instead of raising.
                    logger.warning(f"Skipping unreadable image: {img_path}")
                    continue

                # Derive the label name from the stem instead of str.replace, which
                # would also rewrite ".png" occurring elsewhere in the file name.
                stem = os.path.splitext(img_file)[0]
                label_path = os.path.join(label_dir, stem + ".txt")
                boxes = self._parse_labels(label_path)

                orig_h, orig_w = img.shape[:2]
                scale_x = target_w / orig_w
                scale_y = target_h / orig_h

                images.append(cv2.resize(img, self.img_size))
                labels.append([
                    [cls_idx, x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
                    for cls_idx, x1, y1, x2, y2 in boxes
                ])

            # The number of boxes differs per image, so a plain np.array(labels)
            # would be ragged; fill an object array element by element instead.
            label_array = np.empty(len(labels), dtype=object)
            for i, image_boxes in enumerate(labels):
                label_array[i] = image_boxes

            return np.array(images), label_array
        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise

    def _parse_labels(self, label_path):
        """Parses KITTI label files into [class_idx, x_min, y_min, x_max, y_max] format.

        Class names are matched case-insensitively against ``self.classes``
        (KITTI writes them capitalised, e.g. ``Car``). Lines for other classes,
        blank lines and lines too short to contain a bounding box are skipped.
        Coordinates are returned exactly as written in the file.

        Args:
            label_path: Path of the label text file to read.

        Returns:
            A list with one ``[class_idx, x_min, y_min, x_max, y_max]`` entry per
            matching object; ``class_idx`` indexes into ``self.classes``.
        """
        known_classes = [name.lower() for name in self.classes]
        labels = []
        with open(label_path, 'r') as f:
            for line in f:
                parts = line.split()
                if len(parts) < 8:
                    # Blank or truncated line: no bbox fields at columns 4..7.
                    continue
                class_name = parts[0].lower()
                if class_name in known_classes:
                    bbox = [float(value) for value in parts[4:8]]  # x1, y1, x2, y2
                    labels.append([known_classes.index(class_name)] + bbox)
        return labels

In [None]:
def load_kitti_data(data_path, img_size=(416, 416)):
    """Loads KITTI images and labels; functional wrapper around KITTIDataLoader.

    The previous body duplicated the loader loop and called an undefined
    ``parse_kitti_label`` helper, so it could not run. Delegating keeps a single
    implementation of the loading/parsing logic.

    Args:
        data_path: Directory that holds the ``image_2`` and ``label_2`` folders.
        img_size: Target ``(width, height)`` used when resizing each image.

    Returns:
        The ``(images, labels)`` tuple produced by ``KITTIDataLoader.load_data``.
    """
    return KITTIDataLoader(data_path, img_size=img_size).load_data()

# Data Augmentation Pipeline
# Composes three albumentations transforms, each configured with its own
# probability `p`: horizontal flip (0.5), brightness/contrast jitter (0.2) and
# rotation with limit=20 (0.3).
# NOTE(review): no bbox parameters are passed to A.Compose, so presumably only
# the image is transformed; flip/rotate would then leave detection boxes out of
# sync. Confirm before using this pipeline together with the KITTI labels.
augmentation = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Rotate(limit=20, p=0.3),
])

In [10]:
class SiameseNetwork:
    """Siamese similarity model: two inputs share one frozen ResNet50 encoder.

    Args:
        input_shape: ``(height, width, channels)`` of each input image.
    """

    def __init__(self, input_shape=(416, 416, 3)):
        self.input_shape = input_shape
        self.model = self._build_model()

    def _build_model(self):
        """Builds a Siamese Network with ResNet50 backbone.

        Returns:
            A Keras ``Model`` taking ``[input_a, input_b]`` and producing one
            sigmoid score per pair.
        """
        # pooling="avg" collapses the spatial feature map into one vector per
        # image. Without it the backbone output is 4-D and Dense(1) would emit a
        # grid of scores per pair, which cannot be matched against one binary
        # label per pair.
        base_cnn = tf.keras.applications.ResNet50(
            weights="imagenet",
            include_top=False,
            input_shape=self.input_shape,
            pooling="avg"
        )

        # Freeze base CNN so only the classification head is trained.
        base_cnn.trainable = False

        # Twin networks: the same backbone instance (shared weights) encodes both inputs.
        input_a = layers.Input(self.input_shape)
        input_b = layers.Input(self.input_shape)

        embedding_a = base_cnn(input_a)
        embedding_b = base_cnn(input_b)

        # Distance layer: element-wise absolute difference of the two embeddings.
        distance = layers.Lambda(
            lambda tensors: tf.abs(tensors[0] - tensors[1])
        )([embedding_a, embedding_b])

        # Classification head
        output = layers.Dense(1, activation="sigmoid")(distance)

        # NOTE(review): inputs are fed to ResNet50 as-is; the ImageNet weights
        # presumably expect resnet50.preprocess_input to be applied by the caller.
        return Model(inputs=[input_a, input_b], outputs=output)

    def train(self, train_pairs, val_pairs, epochs=10):
        """Trains the Siamese network.

        Args:
            train_pairs: Indexable where ``[0]`` and ``[1]`` are the two image
                inputs and ``[2]`` the targets.
            val_pairs: Same layout as ``train_pairs``, used as validation data.
            epochs: Number of epochs passed to ``fit``.

        Returns:
            The object returned by ``self.model.fit``.
        """
        self.model.compile(
            optimizer=optimizers.Adam(0.0001),
            loss=losses.BinaryCrossentropy(),
            metrics=["accuracy"]
        )

        history = self.model.fit(
            [train_pairs[0], train_pairs[1]],
            train_pairs[2],
            validation_data=([val_pairs[0], val_pairs[1]], val_pairs[2]),
            epochs=epochs,
            batch_size=32
        )

        return history

In [11]:
class GAN:
    """Fully-connected GAN with a generator, a discriminator and their stack.

    Args:
        latent_dim: Size of the noise vector fed to the generator.
        img_shape: Shape of generated/real images, e.g. ``(H, W, C)``.
    """

    def __init__(self, latent_dim=100, img_shape=(416, 416, 3)):
        self.latent_dim = latent_dim
        self.img_shape = img_shape
        self.generator = self._build_generator()
        self.discriminator = self._build_discriminator()
        self.gan = self._build_gan()

    def _build_generator(self):
        """Builds the generator model.

        Maps a ``latent_dim`` noise vector through two dense blocks to a
        tanh-activated output reshaped to ``img_shape`` (values in [-1, 1]).
        """
        # np.prod returns a NumPy integer; cast so Dense gets a plain int.
        n_pixels = int(np.prod(self.img_shape))
        model = models.Sequential([
            layers.Dense(256, input_dim=self.latent_dim),
            layers.LeakyReLU(alpha=0.2),
            layers.BatchNormalization(),
            layers.Dense(512),
            layers.LeakyReLU(alpha=0.2),
            layers.BatchNormalization(),
            layers.Dense(n_pixels, activation="tanh"),
            layers.Reshape(self.img_shape)
        ])
        return model

    def _build_discriminator(self):
        """Builds the discriminator model.

        Flattens an ``img_shape`` image and outputs one sigmoid validity score.
        """
        model = models.Sequential([
            layers.Flatten(input_shape=self.img_shape),
            layers.Dense(512),
            layers.LeakyReLU(alpha=0.2),
            layers.Dense(256),
            layers.LeakyReLU(alpha=0.2),
            layers.Dense(1, activation="sigmoid")
        ])
        return model

    def _build_gan(self):
        """Combines generator and discriminator.

        The discriminator is compiled on its own first, then marked
        non-trainable before being stacked on the generator, so training the
        combined model is meant to update only the generator.

        Returns:
            The compiled combined model mapping noise to a validity score.
        """
        self.discriminator.compile(
            optimizer=optimizers.Adam(0.0002, 0.5),
            loss="binary_crossentropy"
        )
        # NOTE(review): this relies on the trainable flag being captured at
        # compile time; confirm that holds for the installed Keras version.
        self.discriminator.trainable = False

        z = layers.Input(shape=(self.latent_dim,))
        img = self.generator(z)
        validity = self.discriminator(img)

        combined = Model(z, validity)
        combined.compile(
            optimizer=optimizers.Adam(0.0002, 0.5),
            loss="binary_crossentropy"
        )
        return combined

    def train(self, real_images, epochs=100, batch_size=32):
        """Trains the GAN.

        Each iteration trains the discriminator on one real and one generated
        batch, then trains the generator through the combined model.

        Args:
            real_images: Array of real images, first axis is the sample axis.
                Integer-typed images (e.g. uint8 from ``cv2.imread``) are scaled
                per batch to [-1, 1] to match the generator's tanh output range;
                float images are used unchanged.
            epochs: Number of training iterations (one batch each).
            batch_size: Number of samples per batch.

        Raises:
            ValueError: If ``real_images`` contains no samples.
        """
        real_images = np.asarray(real_images)
        if real_images.shape[0] == 0:
            raise ValueError("real_images is empty; nothing to train the GAN on")

        # Scale lazily per batch to avoid materialising a float copy of the dataset.
        needs_scaling = np.issubdtype(real_images.dtype, np.integer)

        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):
            # Train Discriminator
            idx = np.random.randint(0, real_images.shape[0], batch_size)
            real_imgs = real_images[idx]
            if needs_scaling:
                real_imgs = real_imgs.astype(np.float32) / 127.5 - 1.0

            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            # verbose=0: avoid printing a progress bar on every iteration.
            gen_imgs = self.generator.predict(noise, verbose=0)

            d_loss_real = self.discriminator.train_on_batch(real_imgs, valid)
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
            # The discriminator is compiled without metrics, so train_on_batch
            # yields a scalar and d_loss[0] would raise; ravel handles both the
            # scalar case and a [loss, metric, ...] list.
            d_loss_value = float(np.ravel(d_loss)[0])

            # Train Generator
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            g_loss = self.gan.train_on_batch(noise, valid)

            if epoch % 10 == 0:
                logger.info(f"Epoch {epoch}, D Loss: {d_loss_value}, G Loss: {g_loss}")
                

In [12]:
class ObjectDetector:
    """Thin wrapper around an Ultralytics ``YOLO`` model.

    Args:
        model_type: Weights/model identifier passed straight to ``YOLO``.
    """

    def __init__(self, model_type="yolov8n.pt"):
        self.model = YOLO(model_type)

    def train(self, data_yaml, epochs=50, imgsz=416, batch=16):
        """Fine-tunes YOLOv8 on KITTI data.

        Args:
            data_yaml: Dataset description passed as ``data`` to the trainer.
            epochs: Number of training epochs.
            imgsz: Training image size.
            batch: Batch size (previously hard-coded to 16; lower it when GPU
                memory is tight).

        Returns:
            Whatever ``self.model.train`` returns.
        """
        results = self.model.train(
            data=data_yaml,
            epochs=epochs,
            imgsz=imgsz,
            batch=batch,
            augment=True
        )
        return results

    def evaluate(self, val_data):
        """Evaluates model performance.

        Args:
            val_data: Dataset description passed as ``data`` to ``model.val``.

        Returns:
            Whatever ``self.model.val`` returns.
        """
        metrics = self.model.val(data=val_data)
        return metrics

    def detect(self, image):
        """Runs inference on a single image.

        Args:
            image: Input forwarded unchanged to the model call.

        Returns:
            The annotated image from the first result's ``plot()``.
            NOTE(review): Ultralytics documents this array as BGR-ordered;
            convert to RGB before showing it with matplotlib.
        """
        results = self.model(image)
        return results[0].plot()  # Returns image with bounding boxes

In [None]:
if __name__ == "__main__":
    # End-to-end driver: load KITTI, train the Siamese net and the GAN, sample
    # synthetic images, fine-tune YOLOv8 and visualise one detection.

    # Step 1: Load KITTI Data
    data_loader = KITTIDataLoader("path/to/kitti")  # TODO: replace with your KITTI directory
    images, labels = data_loader.load_data()

    # Step 2: Train Siamese Network (example)
    siamese = SiameseNetwork()
    # Generate pairs (anchor, positive/negative, label)
    # TODO: generate_pairs is not defined in this notebook and must be
    # implemented before this cell can run; it is expected to return a dict
    # with "train" and "val" entries.
    pairs = generate_pairs(images, labels)  # Implement this
    siamese.train(pairs["train"], pairs["val"])

    # Step 3: Train GAN
    gan = GAN()
    gan.train(images)

    # Step 4: Generate synthetic data
    # Use the GAN's own latent size rather than a hard-coded 100 so this keeps
    # working if GAN(latent_dim=...) is changed.
    n_synthetic = 100
    synthetic_images = gan.generator.predict(
        np.random.normal(0, 1, (n_synthetic, gan.latent_dim))
    )

    # Step 5: Train YOLOv8
    detector = ObjectDetector()
    detector.train("kitti.yaml")

    # Step 6: Test detection
    test_img = images[0]
    result_img = detector.detect(test_img)
    # The annotated image is BGR-ordered (OpenCV convention) while matplotlib
    # expects RGB, so reverse the channel axis before display.
    plt.imshow(result_img[..., ::-1])
    plt.show()


├── /data
│   ├── /image_2
│   │   ├── /training         <-- Training images from left camera
│   │   └── /testing          <-- Testing images from left camera
│   ├── /image_3
│   │   ├── /training         <-- Training images from right camera
│   │   └── /testing          <-- Testing images from right camera
│   └── /calib

