Permalink
Browse files

Remove security vulnerability associated with repository configuration (

#382)

* Allow KnowledgePostProcessors to be instantiated with runtime configuration.

* Make functions in repository configuration take the repository as the first argument.

* Extend default repository configuration so that runtime Python execution is not necessary.

* Make GitRepositories load configuration (only) from YAML files.

* Update documentation for changes.
  • Loading branch information...
matthewwardrop committed Mar 24, 2018
1 parent f8e8dc4 commit 5d6668206f0b3fa90c091e682b93867460501f11
View
@@ -1,3 +1,4 @@
include knowledge_repo/config_defaults.yml
recursive-include knowledge_repo/templates *
recursive-include knowledge_repo/app/templates *
recursive-include knowledge_repo/app/static *
View
@@ -21,20 +21,12 @@ this stage, two backends have been fully implemented:
- `DbKnowledgeRepository`: A repository backed by a database backend (most
databases supported by SQLAlchemy can be used).
All backends also allow configuration using a Python configuration file
at '/.knowledge_repo_config' within the repository. A template for creating this
file is available `here <repo_config_>`__, if one does not already exist or
All backends also allow configuration using a YAML configuration file at
'/.knowledge_repo_config.yml' within the repository. A template for creating
this file is available `here <repo_config_>`__, if one does not already exist or
the repository configuration has grown out of sync with upstream changes.
.. _`repo_config`: https://github.com/airbnb/knowledge-repo/blob/master/knowledge_repo/config_defaults.py
.. warning::
The `.knowledge_repo_config` file is executed at runtime on server and on
client machines. As such, a malicious user with appropriate permissions could
submit arbitrary code that will be executed at runtime on users' machines.
Using the Knowledge Repo in non-trusted environments is not recommended
until this is remedied.
.. _`repo_config`: https://github.com/airbnb/knowledge-repo/blob/master/knowledge_repo/config_defaults.yml
Git Knowledge Repositories
^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -25,6 +25,7 @@
'pygments', # Code highlighting support in markdown
'gitpython', # Git abstraction
'tabulate', # Rendering information prettily in knowledge_repo script
'pyaml', # Used to configure knowledge repositories
# Flask App Dependencies
'flask', # Main flask framework
View
@@ -1,23 +1,28 @@
import functools
import imp
import types
import logging
import os
import time
import types
logger = logging.getLogger(__name__)
class KnowledgeRepositoryConfig(dict):
def __init__(self, *args, **kwargs):
def __init__(self, repo, *args, **kwargs):
self._repo = repo
super(KnowledgeRepositoryConfig, self).__init__(*args, **kwargs)
self.DEFAULT_CONFIGURATION = {}
def __getitem__(self, key):
try:
return super(KnowledgeRepositoryConfig, self).__getitem__(key)
except:
return self.DEFAULT_CONFIGURATION[key]
value = super(KnowledgeRepositoryConfig, self).__getitem__(key)
except KeyError:
value = self.DEFAULT_CONFIGURATION[key]
if isinstance(value, types.FunctionType):
value = functools.partial(value, self._repo)
return value
def __getattr__(self, attr):
return self[attr]
@@ -31,9 +36,9 @@ def __dir__(self):
def update(self, *values, **kwargs):
for value in values:
if isinstance(value, dict):
if 'DEFAULT_CONFIGURATION' in value:
value = value.copy()
value.pop('DEFAULT_CONFIGURATION')
value = value.copy()
value.pop('DEFAULT_CONFIGURATION', None)
value.pop('_repo', None)
dict.update(self, value)
elif isinstance(value, types.ModuleType):
self.__update_from_module(value)
@@ -79,8 +84,13 @@ def __update_from_module(self, module):
self.__set_from_module(self, module)
def __set_from_file(self, d, filename, force=False):
config = imp.load_source(u'knowledge_repo.config_{}'.format(str(time.time()).replace('.', '')), filename)
self.__set_from_module(d, config, force)
if filename.endswith('.py'):
config = imp.load_source(u'knowledge_repo.config_{}'.format(str(time.time()).replace('.', '')), filename)
self.__set_from_module(d, config, force)
elif filename.endswith('.yml'):
with open(filename) as f:
config = yaml.load(f)
self.update(config)
def __set_from_module(self, d, module, force=False):
for key in dir(module):
@@ -1,13 +1,31 @@
# Configuration for this knowledge data repository.
# Default configuration for knowledge repositories
import re
# A function called to see whether a specified path is permitted in the repository
# Only enforced when creating/modifying posts. It should return the path as a standard
# unix path (virtual folders separated by '/' and the final node should end in '.kp').
# It should raise an exception if the provided path is is not permitted in the knowledge
# repository.
def path_parse(path):
return path
# repository. A default implementation is provided using `path_patterns`, which
# can be provided more readily in a YAML configuration file.
def path_parse(repo, path):
if not path.endswith('.kp'):
path += '.kp'
for pattern in repo.config.path_patterns:
if re.match(pattern, path):
return path
raise ValueError(
"Provided path '{path}' does not match any of the following patterns:\n" +
'\n'.join("'{}': {}".format(pattern, desc) for pattern, desc in repo.config.path_patterns.items())
)
# The accepted path patterns should be provided as a dictionary mapping regex
# patterns to descriptions of that pattern.
path_patterns = {
'.*': "Any path is valid."
}
# A dictionary of aliases which point to knowledge posts. This allows you to alias posts
@@ -20,8 +38,9 @@ def path_parse(path):
# Postprocessors to apply when importing KnowledgePost objects into the repository.
# Note that KnowledgePost objects by default run 'extract_images' and 'format_checks'.
# Order is important.
postprocessors = []
# Order is important. Should be a list of tuples, of form:
# ('name of postprocessor', {'init_kwarg': 'value'})
postprocessors = {}
# Usernames of users to keep informed of changes to the knowledge data repo
@@ -30,33 +49,69 @@ def path_parse(path):
# Function to check whether provided username is a valid username, and if not, mutate it
# such that it is. Should raise an exception if that is not possible, and otherwise
# return the parsed username.
def username_parse(username):
# return the parsed username. A default implementation is provided using
# `username_pattern`, which can be provided more readily in a YAML configuration
# file.
def username_parse(repo, username):
if not re.match(repo.config.username_pattern, username):
raise ValueError(
"Username '{}' does not follow the required pattern: '{}'"
.format(repo.config.username_pattern)
)
return username
# Function to convert a username to a person's proper name
def username_to_name(username):
return username
# A regex pattern to which valid usernames must adhere
username_pattern = '.*'
# Function to convert a username to a person's proper name. A default
# implementation is provided using `username_to_name_pattern`, which can be
# provided more readily in a YAML configuration file.
def username_to_name(repo, username):
m = re.match(repo.config.username_to_name_pattern[0], username)
return repo.config.username_to_name_pattern[1].format(**m.groupdict())
# A tuple/list of strings, the first being a regex with named groups, and the
# second being a format string with the group names available.
username_to_name_pattern = ('(?P<username>.*)', '{username}')
# Function to convert a username to a person's email
def username_to_email(username):
return u'{}@example.com'.format(username)
# Function to convert a username to a person's email. A default implementation
# is provided using `username_to_email_pattern`, which can be provided more
# readily in a YAML configuration file.
def username_to_email(repo, username):
m = re.match(repo.config.username_to_email_pattern[0], username)
return repo.config.username_to_email_pattern[1].format(**m.groupdict())
# Function to generate the web uri for a knowledge post at
# path `path`. If `None`, should return the web uri for
# the entire repository. Return `None` if a web uri does
# not exist.
def web_uri(path=None):
# A tuple/list of strings, the first being a regex with named groups, and the
# second being a format string with the group names available.
username_to_email_pattern = ('(?P<username>.*)', '{username}@example.com')
# Function to generate the web uri for a knowledge post at path `path`. If
# `None`, should return the web uri for the entire repository. Return `None` if
# a web uri does not exist. A default implementation is provided using
# `web_uri_base`, which can be provided more readily in a YAML configuration
# file.
def web_uri(repo, path=None):
if repo.config.web_uri_base:
if path:
return '/'.join([repo.config.web_uri_base, 'post', path])
return repo.config.web_uri_base
return None
# The base url of the server hosting this repository.
web_uri_base = None
# WARNING: ADVANCED!
# Function that is called on a Flask web app instance before it is launched
# This is useful if you need to add a route for your local deployment, or other
# equally invasive action. Not recommended unless you know exactly what you are doing.
# It must return a KnowledgeFlask app instance.
def prepare_app(app):
def prepare_app(repo, app):
return app
@@ -25,7 +25,7 @@ def __init__(self, kp, format=None, postprocessors=None, **kwargs):
self.kp = kp
self.format = format
if postprocessors is None:
postprocessors = ['extract_images', 'format_checks']
postprocessors = [('extract_images', {}), ('format_checks', {})]
self.postprocessors = postprocessors
self.init(**kwargs)
@@ -49,8 +49,8 @@ def wrapped(*args, **kwargs):
if postprocessors is None:
postprocessors = []
for postprocessor in postprocessors:
KnowledgePostProcessor._get_subclass_for(postprocessor).process(kp)
for postprocessor, kwargs in postprocessors:
KnowledgePostProcessor._get_subclass_for(postprocessor)(**kwargs).process(kp)
return kp
return wrapped
View
@@ -337,7 +337,7 @@ def is_valid(self):
if not self._has_ref('knowledge.md'):
return False
try:
FormatChecks.process(self)
FormatChecks().process(self)
except:
return False
return True
@@ -6,6 +6,5 @@
class KnowledgePostProcessor(with_metaclass(SubclassRegisteringABCMeta, object)):
_registry_keys = []
@classmethod
def process(cls, kp):
def process(self, kp):
return None
@@ -11,15 +11,13 @@
class ExtractImages(KnowledgePostProcessor):
_registry_keys = ['extract_images']
@classmethod
def process(cls, kp):
images = cls.find_images(kp.read())
image_mapping = cls.collect_images(kp, images)
cls.update_thumbnail_uri(kp, images, image_mapping)
cls.cleanup(kp)
@classmethod
def update_thumbnail_uri(cls, kp, images, image_mapping):
def process(self, kp):
images = self.find_images(kp.read())
image_mapping = self.collect_images(kp, images)
self.update_thumbnail_uri(kp, images, image_mapping)
self.cleanup(kp)
def update_thumbnail_uri(self, kp, images, image_mapping):
thumbnail = kp.headers.get('thumbnail', 0)
# if thumbnail is a number, select the nth image in this post as the thumbnail
@@ -35,84 +33,77 @@ def update_thumbnail_uri(cls, kp, images, image_mapping):
thumbnail = None
# if thumbnail is a url, copy it locally to the post unless already collected during collection
if thumbnail and not cls.skip_image(kp, {'src': thumbnail}):
if thumbnail and not self.skip_image(kp, {'src': thumbnail}):
orig_path = os.path.join(kp.orig_context, os.path.expanduser(thumbnail))
if thumbnail in image_mapping:
thumbnail = image_mapping[thumbnail]
elif os.path.exists(orig_path):
thumbnail = cls.copy_image(kp, orig_path)
thumbnail = self.copy_image(kp, orig_path)
else:
logger.warning("Could not find a thumbnail image at: {}".format(thumbnail))
# update post headers to point to new thumbnail image
kp.update_headers(thumbnail=thumbnail)
@classmethod
def find_images(cls, md):
def find_images(self, md):
images = []
images.extend(cls.collect_images_for_pattern(
images.extend(self.collect_images_for_pattern(
md,
# Match all <img /> tags with attributes of form <name>(="<value>") with optional quotes (can be single or double)
# The src attribute is exected to always be surrounded by quotes.
r'<img\s+(?:\w+(?:=([\'\"])?(?(1)(?:(?!\1).)*?\1|[^>]*?))?\s+?)*src=([\'\"])(?P<src>(?:(?!\2).)*?)\2(?:\s+\w+(?:=([\'\"])?(?(1)(?:(?!\4).)*?\4|[^>]*?))?)*\s*\/?>'
))
images.extend(cls.collect_images_for_pattern(md, r'\!\[.*\]\((?P<src>[^\)]*)\)'))
images.extend(self.collect_images_for_pattern(md, r'\!\[.*\]\((?P<src>[^\)]*)\)'))
return sorted(images, key=lambda x: x['offset'])
@classmethod
def collect_images_for_pattern(cls, md, pattern=None):
def collect_images_for_pattern(self, md, pattern=None):
p = re.compile(pattern)
return [{'offset': m.start(), 'tag': m.group(0), 'src': m.group('src')} for m in p.finditer(md)]
@classmethod
def collect_images(cls, kp, images):
def collect_images(self, kp, images):
image_mapping = {}
if len(images) == 0:
return image_mapping
md = kp.read()
images = images[::-1]
for image in images:
if cls.skip_image(kp, image):
if self.skip_image(kp, image):
continue
orig_path = os.path.join(kp.orig_context, os.path.expanduser(image['src']))
new_path = None
if image['src'] in image_mapping:
new_path = image_mapping[image['src']]
elif kp._has_ref(image['src']):
new_path = cls.copy_image(kp, image['src'], is_ref=True)
new_path = self.copy_image(kp, image['src'], is_ref=True)
elif os.path.exists(orig_path):
new_path = cls.copy_image(kp, orig_path)
new_path = self.copy_image(kp, orig_path)
else:
logger.warning("Could not find an image at: {}".format(image['src']))
if not new_path:
continue
image_mapping[image['src']] = new_path
md = cls.replace_image_locations(md, image['offset'], image['tag'], image['src'], new_path)
md = self.replace_image_locations(md, image['offset'], image['tag'], image['src'], new_path)
kp.write(md)
return image_mapping
@classmethod
def skip_image(cls, kp, image):
def skip_image(self, kp, image):
if re.match('http[s]?://', image['src']):
return True
if image['src'].startswith('images/') and image['src'] in kp.image_paths:
return True
return False
@classmethod
def copy_image(cls, kp, path, is_ref=False):
def copy_image(self, kp, path, is_ref=False):
if is_ref:
return
with open(path, 'rb') as f:
kp.write_image(os.path.basename(path), f.read())
return posixpath.join('images', os.path.basename(path))
@classmethod
def replace_image_locations(cls, md, offset, match, old_path, new_path):
def replace_image_locations(self, md, offset, match, old_path, new_path):
pre = md[:offset]
post = md[offset + len(match):]
return pre + match.replace(old_path, new_path) + post
@classmethod
def cleanup(cls, kp):
def cleanup(self, kp):
pass
Oops, something went wrong.

0 comments on commit 5d66682

Please sign in to comment.