diff --git a/lemur/certificates/service.py b/lemur/certificates/service.py index 6643969c9..f2d798552 100644 --- a/lemur/certificates/service.py +++ b/lemur/certificates/service.py @@ -19,6 +19,7 @@ from lemur.notifications.models import Notification from lemur.authorities.models import Authority from lemur.domains.models import Domain +from lemur.users.models import View from lemur.roles.models import Role from lemur.roles import service as role_service @@ -129,6 +130,19 @@ def update(cert_id, owner, description, notify, destinations, notifications, rep return database.update(cert) +def log_private_key_view(certificate, user): + """ + Creates a record each time a certificates private key is viewed. + + :param certificate: + :param user: + :return: + """ + view = View(user=user) + certificate.views.append(view) + database.commit() + + def create_certificate_roles(**kwargs): # create an role for the owner and assign it owner_role = role_service.get_by_name(kwargs['owner']) diff --git a/lemur/certificates/views.py b/lemur/certificates/views.py index 8a1a2ee27..cdf781786 100644 --- a/lemur/certificates/views.py +++ b/lemur/certificates/views.py @@ -437,16 +437,18 @@ def get(self, certificate_id): if not cert: return dict(message="Cannot find specified certificate"), 404 - owner_role = role_service.get_by_name(cert.owner) - permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles]) + if not g.current_user.is_admin: + owner_role = role_service.get_by_name(cert.owner) + permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles]) - if permission.can(): - response = make_response(jsonify(key=cert.private_key), 200) - response.headers['cache-control'] = 'private, max-age=0, no-cache, no-store' - response.headers['pragma'] = 'no-cache' - return response + if not permission.can(): + return dict(message='You are not authorized to view this key'), 403 - return dict(message='You are not authorized to view this key'), 403 + service.log_private_key_view(cert, g.current_user) + response = make_response(jsonify(key=cert.private_key), 200) + response.headers['cache-control'] = 'private, max-age=0, no-cache, no-store' + response.headers['pragma'] = 'no-cache' + return response class Certificates(AuthenticatedResource): @@ -908,36 +910,32 @@ def post(self, certificate_id, data=None): """ cert = service.get(certificate_id) - owner_role = role_service.get_by_name(cert.owner) - permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles]) + if not cert: + return dict(message="Cannot find specified certificate"), 404 - options = data['plugin']['plugin_options'] plugin = data['plugin']['plugin_object'] if plugin.requires_key: - if cert.private_key: - if permission.can(): - extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options) - else: - return dict(message='You are not authorized to export this certificate.'), 403 - else: - return dict(message='Unable to export certificate, plugin: {0} requires a private key but no key was found.'.format(plugin.slug)) - else: - extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options) + if not cert.private_key: + return dict( + message='Unable to export certificate, plugin: {0} requires a private key but no key was found.'.format( + plugin.slug)) - # we take a hit in message size when b64 encoding - return dict(extension=extension, passphrase=passphrase, data=base64.b64encode(data).decode('utf-8')) + else: + if not g.current_user.is_admin: + owner_role = role_service.get_by_name(cert.owner) + permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles]) + if not permission.can(): + return dict(message='You are not authorized to export this certificate.'), 403 -class CertificateClone(AuthenticatedResource): - def __init__(self): - self.reqparse = reqparse.RequestParser() - super(CertificateExport, self).__init__() + options = data['plugin']['plugin_options'] - @validate_schema(None, certificate_output_schema) - def get(self, certificate_id): + service.log_private_key_view(cert, g.current_user) + extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options) - pass + # we take a hit in message size when b64 encoding + return dict(extension=extension, passphrase=passphrase, data=base64.b64encode(data).decode('utf-8')) api.add_resource(CertificatesList, '/certificates', endpoint='certificates') @@ -946,7 +944,6 @@ def get(self, certificate_id): api.add_resource(CertificatesUpload, '/certificates/upload', endpoint='certificateUpload') api.add_resource(CertificatePrivateKey, '/certificates//key', endpoint='privateKeyCertificates') api.add_resource(CertificateExport, '/certificates//export', endpoint='exportCertificate') -api.add_resource(CertificateClone, '/certificates//clone', endpoint='cloneCertificate') api.add_resource(NotificationCertificatesList, '/notifications//certificates', endpoint='notificationCertificates') api.add_resource(CertificatesReplacementsList, '/certificates//replacements', diff --git a/lemur/factory.py b/lemur/factory.py index 28f48a983..d121b3c13 100644 --- a/lemur/factory.py +++ b/lemur/factory.py @@ -94,14 +94,39 @@ def configure_app(app, config=None): if config and config != 'None': app.config.from_object(from_file(config)) - try: - app.config.from_envvar("LEMUR_CONF") - except RuntimeError: - # look in default paths - if os.path.isfile(os.path.expanduser("~/.lemur/lemur.conf.py")): - app.config.from_object(from_file(os.path.expanduser("~/.lemur/lemur.conf.py"))) - else: - app.config.from_object(from_file(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default.conf.py'))) + else: + try: + app.config.from_envvar("LEMUR_CONF") + except RuntimeError: + # look in default paths + if os.path.isfile(os.path.expanduser("~/.lemur/lemur.conf.py")): + app.config.from_object(from_file(os.path.expanduser("~/.lemur/lemur.conf.py"))) + else: + app.config.from_object(from_file(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default.conf.py'))) + + validate_conf(app) + + +def validate_conf(app): + """ + There are a few configuration variables that are 'required' by Lemur. Here + we validate those required variables are set. + """ + required_vars = [ + 'LEMUR_SECURITY_TEAM_EMAIL', + 'LEMUR_DEFAULT_ORGANIZATIONAL_UNIT', + 'LEMUR_DEFAULT_ORGANIZATION', + 'LEMUR_DEFAULT_LOCATION', + 'LEMUR_DEFAULT_COUNTRY', + 'LEMUR_DEFAULT_STATE', + 'SQLALCHEMY_DATABASE_URI' + ] + + for var in required_vars: + if not app.config.get(var): + raise Exception("Required variable {var} is not set, ensure that it is set in Lemur's configuration file".format( + var=var + )) def configure_extensions(app): diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 5fb8d0489..a3fe3f99b 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -423,6 +423,21 @@ def test_upload_private_key_str(user): assert cert +def test_private_key_audit(client, certificate): + assert len(certificate.views) == 0 + client.get(api.url_for(CertificatePrivateKey, certificate_id=certificate.id), headers=VALID_ADMIN_HEADER_TOKEN) + assert len(certificate.views) == 1 + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 403), + (VALID_ADMIN_HEADER_TOKEN, 200), + ('', 401) +]) +def test_certificate_get_private_key(client, token, status): + assert client.get(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status + + @pytest.mark.parametrize("token,status", [ (VALID_USER_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200), diff --git a/lemur/users/models.py b/lemur/users/models.py index 62165cb5e..18283b20f 100644 --- a/lemur/users/models.py +++ b/lemur/users/models.py @@ -8,11 +8,11 @@ .. moduleauthor:: Kevin Glisson """ -import sys from sqlalchemy.orm import relationship -from sqlalchemy import Column, Integer, String, Boolean, DateTime +from sqlalchemy import Integer, ForeignKey, String, PassiveDefault, func, Column, Boolean from sqlalchemy.event import listen +from sqlalchemy_utils.types.arrow import ArrowType from lemur.database import db from lemur.models import roles_users @@ -37,7 +37,7 @@ class User(db.Model): id = Column(Integer, primary_key=True) password = Column(String(128)) active = Column(Boolean()) - confirmed_at = Column(DateTime()) + confirmed_at = Column(ArrowType()) username = Column(String(255), nullable=False, unique=True) email = Column(String(128), unique=True) profile_picture = Column(String(255)) @@ -63,11 +63,7 @@ def hash_password(self): :return: """ if self.password: - if sys.version_info[0] >= 3: - self.password = bcrypt.generate_password_hash(self.password).decode('utf-8') - else: - self.password = bcrypt.generate_password_hash(self.password) - return self.password + self.password = bcrypt.generate_password_hash(self.password).decode('utf-8') @property def is_admin(self): @@ -85,4 +81,14 @@ def __repr__(self): return "User(username={username})".format(username=self.username) +class View(db.Model): + __tablename__ = 'views' + id = Column(Integer, primary_key=True) + certificate_id = Column(Integer, ForeignKey('certificates.id')) + certificates = relationship("Certificate", backref='views') + viewed_at = Column(ArrowType(), PassiveDefault(func.now()), nullable=False) + user_id = Column(Integer, ForeignKey('users.id')) + users = relationship("User", backref="views") + + listen(User, 'before_insert', hash_password)