Skip to content

Commit

Permalink
Refactor azure exporter transport response codes (#1131)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen committed Jun 23, 2022
1 parent 7cbf82f commit 0919b61
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
_REACHED_INGESTION_STATUS_CODES = (200, 206, 402, 408, 429, 439, 500)


class TransportStatusCode:
SUCCESS = 0
RETRY = 1
DROP = 2
STATSBEAT_SHUTDOWN = 3


class TransportMixin(object):

# check to see if collecting requests information related to statsbeats
Expand All @@ -59,7 +66,7 @@ def _transmit_from_storage(self):
if blob.lease(self.options.timeout + 5):
envelopes = blob.get()
result = self._transmit(envelopes)
if result > 0:
if result is TransportStatusCode.RETRY:
blob.lease(result)
else:
blob.delete()
Expand All @@ -74,7 +81,7 @@ def _transmit(self, envelopes):
"""
if not envelopes:
return 0
exception = None
status = None
try:
start_time = time.time()
headers = {
Expand All @@ -101,37 +108,37 @@ def _transmit(self, envelopes):
if not self._is_stats_exporter():
logger.warning(
'Request time out. Ingestion may be backed up. Retrying.')
exception = self.options.minimum_retry_interval
status = TransportStatusCode.RETRY
except requests.RequestException as ex:
if not self._is_stats_exporter():
logger.warning(
'Retrying due to transient client side error %s.', ex)
# client side error (retryable)
exception = self.options.minimum_retry_interval
status = TransportStatusCode.RETRY
except CredentialUnavailableError as ex:
if not self._is_stats_exporter():
logger.warning('Credential error. %s. Dropping telemetry.', ex)
exception = -1
status = TransportStatusCode.DROP
except ClientAuthenticationError as ex:
if not self._is_stats_exporter():
logger.warning('Authentication error %s', ex)
exception = self.options.minimum_retry_interval
status = TransportStatusCode.RETRY
except Exception as ex:
if not self._is_stats_exporter():
logger.warning(
'Error when sending request %s. Dropping telemetry.', ex)
# Extraneous error (non-retryable)
exception = -1
status = TransportStatusCode.DROP
finally:
end_time = time.time()
if self._check_stats_collection():
with _requests_lock:
duration = _requests_map.get('duration', 0)
_requests_map['duration'] = duration + (end_time - start_time) # noqa: E501
if exception is not None:
if status is not None:
if self._check_stats_collection():
with _requests_lock:
if exception >= 0:
if status is TransportStatusCode.RETRY:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
else:
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
Expand All @@ -141,8 +148,8 @@ def _transmit(self, envelopes):
# If ingestion threshold during statsbeat initialization is
# reached, return back code to shut it down
if _statsbeat_failure_reached_threshold():
return -2
return exception
return TransportStatusCode.STATSBEAT_SHUTDOWN
return status

text = 'N/A'
data = None
Expand All @@ -167,14 +174,14 @@ def _transmit(self, envelopes):
elif _statsbeat_failure_reached_threshold():
# If ingestion threshold during statsbeat initialization is
# reached, return back code to shut it down
return -2
return TransportStatusCode.STATSBEAT_SHUTDOWN

if response.status_code == 200:
self._consecutive_redirects = 0
if self._check_stats_collection():
with _requests_lock:
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
return 0
return TransportStatusCode.SUCCESS
# Status code not 200, 439 or 402 counts as failures
if self._check_stats_collection():
if response.status_code != 439 and response.status_code != 402:
Expand Down Expand Up @@ -211,7 +218,7 @@ def _transmit(self, envelopes):
text,
ex,
)
return -response.status_code
return TransportStatusCode.DROP
# cannot parse response body, fallback to retry
if response.status_code in (
206, # Partial Content
Expand All @@ -229,7 +236,7 @@ def _transmit(self, envelopes):
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return self.options.minimum_retry_interval
return TransportStatusCode.RETRY
# Authentication error
if response.status_code == 401:
if not self._is_stats_exporter():
Expand All @@ -241,7 +248,7 @@ def _transmit(self, envelopes):
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return self.options.minimum_retry_interval
return TransportStatusCode.RETRY
# Forbidden error
# Can occur when v2 endpoint is used while AI resource is configured
# with disableLocalAuth
Expand All @@ -255,7 +262,7 @@ def _transmit(self, envelopes):
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return self.options.minimum_retry_interval
return TransportStatusCode.RETRY
# Redirect
if response.status_code in (307, 308):
self._consecutive_redirects += 1
Expand Down Expand Up @@ -296,7 +303,7 @@ def _transmit(self, envelopes):
# 439: Monthly Quota Exceeded (old SDK) <- Currently OC SDK
with _requests_lock:
_requests_map['throttle'] = _requests_map.get('throttle', 0) + 1 # noqa: E501
return -response.status_code
return TransportStatusCode.DROP


def _reached_ingestion_status_code(status_code):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
Message,
)
from opencensus.ext.azure.common.storage import LocalFileStorage
from opencensus.ext.azure.common.transport import TransportMixin
from opencensus.ext.azure.common.transport import (
TransportMixin,
TransportStatusCode,
)
from opencensus.ext.azure.statsbeat import statsbeat
from opencensus.trace import execution_context

Expand Down Expand Up @@ -78,8 +81,11 @@ def _export(self, batch, event=None): # pragma: NO COVER
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(envelopes, result)
if self.storage and result is TransportStatusCode.RETRY:
self.storage.put(
envelopes,
self.options.minimum_retry_interval,
)
if event:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
MetricData,
)
from opencensus.ext.azure.common.storage import LocalFileStorage
from opencensus.ext.azure.common.transport import TransportMixin
from opencensus.ext.azure.common.transport import (
TransportMixin,
TransportStatusCode,
)
from opencensus.ext.azure.metrics_exporter import standard_metrics
from opencensus.metrics import transport
from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
Expand Down Expand Up @@ -72,13 +75,14 @@ def export_metrics(self, metrics):
batch = self.apply_telemetry_processors(batch)
result = self._transmit(batch)
# If statsbeat exporter and received signal to shutdown
if self._is_stats_exporter() and result == -2:
if self._is_stats_exporter() and result is \
TransportStatusCode.STATSBEAT_SHUTDOWN:
from opencensus.ext.azure.statsbeat import statsbeat
statsbeat.shutdown_statsbeat_metrics()
return
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(batch, result)
if self.storage and result is TransportStatusCode.RETRY:
self.storage.put(batch, self.options.minimum_retry_interval)

# If there is still room to transmit envelopes, transmit from storage
# if available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
Request,
)
from opencensus.ext.azure.common.storage import LocalFileStorage
from opencensus.ext.azure.common.transport import TransportMixin
from opencensus.ext.azure.common.transport import (
TransportMixin,
TransportStatusCode,
)
from opencensus.ext.azure.statsbeat import statsbeat
from opencensus.trace import attributes_helper
from opencensus.trace.span import SpanKind
Expand Down Expand Up @@ -204,8 +207,11 @@ def emit(self, batch, event=None):
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(envelopes, result)
if self.storage and result is TransportStatusCode.RETRY:
self.storage.put(
envelopes,
self.options.minimum_retry_interval
)
if event:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
Expand Down

0 comments on commit 0919b61

Please sign in to comment.