### Imports ###

In [1]:
import socket
import xml.etree.ElementTree as ET

### class MovePrimitve ###
- Represents one robot motion command
- Holds motion parameters:
    - Type, Mode, BaseIndex, ToolIndex, Velocity
    - X, Y, Z, A, B, C (target pose)

- Provides method to_xml() → converts data into valid XML message

In [2]:
class MovePrimitive:
    def __init__(
        self,
        cmd_id: int = 1,
        cmd_type: int = 2,
        mode: int = 1,
        base_index: int = 1,
        tool_index: int = 1,
        velocity: float = 0.25,
        x: float = 0.0,
        y: float = 0.0,
        z: float = 0.0,
        a: float = 0.0,
        b: float = 0.0,
        c: float = 0.0,
    ):
        self.id = cmd_id
        self.type = cmd_type
        self.mode = mode
        self.base_index = base_index
        self.tool_index = tool_index
        self.velocity = velocity

        self.x = x
        self.y = y
        self.z = z
        self.a = a
        self.b = b
        self.c = c

    def to_xml(self) -> bytes:
        """
        builds structure and returns xml bytestring
        """
        root = ET.Element("EthernetKRL")

        robot_command = ET.SubElement(root, "RobotCommand")
        robot_command.set("Id", str(self.id))
        robot_command.set("Type", str(self.type))

        move = ET.SubElement(robot_command, "Move")
        move.set("Mode", str(self.mode))
        move.set("BaseIndex", str(self.base_index))
        move.set("ToolIndex", str(self.tool_index))
        move.set("Velocity", str(self.velocity))

        target = ET.SubElement(move, "TargetPose")
        target.set("X", str(self.x))
        target.set("Y", str(self.y))
        target.set("Z", str(self.z))
        target.set("A", str(self.a))
        target.set("B", str(self.b))
        target.set("C", str(self.c))

        return ET.tostring(root, encoding="utf-8", method="xml")
    
    def setTool(self):
        print('tbd')

    def setBase(self):
        print('tbd')

    def PtP(self):
        print('tbd')

    def LIN(self):
        print('tbd')

    def CIRC(self):
        print('tbd')

    def __repr__(self):
        return (
            f"MovePrimitive(Type={self.type}, Mode={self.mode}, "
            f"Base={self.base_index}, Tool={self.tool_index}, "
            f"Vel={self.velocity}, Pose=({self.x}, {self.y}, {self.z}, "
            f"{self.a}, {self.b}, {self.c}))"
        )

### class KukaClient ###

- Manages TCP/IP communication with the KUKA controller (EKI interface)
- Stores and sends MovePrimitive objects

- Provides methods for:

    - connect() – establish connection
    - disconnect() – close connection
    - add_move() – store a motion command
    - send_move() – send one motion command
    - send_all_moves() – send all stored commands

In [3]:
class KukaServer:
    """
    Manages TCP/IP communication with the KUKA controller via EKI.
    Can store and send MovePrimitive objects.
    """

    def __init__(self, ip: str, port: int):
        self.ip = ip
        self.port = port
        self.socket = None
        self.connected = False
        self.moves: list[MovePrimitive] = []

    # --------------------------
    # Establish connection
    # --------------------------
    def connect(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.socket.bind(("0.0.0.0", self.port))

        self.socket.listen(1)
        print(f"Waiting for robot on port {self.port}...")

        try:
            self.client_socket, addr = self.socket.accept()
            print(f"Connected to KUKA at {addr}")
            self.connected = True
        except Exception as e:
            print(f"Connection failed: {e}")
            self.connected = False
                   

    # --------------------------
    # Close connection
    # --------------------------
    def disconnect(self):
        if self.socket:
            self.socket.close()
            print("Connection closed.")
        self.connected = False

    # --------------------------
    # Add a MovePrimitive to the list
    # --------------------------
    def add_move(self, move: MovePrimitive):
        self.moves.append(move)
        print(f"Move added: {move}")

    # --------------------------
    # Send a single MovePrimitive
    # --------------------------
    def send_move(self, move: MovePrimitive):
        if not self.connected:
            print("Not connected - please call connect() first.")
            return

        xml_data = move.to_xml()
        try:
            self.socket.sendall(xml_data)
            print(f"Move sent:\n{xml_data.decode()}")

            # Optionally receive response from KUKA
            self.socket.settimeout(2.0)
            try:
                data = self.socket.recv(4096)
                if data:
                    print(f"Response from KUKA:\n{data.decode()}")
            except socket.timeout:
                print("(No response received - KUKA may be in RECEIVE mode only.)")

        except Exception as e:
            print(f"Send error: {e}")

    # --------------------------
    # Send all stored MovePrimitives
    # --------------------------
    def send_all_moves(self):
        if not self.connected:
            print("Not connected - please call connect() first.")
            return

        for move in self.moves:
            self.send_move(move)


In [4]:
kuka = KukaServer(ip="10.181.116.200", port=54601)
kuka.connect()


Waiting for robot on port 54601...


KeyboardInterrupt: 

In [None]:
# Engineering - IP:
# 10.181.116.200

# create example move
TestMove = MovePrimitive(
    cmd_type=2,
    mode=1,
    base_index=1,
    tool_index=1,
    velocity=0.25,
    x=1.0,
    y=2.0,
    z=3.0,
    a=0.0,
    b=0.0,
    c=0.0,
)

kuka.add_move(TestMove)
kuka.send_all_moves()

kuka.disconnect()



KeyboardInterrupt: 