Skip to content

Commit

Permalink
migrate to a ws client for broadcasting levels
Browse files Browse the repository at this point in the history
  • Loading branch information
3ll3d00d committed Jun 20, 2021
1 parent 195fcb5 commit ce3e29d
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 70 deletions.
54 changes: 48 additions & 6 deletions ezbeq/apis/ws.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
from typing import Callable, Optional, List
from collections import defaultdict
from typing import Callable, Optional, List, Dict

from autobahn.exception import Disconnected
from autobahn.twisted import WebSocketServerProtocol, WebSocketServerFactory

SUBSCRIBE_LEVELS_CMD = 'subscribe levels'

logger = logging.getLogger('ezbeq.ws')


Expand All @@ -19,6 +22,9 @@ def factory(self) -> 'WsServerFactory':
def broadcast(self, msg: str):
self.__factory.broadcast(msg)

def levels(self, device: str, msg: str) -> bool:
return self.__factory.send_levels(device, msg)


class WsProtocol(WebSocketServerProtocol):

Expand All @@ -36,6 +42,8 @@ def onMessage(self, payload, is_binary):
try:
s = payload.decode('utf-8')
logger.info(f"Received {s}")
if s.startswith(SUBSCRIBE_LEVELS_CMD):
self.factory.register_for_levels(s[len(SUBSCRIBE_LEVELS_CMD) + 1:], self)
except:
logger.exception('Message received failure')

Expand All @@ -46,11 +54,16 @@ class WsServerFactory(WebSocketServerFactory):
def __init__(self, *args, **kwargs):
super(WsServerFactory, self).__init__(*args, **kwargs)
self.__clients: List[WsProtocol] = []
self.__levels_client: Dict[str, List[WsProtocol]] = defaultdict(list)
self.__state_provider: Optional[Callable[[], str]] = None
self.__levels_provider: Dict[str, Callable[[], None]] = {}

def init(self, state_provider: Callable[[], str]):
self.__state_provider = state_provider

def set_levels_provider(self, name: str, broadcaster: Callable[[], None]):
self.__levels_provider[name] = broadcaster

def register(self, client: WsProtocol):
if client not in self.__clients:
logger.info(f"Registered client {client.peer}")
Expand All @@ -62,25 +75,54 @@ def register(self, client: WsProtocol):
else:
logger.info(f"Ignoring duplicate client {client.peer}")

def register_for_levels(self, device: str, client: WsProtocol):
if device in self.__levels_provider:
logger.info(f"Transferring client {client.peer} from broadcast to level subscription for {device}")
self.__clients.remove(client)
self.__levels_client[device].append(client)
self.__levels_provider[device]()
else:
logger.warning(f"Unknown device {device} requested by {client.peer}")

def unregister(self, client: WsProtocol):
if client in self.__clients:
logger.info(f"Unregistering client {client.peer}")
self.__clients.remove(client)
else:
logger.info(f"Ignoring unregistered client {client.peer}")

found = False
for device, clients in self.__levels_client.items():
if client in clients:
found = True
logger.info(f"Unregistering {device} levels client {client.peer}")
clients.remove(client)
if not found:
logger.info(f"Ignoring unregistered client {client.peer}")

def broadcast(self, msg: str):
logger.debug(f"Broadcasting {msg}")
if self.__clients:
self.__send_to_all(self.__clients, msg)

def __send_to_all(self, clients, msg: str) -> bool:
if clients:
disconnected_clients = []
for c in self.__clients:
logger.info(f"Sending to {c.peer} - {msg}")
for c in clients:
logger.debug(f"Sending to {c.peer} - {msg}")
try:
c.sendMessage(msg.encode('utf8'), isBinary=False)
except Disconnected as e:
logger.exception(f"Failed to send to {c.peer}, discarding")
disconnected_clients.append(c)
for c in disconnected_clients:
self.unregister(c)
return len(disconnected_clients) < len(clients)
else:
logger.info(f"No devices connected, ignoring {msg}")
return False

def send_levels(self, device: str, msg: str):
logger.debug(f"Broadcasting levels {msg}")
clients = self.__levels_client.get(device, None)
if clients:
return self.__send_to_all(clients, msg)
else:
return False
4 changes: 4 additions & 0 deletions ezbeq/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,7 @@ def _hydrate_cache_broadcast(self, func: callable):
finally:
self._persist()
self._broadcast()

@property
def ws_server(self) -> WsServer:
return self.__ws_server
22 changes: 20 additions & 2 deletions ezbeq/minidsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(self, name: str, config_path: str, cfg: dict, ws_server: WsServer,
self.__ignore_retcode = cfg.get('ignoreRetcode', False)
self.__runner = cfg['make_runner']()
self.__client = MinidspRsClient(self) if cfg.get('useWs', False) else None
ws_server.factory.set_levels_provider(name, self.start_broadcast_levels)

@property
def device_type(self) -> str:
Expand All @@ -197,9 +198,10 @@ def __read_state_from_device(self) -> Optional[MinidspState]:
lines = None
try:
kwargs = {'retcode': None} if self.__ignore_retcode else {}
status = json.loads(self.__runner['-o', 'jsonline'](timeout=self.__cmd_timeout, **kwargs))
output = self.__runner['-o', 'jsonline'](timeout=self.__cmd_timeout, **kwargs)
status = json.loads(output)
values = {
'active_slot': status['master']['preset'],
'active_slot': str(status['master']['preset'] + 1),
'mute': status['master']['mute'],
'mv': status['master']['volume']
}
Expand Down Expand Up @@ -376,16 +378,32 @@ def __read_levels_from_device(self) -> dict:
lines = None
try:
kwargs = {'retcode': None} if self.__ignore_retcode else {}
start = time.time()
lines = self.__runner['-o', 'jsonline'](timeout=self.__cmd_timeout, **kwargs)
end = time.time()
levels = json.loads(lines)
ts = time.time()
logger.info(f"readlevels,{ts},{to_millis(start, end)}")
return {
'ts': ts,
'input': levels['input_levels'],
'output': levels['output_levels']
}
except:
logger.exception(f"Unable to load levels {lines}")
return {}

def start_broadcast_levels(self) -> None:
from twisted.internet import reactor
sched = lambda: reactor.callLater(0.1, __send)

def __send():
msg = json.dumps(self.levels())
if self.ws_server.levels(self.name, msg):
sched()

sched()


class MinidspBeqCommandGenerator:

Expand Down
14 changes: 9 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,22 @@ def __init__(self):

def __make_status(self) -> str:
mute_str = f"{self.__mute}".lower()
return f'MasterStatus {{ preset: {self.__slot - 1}, source: Usb, volume: Gain({self.__gain:.1f}), mute: {mute_str} }}\n' \
'Input levels: -131.5, -131.5\n' \
'Output levels: -131.5, -131.5, -120.0, -131.5'
return '{"master":{"preset":' + str(self.__slot - 1) + \
',"source":"Usb","volume":' + f"{self.__gain:.1f}" + \
',"mute":' + mute_str + \
'},"input_levels":[-15.814797,-15.652734],"output_levels":[-120.0,-15.861839,-15.661137,-15.661137]}'

def take_commands(self):
cmds = self.commands
self.commands = []
return cmds

def __getitem__(self, item):
self.pending.append(item)
return self
if item == ('-o', 'jsonline'):
return self
else:
self.pending.append(item)
return self

def __call__(self, *args, **kwargs):
if self.pending:
Expand Down
101 changes: 44 additions & 57 deletions ui/src/components/levels/index.js
Original file line number Diff line number Diff line change
@@ -1,80 +1,55 @@
import Header from "../Header";
import Controls from "./Controls";
import {useEffect, useState} from "react";
import {useEffect, useRef, useState} from "react";
import Chart from "./Chart";
import ezbeq from "../../services/ezbeq";

const opts = {
width: window.innerWidth - 16,
height: window.innerHeight - 233,
series: [
{
label: 'Time'
},
{
label: 'I1',
scale: 'dBI',
stroke: "red",
points: {show: false}
},
{
label: 'I2',
scale: 'dBI',
stroke: "orange",
points: {show: false}
},
{
label: 'O1',
scale: 'dBO',
stroke: "green",
points: {show: false}
},
{
label: 'O2',
scale: 'dBO',
stroke: "blue",
points: {show: false}
},
{
label: 'O3',
scale: 'dBO',
stroke: "indigo",
points: {show: false}
},
{
label: 'O4',
scale: 'dBO',
stroke: "violet",
points: {show: false}
}
],
axes: [
{
label: "Time (s)",
values: (self, ticks) => ticks.map(rawValue => rawValue / 1000),
label: "Time (s)"
},
{
label: "Input (dB)",
scale: "dBI",
},
{
label: "Output (dB)",
scale: "dBO",
side: 1,
grid: {show: false},
label: "Level (dB)"
}
],
scales: {
"x": {
time: false,
},
"dBI": {
auto: false,
range: [-150, 0],
},
"dBO": {
auto: false,
range: [-150, 0]
}
},
};
Expand All @@ -83,7 +58,7 @@ const trimToDuration = (data, duration) => {
const time = data[0];
const firstTs = time[0];
const lastTs = time[time.length - 1];
const newFirstTs = lastTs - (duration * 1000);
const newFirstTs = lastTs - duration;
if (newFirstTs > firstTs) {
const firstIdx = time.findIndex(t => t>=newFirstTs);
return data.map(d1 => d1.slice(firstIdx));
Expand All @@ -96,48 +71,60 @@ const Levels = ({availableDevices, selectedDeviceName, setSelectedDeviceName, se
const [duration, setDuration] = useState(30);
const [first, setFirst] = useState(0);
const [data, setData] = useState([[], [], [], [], [], [], []]);
const ws = useRef(null);

useEffect(() => {
const timer = setInterval(async () => {
try {
const levels = await ezbeq.getLevels(selectedDeviceName);
if (levels.hasOwnProperty('input') && levels.hasOwnProperty('output')) {
const newVals = [new Date().getTime(), ...levels.input, ...levels.output];
let offset = 0;
if (first === 0) {
const firstTs = new Date().getTime();
setFirst(firstTs);
offset = firstTs;
} else {
offset = first;
}
setData(d => {
if (d[0].length > 0) {
d = newVals.map((v, idx) => idx === 0 ? [...d[idx], (v - offset)] : [...d[idx], v]);
} else {
d = newVals.map((v, idx) => idx === 0 ? [v-offset] : [v]);
}
return trimToDuration(d, duration);
});
ws.current = new WebSocket("ws://" + window.location.host + "/ws");
ws.current.onopen = () => {
console.log(`Subscribing to ${selectedDeviceName}`)
ws.current.send(`subscribe levels ${selectedDeviceName}`);
};
ws.current.onclose = () => {
console.log("Closing ws")
}
ws.current.onmessage = e => {
const levels = JSON.parse(e.data);
if (levels.hasOwnProperty('masterVolume')) {
// ignore
} else if (levels.hasOwnProperty('input') && levels.hasOwnProperty('output')) {
const newVals = [levels.ts, ...levels.input, ...levels.output];
let offset = 0;
if (first === 0) {
const firstTs = newVals[0];
setFirst(firstTs);
offset = firstTs;
} else {
setErr(new Error(`No data available in levels ${JSON.stringify(levels)}`))
offset = first;
}
} catch (e) {
setErr(e);
clearInterval(timer);
setData(d => {
if (d[0].length > 0) {
d = newVals.map((v, idx) => idx === 0 ? [...d[idx], (v - offset)] : [...d[idx], v]);
} else {
d = newVals.map((v, idx) => idx === 0 ? [v-offset] : [v]);
}
return trimToDuration(d, duration);
});
} else {
setErr(new Error(`No data available in levels ${JSON.stringify(levels)}`))
}
}, 200);
return () => clearInterval(timer);
}
return () => {
ws.current.close();
};
}, [selectedDeviceName, setData, first, setFirst, setErr, duration]);

const chartOpts = Object.assign({}, opts, {
width: window.innerWidth - 16,
height: window.innerHeight - 233,
});
return (
<>
<Header availableDeviceNames={Object.keys(availableDevices)}
setSelectedDeviceName={setSelectedDeviceName}
selectedDeviceName={selectedDeviceName}/>
<Controls duration={duration}
setDuration={setDuration}/>
<Chart options={opts} data={data}/>
<Chart options={chartOpts} data={data}/>
</>
);
};
Expand Down

0 comments on commit ce3e29d

Please sign in to comment.