Browse files

Clean rights from file and remove tests

  • Loading branch information...
1 parent 7b15832 commit 4c064bcf353a583ee8c41e23e471b2218c8c101f @liZe liZe committed Sep 15, 2012
Showing with 123 additions and 387 deletions.
  1. +85 −67 radicale/__init__.py
  2. +1 −1 radicale/config.py
  3. +35 −131 radicale/rights/from_file.py
  4. +1 −1 radicale/xmlutils.py
  5. +1 −17 setup.py
  6. +0 −7 test/python/rights/__init__.py
  7. +0 −163 test/python/rights/test_from_file.py
View
152 radicale/__init__.py
@@ -51,16 +51,12 @@
VERSION = "git"
-# Standard "not allowed" response that is returned when an authenticated
-# user tries to access information they don't have rights to.
-NOT_ALLOWED = (
- client.FORBIDDEN,
- {},
- None)
+# Standard "not allowed" response that is returned when an authenticated user
+# tries to access information they don't have rights to
+NOT_ALLOWED = (client.FORBIDDEN, {}, None)
-# Standard "authenticate" response that is returned when a
-# user tries to access non-public information w/o submitting
-# proper authentication credentials
+# Standard "authenticate" response that is returned when a user tries to access
+# non-public information w/o submitting proper authentication credentials
WRONG_CREDENTIALS = (
client.UNAUTHORIZED,
{"WWW-Authenticate": "Basic realm=\"Radicale - Password Required\""},
@@ -185,57 +181,63 @@ def sanitize_uri(uri):
trailing_slash = "" if uri == "/" else trailing_slash
return uri + trailing_slash
-
def collect_allowed_items(self, items, user):
- """ Collect those items from the request that the user
- is actually allowed to access """
-
+ """Get items from request that user is allowed to access."""
read_last_collection_allowed = None
write_last_collection_allowed = None
read_allowed_items = []
write_allowed_items = []
+
for item in items:
if isinstance(item, ical.Collection):
if rights.read_authorized(user, item):
- log.LOGGER.debug("%s has read access to collection %s" % (user, item.url or "/"))
+ log.LOGGER.debug(
+ "%s has read access to collection %s" %
+ (user or "Anonymous", item.url or "/"))
read_last_collection_allowed = True
read_allowed_items.append(item)
else:
- log.LOGGER.debug("%s has NO read access to collection %s" % (user, item.url or "/"))
+ log.LOGGER.debug(
+ "%s has NO read access to collection %s" %
+ (user or "Anonymous", item.url or "/"))
read_last_collection_allowed = False
if rights.write_authorized(user, item):
- log.LOGGER.debug("%s has write access to collection %s" % (user, item.url or "/"))
+ log.LOGGER.debug(
+ "%s has write access to collection %s" %
+ (user or "Anonymous", item.url or "/"))
write_last_collection_allowed = True
write_allowed_items.append(item)
else:
- log.LOGGER.debug("%s has NO write access to collection %s" % (user, item.url or "/"))
+ log.LOGGER.debug(
+ "%s has NO write access to collection %s" %
+ (user or "Anonymous", item.url or "/"))
write_last_collection_allowed = False
+ else:
# item is not a collection, it's the child of the last
# collection we've met in the loop. Only add this item
# if this last collection was allowed.
- else:
if read_last_collection_allowed:
- log.LOGGER.debug("%s has read access to item %s" % (user, item.name or "/"))
+ log.LOGGER.debug(
+ "%s has read access to item %s" %
+ (user or "Anonymous", item.name))
read_allowed_items.append(item)
+ else:
+ log.LOGGER.debug(
+ "%s has NO read access to item %s" %
+ (user or "Anonymous", item.name))
+
if write_last_collection_allowed:
- log.LOGGER.debug("%s has write access to item %s" % (user, item.name or "/"))
+ log.LOGGER.debug(
+ "%s has write access to item %s" %
+ (user or "Anonymous", item.name))
write_allowed_items.append(item)
-
- if (not write_last_collection_allowed) and (not read_last_collection_allowed):
- log.LOGGER.info("%s has NO access to item %s" % (user, item.name or "/"))
-
- return read_allowed_items, write_allowed_items
-
+ else:
+ log.LOGGER.debug(
+ "%s has NO write access to item %s" %
+ (user or "Anonymous", item.name))
- def _union(self, list1, list2):
- out = []
- out.extend(list1)
- for thing in list2:
- if not thing in list1:
- list1.append(thing)
- return out
-
+ return read_allowed_items, write_allowed_items
def __call__(self, environ, start_response):
"""Manage a request."""
@@ -280,14 +282,17 @@ def __call__(self, environ, start_response):
if not items or function == self.options or \
auth.is_authenticated(user, password):
- read_allowed_items, write_allowed_items = self.collect_allowed_items(items, user)
+ read_allowed_items, write_allowed_items = \
+ self.collect_allowed_items(items, user)
- if read_allowed_items or write_allowed_items or function == self.options:
+ if read_allowed_items or write_allowed_items or \
+ function == self.options:
# Collections found
status, headers, answer = function(
- environ, read_allowed_items, write_allowed_items, content, user)
+ environ, read_allowed_items, write_allowed_items, content,
+ user)
else:
- # Good user but has no rights to any of the given collections
+ # Good user but has no rights to any of the given collections
status, headers, answer = NOT_ALLOWED
else:
# Unknown or unauthorized user
@@ -312,11 +317,12 @@ def __call__(self, environ, start_response):
# All these functions must have the same parameters, some are useless
# pylint: disable=W0612,W0613,R0201
- def delete(self, environ, read_collections, write_collections, content, user):
+ def delete(self, environ, read_collections, write_collections, content,
+ user):
"""Manage DELETE request."""
if not len(write_collections):
return client.PRECONDITION_FAILED, {}, None
-
+
collection = write_collections[0]
if collection.path == environ["PATH_INFO"].strip("/"):
@@ -354,9 +360,9 @@ def get(self, environ, read_collections, write_collections, content, user):
if not len(read_collections):
return NOT_ALLOWED
-
+
collection = read_collections[0]
-
+
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
if item_name:
@@ -374,10 +380,13 @@ def get(self, environ, read_collections, write_collections, content, user):
# Create the collection if it does not exist
if not collection.exists:
if collection in write_collections:
- log.LOGGER.debug("Creating collection %s" % collection.name)
+ log.LOGGER.debug(
+ "Creating collection %s" % collection.name)
collection.write()
else:
- log.LOGGER.debug("Collection %s not available and could not be created due to missing write rights" % collection.name)
+ log.LOGGER.debug(
+ "Collection %s not available and could not be created "
+ "due to missing write rights" % collection.name)
return NOT_ALLOWED
# Get whole collection
@@ -391,18 +400,21 @@ def get(self, environ, read_collections, write_collections, content, user):
answer = answer_text.encode(self.encoding)
return client.OK, headers, answer
- def head(self, environ, read_collections, write_collections, content, user):
+ def head(self, environ, read_collections, write_collections, content,
+ user):
"""Manage HEAD request."""
- status, headers, answer = self.get(environ, read_collections, write_collections, content, user)
+ status, headers, answer = self.get(
+ environ, read_collections, write_collections, content, user)
return status, headers, None
- def mkcalendar(self, environ, read_collections, write_collections, content, user):
+ def mkcalendar(self, environ, read_collections, write_collections, content,
+ user):
"""Manage MKCALENDAR request."""
if not len(write_collections):
return NOT_ALLOWED
-
+
collection = write_collections[0]
-
+
props = xmlutils.props_from_request(content)
timezone = props.get("C:calendar-timezone")
if timezone:
@@ -414,27 +426,29 @@ def mkcalendar(self, environ, read_collections, write_collections, content, user
collection.write()
return client.CREATED, {}, None
- def mkcol(self, environ, read_collections, write_collections, content, user):
+ def mkcol(self, environ, read_collections, write_collections, content,
+ user):
"""Manage MKCOL request."""
if not len(write_collections):
return NOT_ALLOWED
-
+
collection = write_collections[0]
-
+
props = xmlutils.props_from_request(content)
with collection.props as collection_props:
for key, value in props.items():
collection_props[key] = value
collection.write()
return client.CREATED, {}, None
- def move(self, environ, read_collections, write_collections, content, user):
+ def move(self, environ, read_collections, write_collections, content,
+ user):
"""Manage MOVE request."""
if not len(write_collections):
return NOT_ALLOWED
-
+
from_collection = write_collections[0]
-
+
from_name = xmlutils.name_from_path(
environ["PATH_INFO"], from_collection)
if from_name:
@@ -463,32 +477,35 @@ def move(self, environ, read_collections, write_collections, content, user):
# Moving collections, not supported
return client.FORBIDDEN, {}, None
- def options(self, environ, read_collections, write_collections, content, user):
+ def options(self, environ, read_collections, write_collections, content,
+ user):
"""Manage OPTIONS request."""
headers = {
"Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, "
"OPTIONS, PROPFIND, PROPPATCH, PUT, REPORT"),
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"}
return client.OK, headers, None
- def propfind(self, environ, read_collections, write_collections, content, user):
+ def propfind(self, environ, read_collections, write_collections, content,
+ user):
"""Manage PROPFIND request."""
# Rights is handled by collection in xmlutils.propfind
headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"}
- collections = self._union(read_collections, write_collections)
+ collections = set(read_collections + write_collections)
answer = xmlutils.propfind(
environ["PATH_INFO"], content, collections, user)
return client.MULTI_STATUS, headers, answer
- def proppatch(self, environ, read_collections, write_collections, content, user):
+ def proppatch(self, environ, read_collections, write_collections, content,
+ user):
"""Manage PROPPATCH request."""
if not len(write_collections):
return NOT_ALLOWED
-
+
collection = write_collections[0]
-
+
answer = xmlutils.proppatch(
environ["PATH_INFO"], content, collection)
headers = {
@@ -500,9 +517,9 @@ def put(self, environ, read_collections, write_collections, content, user):
"""Manage PUT request."""
if not len(write_collections):
return NOT_ALLOWED
-
+
collection = write_collections[0]
-
+
collection.set_mimetype(environ.get("CONTENT_TYPE"))
headers = {}
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
@@ -531,15 +548,16 @@ def put(self, environ, read_collections, write_collections, content, user):
status = client.PRECONDITION_FAILED
return status, headers, None
- def report(self, environ, read_collections, write_collections, content, user):
+ def report(self, environ, read_collections, write_collections, content,
+ user):
"""Manage REPORT request."""
if not len(read_collections):
return NOT_ALLOWED
-
+
collection = read_collections[0]
-
+
headers = {"Content-Type": "text/xml"}
-
+
answer = xmlutils.report(environ["PATH_INFO"], content, collection)
return client.MULTI_STATUS, headers, answer
View
2 radicale/config.py
@@ -68,7 +68,7 @@
"courier_socket": ""},
"rights": {
"type": "None",
- "file" : "None"},
+ "file": ""},
"storage": {
"type": "filesystem",
"filesystem_folder": os.path.expanduser(
View
166 radicale/rights/from_file.py
@@ -21,156 +21,60 @@
The owner is implied to have all rights on their collections.
-Rights are read from a file whose name is specified in the config
-(section "right", key "file").
-
-The file's format is per line:
-
-collectionpath ":" principal " " rights {", " principal " " rights}*
-
-collectionpath is the path part of the collection's url
-
-principal is a user name (no whitespace allowed)
-rights is a string w/o whitespace that contains "r" for reading rights,
-"w" for writing rights and a combination of these for all rights.
-
-Empty lines are ignored. Lines starting with "#" (hash sign) are comments.
+Rights are read from a file whose name is specified in the config (section
+"right", key "file").
Example:
# This means user1 may read, user2 may write, user3 has full access
-/user0/calendar : user1 r, user2 w, user3 rw
+[/user0/calendar]
+user1: r
+user2: w
+user3: rw
+
# user0 can read /user1/cal
-/user1/cal : user0 r
+[/user1/cal]
+user0: r
-If a collection /a/b is shared and other users than the owner are
-supposed to find the collection in a propfind request, an additional
-line for /a has to be in the defintions. E.g.:
+# If a collection /a/b is shared and other users than the owner are supposed to
+# find the collection in a propfind request, an additional line for /a has to
+# be in the defintions. E.g.:
-/user0/cal: user
+[/user0]
+user1: r
"""
from radicale import config, log
from radicale.rights import owner_only
+# Manage Python2/3 different modules
+# pylint: disable=F0401
+try:
+ from configparser import RawConfigParser as ConfigParser
+except ImportError:
+ from ConfigParser import RawConfigParser as ConfigParser
+# pylint: enable=F0401
-READ_AUTHORIZED = None
-WRITE_AUTHORIZED = None
-
-
-class ParsingError(BaseException):
- """Raised if the file cannot be parsed"""
+FILENAME = config.get("rights", "file")
+if FILENAME:
+ log.LOGGER.debug("Reading rights from file %s" % FILENAME)
+ RIGHTS = ConfigParser()
+ RIGHTS.read(FILENAME)
+else:
+ log.LOGGER.error("No file name configured for rights type 'from_file'")
+ RIGHTS = None
def read_authorized(user, collection):
"""Check if the user is allowed to read the collection."""
- if owner_only.read_authorized(user, collection):
- return True
-
- curl = _normalize_trail_slash(collection.url)
-
- return _dict_knows(READ_AUTHORIZED, curl, user)
-
+ return (
+ owner_only.read_authorized(user, collection) or
+ "r" in RIGHTS.get(collection.url.rstrip("/") or "/", user))
def write_authorized(user, collection):
"""Check if the user is allowed to write the collection."""
- if owner_only.read_authorized(user, collection):
- return True
-
- curl = _normalize_trail_slash(collection.url)
-
- return _dict_knows(WRITE_AUTHORIZED, curl, user)
-
-
-
-def _dict_knows(adict, url, user):
- return adict.has_key(url) and adict.get(url).count(user) != 0
-
-
-
-def _load():
- read = {}
- write = {}
- file_name = config.get("rights", "file")
- if file_name == "None":
- log.LOGGER.error("No file name configured for rights type 'from_file'")
- return
-
- log.LOGGER.debug("Reading rights from file %s" % file_name)
-
- lines = open(file_name, "r").readlines()
-
- for line in lines:
- _process(line, read, write)
-
- global READ_AUTHORIZED, WRITE_AUTHORIZED
- READ_AUTHORIZED = read
- WRITE_AUTHORIZED = write
-
-
-
-def _process(line, read, write):
- line = line.strip()
- if line == "":
- """Empty line"""
- return
-
- if line.startswith("#"):
- """Comment"""
- return
-
- collection, sep, rights_part = line.partition(":")
-
- rights_part = rights_part.strip()
-
- if rights_part == "":
- return
-
- collection = collection.strip()
-
- if collection == "":
- raise ParsingError
-
- collection = _normalize_trail_slash(collection)
-
- rights = rights_part.split(",")
- for right in rights:
- user, sep, right_defs = right.strip().partition(" ")
-
- if user == "" or right_defs == "":
- raise ParsingError
-
- user = user.strip()
- right_defs = right_defs.strip()
-
- for right_def in list(right_defs):
-
- if right_def == 'r':
- _append(read, collection, user)
- elif right_def == 'w':
- _append(write, collection, user)
- else:
- raise ParsingError
-
-
-
-def _append(rdict, key, value):
- if rdict.has_key(key):
- rlist = rdict[key]
- rlist.append(value)
- else:
- rlist = [value]
- rdict[key] = rlist
-
-
-
-def _normalize_trail_slash(s):
- """Removes a maybe existing trailing slash"""
- if s != "/" and s.endswith("/"):
- s, sep, empty = s.rpartition("/")
- return s
-
-
-_load()
+ return (
+ owner_only.write_authorized(user, collection) or
+ "w" in RIGHTS.get(collection.url.rstrip("/") or "/", user))
View
2 radicale/xmlutils.py
@@ -188,7 +188,7 @@ def propfind(path, xml_request, collections, user=None):
"""Read and answer PROPFIND requests.
Read rfc4918-9.1 for info.
-
+
The collections parameter is a list of collections that are
to be included in the output. Rights checking has to be done
by the caller.
View
18 setup.py
@@ -36,23 +36,8 @@
"""
-from distutils.core import setup, Command
-import unittest
+from distutils.core import setup
import radicale
-import sys
-
-
-class RunTests(Command):
- user_options = []
- def initialize_options(self):
- pass
- def finalize_options(self):
- pass
- def run(self):
- tests = unittest.defaultTestLoader.discover("test/python")
- result = unittest.TextTestRunner(stream=sys.stdout, verbosity=99)._makeResult()
- tests.run(result)
-
# When the version is updated, ``radicale.VERSION`` must be modified.
@@ -73,7 +58,6 @@ def run(self):
"radicale", "radicale.auth", "radicale.rights", "radicale.storage"],
provides=["radicale"],
scripts=["bin/radicale"],
- cmdclass={'test': RunTests},
keywords=["calendar", "addressbook", "CalDAV", "CardDAV"],
classifiers=[
"Development Status :: 4 - Beta",
View
7 test/python/rights/__init__.py
@@ -1,7 +0,0 @@
-'''
-Created on 09.08.2012
-
-Tests for rights-related code.
-
-@author: mj
-'''
View
163 test/python/rights/test_from_file.py
@@ -1,163 +0,0 @@
-"""
-
-Unit test for radicale.rights.from_file.
-
-Tests reading the file. The processing is untested, yet.
-
-"""
-
-
-from radicale.rights import from_file
-import unittest
-
-
-
-class Test1(unittest.TestCase):
-
- def testProcessEmptyLine(self):
- """ Line with a comment """
-
- # Input values
- line = " "
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except from_file.ParsingError:
- self.assertTrue(False)
-
- self.assertTrue(len(read.keys()) == 0)
- self.assertTrue(len(write.keys()) == 0)
-
-
- def testProcessComment(self):
- """ Line with a comment """
-
- # Input values
- line = "# some comment"
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except from_file.ParsingError:
- self.assertTrue(False)
-
- self.assertTrue(len(read.keys()) == 0)
- self.assertTrue(len(write.keys()) == 0)
-
-
- def testProcess0a(self):
- """ Pointless line: no rights definitions """
-
- # Input values
- line = "/user1/collection1 : "
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except from_file.ParsingError:
- self.fail("Unexpected exception")
-
- self.assertTrue(len(read.keys()) == 0)
- self.assertTrue(len(write.keys()) == 0)
-
-
- def testProcess1a(self):
- """ Malformed line: no collection definitions """
-
- # Input values
- line = " : a b"
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except from_file.ParsingError:
- """Exception expected"""
- else:
- self.fail("Expected exception not raised")
-
-
-
- def testProcess1b(self):
- """ Malformed line: right "b" unknown """
-
- # Input values
- line = "/user1/collection1 : a b"
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except from_file.ParsingError:
- """Exception expected"""
- else:
- self.fail("Expected exception not raised")
-
-
- def testProcess1c(self):
- """ Malformed line: user/right empty """
-
- # Input values
- line = "/user1/collection1 : a"
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except from_file.ParsingError:
- """Exception expected"""
- else:
- self.fail("Expected exception not raised")
-
-
- def testProcess2(self):
- """Actual sensible input all of which means the same"""
-
- lines = [
- "/user1/collection1 : other1 r, other2 w, other6 rw",
- "/user1/collection1/ : other1 r, other2 w, other6 rw",
- "/user1/collection1: other1 r, other2 w, other6 rw",
- "/user1/collection1/: other1 r, other2 w, other6 rw",
- "/user1/collection1: other1 r, other2 w,other6 rw",
- "/user1/collection1 :other1 r,other2 w, other6 rw",
- "/user1/collection1\t:other1 r,\tother2 w,\tother6 rw",
- ]
-
- for line in lines:
- # Input values
- read = {}
- write = {}
-
- try:
- # Call SUT
- from_file._process(line, read, write)
- except:
- self.fail("unexpected exception for input %s" % line)
-
- # Check
- self.assertEquals(len(read.keys()), 1, "keys in %s" % line)
- self.assertEquals(len(read.get("/user1/collection1")), 2, "rights in %s" % line)
- self.assertTrue(read.get("/user1/collection1").count("other1"), "other1 read in %s" % line)
- self.assertTrue(read.get("/user1/collection1").count("other6"), "other6 read in %s" % line)
-
- self.assertEquals(len(write.keys()), 1, "keys in %s" % line)
- self.assertEquals(len(write.get("/user1/collection1")), 2, "rights in %s" % line)
- self.assertTrue(write.get("/user1/collection1").count("other2"), "other2 write in %s" % line)
- self.assertTrue(write.get("/user1/collection1").count("other6"), "other6 write in %s" % line)
-
-
-
-
-
-if __name__ == "__main__":
- unittest.main()

0 comments on commit 4c064bc

Please sign in to comment.