Skip to content

Commit

Permalink
Don't error in DogtagCertsConnectivityCheck with external CAs
Browse files Browse the repository at this point in the history
The purpose of the check is to validate that communication
with the CA works. In the past we looked up serial number 1
for this check. The problem is that if the server was
installed with RSNv3 so had no predictable CA serial number.

It also was broken with externally-issued CA certificate which
cannot be looked up in IPA.

Instead use the IPA RA agent certificate which should definitely
have a serial number in the IPA CA if one is configured.

Fixes: #285

Signed-off-by: Rob Crittenden <rcritten@redhat.com>
  • Loading branch information
rcritten committed Jul 19, 2023
1 parent 18178ba commit 29855ec
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 181 deletions.
45 changes: 13 additions & 32 deletions src/ipahealthcheck/dogtag/ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
from ipalib import api, errors, x509
from ipaplatform.paths import paths
from ipaserver.install import certs
from ipaserver.install import ca
from ipaserver.install import krainstance
from ipapython.directivesetter import get_directive
from ipapython.dn import DN
from cryptography.hazmat.primitives.serialization import Encoding

logger = logging.getLogger()
Expand Down Expand Up @@ -95,6 +93,10 @@ def check(self):
class DogtagCertsConnectivityCheck(DogtagPlugin):
"""
Test basic connectivity by using cert-show to fetch a cert
The RA agent certificate is used because if a CA is configured we
know this certificate should exist. Use its serial number to do
the lookup.
"""
requires = ('dirsrv',)

Expand All @@ -104,59 +106,38 @@ def check(self):
logger.debug('CA is not configured, skipping connectivity check')
return

config = api.Command.config_show()

subject_base = config['result']['ipacertificatesubjectbase'][0]
ipa_subject = ca.lookup_ca_subject(api, subject_base)
try:
certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
except Exception as e:
yield Result(self, constants.ERROR,
key='ipa_ca_crt_file_missing',
path=paths.IPA_CA_CRT,
key='ipa_ra_crt_file_missing',
path=paths.RA_AGENT_PEM,
error=str(e),
msg='The IPA CA cert file {path} could not be '
msg='The IPA RA cert file {path} could not be '
'opened: {error}')
return

found = False
for cert in certs:
if DN(cert.subject) == ipa_subject:
found = True
break

if not found:
yield Result(self, constants.ERROR,
key='ipa_ca_cert_not_found',
subject=str(ipa_subject),
path=paths.IPA_CA_CRT,
msg='The CA certificate with subject {subject} '
'was not found in {path}')
return
# Load the IPA CA certificate to obtain its serial number. This
# was traditionally 1 prior to random serial number support.
# There is nothing special about cert 1. Even if there is no cert
# serial number 1 but the connection is ok it is considered passing.
# We used to use serial #1 but with RSNv3 it can be anything.
try:
api.Command.cert_show(cert.serial_number, all=True)
except errors.CertificateOperationError as e:
if 'not found' in str(e):
yield Result(self, constants.ERROR,
key='cert_show_1',
key='cert_show_ra',
error=str(e),
serial=str(cert.serial_number),
msg='Serial number not found: {error}')
else:
yield Result(self, constants.ERROR,
key='cert_show_1',
key='cert_show_ra',
error=str(e),
serial=str(cert.serial_number),
msg='Request for certificate failed: {error}')
except Exception as e:
yield Result(self, constants.ERROR,
key='cert_show_1',
key='cert_show_ra',
error=str(e),
serial=str(cert.serial_number),
msg='Request for certificate failed: {error')
msg='Request for certificate failed: {error}')
else:
yield Result(self, constants.SUCCESS)
175 changes: 26 additions & 149 deletions tests/test_dogtag_connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,23 @@

from ipalib.errors import CertificateOperationError
from ipaplatform.paths import paths
from ipapython.dn import DN


default_subject_base = [{
'result':
{
'ipacertificatesubjectbase': [f'O={m_api.env.realm}'],
},
}]


class IPACertificate:
def __init__(self, serial_number=1,
subject='CN=Certificate Authority, O=%s' % m_api.env.realm):
subject='CN=Certificate Authority, O=%s' % m_api.env.realm,
issuer='CN=Certificate Authority, O=%s' % m_api.env.realm):
self.serial_number = serial_number
self.subject = subject
self.issuer = issuer

def __eq__(self, other):
return self.serial_number == other.serial_number
Expand Down Expand Up @@ -50,18 +59,15 @@ class TestCAConnectivity(BaseTest):
Mock(return_value=CAInstance()),
}

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_ok(self, mock_load_cert, mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_ok(self, mock_load_cert):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [IPACertificate(12345)]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')
mock_load_cert.return_value = IPACertificate(12345)

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -76,20 +82,16 @@ def test_ca_connection_ok(self, mock_load_cert, mock_ca_subject):
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_cert_not_found(self, mock_load_cert,
mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_cert_not_found(self, mock_load_cert):
"""CA connectivity check for a cert that doesn't exist"""
m_api.Command.cert_show.reset_mock()
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.side_effect = CertificateOperationError(
message='Certificate operation cannot be completed: '
'EXCEPTION (Certificate serial number 0x0 not found)'
)
mock_load_cert.return_value = [IPACertificate()]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')
mock_load_cert.return_value = IPACertificate(serial_number=7)

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -103,46 +105,16 @@ def test_ca_connection_cert_not_found(self, mock_load_cert,
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'
assert result.kw.get('key') == 'cert_show_1'
assert result.kw.get('serial') == '1'
assert result.kw.get('key') == 'cert_show_ra'
assert result.kw.get('serial') == '7'
assert result.kw.get('msg') == 'Serial number not found: {error}'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_cert_file_not_found(self, mock_load_cert,
mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_cert_file_not_found(self, mock_load_cert):
"""CA connectivity check for a cert that doesn't exist"""
m_api.Command.cert_show.reset_mock()
m_api.Command.config_show.side_effect = subject_base
mock_load_cert.side_effect = FileNotFoundError()
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'
assert result.kw.get('key') == 'ipa_ca_crt_file_missing'
assert result.kw.get('path') == paths.IPA_CA_CRT

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_cert_not_in_file_list(self, mock_load_cert,
mock_ca_subject):
"""CA connectivity check for a cert that isn't in IPA_CA_CRT"""
m_api.Command.cert_show.reset_mock()
m_api.Command.config_show.side_effect = bad_subject_base
mock_load_cert.return_value = [IPACertificate()]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
'O=BAD')

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -156,26 +128,18 @@ def test_ca_connection_cert_not_in_file_list(self, mock_load_cert,
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'
bad = bad_subject_base[0]['result']['ipacertificatesubjectbase'][0]
bad_subject = DN(f'CN=Certificate Authority,{bad}')
assert DN(result.kw['subject']) == bad_subject
assert result.kw['path'] == paths.IPA_CA_CRT
assert result.kw['msg'] == (
'The CA certificate with subject {subject} was not found in {path}'
)
assert result.kw.get('key') == 'ipa_ra_crt_file_missing'
assert result.kw.get('path') == paths.RA_AGENT_PEM

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_down(self, mock_load_cert, mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_down(self, mock_load_cert):
"""CA connectivity check with the CA down"""
m_api.Command.cert_show.side_effect = CertificateOperationError(
message='Certificate operation cannot be completed: '
'Unable to communicate with CMS (503)'
)
m_api.Command.config_show.side_effect = subject_base
mock_load_cert.return_value = [IPACertificate()]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')
mock_load_cert.return_value = IPACertificate()

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -192,90 +156,3 @@ def test_ca_connection_down(self, mock_load_cert, mock_ca_subject):
assert result.kw.get('msg') == (
'Request for certificate failed: {error}'
)

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_multiple_ok(self, mock_load_cert, mock_ca_subject):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [
IPACertificate(1, 'CN=something'),
IPACertificate(12345),
]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.SUCCESS
assert result.source == 'ipahealthcheck.dogtag.ca'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_multiple_ok_reverse(self, mock_load_cert,
mock_ca_subject):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [
IPACertificate(12345),
IPACertificate(1, 'CN=something'),
]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.SUCCESS
assert result.source == 'ipahealthcheck.dogtag.ca'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_not_found(self, mock_load_cert, mock_ca_subject):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [
IPACertificate(1, 'CN=something'),
]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.kw['msg'] == (
'The CA certificate with subject {subject} was not found in {path}'
)

0 comments on commit 29855ec

Please sign in to comment.