Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions examples/cameraServiceExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from context_logger import get_logger, setup_logging

from examples import setup_shutdown
from hello import ServiceInfo, Hello, Group
from hello import Service, Hello, Group

setup_logging('hello')

Expand All @@ -21,9 +21,12 @@ def main() -> None:
group = Group.create(name='effective-range/sniper', address='239.0.1.1', port=5555, if_address=if_address)

# Define the service information for the camera
info = ServiceInfo(uuid=uuid4(), name='er-sniper-camera-1', role='camera', urls={
service = Service(uuid=uuid4(), name='er-sniper-camera-1', role='camera', address=if_address, urls={
'api': f'grpc://{if_address}:50051',
'stream': f'http://{if_address}:8000/video_feed'
}, info={
'site': 'er-sniper-site-1',
'range': 'er-sniper-range-1'
})

# Use a scheduled advertizer to periodically announce the camera service
Expand All @@ -32,7 +35,7 @@ def main() -> None:
advertizer.start(group)

# Immediately advertise the service information
advertizer.advertise(info)
advertizer.advertise(service)

# Schedule periodic advertisements every 10 seconds
advertizer.schedule_periodic(interval=10)
Expand Down
62 changes: 31 additions & 31 deletions hello/advertizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@
from common_utility import IReusableTimer
from context_logger import get_logger

from hello import ServiceInfo, Group, Sender, Receiver, ServiceMatcher, ServiceQuery, AbstractScheduler
from hello import Service, Group, Sender, Receiver, ServiceMatcher, ServiceQuery, AbstractScheduler

log = get_logger('Advertizer')


class Advertizer:

def start(self, group: Group, info: ServiceInfo | None = None) -> None:
def start(self, group: Group, service: Service | None = None) -> None:
raise NotImplementedError()

def stop(self) -> None:
raise NotImplementedError()

def advertise(self, info: ServiceInfo | None = None, log_level: int = INFO) -> None:
def advertise(self, service: Service | None = None, log_level: int = INFO) -> None:
raise NotImplementedError()


Expand All @@ -32,37 +32,37 @@ class DefaultAdvertizer(Advertizer):
def __init__(self, sender: Sender) -> None:
self._sender = sender
self._group: Group | None = None
self._info: ServiceInfo | None = None
self._service: Service | None = None

def __enter__(self) -> Advertizer:
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, group: Group, info: ServiceInfo | None = None) -> None:
def start(self, group: Group, service: Service | None = None) -> None:
self._sender.start(group.hello())
self._group = group
self._info = info
log.info('Advertizer started', group=self._group, service=self._info)
self._service = service
log.info('Advertizer started', group=self._group, service=self._service)

def stop(self) -> None:
self._group = None
self._info = None
self._service = None
self._sender.stop()
log.info('Advertizer stopped')

def advertise(self, info: ServiceInfo | None = None, log_level: int = INFO) -> None:
def advertise(self, service: Service | None = None, log_level: int = INFO) -> None:
if self._group:
if info:
self._info = info
if self._info:
self._sender.send(self._info)
log.log(log_level, 'Service advertised', service=self._info, group=self._group)
if service:
self._service = service
if self._service:
self._sender.send(self._service)
log.log(log_level, 'Service advertised', service=self._service, group=self._group)
else:
log.warning('Cannot advertise service, no service info provided', group=self._group)
log.warning('Cannot advertise service, no service provided', group=self._group)
else:
log.warning('Cannot advertise service, advertizer not started', service=info)
log.warning('Cannot advertise service, advertizer not started', service=service)


class RespondingAdvertizer(DefaultAdvertizer):
Expand All @@ -72,8 +72,8 @@ def __init__(self, sender: Sender, receiver: Receiver, max_response_delay: float
self._receiver = receiver
self._max_delay = max_response_delay

def start(self, group: Group, info: ServiceInfo | None = None) -> None:
super().start(group, info)
def start(self, group: Group, service: Service | None = None) -> None:
super().start(group, service)
self._receiver.start(group.query())
self._receiver.register(self._handle_message)

Expand All @@ -83,24 +83,24 @@ def stop(self) -> None:
super().stop()

def _handle_message(self, message: dict[str, Any]) -> None:
if self._info:
if self._service:
try:
query = ServiceQuery(**message)
matcher = ServiceMatcher(query)
log.debug('Service query received', group=self._group, query=query)
self._handle_query(matcher, self._info)
self._handle_query(matcher, self._service)
except Exception as error:
log.warning('Invalid service query received', group=self._group, received=message, error=error)

def _handle_query(self, matcher: ServiceMatcher, info: ServiceInfo) -> None:
if matcher.matches(info):
def _handle_query(self, matcher: ServiceMatcher, service: Service) -> None:
if matcher.matches(service):
delay = round(self._max_delay * random.random(), 3)
log.info('Responding to query', group=self._group, query=matcher.query, service=info, delay=delay)
log.info('Responding to query', group=self._group, query=matcher.query, service=service, delay=delay)
time.sleep(delay)
self.advertise(info)
self.advertise(service)


class ScheduledAdvertizer(AbstractScheduler[ServiceInfo], Advertizer):
class ScheduledAdvertizer(AbstractScheduler[Service], Advertizer):

def __init__(self, advertizer: Advertizer, timer: IReusableTimer) -> None:
super().__init__(timer)
Expand All @@ -112,15 +112,15 @@ def __enter__(self) -> 'ScheduledAdvertizer':
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, group: Group, info: ServiceInfo | None = None) -> None:
self._advertizer.start(group, info)
def start(self, group: Group, service: Service | None = None) -> None:
self._advertizer.start(group, service)

def stop(self) -> None:
super().stop()
self._advertizer.stop()

def advertise(self, info: ServiceInfo | None = None, log_level: int = INFO) -> None:
self._advertizer.advertise(info, log_level)
def advertise(self, service: Service | None = None, log_level: int = INFO) -> None:
self._advertizer.advertise(service, log_level)

def _execute(self, info: ServiceInfo | None = None) -> None:
self.advertise(info, DEBUG)
def _execute(self, service: Service | None = None) -> None:
self.advertise(service, DEBUG)
66 changes: 37 additions & 29 deletions hello/discoverer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
from common_utility import IReusableTimer
from context_logger import get_logger

from hello import Group, ServiceQuery, Sender, Receiver, ServiceInfo, ServiceMatcher, AbstractScheduler
from hello import Group, ServiceQuery, Sender, Receiver, Service, ServiceMatcher, AbstractScheduler

log = get_logger('Discoverer')


class DiscoveryEventType(Enum):
DISCOVERED = 'discovered'
UNCHANGED = 'unchanged'
UPDATED = 'updated'


@dataclass
class DiscoveryEvent:
group: Group
query: ServiceQuery
service: ServiceInfo
service: Service
type: DiscoveryEventType


Expand All @@ -45,13 +46,13 @@ def stop(self) -> None:
def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) -> None:
raise NotImplementedError()

def register(self, handler: OnDiscoveryEvent) -> None:
def register(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
raise NotImplementedError()

def deregister(self, handler: OnDiscoveryEvent) -> None:
def deregister(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
raise NotImplementedError()

def get_services(self) -> dict[UUID, ServiceInfo]:
def get_services(self) -> dict[UUID, Service]:
raise NotImplementedError()


Expand All @@ -62,8 +63,10 @@ def __init__(self, sender: Sender, receiver: Receiver, max_workers: int = 8) ->
self._receiver = receiver
self._group: Group | None = None
self._matcher: ServiceMatcher | None = None
self._services: dict[UUID, ServiceInfo] = {}
self._handlers: list[OnDiscoveryEvent] = []
self._services: dict[UUID, Service] = {}
self._handlers: dict[DiscoveryEventType, list[OnDiscoveryEvent]] = {
event_type: [] for event_type in DiscoveryEventType
}
self._handler_executor = ThreadPoolExecutor(max_workers=max_workers)

def __enter__(self) -> Discoverer:
Expand Down Expand Up @@ -101,48 +104,53 @@ def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) ->
else:
log.warning('Cannot discover services, discoverer not started', query=query)

def register(self, handler: OnDiscoveryEvent) -> None:
self._handlers.append(handler)
def register(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
for event_type in types if types else self._get_event_types():
self._handlers[event_type].append(handler)

def deregister(self, handler: OnDiscoveryEvent) -> None:
self._handlers.remove(handler)
def deregister(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
for event_type in types if types else self._get_event_types():
self._handlers[event_type].remove(handler)

def get_services(self) -> dict[UUID, ServiceInfo]:
def get_services(self) -> dict[UUID, Service]:
return self._services.copy()

def _get_event_types(self) -> set[DiscoveryEventType]:
return set(self._handlers.keys())

def _handle_message(self, message: dict[str, Any]) -> None:
if self._group and self._matcher:
try:
service = ServiceInfo(UUID(message['uuid']), message['name'], message['role'], message.get('urls', {}))
log.debug('Service info received', service=service, group=self._group)
service = Service(UUID(message['uuid']), message['name'], message['role'],
message.get('urls', {}), message.get('info', {}), message['address'])
log.debug('Service received', service=service, group=self._group)
self._handle_service(service, self._group, self._matcher)
except Exception as error:
log.warn('Invalid service info received', group=self._group, data=message, error=error)
log.warn('Invalid service received', group=self._group, data=message, error=error)

def _handle_service(self, service: ServiceInfo, group: Group, matcher: ServiceMatcher) -> None:
def _handle_service(self, service: Service, group: Group, matcher: ServiceMatcher) -> None:
if matcher.matches(service):
stored = self._services.get(service.uuid)
event = self._create_event(group, matcher, stored, service)
self._handle_event(event)

if event := self._create_event(group, matcher, stored, service):
self._handle_event(event)

def _create_event(self, group: Group, matcher: ServiceMatcher,
stored: ServiceInfo | None, service: ServiceInfo) -> DiscoveryEvent | None:
def _create_event(self, group: Group, matcher: ServiceMatcher, stored: Service | None,
service: Service) -> DiscoveryEvent:
if stored:
if stored != service:
log.info('Service updated', group=group, old_service=stored, new_service=service)
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UPDATED)
else:
log.debug('Service unchanged', group=group, service=service)
return None
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UPDATED)
else:
log.info('New service discovered', group=group, service=service)
log.info('Service discovered', group=group, service=service)
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.DISCOVERED)

def _handle_event(self, event: DiscoveryEvent) -> None:
self._services[event.service.uuid] = event.service

for handler in self._handlers:
for handler in self._handlers[event.type]:
self._handler_executor.submit(self._execute_handler, handler, event)

def _execute_handler(self, handler: OnDiscoveryEvent, event: DiscoveryEvent) -> None:
Expand Down Expand Up @@ -174,14 +182,14 @@ def stop(self) -> None:
def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) -> None:
self._discoverer.discover(query, log_level)

def get_services(self) -> dict[UUID, ServiceInfo]:
def get_services(self) -> dict[UUID, Service]:
return self._discoverer.get_services()

def register(self, handler: OnDiscoveryEvent) -> None:
self._discoverer.register(handler)
def register(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
self._discoverer.register(handler, types)

def deregister(self, handler: OnDiscoveryEvent) -> None:
self._discoverer.deregister(handler)
def deregister(self, handler: OnDiscoveryEvent, types: set[DiscoveryEventType] | None = None) -> None:
self._discoverer.deregister(handler, types)

def _execute(self, query: ServiceQuery | None = None) -> None:
self.discover(query, DEBUG)
17 changes: 11 additions & 6 deletions hello/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,26 @@


@dataclass
class ServiceInfo:
class Service:
uuid: UUID
name: str
role: str
urls: dict[str, str] = field(default_factory=dict)
info: dict[str, Any] = field(default_factory=dict)
address: str | None = None

def __repr__(self) -> str:
return f"ServiceInfo(uuid='{self.uuid}', name='{self.name}', role='{self.role}', urls='{self.urls}')"
return (f"Service(uuid='{self.uuid}', name='{self.name}', role='{self.role}', "
f"urls={self.urls}, info={self.info}), addr='{self.address}'")

def to_dict(self) -> dict[str, Any]:
return {
'uuid': str(self.uuid),
'name': self.name,
'role': self.role,
'urls': self.urls
'urls': self.urls,
'info': self.info,
'address': self.address,
}


Expand All @@ -40,7 +45,7 @@ def __init__(self, query: ServiceQuery) -> None:
self._name_matcher = re.compile(self.query.name)
self._role_matcher = re.compile(self.query.role)

def matches(self, info: ServiceInfo) -> bool:
name_match = self._name_matcher.match(info.name)
role_match = self._role_matcher.match(info.role)
def matches(self, service: Service) -> bool:
name_match = self._name_matcher.match(service.name)
role_match = self._role_matcher.match(service.role)
return bool(name_match and role_match)
Loading
Loading