# Data Collection via Guided Navigation

In [None]:
# %pip install torch torchvision opencv-python Pillow ipywidgets numpy ipython

In [None]:
# Import necessary libraries
import threading
import time
import torch
import torchvision.transforms as transforms
import numpy as np
import PIL.Image
import cv2
from jetbot import Robot
import ipywidgets as widgets
from IPython.display import display
from JetbotUsbCamera import JetbotUsbCamera as usbCam
import os
import csv
from datetime import datetime
from pathlib import Path

In [None]:
class RobotCamera:
    def __init__(self):
        self.camera = usbCam(device=0, width=224, height=224)
        self.robot = Robot()
        self.data_dir = Path('collected_data')
        self.data_dir.mkdir(exist_ok=True)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.mean = torch.Tensor([0.485, 0.456, 0.406]).to(self.device)
        self.std = torch.Tensor([0.229, 0.224, 0.225]).to(self.device)
        self.stop_requested = False
        self.move_up = False
        self.move_down = False
        self.move_left = False
        self.move_right = False
        self.robot_active = False
        self.capturing = False
        self.master_switch = False
        self.directions_active = {'up': False, 'down': False, 'left': False, 'right': False, 'obstacle': False}
        self.saved_images = []
        self.count = {'up': 0, 'down': 0, 'left': 0, 'right': 0, 'obstacle': 0}

    def preprocess(self, image):
        image = PIL.Image.fromarray(image)
        image = transforms.functional.to_tensor(image).to(self.device)
        image.sub_(self.mean[:, None, None]).div_(self.std[:, None, None])
        return image[None, ...]

    def display_arrows(self, camera_image, direction):
        image = np.copy(camera_image)
        arrow_color = (0, 255, 0)  # Green
        glow_color = (0, 255, 255)  # Yellow

        overlay = image.copy()
        if direction == 'up':
            cv2.arrowedLine(overlay, (112, 200), (112, 24), glow_color, 3, tipLength=0.5)
        elif direction == 'down':
            cv2.arrowedLine(overlay, (112, 24), (112, 200), glow_color, 3, tipLength=0.5)
        else:
            cv2.arrowedLine(overlay, (112, 200), (112, 24), arrow_color, 3, tipLength=0.5)
            cv2.arrowedLine(overlay, (112, 24), (112, 200), arrow_color, 3, tipLength=0.5)

        if direction == 'left':
            cv2.arrowedLine(overlay, (200, 112), (24, 112), glow_color, 3, tipLength=0.5)
        else:
            cv2.arrowedLine(overlay, (200, 112), (24, 112), arrow_color, 3, tipLength=0.5)

        if direction == 'right':
            cv2.arrowedLine(overlay, (24, 112), (200, 112), glow_color, 3, tipLength=0.5)
        else:
            cv2.arrowedLine(overlay, (24, 112), (200, 112), arrow_color, 3, tipLength=0.5)

        cv2.addWeighted(overlay, 0.6, image, 0.4, 0, image)

        jpeg_image = cv2.imencode('.jpg', image)[1].tobytes()
        return jpeg_image

    def update_image(self):
        while not self.stop_requested:
            frame = self.camera.read()
            if frame is not None:
                target_widget.value = self.display_arrows(frame, self.active_direction())
            if self.capturing:
                if self.move_up and self.directions_active['up']:
                    self.save_frame(frame, 'up')
                if self.move_down and self.directions_active['down']:
                    self.save_frame(frame, 'down')
                if self.move_left and self.directions_active['left']:
                    self.save_frame(frame, 'left')
                if self.move_right and self.directions_active['right']:
                    self.save_frame(frame, 'right')
            self.check_for_obstacle(frame)
            time.sleep(0.1)

    def active_direction(self):
        if self.move_up:
            return 'up'
        if self.move_down:
            return 'down'
        if self.move_left:
            return 'left'
        if self.move_right:
            return 'right'
        return 'none'

    def save_frame(self, frame, direction):
        if frame is not None and self.count[direction] < 300:
            frame_with_arrows = self.display_arrows(frame, direction)
            frame_with_arrows = np.frombuffer(frame_with_arrows, dtype=np.uint8)
            frame_with_arrows = cv2.imdecode(frame_with_arrows, cv2.IMREAD_COLOR)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S%f')
            image_path = self.data_dir / f"{timestamp}.jpg"
            cv2.imwrite(str(image_path), frame_with_arrows)

            with open(self.data_dir / 'labels.csv', 'a', newline='') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow([image_path, direction])

            photo_count_text.value += 1
            self.saved_images.append(image_path)
            self.count[direction] += 1

            # Update individual direction counts
            if direction == 'up':
                up_count_text.value = self.count[direction]
            elif direction == 'down':
                down_count_text.value = self.count[direction]
            elif direction == 'left':
                left_count_text.value = self.count[direction]
            elif direction == 'right':
                right_count_text.value = self.count[direction]
            elif direction == 'obstacle':
                obstacle_count_text.value = self.count[direction]

    def delete_last_photo(self, _):
        if self.saved_images:
            last_image_path = self.saved_images.pop()
            last_image_path.unlink()

            with open(self.data_dir / 'labels.csv', 'r') as csvfile:
                rows = list(csv.reader(csvfile))
            last_label = [row[1] for row in rows if row[0] == str(last_image_path)][0]
            rows = [row for row in rows if row[0] != str(last_image_path)]
            with open(self.data_dir / 'labels.csv', 'w', newline='') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerows(rows)

            photo_count_text.value -= 1
            self.count[last_label] -= 1

            # Update individual direction counts
            if last_label == 'up':
                up_count_text.value = self.count[last_label]
            elif last_label == 'down':
                down_count_text.value = self.count[last_label]
            elif last_label == 'left':
                left_count_text.value = self.count[last_label]
            elif last_label == 'right':
                right_count_text.value = self.count[last_label]
            elif last_label == 'obstacle':
                obstacle_count_text.value = self.count[last_label]

    def delete_all_photos(self, _):
        for image_path in self.saved_images:
            image_path.unlink()

        self.saved_images.clear()

        with open(self.data_dir / 'labels.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerows([])

        photo_count_text.value = 0
        for key in self.count.keys():
            self.count[key] = 0

        # Reset individual direction counts
        up_count_text.value = 0
        down_count_text.value = 0
        left_count_text.value = 0
        right_count_text.value = 0
        obstacle_count_text.value = 0

    def save_obstacle_frame(self, _):
        if self.master_switch and self.directions_active['obstacle']:
            frame = self.camera.read()
            self.save_frame(frame, "obstacle")

    def check_for_obstacle(self, frame):
        if "obstacle" in self.detect_obstacle(frame):
            self.move_up = self.move_down = self.move_left = self.move_right = False
            self.robot.left_motor.value = 0
            self.robot.right_motor.value = 0
        else:
            if self.move_up:
                self.robot.left_motor.value = speed_slider.value
                self.robot.right_motor.value = speed_slider.value

    def detect_obstacle(self, frame):
        return []

    def stop_camera(self, b):
        self.stop_requested = True
        self.camera.stop()
        self.stop_robot(b)
        camera_active_indicator.button_style = ''
        print("Camera stopped.")

    def start_camera(self, b):
        self.stop_requested = False
        self.camera.start()
        update_thread = threading.Thread(target=self.update_image)
        update_thread.start()
        camera_active_indicator.button_style = 'success'
        print("Camera started.")

    def move_forward_toggle(self, _):
        if self.robot_active:
            self.move_up = not self.move_up
            if self.move_up:
                threading.Thread(target=self.move_forward).start()
            else:
                self.robot.left_motor.value = 0
                self.robot.right_motor.value = 0

    def move_forward(self):
        while self.move_up and self.robot_active:
            speed = speed_slider.value
            self.robot.left_motor.value = speed
            self.robot.right_motor.value = speed
            time.sleep(0.1)

    def move_backward_toggle(self, _):
        if self.robot_active:
            self.move_down = not self.move_down
            if self.move_down:
                threading.Thread(target=self.move_backward).start()
            else:
                self.robot.left_motor.value = 0
                self.robot.right_motor.value = 0

    def move_backward(self):
        while self.move_down and self.robot_active:
            speed = speed_slider.value
            self.robot.left_motor.value = -speed
            self.robot.right_motor.value = -speed
            time.sleep(0.1)

    def turn_left_toggle(self, _):
        if self.robot_active:
            self.move_left = not self.move_left
            if self.move_left:
                threading.Thread(target=self.turn_left).start()
            else:
                self.robot.left_motor.value = 0
                self.robot.right_motor.value = 0

    def turn_left(self):
        while self.move_left and self.robot_active:
            speed = speed_slider.value
            self.robot.left_motor.value = -speed
            self.robot.right_motor.value = speed
            time.sleep(0.1)

    def turn_right_toggle(self, _):
        if self.robot_active:
            self.move_right = not self.move_right
            if self.move_right:
                threading.Thread(target=self.turn_right).start()
            else:
                self.robot.left_motor.value = 0
                self.robot.right_motor.value = 0

    def turn_right(self):
        while self.move_right and self.robot_active:
            speed = speed_slider.value
            self.robot.left_motor.value = speed
            self.robot.right_motor.value = -speed
            time.sleep(0.1)

    def stop_robot(self, _):
        self.move_up = self.move_down = self.move_left = self.move_right = False
        self.robot.left_motor.value = 0
        self.robot.right_motor.value = 0
        self.robot_active = False
        robot_active_indicator.button_style = ''
        print("Robot stopped.")

    def start_robot(self, _):
        if self.camera.cap.isOpened():
            self.robot_active = True
            robot_active_indicator.button_style = 'success'
            print("Robot started.")
        else:
            print("Camera is not on. Cannot start the robot.")

    def toggle_master_capturing(self, _):
        self.master_switch = not self.master_switch
        self.update_master_indicator()
        self.capturing = any(self.directions_active.values()) if self.master_switch else False
        if not self.master_switch:
            for direction in self.directions_active.keys():
                self.directions_active[direction] = False
                self.update_direction_indicator(direction)

    def toggle_direction_capturing(self, direction):
        if self.master_switch:
            self.directions_active[direction] = not self.directions_active[direction]
            self.update_direction_indicator(direction)
            self.capturing = any(self.directions_active.values())

    def update_master_indicator(self):
        master_indicator.button_style = 'success' if self.master_switch else ''

    def update_direction_indicator(self, direction):
        direction_indicators[direction].button_style = 'success' if self.directions_active[direction] else ''


In [None]:
# Widgets
target_widget = widgets.Image(format='jpeg', width=224, height=224)
speed_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.3, description='speed')
speed_text = widgets.FloatText(value=0.3, description='speed')
photo_count_text = widgets.IntText(value=0, description='Total Photos:', disabled=True)
up_count_text = widgets.IntText(value=0, description='Up:', disabled=True)
down_count_text = widgets.IntText(value=0, description='Down:', disabled=True)
left_count_text = widgets.IntText(value=0, description='Left:', disabled=True)
right_count_text = widgets.IntText(value=0, description='Right:', disabled=True)
obstacle_count_text = widgets.IntText(value=0, description='Obstacle:', disabled=True)

robot_active_indicator = widgets.Button(description='', button_style='success', layout=widgets.Layout(width='20px', height='20px'), disabled=True)
robot_active_indicator.button_style = ''

capturing_indicator = widgets.Button(description='', button_style='success', layout=widgets.Layout(width='20px', height='20px'), disabled=True)
capturing_indicator.button_style = ''

camera_active_indicator = widgets.Button(description='', button_style='success', layout=widgets.Layout(width='20px', height='20px'), disabled=True)
camera_active_indicator.button_style = ''

start_camera_button = widgets.Button(description='Start Camera', button_style='success')
stop_camera_button = widgets.Button(description='Stop Camera', button_style='danger')
start_robot_button = widgets.Button(description='Start Robot', button_style='success')
stop_robot_button = widgets.Button(description='Stop Robot', button_style='warning')
delete_last_photo_button = widgets.Button(description='Delete Last Photo', button_style='warning')
delete_all_photos_button = widgets.Button(description='Delete All Photos', button_style='danger')
obstacle_button = widgets.Button(description='Obstacle', button_style='danger')

up_button = widgets.Button(description='↑', layout=widgets.Layout(width='50px', height='50px'))
down_button = widgets.Button(description='↓', layout=widgets.Layout(width='50px', height='50px'))
left_button = widgets.Button(description='←', layout=widgets.Layout(width='50px', height='50px'))
right_button = widgets.Button(description='→', layout=widgets.Layout(width='50px', height='50px'))

master_button = widgets.Button(description='Start', button_style='success', layout=widgets.Layout(width='60px', height='30px'))
master_stop_button = widgets.Button(description='Stop', button_style='warning', layout=widgets.Layout(width='60px', height='30px'))
master_indicator = widgets.Button(description='', button_style='', layout=widgets.Layout(width='20px', height='20px'), disabled=True)

direction_buttons = {
    'up': widgets.Button(description='Start', button_style='success', layout=widgets.Layout(width='60px', height='30px')),
    'left': widgets.Button(description='Start', button_style='success', layout=widgets.Layout(width='60px', height='30px')),
    'right': widgets.Button(description='Start', button_style='success', layout=widgets.Layout(width='60px', height='30px')),
    'down': widgets.Button(description='Start', button_style='success', layout=widgets.Layout(width='60px', height='30px')),
    'obstacle': widgets.Button(description='Start', button_style='success', layout=widgets.Layout(width='60px', height='30px'))
}
direction_stop_buttons = {
    'up': widgets.Button(description='Stop', button_style='warning', layout=widgets.Layout(width='60px', height='30px')),
    'left': widgets.Button(description='Stop', button_style='warning', layout=widgets.Layout(width='60px', height='30px')),
    'right': widgets.Button(description='Stop', button_style='warning', layout=widgets.Layout(width='60px', height='30px')),
    'down': widgets.Button(description='Stop', button_style='warning', layout=widgets.Layout(width='60px', height='30px')),
    'obstacle': widgets.Button(description='Stop', button_style='warning', layout=widgets.Layout(width='60px', height='30px'))
}
direction_indicators = {
    'up': widgets.Button(description='', button_style='', layout=widgets.Layout(width='20px', height='20px'), disabled=True),
    'left': widgets.Button(description='', button_style='', layout=widgets.Layout(width='20px', height='20px'), disabled=True),
    'right': widgets.Button(description='', button_style='', layout=widgets.Layout(width='20px', height='20px'), disabled=True),
    'down': widgets.Button(description='', button_style='', layout=widgets.Layout(width='20px', height='20px'), disabled=True),
    'obstacle': widgets.Button(description='', button_style='', layout=widgets.Layout(width='20px', height='20px'), disabled=True)
}

In [None]:
robot_camera = RobotCamera()

master_button.on_click(robot_camera.toggle_master_capturing)
master_stop_button.on_click(robot_camera.toggle_master_capturing)

for direction, button in direction_buttons.items():
    button.on_click(lambda b, d=direction: robot_camera.toggle_direction_capturing(d))
for direction, button in direction_stop_buttons.items():
    button.on_click(lambda b, d=direction: robot_camera.toggle_direction_capturing(d))

def update_speed(change):
    speed_text.value = speed_slider.value

def update_slider(change):
    speed_slider.value = speed_text.value

speed_slider.observe(update_speed, names='value')
speed_text.observe(update_slider, names='value')

start_camera_button.on_click(robot_camera.start_camera)
stop_camera_button.on_click(robot_camera.stop_camera)
up_button.on_click(robot_camera.move_forward_toggle)
down_button.on_click(robot_camera.move_backward_toggle)
left_button.on_click(robot_camera.turn_left_toggle)
right_button.on_click(robot_camera.turn_right_toggle)
stop_robot_button.on_click(robot_camera.stop_robot)
start_robot_button.on_click(robot_camera.start_robot)
delete_last_photo_button.on_click(robot_camera.delete_last_photo)
delete_all_photos_button.on_click(robot_camera.delete_all_photos)
obstacle_button.on_click(robot_camera.save_obstacle_frame)

display(target_widget)
display(widgets.HBox([
    widgets.VBox([
        speed_slider,
        speed_text,
        widgets.HBox([left_button, up_button, right_button, down_button]),
        widgets.HBox([start_camera_button, camera_active_indicator, stop_camera_button, start_robot_button, robot_active_indicator, stop_robot_button]),
        delete_last_photo_button,
        delete_all_photos_button,
        obstacle_button
    ]),
    widgets.VBox([
        widgets.HBox([photo_count_text, master_button, master_stop_button, master_indicator]),
        widgets.HBox([up_count_text, direction_buttons['up'], direction_stop_buttons['up'], direction_indicators['up']]),
        widgets.HBox([left_count_text, direction_buttons['left'], direction_stop_buttons['left'], direction_indicators['left']]),
        widgets.HBox([right_count_text, direction_buttons['right'], direction_stop_buttons['right'], direction_indicators['right']]),
        widgets.HBox([down_count_text, direction_buttons['down'], direction_stop_buttons['down'], direction_indicators['down']]),
        widgets.HBox([obstacle_count_text, direction_buttons['obstacle'], direction_stop_buttons['obstacle'], direction_indicators['obstacle']])
    ])
]))

# Zipping needs

In [None]:
import shutil

# Path to the folder you want to zip
folder_to_zip = '/workspace/jetbot/notebooks/testing/collected_data'

# Path where the zip file will be saved
output_filename = '/workspace/jetbot/notebooks/testing/collected_data_4thAUG_Daytime.zip'

# Create a zip file
shutil.make_archive(output_filename.replace('.zip', ''), 'zip', folder_to_zip)

print("Folder zipped successfully!")

In [None]:
# Find Current directory
import os

# Get the current working directory
current_directory = os.getcwd()

# Print the current working directory
print("Current Working Directory:", current_directory)