Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 22 additions & 42 deletions discos_client/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations
import json
import weakref
import zlib
from threading import Thread, Lock, Event
from collections import defaultdict
from typing import Any
from pathlib import Path
import orjson
import zmq
from zmq.utils.monitor import recv_monitor_message
from .namespace import DISCOSNamespace
Expand Down Expand Up @@ -142,12 +143,11 @@ def __command__(self, cmd: str, *args) -> DISCOSNamespace:
if args:
payload["args"] = args

payload = json.dumps(payload, separators=(",", ":"))
self._req.send_string(payload)
self._req.send(orjson.dumps(payload))

while self.__req_connected__(strict=True):
if (self._req.poll(10) & zmq.POLLIN) != 0:
answer <<= json.loads(self._req.recv_string())
answer <<= orjson.loads(self._req.recv())
return answer

# We lost connection between send and receive, we need to reinitialize
Expand Down Expand Up @@ -249,9 +249,9 @@ def __receive__(
sub.unsubscribe(t)
t = t[len(client_id):]
sub.subscribe(t)
p = json.loads(p)
payload = orjson.loads(zlib.decompress(p))
with locks[t]:
namespaces[t] <<= p
namespaces[t] <<= payload

def __req_connected__(self, strict: bool = False) -> bool:
"""
Expand Down Expand Up @@ -327,58 +327,38 @@ def __format__(self, spec: str) -> str:
"""
has_e = "e" in spec
has_m = "m" in spec
has_i = "i" in spec

if has_e and has_m:
raise ValueError(
"Format specifier cannot contain both 'e' and 'm'."
)

if has_e:
fmt_spec = spec[1:] if spec.startswith("e") else spec
fmt_spec = fmt_spec[:-1] if fmt_spec.endswith("e") else fmt_spec
elif has_m:
fmt_spec = spec[1:] if spec.startswith("m") else spec
fmt_spec = fmt_spec[:-1] if fmt_spec.endswith("m") else fmt_spec
else:
fmt_spec = spec
fmt_spec = spec
for ch in ("e", "m", "i"):
fmt_spec = fmt_spec.replace(ch, "")

if fmt_spec not in ("", "t"):
raise ValueError(
f"Unknown format code '{spec}' for "
f"{self.__class__.__name__}"
)

indent = None
separators = None
default = (
DISCOSNamespace.__full_dict__ if has_e
else DISCOSNamespace.__metadata_dict__ if has_m
else DISCOSNamespace.__message_dict__
)

if fmt_spec == "":
pass
elif fmt_spec == "t":
separators = (",", ":")
elif fmt_spec.endswith("i"):
fmt_par = fmt_spec[:-1]
indent = 2
if fmt_par:
try:
indent = int(fmt_par)
except ValueError as exc:
raise ValueError(
f"Invalid indent in format spec: '{fmt_spec[:-1]}'"
) from exc
if indent <= 0:
raise ValueError("Indentation must be a positive integer")
else:
raise ValueError(
f"Unknown format code '{spec}' for {self.__class__.__name__}"
)
option = orjson.OPT_SORT_KEYS
if has_i:
option |= orjson.OPT_INDENT_2

return json.dumps(
return orjson.dumps(
self.__public_dict__(),
default=default,
indent=indent,
separators=separators,
sort_keys=True,
ensure_ascii=False
)
option=option,
).decode()

def __public_dict__(self) -> dict[str, DISCOSNamespace]:
"""
Expand Down
Loading
Loading