From 3ddee4ff6f74e69832a29e548e94dcad60e5bb78 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Wed, 17 Feb 2021 11:03:07 +0100 Subject: [PATCH 01/11] Catch Redis server errors There seems to happen a bad interaction with Redis over HAProxy, and Redis connections are reported as closed, which is only temporary. --- gnocchi/cli/metricd.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py index e29aa0156..9d6560195 100644 --- a/gnocchi/cli/metricd.py +++ b/gnocchi/cli/metricd.py @@ -23,6 +23,7 @@ from cotyledon import oslo_config_glue import daiquiri from oslo_config import cfg +from redis.exceptions import ConnectionError import tenacity import tooz from tooz import coordination @@ -85,6 +86,9 @@ def run(self): with utils.StopWatch() as timer: try: self._run_job() + except ConnectionError as ce: + LOG.debug("Redis server closed connection. Retrying.") + raise tenacity.TryAgain(ce) except Exception: LOG.error("Unexpected error during %s job", self.name, @@ -192,6 +196,9 @@ def _fill_sacks_to_process(self): self.wakeup() except exceptions.NotImplementedError: LOG.info("Incoming driver does not support notification") + except ConnectionError as ce: + LOG.debug("Redis server closed connection. Retrying.") + raise tenacity.TryAgain(ce) except Exception as e: LOG.error( "Error while listening for new measures notification, " @@ -240,6 +247,9 @@ def _run_job(self): s_count += 1 self.incoming.finish_sack_processing(s) self.sacks_with_measures_to_process.discard(s) + except ConnectionError as ce: + LOG.debug("Redis server closed connection. Retrying.") + raise tenacity.TryAgain(ce) except Exception: LOG.error("Unexpected error processing assigned job", exc_info=True) From 5353276fe817669c8cb5a0384f5ba4142151dd63 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Wed, 17 Feb 2021 13:22:16 +0100 Subject: [PATCH 02/11] Move redis to install dependencies --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 213efdb51..d8d8ef931 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,6 +34,7 @@ install_requires = futures; python_version < '3' jsonpatch cotyledon>=1.5.0 + redis >= 3.2.0 # MIT six stevedore ujson @@ -70,7 +71,6 @@ s3 = boto3 botocore>=1.5 redis = - redis>=2.10.0 # MIT hiredis swift = python-swiftclient>=3.1.0 From 841287a5250b8e09e79ccf001187f4c70041880e Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Wed, 17 Feb 2021 19:05:46 +0100 Subject: [PATCH 03/11] decorate function and skip other exception --- gnocchi/cli/metricd.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py index 9d6560195..71a734141 100644 --- a/gnocchi/cli/metricd.py +++ b/gnocchi/cli/metricd.py @@ -88,7 +88,6 @@ def run(self): self._run_job() except ConnectionError as ce: LOG.debug("Redis server closed connection. Retrying.") - raise tenacity.TryAgain(ce) except Exception: LOG.error("Unexpected error during %s job", self.name, @@ -198,7 +197,6 @@ def _fill_sacks_to_process(self): LOG.info("Incoming driver does not support notification") except ConnectionError as ce: LOG.debug("Redis server closed connection. 
Retrying.") - raise tenacity.TryAgain(ce) except Exception as e: LOG.error( "Error while listening for new measures notification, " @@ -226,6 +224,7 @@ def _get_sacks_to_process(self): finally: return self._tasks or self.fallback_tasks + @utils.retry_on_exception.wraps def _run_job(self): m_count = 0 s_count = 0 @@ -249,7 +248,6 @@ def _run_job(self): self.sacks_with_measures_to_process.discard(s) except ConnectionError as ce: LOG.debug("Redis server closed connection. Retrying.") - raise tenacity.TryAgain(ce) except Exception: LOG.error("Unexpected error processing assigned job", exc_info=True) From c27ba760cfe8dd6f171360ceda7693b74871afcf Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Wed, 17 Feb 2021 21:19:12 +0100 Subject: [PATCH 04/11] Remove left over variables in excepts --- gnocchi/cli/metricd.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py index 71a734141..66425ce76 100644 --- a/gnocchi/cli/metricd.py +++ b/gnocchi/cli/metricd.py @@ -86,7 +86,7 @@ def run(self): with utils.StopWatch() as timer: try: self._run_job() - except ConnectionError as ce: + except ConnectionError: LOG.debug("Redis server closed connection. Retrying.") except Exception: LOG.error("Unexpected error during %s job", @@ -195,7 +195,7 @@ def _fill_sacks_to_process(self): self.wakeup() except exceptions.NotImplementedError: LOG.info("Incoming driver does not support notification") - except ConnectionError as ce: + except ConnectionError: LOG.debug("Redis server closed connection. Retrying.") except Exception as e: LOG.error( @@ -246,7 +246,7 @@ def _run_job(self): s_count += 1 self.incoming.finish_sack_processing(s) self.sacks_with_measures_to_process.discard(s) - except ConnectionError as ce: + except ConnectionError: LOG.debug("Redis server closed connection. Retrying.") except Exception: LOG.error("Unexpected error processing assigned job", From 79e23637148e589403cf33fb8eb6726fdde00a29 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 09:39:18 +0100 Subject: [PATCH 05/11] Move all exception handling to incoming/redis as suggested in the review. Closes #1120 --- gnocchi/cli/metricd.py | 9 +-------- gnocchi/incoming/redis.py | 21 ++++++++++++++++----- setup.cfg | 2 +- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py index 66425ce76..899691c5e 100644 --- a/gnocchi/cli/metricd.py +++ b/gnocchi/cli/metricd.py @@ -23,7 +23,6 @@ from cotyledon import oslo_config_glue import daiquiri from oslo_config import cfg -from redis.exceptions import ConnectionError import tenacity import tooz from tooz import coordination @@ -86,8 +85,6 @@ def run(self): with utils.StopWatch() as timer: try: self._run_job() - except ConnectionError: - LOG.debug("Redis server closed connection. Retrying.") except Exception: LOG.error("Unexpected error during %s job", self.name, @@ -195,8 +192,6 @@ def _fill_sacks_to_process(self): self.wakeup() except exceptions.NotImplementedError: LOG.info("Incoming driver does not support notification") - except ConnectionError: - LOG.debug("Redis server closed connection. 
Retrying.") except Exception as e: LOG.error( "Error while listening for new measures notification, " @@ -224,7 +219,6 @@ def _get_sacks_to_process(self): finally: return self._tasks or self.fallback_tasks - @utils.retry_on_exception.wraps def _run_job(self): m_count = 0 s_count = 0 @@ -246,8 +240,7 @@ def _run_job(self): s_count += 1 self.incoming.finish_sack_processing(s) self.sacks_with_measures_to_process.discard(s) - except ConnectionError: - LOG.debug("Redis server closed connection. Retrying.") + except Exception: LOG.error("Unexpected error processing assigned job", exc_info=True) diff --git a/gnocchi/incoming/redis.py b/gnocchi/incoming/redis.py index 2a5191b41..dd70550fe 100644 --- a/gnocchi/incoming/redis.py +++ b/gnocchi/incoming/redis.py @@ -17,10 +17,13 @@ import uuid import daiquiri +from redis.exceptions import ConnectionError import six +import tenacity from gnocchi.common import redis from gnocchi import incoming +from gnocchi import utils LOG = daiquiri.getLogger(__name__) @@ -176,6 +179,10 @@ def process_measures_for_sack(self, sack): pipe.ltrim(key, item_len + 1, -1) pipe.execute() + @tenacity.retry( + wait=utils.wait_exponential, + # Never retry except when explicitly asked by raising TryAgain + retry=tenacity.retry_never) def iter_on_sacks_to_process(self): self._client.config_set("notify-keyspace-events", "K$") p = self._client.pubsub() @@ -183,11 +190,15 @@ def iter_on_sacks_to_process(self): keyspace = b"__keyspace@" + str(db).encode() + b"__:" pattern = keyspace + self._get_sack_name("*").encode() p.psubscribe(pattern) - for message in p.listen(): - if message['type'] == 'pmessage' and message['pattern'] == pattern: - # FIXME(jd) This is awful, we need a better way to extract this - # Format is defined by _get_sack_name: incoming128-17 - yield self._make_sack(int(message['channel'].split(b"-")[-1])) + try: + for message in p.listen(): + if message['type'] == 'pmessage' and message['pattern'] == pattern: + # FIXME(jd) This is awful, we need a better way to extract this + # Format is defined by _get_sack_name: incoming128-17 + yield self._make_sack(int(message['channel'].split(b"-")[-1])) + except ConnectionError as ce: + LOG.debug("Redis Server closed connection. 
Retrying.") + tenacity.TryAgain(ce) def finish_sack_processing(self, sack): # Delete the sack key which handles no data but is used to get a SET diff --git a/setup.cfg b/setup.cfg index d8d8ef931..93bea080b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,6 @@ install_requires = futures; python_version < '3' jsonpatch cotyledon>=1.5.0 - redis >= 3.2.0 # MIT six stevedore ujson @@ -71,6 +70,7 @@ s3 = boto3 botocore>=1.5 redis = + redis >= 3.2.0 # MIT hiredis swift = python-swiftclient>=3.1.0 From e507e41ac1e8c7b1c8939428b0173f549da25d5e Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 09:45:48 +0100 Subject: [PATCH 06/11] Remove empty line added in previous commit --- gnocchi/cli/metricd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py index 899691c5e..e29aa0156 100644 --- a/gnocchi/cli/metricd.py +++ b/gnocchi/cli/metricd.py @@ -240,7 +240,6 @@ def _run_job(self): s_count += 1 self.incoming.finish_sack_processing(s) self.sacks_with_measures_to_process.discard(s) - except Exception: LOG.error("Unexpected error processing assigned job", exc_info=True) From 7677b1c3bb4ffe2137deddae086adb109e4d7d70 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 10:18:25 +0100 Subject: [PATCH 07/11] Remove s3 and try to enable ceph again in tests. s3 Was spuriously failing, we turned off testing for ceph some time ago. Time to turn it on again. --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7064eb0a2..da21115ad 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,10 +18,10 @@ env: - TARGET: py36-mysql-file - TARGET: py36-mysql-swift - - TARGET: py36-mysql-s3 + - TARGET: py36-mysql-ceph - TARGET: py36-postgresql-file - TARGET: py36-postgresql-swift - - TARGET: py36-postgresql-s3 + - TARGET: py36-postgresql-ceph before_script: # NOTE(sileht): We need to fetch all tags/branches for documentation. 
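
The retry pattern PATCH 05 introduces works like this: the decorated method never retries on its own (retry=tenacity.retry_never), but raising tenacity.TryAgain from an except block forces another attempt, with wait_exponential spacing the attempts out. The following is a minimal, self-contained sketch of that behaviour, assuming tenacity and redis-py are installed; the function names, the simulated failure and the wait parameters are illustrative rather than taken from Gnocchi, whose code decorates iter_on_sacks_to_process and uses utils.wait_exponential.

    import itertools

    import tenacity
    from redis.exceptions import ConnectionError

    _calls = itertools.count()


    def _listen_once():
        # Stand-in for a pubsub listen() call; fails twice, then succeeds.
        if next(_calls) < 2:
            raise ConnectionError("Connection closed by server.")
        return "pmessage"


    @tenacity.retry(
        wait=tenacity.wait_exponential(multiplier=0.5, max=10),
        # Never retry except when explicitly asked by raising TryAgain.
        retry=tenacity.retry_never)
    def listen_with_retry():
        try:
            return _listen_once()
        except ConnectionError:
            # The explicit raise is what schedules another attempt;
            # constructing tenacity.TryAgain(ce) without raising it, as the
            # except blocks added in PATCH 05 and PATCH 08 do, triggers no
            # retry.
            raise tenacity.TryAgain


    print(listen_with_retry())  # retried twice, then prints "pmessage"

Any exception other than TryAgain propagates immediately, because retry_never declines to retry anything that is not an explicit TryAgain.
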
From 8fc147f3e2d81302fc70f31fab1bed7ca971383a Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 10:40:11 +0100 Subject: [PATCH 08/11] Catch exception in another place --- gnocchi/incoming/redis.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/gnocchi/incoming/redis.py b/gnocchi/incoming/redis.py index dd70550fe..d4dcd2eb1 100644 --- a/gnocchi/incoming/redis.py +++ b/gnocchi/incoming/redis.py @@ -96,6 +96,10 @@ def add_measures_batch(self, metrics_and_measures): notified_sacks.add(sack_name) pipe.execute() + @tenacity.retry( + wait=utils.wait_exponential, + # Never retry except when explicitly asked by raising TryAgain + retry=tenacity.retry_never) def _build_report(self, details): report_vars = {'measures': 0, 'metric_details': {}} @@ -109,20 +113,24 @@ def update_report(results, m_list): metrics = 0 m_list = [] pipe = self._client.pipeline() - for key in self._client.scan_iter(match=match, count=1000): - metrics += 1 - pipe.llen(key) - if details: - m_list.append(key.split(redis.SEP)[1].decode("utf8")) - # group 100 commands/call - if metrics % 100 == 0: + try: + for key in self._client.scan_iter(match=match, count=1000): + metrics += 1 + pipe.llen(key) + if details: + m_list.append(key.split(redis.SEP)[1].decode("utf8")) + # group 100 commands/call + if metrics % 100 == 0: + results = pipe.execute() + update_report(results, m_list) + m_list = [] + pipe = self._client.pipeline() + else: results = pipe.execute() update_report(results, m_list) - m_list = [] - pipe = self._client.pipeline() - else: - results = pipe.execute() - update_report(results, m_list) + except ConnectionError as ce: + LOG.debug("Redis Server closed connection. Retrying.") + tenacity.TryAgain(ce) return (metrics, report_vars['measures'], report_vars['metric_details'] if details else None) From df180702cdf611ca9e4fbd8ea1b2256b9d01e2a8 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 11:37:18 +0100 Subject: [PATCH 09/11] Turn back testing to s3 and address review feedback --- .travis.yml | 4 ++-- gnocchi/incoming/redis.py | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index da21115ad..7064eb0a2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,10 +18,10 @@ env: - TARGET: py36-mysql-file - TARGET: py36-mysql-swift - - TARGET: py36-mysql-ceph + - TARGET: py36-mysql-s3 - TARGET: py36-postgresql-file - TARGET: py36-postgresql-swift - - TARGET: py36-postgresql-ceph + - TARGET: py36-postgresql-s3 before_script: # NOTE(sileht): We need to fetch all tags/branches for documentation. diff --git a/gnocchi/incoming/redis.py b/gnocchi/incoming/redis.py index d4dcd2eb1..d6b52e7ce 100644 --- a/gnocchi/incoming/redis.py +++ b/gnocchi/incoming/redis.py @@ -99,7 +99,7 @@ def add_measures_batch(self, metrics_and_measures): @tenacity.retry( wait=utils.wait_exponential, # Never retry except when explicitly asked by raising TryAgain - retry=tenacity.retry_never) + retry=tenacity.retry_if_exception_type(ConnectionError)) def _build_report(self, details): report_vars = {'measures': 0, 'metric_details': {}} @@ -128,9 +128,8 @@ def update_report(results, m_list): else: results = pipe.execute() update_report(results, m_list) - except ConnectionError as ce: + except ConnectionError: LOG.debug("Redis Server closed connection. 
Retrying.") - tenacity.TryAgain(ce) return (metrics, report_vars['measures'], report_vars['metric_details'] if details else None) @@ -190,7 +189,7 @@ def process_measures_for_sack(self, sack): @tenacity.retry( wait=utils.wait_exponential, # Never retry except when explicitly asked by raising TryAgain - retry=tenacity.retry_never) + retry=tenacity.retry_if_exception_type(ConnectionError)) def iter_on_sacks_to_process(self): self._client.config_set("notify-keyspace-events", "K$") p = self._client.pubsub() @@ -204,9 +203,8 @@ def iter_on_sacks_to_process(self): # FIXME(jd) This is awful, we need a better way to extract this # Format is defined by _get_sack_name: incoming128-17 yield self._make_sack(int(message['channel'].split(b"-")[-1])) - except ConnectionError as ce: + except ConnectionError: LOG.debug("Redis Server closed connection. Retrying.") - tenacity.TryAgain(ce) def finish_sack_processing(self, sack): # Delete the sack key which handles no data but is used to get a SET From d6fb9e8f20a11fb87045ff23f3d3f27fe540f513 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 12:28:27 +0100 Subject: [PATCH 10/11] Drop debug output --- gnocchi/incoming/redis.py | 40 +++++++++++++++++---------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/gnocchi/incoming/redis.py b/gnocchi/incoming/redis.py index d6b52e7ce..5c2e33f51 100644 --- a/gnocchi/incoming/redis.py +++ b/gnocchi/incoming/redis.py @@ -113,23 +113,20 @@ def update_report(results, m_list): metrics = 0 m_list = [] pipe = self._client.pipeline() - try: - for key in self._client.scan_iter(match=match, count=1000): - metrics += 1 - pipe.llen(key) - if details: - m_list.append(key.split(redis.SEP)[1].decode("utf8")) - # group 100 commands/call - if metrics % 100 == 0: - results = pipe.execute() - update_report(results, m_list) - m_list = [] - pipe = self._client.pipeline() - else: + for key in self._client.scan_iter(match=match, count=1000): + metrics += 1 + pipe.llen(key) + if details: + m_list.append(key.split(redis.SEP)[1].decode("utf8")) + # group 100 commands/call + if metrics % 100 == 0: results = pipe.execute() update_report(results, m_list) - except ConnectionError: - LOG.debug("Redis Server closed connection. Retrying.") + m_list = [] + pipe = self._client.pipeline() + else: + results = pipe.execute() + update_report(results, m_list) return (metrics, report_vars['measures'], report_vars['metric_details'] if details else None) @@ -197,14 +194,11 @@ def iter_on_sacks_to_process(self): keyspace = b"__keyspace@" + str(db).encode() + b"__:" pattern = keyspace + self._get_sack_name("*").encode() p.psubscribe(pattern) - try: - for message in p.listen(): - if message['type'] == 'pmessage' and message['pattern'] == pattern: - # FIXME(jd) This is awful, we need a better way to extract this - # Format is defined by _get_sack_name: incoming128-17 - yield self._make_sack(int(message['channel'].split(b"-")[-1])) - except ConnectionError: - LOG.debug("Redis Server closed connection. 
Retrying.") + for message in p.listen(): + if message['type'] == 'pmessage' and message['pattern'] == pattern: + # FIXME(jd) This is awful, we need a better way to extract this + # Format is defined by _get_sack_name: incoming128-17 + yield self._make_sack(int(message['channel'].split(b"-")[-1])) def finish_sack_processing(self, sack): # Delete the sack key which handles no data but is used to get a SET From c2e6fd02d5569877aab7cbf6a2a528ae0129f37c Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Fri, 19 Feb 2021 15:16:10 +0100 Subject: [PATCH 11/11] Add stop condition and fix comment --- gnocchi/incoming/redis.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gnocchi/incoming/redis.py b/gnocchi/incoming/redis.py index 5c2e33f51..28859fc77 100644 --- a/gnocchi/incoming/redis.py +++ b/gnocchi/incoming/redis.py @@ -96,9 +96,10 @@ def add_measures_batch(self, metrics_and_measures): notified_sacks.add(sack_name) pipe.execute() + # if ConnectionError exception occurs, try again, max 5 times. @tenacity.retry( wait=utils.wait_exponential, - # Never retry except when explicitly asked by raising TryAgain + stop=tenacity.stop_after_attempt(5), retry=tenacity.retry_if_exception_type(ConnectionError)) def _build_report(self, details): report_vars = {'measures': 0, 'metric_details': {}} @@ -183,9 +184,10 @@ def process_measures_for_sack(self, sack): pipe.ltrim(key, item_len + 1, -1) pipe.execute() + # if ConnectionError exception occurs, try again, max 5 times. @tenacity.retry( wait=utils.wait_exponential, - # Never retry except when explicitly asked by raising TryAgain + stop=tenacity.stop_after_attempt(5), retry=tenacity.retry_if_exception_type(ConnectionError)) def iter_on_sacks_to_process(self): self._client.config_set("notify-keyspace-events", "K$")
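
The final form of the fix (PATCH 09 through PATCH 11) drops the TryAgain handling and lets tenacity match the exception itself: retry only on a redis ConnectionError, wait exponentially between attempts, and give up after five attempts. Below is a small self-contained sketch of that behaviour; build_report, the simulated failure and the wait parameters are invented for illustration, whereas Gnocchi applies the decorator to _build_report and iter_on_sacks_to_process with utils.wait_exponential.

    import itertools

    import tenacity
    from redis.exceptions import ConnectionError

    _attempts = itertools.count(1)


    # If a ConnectionError occurs, try again, at most 5 times in total.
    @tenacity.retry(
        wait=tenacity.wait_exponential(multiplier=0.5, max=10),
        stop=tenacity.stop_after_attempt(5),
        retry=tenacity.retry_if_exception_type(ConnectionError))
    def build_report():
        attempt = next(_attempts)
        if attempt < 3:
            # Simulate HAProxy closing an idle Redis connection.
            raise ConnectionError("Connection closed by server.")
        return {"attempt": attempt, "measures": 0}


    print(build_report())  # succeeds on the third attempt

Exceptions other than ConnectionError still propagate on the first attempt; if the connection error persists, tenacity stops after the fifth attempt and raises tenacity.RetryError (or the last exception, when reraise=True is passed to the decorator).
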
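
For context on what iter_on_sacks_to_process is retrying around: the incoming Redis driver relies on keyspace notifications. With notify-keyspace-events set to "K$", Redis publishes an event on __keyspace@<db>__:<key> whenever a string key is written, so a pattern subscription wakes the metricd workers as soon as measures land in a sack. The sketch below is an illustration rather than the driver code; it assumes a local Redis on the default port and database 0, and borrows the incoming128-<sack> key naming from the FIXME comment in the driver.

    import redis

    client = redis.Redis(host="localhost", port=6379, db=0)
    client.config_set("notify-keyspace-events", "K$")

    pubsub = client.pubsub()
    pattern = b"__keyspace@0__:incoming128-*"
    pubsub.psubscribe(pattern)

    # Blocks until events arrive; running SET incoming128-17 "" in
    # redis-cli produces one.
    for message in pubsub.listen():
        if message["type"] == "pmessage" and message["pattern"] == pattern:
            # The channel looks like b"__keyspace@0__:incoming128-17";
            # the sack number is the integer after the last "-".
            print("sack with new measures:",
                  int(message["channel"].split(b"-")[-1]))
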