In [1]:
import json
from collections import Counter

# File path to the COCO annotation file
annotation_file = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/annotations/instances_default.json"

# Labels to count
target_labels = {"open_switch", "closed_swtich", "straight"}

# Load the COCO annotation file
with open(annotation_file, 'r') as f:
    coco_data = json.load(f)

# Get category mappings
categories = {category['id']: category['name'] for category in coco_data['categories']}

# Initialize a counter for the target labels
label_counter = Counter()

# Iterate over the annotations to count the labels
for annotation in coco_data['annotations']:
    category_id = annotation['category_id']
    category_name = categories.get(category_id, None)
    if category_name in target_labels:
        label_counter[category_name] += 1

# Display the counts
for label in target_labels:
    print(f"{label}: {label_counter[label]}")


closed_swtich: 36
straight: 145
open_switch: 141


In [2]:
import os
import json
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import shutil
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import cv2

# Paths
annotation_file = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/annotations/instances_default.json"
image_dir = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/images"
output_dir = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/processed_data"

# Load COCO annotations
with open(annotation_file, 'r') as f:
    coco_data = json.load(f)

# Get category mapping
categories = {category['id']: category['name'] for category in coco_data['categories']}

# Filter annotations by class
annotations = coco_data['annotations']
image_labels = {
    annotation['image_id']: categories[annotation['category_id']]
    for annotation in annotations
    if categories[annotation['category_id']] in ['closed_swtich', 'open_switch', 'straight']  # Corrected typo here
}

# Prepare dataset directory structure
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

for split in ['train', 'val', 'test']:
    for label in ['closed_swtich', 'open_switch', 'straight']:  # Corrected typo here
        os.makedirs(os.path.join(output_dir, split, label), exist_ok=True)
    os.makedirs(os.path.join(output_dir, split, 'masks'), exist_ok=True)  # Masks folder

# Split data into train, val, test
image_filenames = {img['id']: img['file_name'] for img in coco_data['images']}
image_ids = list(image_filenames.keys())
train_ids, test_ids = train_test_split(image_ids, test_size=0.2, random_state=42)
train_ids, val_ids = train_test_split(train_ids, test_size=0.25, random_state=42)  # 20% val of 80% train

# Helper function to generate masks
def generate_mask(image_id):
    mask = np.zeros((128, 128), dtype=np.uint8)
    for annotation in [a for a in annotations if a['image_id'] == image_id]:
        if 'segmentation' in annotation:
            for poly in annotation['segmentation']:
                # Draw polygons on the mask
                poly = np.array(poly).reshape((int(len(poly) / 2), 2))  # Flatten and reshape into polygon vertices
                cv2.fillPoly(mask, [poly.astype(int)], 1)  # Fill polygon area with 1 (for object)
    return mask

# Helper function to copy images and masks
def copy_images_and_masks(image_ids, split):
    for image_id in image_ids:
        if image_id in image_labels:
            label = image_labels[image_id]
            src = os.path.join(image_dir, image_filenames[image_id])
            dst_image = os.path.join(output_dir, split, label, image_filenames[image_id])

            # Copy the image
            if os.path.exists(src):
                shutil.copy(src, dst_image)

            # Generate and save the mask
            mask = generate_mask(image_id)
            mask_filename = os.path.splitext(image_filenames[image_id])[0] + '_mask.png'
            mask_dst = os.path.join(output_dir, split, 'masks', mask_filename)
            cv2.imwrite(mask_dst, mask)

copy_images_and_masks(train_ids, 'train')
copy_images_and_masks(val_ids, 'val')
copy_images_and_masks(test_ids, 'test')

# Data augmentation and data loaders
datagen = ImageDataGenerator(rescale=1.0/255)

# For classification
train_generator = datagen.flow_from_directory(
    os.path.join(output_dir, 'train'),
    target_size=(128, 128),
    batch_size=32,
    class_mode='categorical'
)

val_generator = datagen.flow_from_directory(
    os.path.join(output_dir, 'val'),
    target_size=(128, 128),
    batch_size=32,
    class_mode='categorical'
)

test_generator = datagen.flow_from_directory(
    os.path.join(output_dir, 'test'),
    target_size=(128, 128),
    batch_size=32,
    class_mode='categorical'
)

# Verify class indices
print("Class indices:", train_generator.class_indices)

# Model definition for segmentation (Optional: For segmentation tasks)
input_img = tf.keras.layers.Input(shape=(128, 128, 3))

x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu')(input_img)
x = tf.keras.layers.MaxPooling2D(2, 2)(x)
x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu')(x)
x = tf.keras.layers.MaxPooling2D(2, 2)(x)
x = tf.keras.layers.Conv2D(128, (3, 3), activation='relu')(x)
x = tf.keras.layers.MaxPooling2D(2, 2)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
output = tf.keras.layers.Dense(3, activation='softmax')(x)  # For multi-class classification

model = tf.keras.Model(inputs=input_img, outputs=output)

# Compile the model
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Train the model
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=10
)

# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(test_generator)
print(f"Test Accuracy: {test_accuracy:.2f}")


ModuleNotFoundError: No module named 'cv2'

In [None]:
import matplotlib.pyplot as plt

# Assuming 'history' is the object returned by model.fit()
# Training loss and accuracy
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')

plt.xlabel('Epochs')
plt.ylabel('Loss/Accuracy')
plt.legend()
plt.title('Training and Validation Loss/Accuracy Over Epochs')
plt.show()


In [None]:
import os
import json
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import numpy as np
import shutil
from tensorflow.keras.callbacks import EarlyStopping  # Import the EarlyStopping callback

# Paths
annotation_file = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/annotations/instances_default.json"
image_dir = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/images"
output_dir = r"C:/Users/natha/OneDrive/Desktop/Applied Machine Learning/Run-Through_Switch/processed_data"

# Load COCO annotations
with open(annotation_file, 'r') as f:
    coco_data = json.load(f)

# Get category mapping
categories = {category['id']: category['name'] for category in coco_data['categories']}

# Filter annotations for closed_switch
annotations = coco_data['annotations']
closed_switch_images = {
    annotation['image_id']
    for annotation in annotations
    if categories[annotation['category_id']] == 'closed_switch'
}

# Prepare dataset directory structure
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

for split in ['train', 'val', 'test']:
    for label in ['closed_switch', 'not_closed_switch']:
        os.makedirs(os.path.join(output_dir, split, label), exist_ok=True)

# Split data into train, val, test
image_filenames = {img['id']: img['file_name'] for img in coco_data['images']}
image_ids = list(image_filenames.keys())
train_ids, test_ids = train_test_split(image_ids, test_size=0.2, random_state=42)
train_ids, val_ids = train_test_split(train_ids, test_size=0.25, random_state=42)  # 20% val of 80% train

# Helper function to copy images
def copy_images(image_ids, split):
    for image_id in image_ids:
        label = 'closed_switch' if image_id in closed_switch_images else 'not_closed_switch'
        src = os.path.join(image_dir, image_filenames[image_id])
        dst = os.path.join(output_dir, split, label, image_filenames[image_id])
        if os.path.exists(src):
            shutil.copy(src, dst)

copy_images(train_ids, 'train')
copy_images(val_ids, 'val')
copy_images(test_ids, 'test')

# Data augmentation and data loaders
datagen = ImageDataGenerator(rescale=1.0/255)

train_generator = datagen.flow_from_directory(
    os.path.join(output_dir, 'train'),
    target_size=(128, 128),
    batch_size=32,
    class_mode='binary'
)

val_generator = datagen.flow_from_directory(
    os.path.join(output_dir, 'val'),
    target_size=(128, 128),
    batch_size=32,
    class_mode='binary'
)

# Model definition
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(128, 128, 3)),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')  # Binary classification
])

# Compile the model
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

# Define early stopping callback (added here)
early_stopping = EarlyStopping(
    monitor='val_loss',     # Monitor validation loss
    patience=3,             # Stop after 3 epochs without improvement
    restore_best_weights=True  # Restore weights from the best epoch
)

# Train the model with early stopping (added here)
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=10,
    callbacks=[early_stopping]  # Add early stopping callback here
)

# Evaluate the model on the test set
test_generator = datagen.flow_from_directory(
    os.path.join(output_dir, 'test'),
    target_size=(128, 128),
    batch_size=32,
    class_mode='binary'
)

test_loss, test_accuracy = model.evaluate(test_generator)
print(f"Test Accuracy: {test_accuracy:.2f}")


In [None]:
import matplotlib.pyplot as plt

# Assuming 'history' is the object returned by model.fit()
# Training loss and accuracy
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')

plt.xlabel('Epochs')
plt.ylabel('Loss/Accuracy')
plt.legend()
plt.title('Training and Validation Loss/Accuracy Over Epochs')
plt.show()