Skip to content

Commit

Permalink
Shutdown statsbeat after failure threshold is met (#1127)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen committed Jun 14, 2022
1 parent 94e49ec commit 7cbf82f
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 116 deletions.
3 changes: 3 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Shutdown Statsbeat when hitting error/exception threshold
([#1127](https://github.com/census-instrumentation/opencensus-python/pull/1127))

## 1.1.4
Released 2022-04-20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

import json
import logging
import os
import threading
import time

import requests
from azure.core.exceptions import ClientAuthenticationError
from azure.identity._exceptions import CredentialUnavailableError

from opencensus.ext.azure.statsbeat import state

try:
from urllib.parse import urlparse
except ImportError:
Expand All @@ -34,13 +35,19 @@
_MONITOR_OAUTH_SCOPE = "https://monitor.azure.com//.default"
_requests_lock = threading.Lock()
_requests_map = {}
_REACHED_INGESTION_STATUS_CODES = (200, 206, 402, 408, 429, 439, 500)


class TransportMixin(object):

# check to see if collecting requests information related to statsbeats
def _check_stats_collection(self):
    """Return whether request statistics should be recorded for statsbeat.

    Collection takes place only when statsbeat is enabled, statsbeat has
    not been shut down, and this exporter is not the statsbeat metrics
    exporter itself.
    """
    # Rely on the shared statsbeat state instead of reading the
    # environment variable directly, so the shutdown flag is honored too.
    return state.is_statsbeat_enabled() and \
        not state.get_statsbeat_shutdown() and \
        not self._is_stats_exporter()

# check if the current exporter is a statsbeat metric exporter
# only applies to metrics exporter
def _is_stats_exporter(self):
return hasattr(self, '_is_stats') and self._is_stats

Expand Down Expand Up @@ -128,7 +135,13 @@ def _transmit(self, envelopes):
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
else:
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501

if self._is_stats_exporter() and \
not state.get_statsbeat_shutdown() and \
not state.get_statsbeat_initial_success():
# If ingestion threshold during statsbeat initialization is
# reached, return back code to shut it down
if _statsbeat_failure_reached_threshold():
return -2
return exception

text = 'N/A'
Expand All @@ -143,6 +156,19 @@ def _transmit(self, envelopes):
data = json.loads(text)
except Exception:
pass

if self._is_stats_exporter() and \
not state.get_statsbeat_shutdown() and \
not state.get_statsbeat_initial_success():
# If statsbeat exporter, record initialization as success if
# appropriate status code is returned
if _reached_ingestion_status_code(response.status_code):
state.set_statsbeat_initial_success(True)
elif _statsbeat_failure_reached_threshold():
# If ingestion threshold during statsbeat initialization is
# reached, return back code to shut it down
return -2

if response.status_code == 200:
self._consecutive_redirects = 0
if self._check_stats_collection():
Expand Down Expand Up @@ -271,3 +297,13 @@ def _transmit(self, envelopes):
with _requests_lock:
_requests_map['throttle'] = _requests_map.get('throttle', 0) + 1 # noqa: E501
return -response.status_code


def _reached_ingestion_status_code(status_code):
    """Tell whether ``status_code`` is in _REACHED_INGESTION_STATUS_CODES."""
    reached_ingestion = status_code in _REACHED_INGESTION_STATUS_CODES
    return reached_ingestion


def _statsbeat_failure_reached_threshold():
    """Record one more initial statsbeat failure and check the threshold.

    Returns True once the recorded initial failure count is 3 or more.
    Note that calling this function has a side effect: it always bumps
    the failure counter before comparing.
    """
    state.increment_statsbeat_initial_failure_count()
    failure_count = state.get_statsbeat_initial_failure_count()
    return failure_count >= 3
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)
from opencensus.ext.azure.common.storage import LocalFileStorage
from opencensus.ext.azure.common.transport import TransportMixin
from opencensus.ext.azure.metrics_exporter import statsbeat_metrics
from opencensus.ext.azure.statsbeat import statsbeat
from opencensus.trace import execution_context

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,7 +67,7 @@ def __init__(self, **options):
atexit.register(self.close, self.options.grace_period)
# start statsbeat on exporter instantiation
if not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"):
statsbeat_metrics.collect_statsbeat_metrics(self.options)
statsbeat.collect_statsbeat_metrics(self.options)
# For redirects
self._consecutive_redirects = 0 # To prevent circular redirects

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def export_metrics(self, metrics):
for batch in batched_envelopes:
batch = self.apply_telemetry_processors(batch)
result = self._transmit(batch)
# If statsbeat exporter and received signal to shutdown
if self._is_stats_exporter() and result == -2:
from opencensus.ext.azure.statsbeat import statsbeat
statsbeat.shutdown_statsbeat_metrics()
return
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(batch, result)
Expand Down Expand Up @@ -144,10 +149,12 @@ def _create_envelope(self, data_point, timestamp, properties):
return envelope

def shutdown(self):
# Flush the exporter thread
# Do not flush if metrics exporter for stats
if self.exporter_thread and not self._is_stats:
self.exporter_thread.close()
if self.exporter_thread:
# flush if metrics exporter is not for stats
if not self._is_stats:
self.exporter_thread.close()
else:
self.exporter_thread.cancel()
# Shuts down storage worker
if self.storage:
self.storage.close()
Expand All @@ -163,7 +170,7 @@ def new_metrics_exporter(**options):
exporter,
interval=exporter.options.export_interval)
if not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"):
from opencensus.ext.azure.metrics_exporter import statsbeat_metrics
from opencensus.ext.azure.statsbeat import statsbeat
# Stats will track the user's ikey
statsbeat_metrics.collect_statsbeat_metrics(exporter.options)
statsbeat.collect_statsbeat_metrics(exporter.options)
return exporter
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2020, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import threading

# Module-level statsbeat state. Every field has a get/set style accessor
# below; writes go through _STATSBEAT_STATE_LOCK.
_STATSBEAT_STATE = {
    "INITIAL_FAILURE_COUNT": 0,
    "INITIAL_SUCCESS": False,
    "SHUTDOWN": False,
}
# Held while mutating _STATSBEAT_STATE.
_STATSBEAT_STATE_LOCK = threading.Lock()


def is_statsbeat_enabled():
    """Return True unless APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL is set.

    Any non-empty value of the environment variable disables statsbeat.
    """
    return not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL")


def increment_statsbeat_initial_failure_count():
    """Add one to the initial failure counter, under the state lock."""
    with _STATSBEAT_STATE_LOCK:
        _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] += 1


def get_statsbeat_initial_failure_count():
    """Return the current initial failure counter."""
    return _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"]


def set_statsbeat_initial_success(success):
    """Store the initial-success flag, under the state lock."""
    with _STATSBEAT_STATE_LOCK:
        _STATSBEAT_STATE["INITIAL_SUCCESS"] = success


def get_statsbeat_initial_success():
    """Return the initial-success flag."""
    return _STATSBEAT_STATE["INITIAL_SUCCESS"]


def set_statsbeat_shutdown(shutdown):
    """Store the shutdown flag, under the state lock.

    Counterpart of get_statsbeat_shutdown(), so callers do not need to
    reach into the private state dict and lock to flip the flag.
    """
    with _STATSBEAT_STATE_LOCK:
        _STATSBEAT_STATE["SHUTDOWN"] = shutdown


def get_statsbeat_shutdown():
    """Return the shutdown flag."""
    return _STATSBEAT_STATE["SHUTDOWN"]
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

from opencensus.ext.azure.metrics_exporter import MetricsExporter
from opencensus.ext.azure.metrics_exporter.statsbeat_metrics.statsbeat import (
from opencensus.ext.azure.statsbeat.state import (
_STATSBEAT_STATE,
_STATSBEAT_STATE_LOCK,
)
from opencensus.ext.azure.statsbeat.statsbeat_metrics import (
_STATS_SHORT_EXPORT_INTERVAL,
_get_stats_connection_string,
_StatsbeatMetrics,
Expand Down Expand Up @@ -55,6 +58,29 @@ def collect_statsbeat_metrics(options):
exporter,
exporter.options.export_interval)
_STATSBEAT_EXPORTER = exporter
with _STATSBEAT_STATE_LOCK:
_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] = 0
_STATSBEAT_STATE["INITIAL_SUCCESS"] = 0
_STATSBEAT_STATE["SHUTDOWN"] = False


def shutdown_statsbeat_metrics():
    """Shut down the global statsbeat exporter and flag statsbeat as stopped.

    Does nothing when there is no statsbeat exporter/metrics instance or
    when the shutdown flag is already set. The shutdown flag is only set
    if the exporter's ``shutdown()`` completed without raising; on failure
    the globals are left untouched so a later call can retry.
    """
    # pylint: disable=global-statement
    global _STATSBEAT_METRICS
    global _STATSBEAT_EXPORTER

    def _shutdown_pending():
        return _STATSBEAT_METRICS is not None and \
            _STATSBEAT_EXPORTER is not None and \
            not _STATSBEAT_STATE["SHUTDOWN"]

    shutdown_success = False
    # Cheap unlocked check first so the lock is not taken when there is
    # nothing to do.
    if _shutdown_pending():
        with _STATSBEAT_LOCK:
            # Check again under the lock: another caller may have finished
            # the shutdown between the first check and acquiring the lock.
            if _shutdown_pending():
                try:
                    _STATSBEAT_EXPORTER.shutdown()
                    _STATSBEAT_EXPORTER = None
                    _STATSBEAT_METRICS = None
                    shutdown_success = True
                except Exception:  # pylint: disable=broad-except
                    # Best effort: statsbeat must never break the caller.
                    # KeyboardInterrupt/SystemExit are deliberately not
                    # swallowed here.
                    pass
    if shutdown_success:
        with _STATSBEAT_STATE_LOCK:
            _STATSBEAT_STATE["SHUTDOWN"] = True


class _AzureStatsbeatMetricsProducer(MetricProducer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
)
from opencensus.ext.azure.common.storage import LocalFileStorage
from opencensus.ext.azure.common.transport import TransportMixin
from opencensus.ext.azure.metrics_exporter import statsbeat_metrics
from opencensus.ext.azure.statsbeat import statsbeat
from opencensus.trace import attributes_helper
from opencensus.trace.span import SpanKind

Expand Down Expand Up @@ -76,7 +76,7 @@ def __init__(self, **options):
atexit.register(self._stop, self.options.grace_period)
# start statsbeat on exporter instantiation
if not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"):
statsbeat_metrics.collect_statsbeat_metrics(self.options)
statsbeat.collect_statsbeat_metrics(self.options)
# For redirects
self._consecutive_redirects = 0 # To prevent circular redirects

Expand Down
36 changes: 17 additions & 19 deletions contrib/opencensus-ext-azure/tests/test_azure_metrics_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,24 @@ def test_shutdown(self):
mock_thread.close.assert_called_once()
mock_storage.close.assert_called_once()

def test_shutdown_statsbeat(self):
    """Statsbeat exporter shutdown cancels the thread and closes storage."""
    exporter = MetricsExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd'
    )
    worker_thread = mock.Mock()
    storage = mock.Mock()
    # Mark the exporter as the statsbeat one before wiring in the mocks.
    exporter._is_stats = True
    exporter.storage = storage
    exporter.exporter_thread = worker_thread

    exporter.shutdown()

    worker_thread.cancel.assert_called_once()
    storage.close.assert_called_once()

@mock.patch('opencensus.ext.azure.metrics_exporter'
'.transport.get_exporter_thread')
def test_new_metrics_exporter(self, exporter_mock):
with mock.patch('opencensus.ext.azure.metrics_exporter'
'.statsbeat_metrics.collect_statsbeat_metrics') as hb:
with mock.patch('opencensus.ext.azure.statsbeat'
'.statsbeat.collect_statsbeat_metrics') as hb:
hb.return_value = None
iKey = '12345678-1234-5678-abcd-12345678abcd'
exporter = new_metrics_exporter(instrumentation_key=iKey)
Expand All @@ -227,8 +240,8 @@ def test_new_metrics_exporter(self, exporter_mock):
@mock.patch('opencensus.ext.azure.metrics_exporter'
'.transport.get_exporter_thread')
def test_new_metrics_exporter_no_standard_metrics(self, exporter_mock):
with mock.patch('opencensus.ext.azure.metrics_exporter'
'.statsbeat_metrics.collect_statsbeat_metrics') as hb:
with mock.patch('opencensus.ext.azure.statsbeat'
'.statsbeat.collect_statsbeat_metrics') as hb:
hb.return_value = None
iKey = '12345678-1234-5678-abcd-12345678abcd'
exporter = new_metrics_exporter(
Expand All @@ -240,18 +253,3 @@ def test_new_metrics_exporter_no_standard_metrics(self, exporter_mock):
producer_class = standard_metrics.AzureStandardMetricsProducer
self.assertFalse(isinstance(exporter_mock.call_args[0][0][0],
producer_class))

@unittest.skip("Skip because disabling heartbeat metrics")
@mock.patch('opencensus.ext.azure.metrics_exporter'
'.transport.get_exporter_thread')
def test_new_metrics_exporter_heartbeat(self, exporter_mock):
with mock.patch('opencensus.ext.azure.metrics_exporter'
'.statsbeat_metrics.collect_statsbeat_metrics') as hb:
iKey = '12345678-1234-5678-abcd-12345678abcd'
exporter = new_metrics_exporter(instrumentation_key=iKey)

self.assertEqual(exporter.options.instrumentation_key, iKey)
self.assertEqual(len(hb.call_args_list), 1)
self.assertEqual(len(hb.call_args[0]), 2)
self.assertEqual(hb.call_args[0][0], None)
self.assertEqual(hb.call_args[0][1], iKey)

0 comments on commit 7cbf82f

Please sign in to comment.