In [20]:
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
from PIL import Image
import requests
from io import BytesIO
import numpy as np
import cv2
import time
import json
import urllib.request
import speech_recognition as sr
import pyttsx3
from heapq import heappush, heappop

In [21]:
# Shared speech objects, created once and reused by the helper functions:
#   recognizer - speech_recognition Recognizer used by listen_for_command()
#   engine     - pyttsx3 text-to-speech engine used by speak()
recognizer = sr.Recognizer()
engine = pyttsx3.init()

In [22]:
# Network endpoints of the robot. Both addresses are hard-coded for one specific
# network and must be edited to match your own devices.
# NOTE: get_distance() spells out the control board's host separately, so keep
# that address in sync with ESP32_CONTROL_URL when it changes.
ESP32_CAM_URL = "http://172.20.52.94:81/stream"  # Video stream opened by process_stream()
ESP32_CONTROL_URL = "http://172.20.51.59/control"  # Endpoint that send_control_command() POSTs to

In [23]:
# Labels the classifier can return. classify_image() indexes this list with the
# model's predicted class index, so the order presumably has to match the label
# order used during training - verify against the training notebook.
CLASSES = ["Bottle", "Cup", "Book", "Cell phone"]

In [24]:
# Location of the fine-tuned ResNet-50 weights; the file is later passed to
# model.load_state_dict(), so it must contain a state dict.
MODEL_PATH = "fine_tuned_resnet.pth"  # Replace with your .pth file path
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [25]:
# Sanity check: load the saved state dict and print the parameter names that
# contain "fc", so the classifier head can be rebuilt with matching names.
# NOTE(review): torch.load can unpickle arbitrary objects (depending on the
# PyTorch version / weights_only setting) - only load checkpoints you trust.
checkpoint = torch.load(MODEL_PATH, map_location=device)
print([key for key in checkpoint.keys() if 'fc' in key])


['fc.0.weight', 'fc.0.bias', 'fc.3.weight', 'fc.3.bias']


In [26]:
# Build the ResNet-50 architecture without pretrained ImageNet weights; the
# parameters are filled in from the fine-tuned checkpoint further down.
# NOTE(review): newer torchvision releases prefer `weights=None` over
# `pretrained=False` - confirm against the installed version before changing.
model = models.resnet50(pretrained=False)

# Recreate the classifier head so its parameter names line up with the checkpoint.
# The checkpoint only has fc.0.* and fc.3.*, so positions 1 and 2 must be layers
# without parameters; ReLU and Dropout are assumed here - verify against the
# training notebook.
model.fc = torch.nn.Sequential(
    torch.nn.Linear(model.fc.in_features, 512),  # Provides fc.0.weight and fc.0.bias
    torch.nn.ReLU(),  # Position 1: no parameters, hence no checkpoint keys
    torch.nn.Dropout(0.5),  # Position 2: no parameters either; inactive in eval mode
    torch.nn.Linear(512, len(CLASSES))  # Provides fc.3.weight and fc.3.bias
)

# Load the fine-tuned weights. This reads the file again instead of reusing the
# `checkpoint` variable, so the cell does not depend on the inspection cell.
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))

# Move the model to the selected device and switch to evaluation mode.
# As the last expression of the cell, model.eval() also echoes the module
# structure as the cell output.
model.to(device)
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [27]:
# Preprocessing applied to every image before inference: resize to 224x224,
# convert to a tensor, then normalise each of the three channels.
# The mean/std values are the commonly used ImageNet statistics; presumably the
# same normalisation was applied during fine-tuning - TODO confirm.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [28]:
# Function to classify an image
def classify_image(image):
    """Return the label from ``CLASSES`` that the model scores highest for ``image``.

    Args:
        image: Image accepted by the module-level ``preprocess`` pipeline
            (``process_stream`` passes an RGB ``PIL.Image``).

    Returns:
        The entry of ``CLASSES`` at the index of the largest model output.
        No confidence threshold is applied, so a label is always returned.
    """
    # Add a leading batch dimension and move the input to the model's device.
    batch = preprocess(image).unsqueeze(0).to(device)
    with torch.no_grad():  # inference only, no gradient bookkeeping needed
        scores = model(batch)
        best_index = torch.max(scores, 1)[1]
    label = CLASSES[best_index.item()]
    return label

In [29]:
import json
import urllib.request

def send_control_command(command, timeout=5.0, url=None):
    """POST a JSON control command to the ESP32 and report whether it succeeded.

    The payload is ``{"command": <command>}``. Failures are printed rather than
    raised, so a flaky link never crashes the caller.

    Args:
        command: Command name to send (this notebook uses "move_forward" and
            "stop").
        timeout: Seconds to wait for the ESP32 before giving up. Without a
            timeout an unreachable board would block the caller indefinitely.
        url: Endpoint to POST to; defaults to ``ESP32_CONTROL_URL``.

    Returns:
        bool: True when the request completed and a response was read,
        False when any error occurred (the error is printed).
    """
    try:
        target = ESP32_CONTROL_URL if url is None else url

        # Create the JSON payload
        data = json.dumps({"command": command}).encode("utf-8")

        # urllib fills in Content-Length from the bytes payload itself, so only
        # the content type has to be set explicitly.
        headers = {"Content-Type": "application/json"}

        # Create the request
        req = urllib.request.Request(target, data=data, headers=headers, method="POST")

        # Send the request; the timeout bounds both connecting and reading.
        with urllib.request.urlopen(req, timeout=timeout) as response:
            response_data = response.read().decode("utf-8")
        print(f"Command sent successfully. Response: {response_data}")
        return True
    except urllib.error.HTTPError as e:
        # Must come before URLError: HTTPError is a subclass of it.
        print(f"HTTP Error: {e.code} - {e.reason}")
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}")
    except Exception as e:
        # Deliberate best-effort: covers read timeouts, bad payloads, etc.
        print(f"Error sending command: {e}")
    return False

In [30]:
# A* Path Planning Algorithm
def astar_path_planning(start, goal, grid):
    """Search ``grid`` for a 4-connected route from ``start`` to ``goal`` with A*.

    Args:
        start: ``(row, col)`` cell the search begins at.
        goal: ``(row, col)`` cell to reach.
        grid: List of rows; a cell equal to ``0`` can be entered, any other
            value is treated as blocked.

    Returns:
        A list of ``(row, col)`` cells leading to the goal. The start cell is
        not part of the list and the goal is its last element, so the list is
        empty when ``start == goal``. ``None`` is returned when the goal cannot
        be reached.
    """
    def manhattan(a, b):
        # Sum of the row and column offsets between two cells.
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    frontier = [(0, start)]  # min-heap of (estimated total cost, cell)
    parents = {}             # cell -> cell it was reached from
    cost = {start: 0}        # cheapest known number of steps to each cell
    estimate = {start: manhattan(start, goal)}
    moves = ((-1, 0), (1, 0), (0, -1), (0, 1))

    while frontier:
        current = heappop(frontier)[1]

        if current == goal:
            # Follow the parent links back to the start, then flip the order.
            route = []
            node = current
            while node in parents:
                route.append(node)
                node = parents[node]
            route.reverse()
            return route

        for d_row, d_col in moves:
            row = current[0] + d_row
            col = current[1] + d_col
            if not 0 <= row < len(grid):
                continue
            if not 0 <= col < len(grid[0]):
                continue
            if grid[row][col] != 0:
                continue

            cell = (row, col)
            new_cost = cost[current] + 1
            if cell in cost and new_cost >= cost[cell]:
                continue  # already reached at least as cheaply

            parents[cell] = current
            cost[cell] = new_cost
            estimate[cell] = new_cost + manhattan(cell, goal)
            heappush(frontier, (estimate[cell], cell))

    return None

In [31]:
# Function to listen for voice commands
def listen_for_command():
    """Record one utterance from the microphone and return it as lower-case text.

    The audio is captured with the shared ``recognizer`` after adjusting for
    ambient noise, then transcribed with ``recognize_google``.

    Returns:
        The recognised text in lower case, or ``None`` when the audio could not
        be understood or the recognition request failed (a message is printed
        in both cases).
    """
    heard = None
    with sr.Microphone() as mic:
        print("Listening for command...")
        recognizer.adjust_for_ambient_noise(mic)  # Adjust for ambient noise
        captured = recognizer.listen(mic)

        try:
            heard = recognizer.recognize_google(captured).lower()
        except sr.UnknownValueError:
            print("Sorry, I could not understand the audio.")
        except sr.RequestError:
            print("Could not request results from the speech recognition service.")
        else:
            print(f"You said: {heard}")
    return heard


In [32]:
# Function to speak text
def speak(text):
    """Say ``text`` aloud through the shared pyttsx3 ``engine``.

    The utterance is queued with ``engine.say`` and ``engine.runAndWait()`` is
    then called to process it; the call is expected to block until the speech
    has finished, so avoid calling it inside time-critical loops.
    """
    engine.say(text)
    engine.runAndWait()

In [33]:
import requests

def get_distance(url="http://172.20.51.59/distance", timeout=2.0):
    """Fetch the current distance reading from the ESP32.

    Args:
        url: Endpoint that returns the reading as a plain-text number.
            Defaults to the address this notebook has always used.
        timeout: Seconds to wait for the ESP32. ``requests`` never times out on
            its own, so without this an unreachable board would hang the caller.

    Returns:
        The reading as a ``float`` (``process_stream`` treats it as
        centimetres), or ``None`` when the request fails, the status code is
        not 200, or the body is not a number. Failures are printed.
    """
    try:
        # Send a GET request to the ESP32 to fetch the distance
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to fetch distance. Status code: {response.status_code}")
            return None
        # Assuming the ESP32 returns the distance as a plain text response
        return float(response.text)
    except Exception as e:
        # Best-effort: network errors, timeouts and unparsable bodies all end here.
        print(f"Error fetching distance: {e}")
        return None

In [34]:
def process_stream(target_object, desired_distance=20, grid=None, start=(0, 0), goal=(4, 4)):
    """Drive towards ``target_object`` while classifying frames from the camera.

    Every frame of the ESP32-CAM stream is classified. While the target is not
    in view, the vehicle is advanced one planned path step per frame. While it
    is in view, the vehicle keeps moving forward until the measured distance is
    at most ``desired_distance``; then it stops and the frame is saved as
    ``"<target_object>_captured.jpg"``. Pressing 'q' in the preview window or a
    failed frame read ends the loop.

    Args:
        target_object: Label (an entry of ``CLASSES``) to look for.
        desired_distance: Distance at which to stop, in the unit returned by
            ``get_distance`` (printed as cm).
        grid: Occupancy grid for path planning (0 = free). Defaults to the
            5x5 demo grid.
        start: ``(row, col)`` start cell for path planning.
        goal: ``(row, col)`` goal cell for path planning.

    Returns:
        None.
    """
    cap = cv2.VideoCapture(ESP32_CAM_URL)
    if not cap.isOpened():
        print("Error: Could not open video stream.")
        cap.release()
        return

    if grid is None:
        # Simple demo grid for path planning (adjust based on your environment)
        grid = [
            [0, 0, 0, 0, 0],
            [0, 1, 1, 1, 0],
            [0, 0, 0, 0, 0],
            [0, 1, 1, 1, 0],
            [0, 0, 0, 0, 0],
        ]

    # True while the last command sent was "move_forward"; used to stop the
    # vehicle if the loop ends for any reason other than reaching the target.
    vehicle_moving = False
    # True while consecutive frames show the target, so it is announced once
    # per sighting instead of on every frame.
    target_in_view = False

    try:
        # Find the path using A* algorithm
        path = astar_path_planning(start, goal, grid)
        if path is None:
            # An empty list only means start == goal, which is not a failure.
            speak("No path found to the target.")
            return

        speak(f"Searching for the {target_object}.")
        print(f"Path to follow: {path}")

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Failed to capture frame.")
                break

            # Convert frame to PIL image (OpenCV delivers BGR, the model expects RGB)
            pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # Classify the image
            predicted_class = classify_image(pil_image)

            # Display the frame with the predicted class
            cv2.putText(frame, f"Predicted: {predicted_class}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow("ESP32-CAM Stream", frame)

            # Check if the predicted class matches the target object
            if predicted_class == target_object:
                print(f"Object found: {target_object}")
                if not target_in_view:
                    # speak() blocks until the sentence is finished; doing that
                    # on every frame would delay the distance check while the
                    # vehicle is still moving.
                    speak(f"I found the {target_object}.")
                    target_in_view = True
                distance = get_distance()  # Get the distance from the object

                if distance is not None:
                    print(f"Distance to object: {distance} cm")

                    if distance <= desired_distance:
                        print("Object is close enough. Stopping the vehicle.")
                        send_control_command("stop")  # Stop the vehicle
                        vehicle_moving = False
                        cv2.imwrite(f"{target_object}_captured.jpg", frame)  # Save the captured image
                        break
                    else:
                        print("Moving forward to get closer to the object.")
                        send_control_command("move_forward")  # Move the vehicle forward
                        vehicle_moving = True
                else:
                    print("Failed to retrieve distance. Continuing...")
            else:
                target_in_view = False
                # Move along the path
                if path:
                    next_step = path.pop(0)
                    print(f"Moving to: {next_step}")
                    send_control_command("move_forward")  # Replace with actual movement logic
                    vehicle_moving = True

            # Exit on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        try:
            if vehicle_moving:
                # Presumably "move_forward" keeps the vehicle going until "stop"
                # arrives (the success path sends one) - do not leave it running.
                send_control_command("stop")
        finally:
            # Always free the stream and the preview window, including on the
            # early return above and on unexpected errors.
            cap.release()
            cv2.destroyAllWindows()

In [43]:
# Main loop
if __name__ == "__main__":
    # Ask for a target, then keep listening until a known class name is heard.
    speak("Hello! What object should I find?")
    while True:
        command = listen_for_command()
        if not command:
            # Nothing usable was recognised; prompt again.
            speak("Sorry, I didn't catch that. Please try again.")
            continue

        # First class whose lower-cased name occurs in the spoken command.
        target_object = next(
            (label for label in CLASSES if label.lower() in command),
            None,
        )
        if not target_object:
            speak("Sorry, I didn't understand. Please try again.")
            continue

        # A valid target ends the dialogue: run the search once and stop.
        process_stream(target_object)
        break

Listening for command...
Sorry, I could not understand the audio.
Listening for command...
You said: find bottle
Path to follow: [(0, 1), (0, 2), (0, 3), (0, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
Object found: Bottle
Distance to object: 142.66 cm
Moving forward to get closer to the object.
Command sent successfully. Response: Command received: move_forward
Object found: Bottle
Distance to object: 53.7 cm
Moving forward to get closer to the object.
Command sent successfully. Response: Command received: move_forward
Moving to: (0, 1)
Command sent successfully. Response: Command received: move_forward
Moving to: (0, 2)
Command sent successfully. Response: Command received: move_forward
Moving to: (0, 3)
Command sent successfully. Response: Command received: move_forward
Object found: Bottle
Distance to object: 31.87 cm
Moving forward to get closer to the object.
Command sent successfully. Response: Command received: move_forward
Moving to: (0, 4)
Command sent successfully. Response: Command