Skip to content

Commit

Permalink
Merge pull request #114 from cloudblue/LITE-26314-fix-consume-log
Browse files Browse the repository at this point in the history
LITE-26314 fix logging of consume command
  • Loading branch information
ffaraone committed Feb 7, 2023
2 parents 854f5f8 + 4396501 commit 4eddd08
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions dj_cqrs/management/commands/cqrs_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@
from dj_cqrs.registries import ReplicaRegistry


logger = logging.getLogger('django_cqrs.cqrs_consume')
logger = logging.getLogger('django-cqrs')


def consume(**kwargs):
import django
django.setup()

from dj_cqrs.transport import current_transport
current_transport.consume(**kwargs)
try:
current_transport.consume(**kwargs)
except KeyboardInterrupt:
pass


def _display_path(path):
try:
return f'"{path.relative_to(Path.cwd())}"'
except ValueError: # pragma: no cover
return f'"{path}"'


class WorkersManager:
Expand Down Expand Up @@ -65,6 +75,10 @@ def run(self):
if self.reload:
for files_changed in self:
if files_changed:
logger.warning(
'Detected changes in %s. Reloading...',
', '.join(map(_display_path, files_changed)),
)
self.restart()
else:
self.stop_event.wait()
Expand All @@ -80,11 +94,13 @@ def start(self):
self.consume_kwargs,
)
self.pool.append(process)
logger.info(f'Consumer process with pid {process.pid} started')

def terminate(self, *args, **kwargs):
while self.pool:
process = self.pool.pop()
process.stop(sigint_timeout=self.sigint_timeout, sigkill_timeout=self.sigkill_timeout)
logger.info(f'Consumer process with pid {process.pid} stopped.')

def restart(self, *args, **kwargs):
self.terminate()
Expand Down

0 comments on commit 4eddd08

Please sign in to comment.