Skip to content

Commit

Permalink
Merge pull request #146 from earthgecko/py3
Browse files Browse the repository at this point in the history
py3 flux
  • Loading branch information
earthgecko committed Nov 13, 2019
2 parents 12a3d6f + ffa1a5b commit 3136636
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 22 deletions.
23 changes: 18 additions & 5 deletions skyline/flux/populate_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import traceback
from ast import literal_eval

from redis import StrictRedis
# from redis import StrictRedis
import falcon
import graphyte

sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
import settings
# @added 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
from skyline_functions import get_redis_conn

from logger import set_up_logging
import flux

Expand All @@ -19,6 +23,8 @@
# required metric data is submitted via a POST with a json payload.
validArguments = ['remote_target', 'metric', 'namespace_prefix', 'key', 'user']

skyline_app = 'flux'

LOCAL_DEBUG = False
# LOCAL_DEBUG = True

Expand All @@ -38,10 +44,17 @@
str(CARBON_HOST), str(CARBON_PORT),
str(GRAPHITE_METRICS_PREFIX)))

if settings.REDIS_PASSWORD:
redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
# @modified 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# if settings.REDIS_PASSWORD:
# redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
# else:
# redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
# @added 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
redis_conn = get_redis_conn(skyline_app)


class PopulateMetric(object):
Expand Down
108 changes: 91 additions & 17 deletions skyline/flux/populate_metric_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
import settings
from skyline_functions import send_graphite_metric
from skyline_functions import (
send_graphite_metric,
# @added 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
get_redis_conn)

logger = set_up_logging('populate_metric_worker')

Expand Down Expand Up @@ -72,10 +76,18 @@ class PopulateMetricWorker(Process):
"""
def __init__(self, queue, parent_pid):
super(PopulateMetricWorker, self).__init__()
if settings.REDIS_PASSWORD:
self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
# @modified 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# if settings.REDIS_PASSWORD:
# self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
# else:
# self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
# @added 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
self.redis_conn = get_redis_conn(skyline_app)

self.q = queue
self.parent_pid = parent_pid
self.daemon = True
Expand Down Expand Up @@ -135,10 +147,15 @@ def pickle_data_to_graphite(data):
except:
logger.error('populate_metric_worker :: cannot connect to Redis at socket path %s' % (settings.REDIS_SOCKET_PATH))
sleep(2)
if settings.REDIS_PASSWORD:
self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)

# @modified 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# if settings.REDIS_PASSWORD:
# self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
# else:
# self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
self.redis_conn = get_redis_conn(skyline_app)

metricDict = None
try:
# Get a metric from the queue with a 1 second timeout, each
Expand Down Expand Up @@ -333,6 +350,12 @@ def pickle_data_to_graphite(data):
timestamp = None
value = None

# @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
# And set flux.last key is the returned value from the remote is
# null so that time series that are mostly null do not keep on
# getting added to flux populate_metric by Vista
raw_timeseries = []

for fetch_url in fetch_resolution_urls:
# if recent_last_flux_timestamp_present and remote_host_type == 'prometheus':
# This was for the query query and resample method and not for
Expand Down Expand Up @@ -413,24 +436,33 @@ def pickle_data_to_graphite(data):
value = None
timestamp = None
if remote_host_type == 'graphite':
# @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
raw_timeseries.append([datapoint[1], datapoint[0]])

try:
raw_value = datapoint[0]
if raw_value is None:
datapoints_with_no_value += 1
continue
value = float(datapoint[0])
timestamp = int(datapoint[1])
if value and timestamp:
valid_datapoints.append([value, timestamp])
valid_datapoints.append([value, timestamp])
except:
continue
if remote_host_type == 'prometheus':
# @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
raw_timeseries.append([datapoint[0], datapoint[1]])

try:
raw_value = datapoint[1]
if raw_value is None:
datapoints_with_no_value += 1
continue
timestamp = int(datapoint[0])
value = float(datapoint[1])
except:
continue
if value and timestamp:
valid_datapoints.append([timestamp, value])
valid_datapoints.append([timestamp, value])
datapoints = valid_datapoints

# Order the time series by timestamp as the tuple can shift
Expand All @@ -453,11 +485,14 @@ def pickle_data_to_graphite(data):
for datapoint in datapoints:
try:
if remote_host_type == 'graphite':
raw_value = datapoint[0]
if raw_value is None:
try:
raw_value = datapoint[0]
if raw_value is None:
continue
value = float(datapoint[0])
timestamp = int(datapoint[1])
except:
continue
value = float(datapoint[0])
timestamp = int(datapoint[1])
if remote_host_type == 'prometheus':
# timestamp = int(datapoint[0])
try:
Expand Down Expand Up @@ -514,6 +549,45 @@ def pickle_data_to_graphite(data):
str(datapoints_added_to_timeseries), remote_target,
str(seconds_to_fetch)))

# @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
# And set flux.last key is the returned value from the remote is
# null so that time series that are mostly null do not keep on
# getting added to flux populate_metric by Vista
if not timeseries:
set_flux_key = False
try:
sorted_raw_timeseries = sorted(raw_timeseries, key=lambda x: x[0])
last_ts = sorted_raw_timeseries[-1][0]
if int(last_ts) > (end_fecthing - 120):
if sorted_raw_timeseries[-1][1] is None:
set_flux_key = True
except:
logger.error(traceback.format_exc())
logger.error('error :: populate_metric_worker :: failed to determine if last value was null')
if set_flux_key:
try:
# Update Redis flux key
cache_key = 'flux.last.%s' % metric
metric_data = [int(last_ts), None]
self.redis_conn.set(cache_key, str(metric_data))
logger.info('populate_metric_worker :: even though no data points so as to not loop round on this metric, set the metric Redis key - %s - %s' % (
cache_key, str(metric_data)))
except:
logger.error(traceback.format_exc())
logger.error('error :: populate_metric_worker :: even though no data points, failed to set Redis key - %s - %s' % (
cache_key, str(metric_data)))
# Adding to the vista.fetcher.unique_metrics Redis set
redis_set = 'vista.fetcher.unique_metrics'
data = str(remote_target)
try:
self.redis_conn.sadd(redis_set, data)
logger.info('populate_metric_worker :: even though no data points, added %s to Redis set %s' % (
remote_target, redis_set))
except:
logger.info(traceback.format_exc())
logger.error('error :: populate_metric_worker :: even though no data points, failed to add %s to Redis set %s' % (
str(data), str(redis_set)))

if not timeseries:
logger.info('populate_metric_worker :: no data in the timeseries list for the time series for %s' % metric)
continue
Expand Down

0 comments on commit 3136636

Please sign in to comment.