TASK 2: Robotics – Warehouse Robot Navigation
Autonomous robots navigate warehouse aisles, detect obstacles using sensors, and optimize paths using reinforcement learning. Intelligence runs on edge devices, with cloud monitoring.

Problem statement-Modern warehouses increasingly rely on autonomous robots to transport goods, manage inventory, and improve operational efficiency. These robots must safely navigate narrow aisles, avoid static and dynamic obstacles, and reach target locations without collisions. Traditional rule-based navigation systems are insufficient in complex and dynamic warehouse environments.

This project focuses on designing an intelligent warehouse robot navigation system using sensor data (LIDAR and ultrasonic sensors) combined with machine learning and reinforcement learning techniques. A neural network–based obstacle detection model is used to predict potential collisions, while a reinforcement learning approach optimizes the robot’s navigation path by maximizing safety and efficiency.

The solution follows an edge-cloud architecture, where the model is trained in AWS SageMaker and deployed on robot edge devices using AWS Greengrass. Real-time inference is performed locally on the robot to minimize latency, while critical events such as collision alerts are sent to the cloud for monitoring.

The objective of this project is to build a scalable, low-latency, and intelligent robotic navigation system suitable for real-world warehouse automation.

In [57]:
#This cell connects SageMaker to S3 and prepares the dataset for training.
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()
session = sagemaker.Session()

s3_input_path = "s3://warehouse-robot-dataset/path/to/data/"

training_input = sagemaker.inputs.TrainingInput(
    s3_data=s3_input_path,
    content_type="csv" 
)


In [60]:
#This code downloads all files from a specified S3 bucket folder to a local SageMaker directory for local processing.
import boto3
import os

bucket = "warehouse-robot-dataset"
prefix = "path/to/data/"
local_dir = "/home/ec2-user/SageMaker/data"

os.makedirs(local_dir, exist_ok=True)

s3 = boto3.client("s3")

response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)

for obj in response.get("Contents", []):
    file_name = os.path.basename(obj["Key"])
    if file_name:
        s3.download_file(
            bucket,
            obj["Key"],
            os.path.join(local_dir, file_name)
        )

print("Download complete.")


Download complete.


In [65]:
#This code uploads the local dataset from the SageMaker directory to an S3 bucket and returns the S3 URI so it can be used for training or processing jobs.
import sagemaker

session = sagemaker.Session()

local_path = "/home/ec2-user/SageMaker/data"
s3_uri = session.upload_data(
    path=local_path,
    bucket="warehouse-robot-dataset",
    key_prefix="data/autonomous_navigation_dataset.csv"
)

print("Uploaded to:", s3_uri)


Uploaded to: s3://warehouse-robot-dataset/data/autonomous_navigation_dataset.csv


In [76]:
#This code lists and prints all object (file) names stored in the specified S3 bucket.
import boto3

s3 = boto3.client("s3")

response = s3.list_objects_v2(
    Bucket="warehouse-robot-dataset",
    Prefix=""
)

for obj in response.get("Contents", []):
    print(obj["Key"])


autonomous_navigation_dataset.csv


In [80]:
# To read the dataset
import pandas as pd

df = pd.read_csv(
    "s3://warehouse-robot-dataset/autonomous_navigation_dataset.csv"
)

df.head()


Unnamed: 0,run_id,timestamp,lidar_min,lidar_max,ultrasonic_left,ultrasonic_right,x,y,orientation,battery_level,collision_flag,path_smoothness,time_taken,target
0,1,0,0.76,3.04,0.97,1.63,0.13,0.1,316.54,98.9,0,0.83,34,timeout
1,1,1,2.16,4.84,0.95,0.82,0.14,0.43,253.94,98.32,0,0.83,34,timeout
2,1,2,1.48,4.97,1.11,1.37,0.17,0.88,108.54,97.96,0,0.83,34,timeout
3,1,3,0.66,4.32,1.24,0.86,0.46,1.24,314.48,97.26,0,0.83,34,timeout
4,1,4,0.34,4.96,1.46,1.86,0.59,1.41,233.75,96.87,0,0.83,34,timeout


In [82]:
df.shape

(3135, 14)

In [7]:
#Obstacle Detection Model (Sensor-Based)
#This class creates a custom PyTorch dataset that converts warehouse sensor data into input features and collision labels for training a machine learning model.
import torch
from torch.utils.data import Dataset

class WarehouseSensorDataset(Dataset):
    def __init__(self, df):
        self.features = df[
            ["lidar_min", "lidar_max",
             "ultrasonic_left", "ultrasonic_right",
             "x", "y", "orientation", "battery_level"]
        ].values

        self.labels = df["collision_flag"].values

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        x = torch.tensor(self.features[idx], dtype=torch.float32)
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y


  import pynvml  # type: ignore[import]


In [8]:
#✅ Obstacle Detection Model (MLP)
#This neural network model takes sensor inputs and predicts whether an obstacle or collision is present using fully connected layers.
import torch.nn as nn

class ObstacleDetector(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(8, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 2)
        )

    def forward(self, x):
        return self.net(x)


In [9]:
#Train Obstacle Detector
#This code trains the obstacle detection model by loading sensor data in batches, calculating prediction loss, and updating the model weights using backpropagation.
from torch.utils.data import DataLoader
import torch

dataset = WarehouseSensorDataset(df)
loader = DataLoader(dataset, batch_size=32, shuffle=True)

model = ObstacleDetector()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

for epoch in range(5):
    for x, y in loader:
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1} Loss: {loss.item():.4f}")


Epoch 1 Loss: 0.0322
Epoch 2 Loss: 0.0179
Epoch 3 Loss: 0.0317
Epoch 4 Loss: 0.0085
Epoch 5 Loss: 0.0086


In [10]:
#STEP 3: Reinforcement Learning for Path Optimization (DQN)
#This class defines a custom environment for reinforcement learning. It provides the robot's current state (sensor readings), computes a reward based on path smoothness, collisions, and time, and moves to the next step in the dataset.
class WarehouseEnv:
    def __init__(self, df):
        self.df = df.reset_index(drop=True)
        self.idx = 0
        self.state_cols = [
            "lidar_min", "lidar_max",
            "ultrasonic_left", "ultrasonic_right",
            "x", "y", "orientation", "battery_level"
        ]

    def reset(self):
        self.idx = 0
        return self._state()

    def _state(self):
        return self.df.loc[self.idx, self.state_cols].values.astype("float32")

    def step(self, action):
        row = self.df.loc[self.idx]

        reward = (
            row["path_smoothness"] * 2
            - row["collision_flag"] * 10
            - row["time_taken"] * 0.01
        )

        self.idx += 1
        done = self.idx >= len(self.df) - 1
        return self._state(), reward, done


In [11]:
#DQN Model
#This is a Deep Q-Network (DQN) model for reinforcement learning. It takes the robot’s sensor state as input and outputs Q-values for four possible actions (forward, left, right, stop) to decide the best move.
class DQN(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(8, 128),
            nn.ReLU(),
            nn.Linear(128, 4)
        )

    def forward(self, x):
        return self.net(x)


In [13]:
env = WarehouseEnv(df)


In [14]:
state = env.reset()


In [15]:
#This code runs the trained DQN model in evaluation mode on the current state without updating weights. It computes Q-values for all possible actions and selects the action with the highest value.
import torch

model.eval()

with torch.no_grad():
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action = model(state_tensor).argmax(dim=1).item()

print("Selected action:", action)


Selected action: 0


In [16]:
#This code executes the selected action in the environment, returning the next state, the reward received for that action, and whether the episode has finished.
next_state, reward, done = env.step(action)

print("Reward:", reward)
print("Done:", done)


Reward: 1.3199999999999998
Done: False


In [17]:
#state = env.reset()
total_reward = 0

while True:
    with torch.no_grad():
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        action = model(state_tensor).argmax(dim=1).item()

    state, reward, done = env.step(action)
    total_reward += reward

    if done:
        break

print("Total Episode Reward:", total_reward)
state = env.reset()
total_reward = 0

while True:
    with torch.no_grad():
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        action = model(state_tensor).argmax(dim=1).item()

    state, reward, done = env.step(action)
    total_reward += reward

    if done:
        break

print("Total Episode Reward:", total_reward)


Total Episode Reward: 1941.9899999999761


Warehouse Robot Navigation using AI & IoT

This project focuses on autonomous warehouse robot navigation using sensor data and reinforcement learning. Robots are equipped with LIDAR, ultrasonic sensors, and cameras to perceive the environment and detect obstacles in real time. Sensor data is processed by an obstacle detection model to identify potential collisions.

A reinforcement learning (DQN) agent optimizes navigation decisions such as moving forward, turning, or stopping. The agent receives rewards based on path smoothness, collision avoidance, and time efficiency. This ensures safe and optimal navigation within warehouse aisles.

Inference is deployed on edge devices for low latency decision-making. Cloud platforms such as AWS Greengrass or Azure IoT Hub are used for monitoring, alerting, and performance logging. Experimental results show a high cumulative reward, indicating efficient and collision-free navigation.

In [18]:
#Saves trained RL model

#This file will be deployed to edge device
torch.save(model.state_dict(), "warehouse_robot_dqn.pt")


In [19]:
#Upload Model to S3
import sagemaker

session = sagemaker.Session()

s3_model_path = session.upload_data(
    path="warehouse_robot_dqn.pt",
    bucket="warehouse-robot-dataset",
    key_prefix="greengrass/models"
)

print("Model uploaded to:", s3_model_path)


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
Model uploaded to: s3://warehouse-robot-dataset/greengrass/models/warehouse_robot_dqn.pt


In [20]:
#Download Model on Edge Device (Greengrass Core)
import boto3

s3 = boto3.client("s3")

s3.download_file(
    "warehouse-robot-dataset",
    "greengrass/models/warehouse_robot_dqn.pt",
    "warehouse_robot_dqn.pt"
)


In [22]:
import torch
import torch.nn as nn

class DQN(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(8, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 4)   # 4 actions
        )

    def forward(self, x):
        return self.net(x)


In [24]:
model = ObstacleDetector()
model.load_state_dict(
    torch.load("warehouse_robot_dqn.pt", map_location="cpu")
)
model.eval()


ObstacleDetector(
  (net): Sequential(
    (0): Linear(in_features=8, out_features=64, bias=True)
    (1): ReLU()
    (2): Linear(in_features=64, out_features=32, bias=True)
    (3): ReLU()
    (4): Linear(in_features=32, out_features=2, bias=True)
  )
)

In [25]:
def infer_collision(state):
    with torch.no_grad():
        x = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        pred = model(x).argmax(dim=1).item()
    return pred  # 1 = collision


In [26]:
if infer_collision(state) == 1:
    send_alert({"alert": "Collision detected"})


Summary-This project implements an autonomous warehouse robot navigation system using sensor-driven machine learning and reinforcement learning techniques. LIDAR and ultrasonic sensor data were processed to construct a state representation for obstacle detection and navigation decision-making. A supervised neural network model was trained in AWS SageMaker to classify collision risk, while a reinforcement learning–based policy optimized navigation actions by maximizing a reward function that penalizes collisions and inefficient paths.

The trained models were prepared for deployment using an edge-cloud architecture, where AWS Greengrass enables low-latency inference directly on robotic edge devices. This approach minimizes cloud dependency during real-time operation while allowing cloud-based monitoring and alerting for collision and path anomalies. The system supports scalable deployment, real-time decision-making, and reliable monitoring, making it suitable for intelligent warehouse automation scenarios.