In [None]:
# The following notebook explains the provided Python code, which is for managing a fleet of robots (Robotinos) and their charging processes.
# The code uses socket communication, threading, and logging to manage a fleet of robots, allocate chargers, and send messages.

# 1. Importing Required Libraries
import socket
import threading
import time
import math
import logging

In [None]:
1. Library Imports
The code imports the necessary libraries to manage socket communication, threading, time delays, mathematical operations, and logging.

socket: Enables network communication for receiving and sending data over the network.
threading: Used to handle multiple tasks (such as handling messages from clients and sending commands) concurrently.
time: Provides utilities like delays between actions.
math: Contains mathematical functions like calculating Euclidean distance.
logging: Allows us to log information and errors during the program's execution.

In [None]:
# 2. Constants and Configuration Variables

# Address and TCP port the server socket is bound to.
HOST = "0.0.0.0"
PORT = 13000

# Job identifier embedded in PushJob commands; 40 is the starting value.
JobId = 40

# Upper bound, in bytes, handed to each socket recv() call.
MAX_BUFFER_SIZE = 4096

# Starts unset.
CURRENT_ROBOTINO_STATE = None

In [None]:
2. Global Variables and Configuration
Here, the code defines a few important constants:

HOST and PORT: Define the server's listening address and port.
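JobId: Initialized to 40, this value is inserted into each PushJob command as its job identifier. Note that none of the cells shown here increments it, so commands are only uniquely identified if it is advanced elsewhere.
MAX_BUFFER_SIZE: Sets the maximum number of bytes read from a client per recv() call.
CURRENT_ROBOTINO_STATE: Initialized to None; it is not referenced by any other cell in this notebook.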

In [None]:
# 3. Logging Configuration
# Root logger: DEBUG and above, every record tagged with time, level and thread name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s",
    level=logging.DEBUG,
    handlers=[
        # Persist records to a file in the current working directory.
        logging.FileHandler("charging_selection.log"),
        # Mirror the same records on the console.
        logging.StreamHandler(),
    ],
)
logging.info("initialized logging")

In [None]:
3. Logging Setup
This block configures logging to print messages to both the console and a log file (charging_selection.log).
Logs are generated at the DEBUG level, which captures detailed logs during development.

In [None]:
# 4. Robot and Charger Configuration Data

# Charger table: one row per charger as (charger ID, X, Y, type).
charger_configurations = {
    charger_id: {"X": x, "Y": y, "type": charger_type}
    for charger_id, x, y, charger_type in (
        (11, -15.262, 0.853, 3),
        (12, -14.215, 0.830, 3),
        (13, -0.162, 1.525, 3),
        (14, -8.677, -0.988, 3),
        (17, -0.504, -0.949, 4),
        (18, -0.276, 0.067, 4),
    )
}

# Robot table: robot ID -> type, grouped by type.
robotino_configurations = {
    **{robot_id: {"type": 3} for robot_id in (20, 21, 22, 23)},
    **{robot_id: {"type": 4} for robot_id in (24, 25)},
}

In [None]:
4. Charger and Robot Configurations
Chargers: The dictionary charger_configurations contains information about the chargers, including their X, Y coordinates and their type.
Robotinos: The dictionary robotino_configurations contains the types of robots available in the fleet.

In [None]:
# 5. Fleet State Initialization

# Starts empty. Keyed by robot ID; each value is a dict of that robot's state
# (the functions below read its 'x' and 'y' entries).
fleet_state = dict()

In [None]:
5. Fleet State Management
fleet_state is an empty dictionary that will later hold information about each robot in the fleet (such as its battery level, position, etc.).

In [None]:
def calculate_distance(x1, y1, x2, y2):
    """
    Return the Euclidean distance between the points (x1, y1) and (x2, y2).

    Args:
        x1, y1: Coordinates of the first point.
        x2, y2: Coordinates of the second point.

    Returns:
        float: Non-negative straight-line distance, in the same unit as the inputs.
    """
    # math.hypot avoids the overflow that squaring very large deltas by hand
    # would raise, and is the standard-library spelling of this formula.
    return math.hypot(x2 - x1, y2 - y1)

In [None]:
calculate_distance: This function computes the Euclidean distance between two points, given their X and Y coordinates.

In [None]:
def is_charger_occupied(charger_id, threshold=0.2):
    """
    Report whether any robot in ``fleet_state`` is positioned at the given charger.

    Args:
        charger_id: Key into ``charger_configurations``.
        threshold: Maximum robot-to-charger distance, in coordinate units, at
            which the charger counts as occupied. Defaults to 0.2, the value
            that was previously hard-coded (described as 20 cm).

    Returns:
        bool: True if at least one robot lies within ``threshold`` of the charger.

    Raises:
        KeyError: If ``charger_id`` is unknown or a fleet entry lacks 'x'/'y'.
    """
    charger = charger_configurations[charger_id]
    charger_x, charger_y = charger["X"], charger["Y"]

    # any() stops at the first robot found inside the radius.
    return any(
        calculate_distance(charger_x, charger_y, robot["x"], robot["y"]) <= threshold
        for robot in fleet_state.values()
    )


In [None]:
is_charger_occupied:
This function checks whether a charger is occupied. It computes the distance from the charger to every robot in fleet_state and treats the charger as occupied if any robot is within 0.2 coordinate units of it (20 cm, assuming positions are expressed in metres).



In [None]:
def find_closest_free_charger(robot_id):
    """
    Find the nearest unoccupied charger whose type matches the given robot.

    Args:
        robot_id: Key into both ``fleet_state`` (for the robot's 'x'/'y'
            position) and ``robotino_configurations`` (for its type).

    Returns:
        The ID of the closest compatible, unoccupied charger, or None when no
        such charger exists. On equal distances the charger that comes first
        in ``charger_configurations`` wins.

    Raises:
        KeyError: If ``robot_id`` is missing from ``fleet_state`` or
            ``robotino_configurations``.
    """
    robot_state = fleet_state[robot_id]
    robot_x, robot_y = robot_state['x'], robot_state['y']
    # Loop-invariant, so look it up once instead of once per charger.
    robot_type = robotino_configurations[robot_id]["type"]

    closest_charger = None
    shortest_distance = float('inf')

    for charger_id, charger_config in charger_configurations.items():
        # Cheap type comparison first: is_charger_occupied scans the whole
        # fleet, so skip it for chargers this robot cannot use anyway.
        if charger_config["type"] != robot_type:
            continue
        if is_charger_occupied(charger_id):
            continue
        distance = calculate_distance(robot_x, robot_y, charger_config["X"], charger_config["Y"])
        if distance < shortest_distance:
            closest_charger = charger_id
            shortest_distance = distance

    return closest_charger


In [None]:
find_closest_free_charger: 
This function finds the closest charger that is unoccupied and of the correct type for the given robot.

In [None]:
def send_robot_to_closest_charger(robot_id):
    """
    Build the PushJob command that directs a robot to its closest free charger.

    This function only builds the command text; it does not transmit it.
    The module-level ``JobId`` is read as-is and is not modified here.

    Args:
        robot_id: ID of the robot to dispatch.

    Returns:
        str: A newline-terminated "PushJob GotoPosition ..." command, or an
        empty string when no compatible unoccupied charger was found.
    """
    closest_charger = find_closest_free_charger(robot_id)
    # Compare against None explicitly: a charger with ID 0 is falsy but valid.
    if closest_charger is None:
        return ""
    return f"PushJob GotoPosition {JobId} 1 {robot_id} {closest_charger}\n"


In [None]:
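send_robot_to_closest_charger: This function builds the PushJob command string that moves a robot to the closest unoccupied charger, and returns an empty string when no suitable charger is available. It does not send the command itself.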

In [None]:
def handle_incoming_messages(conn, addr):
    """
    Read from one client connection until it ends, forwarding fleet-state data.

    Every received chunk is decoded as UTF-8 and stripped; chunks containing
    the text "FleetState" are passed to ``process_fleet_state_response``.
    The connection is always closed before this function returns.

    Args:
        conn: Connected socket object (needs ``recv`` and ``close``).
        addr: Peer address; used only in log messages.
    """
    # NOTE(review): each recv() chunk is treated as one complete message. TCP
    # does not preserve message boundaries, so a message may arrive split or
    # merged - confirm the client's framing.
    try:
        while True:
            try:
                data = conn.recv(MAX_BUFFER_SIZE)
            except OSError as exc:
                # E.g. connection reset: end this handler without a traceback.
                logging.warning("connection to %s lost: %s", addr, exc)
                break
            if not data:
                break  # Peer closed the connection
            # errors='replace': a malformed byte from the network must not
            # raise UnicodeDecodeError and kill this handler thread.
            decoded_data = data.decode('utf-8', errors='replace').strip()
            if "FleetState" in decoded_data:
                process_fleet_state_response(decoded_data)
    finally:
        # Release the socket however the loop ended.
        conn.close()


In [None]:
handle_incoming_messages: 
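This function receives data from a connected client until the connection closes. Each received chunk is decoded as UTF-8, and chunks containing the text "FleetState" are passed to process_fleet_state_response, which is expected to update the fleet state. Note that process_fleet_state_response is not defined in any cell of this notebook and must be provided before the server is run.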

In [None]:
def main():
    """
    Run the TCP server: bind to (HOST, PORT) and serve each client on its own thread.

    Blocks in the accept loop until an exception (e.g. KeyboardInterrupt)
    propagates out; the listening socket is closed on exit by the ``with`` block.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Allow re-binding right after a restart (e.g. re-running this cell)
        # instead of failing with "Address already in use".
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen()
        logging.info("listening on %s:%s", HOST, PORT)
        while True:
            conn, addr = s.accept()
            # Daemon threads do not keep the interpreter alive once the accept
            # loop has been interrupted.
            threading.Thread(
                target=handle_incoming_messages,
                args=(conn, addr),
                daemon=True,
            ).start()

In [None]:
main:
This is the entry point of the program where the server is started. It listens for client connections and starts a new thread for each client.

In [None]:
Summary:
Purpose: The code is a server-side script that manages a fleet of robots (Robotinos) by tracking their positions and battery status and by sending them to chargers when necessary.
Components:
Charger management: Finding the closest available charger.
Fleet management: Storing and processing the robots' states.
Client-server communication: Receiving fleet data and sending commands.
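Functionality: The server listens for incoming connections, processes fleet states, and sends robots to their respective chargers. In the cells shown here only the first part is wired up: the charger command is built by send_robot_to_closest_charger but is never called or written to a socket, and process_fleet_state_response is not defined in this notebook.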
