# Assignment 01
## AI Agents & Environments

### Question 01

#### Code-task (a)

In [2]:
class AutonomousShip:
    """Simple reflex agent that reacts to an obstacle flag and a weather label.

    Instance state:
        obstacle_detected: last "obstacle" reading copied from the sensors.
        weather_conditions: last "weather" reading copied from the sensors.
        current_speed: speed in knots.
        direction: compass heading in degrees (0 = North); avoidance turns
            keep it inside [0, 360).
    """

    AVOIDANCE_TURN = 45   # degrees added to the heading when an obstacle is seen
    STORM_SLOWDOWN = 5    # knots removed from the speed when it is stormy
    MIN_STORM_SPEED = 5   # knots; the storm slowdown never goes below this

    def __init__(self):
        self.obstacle_detected = False
        self.weather_conditions = "Clear"
        self.current_speed = 10  # knots
        self.direction = 0  # degrees (0 = North)

    def perceive_environment(self, sensors):
        """Copy the "obstacle" and "weather" readings from ``sensors`` onto the agent.

        Raises:
            KeyError: if either key is missing from ``sensors``.
        """
        self.obstacle_detected = sensors["obstacle"]
        self.weather_conditions = sensors["weather"]

    def navigate(self):
        """Apply one decision step based on the most recent perception.

        An obstacle takes priority over weather: when both are present only
        the avoidance turn is applied on this step.
        """
        if self.obstacle_detected:
            print("Obstacle detected! Changing course...")
            # Wrap around the compass so repeated turns never report >= 360 degrees.
            self.direction = (self.direction + self.AVOIDANCE_TURN) % 360
        elif self.weather_conditions == "Stormy":
            print("Storm detected! Slowing down...")
            self.current_speed = max(self.MIN_STORM_SPEED, self.current_speed - self.STORM_SLOWDOWN)
        else:
            print("No threats detected. Maintaining course.")

    def run(self, sensors):
        """Run one perceive -> decide cycle and print the resulting heading and speed."""
        self.perceive_environment(sensors)
        self.navigate()
        print(f"New direction: {self.direction} degrees, Speed: {self.current_speed} knots")

In [3]:
# One perceive/decide cycle with an obstacle reported in clear weather
ship = AutonomousShip()
sensors_input = dict(obstacle=True, weather="Clear")
ship.run(sensors_input)

Obstacle detected! Changing course...
New direction: 45 degrees, Speed: 10 knots


#### Code-task (b)

In [4]:
def integrate_sensor_data(gps_data, radar_data, sonar_data, weather_data):
    """
    Turn radar, sonar and weather readings into one navigation recommendation.

    The checks run in priority order and the first one that fires decides the
    result: nearby surface obstacle, then underwater hazard / shallow water,
    then storm conditions. ``gps_data`` is accepted but not consulted by the
    current rules.

    Parameters:
        gps_data (dict): {'latitude': float, 'longitude': float, 'speed': float}
        radar_data (dict): {'obstacle_distance': float, 'obstacle_direction': int}
        sonar_data (dict): {'depth': float, 'underwater_obstacle': bool}
        weather_data (dict): {'wave_height': float, 'storm_warning': bool}

    Returns:
        str: Recommended navigation action.
    """
    # Safety limits (all in meters); readings exactly on a limit count as safe
    closest_safe_obstacle = 50
    shallowest_safe_depth = 10
    highest_safe_wave = 4

    if radar_data["obstacle_distance"] < closest_safe_obstacle:
        # Highest priority: something on the surface is too close
        advice = f"Obstacle ahead! Change course {radar_data['obstacle_direction']} degrees."
    elif sonar_data["underwater_obstacle"] or sonar_data["depth"] < shallowest_safe_depth:
        advice = "Shallow water or underwater obstacle detected! Reduce speed and adjust course."
    elif weather_data["storm_warning"] or weather_data["wave_height"] > highest_safe_wave:
        advice = "Storm warning! Adjust route to avoid bad weather."
    else:
        advice = "All clear. Maintain current course and speed."

    return advice

In [5]:
# One sample reading per sensor; only the radar reports a problem (obstacle at 30)
weather_input = dict(wave_height=3, storm_warning=False)
sonar_input = dict(depth=15, underwater_obstacle=False)
radar_input = dict(obstacle_distance=30, obstacle_direction=45)
gps_input = dict(latitude=37.7749, longitude=-122.4194, speed=12)

decision = integrate_sensor_data(gps_input, radar_input, sonar_input, weather_input)
print("Navigation Decision:", decision)

Navigation Decision: Obstacle ahead! Change course 45 degrees.


#### Code-task (c)

In [6]:
import socket
import json
import threading

class AutoShip:
    """A ship that exchanges its position with other ships as JSON over UDP.

    Instance state:
        name: label used in log lines and in outgoing messages.
        ip, port: local address that ``receive_position`` binds to.
        position: dict with latitude, longitude, speed and heading that
            ``send_position`` transmits.
    """

    BUFFER_SIZE = 1024  # maximum number of bytes read per incoming datagram

    def __init__(self, name, ip, port):
        self.name = name
        self.ip = ip
        self.port = port
        self.position = {"latitude": 37.7749, "longitude": -122.4194, "speed": 12, "heading": 90}

    def send_position(self, target_ip, target_port):
        """Send this ship's name and position to ``(target_ip, target_port)`` as one UDP datagram."""
        data = json.dumps({"ship_name": self.name, "position": self.position})
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.sendto(data.encode(), (target_ip, target_port))
        print(f"{self.name} sent position data to {target_ip}:{target_port}")

    def receive_position(self):
        """Bind to ``(self.ip, self.port)`` and print every position update received.

        Loops forever, so it is meant to run in a background thread. Datagrams
        that are not valid UTF-8 JSON objects carrying "ship_name" and
        "position" are reported and skipped, so one bad packet cannot stop the
        listener. Socket errors (for example a port already in use) still
        propagate to the caller.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.bind((self.ip, self.port))
            print(f"{self.name} listening on {self.ip}:{self.port} for incoming ship data...")
            while True:
                data, addr = s.recvfrom(self.BUFFER_SIZE)
                try:
                    received_data = json.loads(data.decode())
                    sender = received_data["ship_name"]
                    position = received_data["position"]
                except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError) as exc:
                    # TypeError covers valid JSON that is not an object (list, number, ...).
                    print(f"{self.name} ignored malformed data from {addr}: {exc!r}")
                    continue
                print(f"{self.name} received data from {sender}: {position}")

In [7]:
# Two ships on the loopback interface, each with its own UDP port
ship1 = AutoShip("Ship1", "127.0.0.1", 5001)
ship2 = AutoShip("Ship2", "127.0.0.1", 5002)

# Ship 2 listens in a background daemon thread so this cell does not block.
# NOTE(review): the bind happens inside the thread, so a datagram sent before
# the bind completes would be lost; re-running this cell in the same kernel
# also tries to bind port 5002 a second time.
listener = threading.Thread(target=ship2.receive_position, daemon=True)
listener.start()

# Ship 1 pushes its position to Ship 2's address
ship1.send_position(ship2.ip, ship2.port)

Ship2 listening on 127.0.0.1:5002 for incoming ship data...
Ship2 received data from Ship1: {'latitude': 37.7749, 'longitude': -122.4194, 'speed': 12, 'heading': 90}
Ship1 sent position data to 127.0.0.1:5002


### Question 02

#### Code-task (a)

In [8]:
import random

class OceanicEnvironment:
    """Three categorical ocean conditions that can be re-drawn at random.

    visibility is one of Clear / Foggy / Stormy, weather is one of
    Calm / Windy / Stormy, and obstacle_density is one of Low / Medium / High.
    """

    def __init__(self):
        # Starting values before any random update
        self.visibility, self.weather, self.obstacle_density = "Clear", "Calm", "Low"

    def update_conditions(self):
        """Draw a fresh value for each condition, independently and uniformly."""
        visibility_options = ["Clear", "Foggy", "Stormy"]
        weather_options = ["Calm", "Windy", "Stormy"]
        density_options = ["Low", "Medium", "High"]

        self.visibility = random.choice(visibility_options)
        self.weather = random.choice(weather_options)
        self.obstacle_density = random.choice(density_options)

    def get_environment_state(self):
        """Return the current conditions as a new dict."""
        return dict(
            visibility=self.visibility,
            weather=self.weather,
            obstacle_density=self.obstacle_density,
        )

    def display_conditions(self):
        """Print the current conditions on one line."""
        summary = f"Visibility: {self.visibility}, Weather: {self.weather}, Obstacle Density: {self.obstacle_density}"
        print(summary)

In [9]:
ocean = OceanicEnvironment()
ocean.display_conditions()  # Initial state

# Simulating environmental changes.
# Seed the shared RNG first so Restart & Run All prints the same conditions
# on every run instead of a different random draw each time.
random.seed(42)
ocean.update_conditions()
ocean.display_conditions()

Visibility: Clear, Weather: Calm, Obstacle Density: Low
Visibility: Foggy, Weather: Windy, Obstacle Density: Medium


#### Code-task (b)

In [10]:
def adaptive_navigation(environment, ship_status):
    """
    Adjusts ship's speed and heading based on real-time environmental data.

    Speed is capped at the safe speed for the current weather and then cut by
    a further 20% in poor visibility. The heading is turned by an amount that
    grows with obstacle density and is wrapped onto the compass, so the result
    of a turn always lies in [0, 360).

    Parameters:
        environment (dict): {'visibility': str, 'weather': str, 'obstacle_density': str}
        ship_status (dict): {'speed': float, 'heading': int}

    Returns:
        dict: Updated copy of ``ship_status``; the input dict is not modified.

    Raises:
        KeyError: if ``environment['weather']`` is not Calm, Windy or Stormy,
            or if a required key is missing.
    """
    updated_status = ship_status.copy()

    # Safety thresholds
    safe_speeds = {"Calm": 20, "Windy": 15, "Stormy": 8}  # Knots
    heading_adjustment = {"Low": 5, "Medium": 15, "High": 30}  # Degrees

    # Cap speed at the safe limit for the current weather
    updated_status["speed"] = min(ship_status["speed"], safe_speeds[environment["weather"]])

    # Turn by the amount for this obstacle density (every known level turns,
    # including "Low"); unknown levels leave the heading untouched.
    density = environment["obstacle_density"]
    if density in heading_adjustment:
        # Wrap around the compass so e.g. 350 + 30 yields 20 rather than 380.
        updated_status["heading"] = (updated_status["heading"] + heading_adjustment[density]) % 360

    # Special case: If visibility is bad, slow down further
    if environment["visibility"] in ["Foggy", "Stormy"]:
        updated_status["speed"] *= 0.8  # Reduce speed by 20%

    return updated_status

In [11]:
# Worst-case inputs: poor visibility, stormy weather and dense obstacles
ship_data = dict(speed=18, heading=90)  # Initial conditions
environment_data = dict(visibility="Foggy", weather="Stormy", obstacle_density="High")

new_ship_status = adaptive_navigation(environment_data, ship_data)
print("Updated Ship Status:", new_ship_status)

Updated Ship Status: {'speed': 6.4, 'heading': 120}


#### Code-task (c)

In [12]:
def safe_navigation(environment, traffic_density, obstacle_distance, current_speed, current_heading):
    """
    Determines the safest navigation decision based on environmental conditions.

    Speed reductions are multiplicative and stack: weather first, then traffic,
    then a nearby obstacle. Heading changes add up and are wrapped onto the
    compass, so the result of a turn always lies in [0, 360).

    Parameters:
        environment (dict): {'visibility': str, 'weather': str}; only
            'weather' is read by the current rules.
        traffic_density (str): 'Low', 'Medium', 'High'
        obstacle_distance (float): Distance to the nearest obstacle (in meters)
        current_speed (float): Ship's current speed (knots)
        current_heading (int): Ship's current heading (degrees)

    Returns:
        dict: {'new_speed': float, 'new_heading': int}.
    """
    updated_speed = current_speed
    updated_heading = current_heading

    # Adjust speed based on weather conditions
    if environment["weather"] == "Stormy":
        updated_speed *= 0.7  # Reduce speed by 30%
    elif environment["weather"] == "Windy":
        updated_speed *= 0.9  # Reduce speed by 10%

    # Adjust speed and heading based on traffic density
    if traffic_density == "High":
        updated_speed *= 0.8  # Reduce speed by 20%
        # Slight deviation to avoid congestion, wrapped around the compass
        updated_heading = (updated_heading + 15) % 360
    elif traffic_density == "Medium":
        updated_speed *= 0.9  # Reduce speed slightly

    # Immediate action for obstacle avoidance
    if obstacle_distance < 200:  # If an obstacle is within 200 meters
        updated_speed *= 0.6  # Reduce speed significantly
        # Sharp turn to avoid collision, wrapped so e.g. 355 + 30 yields 25
        updated_heading = (updated_heading + 30) % 360

    return {"new_speed": updated_speed, "new_heading": updated_heading}

In [13]:
environment_data = dict(visibility="Foggy", weather="Stormy")

# Same weather and starting state in both runs; only traffic and obstacle range differ
traffic_scenario_1 = safe_navigation(
    environment_data,
    traffic_density="High",
    obstacle_distance=150,
    current_speed=18,
    current_heading=90,
)
traffic_scenario_2 = safe_navigation(
    environment_data,
    traffic_density="Low",
    obstacle_distance=500,
    current_speed=18,
    current_heading=90,
)

print("Scenario 1 (High Traffic, Obstacle Near):", traffic_scenario_1)
print("Scenario 2 (Low Traffic, No Obstacle):", traffic_scenario_2)

Scenario 1 (High Traffic, Obstacle Near): {'new_speed': 6.048, 'new_heading': 135}
Scenario 2 (Low Traffic, No Obstacle): {'new_speed': 12.6, 'new_heading': 90}
