Skip to content

Commit

Permalink
link error handling on camilladsp load to the UI
Browse files Browse the repository at this point in the history
  • Loading branch information
3ll3d00d committed May 22, 2023
1 parent 1da0925 commit 529dbb3
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 48 deletions.
140 changes: 104 additions & 36 deletions ezbeq/camilladsp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import List, Optional
from typing import List, Optional, Callable

from autobahn.exception import Disconnected
from autobahn.twisted.websocket import connectWS, WebSocketClientProtocol, WebSocketClientFactory
Expand All @@ -10,13 +10,15 @@
from catalogue import CatalogueProvider, CatalogueEntry
from device import SlotState, DeviceState, PersistentDevice

SLOT_ID = 'CamillaDSP'

logger = logging.getLogger('ezbeq.camilladsp')


class CamillaDspSlotState(SlotState):

def __init__(self):
super().__init__('CamillaDSP')
super().__init__(SLOT_ID)


class CamillaDspState(DeviceState):
Expand All @@ -42,8 +44,7 @@ def __init__(self, name: str, config_path: str, cfg: dict, ws_server: WsServer,
self.__ip: str = cfg['ip']
self.__port: int = cfg['port']
self.__channels: List[int] = [int(c) for c in cfg['channels']]
self.__dsp_config = {}
self.__peq = {}
self.__config_loader: Optional[LoadConfig] = None
if not self.__channels:
raise ValueError(f'No channels supplied for CamillaDSP {name} - {self.__ip}:{self.__port}')
self.__client = CamillaDspClient(self.__ip, self.__port, self)
Expand All @@ -55,7 +56,7 @@ def _merge_state(self, loaded: CamillaDspState, cached: dict) -> CamillaDspState
if 'slots' in cached:
for slot in cached['slots']:
if 'id' in slot:
if slot['id'] == 'CAMILLADSP':
if slot['id'] == SLOT_ID:
if slot['last']:
loaded.slot.last = slot['last']
return loaded
Expand All @@ -68,25 +69,27 @@ def update(self, params: dict) -> bool:
any_update = False
if 'slots' in params:
for slot in params['slots']:
if slot['id'] == 'CAMILLADSP':
if slot['id'] == SLOT_ID:
if 'entry' in slot:
if slot['entry']:
match = self.__catalogue.find(slot['entry'])
if match:
self.load_filter('CAMILLADSP', match)
self.load_filter(SLOT_ID, match)
any_update = True
else:
self.clear_filter('CAMILLADSP')
self.clear_filter(SLOT_ID)
any_update = True
return any_update

def __send(self, to_load: List[dict]):
if self.__dsp_config:
logger.info(f"Sending {len(to_load)} filters")
self.__client.send(json.dumps({'SetConfigJson': json.dumps(create_new_cfg(to_load, self.__dsp_config, self.__channels))}))
self.__client.send(json.dumps('Reload'))
def __send(self, to_load: List[dict], title: str, on_complete: Callable[[bool], None]):
if self.__config_loader is None:
logger.info(f"Sending {len(to_load)} filters for {title}")
self.__config_loader = LoadConfig(title, self.__channels, to_load, self.ws_server, self.__client,
on_complete)
from twisted.internet import reactor
reactor.callLater(0.0, self.__config_loader.send_get_config)
else:
raise ValueError(f'Unable to load PEQ, no dsp config available')
raise ValueError(f'Unable to load PEQ, load already in progress for {self.__config_loader.title}')

def activate(self, slot: str) -> None:
def __do_it():
Expand All @@ -106,8 +109,15 @@ def load_filter(self, slot: str, entry: CatalogueEntry) -> None:

def __do_it(self, to_load: List[dict], title: str):
try:
self.__send(to_load)
self._current_state.slot.last = title
self._current_state.slot.last = 'Loading' if to_load else 'Clearing'

def completed(success: bool):
if success:
self._current_state.slot.last = title
else:
self._current_state.slot.last = 'ERROR'

self.__send(to_load, title, lambda b: self._hydrate_cache_broadcast(lambda: completed(b)))
except Exception as e:
self._current_state.slot.last = 'ERROR'
raise e
Expand All @@ -129,11 +139,34 @@ def levels(self) -> dict:
return {}

def on_get_config(self, config: dict):
if config['result'] == 'Ok':
candidate = json.loads(config['value'])
if candidate != self.__dsp_config:
logger.info(f"Received new DSP config {candidate}")
self.__dsp_config = candidate
if self.__config_loader is None:
logger.info(f'Received new DSP config but nothing to load, ignoring {config}')
else:
if config['result'] == 'Ok':
self.__config_loader.on_get_config(json.loads(config['value']))
else:
self.__config_loader.failed('GetConfig', config)
self.__config_loader = None

def on_set_config(self, result: str):
if self.__config_loader is None:
logger.info(f'Received response to SetConfigJson but nothing to load, ignoring {result}')
else:
if result == 'Ok':
self.__config_loader.on_set_config()
else:
self.__config_loader.failed('SetConfig', result)
self.__config_loader = None

def on_reload(self, result: str):
if self.__config_loader is None:
logger.info(f'Received response to Reload but nothing to load, ignoring {result}')
else:
if result == 'Ok':
self.__config_loader.on_reload()
else:
self.__config_loader.failed('Reload', result)
self.__config_loader = None

def on_get_volume(self, msg):
pass
Expand All @@ -160,31 +193,20 @@ def send(self, msg: str):

class CamillaDspProtocol(WebSocketClientProtocol):

do_send = None
# do_send = None

def onConnecting(self, transport_details):
logger.info(f"Connecting to {transport_details}")

def onConnect(self, response):
logger.info(f"Connected to {response.peer}")
from twisted.internet import reactor
self.do_send = lambda: reactor.callLater(0.5, __send)

def __send():
if self.do_send:
try:
self.sendMessage(json.dumps("GetConfigJson").encode('utf-8'), isBinary=False)
finally:
self.do_send()

self.do_send()

def onOpen(self):
logger.info("Connected to CAMILLADSP")
self.factory.register(self)

def onClose(self, was_clean, code, reason):
self.do_send = None
# self.do_send = None
if was_clean:
logger.info(f"Disconnected code: {code} reason: {reason}")
else:
Expand All @@ -199,9 +221,13 @@ def onMessage(self, payload, is_binary):
logger.debug(f'>>> {msg}')
if 'GetConfigJson' in msg:
self.factory.listener.on_get_config(msg['GetConfigJson'])
elif 'SetConfigJson' in msg:
self.factory.listener.on_set_config(msg['SetConfigJson']['result'])
elif 'Reload' in msg:
self.factory.listener.on_reload(msg['Reload']['result'])
elif 'GetVolume' in msg:
self.factory.listener.on_get_volume(msg['GetVolume'])
elif 'GetMute' in msg:
elif 'GetMute' in msg:\
self.factory.listener.on_get_mute(msg['GetMute'])
elif 'GetPlaybackSignalRms' in msg:
self.factory.listener.on_get_playback_rms(msg['GetPlaybackSignalRms'])
Expand Down Expand Up @@ -250,7 +276,7 @@ def broadcast(self, msg):
for c in self.__clients:
logger.info(f"Sending to {c.peer} - {msg}")
try:
c.sendMessage(msg.encode('utf8'))
c.sendMessage(msg.encode('utf8'), isBinary=False)
except Disconnected as e:
logger.exception(f"Failed to send to {c.peer}, discarding")
disconnected_clients.append(c)
Expand Down Expand Up @@ -301,3 +327,45 @@ def create_new_cfg(to_load: List[dict], base_cfg: dict, channels: List[int]) ->
raise ValueError(f'Unable to load PEQ, dsp config has no pipeline declared')
return new_cfg


class LoadConfig:

def __init__(self, title: str, channels: List[int], to_load: List[dict], ws_server: WsServer,
client: CamillaDspClient, on_complete: Callable[[bool], None]):
self.title = title
self.__channels = channels
self.__to_load = to_load
self.__dsp_config = None
self.__ws_server = ws_server
self.__client = client
self.__failed = False
self.__on_complete = on_complete

def on_get_config(self, cfg: dict):
logger.info(f"Received new DSP config {cfg}")
self.__dsp_config = cfg
self.__do_set_config()

def send_get_config(self):
logger.info(f'[{self.title}] Sending GetConfigJson')
self.__client.send(json.dumps("GetConfigJson"))

def __do_set_config(self):
logger.info(f'[{self.title}] Sending SetConfigJson')
new_cfg = create_new_cfg(self.__to_load, self.__dsp_config, self.__channels)
self.__client.send(json.dumps({'SetConfigJson': json.dumps(new_cfg)}))

def on_set_config(self):
logger.info(f'[{self.title}] Sending Reload')
self.__client.send(json.dumps('Reload'))

def on_reload(self):
logger.info(f'[{self.title}] Reload completed')
self.__on_complete(True)

def failed(self, stage: str, payload):
self.__failed = True
msg = f'{stage} failed : {payload}'
logger.warning(f'[{self.title}] {msg}')
self.__on_complete(False)
self.__ws_server.broadcast(json.dumps({'message': 'Error', 'data': msg}))
6 changes: 3 additions & 3 deletions ezbeq/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ def _persist(self):
json.dump(self._current_state.serialise(), f, sort_keys=True)

def _broadcast(self):
if self.__ws_server:
self.__ws_server.broadcast(self.__get_state_msg())
if self.ws_server:
self.ws_server.broadcast(self.__get_state_msg())

def __get_state_msg(self):
assert self._current_state, 'hydrate cannot return None'
return json.dumps(self._current_state.serialise(), ensure_ascii=False)
return json.dumps({'message': 'DeviceState', 'data': self._current_state.serialise()}, ensure_ascii=False)

def _hydrate_cache_broadcast(self, func: callable):
self._hydrate()
Expand Down
17 changes: 14 additions & 3 deletions ui/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,24 @@ const App = () => {
setAvailableDevices(Object.assign({}, availableDevices, {[replacement.name]: replacement}));
};

// errors
const [err, setErr] = useState(null);

ws.onmessage = event => {
replaceDevice(JSON.parse(event.data));
const payload = JSON.parse(event.data);
switch (payload.message) {
case 'DeviceState':
replaceDevice(payload.data);
break;
case 'Error':
setErr(new Error(payload.data));
break;
default:
console.warn(`Unknown ws message ${event.data}`)
}
};

const [hasMultipleTabs, setHasMultipleTabs] = useState(false);
// errors
const [err, setErr] = useState(null);
// catalogue data
const [entries, setEntries] = useState([]);
// device state
Expand Down
12 changes: 6 additions & 6 deletions ui/src/components/main/Slots.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ const Slots = ({selectedDeviceName, selectedSlotId, useWide, device, setDevice,
{r.map((d, i2) =>
<Grid key={i2} container item xs={r.length === 1 ? 12 : 6} className={classes.container}>
<Slot selected={d.id === selectedSlotId}
slot={d}
onSelect={() => activateSlot(d.id)}
onClear={() => clearDeviceSlot(d.id)}
isPending={isPending(d.id)}/>
slot={d}
onSelect={() => activateSlot(d.id)}
onClear={() => clearDeviceSlot(d.id)}
isPending={isPending(d.id)}/>
</Grid>
)}
</Grid>
Expand All @@ -159,7 +159,7 @@ const Slots = ({selectedDeviceName, selectedSlotId, useWide, device, setDevice,
isActive={() => getCurrentState(pending, 'gain', selectedSlotId) === 1}/>;
if (useWide) {
return (
<Box sx={{ flexGrow: 1 }}>
<Box sx={{flexGrow: 1}}>
{devices}
<Grid container>
{gain}
Expand All @@ -168,7 +168,7 @@ const Slots = ({selectedDeviceName, selectedSlotId, useWide, device, setDevice,
);
} else {
return (
<Box sx={{ flexGrow: 1 }}>
<Box sx={{flexGrow: 1}}>
<Grid container direction={'column'}>{devices}</Grid>
<Grid container direction={'column'}>{gain}</Grid>
</Box>
Expand Down

0 comments on commit 529dbb3

Please sign in to comment.