In [6]:
import os
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
from PIL import Image
import numpy as np

# Step 1: Extract Frames from Multiple Videos and Auto-label as 'asphalt' and 'good'
videos = [
    "C:/Users/HP/Desktop/Road_quality/1.mp4",
    "C:/Users/HP/Desktop/Road_quality/2.mp4",
    "C:/Users/HP/Desktop/Road_quality/3.mp4"
]

output_dir_asphalt = 'frames/train/asphalt'
output_dir_good = 'frames/train/good'
os.makedirs(output_dir_asphalt, exist_ok=True)
os.makedirs(output_dir_good, exist_ok=True)

def extract_frames_and_label(videos, output_dir, frame_skip=30):
    frame_count = 0
    for video_path in videos:
        cap = cv2.VideoCapture(video_path)
        count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if count % frame_skip == 0:
                frame_path_asphalt = os.path.join(output_dir_asphalt, f'frame_{frame_count}.jpg')
                frame_path_good = os.path.join(output_dir_good, f'frame_{frame_count}.jpg')
                cv2.imwrite(frame_path_asphalt, frame)
                cv2.imwrite(frame_path_good, frame)
                frame_count += 1
            count += 1
        cap.release()
        print(f"Extracted and labeled frames from {video_path}")
    print(f"Total extracted and labeled frames: {frame_count}")

extract_frames_and_label(videos, output_dir_asphalt)

# Step 2: Train the Model Using Extracted Frames
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

train_dataset = datasets.ImageFolder('frames/train', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

def get_model(num_classes):
    model = models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

def train_model(model, criterion, optimizer, num_epochs=10):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")
    return model

device = "cuda" if torch.cuda.is_available() else "cpu"

# Train road type model
road_type_model = get_model(num_classes=2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(road_type_model.parameters(), lr=0.001)
road_type_model = train_model(road_type_model, criterion, optimizer, num_epochs=10)
torch.save(road_type_model.state_dict(), 'road_type_model.pth')

# Train road quality model
road_quality_model = get_model(num_classes=2)
optimizer = optim.Adam(road_quality_model.parameters(), lr=0.001)
road_quality_model = train_model(road_quality_model, criterion, optimizer, num_epochs=10)
torch.save(road_quality_model.state_dict(), 'road_quality_model.pth')

# Step 3: Real-Time Video Classification
road_type_model.load_state_dict(torch.load('road_type_model.pth', map_location=device))
road_type_model.eval()

road_quality_model.load_state_dict(torch.load('road_quality_model.pth', map_location=device))
road_quality_model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

def classify_frame(frame):
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        road_type_pred = road_type_model(input_tensor)
        road_type_label = 'Asphalt' if torch.argmax(road_type_pred) == 0 else 'Concrete'

        road_quality_pred = road_quality_model(input_tensor)
        road_quality_label = 'Good' if torch.argmax(road_quality_pred) == 0 else 'Bad (Pothole)'

    return road_type_label, road_quality_label

video_capture = cv2.VideoCapture('C:/Users/HP/Desktop/Road_quality/1.mp4')  # Change to a video file path if needed

while video_capture.isOpened():
    ret, frame = video_capture.read()
    if not ret:
        break

    road_type, road_quality = classify_frame(frame)
    cv2.putText(frame, f'Road Type: {road_type}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
    cv2.putText(frame, f'Road Quality: {road_quality}', (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

    cv2.imshow('Road Classification', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video_capture.release()
cv2.destroyAllWindows()


Extracted and labeled frames from C:/Users/HP/Desktop/Road_quality/1.mp4
Extracted and labeled frames from C:/Users/HP/Desktop/Road_quality/2.mp4
Extracted and labeled frames from C:/Users/HP/Desktop/Road_quality/3.mp4
Total extracted and labeled frames: 384


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to C:\Users\HP/.cache\torch\hub\checkpoints\resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:48<00:00, 962kB/s] 


Epoch 1/10, Loss: 0.9763183891773224
Epoch 2/10, Loss: 0.7132951269547144
Epoch 3/10, Loss: 0.7123153110345205
Epoch 4/10, Loss: 0.7122755944728851
Epoch 5/10, Loss: 0.7251083329319954
Epoch 6/10, Loss: 0.706922655304273
Epoch 7/10, Loss: 0.705375832815965
Epoch 8/10, Loss: 0.7073759237925211
Epoch 9/10, Loss: 0.711012452840805
Epoch 10/10, Loss: 0.6981175070007642
Epoch 1/10, Loss: 0.8753547022740046
Epoch 2/10, Loss: 0.7569331054886183
Epoch 3/10, Loss: 0.7108229746421179
Epoch 4/10, Loss: 0.7065151905020078
Epoch 5/10, Loss: 0.6964832295974096
Epoch 6/10, Loss: 0.7030665750304858
Epoch 7/10, Loss: 0.7002002273996671
Epoch 8/10, Loss: 0.7214961846669515
Epoch 9/10, Loss: 0.7214476491014162
Epoch 10/10, Loss: 0.7212604582309723


  road_type_model.load_state_dict(torch.load('road_type_model.pth', map_location=device))
  road_quality_model.load_state_dict(torch.load('road_quality_model.pth', map_location=device))


In [9]:
import os
import shutil
import random

# Path to your `train` directory
source_path = 'frames/train'
categories = ['asphalt', 'good']  # Adjust these as necessary based on your labels

# Define the split ratios
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

# Create val and test directories
for category in categories:
    os.makedirs(f'frames/val/{category}', exist_ok=True)
    os.makedirs(f'frames/test/{category}', exist_ok=True)

    # Path to the category directory in the `train` folder
    category_path = os.path.join(source_path, category)
    if not os.path.exists(category_path):
        print(f"Category directory {category_path} does not exist. Skipping.")
        continue

    # Get all files in the category directory
    files = os.listdir(category_path)
    random.shuffle(files)

    # Calculate the split indices
    train_split = int(len(files) * train_ratio)
    val_split = int(len(files) * (train_ratio + val_ratio))

    # Split the files into training, validation, and testing sets
    val_files = files[train_split:val_split]
    test_files = files[val_split:]

    # Move the files to the respective directories
    for file in val_files:
        shutil.move(os.path.join(category_path, file), f'frames/val/{category}/{file}')
    for file in test_files:
        shutil.move(os.path.join(category_path, file), f'frames/test/{category}/{file}')

    print(f"Category '{category}' has been split into val and test sets.")

print("Frame splitting into val and test sets completed.")


Category 'asphalt' has been split into val and test sets.
Category 'good' has been split into val and test sets.
Frame splitting into val and test sets completed.


In [11]:
# Step 3: Real-Time Video Classification
import cv2
import torch
from torchvision import transforms
from PIL import Image
import numpy as np

# Load the state dictionaries for the trained models with weights_only=True
road_type_model.load_state_dict(torch.load('road_type_model.pth', map_location=device, weights_only=True))
road_type_model.eval()

road_quality_model.load_state_dict(torch.load('road_quality_model.pth', map_location=device, weights_only=True))
road_quality_model.eval()

# Define transformation for input images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Function to classify a frame
def classify_frame(frame):
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        road_type_pred = road_type_model(input_tensor)
        road_type_label = 'Asphalt' if torch.argmax(road_type_pred) == 0 else 'Concrete'

        road_quality_pred = road_quality_model(input_tensor)
        road_quality_label = 'Good' if torch.argmax(road_quality_pred) == 0 else 'Bad (Pothole)'

    return road_type_label, road_quality_label

# Run real-time classification on a custom video file
video_capture = cv2.VideoCapture('C:/Users/HP/Desktop/Road_quality/1.mp4')  # Replace with your custom video path

while video_capture.isOpened():
    ret, frame = video_capture.read()
    if not ret:
        break

    road_type, road_quality = classify_frame(frame)
    cv2.putText(frame, f'Road Type: {road_type}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
    cv2.putText(frame, f'Road Quality: {road_quality}', (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

    cv2.imshow('Road Classification', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video_capture.release()
cv2.destroyAllWindows()
