<a href="https://colab.research.google.com/github/Srinjoy2002/Machine-Learning-Robot_arduino/blob/main/SIHv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
!pip install roboflow

from roboflow import Roboflow
rf = Roboflow(api_key="UnoiVQqm728ScpgMtjQd")
project = rf.workspace("personal-yatgp").project("sih-mhvih")
version = project.version(2)
dataset = version.download("voc")


loading Roboflow workspace...
loading Roboflow project...


In [23]:
import tensorflow as tf

# Report how many GPU devices TensorFlow can see on this runtime
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))


Num GPUs Available:  1


In [24]:
# Step 1: Install Required Libraries
# Shell-escape (`!`) commands, executed by the notebook runtime rather than by Python.
# roboflow is also installed by the first cell of this notebook, so re-running it here
# only matters when this cell is executed on its own.
!pip install roboflow
!pip install tensorflow

# Step 2: Import Libraries
import os
import xml.etree.ElementTree as ET
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import time
from roboflow import Roboflow

# Step 3: Download Dataset from Roboflow
from getpass import getpass

# The API key is a credential: take it from the ROBOFLOW_API_KEY environment variable
# (or prompt for it) instead of hard-coding it in the notebook.
# NOTE(review): the key that was previously committed here should be rotated.
roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")

rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("personal-yatgp").project("sih-mhvih")
version = project.version(2)

# Download dataset in Pascal VOC format
dataset = version.download("voc")

# Set up the dataset directories (one sub-folder per split under the download location)
train_dir = os.path.join(dataset.location, 'train')
val_dir = os.path.join(dataset.location, 'valid')
test_dir = os.path.join(dataset.location, 'test')

print(f"Train directory: {train_dir}")
print(f"Validation directory: {val_dir}")
print(f"Test directory: {test_dir}")

# Step 4: Define Label Map (For Three Classes)
# Class names are mapped to consecutive integer indices in the order listed below.
# 'Neither' is the class for images with neither Foundation nor Beams.
label_map = {
    class_name: class_index
    for class_index, class_name in enumerate(('FoundationDone', 'Beams', 'Neither'))
}

# Step 5: Parse Pascal VOC Annotations (Images and Annotations in Same Directory), skipping unknown labels
def parse_voc_annotation_same_dir(annotation_dir, label_map):
    """Collect image paths and class indices from the VOC XML files in one directory.

    Each ``*.xml`` file contributes at most one sample: the image named by its
    ``<filename>`` element, labelled with the first ``<object><name>`` found.

    Args:
        annotation_dir: Directory holding both the XML annotations and the images.
        label_map: Mapping from class name to integer class index.

    Returns:
        A tuple ``(images, labels)`` of equal-length lists: image paths (joined onto
        ``annotation_dir``) and their class indices. Annotations that have no
        ``<filename>``, no labelled object, or a label missing from ``label_map``
        are skipped with a printed message.
    """
    images = []
    labels = []

    # Sort so the sample order does not depend on the filesystem's listing order.
    for file in sorted(os.listdir(annotation_dir)):
        if not file.endswith('.xml'):  # Only XML files are annotations
            continue

        root = ET.parse(os.path.join(annotation_dir, file)).getroot()

        # Get the image filename from the XML and construct its full path
        img_file = root.findtext('filename')
        if not img_file:
            print(f"Skipping annotation {file}: missing <filename>")
            continue
        img_path = os.path.join(annotation_dir, img_file)

        # Annotations without any <object><name> used to crash with AttributeError;
        # skip them instead.
        label = root.findtext('object/name')
        if label is None:
            print(f"Skipping file {img_file} with no labelled object")
            continue

        if label in label_map:  # Only append if label is in label_map
            images.append(img_path)
            labels.append(label_map[label])  # Convert class name to index
        else:
            print(f"Skipping file {img_file} with unknown label: {label}")

    return images, labels

# Run the parser over the train, validation and test folders (in that order);
# samples with unknown labels are dropped by the parser itself.
(train_images, train_labels), (val_images, val_labels), (test_images, test_labels) = [
    parse_voc_annotation_same_dir(split_dir, label_map)
    for split_dir in (train_dir, val_dir, test_dir)
]

# Report how many usable samples each split ended up with
print(f"Train dataset: {len(train_images)} images")
print(f"Validation dataset: {len(val_images)} images")
print(f"Test dataset: {len(test_images)} images")

# Step 6: Data Loading and Preprocessing
IMG_SIZE = 224, 224
BATCH_SIZE = 32

def load_and_preprocess_image(img_path, label):
    """Read a JPEG file, resize it to IMG_SIZE and scale pixel values by 1/255.

    The label is passed through unchanged so the function can be used with
    ``Dataset.map`` on (path, label) pairs.

    NOTE(review): Keras documents ``tf.keras.applications.resnet50.preprocess_input``
    as the input preprocessing for ResNet50; this pipeline uses plain [0, 1] scaling.
    Confirm that is intentional, and change training and inference together if not.
    """
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMG_SIZE)
    return resized / 255.0, label  # Normalize to [0,1] range

# Create TensorFlow datasets
# Training data is shuffled per sample *before* batching. Calling .shuffle() after
# .batch() only reorders whole batches, so every batch would contain the same images
# in every epoch. Shuffling the (path, label) pairs before .map() also keeps the
# shuffle buffer small, because it holds file paths instead of decoded images.
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_ds = (
    train_ds
    .shuffle(buffer_size=max(len(train_images), 1))  # buffer_size must be positive
    .map(load_and_preprocess_image)
    .batch(BATCH_SIZE)
)

# Validation and test data keep their original order.
val_ds = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_ds = val_ds.map(load_and_preprocess_image).batch(BATCH_SIZE)

test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_ds = test_ds.map(load_and_preprocess_image).batch(BATCH_SIZE)

# Step 7: Define ResNet50 Model for Multi-class Classification
# ImageNet-pretrained ResNet50 without its classification head
base_model = ResNet50(include_top=False, weights='imagenet', input_shape=(224, 224, 3))

# Freeze every backbone layer so that only the new head is trained
for frozen_layer in base_model.layers:
    frozen_layer.trainable = False

# New head: global average pooling -> 1024-unit ReLU layer -> 3-way softmax
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(1024, activation='relu')(x)
predictions = Dense(3, activation='softmax')(x)

# Full model from backbone input to softmax output
model = Model(inputs=base_model.input, outputs=predictions)

# Integer class labels -> sparse categorical cross-entropy
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['accuracy'],
)

# Step 8: Train the Model with Time, Accuracy, and Loss Tracking
class TimingCallback(tf.keras.callbacks.Callback):
    """Keras callback that records wall-clock time and metrics for every epoch.

    Attributes:
        times: Duration of each finished epoch, in seconds.
        logs: A copy of the metrics dict Keras reported at the end of each epoch.
    """

    def __init__(self):
        # Let the Keras base class set up its own state before adding ours.
        super().__init__()
        self.times = []
        self.logs = []
        self.start_time = None

    def on_epoch_begin(self, epoch, logs=None):
        """Remember when the epoch started."""
        self.start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        """Store the epoch duration and metrics, then print a one-line summary."""
        # Copy so later changes to the dict by Keras cannot alter the stored history;
        # also tolerates logs=None.
        logs = dict(logs or {})
        missing = float('nan')
        elapsed = time.time() - self.start_time if self.start_time is not None else missing
        self.times.append(elapsed)
        self.logs.append(logs)
        # Metrics that were not reported (e.g. no validation data) are printed as nan
        # instead of raising KeyError.
        print(f"Epoch {epoch+1}: Time taken: {self.times[-1]:.2f}s, Accuracy: {logs.get('accuracy', missing):.4f}, Loss: {logs.get('loss', missing):.4f}, Validation Accuracy: {logs.get('val_accuracy', missing):.4f}, Validation Loss: {logs.get('val_loss', missing):.4f}")

# Initialize the timing callback
timing_callback = TimingCallback()




loading Roboflow workspace...
loading Roboflow project...
Train directory: /content/SIH-2/train
Validation directory: /content/SIH-2/valid
Test directory: /content/SIH-2/test
Skipping file pexels-roger-brown-3435524-5125782_jpg.rf.1bbec1f1199c7c869c7b9a129a949b8c.jpg with unknown label: grndclr
Skipping file Bridge-foundation_21_png.rf.5a4709f3064951cd12ba27a812d40129.jpg with unknown label: BridgeStrucDone
Skipping file Bridge-foundation_1_jpeg.rf.5f659990b33349e26aec8c640c84804e.jpg with unknown label: BridgeStrucDone
Skipping file Bridge-foundation_25_jpeg.rf.be77dd0354f58a11f0bb1ed1217cc530.jpg with unknown label: BridgeStrucDone
Skipping file Bridge-foundation_72_jpeg.rf.f7f69a1985371b7c726cecdbdc94bade.jpg with unknown label: BridgeStrucDone
Skipping file Bridge-foundation_14_jpeg.rf.dbcb05bea8a8a16c87331bbb8db22674.jpg with unknown label: BridgeStrucDone
Skipping file pexels-printexstar-11680715_jpg.rf.6d63d91be2e377b17dfd4817c3952a9f.jpg with unknown label: grndclr
Skipping fil

In [None]:
# Fit on the training split, validating after every epoch and logging per-epoch timing
history = model.fit(
    train_ds,
    epochs=100,  # Adjust based on dataset size and performance
    validation_data=val_ds,
    callbacks=[timing_callback],
)

# Step 9: Evaluate the Model on the Test Set
test_metrics = model.evaluate(test_ds)
test_loss, test_accuracy = test_metrics
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

# Step 10: Save the Model
model.save('construction_progress_resnet_model.h5')

# Step 11: Make Predictions on New Images
def predict_image(img_path):
    """Classify a single JPEG file with the global ``model``.

    The image goes through the same steps as the training pipeline (decode as
    3-channel JPEG, resize to IMG_SIZE, divide by 255) and is wrapped in a batch
    of one before prediction.

    Returns:
        The class name with the highest predicted score.
    """
    # Index order must match the integer labels used for training.
    class_labels = ['FoundationDone', 'Beams', 'Neither']

    pixels = tf.image.decode_jpeg(tf.io.read_file(img_path), channels=3)
    pixels = tf.image.resize(pixels, IMG_SIZE) / 255.0  # Normalize to [0,1]
    batch = tf.expand_dims(pixels, axis=0)  # Add batch dimension

    scores = model.predict(batch)
    best_index = tf.argmax(scores[0])
    return class_labels[best_index]

# # Example: Test on a new image
# new_image_path = 'path_to_new_image.jpg'  # Update with the path to your image
# predicted_class = predict_image(new_image_path)
# print(f'The predicted class is: {predicted_class}')

Epoch 1/100
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 768ms/step - accuracy: 0.5810 - loss: 0.9964Epoch 1: Time taken: 27.78s, Accuracy: 0.6426, Loss: 0.9005, Validation Accuracy: 0.1000, Validation Loss: 0.9407
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 2s/step - accuracy: 0.5878 - loss: 0.9858 - val_accuracy: 0.1000 - val_loss: 0.9407
Epoch 2/100
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 84ms/step - accuracy: 0.6294 - loss: 0.6986Epoch 2: Time taken: 1.25s, Accuracy: 0.7590, Loss: 0.6202, Validation Accuracy: 0.9000, Validation Loss: 0.3112
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 92ms/step - accuracy: 0.6438 - loss: 0.6899 - val_accuracy: 0.9000 - val_loss: 0.3112
Epoch 3/100
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 88ms/step - accuracy: 0.7780 - loss: 0.6532Epoch 3: Time taken: 1.28s, Accuracy: 0.8233, Loss: 0.5517, Validation Accuracy: 0.9000, Validation Loss: 0.3404
[1m8/8[0m 

In [None]:
# Install necessary packages
# Shell-escape (`!`) command, executed by the notebook runtime rather than by Python;
# installs both third-party packages imported by the following cell.
!pip install tensorflow roboflow


In [None]:
import tensorflow as tf
import os
import xml.etree.ElementTree as ET
import time
from roboflow import Roboflow


In [None]:
# Input resolution fed to the network, and number of samples per batch
IMG_SIZE = 224, 224  # Adjust size as needed
BATCH_SIZE = 32

# Class name -> integer class index
label_map = dict(zip(('FoundationDone', 'Beams', 'Neither'), range(3)))

# Function to load and preprocess images
def load_and_preprocess_image(img_path, label):
    """Turn a (JPEG path, label) pair into a (resized, rescaled image, label) pair.

    The file is decoded as a 3-channel JPEG, resized to IMG_SIZE and divided by 255.
    The label is returned untouched.

    NOTE(review): Keras documents ``tf.keras.applications.resnet50.preprocess_input``
    as the input preprocessing for ResNet50; this pipeline uses plain [0, 1] scaling.
    Confirm that is intentional, and change training and inference together if not.
    """
    file_bytes = tf.io.read_file(img_path)
    rgb = tf.image.decode_jpeg(file_bytes, channels=3)
    rgb = tf.image.resize(rgb, IMG_SIZE)
    return rgb / 255.0, label  # Normalize to [0,1]

# Parse VOC annotations and prepare datasets
def parse_voc_annotation_same_dir(annotation_dir, label_map):
    """Collect one (image path, class index) sample per VOC XML file in a directory.

    The image name is taken from ``<filename>``; if that element is missing, the
    base name of ``<path>`` is used instead, since ``<path>`` may hold an absolute
    path from another machine. Either way the image is looked up inside
    ``annotation_dir``.

    The label is the first ``<object><name>`` that appears in ``label_map``. Files
    with no such object are labelled ``label_map['Neither']``.

    Args:
        annotation_dir: Directory holding both the XML annotations and the images.
        label_map: Mapping from class name to integer class index; must contain
            'Neither' whenever a file without a known label is encountered.

    Returns:
        A tuple ``(images, labels)`` of equal-length lists. Annotations that name
        no image at all are skipped with a printed message.
    """
    images = []
    labels = []
    # Sort so the sample order does not depend on the filesystem's listing order.
    for file in sorted(os.listdir(annotation_dir)):
        if not file.endswith('.xml'):  # Only XML files are annotations
            continue

        root = ET.parse(os.path.join(annotation_dir, file)).getroot()

        # Resolve the image file name (previously crashed when <path> was absent)
        image_file = root.findtext('filename')
        if not image_file:
            image_file = os.path.basename(root.findtext('path') or '')
        if not image_file:
            print(f"Skipping annotation {file}: no <filename> or <path> element")
            continue
        image_path = os.path.join(annotation_dir, image_file)

        # The first object with a known class decides the label
        for obj in root.findall('object'):
            name = obj.findtext('name')
            if name in label_map:
                label_index = label_map[name]
                break
        else:
            # No valid label found, assign 'Neither'
            label_index = label_map['Neither']

        images.append(image_path)
        labels.append(label_index)
    return images, labels

# Locations of the three dataset splits on the Colab filesystem
train_dir, val_dir, test_dir = (
    '/content/SIH-2/train',
    '/content/SIH-2/valid',
    '/content/SIH-2/test',
)

# Parse each split (train, validation, test) into parallel path/label lists
parsed_splits = [
    parse_voc_annotation_same_dir(split_dir, label_map)
    for split_dir in (train_dir, val_dir, test_dir)
]
(train_images, train_labels), (val_images, val_labels), (test_images, test_labels) = parsed_splits


In [None]:
# Create TensorFlow datasets with optimization
AUTOTUNE = tf.data.AUTOTUNE

# Training data is shuffled per sample *before* batching. Calling .shuffle() after
# .batch() only reorders whole batches, so every batch would contain the same images
# in every epoch. Shuffling the (path, label) pairs before .map() also keeps the
# shuffle buffer small, because it holds file paths instead of decoded images.
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_ds = train_ds.shuffle(buffer_size=max(len(train_images), 1))  # buffer_size must be positive
train_ds = train_ds.map(load_and_preprocess_image, num_parallel_calls=AUTOTUNE)
train_ds = train_ds.batch(BATCH_SIZE)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

# Validation and test data keep their original order.
val_ds = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_ds = val_ds.map(load_and_preprocess_image, num_parallel_calls=AUTOTUNE)
val_ds = val_ds.batch(BATCH_SIZE).prefetch(buffer_size=AUTOTUNE)

test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_ds = test_ds.map(load_and_preprocess_image, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE).prefetch(buffer_size=AUTOTUNE)


In [None]:
# Define the ResNet model
def create_resnet_model(num_classes):
    """Build and compile a ResNet50-based classifier with a frozen backbone.

    The head on top of the ImageNet-pretrained backbone is: global average pooling,
    a 1024-unit ReLU layer, dropout (rate 0.5) and a ``num_classes``-way softmax.

    Args:
        num_classes: Number of output classes (units in the softmax layer).

    Returns:
        The model, compiled with the 'adam' optimizer, sparse categorical
        cross-entropy loss and an accuracy metric.
    """
    backbone = tf.keras.applications.ResNet50(
        include_top=False,
        weights='imagenet',
        input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3),
    )

    # Freeze the backbone so that only the new head is trained
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    features = tf.keras.layers.GlobalAveragePooling2D()(backbone.output)
    features = tf.keras.layers.Dense(1024, activation='relu')(features)
    features = tf.keras.layers.Dropout(0.5)(features)
    class_probs = tf.keras.layers.Dense(num_classes, activation='softmax')(features)

    classifier = tf.keras.Model(inputs=backbone.input, outputs=class_probs)
    classifier.compile(
        loss='sparse_categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return classifier

# One output unit per entry in label_map
num_classes = len(label_map)
model = create_resnet_model(num_classes)


In [None]:
# Timing callback to track epoch time
class TimingCallback(tf.keras.callbacks.Callback):
    """Keras callback that records wall-clock time and metrics for every epoch.

    Attributes:
        times: Duration of each finished epoch, in seconds.
        logs: A copy of the metrics dict Keras reported at the end of each epoch.
    """

    def __init__(self):
        # Let the Keras base class set up its own state before adding ours.
        super().__init__()
        self.times = []
        self.logs = []
        self.start_time = None

    def on_epoch_begin(self, epoch, logs=None):
        """Remember when the epoch started."""
        self.start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        """Store the epoch duration and metrics, then print a one-line summary."""
        # Copy so later changes to the dict by Keras cannot alter the stored history;
        # also tolerates logs=None.
        logs = dict(logs or {})
        missing = float('nan')
        elapsed = time.time() - self.start_time if self.start_time is not None else missing
        self.times.append(elapsed)
        self.logs.append(logs)
        # Metrics that were not reported (e.g. no validation data) are printed as nan
        # instead of raising KeyError.
        print(f"Epoch {epoch+1}: Time taken: {self.times[-1]:.2f}s, Accuracy: {logs.get('accuracy', missing):.4f}, Loss: {logs.get('loss', missing):.4f}, Validation Accuracy: {logs.get('val_accuracy', missing):.4f}, Validation Loss: {logs.get('val_loss', missing):.4f}")

# Initialize the timing callback
timing_callback = TimingCallback()

# Fit on the training split, validating after every epoch and logging per-epoch timing
history = model.fit(
    train_ds,
    epochs=100,  # Adjust based on dataset size and performance
    validation_data=val_ds,
    callbacks=[timing_callback],
)


In [None]:
# Measure loss and accuracy on the held-out test split
test_metrics = model.evaluate(test_ds)
test_loss, test_accuracy = test_metrics
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')


In [None]:
# Persist the trained network to an .h5 file in the working directory
model.save(filepath='construction_progress_resnet_model.h5')
# Function to make predictions on new images
def get_status_message(predicted_class):
    """Translate a predicted class name into a human-readable progress message.

    Args:
        predicted_class: One of 'FoundationDone', 'Beams' or 'Neither'.

    Returns:
        The message for that class, or 'Unknown status' for any other value.
    """
    status_by_class = dict(
        FoundationDone='The foundation is completed. Next, walls and beams need to be constructed.',
        Beams='The ground is dug, and beams are erected. Now, the concrete should be poured.',
        Neither='No construction progress is detected. Please check the construction status.',
    )
    try:
        return status_by_class[predicted_class]
    except KeyError:
        return 'Unknown status'

def predict_image(img_path):
    """Classify a single JPEG file and describe the construction status.

    The image goes through the same steps as the training pipeline (decode as
    3-channel JPEG, resize to IMG_SIZE, divide by 255) and is wrapped in a batch
    of one before being passed to the global ``model``.

    Returns:
        A tuple ``(predicted_class, status_message)``.
    """
    # Index order must match the integer labels used for training.
    class_labels = ['FoundationDone', 'Beams', 'Neither']

    pixels = tf.image.decode_jpeg(tf.io.read_file(img_path), channels=3)
    pixels = tf.image.resize(pixels, IMG_SIZE) / 255.0  # Normalize to [0,1]
    batch = tf.expand_dims(pixels, axis=0)  # Add batch dimension

    scores = model.predict(batch)
    predicted_class = class_labels[tf.argmax(scores[0])]

    return predicted_class, get_status_message(predicted_class)

# Example: classify one uploaded image and show the result
new_image_path = '/content/00fc01e0-3acb-4abd-bc76-3a4d2e4ae074.jpeg'  # Update with the path to your image
predicted_class, status_message = predict_image(new_image_path)
print(
    f'The predicted class is: {predicted_class}',
    f'Status message: {status_message}',
    sep='\n',
)



In [None]:
def build_model(num_classes):
    """Build and compile a ResNet50-based classifier with a frozen backbone.

    The head on top of the ImageNet-pretrained backbone is: global average pooling,
    a 1024-unit ReLU layer and a ``num_classes``-way softmax (no dropout).

    All Keras names are referenced through ``tf.keras`` so this cell needs only
    ``import tensorflow as tf``, not the bare ``ResNet50``/``Dense``/``Model``/``Adam``
    imports from an earlier cell.

    Args:
        num_classes: Number of output classes (units in the softmax layer).

    Returns:
        The model, compiled with a default-configured Adam optimizer, sparse
        categorical cross-entropy loss and an accuracy metric.
    """
    base_model = tf.keras.applications.ResNet50(
        weights='imagenet',
        include_top=False,
        input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3),
    )
    x = base_model.output
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dense(1024, activation='relu')(x)
    predictions = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
    model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

    # Freeze the base model layers
    for layer in base_model.layers:
        layer.trainable = False

    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# One output unit per class in label_map (including 'Neither')
model = build_model(num_classes=len(label_map))


In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Callbacks:
#  - EarlyStopping watches val_loss with a patience of 50 epochs and restores the best weights
#  - ModelCheckpoint writes 'best_model.keras' only when val_accuracy improves
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=50,
    restore_best_weights=True,
)
model_checkpoint = ModelCheckpoint(
    'best_model.keras',
    monitor='val_accuracy',
    save_best_only=True,
)
training_callbacks = [early_stopping, model_checkpoint]

# Fit on the training split, validating after every epoch
history = model.fit(
    train_ds,
    epochs=100,  # Adjust based on dataset size and performance
    validation_data=val_ds,
    callbacks=training_callbacks,
)

# Measure loss and accuracy on the held-out test split
test_metrics = model.evaluate(test_ds)
test_loss, test_accuracy = test_metrics
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

# Persist the final model in the .keras format
model.save('final_construction_progress_resnet_model.keras')

# Make Predictions on New Images
def predict_image(img_path):
    """Classify a single JPEG file and describe the construction status.

    The image is decoded as a 3-channel JPEG, resized to IMG_SIZE, divided by 255
    and wrapped in a batch of one before being passed to the global ``model``.
    The status text comes from ``get_status_message``.

    Returns:
        A tuple ``(predicted_class, status_message)``.
    """
    # Index order must match the integer labels used for training.
    class_labels = ['FoundationDone', 'Beams', 'Neither']

    encoded = tf.io.read_file(img_path)
    image = tf.image.resize(tf.image.decode_jpeg(encoded, channels=3), IMG_SIZE)
    image = image / 255.0  # Normalize to [0,1]
    single_batch = tf.expand_dims(image, axis=0)  # Add batch dimension

    probabilities = model.predict(single_batch)
    winner = tf.argmax(probabilities[0])
    predicted_class = class_labels[winner]

    return predicted_class, get_status_message(predicted_class)

# Example: classify one uploaded image and show the result
new_image_path = '/content/00fc01e0-3acb-4abd-bc76-3a4d2e4ae074.jpeg'  # Update with the path to your image
predicted_class, status_message = predict_image(new_image_path)
print(
    f'The predicted class is: {predicted_class}',
    f'Status message: {status_message}',
    sep='\n',
)
