In [18]:
import websocket
import json
import threading
import time
from typing import Optional, Dict, Any, Callable
import yaml

class MT48CometDClient:
    def __init__(self, host: str = "169.254.98.248", port: int = 80):
        self.host = host
        self.port = port
        self.ws_url = f"ws://{host}:{port}/cometd"
        self.ws = None
        self.client_id = None
        self.message_id = 1
        self.connected = False
        self.subscriptions = set()
        self.message_callbacks = {}
        
    def _get_next_message_id(self) -> str:
        """Get next message ID"""
        msg_id = str(self.message_id)
        self.message_id += 1
        return msg_id
        
    def _send_message(self, message: Dict[str, Any]) -> str:
        """Send a message and return the message ID"""
        if isinstance(message, list):
            # Multiple messages
            for msg in message:
                if 'id' not in msg:
                    msg['id'] = self._get_next_message_id()
        else:
            # Single message
            if 'id' not in message:
                message['id'] = self._get_next_message_id()
                
        json_message = json.dumps(message if isinstance(message, list) else [message])
        print(f"Sending: {json_message}")
        
        if self.ws:
            self.ws.send(json_message)
            return message['id'] if not isinstance(message, list) else message[0]['id']
        return None
        
    def _on_message(self, ws, message):
        """Handle incoming WebSocket messages"""
        print(f"Received: {message}")
        try:
            messages = json.loads(message)
            if not isinstance(messages, list):
                messages = [messages]
                
            for msg in messages:
                self._handle_message(msg)
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
            
    def _handle_message(self, message: Dict[str, Any]):
        """Handle individual CometD messages"""
        channel = message.get('channel')
        
        if channel == '/meta/handshake':
            if message.get('successful'):
                self.client_id = message.get('clientId')
                print(f"Handshake successful, clientId: {self.client_id}")
                self._connect()
            else:
                print(f"Handshake failed: {message.get('error')}")
                
        elif channel == '/meta/connect':
            if message.get('successful'):
                self.connected = True
                print("Connected successfully")
                # Continue long polling for CometD
                self._send_connect()
            else:
                print(f"Connect failed: {message.get('error')}")
                
        elif channel == '/meta/subscribe':
            if message.get('successful'):
                subscription = message.get('subscription')
                self.subscriptions.add(subscription)
                print(f"Subscribed to: {subscription}")
            else:
                print(f"Subscribe failed: {message.get('error')}")
                
        elif channel == '/service/ravenna/settings':  # Updated channel name
            # Handle settings updates
            print(f"Settings update: {message}")
            
        # Handle message callbacks
        msg_id = message.get('id')
        if msg_id and msg_id in self.message_callbacks:
            callback = self.message_callbacks.pop(msg_id)
            callback(message)
            
    def _on_error(self, ws, error):
        """Handle WebSocket errors"""
        print(f"WebSocket error: {error}")
        
    def _on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket close"""
        print(f"WebSocket closed: {close_status_code} - {close_msg}")
        self.connected = False
        self.client_id = None
        
    def _on_open(self, ws):
        """Handle WebSocket open"""
        print("WebSocket connection opened")
        self._handshake()
        
    def connect(self):
        """Connect to the MT48 WebSocket server"""
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close
        )
        
        # Start WebSocket in a separate thread
        self.ws_thread = threading.Thread(target=self.ws.run_forever)
        self.ws_thread.daemon = True
        self.ws_thread.start()
        
        # Wait for connection
        timeout = 10
        start_time = time.time()
        while not self.connected and (time.time() - start_time) < timeout:
            time.sleep(0.1)
            
        return self.connected
        
    def disconnect(self):
        """Disconnect from the WebSocket server"""
        if self.ws:
            self.ws.close()
            
    def _handshake(self):
        """Perform CometD handshake"""
        handshake_msg = {
            "channel": "/meta/handshake",
            "version": "1.0",
            "minimumVersion": "1.0",
            "supportedConnectionTypes": ["websocket", "long-polling"]
        }
        self._send_message(handshake_msg)
        
    def _connect(self):
        """Perform CometD connect"""
        if not self.client_id:
            print("No client ID available for connect")
            return
            
        connect_msg = {
            "channel": "/meta/connect",
            "clientId": self.client_id,
            "connectionType": "websocket"
        }
        self._send_message(connect_msg)
        
    def _send_connect(self):
        """Send periodic connect messages for CometD long polling"""
        if self.connected and self.client_id:
            connect_msg = {
                "channel": "/meta/connect",
                "clientId": self.client_id,
                "connectionType": "websocket"
            }
            self._send_message(connect_msg)
            
    def subscribe(self, channel: str):
        """Subscribe to a CometD channel"""
        if not self.client_id:
            print("Not connected - cannot subscribe")
            return False
            
        subscribe_msg = {
            "channel": "/meta/subscribe",
            "clientId": self.client_id,
            "subscription": channel
        }
        self._send_message(subscribe_msg)
        return True
        
    def publish(self, channel: str, data: Dict[str, Any]) -> str:
        """Publish data to a CometD channel"""
        if not self.client_id:
            print("Not connected - cannot publish")
            return None
            
        publish_msg = {
            "channel": channel,
            "clientId": self.client_id,
            "data": data
        }
        return self._send_message(publish_msg)
        
    def set_mixer_setting(self, path: str, value: Any, channel: str = "/service/ravenna/settings") -> str:
        """Set a mixer setting using the RAVENNA path format"""
        data = {
            "path": path,
            "value": value
        }
        return self.publish(channel, data)  # Updated channel
        
    def set_strip_gain(self, strip_id: int, send_id: int, gain_db: float) -> str:
        """Set strip gain in dB"""
        gain_value = -1445 if gain_db <= -144 else int(gain_db * 10)
        path = f"$._oem_ui_process_engine.music.mixer.strips[?(@.id=={strip_id})][0].sends[?(@.id=={send_id})][0]"
        
        value = {
            "gain": gain_value,
            "group_gain": gain_value
        }
        
        print(f"Setting strip {strip_id} send {send_id} gain to {gain_db}dB")
        return self.set_mixer_setting(path, value)
        
    def set_bus_gain(self, bus_id: int, gain_db: float) -> str:
        """Set bus master gain in dB"""
        gain_value = -1445 if gain_db <= -144 else int(gain_db * 10)
        # Fixed: Use the bus_id parameter and target master_gain specifically
        # path = f"$._oem_ui_process_engine.music.mixer.busses[?(@.id=={bus_id})][0].master_gain"
        path = f"$._oem_ui_process_engine.music.mixer.busses[?(@.id=={bus_id})][0]"
        value = {
            "master_gain": gain_value
        }
        
        print(f"Setting bus {bus_id} master gain to {gain_db}dB")
        return self.set_mixer_setting(path, value)
            
    def set_preamp_gain(self, module_id: int, channel_id: int, gain_db: float, input_mode: str = "mic") -> str:
        """Set preamp gain in dB"""
        gain_value = int(gain_db * 10)
        gain_property = f"{input_mode}Gain"
        path = f"$._modules[?(@.id=={module_id})][0].custom.ins.channels[{channel_id}].{gain_property}"
        
        print(f"Setting {input_mode} gain to {gain_db}dB for module {module_id}, channel {channel_id}")
        return self.set_mixer_setting(path, gain_value)

In [23]:
# Create client with correct port
client = MT48CometDClient("169.254.98.248", 80)  # Changed to port 80

#read MIXER_ID.yaml
with open("MIXER_ID.yaml", "r") as f:
    mixer_id_data = yaml.safe_load(f)


gain_db_mute = -1445  # Mute gain value in dB, corresponds to -144.5 dB in the mixer
gain_setting = -90
sendID = mixer_id_data['mixes']['MIX1']['send_id']

try:
    # Connect
    if client.connect():
        print("Connected to MT48!")
        
        # Subscribe to settings updates
        client.subscribe("/service/ravenna/settings")  # Updated channel
        
        # Wait a bit for subscription to complete
        time.sleep(1)
        
        # Send some commands
        # print("Setting strip gain...")
        # client.set_strip_gain(strip_id=1000, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=1001, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=1002, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=1003, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=1004, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=14, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=15, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=100, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=101, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=102, send_id=sendID, gain_db=gain_setting)
        # client.set_strip_gain(strip_id=103, send_id=sendID, gain_db=gain_setting)

        print("Setting bus gain...")
        client.set_bus_gain(bus_id=sendID, gain_db=-45.0)
        # print("Setting preamp gain...")
        # client.set_preamp_gain(module_id=0, channel_id=0, gain_db=0.0)
        
        print("Commands sent, waiting for responses...")
        # # Keep connection alive for a while
        # print("Keeping connection alive for 30 seconds...")
        time.sleep(2)
        
    else:
        print("Failed to connect to MT48")
        
except KeyboardInterrupt:
    print("Interrupted by user")
finally:
    client.disconnect()
    print("Disconnected")

WebSocket connection opened
Sending: [{"channel": "/meta/handshake", "version": "1.0", "minimumVersion": "1.0", "supportedConnectionTypes": ["websocket", "long-polling"], "id": "1"}]
Received: <html><head><title>Bad Request</title></head><body><h1>400 Bad Request</h1></body></html>
JSON decode error: Expecting value: line 1 column 1 (char 0)
Received: [{"id":"1","channel":"/meta/handshake","successful":true,"clientId":"0xb64121fc-246152","minimumVersion":"0.9","version":"1.0","supportedConnectionTypes":["long-polling","callback-polling","websocket"],"advice":{"reconnect":"retry","interval":10,"timeout":5000}}]
Handshake successful, clientId: 0xb64121fc-246152
Sending: [{"channel": "/meta/connect", "clientId": "0xb64121fc-246152", "connectionType": "websocket", "id": "2"}]
Received: [{"id":"2","clientId":"0xb64121fc-246152","channel":"/meta/connect","successful":true}]
Connected successfully
Sending: [{"channel": "/meta/connect", "clientId": "0xb64121fc-246152", "connectionType": "webso