In [1]:
import cv2
import os
import xml.etree.ElementTree as ET
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
from PIL import Image
import shutil
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [2]:
video_path = 'C:/Users/HP/Zoomcamp/Datasets/nakawa_output.avi'
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print("Error: Could not open video.")
    exit()

fps = cap.get(cv2.CAP_PROP_FPS)

# Frame extraction rate (every 2 seconds)
extraction_rate = 2  
frame_skip = int(fps * extraction_rate)

frame_dir = 'video_frames'
os.makedirs(frame_dir, exist_ok=True)

count = 0
frame_index = 0

while True:
    ret, frame = cap.read()

    if not ret:
        break

    if frame_index % frame_skip == 0:
        frame_filename = os.path.join(frame_dir, f"frame_{count:05d}.jpg")
        cv2.imwrite(frame_filename, frame)
        count += 1
    
    frame_index += 1

cap.release()
print(f"Frames extracted: {count}")

Frames extracted: 1248


In [3]:
# Base directory where the exported data is stored
base_dir = 'D:/Users HP/Downloads/cnn'
images_dir = os.path.join(base_dir, 'JPEGImages')
annotations_dir = os.path.join(base_dir, 'Annotations')

# Create directories for train, validation, test sets
data_splits = ['train', 'val', 'test']
for split in data_splits:
    os.makedirs(os.path.join(base_dir, split, 'Images'), exist_ok=True)
    os.makedirs(os.path.join(base_dir, split, 'Annotations'), exist_ok=True)

# Get a list of filenames without extension
all_files = [os.path.splitext(f)[0] for f in os.listdir(images_dir) if f.endswith('.jpg') or f.endswith('.png')]

# Split data into train, val, and test
train_files, test_files = train_test_split(all_files, test_size=0.3, random_state=42)  # 70% training, 30% testing
train_files, val_files = train_test_split(train_files, test_size=0.2, random_state=42)  # Splitting 20% of training for validation

# Function to copy files
def copy_files(files, src_folder_images, src_folder_annotations, dst_folder_images, dst_folder_annotations):
    for fname in files:
        shutil.copy(os.path.join(src_folder_images, fname + '.jpg'), os.path.join(dst_folder_images, fname + '.jpg'))
        shutil.copy(os.path.join(src_folder_annotations, fname + '.xml'), os.path.join(dst_folder_annotations, fname + '.xml'))

# Copy image and annotation files
for file_set, split in [(train_files, 'train'), (val_files, 'val'), (test_files, 'test')]:
    copy_files(file_set, images_dir, annotations_dir, os.path.join(base_dir, split, 'Images'), os.path.join(base_dir, split, 'Annotations'))

print('Dataset successfully organized and split into train, validation, and test sets.')

Dataset successfully organized and split into train, validation, and test sets.


In [4]:
import os
import xml.etree.ElementTree as ET

def count_classes(directory):
    class_counts = {}
    annotation_dir = os.path.join(directory, 'Annotations')
    
    for filename in os.listdir(annotation_dir):
        if filename.endswith('.xml'):
            path = os.path.join(annotation_dir, filename)
            tree = ET.parse(path)
            root = tree.getroot()
            for member in root.findall('object'):
                class_name = member.find('name').text
                if class_name in class_counts:
                    class_counts[class_name] += 1
                else:
                    class_counts[class_name] = 1
    
    return class_counts

# Path to your dataset directory
base_dir = 'D:/Users HP/Downloads/cnn'

# Count classes in each dataset split
train_counts = count_classes(os.path.join(base_dir, 'train'))
val_counts = count_classes(os.path.join(base_dir, 'val'))
test_counts = count_classes(os.path.join(base_dir, 'test'))

print("Training Set Class Distribution:", train_counts)
print("Validation Set Class Distribution:", val_counts)
print("Test Set Class Analysis:", test_counts)

Training Set Class Distribution: {'Motorcycle': 1622, 'Car': 2612}
Validation Set Class Distribution: {'Motorcycle': 396, 'Car': 682}
Test Set Class Analysis: {'Motorcycle': 873, 'Car': 1509}


In [5]:
# Custom data generator
class CustomDataGenerator(Sequence):
    def __init__(self, image_dir, annot_dir, batch_size=32, image_size=(240, 320), shuffle=True):
        self.image_dir = image_dir
        self.annot_dir = annot_dir
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.image_paths, self.labels = self.load_dataset()
        self.on_epoch_end()

    def load_dataset(self):
        image_paths = []
        labels = []
        for annot_file in os.listdir(self.annot_dir):
            if annot_file.endswith('.xml'):
                tree = ET.parse(os.path.join(self.annot_dir, annot_file))
                root = tree.getroot()
                filename = root.find('filename').text
                image_path = os.path.join(self.image_dir, filename)
                for obj in root.findall('object'):
                    label = obj.find('name').text
                    if label == 'Motorcycle':
                        labels.append(1)
                    elif label == 'Car':
                        labels.append(0)
                    image_paths.append(image_path)
        return image_paths, labels

    def __len__(self):
        return int(np.floor(len(self.image_paths) / self.batch_size))

    def __getitem__(self, index):
        batch_image_paths = self.image_paths[index * self.batch_size:(index + 1) * self.batch_size]
        batch_labels = self.labels[index * self.batch_size:(index + 1) * self.batch_size]

        images = np.array([np.array(Image.open(img_path).resize(self.image_size)) / 255.0 for img_path in batch_image_paths])
        labels = np.array(batch_labels)

        return images, labels

    def on_epoch_end(self):
        if self.shuffle:
            temp = list(zip(self.image_paths, self.labels))
            np.random.shuffle(temp)
            self.image_paths, self.labels = zip(*temp)

# Directories
train_image_dir = 'D:/Users HP/Downloads/cnn/train/Images'
train_annot_dir = 'D:/Users HP/Downloads/cnn/train/Annotations'
val_image_dir = 'D:/Users HP/Downloads/cnn/val/Images'
val_annot_dir = 'D:/Users HP/Downloads/cnn/val/Annotations'
test_image_dir = 'D:/Users HP/Downloads/cnn/test/Images'
test_annot_dir = 'D:/Users HP/Downloads/cnn/test/Annotations'

# Data Generators
train_generator = CustomDataGenerator(train_image_dir, train_annot_dir, batch_size=32, image_size=(240, 320), shuffle=True)
val_generator = CustomDataGenerator(val_image_dir, val_annot_dir, batch_size=32, image_size=(240, 320), shuffle=False)
test_generator = CustomDataGenerator(test_image_dir, test_annot_dir, batch_size=32, image_size=(240, 320), shuffle=False)

In [None]:
#Basic CNN Model

def build_cnn_model(input_shape):
    model = Sequential()
    model.add(Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(128, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))
    return model

input_shape = (240, 320, 3)
cnn_model = build_cnn_model(input_shape)
cnn_model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])
cnn_model.summary()

epochs = 10
history = cnn_model.fit(train_generator, validation_data=val_generator, epochs=epochs)

# Evaluating the model
def evaluate_model(model, test_generator):
    predictions = model.predict(test_generator)
    y_true = np.concatenate([test_generator[i][1] for i in range(len(test_generator))])
    y_pred = (predictions > 0.5).astype(int).reshape(-1)
    cm = confusion_matrix(y_true, y_pred)
    cr = classification_report(y_true, y_pred, target_names=['Car', 'Motorcycle'])
    return cm, cr

cm, cr = evaluate_model(cnn_model, test_generator)

print("Confusion Matrix:\n", cm)
print("Classification Report:\n", cr)

# Plot training & validation accuracy values
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

# Plot training & validation loss values
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

  super().__init__(


Epoch 1/10


  self._warn_if_super_not_called()


[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m344s[0m 2s/step - accuracy: 0.5876 - loss: 1.0235 - val_accuracy: 0.6316 - val_loss: 0.6663
Epoch 2/10
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m333s[0m 2s/step - accuracy: 0.6085 - loss: 0.6707 - val_accuracy: 0.6316 - val_loss: 0.6618
Epoch 3/10
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m298s[0m 2s/step - accuracy: 0.6111 - loss: 0.6725 - val_accuracy: 0.6316 - val_loss: 0.6694
Epoch 4/10
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m290s[0m 2s/step - accuracy: 0.6129 - loss: 0.6685 - val_accuracy: 0.6316 - val_loss: 0.6592
Epoch 5/10
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m291s[0m 2s/step - accuracy: 0.6292 - loss: 0.6587 - val_accuracy: 0.6316 - val_loss: 0.6566
Epoch 6/10
[1m 73/132[0m [32m━━━━━━━━━━━[0m[37m━━━━━━━━━[0m [1m2:25[0m 2s/step - accuracy: 0.6098 - loss: 0.6609

In [None]:
from tensorflow.keras.preprocessing.image import img_to_array

# Function to preprocess image
def preprocess_image(image, input_size):
    image = cv2.resize(image, input_size)
    image = img_to_array(image) / 255.0
    return np.expand_dims(image, axis=0)

# Function to annotate image
def annotate_image(image, label, score, bbox):
    annotated_image = image.copy()
    x, y, w, h = bbox
    if label == 1 and score > 0.5:  # Assuming 1 is the label for object of interest
        cv2.rectangle(annotated_image, (x, y), (x+w, y+h), (255, 0, 0), 2)
        cv2.putText(annotated_image, f'{label} {score:.2f}', (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
    return annotated_image

# Load and preprocess input image
input_image_path = 'D:/Users HP/Downloads/cnn/JPEGImages/frame_01158.jpg'
input_image = np.array(Image.open(input_image_path))
input_size = (240, 320)  # Adjust based on your model's input size
preprocessed_image = preprocess_image(input_image, input_size)

# Predict on the entire image
prediction = cnn_model.predict(preprocessed_image)
label = (prediction > 0.5).astype(int)[0][0]
score = prediction[0][0]

# Annotate the image
bbox = (50, 50, 200, 200)  # Example bounding box, replace with actual region coordinates
annotated_image = annotate_image(input_image, label, score, bbox)

# Display the annotated image
plt.figure(figsize=(10, 10))
plt.imshow(annotated_image)
plt.axis('off')
plt.show()