Skip to content

Commit

Permalink
feat (proxy/admin): allow partition modifications
Browse files Browse the repository at this point in the history
This patchset adds one new method to the partitions API
- PATCH /v1/partitions/{partition}

The partitions storage layer is also updated to support a simpler
update interface:
- update(self, name, **kwargs) [with kwargs in ('hosts', 'weight')]

The partitions transport layer is cleared of cruft and reuses JSON
handling utilities from the queues transport layer.

This patch implementation uses jsonschema to achieve self-documenting,
strict format validation of the incoming input.

The partition storage controllers are modified accordingly, as are the
tests. More tests are added for the partitions transport layer to
verify the correctness and to ease future refactorings.

Change-Id: Ifa92b1225421196f95131c2b74e3c07b07c4cfd4
Implements: blueprint placement-service
Closes-Bug: 1230841
  • Loading branch information
Alejandro Cabrera committed Oct 2, 2013
1 parent 53c386a commit 99aaa02
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 52 deletions.
13 changes: 13 additions & 0 deletions marconi/proxy/storage/base.py
Expand Up @@ -96,6 +96,19 @@ def drop_all(self):
"""Drops all partitions from storage."""
raise NotImplementedError

def update(self, name, **kwargs):
"""Updates the weight or hosts of this partition.
:param name: Name of the partition
:type name: text
:param weight: Weight, > 0
:type weight: int
:param kwargs: one of 'hosts' or 'weight'
:type kwargs: dict
:raises: PartitionNotFound
"""
raise NotImplementedError


@six.add_metaclass(abc.ABCMeta)
class CatalogueBase(ControllerBase):
Expand Down
8 changes: 8 additions & 0 deletions marconi/proxy/storage/memory/partitions.py
Expand Up @@ -45,6 +45,14 @@ def create(self, name, weight, hosts):
'w': weight,
'h': hosts}

def update(self, name, **kwargs):
key, value = kwargs.popitem()
assert key in ('weight', 'hosts'), "kwargs (hosts, weight)"
try:
self._col[name][key[0]] = value
except KeyError:
raise exceptions.PartitionNotFound(name)

def delete(self, name):
try:
del self._col[name]
Expand Down
10 changes: 10 additions & 0 deletions marconi/proxy/storage/mongodb/partitions.py
Expand Up @@ -84,6 +84,16 @@ def drop_all(self):
self._col.drop()
self._col.ensure_index(PARTITIONS_INDEX, unique=True)

@utils.raises_conn_error
def update(self, name, **kwargs):
key, value = kwargs.popitem()
assert key in ('hosts', 'weight'), "kwargs (hosts, weight)"
res = self._col.update({'n': name},
{'$set': {key[0]: value}},
upsert=False)
if not res['updatedExisting']:
raise exceptions.PartitionNotFound(name)


def _normalize(entry):
return {
Expand Down
44 changes: 44 additions & 0 deletions marconi/proxy/transport/schema.py
@@ -0,0 +1,44 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""schema: JSON Schemas for marconi proxy transports."""

partition_patch_hosts = {
'type': 'object', 'properties': {
'hosts': {
'type': 'array', 'minItems': 1, 'items': {
'type': 'string'
}
},
'additionalProperties': False
}
}

partition_patch_weight = {
'type': 'object', 'properties': {
'weight': {
'type': 'integer', 'minimum': 1, 'maximum': 2**32 - 1
},
'additionalProperties': False
}
}

partition_create = {
'type': 'object', 'properties': {
'weight': partition_patch_weight['properties']['weight'],
'hosts': partition_patch_hosts['properties']['hosts']
},
'required': ['hosts', 'weight'],
'additionalProperties': False
}
37 changes: 37 additions & 0 deletions marconi/proxy/transport/utils.py
@@ -0,0 +1,37 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""utils: utilities for transport handling."""

import jsonschema

from marconi.queues.transport.wsgi import exceptions as wsgi_errors


# TODO(cpp-cabrera): generalize this
def validate(validator, document):
"""Verifies a document against a schema.
:param validator: a validator to use to check validity
:type validator: jsonschema.Draft4Validator
:param document: document to check
:type document: dict
:raises: wsgi_errors.HTTPBadRequestBody
"""
try:
validator.validate(document)
except jsonschema.ValidationError as ex:
raise wsgi_errors.HTTPBadRequestBody(
'{0}: {1}'.format(ex.args, ex.message)
)
102 changes: 56 additions & 46 deletions marconi/proxy/transport/wsgi/partitions.py
Expand Up @@ -27,8 +27,29 @@
import json

import falcon
import jsonschema

from marconi.proxy.storage import exceptions
from marconi.proxy.transport import schema, utils
from marconi.queues.transport import utils as json_utils
from marconi.queues.transport.wsgi import exceptions as wsgi_errors


def load(req):
"""Reads request body, raising an exception if it is not JSON.
:param req: The request object to read from
:type req: falcon.Request
:return: a dictionary decoded from the JSON stream
:rtype: dict
:raises: wsgi_errors.HTTPBadRequestBody
"""
try:
return json_utils.read_json(req.stream, req.content_length)
except (json_utils.MalformedJSON, json_utils.OverflowedJSONInteger):
raise wsgi_errors.HTTPBadRequestBody(
'JSON could not be parsed.'
)


class Listing(object):
Expand Down Expand Up @@ -69,6 +90,10 @@ class Resource(object):
"""
def __init__(self, partitions_controller):
self._ctrl = partitions_controller
validator_type = jsonschema.Draft4Validator
self._put_validator = validator_type(schema.partition_create)
self._hosts_validator = validator_type(schema.partition_patch_hosts)
self._weight_validator = validator_type(schema.partition_patch_weight)

def on_get(self, request, response, partition):
"""Returns a JSON object for a single partition entry:
Expand All @@ -89,63 +114,19 @@ def on_get(self, request, response, partition):
'weight': data['weight'],
}, ensure_ascii=False)

def _validate_put(self, data):
if not isinstance(data, dict):
raise falcon.HTTPBadRequest(
'Invalid metadata', 'Define a partition as a dict'
)

if 'hosts' not in data:
raise falcon.HTTPBadRequest(
'Missing hosts list', 'Provide a list of hosts'
)

if not data['hosts']:
raise falcon.HTTPBadRequest(
'Empty hosts list', 'Hosts list cannot be empty'
)

if not isinstance(data['hosts'], list):
raise falcon.HTTPBadRequest(
'Invalid hosts', 'Hosts must be a list of URLs'
)

# TODO(cpp-cabrera): check [str]
if 'weight' not in data:
raise falcon.HTTPBadRequest(
'Missing weight',
'Provide an integer weight for this partition'
)

if not isinstance(data['weight'], int):
raise falcon.HTTPBadRequest(
'Invalid weight', 'Weight must be an integer'
)

def on_put(self, request, response, partition):
"""Creates a new partition. Expects the following input:
{"weight": 100, "hosts": [""]}
:returns: HTTP | [201, 204]
"""
if partition.startswith('_'):
raise falcon.HTTPBadRequest(
'Reserved name', '_names are reserved for internal use'
)

if self._ctrl.exists(partition):
response.status = falcon.HTTP_204
return

try:
data = json.loads(request.stream.read().decode('utf8'))
except ValueError:
raise falcon.HTTPBadRequest(
'Invalid JSON', 'This is not a valid JSON stream.'
)

self._validate_put(data)
data = load(request)
utils.validate(self._put_validator, data)
self._ctrl.create(partition,
weight=data['weight'],
hosts=data['hosts'])
Expand All @@ -158,3 +139,32 @@ def on_delete(self, request, response, partition):
"""
self._ctrl.delete(partition)
response.status = falcon.HTTP_204

def on_patch(self, request, response, partition):
"""Allows one to update a partition's weight and/or hosts.
This method expects the user to submit a JSON object
containing both or either of 'hosts' and 'weight'. If neither
is found, the request is flagged as bad. There is also strict
format checking through the use of jsonschema. Appropriate
errors are returned in each case for badly formatted input.
:returns: HTTP | 200,400
"""
data = load(request)

if 'weight' not in data and 'hosts' not in data:
raise wsgi_errors.HTTPBadRequestBody(
'One of `hosts` or `weight` needs to be specified'
)

utils.validate(self._weight_validator, data)
utils.validate(self._hosts_validator, data)
try:
if 'weight' in data:
self._ctrl.update(partition, weight=data['weight'])
if 'hosts' in data:
self._ctrl.update(partition, hosts=data['hosts'])
except exceptions.PartitionNotFound:
raise falcon.HTTPNotFound()
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -3,6 +3,7 @@ pbr>=0.5.21,<1.0
Babel>=0.9.6
netaddr
falcon>=0.1.6,<0.1.7
jsonschema>=1.3.0,!=1.4.0
iso8601>=0.1.4
msgpack-python
pymongo>=2.4
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/proxy/storage/base.py
Expand Up @@ -52,6 +52,7 @@ def setUp(self):
self.name = six.text_type(uuid.uuid1())

def tearDown(self):
self.controller.drop_all()
super(PartitionsControllerTest, self).tearDown()

def _check_structure(self, partition):
Expand Down Expand Up @@ -88,6 +89,19 @@ def test_partition_life_cycle(self):
# verify it isn't listable
self.assertEqual(len(list(self.controller.list())), 0)

def test_partition_updates(self):
with helpers.partition(self.controller, 'a', 10, ['a']):
self.controller.update('a', weight=11)
self.assertEqual(self.controller.get('a')['weight'], 11)

self.controller.update('a', hosts=['b'])
self.assertEqual(self.controller.get('a')['hosts'], ['b'])

def test_update_on_nonexisting_raises(self):
self.assertRaises(exceptions.PartitionNotFound,
self.controller.update,
'a', weight=10)

def test_list(self):
with helpers.partitions(self.controller, 10) as expect:
values = zip(self.controller.list(), expect)
Expand Down Expand Up @@ -131,6 +145,7 @@ def setUp(self):
self.project = six.text_type(uuid.uuid1())

def tearDown(self):
self.controller.drop_all()
super(CatalogueControllerTest, self).tearDown()

def _check_structure(self, entry):
Expand Down

0 comments on commit 99aaa02

Please sign in to comment.