Skip to content

Commit

Permalink
hbase metaquery support
Browse files Browse the repository at this point in the history
- move HBase configuration comment to docs

- store metadata in resource table with prefix "r_"

- get_resources, get_meters and get_samples support metaquery

- enable api v1 metaquery tests

Change-Id: I3285bb420283c2385e6f340ff30e951d58dcb450
Implements: blueprint hbase-metadata-query
Fixes: bug #1146655
  • Loading branch information
shengjie min authored and shengjie-min committed May 26, 2013
1 parent 9d339a5 commit 09b4623
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 83 deletions.
200 changes: 117 additions & 83 deletions ceilometer/storage/impl_hbase.py
Expand Up @@ -16,36 +16,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Openstack Ceilometer HBase storage backend
.. note::
This driver is designed to enable Ceilometer to store its data in HBase.
The implementation uses the HBase Thrift interface, so it is necessary to have
the HBase Thrift server installed and started:
(https://ccp.cloudera.com/display/CDHDOC/HBase+Installation)
This driver has been tested against HBase 0.92.1/CDH 4.1.1,
HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache.
Versions earlier than 0.92.1 are not supported due to feature
incompatibility.
Due to limitations of HBase, the driver implements its own data aggregations,
which may harm its performance. It is likely that the performance could be
improved if co-processors were used; however, at the moment co-processor
support is not exposed through the Thrift API.
The following four tables are expected to exist in HBase:
create 'project', {NAME=>'f'}
create 'user', {NAME=>'f'}
create 'resource', {NAME=>'f'}
create 'meter', {NAME=>'f'}
The driver uses HappyBase, a wrapper library used to interact
with HBase via the Thrift protocol:
http://happybase.readthedocs.org/en/latest/index.html#
"""HBase storage backend
"""

from sets import Set
from urlparse import urlparse
import json
import hashlib
Expand Down Expand Up @@ -223,16 +196,24 @@ def record_metering_data(self, data):
project['f:s_%s' % data['source']] = "1"
self.project.put(data['project_id'], project)

rts = reverse_timestamp(data['timestamp'])

resource = self.resource.row(data['resource_id'])
new_meter = "%s!%s!%s" % (
data['counter_name'], data['counter_type'], data['counter_unit'])
new_resource = {'f:resource_id': data['resource_id'],
'f:project_id': data['project_id'],
'f:user_id': data['user_id'],
'f:metadata': json.dumps(data['resource_metadata']),
'f:source': data["source"],
'f:m_%s' % new_meter: "1",
# store meters with prefix "m_"
'f:m_%s' % new_meter: "1"
}
# store metadata fields with prefix "r_"
resource_metadata = dict(('f:r_%s' % k, v)
for (k, v)
in data['resource_metadata'].iteritems())
new_resource.update(resource_metadata)

# Update if resource has new information
if new_resource != resource:
meters = _load_hbase_list(resource, 'm')
Expand All @@ -249,7 +230,6 @@ def record_metering_data(self, data):

# We use reverse timestamps in rowkeys as they are sorted
# alphabetically.
rts = reverse_timestamp(data['timestamp'])
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())

# Convert timestamp to string as json.dumps won't
Expand Down Expand Up @@ -309,41 +289,58 @@ def get_resources(self, user=None, project=None, source=None,
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param end_timestamp: Optional modified timestamp end range.
:param metaquery: Optional dict with metadata to match on.
"""
q, start_row, end_row = make_query(user=user,
project=project,
source=source,
start=start_timestamp,
end=end_timestamp,
require_meter=False)
LOG.debug("q: %s" % q)
# TODO implement metaquery support
if len(metaquery) > 0:
raise NotImplementedError('metaquery not implemented')

resource_ids = {}
g = self.meter.scan(filter=q, row_start=start_row,
row_stop=end_row)
for ignored, data in g:
resource_ids[data['f:resource_id']] = data['f:resource_id']

q = make_query(user=user, project=project, source=source,
query_only=True, require_meter=False)
LOG.debug("q: %s" % q)
for resource_id, data in self.resource.rows(resource_ids):
yield models.Resource(
resource_id=resource_id,
def make_resource(data):
""" transform HBase fields to Resource model
"""
# convert HBase metadata e.g. f:r_display_name to display_name
data['f:metadata'] = dict((k[4:], v)
for k, v in data.iteritems()
if k.startswith('f:r_'))

return models.Resource(
resource_id=data['f:resource_id'],
project_id=data['f:project_id'],
source=data['f:source'],
user_id=data['f:user_id'],
metadata=json.loads(data['f:metadata']),
metadata=data['f:metadata'],
meter=[
models.ResourceMeter(*(m[4:].split("!")))
for m in data
if m.startswith('f:m_')
],
)

q, start_row, stop_row = make_query(user=user,
project=project,
source=source,
start=start_timestamp,
end=end_timestamp,
require_meter=False,
query_only=False)
LOG.debug("Query Meter table: %s" % q)
gen = self.meter.scan(filter=q, row_start=start_row, row_stop=stop_row)

# put all the resource_ids in a Set
resource_ids = Set()
for ignored, data in gen:
resource_ids.add(data['f:resource_id'])

# handle metaquery
if len(metaquery) > 0:
for ignored, data in self.resource.rows(resource_ids):
for k, v in metaquery.iteritems():
# if metaquery matches, yield the resource model
# e.g. metaquery: metadata.display_name
# equals
# HBase: f:r_display_name
if data['f:r_' + k.split('.', 1)[1]] == v:
yield make_resource(data)
else:
for ignored, data in self.resource.rows(resource_ids):
yield make_resource(data)

def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}):
"""Return an iterable of models.Meter instances
Expand All @@ -354,13 +351,23 @@ def get_meters(self, user=None, project=None, resource=None, source=None,
:param source: Optional source filter.
:param metaquery: Optional dict with metadata to match on.
"""
q, ignored, ignored = make_query(user=user, project=project,
resource=resource, source=source,
require_meter=False)
LOG.debug("q: %s" % q)
# TODO implement metaquery support
q = make_query(user=user, project=project, resource=resource,
source=source, require_meter=False, query_only=True)
LOG.debug("Query Resource table: %s" % q)

# handle metaquery
if len(metaquery) > 0:
raise NotImplementedError('metaquery not implemented')
meta_q = []
for k, v in metaquery.iteritems():
meta_q.append(
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
% ('r_' + k.split('.', 1)[1], v))
meta_q = " AND ".join(meta_q)
# join query and metaquery
if q is not None:
q += " AND " + meta_q
else:
q = meta_q # metaquery only

gen = self.resource.scan(filter=q)

Expand Down Expand Up @@ -389,15 +396,36 @@ def get_meters(self, user=None, project=None, resource=None, source=None,
def get_samples(self, sample_filter):
"""Return an iterable of models.Sample instances
"""
def make_sample(data):
    """Build a models.Sample from a row read from HBase.

    The row's 'f:message' column holds the JSON-encoded sample; its
    'timestamp' entry is stored as a string and is parsed back with
    timeutils.parse_strtime before the model is constructed.
    """
    message = json.loads(data['f:message'])
    message['timestamp'] = timeutils.parse_strtime(message['timestamp'])
    return models.Sample(**message)

q, start, stop = make_query_from_filter(sample_filter,
require_meter=False)
LOG.debug("q: %s" % q)
LOG.debug("Query Meter Table: %s" % q)

gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)

for ignored, meter in gen:
meter = json.loads(meter['f:message'])
meter['timestamp'] = timeutils.parse_strtime(meter['timestamp'])
yield models.Sample(**meter)
# TODO (shengjie) put this implementation here because it's failing
# the test. bp hbase-meter-table-enhancement will address this
# properly.
# handle metaquery
metaquery = sample_filter.metaquery
if len(metaquery) > 0:
# metaquery checks resource table
resource = self.resource.row(meter['f:resource_id'])

for k, v in metaquery.iteritems():
if resource['f:r_' + k.split('.', 1)[1]] != v:
break # if one metaquery doesn't match, break
else:
yield make_sample(meter)
else:
yield make_sample(meter)

def _update_meter_stats(self, stat, meter):
"""Do the stats calculation on a requested time bucket in stats dict
Expand Down Expand Up @@ -660,7 +688,8 @@ def reverse_timestamp(dt):
def make_query(user=None, project=None, meter=None,
resource=None, source=None, start=None, end=None,
require_meter=True, query_only=False):
"""Return a filter query based on the selected parameters.
"""Return a filter query string based on the selected parameters.
:param user: Optional user-id
:param project: Optional project-id
:param meter: Optional counter-name
Expand All @@ -687,23 +716,19 @@ def make_query(user=None, project=None, meter=None,
if source:
q.append("SingleColumnValueFilter "
"('f', 'source', =, 'binary:%s')" % source)

start_row, end_row = "", ""
rts_start = str(reverse_timestamp(start) + 1) if start else ""
rts_end = str(reverse_timestamp(end) + 1) if end else ""

# when start_time and end_time are provided,
# if it's filtered by meter,
# rowkey will be used in the query;
# if it's non meter filter query(eg. project_id, user_id etc),
# else it's non meter filter query(e.g. project_id, user_id etc),
# SingleColumnValueFilter against rts will be appended to the query
# queries against other tables should have no start and end passed in
stopRow, startRow = "", ""
rts_start = str(reverse_timestamp(start) + 1) if start else ""
rts_end = str(reverse_timestamp(end) + 1) if end else ""

if meter:
# if it's meter filter without start and end,
# startRow = meter while stopRow = meter + MAX_BYTE
if not rts_start:
rts_start = chr(127)
stopRow = "%s_%s" % (meter, rts_start)
startRow = "%s_%s" % (meter, rts_end)
start_row, end_row = _make_rowkey_scan(meter, rts_start, rts_end)
elif require_meter:
raise RuntimeError('Missing required meter specifier')
else:
Expand All @@ -717,10 +742,11 @@ def make_query(user=None, project=None, meter=None,
sample_filter = None
if len(q):
sample_filter = " AND ".join(q)

if query_only:
return sample_filter
else:
return sample_filter, startRow, stopRow
return sample_filter, start_row, end_row


def make_query_from_filter(sample_filter, require_meter=True):
Expand All @@ -730,16 +756,24 @@ def make_query_from_filter(sample_filter, require_meter=True):
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
if sample_filter.metaquery is not None and \
len(sample_filter.metaquery) > 0:
raise NotImplementedError('metaquery not implemented')

return make_query(sample_filter.user, sample_filter.project,
sample_filter.meter, sample_filter.resource,
sample_filter.source, sample_filter.start,
sample_filter.end, require_meter)


def _make_rowkey_scan(meter, rts_start=None, rts_end=None):
""" if it's meter filter without start and end,
start_row = meter while end_row = meter + MAX_BYTE
"""
if not rts_start:
rts_start = chr(127)
end_row = "%s_%s" % (meter, rts_start)
start_row = "%s_%s" % (meter, rts_end)

return start_row, end_row


def _load_hbase_list(d, prefix):
"""Deserialise dict stored as HBase column family
"""
Expand Down
12 changes: 12 additions & 0 deletions tests/api/v1/test_impl_hbase.py
Expand Up @@ -34,6 +34,10 @@ class TestListEvents(list_events.TestListEvents):
database_connection = 'hbase://__test__'


class TestListEventsMetaQuery(list_events.TestListEventsMetaquery):
    # Run the inherited list_events metaquery tests against the HBase
    # test connection.
    database_connection = 'hbase://__test__'


class TestListEmptyMeters(list_meters.TestListEmptyMeters):
database_connection = 'hbase://__test__'

Expand All @@ -42,6 +46,10 @@ class TestListMeters(list_meters.TestListMeters):
database_connection = 'hbase://__test__'


class TestListMetersMetaquery(list_meters.TestListMetersMetaquery):
    # Run the inherited list_meters metaquery tests against the HBase
    # test connection.
    database_connection = 'hbase://__test__'


class TestListEmptyUsers(list_users.TestListEmptyUsers):
database_connection = 'hbase://__test__'

Expand Down Expand Up @@ -70,6 +78,10 @@ class TestListResources(list_resources.TestListResources):
database_connection = 'hbase://__test__'


class TestListResourcesMetaquery(list_resources.TestListResourcesMetaquery):
    # Run the inherited list_resources metaquery tests against the HBase
    # test connection.
    database_connection = 'hbase://__test__'


class TestListSource(list_sources.TestListSource):
database_connection = 'hbase://__test__'

Expand Down

0 comments on commit 09b4623

Please sign in to comment.