Commit

adding clustering support
cdavis committed Jun 23, 2009
1 parent c144a5e commit 71d395e
Showing 8 changed files with 183 additions and 16 deletions.
80 changes: 80 additions & 0 deletions lib/graphite/clustering.py
@@ -0,0 +1,80 @@
from httplib import HTTPConnection
from urllib import urlencode
try:
  import cPickle as pickle
except ImportError:
  import pickle


class ClusterMember:
  def __init__(self, host, timeout=15):
    self.host = host
    self.timeout = timeout

  def find(self, pattern):
    request = FindRequest(self, pattern)
    request.send()
    return request


class FindRequest:
  def __init__(self, server, pattern):
    self.server = server
    self.pattern = pattern
    self.connection = None

  def send(self):
    self.connection = HTTPConnection(self.server.host, timeout=self.server.timeout)
    self.connection.request('GET', '/browser/local/?pattern=' + self.pattern)

  def get_results(self, suppressErrors=True):
    if not self.connection:
      self.send()

    try:
      response = self.connection.getresponse()
      assert response.status == 200, "received error response %s - %s" % (response.status, response.reason)
      result_data = response.read()
      results = pickle.loads(result_data)
    except:
      if not suppressErrors:
        raise
      else:
        results = []

    return [ RemoteNode(self.server, graphite_path, isLeaf) for (graphite_path, isLeaf) in results ]


class RemoteNode:
  def __init__(self, server, graphite_path, isLeaf):
    self.server = server
    self.fs_path = None
    self.graphite_path = graphite_path
    self.__isLeaf = isLeaf

  def fetch(self, startTime, endTime):
    if not self.__isLeaf:
      return []

    query_params = [
      ('target', self.graphite_path),
      ('pickle', 'true'),
      ('from', str(startTime)),
      ('until', str(endTime))
    ]
    query_string = urlencode(query_params)

    connection = HTTPConnection(self.server.host, timeout=self.server.timeout)
    connection.request('GET', '/render/?' + query_string)
    response = connection.getresponse()
    rawData = response.read()

    seriesList = pickle.loads(rawData)
    assert len(seriesList) == 1, "Invalid result: seriesList=%s" % str(seriesList)
    series = seriesList[0]

    timeInfo = (series['start'], series['end'], series['step'])
    return (timeInfo, series['values'])

  def isLeaf(self):
    return self.__isLeaf
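
For orientation, a minimal sketch of how these pieces fit together; the hostname, pattern, and epoch timestamps are made up for illustration:

    from graphite.clustering import ClusterMember

    member = ClusterMember('graphite-02.example.com:80')  # HTTPConnection accepts 'host:port'
    request = member.find('servers.*.cpu.load')  # issues the HTTP request immediately
    for node in request.get_results():           # blocks until the peer responds
      if node.isLeaf():
        timeInfo, values = node.fetch(1245715200, 1245718800)  # epoch seconds

Note that find() returns before the response arrives, which is what lets the new MetricFinder (below) fan a search out to every cluster member before it starts reading any results.
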
45 changes: 40 additions & 5 deletions lib/graphite/tree.py
@@ -1,6 +1,7 @@
import os, time, fnmatch, socket, errno
from os.path import isdir, isfile, join, splitext, basename
from graphite import whisper
from graphite import clustering


try:
  import rrdtool
@@ -9,16 +10,50 @@


DATASOURCE_DELIMETER = '::RRD_DATASOURCE::'


# Exposed API
class MetricFinder:
  "Encapsulate find() functionality for one or more data stores"

  def __init__(self, dirs):
    self.dirs = dirs
    self.cluster = []

  def configureClusterServers(self, hosts):
    self.cluster = [ clustering.ClusterMember(host) for host in hosts if not is_local_interface(host) ]

  def find(self, path_pattern):
    # Start remote searches
    found = set()
    remote_requests = [ member.find(path_pattern) for member in self.cluster ]

    # Search locally
    for dir in self.dirs:
      for match in find(dir, path_pattern):
        if match.graphite_path not in found:
          yield match
          found.add(match.graphite_path)

    # Gather remote search results
    for remote_request in remote_requests:
      for match in remote_request.get_results():
        if match.graphite_path not in found:
          yield match
          found.add(match.graphite_path)


def is_local_interface(host):
  if ':' in host:
    host = host.split(':',1)[0]

  try:
    sock = socket.socket()
    sock.bind( (host,42) ) #port doesn't matter
    sock.close()
  except Exception, e:
    if hasattr(e,'errno') and e.errno == errno.EADDRNOTAVAIL:
      return False

  return True




def find(root_dir, pattern):
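
A short sketch of how the reworked finder gets used, assuming the webapp hands it a Whisper data directory (the path and hostnames here are hypothetical):

    from graphite.tree import MetricFinder

    finder = MetricFinder(['/opt/graphite/storage/whisper'])
    finder.configureClusterServers(['graphite-01:80', 'graphite-02:80'])  # local interfaces filtered out
    for node in finder.find('servers.*.cpu.load'):
      print node.graphite_path  # local matches stream first, then remote ones

The generator yields local matches while the remote HTTP requests are still in flight, and the found set suppresses duplicates when several servers know the same metric.
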
1 change: 1 addition & 0 deletions webapp/web/browser/urls.py
@@ -18,6 +18,7 @@
  ('^header/?$', 'header'),
  ('^search/?$', 'search'),
  ('^tree/?$', 'treeLookup'),
  ('^local/?$', 'localLookup'),
  ('^mygraph/?$', 'myGraphLookup'),
  ('^usergraph/?$', 'userGraphLookup'),
  ('', 'browser'),
16 changes: 16 additions & 0 deletions webapp/web/browser/views.py
@@ -19,6 +19,10 @@
from web.account.models import Profile
from web.util import getProfile, getProfileByUsername, getQueryString, defaultUser
from web.logger import log
try:
  import cPickle as pickle
except ImportError:
  import pickle




def header(request):
@@ -121,6 +125,18 @@ def treeLookup(request):
  return json_response(nodes)




def localLookup(request):
  "View used for graphite.tree clustering"
  pattern = str( request.GET['pattern'] )

  matches = list( settings.LOCAL_FINDER.find(pattern) )
  matches.sort(key=lambda node: node.name)

  results = [ (node.graphite_path, node.isLeaf()) for node in matches ]
  result_data = pickle.dumps(results, protocol=-1)
  return HttpResponse(result_data, mimetype='application/pickle')


def myGraphLookup(request):
  "View for My Graphs navigation"
  profile = getProfile(request,allowDefault=False)
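
The response body is nothing more than a pickled list of (graphite_path, isLeaf) tuples, sorted by node name — exactly what FindRequest.get_results() on the querying side unpickles. A hypothetical exchange, with made-up metric names:

    # GET /browser/local/?pattern=servers.*
    >>> pickle.loads(response_body)
    [('servers.web01', False), ('servers.web02', False)]
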
18 changes: 18 additions & 0 deletions webapp/web/render/datatypes.py
@@ -11,6 +11,8 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import copy_reg



class TimeSeries(list):
  def __init__(self,name,start,end,step,values,consolidate='average'):
@@ -60,3 +62,19 @@ def __consolidate(self,values): #TODO pluggable consolidators? wrap multiple ser
  def __str__(self):
    return 'TimeSeries(name=%s,start=%s,end=%s,step=%s)' % (self.name, self.start, self.end, self.step)
  __repr__ = __str__

  @staticmethod
  def __pickle__(self):
    info = {
      'name' : self.name,
      'start' : self.start,
      'end' : self.end,
      'step' : self.step,
      'values' : list(self),
    }
    type = dict
    constructorArgs = (info,)
    return (type, constructorArgs)


copy_reg.pickle(TimeSeries, TimeSeries.__pickle__)
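
The copy_reg hook means a TimeSeries never crosses the wire as a class instance: pickling one serializes a plain dict, so a peer can unpickle render results without importing web.render.datatypes. A quick round-trip illustration with made-up values:

    import pickle
    from web.render.datatypes import TimeSeries

    series = TimeSeries('servers.web01.cpu.load', 1245715200, 1245715500, 60, [1, 2, 3, 4, 5])
    restored = pickle.loads(pickle.dumps(series, protocol=-1))
    assert restored == {'name': 'servers.web01.cpu.load', 'start': 1245715200,
                        'end': 1245715500, 'step': 60, 'values': [1, 2, 3, 4, 5]}

This is also why RemoteNode.fetch() in clustering.py reads series['start'] and series['values'] with dict subscripts rather than attribute access.
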
4 changes: 1 addition & 3 deletions webapp/web/render/evaluator.py
@@ -7,8 +7,6 @@
from web.logger import log


def evaluateTarget(target, timeInterval):
  tokens = grammar.parseString(target)
@@ -28,7 +26,7 @@ def evaluateTokens(tokens, timeInterval):
    pathExpr = pathExpr[9:]
  seriesList = []
  (startTime,endTime) = timeInterval
  for dbFile in settings.FINDER.find(pathExpr):
    log.metric_access(dbFile.graphite_path)
    getCacheResults = CarbonLink.sendRequest(dbFile.graphite_path)
    dbResults = dbFile.fetch( timestamp(startTime), timestamp(endTime) )
24 changes: 18 additions & 6 deletions webapp/web/render/views.py
@@ -17,7 +17,10 @@
from httplib import HTTPConnection, CannotSendRequest
from urlparse import urlsplit
from cStringIO import StringIO
try:
  import cPickle as pickle
except ImportError:
  import pickle


from web.util import getProfileByUsername
from web.logger import log
@@ -85,17 +88,24 @@ def renderView(request):
    data.extend(seriesList)
    log.rendering('Retrieval of %s took %.6f' % (target,time() - t))

  if useCache:
    cache.set(dataKey, data)

  # If data is all we needed, we're done
  if 'pickle' in requestOptions:
    response = HttpResponse(mimetype='application/pickle')
    pickle.dump(data, response, protocol=-1)

    log.rendering('Total pickle rendering time %.6f' % (time() - start))
    return response

  if 'rawData' in requestOptions:
    response = HttpResponse(mimetype='text/plain')
    for series in data:
      response.write( "%s,%d,%d,%d|" % (series.name, series.start, series.end, series.step) )
      response.write( ','.join(map(str,series)) )
      response.write('\n')

    log.rendering('Total rawData rendering time %.6f' % (time() - start))
    return response


@@ -135,6 +145,8 @@ def parseOptions(request):
      target = target[9:]
    requestOptions['targets'].append(target)

  if 'pickle' in queryParams:
    requestOptions['pickle'] = True
  if 'rawData' in queryParams:
    requestOptions['rawData'] = True
  if 'noCache' in queryParams:
@@ -178,7 +190,7 @@ def parseOptions(request):


def delegateRendering(graphType, graphOptions):
  start = time()
  postData = graphType + '\n' + pickle.dumps(graphOptions)
  servers = settings.RENDERING_HOSTS[:] #make a copy so we can shuffle it safely
  shuffle(servers)
  for server in servers:
@@ -225,7 +237,7 @@ def renderLocalView(request):
  optionsPickle = reqParams.read()
  reqParams.close()
  graphClass = GraphTypes[graphType]
  options = pickle.loads(optionsPickle)
  image = doImageRender(graphClass, options)
  log.rendering("Delegated rendering request took %.6f seconds" % (time() - start))
  return buildResponse(image)
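
With the new pickle option, any peer — or a script — can ask /render/ for raw series data instead of an image; this is the same request RemoteNode.fetch() builds. A sketch of a manual client, with a hypothetical hostname and target:

    from httplib import HTTPConnection
    from urllib import urlencode
    import pickle

    conn = HTTPConnection('graphite-02.example.com')
    query = urlencode([('target', 'servers.web01.cpu.load'), ('pickle', 'true'),
                       ('from', '1245715200'), ('until', '1245718800')])
    conn.request('GET', '/render/?' + query)
    for series in pickle.loads(conn.getresponse().read()):
      print series['name'], series['step'], len(series['values'])
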
11 changes: 9 additions & 2 deletions webapp/web/settings.py
@@ -15,7 +15,7 @@
# DO NOT MODIFY THIS FILE DIRECTLY - use local_settings.py instead
import sys
from os.path import join, dirname, abspath
from graphite.tree import MetricFinder


DEBUG = False


@@ -31,14 +31,14 @@
INDEX_FILE = STORAGE_DIR + 'index'
WHITELIST_FILE = LISTS_DIR + 'whitelist'
LOG_DIR = STORAGE_DIR + 'log/'
CLUSTER_SERVERS = []


try:
  import rrdtool
  DATA_DIRS = [WHISPER_DIR, RRD_DIR]
except:
  DATA_DIRS = [WHISPER_DIR]


#Memcache settings
MEMCACHE_HOSTS = ['127.0.0.1:11211']
@@ -82,6 +82,13 @@
  print >> sys.stderr, "Could not import web.local_settings, using defaults!"




LOCAL_FINDER = MetricFinder(DATA_DIRS)
FINDER = MetricFinder(DATA_DIRS)

if CLUSTER_SERVERS:
  FINDER.configureClusterServers(CLUSTER_SERVERS)


#Django settings below, do not touch!
APPEND_SLASH = False
TEMPLATE_DEBUG = DEBUG
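
To turn clustering on, an operator would list the other webapp hosts in local_settings.py (hostnames hypothetical):

    # local_settings.py
    CLUSTER_SERVERS = ['graphite-01.example.com:80', 'graphite-02.example.com:80']

Because CLUSTER_SERVERS is only applied after local_settings is imported, FINDER fans searches out to every non-local host in the list, while LOCAL_FINDER stays purely local — so /browser/local/ requests from peers never recurse back into the cluster.
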
