Skip to content
Permalink
Browse files
Add CSRF to user, ssh and repo management
  • Loading branch information
ikus060 committed Oct 18, 2021
1 parent 38a1723 commit 0eba3b7857e61f864d38b5e3cfa13344445963f8
@@ -1,5 +1,26 @@
from wtforms.form import Form
# -*- coding: utf-8 -*-
# rdiffweb, A web interface to rdiff-backup repositories
# Copyright (C) 2012-2021 rdiffweb contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
from datetime import timedelta

import cherrypy
from wtforms.csrf.session import SessionCSRF
from wtforms.form import Form

SUBMIT_METHODS = {'POST', 'PUT', 'PATCH', 'DELETE'}

@@ -26,14 +47,26 @@ def getlist(self, key):

_AUTO = _ProxyFormdata()

# Provide a default secret
_CSRF_SECRET = os.environ.get('RDIFFWEB_CSRF_SECRET', 'b5WPUOw3Zd7gr1ligLilAAUA9zuWkW41')


class CherryForm(Form):
"""
Custom implementation of WTForm for cherrypy to support kwargs parms.
If ``formdata`` is not specified, this will use cherrypy.request.params
Explicitly pass ``formdata=None`` to prevent this.
"""
class Meta:
csrf = True
csrf_class = SessionCSRF
csrf_secret = _CSRF_SECRET.encode('utf-8')
csrf_time_limit = timedelta(minutes=30)

@property
def csrf_context(self):
return cherrypy.session

def __init__(self, formdata=_AUTO, **kwargs):
super().__init__(formdata=formdata, **kwargs)
@@ -51,3 +84,8 @@ def validate_on_submit(self):
This is a shortcut for ``form.is_submitted() and form.validate()``.
"""
return self.is_submitted() and self.validate()

@property
def error_message(self):
if self.errors:
return ' '.join(['%s: %s' % (field, ', '.join(messages)) for field, messages in self.errors.items()])
@@ -22,22 +22,21 @@
import pwd
import subprocess
import sys
from collections import OrderedDict

import cherrypy
import humanfriendly
import psutil
from wtforms import validators, widgets
from wtforms.fields.core import StringField, SelectField, Field
from wtforms.fields.html5 import EmailField
from wtforms.fields.simple import PasswordField

from rdiffweb.controller import Controller, flash
from rdiffweb.controller.cherrypy_wtf import CherryForm
from rdiffweb.core.config import Option
from rdiffweb.core.i18n import ugettext as _
from rdiffweb.core.quota import QuotaUnsupported
from rdiffweb.core.store import ADMIN_ROLE, MAINTAINER_ROLE, USER_ROLE
from collections import OrderedDict
from wtforms import validators, widgets
from wtforms.fields.core import Field, SelectField, StringField
from wtforms.fields.html5 import EmailField
from wtforms.fields.simple import PasswordField

# Define the logger
logger = logging.getLogger(__name__)
@@ -49,12 +48,18 @@ def get_pyinfo():
yield _('OS Version'), '%s %s (%s %s)' % (platform.system(), platform.release(), distro.linux_distribution()[0].capitalize(), distro.linux_distribution()[1])
except:
yield _('OS Version'), '%s %s' % (platform.system(), platform.release())
if hasattr(os, 'path'): yield _('OS Path'), os.environ['PATH']
if hasattr(sys, 'version'): yield _('Python Version'), ''.join(sys.version)
if hasattr(sys, 'subversion'): yield _('Python Subversion'), ', '.join(sys.subversion)
if hasattr(sys, 'prefix'): yield _('Python Prefix'), sys.prefix
if hasattr(sys, 'executable'): yield _('Python Executable'), sys.executable
if hasattr(sys, 'path'): yield _('Python Path'), ', '.join(sys.path)
if hasattr(os, 'path'):
yield _('OS Path'), os.environ['PATH']
if hasattr(sys, 'version'):
yield _('Python Version'), ''.join(sys.version)
if hasattr(sys, 'subversion'):
yield _('Python Subversion'), ', '.join(sys.subversion)
if hasattr(sys, 'prefix'):
yield _('Python Prefix'), sys.prefix
if hasattr(sys, 'executable'):
yield _('Python Executable'), sys.executable
if hasattr(sys, 'path'):
yield _('Python Path'), ', '.join(sys.path)


def get_osinfo():
@@ -71,7 +76,8 @@ def pw_name(uid):
except:
return

if hasattr(sys, 'getfilesystemencoding'): yield _('File System Encoding'), sys.getfilesystemencoding()
if hasattr(sys, 'getfilesystemencoding'):
yield _('File System Encoding'), sys.getfilesystemencoding()
if hasattr(os, 'getcwd'):
yield _('Current Working Directory'), os.getcwd()
if hasattr(os, 'getegid'):
@@ -167,10 +173,9 @@ class UserForm(CherryForm):
validators=[validators.optional()],
description=_("Disk spaces (in bytes) used by this user."))

@property
def error_message(self):
if self.errors:
return ' '.join(['%s: %s' % (field, ', '.join(messages)) for field, messages in self.errors.items()])

class DeleteUserForm(CherryForm):
username = StringField(_('Username'), validators=[validators.required()])


@cherrypy.tools.is_admin()
@@ -180,6 +185,67 @@ class AdminPage(Controller):
logfile = Option('log_file')
logaccessfile = Option('log_access_file')

def _add_or_edit_user(self, action, form):
assert action in ['add', 'edit']
assert form
# Validate form.
if not form.validate():
flash(form.error_message, level='error')
return
if action == 'add':
user = self.app.store.add_user(form.username.data, form.password.data)
else:
user = self.app.store.get_user(form.username.data)
if form.password.data:
user.set_password(form.password.data, old_password=None)
# Don't allow the user to changes it's "role" state.
if form.username.data != self.app.currentuser.username:
user.role = form.role.data
user.email = form.email.data or ''
if form.user_root.data:
user.user_root = form.user_root.data
if not user.valid_user_root():
flash(_("User's root directory %s is not accessible!") % user.user_root, level='error')
logger.warning("user's root directory %s is not accessible" % user.user_root)
# Try to update disk quota if the human readable value changed.
# Report error using flash.
if form.disk_quota and form.disk_quota.data:
try:
new_quota = form.disk_quota.data
old_quota = humanfriendly.parse_size(humanfriendly.format_size(user.disk_quota, binary=True))
if old_quota != new_quota:
user.disk_quota = new_quota
flash(_("User's quota updated"), level='success')
except QuotaUnsupported as e:
flash(_("Setting user's quota is not supported"), level='warning')
except Exception as e:
flash(_("Failed to update user's quota: %s") % e, level='error')
logger.warning("failed to update user's quota", exc_info=1)
if action == 'add':
flash(_("User added successfully."))
else:
flash(_("User information modified successfully."))

def _delete_user(self, action, form):
assert action == 'delete'
assert form
# Validate form.
if not form.validate():
flash(form.error_message, level='error')
return
if form.username.data == self.app.currentuser.username:
flash(_("You cannot remove your own account!"), level='error')
else:
try:
user = self.app.store.get_user(form.username.data)
if user:
user.delete()
flash(_("User account removed."))
else:
flash(_("User doesn't exists!"), level='warning')
except ValueError as e:
flash(e, level='error')

def _get_log_files(self):
"""
Return a list of log files to be shown in admin area.
@@ -226,57 +292,10 @@ def users(self, criteria=u"", search=u"", action=u"", **kwargs):

# If we're just showing the initial page, just do that
form = UserForm()
if action in ["add", "edit"] and not form.validate():
# Display the form error to user using flash
flash(form.error_message, level='error')

elif action in ["add", "edit"] and form.validate():
if action == 'add':
user = self.app.store.add_user(form.username.data, form.password.data)
else:
user = self.app.store.get_user(form.username.data)
if form.password.data:
user.set_password(form.password.data, old_password=None)
# Don't allow the user to changes it's "role" state.
if form.username.data != self.app.currentuser.username:
user.role = form.role.data
user.email = form.email.data or ''
if form.user_root.data:
user.user_root = form.user_root.data
if not user.valid_user_root():
flash(_("User's root directory %s is not accessible!") % user.user_root, level='error')
logger.warning("user's root directory %s is not accessible" % user.user_root)
# Try to update disk quota if the human readable value changed.
# Report error using flash.
if form.disk_quota and form.disk_quota.data:
try:
new_quota = form.disk_quota.data
old_quota = humanfriendly.parse_size(humanfriendly.format_size(user.disk_quota, binary=True))
if old_quota != new_quota:
user.disk_quota = new_quota
flash(_("User's quota updated"), level='success')
except QuotaUnsupported as e:
flash(_("Setting user's quota is not supported"), level='warning')
except Exception as e:
flash(_("Failed to update user's quota: %s") % e, level='error')
logger.warning("failed to update user's quota", exc_info=1)
if action == 'add':
flash(_("User added successfully."))
else:
flash(_("User information modified successfully."))
if action in ["add", "edit"]:
self._add_or_edit_user(action, form)
elif action == 'delete':
if form.username.data == self.app.currentuser.username:
flash(_("You cannot remove your own account!"), level='error')
else:
try:
user = self.app.store.get_user(form.username.data)
if user:
user.delete()
flash(_("User account removed."))
else:
flash(_("User doesn't exists!"), level='warning')
except ValueError as e:
flash(e, level='error')
self._delete_user(action, DeleteUserForm())

params = {
"form": UserForm(formdata=None),
@@ -24,31 +24,42 @@
import logging

import cherrypy
from rdiffweb.controller import Controller, validate
from rdiffweb.controller import Controller, flash
from rdiffweb.controller.cherrypy_wtf import CherryForm
from rdiffweb.controller.dispatch import poppath
from rdiffweb.controller.filter_authorization import is_maintainer

from rdiffweb.core.i18n import ugettext as _
from wtforms import validators
from wtforms.fields.core import StringField

_logger = logging.getLogger(__name__)


class DeleteRepoForm(CherryForm):
confirm = StringField(_('Confirmation'), validators=[validators.required()])
redirect = StringField(default='/')


@poppath()
class DeletePage(Controller):

@cherrypy.expose
def default(self, path=b"", confirm=None, redirect='/', **kwargs):
def default(self, path=b"", **kwargs):
# Check permissions on path/repo
unused, path_obj = self.app.store.get_repo_path(path)
# Check user's permissions
is_maintainer()

# validate form
form = DeleteRepoForm()
if not form.validate():
raise cherrypy.HTTPError(400, form.error_message)

# Validate the name
validate(confirm)
if confirm != path_obj.display_name:
_logger.info("do not delete repo, bad confirmation %r != %r", confirm, path_obj.display_name)
raise cherrypy.HTTPError(400)
if form.confirm.data != path_obj.display_name:
_logger.info("do not delete repo, bad confirmation %r != %r", form.confirm.data, path_obj.display_name)
raise cherrypy.HTTPError(400, 'bad confirmation')

# Delete repository
self.app.scheduler.add_task(path_obj.delete, args=[])

raise cherrypy.HTTPRedirect(redirect)
raise cherrypy.HTTPRedirect(form.redirect.data)

0 comments on commit 0eba3b7

Please sign in to comment.