Merge pull request #2103 from DanCech/metrics-index
Move /metrics/index.json handling into finders & use worker pool
DanCech committed Nov 13, 2017
2 parents 830367b + 8eef60d commit d148f20
Showing 18 changed files with 302 additions and 210 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -6,7 +6,7 @@ env:
   - TOXENV=py27-django18-pyparsing2
   - TOXENV=py27-django19-pyparsing2
   - TOXENV=py27-django110-pyparsing2
-  - TOXENV=py27-django111-pyparsing2
+  - TOXENV=py27-django111-pyparsing2-rrdtool
   - TOXENV=py27-django111-pyparsing2-mysql TEST_MYSQL_PASSWORD=graphite
   - TOXENV=py27-django111-pyparsing2-postgresql TEST_POSTGRESQL_PASSWORD=graphite
   - TOXENV=docs
@@ -16,6 +16,7 @@ addons:
   apt:
     packages:
       - libcairo2-dev
+      - librrd-dev
   postgresql: "9.5"
 
 services:
4 changes: 2 additions & 2 deletions tox.ini
@@ -1,7 +1,6 @@
 [tox]
 envlist =
-   py27-django1{8,9,10,11}-pyparsing2,
-   py27-django111-pyparsing2-{mysql,postgresql},
+   py27-django1{8,9,10,11}-pyparsing2{,-mysql,-postgresql,-rrdtool},
    lint, docs
 
 [testenv]
@@ -38,6 +37,7 @@ deps =
     redis
     mysql: mysqlclient
     postgresql: psycopg2
+    rrdtool: rrdtool
 
 [testenv:docs]
 basepython = python2.7
21 changes: 21 additions & 0 deletions webapp/graphite/finders/ceres.py
@@ -2,6 +2,13 @@
 
 import os.path
 
+# Use the built-in version of walk if possible, otherwise
+# use the scandir module version
+try:
+    from os import walk
+except ImportError:
+    from scandir import walk
+
 from glob import glob
 from ceres import CeresTree, CeresNode
 from django.conf import settings
@@ -47,3 +54,17 @@ def find_nodes(self, query):
 
         elif os.path.isdir(fs_path):
             yield BranchNode(metric_path)
+
+    def get_index(self, requestContext):
+        matches = []
+
+        for root, _, files in walk(settings.CERES_DIR):
+            root = root.replace(settings.CERES_DIR, '')
+            for filename in files:
+                if filename == '.ceres-node':
+                    matches.append(root)
+
+        return sorted([
+            m.replace('/', '.').lstrip('.')
+            for m in matches
+        ])
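The conversion above turns every directory containing a .ceres-node marker into a dotted series name. A standalone sanity check of that transform (the directory names are made-up examples, not part of the commit):

    # Relative paths collected by the walk over CERES_DIR (hypothetical).
    roots = ['/carbon/agents/host-a', '/applications/web/requests']

    index = sorted(m.replace('/', '.').lstrip('.') for m in roots)
    # index == ['applications.web.requests', 'carbon.agents.host-a']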
26 changes: 25 additions & 1 deletion webapp/graphite/finders/remote.py
@@ -10,7 +10,7 @@
 from graphite.logger import log
 from graphite.node import LeafNode, BranchNode
 from graphite.render.hashing import compactHash
-from graphite.util import unpickle, logtime, is_local_interface
+from graphite.util import unpickle, logtime, is_local_interface, json
 
 from graphite.finders.utils import BaseFinder
 from graphite.readers.remote import RemoteReader
@@ -126,6 +126,30 @@ def fetch(self, patterns, start_time, end_time, now=None, requestContext=None):
         reader = RemoteReader(self, {}, bulk_query=patterns)
         return reader.fetch_multi(start_time, end_time, now, requestContext)
 
+    def get_index(self, requestContext):
+        url = '/metrics/index.json'
+
+        headers = requestContext.get('forwardHeaders')
+
+        result = self.request(
+            url,
+            fields=[
+                ('local', '1'),
+            ],
+            headers=headers,
+            timeout=settings.REMOTE_FIND_TIMEOUT)
+
+        try:
+            results = json.loads(result.data)
+        except Exception as err:
+            self.fail()
+            log.exception(
+                "RemoteFinder[%s] Error decoding index response from %s: %s" %
+                (self.host, result.url_full, err))
+            raise Exception("Error decoding index response from %s: %s" % (result.url_full, err))
+
+        return results
+
     def request(self, url, fields=None, headers=None, timeout=None):
         url = "%s://%s%s" % (
             'https' if settings.INTRACLUSTER_HTTPS else 'http', self.host, url)
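RemoteFinder.get_index asks each backend for its own index, sending local=1 so that backend does not fan out to its cluster in turn, then decodes the JSON body it gets back. A rough standalone equivalent using only the Python 2 standard library (the host is hypothetical; the finder itself goes through its request() helper with REMOTE_FIND_TIMEOUT):

    import json
    import urllib2  # Python 2 stdlib, matching the py27 tox environments

    # local=1 keeps the backend from forwarding the query onward.
    url = 'http://graphite-backend-1/metrics/index.json?local=1'
    series = json.loads(urllib2.urlopen(url, timeout=3.0).read())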
32 changes: 31 additions & 1 deletion webapp/graphite/finders/standard.py
@@ -1,5 +1,7 @@
+import bisect
 import fnmatch
 import operator
-from os.path import isdir, isfile, join, basename
+from os.path import isdir, isfile, join, basename, splitext
 from django.conf import settings
 
 # Use the built-in version of scandir/walk if possible, otherwise
@@ -166,3 +168,31 @@ def _find_paths(self, current_dir, patterns):
 
         for base_name in matching_files + matching_subdirs:
             yield join(current_dir, base_name)
+
+    def get_index(self, requestContext):
+        matches = []
+
+        for root, _, files in walk(settings.WHISPER_DIR):
+            root = root.replace(settings.WHISPER_DIR, '')
+            for base_name in files:
+                if fnmatch.fnmatch(base_name, '*.wsp'):
+                    match = join(root, base_name).replace('.wsp', '').replace('/', '.').lstrip('.')
+                    bisect.insort_left(matches, match)
+
+        # unlike 0.9.x, we're going to use os.walk with followlinks
+        # since we require Python 2.7 and newer that supports it
+        if RRDReader.supported:
+            for root, _, files in walk(settings.RRD_DIR, followlinks=True):
+                root = root.replace(settings.RRD_DIR, '')
+                for base_name in files:
+                    if fnmatch.fnmatch(base_name, '*.rrd'):
+                        absolute_path = join(settings.RRD_DIR, root, base_name)
+                        base_name = splitext(base_name)[0]
+                        metric_path = join(root, base_name)
+                        rrd = RRDReader(absolute_path, metric_path)
+                        for datasource_name in rrd.get_datasources(absolute_path):
+                            match = join(metric_path, datasource_name).replace('.rrd', '').replace('/', '.').lstrip('.')
+                            if match not in matches:
+                                bisect.insort_left(matches, match)
+
+        return matches
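bisect.insort_left keeps matches sorted as the walk proceeds, so no final sort pass is needed; the 'match not in matches' de-duplication for RRD datasources is a linear scan, which is fine at typical index sizes. The insertion behavior in isolation (metric names are illustrative):

    import bisect

    matches = []
    for name in ['servers.web1.load', 'carbon.agents.a.cpu', 'servers.db1.io']:
        bisect.insort_left(matches, name)  # insert at the sorted position

    # matches == ['carbon.agents.a.cpu', 'servers.db1.io', 'servers.web1.load']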
18 changes: 18 additions & 0 deletions webapp/graphite/finders/utils.py
@@ -57,6 +57,24 @@ def find_nodes(self, query):
         generator of Node
         """
 
+    def get_index(self, requestContext):
+        """Get a list of all series
+
+        Args:
+          requestContext
+
+        Returns:
+          list of series
+        """
+        query = FindQuery(
+            '**', None, None,
+            local=requestContext.get('localOnly'),
+            headers=requestContext.get('forwardHeaders'),
+            leaves_only=True,
+        )
+
+        return sorted([node.path for node in self.find_nodes(query) if node.is_leaf])
+
     # The methods below are fully optional and BaseFinder provides
     # a default implementation. They can be re-implemented by finders
     # that could provide a more efficient way of doing it.
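Finders that inherit this default pay for a full '**' tree expansion; a finder with a cheaper native listing can override get_index directly. A minimal sketch of such an override (the class and its series are hypothetical, and a real finder would implement find_nodes as well):

    from graphite.finders.utils import BaseFinder

    class InMemoryFinder(BaseFinder):
        """Hypothetical finder that already knows every series it serves."""

        def get_index(self, requestContext):
            # No '**' expansion needed: return the known leaf paths directly.
            return sorted(['app.requests.count', 'app.errors.count'])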
21 changes: 3 additions & 18 deletions webapp/graphite/local_settings.py.example
@@ -265,26 +265,11 @@ DEFAULT_XFILES_FACTOR = 0
 # used.
 #CLUSTER_SERVERS = ["10.0.2.2:80", "10.0.2.3:80"]
 
-# Creates a pool of worker threads to which tasks can be dispatched. This makes
-# sense if there are multiple CLUSTER_SERVERS because then the communication
-# with them can be parallelized
-# The number of threads is equal to:
-# POOL_WORKERS_PER_BACKEND * len(CLUSTER_SERVERS) + POOL_WORKERS
-# Be careful when increasing the number of threads, in particular if your start
-# multiple graphite-web processes (with uwsgi or similar) as this will increase
-# memory consumption (and number of connections to memcached).
+# Use a pool of worker threads to dispatch finder requests in parallel
 #USE_WORKER_POOL = True
 
-# The number of worker threads that should be created per backend server.
-# It makes sense to have more than one thread per backend server if
-# the graphite-web web server itself is multi threaded and can handle multiple
-# incoming requests at once.
-#POOL_WORKERS_PER_BACKEND = 1
-
-# A baseline number of workers that should always be created, no matter how many
-# cluster servers are configured. These are used for other tasks that can be
-# off-loaded from the request handling threads.
-#POOL_WORKERS = 1
+# Maximum number of worker threads for concurrent storage operations
+#POOL_MAX_WORKERS = 10
 
 # This setting controls whether https is used to communicate between cluster members
 #INTRACLUSTER_HTTPS = False
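The sizing model changes with this: the old settings derived the thread count from the number of cluster servers, while POOL_MAX_WORKERS is a flat ceiling. With eight backends and the old defaults (numbers are illustrative):

    # Old scheme: thread count grew with the cluster.
    POOL_WORKERS_PER_BACKEND = 1
    POOL_WORKERS = 1
    cluster_servers = 8  # illustrative
    old_threads = POOL_WORKERS_PER_BACKEND * cluster_servers + POOL_WORKERS  # 9

    # New scheme: a single cap, independent of backend count.
    POOL_MAX_WORKERS = 10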
86 changes: 22 additions & 64 deletions webapp/graphite/metrics/views.py
@@ -11,23 +11,20 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License."""
-import fnmatch
-import os
 import pytz
-import urllib
 
 from datetime import datetime
 from django.conf import settings
 
+from graphite.carbonlink import CarbonLink
 from graphite.compat import HttpResponse, HttpResponseBadRequest
-from graphite.user_util import getProfile
-from graphite.util import json
 from graphite.logger import log
-from graphite.readers import RRDReader
-from graphite.storage import STORE
-from graphite.carbonlink import CarbonLink
-from graphite.storage import extractForwardHeaders
 from graphite.render.attime import parseATTime
+from graphite.storage import STORE, extractForwardHeaders
+from graphite.user_util import getProfile
+from graphite.util import epoch
+from graphite.util import json
 
 try:
   import cPickle as pickle
@@ -39,59 +36,19 @@ def index_json(request):
   queryParams = request.GET.copy()
   queryParams.update(request.POST)
 
-  jsonp = queryParams.get('jsonp', False)
-  cluster = queryParams.get('cluster', False)
-
-  def find_matches():
-    matches = []
-
-    for root, dirs, files in os.walk(settings.WHISPER_DIR):
-      root = root.replace(settings.WHISPER_DIR, '')
-      for basename in files:
-        if fnmatch.fnmatch(basename, '*.wsp'):
-          matches.append(os.path.join(root, basename))
-
-    for root, dirs, files in os.walk(settings.CERES_DIR):
-      root = root.replace(settings.CERES_DIR, '')
-      for filename in files:
-        if filename == '.ceres-node':
-          matches.append(root)
-
-    # unlike 0.9.x, we're going to use os.walk with followlinks
-    # since we require Python 2.7 and newer that supports it
-    if RRDReader.supported:
-      for root, dirs, files in os.walk(settings.RRD_DIR, followlinks=True):
-        root = root.replace(settings.RRD_DIR, '')
-        for basename in files:
-          if fnmatch.fnmatch(basename, '*.rrd'):
-            absolute_path = os.path.join(settings.RRD_DIR, root, basename)
-            (basename,extension) = os.path.splitext(basename)
-            metric_path = os.path.join(root, basename)
-            rrd = RRDReader(absolute_path, metric_path)
-            for datasource_name in rrd.get_datasources(absolute_path):
-              matches.append(os.path.join(metric_path, datasource_name))
-
-    matches = [
-      m
-      .replace('.wsp', '')
-      .replace('.rrd', '')
-      .replace('/', '.')
-      .lstrip('.')
-      for m in sorted(matches)
-    ]
-    return matches
-
-  matches = []
-  if cluster and len(settings.CLUSTER_SERVERS) >= 1:
-    try:
-      matches = reduce( lambda x, y: list(set(x + y)), \
-        [json.loads(urllib.urlopen('http://' + cluster_server + '/metrics/index.json').read()) \
-          for cluster_server in settings.CLUSTER_SERVERS])
-    except urllib.URLError:
-      log.exception()
-      return json_response_for(request, matches, jsonp=jsonp, status=500)
-  else:
-    matches = find_matches()
+  try:
+    jsonp = queryParams.get('jsonp', False)
+
+    requestContext = {
+      'localOnly': int( queryParams.get('local', 0) ),
+      'forwardHeaders': extractForwardHeaders(request),
+    }
+
+    matches = STORE.get_index(requestContext)
+  except Exception:
+    log.exception()
+    return json_response_for(request, [], jsonp=jsonp, status=500)
 
   return json_response_for(request, matches, jsonp=jsonp)
 
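The rewritten handler delegates all walking to STORE.get_index, which fans the call out to every configured finder (via the worker pool when enabled), so the view is reduced to building a requestContext and serializing the result. The query parameters keep their old meaning; a hedged smoke test against a running instance (host and port are hypothetical):

    import json
    import urllib2

    # local=1 -> index from this node's own finders only; jsonp=cb would wrap
    # the reply in a cb(...) callback (see json_response_for in the next hunk).
    url = 'http://graphite-web:8080/metrics/index.json?local=1'
    series = json.loads(urllib2.urlopen(url).read())
    print('%d series in the index' % len(series))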
@@ -387,12 +344,13 @@ def json_nodes(nodes):
   return sorted(nodes_info, key=lambda item: item['path'])
 
 
-def json_response_for(request, data, content_type='application/json',
-                      jsonp=False, **kwargs):
+def json_response_for(request, data, content_type='application/json', jsonp=False, **kwargs):
   accept = request.META.get('HTTP_ACCEPT', 'application/json')
   ensure_ascii = accept == 'application/json'
 
-  content = json.dumps(data, ensure_ascii=ensure_ascii)
+  pretty = bool(request.POST.get('pretty', request.GET.get('pretty')))
+
+  content = json.dumps(data, ensure_ascii=ensure_ascii, indent=(2 if pretty else None))
   if jsonp:
     content = "%s(%s)" % (jsonp, content)
     content_type = 'text/javascript'
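json_response_for also gains a pretty parameter here: pretty=1 switches json.dumps to indent=2, while the default stays compact for machine consumers. The two modes in isolation:

    import json

    data = ['carbon.agents.a.cpu']
    print(json.dumps(data))            # ["carbon.agents.a.cpu"]
    print(json.dumps(data, indent=2))  # same data, spread across lines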
5 changes: 3 additions & 2 deletions webapp/graphite/settings.py
@@ -55,9 +55,10 @@
 
 # Cluster settings
 CLUSTER_SERVERS = []
+
+# Worker Pool
 USE_WORKER_POOL = True
-POOL_WORKERS_PER_BACKEND = 1
-POOL_WORKERS = 1
+POOL_MAX_WORKERS = 10
 
 # This settings control whether https is used to communicate between cluster members
 INTRACLUSTER_HTTPS = False
