Skip to content

Commit

Permalink
Test script for ipa-custodia
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Heimes <cheimes@redhat.com>
  • Loading branch information
tiran committed Nov 2, 2017
1 parent 59802d3 commit 4dbbedf
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 0 deletions.
1 change: 1 addition & 0 deletions freeipa.spec.in
Expand Up @@ -1312,6 +1312,7 @@ fi
%{_libexecdir}/certmonger/ipa-server-guard
%dir %{_libexecdir}/ipa
%{_libexecdir}/ipa/ipa-custodia
%{_libexecdir}/ipa/ipa-custodia-check
%{_libexecdir}/ipa/ipa-dnskeysyncd
%{_libexecdir}/ipa/ipa-dnskeysync-replica
%{_libexecdir}/ipa/ipa-ods-exporter
Expand Down
1 change: 1 addition & 0 deletions install/tools/Makefile.am
Expand Up @@ -34,6 +34,7 @@ dist_sbin_SCRIPTS = \
appdir = $(libexecdir)/ipa/
dist_app_SCRIPTS = \
ipa-custodia \
ipa-custodia-check \
ipa-httpd-kdcproxy \
ipa-pki-retrieve-key \
$(NULL)
282 changes: 282 additions & 0 deletions install/tools/ipa-custodia-check
@@ -0,0 +1,282 @@
#!/usr/bin/env python2
"""Test client for ipa-custodia
The test script is expected to be executed on an IPA server with existing
Custodia server keys.
"""
from __future__ import print_function
import argparse
import logging
import os
import platform
import socket
import warnings

from custodia.message.kem import KEY_USAGE_SIG, KEY_USAGE_ENC, KEY_USAGE_MAP

from jwcrypto.common import json_decode
from jwcrypto.jwk import JWK

from ipalib import api
from ipaplatform.paths import paths
import ipapython.version
try:
# FreeIPA >= 4.5
from ipaserver.secrets.client import CustodiaClient
except ImportError:
# FreeIPA <= 4.4
from ipapython.secrets.client import CustodiaClient

# Ignore security warning from vendored and non-vendored urllib3
try:
from urllib3.exceptions import SecurityWarning
except ImportError:
SecurityWarning = None
else:
warnings.simplefilter("ignore", SecurityWarning)

try:
from requests.packages.urllib3.exceptions import SecurityWarning
except ImportError:
SecurityWarning = None
else:
warnings.simplefilter("ignore", SecurityWarning)


KEYS = [
'dm/DMHash',
'ra/ipaCert',
'ca/auditSigningCert cert-pki-ca',
'ca/caSigningCert cert-pki-ca',
'ca/ocspSigningCert cert-pki-ca',
'ca/subsystemCert cert-pki-ca',
]

IPA_CUSTODIA_KEYFILE = os.path.join(paths.IPA_CUSTODIA_CONF_DIR,
'server.keys')


logger = logging.getLogger('ipa-custodia-tester')


parser = argparse.ArgumentParser(
"IPA Custodia check",
)
# --store is dangerous and therefore hidden! Don't use it unless you really
# know what you are doing! Keep in mind that it might destroy your NSSDB
# unless it uses sqlite format.
parser.add_argument(
"--store", action='store_true', dest='store',
help=argparse.SUPPRESS
)
parser.add_argument(
"--debug", action='store_true',
help="Debug mode"
)
parser.add_argument(
"--verbose", action='store_true',
help='Verbose mode'
)
parser.add_argument(
"server",
help="FQDN of a IPA server (can be own FQDN for self-test)"
)
parser.add_argument(
'keys', nargs='*', default=KEYS,
help="Remote key ({})".format(', '.join(KEYS))
)


class IPACustodiaTester(object):
files = [
paths.IPA_DEFAULT_CONF,
paths.KRB5_KEYTAB,
paths.IPA_CUSTODIA_CONF,
IPA_CUSTODIA_KEYFILE
]

def __init__(self, parser, args):
self.parser = parser
self.args = args
if not api.isdone('bootstrap'):
# bootstrap to initialize api.env
api.bootstrap()
self.debug("IPA API bootstrapped")
self.realm = api.env.realm
self.host = api.env.host
self.host_spn = 'host/{}@{}'.format(self.host, self.realm)
self.server_spn = 'host/{}@{}'.format(self.args.server, self.realm)
self.client = None
self._errors = []

def error(self, msg, fatal=False):
self._errors.append(msg)
logger.error(msg, exc_info=self.args.verbose)
if fatal:
self.exit()

def exit(self):
if self._errors:
self.parser.exit(1, "[ERROR] One or more tests have failed.\n")
else:
self.parser.exit(0, "All tests have passed successfully.\n")

def warning(self, msg):
logger.warning(msg)

def info(self, msg):
logger.info(msg)

def debug(self, msg):
logger.debug(msg)

def check(self):
self.status()
self.check_fqdn()
self.check_files()
self.check_client()
self.check_jwk()
self.check_keys()

def status(self):
self.info("Platform: {}".format(platform.platform()))
self.info("IPA version: {}".format(
ipapython.version.VERSION
))
self.info("IPA vendor version: {}".format(
ipapython.version.VENDOR_VERSION
))
self.info("Realm: {}".format(self.realm))
self.info("Host: {}".format(self.host))
self.info("Remote server: {}".format(self.args.server))
if self.host == self.args.server:
self.warning("Performing self-test only.")

def check_fqdn(self):
fqdn = socket.getfqdn()
if self.host != fqdn:
self.warning(
"socket.getfqdn() reports hostname '{}'".format(fqdn)
)

def check_files(self):
for filename in self.files:
if not os.path.isfile(filename):
self.error("File '{0}' is missing.".format(filename))
else:
self.info("File '{0}' exists.".format(filename))

def check_client(self):
try:
self.client = CustodiaClient(
server=self.args.server,
client_service='host@{}'.format(self.host),
keyfile=IPA_CUSTODIA_KEYFILE,
keytab=paths.KRB5_KEYTAB,
realm=self.realm,
)
except Exception as e:
self.error("Failed to create client: {}".format(e), fatal=True)
else:
self.info("Custodia client created.")

def _check_jwk_single(self, usage_id):
usage = KEY_USAGE_MAP[usage_id]
with open(IPA_CUSTODIA_KEYFILE) as f:
dictkeys = json_decode(f.read())

try:
pkey = JWK(**dictkeys[usage_id])
local_pubkey = json_decode(pkey.export_public())
except Exception:
self.error("Failed to load and parse local JWK.", fatal=True)
else:
self.info("Loaded key for usage '{}' from '{}'.".format(
usage, IPA_CUSTODIA_KEYFILE
))

if pkey.key_id != self.host_spn:
self.error(
"KID '{}' != host service principal name '{}' "
"(usage: {})".format(pkey.key_id, self.host_spn, usage),
fatal=True
)
else:
self.info(
"JWK KID matches host's service principal name '{}'.".format(
self.host_spn
))

# LDAP doesn't contain KID
local_pubkey.pop("kid", None)
find_key = self.client.ikk.find_key
try:
host_pubkey = json_decode(find_key(self.host_spn, usage_id))
except Exception:
self.error("Fetching host keys {} (usage: {}) failed.".format(
self.host_spn, usage), fatal=True)
else:
self.info("Checked host LDAP keys '{}' for usage {}.".format(
self.host_spn, usage
))

if host_pubkey != local_pubkey:
self.debug("LDAP: '{}'".format(host_pubkey))
self.debug("Local: '{}'".format(local_pubkey))
self.error(
"Host key in LDAP does not match local key.", fatal=True)
else:
self.info(
"Local key for usage '{}' matches key in LDAP.".format(usage)
)

try:
server_pubkey = json_decode(find_key(self.server_spn, usage_id))
except Exception:
self.error("Fetching server keys {} (usage: {}) failed.".format(
self.server_spn, usage), fatal=True)
else:
self.info("Checked server LDAP keys '{}' for usage {}.".format(
self.server_spn, usage
))

return local_pubkey, host_pubkey, server_pubkey

def check_jwk(self):
self._check_jwk_single(KEY_USAGE_SIG)
self._check_jwk_single(KEY_USAGE_ENC)

def check_keys(self):
for key in self.args.keys:
try:
result = self.client.fetch_key(key, store=self.args.store)
except Exception as e:
self.error("Failed to retrieve key '{}': {}.".format(
key, e
))
else:
self.info("Successfully retrieved '{}'.".format(key))
if not self.args.store:
self.debug(result)


def main():
args = parser.parse_args()
if args.debug:
args.verbose = True

if os.geteuid() != 0:
parser.error("Script must be executed as root.\n")
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format='[%(asctime)s %(name)s] <%(levelname)s>: %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S',
)

tester = IPACustodiaTester(parser, args)
tester.check()
tester.exit()


if __name__ == '__main__':
main()

0 comments on commit 4dbbedf

Please sign in to comment.