Skip to content

Commit

Permalink
feat(api): Client-ID as a real UUID
Browse files Browse the repository at this point in the history
We store the UUID in binary form in DBs, and perform checking
on user inputs.  Compared with the hex form we currently using,
the binary form saves half space to store.  In addition, by
enforcing UUID on the server side, we can minimize the chance of
client ID collision.

Change-Id: Ic3048a0d2aa21bd201e2d2d9cd8a562662cf8f8e
Closes-Bug: 1233420
  • Loading branch information
lichray committed Oct 7, 2013
1 parent cdf6f1b commit cef47c6
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 40 deletions.
5 changes: 2 additions & 3 deletions marconi/queues/storage/base.py
Expand Up @@ -184,8 +184,7 @@ def list(self, queue, project=None, marker=None,
messages to return.
:param echo: (Default False) Boolean expressing whether
or not this client should receive its own messages.
:param client_uuid: Client's unique identifier. This param
is required when echo=False.
:param client_uuid: A UUID object. Required when echo=False.
:returns: An iterator giving a sequence of messages and
the marker of the next page.
Expand Down Expand Up @@ -245,7 +244,7 @@ def post(self, queue, messages, client_uuid, project=None):
:param messages: Messages to post to queue, an iterable
yielding 1 or more elements. An empty iterable
results in undefined behavior.
:param client_uuid: Client's unique identifier.
:param client_uuid: A UUID object.
:param project: Project id
:returns: List of message ids
Expand Down
14 changes: 13 additions & 1 deletion marconi/queues/storage/sqlite/driver.py
Expand Up @@ -15,6 +15,7 @@

import contextlib
import sqlite3
import uuid

import msgpack

Expand Down Expand Up @@ -43,11 +44,22 @@ def pack(o):
:param o: a Python str, unicode, int, long, float, bool, None
or a dict or list of %o
"""
return buffer(msgpack.dumps(o))
return sqlite3.Binary(msgpack.dumps(o))

sqlite3.register_converter('DOCUMENT', lambda s:
msgpack.loads(s, encoding='utf-8'))

@staticmethod
def uuid(o):
"""Converts a UUID object to a custom SQlite `UUID`.
:param o: a UUID object
"""
return sqlite3.Binary(o.bytes)

sqlite3.register_converter('UUID', lambda s:
uuid.UUID(hex=s))

def run(self, sql, *args):
"""Performs a SQL query.
Expand Down
7 changes: 4 additions & 3 deletions marconi/queues/storage/sqlite/messages.py
Expand Up @@ -35,7 +35,7 @@ def __init__(self, driver):
qid INTEGER,
ttl INTEGER,
content DOCUMENT,
client TEXT,
client UUID,
created DATETIME, -- seconds since the Julian day
PRIMARY KEY(id),
FOREIGN KEY(qid) references Queues(id) on delete cascade
Expand Down Expand Up @@ -157,7 +157,7 @@ def list(self, queue, project, marker=None, limit=None,
if not echo:
sql += '''
and M.client != ?'''
args += [client_uuid]
args += [self.driver.uuid(client_uuid)]

if marker:
sql += '''
Expand Down Expand Up @@ -214,7 +214,8 @@ def post(self, queue, messages, client_uuid, project):
def it():
for m in messages:
yield (my['newid'], qid, m['ttl'],
self.driver.pack(m['body']), client_uuid)
self.driver.pack(m['body']),
self.driver.uuid(client_uuid))
my['newid'] += 1

self.driver.run_multiple('''
Expand Down
9 changes: 4 additions & 5 deletions marconi/queues/transport/wsgi/messages.py
Expand Up @@ -74,8 +74,7 @@ def _get_by_id(self, base_path, project_id, queue_name, ids):
return messages

def _get(self, req, project_id, queue_name):
uuid = req.get_header('Client-ID', required=True)

client_uuid = wsgi_utils.get_client_uuid(req)
kwargs = {}

# NOTE(kgriffs): This syntax ensures that
Expand All @@ -90,7 +89,7 @@ def _get(self, req, project_id, queue_name):
results = self.message_controller.list(
queue_name,
project=project_id,
client_uuid=uuid,
client_uuid=client_uuid,
**kwargs)

# Buffer messages
Expand Down Expand Up @@ -136,7 +135,7 @@ def on_post(self, req, resp, project_id, queue_name):
u'project: %(project)s') %
{'queue': queue_name, 'project': project_id})

uuid = req.get_header('Client-ID', required=True)
client_uuid = wsgi_utils.get_client_uuid(req)

# Place JSON size restriction before parsing
if req.content_length > CFG.content_max_length:
Expand Down Expand Up @@ -164,7 +163,7 @@ def on_post(self, req, resp, project_id, queue_name):
queue_name,
messages=messages,
project=project_id,
client_uuid=uuid)
client_uuid=client_uuid)

except validate.ValidationFailed as ex:
raise wsgi_exceptions.HTTPBadRequestAPI(six.text_type(ex))
Expand Down
19 changes: 19 additions & 0 deletions marconi/queues/transport/wsgi/utils.py
Expand Up @@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

import marconi.openstack.common.log as logging

from marconi.queues.transport import utils
Expand Down Expand Up @@ -153,3 +155,20 @@ def get_checked_field(document, name, value_type):
description = _(u'The value of the "{name}" field must be a {vtype}.')
description = description.format(name=name, vtype=value_type.__name__)
raise exceptions.HTTPBadRequestBody(description)


def get_client_uuid(req):
"""Read a required Client-ID from a request.
:param req: A falcon.Request object
:raises: HTTPBadRequest if the Client-ID header is missing or
does not represent a valid UUID
:returns: A UUID object
"""

try:
return uuid.UUID(req.get_header('Client-ID', required=True))

except ValueError:
description = _(u'Malformed hexadecimal UUID.')
raise exceptions.HTTPBadRequestAPI(description)
46 changes: 28 additions & 18 deletions marconi/tests/queues/storage/base.py
Expand Up @@ -15,6 +15,7 @@

import datetime
import time
import uuid

import ddt
from testtools import matchers
Expand Down Expand Up @@ -121,9 +122,11 @@ def test_queue_lifecycle(self):
metadata = self.controller.get_metadata('test', project=self.project)
self.assertEqual(metadata['meta'], 'test_meta')

client_uuid = uuid.uuid4()

# Test queue statistic
_insert_fixtures(self.message_controller, 'test',
project=self.project, client_uuid='my_uuid',
project=self.project, client_uuid=client_uuid,
num=6)

# NOTE(kgriffs): We can't get around doing this, because
Expand All @@ -132,7 +135,7 @@ def test_queue_lifecycle(self):
time.sleep(1)

_insert_fixtures(self.message_controller, 'test',
project=self.project, client_uuid='my_uuid',
project=self.project, client_uuid=client_uuid,
num=6)

stats = self.controller.stats('test', project=self.project)
Expand Down Expand Up @@ -231,7 +234,7 @@ def test_message_lifecycle(self):
# Test Message Creation
created = list(self.controller.post(queue_name, messages,
project=self.project,
client_uuid='unused'))
client_uuid=uuid.uuid4()))
self.assertEqual(len(created), 1)

# Test Message Get
Expand All @@ -245,8 +248,10 @@ def test_message_lifecycle(self):
self.controller.get(queue_name, created[0], project=self.project)

def test_get_multi(self):
client_uuid = uuid.uuid4()

_insert_fixtures(self.controller, self.queue_name,
project=self.project, client_uuid='my_uuid', num=15)
project=self.project, client_uuid=client_uuid, num=15)

def load_messages(expected, *args, **kwargs):
interaction = self.controller.list(*args, **kwargs)
Expand All @@ -256,7 +261,7 @@ def load_messages(expected, *args, **kwargs):

# Test all messages, echo False and uuid
load_messages(0, self.queue_name, project=self.project,
client_uuid='my_uuid')
client_uuid=client_uuid)

# Test all messages and limit
load_messages(15, self.queue_name, project=self.project, limit=20,
Expand All @@ -265,17 +270,17 @@ def load_messages(expected, *args, **kwargs):
# Test all messages, echo True, and uuid
interaction = load_messages(10, self.queue_name, echo=True,
project=self.project,
client_uuid='my_uuid')
client_uuid=client_uuid)

# Test all messages, echo True, uuid and marker
load_messages(5, self.queue_name, echo=True, project=self.project,
marker=next(interaction), client_uuid='my_uuid')
marker=next(interaction), client_uuid=client_uuid)

def test_multi_ids(self):
messages_in = [{'ttl': 120, 'body': 0}, {'ttl': 240, 'body': 1}]
ids = self.controller.post(self.queue_name, messages_in,
project=self.project,
client_uuid='my_uuid')
client_uuid=uuid.uuid4())

messages_out = self.controller.bulk_get(self.queue_name, ids,
project=self.project)
Expand All @@ -292,13 +297,15 @@ def test_multi_ids(self):
next(result)

def test_claim_effects(self):
client_uuid = uuid.uuid4()

_insert_fixtures(self.controller, self.queue_name,
project=self.project, client_uuid='my_uuid', num=12)
project=self.project, client_uuid=client_uuid, num=12)

def list_messages(include_claimed=None):
kwargs = {
'project': self.project,
'client_uuid': 'my_uuid',
'client_uuid': client_uuid,
'echo': True,
}

Expand Down Expand Up @@ -360,14 +367,15 @@ def list_messages(include_claimed=None):
@testing.is_slow(condition=lambda self: self.gc_interval != 0)
def test_expired_messages(self):
messages = [{'body': 3.14, 'ttl': 0}]
client_uuid = uuid.uuid4()

[msgid] = self.controller.post(self.queue_name, messages,
project=self.project,
client_uuid='my_uuid')
client_uuid=client_uuid)

[msgid] = self.controller.post(self.queue_name, messages,
project=self.project,
client_uuid='my_uuid')
client_uuid=client_uuid)

time.sleep(self.gc_interval)

Expand Down Expand Up @@ -400,7 +408,7 @@ def test_bad_claim_id(self):
[msgid] = self.controller.post(self.queue_name,
[{'body': {}, 'ttl': 10}],
project=self.project,
client_uuid='my_uuid')
client_uuid=uuid.uuid4())

bad_claim_id = '; DROP TABLE queues'
self.controller.delete(self.queue_name,
Expand All @@ -412,6 +420,7 @@ def test_bad_marker(self):
bad_marker = 'xyz'
interaction = self.controller.list(self.queue_name,
project=self.project,
client_uuid=uuid.uuid4(),
marker=bad_marker)
messages = list(next(interaction))

Expand Down Expand Up @@ -442,7 +451,8 @@ def tearDown(self):

def test_claim_lifecycle(self):
_insert_fixtures(self.message_controller, self.queue_name,
project=self.project, client_uuid='my_uuid', num=20)
project=self.project, client_uuid=uuid.uuid4(),
num=20)

meta = {'ttl': 70, 'grace': 30}

Expand Down Expand Up @@ -500,7 +510,7 @@ def test_claim_lifecycle(self):

def test_extend_lifetime(self):
_insert_fixtures(self.message_controller, self.queue_name,
project=self.project, client_uuid='my_uuid',
project=self.project, client_uuid=uuid.uuid4(),
num=20, ttl=120)

meta = {'ttl': 777, 'grace': 0}
Expand All @@ -513,7 +523,7 @@ def test_extend_lifetime(self):

def test_extend_lifetime_with_grace_1(self):
_insert_fixtures(self.message_controller, self.queue_name,
project=self.project, client_uuid='my_uuid',
project=self.project, client_uuid=uuid.uuid4(),
num=20, ttl=120)

meta = {'ttl': 777, 'grace': 23}
Expand All @@ -526,7 +536,7 @@ def test_extend_lifetime_with_grace_1(self):

def test_extend_lifetime_with_grace_2(self):
_insert_fixtures(self.message_controller, self.queue_name,
project=self.project, client_uuid='my_uuid',
project=self.project, client_uuid=uuid.uuid4(),
num=20, ttl=120)

# Although ttl is less than the message's TTL, the grace
Expand All @@ -541,7 +551,7 @@ def test_extend_lifetime_with_grace_2(self):

def test_do_not_extend_lifetime(self):
_insert_fixtures(self.message_controller, self.queue_name,
project=self.project, client_uuid='my_uuid',
project=self.project, client_uuid=uuid.uuid4(),
num=20, ttl=120)

# Choose a ttl that is less than the message's current TTL
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/queues/transport/wsgi/test_auth.py
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
"""Test Auth."""

import uuid

import falcon
from falcon import testing
from keystoneclient.middleware import auth_token
Expand All @@ -27,7 +29,7 @@ class TestWSGIAuth(base.TestBase):

def setUp(self):
super(TestWSGIAuth, self).setUp()
self.headers = {'Client-ID': '30387f00'}
self.headers = {'Client-ID': str(uuid.uuid4())}

def test_auth_install(self):
self.assertTrue(isinstance(self.app,
Expand Down
11 changes: 8 additions & 3 deletions tests/unit/queues/transport/wsgi/test_claims.py
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import json
import uuid

import ddt
import falcon
Expand Down Expand Up @@ -46,7 +47,7 @@ def setUp(self):

doc = json.dumps([{'body': 239, 'ttl': 300}] * 10)
self.simulate_post(self.queue_path + '/messages', self.project_id,
body=doc, headers={'Client-ID': '30387f00'})
body=doc, headers={'Client-ID': str(uuid.uuid4())})
self.assertEquals(self.srmock.status, falcon.HTTP_201)

def tearDown(self):
Expand Down Expand Up @@ -122,15 +123,19 @@ def test_lifecycle(self):
query_string='limit=3')
self.assertEquals(self.srmock.status, falcon.HTTP_204)

headers = {
'Client-ID': str(uuid.uuid4()),
}

# Listing messages, by default, won't include claimed
body = self.simulate_get(self.messages_path, self.project_id,
headers={'Client-ID': 'foo'})
headers=headers)
self.assertEquals(self.srmock.status, falcon.HTTP_204)

# Include claimed messages this time
body = self.simulate_get(self.messages_path, self.project_id,
query_string='include_claimed=true',
headers={'Client-ID': 'foo'})
headers=headers)
listed = json.loads(body[0])
self.assertEquals(self.srmock.status, falcon.HTTP_200)
self.assertEquals(len(listed['messages']), len(claimed))
Expand Down

0 comments on commit cef47c6

Please sign in to comment.