Skip to content

Commit

Permalink
Improve Redis server management
Browse files Browse the repository at this point in the history
- Allow starting instances with different detectors without warning
  message;
- In the terminal, "--topic" becomes a positional argument;
- Allow to shutdown others' Redis server to avoid zombie Redis server
  occupying the port;
- Fix bug in "extra-foam-kill";
- Fix web monitor;
- Improve global Redis client connection initialization;
- Fix logger level.
  • Loading branch information
zhujun98 committed Feb 3, 2020
1 parent e595488 commit 77a7d6a
Show file tree
Hide file tree
Showing 13 changed files with 353 additions and 295 deletions.
8 changes: 5 additions & 3 deletions extra_foam/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,13 @@ class _Config(dict):
# -------------------------------------------------------------
# absolute path of the Redis server executable
"REDIS_EXECUTABLE": osp.join(_abs_dirpath, "thirdparty/bin/redis-server"),
# default REDIS port used in testing. Each detector has its
# dedicated REDIS port.
# default Redis port used in testing. Each detector has its
# dedicated Redis port.
"REDIS_PORT": 6379,
# maximum attempts to ping the Redis server before shutting down the app
"REDIS_MAX_PING_ATTEMPTS": 10,
"REDIS_MAX_PING_ATTEMPTS": 3,
# interval for pinging the Redis server from the main GUI, in milliseconds
"REDIS_PING_ATTEMPT_INTERVAL": 5000,
# If the path is given, redis-py will use UnixDomainSocketConnection.
# Otherwise, normal TCP socket connection.
"REDIS_UNIX_DOMAIN_SOCKET_PATH": "",
Expand Down
37 changes: 11 additions & 26 deletions extra_foam/database/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
Copyright (C) European X-Ray Free-Electron Laser Facility GmbH.
All rights reserved.
"""
import pickle

from .base_proxy import _AbstractProxy
from .db_utils import redis_except_handler
from ..config import AnalysisType
Expand All @@ -30,8 +28,12 @@ def __new__(mcs, name, bases, class_dict):

class Metadata(metaclass=MetaMetadata):

SESSION = "meta:session"

CONNECTION = "meta:connection"

ANALYSIS_TYPE = "meta:analysis_type"

# The key of processors' metadata must end with '_PROC'
GLOBAL_PROC = "meta:proc:global"
IMAGE_PROC = "meta:proc:image"
Expand All @@ -55,22 +57,13 @@ class Metadata(metaclass=MetaMetadata):

class MetaProxy(_AbstractProxy):
"""Proxy for retrieving metadata."""
SESSION = "meta:session"

ANALYSIS_TYPE = "meta:analysis_type"

def set_session(self, mapping):
return self.hmset(self.SESSION, mapping)

def get_session(self):
return self.hget_all(self.SESSION)

def has_analysis(self, analysis_type):
"""Check if the given analysis type has been registered.
:param AnalysisType analysis_type: analysis type.
"""
return int(self.hget(self.ANALYSIS_TYPE, analysis_type))
return int(self.hget(Metadata.ANALYSIS_TYPE, analysis_type))

def has_any_analysis(self, analysis_types):
"""Check if any of the listed analysis types has been registered.
Expand All @@ -81,7 +74,7 @@ def has_any_analysis(self, analysis_types):
raise TypeError("Input must be a tuple or list!")

for analysis_type in analysis_types:
if int(self.hget(self.ANALYSIS_TYPE, analysis_type)) > 0:
if int(self.hget(Metadata.ANALYSIS_TYPE, analysis_type)) > 0:
return True
return False

Expand All @@ -94,7 +87,7 @@ def has_all_analysis(self, analysis_types):
raise TypeError("Input must be a tuple or list!")

for analysis_type in analysis_types:
if int(self.hget(self.ANALYSIS_TYPE, analysis_type)) <= 0:
if int(self.hget(Metadata.ANALYSIS_TYPE, analysis_type)) <= 0:
return False
return True

Expand All @@ -105,30 +98,22 @@ def get_all_analysis(self):
otherwise, a dictionary of key-value pairs (
analysis type: number of registrations).
"""
return self.hget_all(self.ANALYSIS_TYPE)

def initialize_analysis_types(self):
"""Initialize all analysis types in Redis.
Prevent 'has_analysis', 'has_any_analysis' and 'has_all_analysis'
from getting None when querying.
"""
return self.hmset(self.ANALYSIS_TYPE, {t: 0 for t in AnalysisType})
return self.hget_all(Metadata.ANALYSIS_TYPE)

def register_analysis(self, analysis_type):
"""Register the given analysis type.
:param AnalysisType analysis_type: analysis type.
"""
return self.hincrease_by(self.ANALYSIS_TYPE, analysis_type, 1)
return self.hincrease_by(Metadata.ANALYSIS_TYPE, analysis_type, 1)

def unregister_analysis(self, analysis_type):
"""Unregister the given analysis type.
:param AnalysisType analysis_type: analysis type.
"""
if int(self.hget(self.ANALYSIS_TYPE, analysis_type)) > 0:
return self.hincrease_by(self.ANALYSIS_TYPE, analysis_type, -1)
if int(self.hget(Metadata.ANALYSIS_TYPE, analysis_type)) > 0:
return self.hincrease_by(Metadata.ANALYSIS_TYPE, analysis_type, -1)
return 0

@redis_except_handler
Expand Down
1 change: 0 additions & 1 deletion extra_foam/database/mondata.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"""
import time

from .metadata import Metadata as mt
from .base_proxy import _AbstractProxy
from .db_utils import redis_except_handler
from ..config import config
Expand Down
8 changes: 3 additions & 5 deletions extra_foam/database/tests/test_metadata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

from extra_foam.config import AnalysisType
from extra_foam.database.metadata import MetaMetadata
from extra_foam.database.metadata import Metadata, MetaMetadata
from extra_foam.database import MetaProxy, MonProxy
from extra_foam.processes import wait_until_redis_shutdown
from extra_foam.services import start_redis_server
Expand All @@ -13,8 +13,6 @@ def setUpClass(cls):
start_redis_server()

cls._meta = MetaProxy()
cls._meta.initialize_analysis_types()

cls._mon = MonProxy()

@classmethod
Expand Down Expand Up @@ -44,15 +42,15 @@ def testAnalysisType(self):
# register an analysis type twice
self._meta.register_analysis(type2)
self.assertTrue(self._meta.has_all_analysis([type1, type2]))
self.assertEqual('2', self._meta.hget(self._meta.ANALYSIS_TYPE, type2))
self.assertEqual('2', self._meta.hget(Metadata.ANALYSIS_TYPE, type2))

# unregister an analysis type
self._meta.unregister_analysis(type1)
self.assertFalse(self._meta.has_analysis(type1))

# unregister an analysis type which has not been registered
self._meta.unregister_analysis(type3)
self.assertEqual('0', self._meta.hget(self._meta.ANALYSIS_TYPE, type3))
self.assertEqual('0', self._meta.hget(Metadata.ANALYSIS_TYPE, type3))

def testMetaMetadata(self):
class Dummy(metaclass=MetaMetadata):
Expand Down
6 changes: 3 additions & 3 deletions extra_foam/gui/image_tool/tests/test_image_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from extra_foam.pipeline.processors.tests import _BaseProcessorTest
from extra_foam.processes import wait_until_redis_shutdown
from extra_foam.services import Foam
from extra_foam.database import MetaProxy
from extra_foam.database import Metadata, MetaProxy

app = mkQApp()

Expand Down Expand Up @@ -618,10 +618,10 @@ def testViewTabSwitching(self):
self.assertFalse(record_btn.isChecked())

# switch to "azimuthal integration 1D"
self.assertEqual('0', self._meta.hget(self._meta.ANALYSIS_TYPE, AnalysisType.AZIMUTHAL_INTEG))
self.assertEqual('0', self._meta.hget(Metadata.ANALYSIS_TYPE, AnalysisType.AZIMUTHAL_INTEG))
tab.tabBarClicked.emit(TabIndex.AZIMUTHAL_INTEG_1D)
tab.setCurrentIndex(TabIndex.AZIMUTHAL_INTEG_1D)
self.assertEqual('1', self._meta.hget(self._meta.ANALYSIS_TYPE, AnalysisType.AZIMUTHAL_INTEG))
self.assertEqual('1', self._meta.hget(Metadata.ANALYSIS_TYPE, AnalysisType.AZIMUTHAL_INTEG))

# switch to "geometry"
tab.tabBarClicked.emit(TabIndex.GEOMETRY)
Expand Down
12 changes: 8 additions & 4 deletions extra_foam/gui/main_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,20 +400,24 @@ def updateAll(self):
def pingRedisServer(self):
try:
self._db.ping()
self.__redis_connection_fails = 0
if self.__redis_connection_fails > 0:
# Note: Indeed, we do not have mechanism to recover from
# a Redis server crash. It is recommended to restart
# Extra-foam if you encounter this situation.
logger.info("Reconnect to the Redis server!")
self.__redis_connection_fails = 0
except ConnectionError:
self.__redis_connection_fails += 1
rest_attempts = config["REDIS_MAX_PING_ATTEMPTS"] - \
self.__redis_connection_fails

if rest_attempts > 0:
logger.warning(f"No response from the Redis server! Shutting "
logger.warning(f"No response from the Redis server! Shut "
f"down after {rest_attempts} attempts ...")
else:
logger.warning(f"No response from the Redis server! "
f"Shutting down!")
self.close()
time.sleep(5)

def addAction(self, description, filename):
icon = QIcon(osp.join(self._root_dir, "icons/" + filename))
Expand Down Expand Up @@ -493,7 +497,7 @@ def start(self):
"""
self._thread_logger_t.start()
self._plot_timer.start(config["GUI_PLOT_UPDATE_TIMER"])
self._redis_timer.start(config["PROCESS_MONITOR_UPDATE_TIMER"])
self._redis_timer.start(config["REDIS_PING_ATTEMPT_INTERVAL"])
self._input.start(self._close_ev)

def onStart(self):
Expand Down
68 changes: 36 additions & 32 deletions extra_foam/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,16 @@ def get_all(self):
_global_connections = dict()


def reset_redis_connections():
"""Reset all connections."""
def init_redis_connection(host, port, *, password=None):
"""Initialize Redis client connection.
:param str host: IP address of the Redis server.
:param int port:: Port of the Redis server.
:param str password: password for the Redis server.
:return: Redis connection.
"""
# reset all connections first
global _GLOBAL_REDIS_CONNECTION
_GLOBAL_REDIS_CONNECTION = None
global _GLOBAL_REDIS_CONNECTION_BYTES
Expand All @@ -82,39 +90,31 @@ def reset_redis_connections():
if c is not None:
c.reset()

# initialize new connection
if config["REDIS_UNIX_DOMAIN_SOCKET_PATH"]:
raise NotImplementedError(
"Unix domain socket connection is not supported!")
# connection = redis.Redis(
# unix_socket_path=config["REDIS_UNIX_DOMAIN_SOCKET_PATH"],
# decode_responses=decode_responses
# )
else:
# the following two must have different pools
connection = redis.Redis(
host, port, password=password, decode_responses=True)
connection_byte = redis.Redis(
host, port, password=password, decode_responses=False)

_GLOBAL_REDIS_CONNECTION = connection
_GLOBAL_REDIS_CONNECTION_BYTES = connection_byte
return connection


def redis_connection(decode_responses=True):
"""Return a Redis connection."""
if decode_responses:
global _GLOBAL_REDIS_CONNECTION
connection = _GLOBAL_REDIS_CONNECTION
decode_responses = True
else:
global _GLOBAL_REDIS_CONNECTION_BYTES
connection = _GLOBAL_REDIS_CONNECTION_BYTES
decode_responses = False

if connection is None:
if config["REDIS_UNIX_DOMAIN_SOCKET_PATH"]:
raise NotImplementedError(
"Unix domain socket connection is not supported!")
# connection = redis.Redis(
# unix_socket_path=config["REDIS_UNIX_DOMAIN_SOCKET_PATH"],
# decode_responses=decode_responses
# )
else:
connection = redis.Redis(
'localhost', config['REDIS_PORT'],
password=config['REDIS_PASSWORD'],
decode_responses=decode_responses
)

if decode_responses:
_GLOBAL_REDIS_CONNECTION = connection
else:
_GLOBAL_REDIS_CONNECTION_BYTES = connection

return connection
return _GLOBAL_REDIS_CONNECTION
return _GLOBAL_REDIS_CONNECTION_BYTES


class MetaRedisConnection(type):
Expand Down Expand Up @@ -194,7 +194,11 @@ def reset(self):


class ProcessLogger:
"""Worker which publishes log message in another Process."""
"""Worker which publishes log message in another Process.
Note: remember to change other part of the code if the log pattern
changes.
"""

_db = RedisConnection()

Expand Down
4 changes: 1 addition & 3 deletions extra_foam/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@ def create_logger():
"""Create the logger object for the whole API."""
name = "EXtra-foam"
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)

log_file = os.path.join(ROOT_PATH, name + ".log")
fh = TimedRotatingFileHandler(log_file, when='midnight')

fh.suffix = "%Y%m%d"
fh.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(filename)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(filename)s - %(levelname)s - %(message)s'
)
Expand All @@ -49,3 +46,4 @@ def create_logger():


logger = create_logger()
logger.setLevel(logging.INFO)
1 change: 0 additions & 1 deletion extra_foam/pipeline/processors/tests/test_base_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def setUpClass(cls):
start_redis_server()

cls._meta = MetaProxy()
cls._meta.initialize_analysis_types()

@classmethod
def tearDownClass(cls):
Expand Down

0 comments on commit 77a7d6a

Please sign in to comment.