# Authors:
# Jack Magne <jmagne@redhat.com>
#
# Copyright Red Hat, Inc.
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
from ipahealthcheck.core.plugin import Plugin, Registry
from pki.server.instance import PKIInstance
from pki.client import PKIConnection
from pki.system import SecurityDomainClient
from pki.server.healthcheck.core.main import merge_dogtag_config
import logging
import subprocess
# Module logger; messages are tagged with this module's dotted name.
logger = logging.getLogger(__name__)

# Temporary workaround to skip VERBOSE data. Fix already pushed to upstream
# freeipa-healthcheck: https://github.com/freeipa/freeipa-healthcheck/pull/126
# (logging.root is the same object logging.getLogger() returns with no name.)
logging.root.setLevel(logging.WARNING)
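class ClonesPlugin(Plugin):
    """Base plugin for the Dogtag clone health checks.

    Keeps the security-domain CA of the local PKI instance together with the
    master/clone host lists for every subsystem type (CA, KRA, OCSP, TPS,
    TKS) and wraps the ``pki`` and ``sslget`` command-line tools used to
    contact other subsystems.
    """

    def __init__(self, registry):
        # pylint: disable=redefined-outer-name
        super().__init__(registry)
        # Populated by get_security_domain_ca()
        self.security_domain = None
        self.db_dir = None
        self.subsystem_token = None
        self.passwd = None
        # Populated by get_security_domain_data(); lists accumulate per call
        self.master_cas = []
        self.clone_cas = []
        self.master_kras = []
        self.clone_kras = []
        self.master_ocsps = []
        self.clone_ocsps = []
        self.master_tpss = []
        self.clone_tpss = []
        self.master_tkss = []
        self.clone_tkss = []
        self.instance = PKIInstance(self.config.instance_name)

    @staticmethod
    def _run_and_capture(command):
        """Run *command* and return its combined stdout/stderr as text.

        A non-zero exit is not treated as an error here: the tool's output
        is returned either way so the calling check can inspect it. An
        OSError (for example a missing binary) propagates to the caller.

        Args:
            command: argv list handed to subprocess (no shell involved).

        Returns:
            The captured output decoded as UTF-8.
        """
        try:
            output = subprocess.check_output(command, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            # check_output always captures, so e.output holds the tool's text
            output = e.output
        return output.decode('utf-8')

    def contact_subsystem_using_pki(
            self, subport, subhost, subsystemnick,
            token_pwd, db_path, cmd, exts=None):
        """Run a ``pki`` CLI command against a subsystem over HTTPS.

        Args:
            subport: port of the target subsystem (anything str() accepts).
            subhost: hostname of the target subsystem.
            subsystemnick: nickname of the client certificate to use.
            token_pwd: password of the NSS database/token under *db_path*.
            db_path: NSS database directory.
            cmd: the pki subcommand to run.
            exts: optional extra arguments appended after *cmd*.

        Returns:
            The command's combined stdout/stderr as text, whether or not it
            exited successfully.
        """
        # NOTE(review): the token password travels as a command-line argument
        # (``-c``), so it is readable in the process table by other local
        # users while the command runs.
        command = [
            "/usr/bin/pki",
            "-p", str(subport),
            "-h", subhost,
            "-n", subsystemnick,
            "-P", "https",
            "-d", db_path,
            "-c", token_pwd,
            cmd,
        ]
        if exts is not None:
            command.extend(exts)
        return self._run_and_capture(command)

    def contact_subsystem_using_sslget(
            self, port, host, subsystemnick,
            token_pwd, db_path, params, url):
        """Fetch *url* from ``host:port`` with the ``sslget`` tool.

        Args:
            port: port of the target subsystem (anything str() accepts).
            host: hostname of the target subsystem.
            subsystemnick: client certificate nickname, or None to send none.
            token_pwd: password of the NSS database/token under *db_path*.
            db_path: NSS database directory.
            params: optional request parameters passed with ``-e``.
            url: the request path passed with ``-r``.

        Returns:
            The command's combined stdout/stderr as text, whether or not it
            exited successfully.
        """
        command = ["/usr/bin/sslget"]
        if subsystemnick is not None:
            command.extend(["-n", subsystemnick])
        # Remember where the password lands so it can be masked in the log.
        pwd_index = len(command) + 1
        command.extend(["-p", token_pwd, "-d", db_path])
        if params is not None:
            command.extend(["-e", params])
        # str() so an int port behaves like a str one, matching the
        # str(subport) handling in contact_subsystem_using_pki().
        command.extend(["-r", url, host + ":" + str(port)])
        # Log the argv for debugging, but never the token password: the
        # original logged the full command including the secret.
        logged = list(command)
        logged[pwd_index] = "<redacted>"
        logger.info(' command : %s ', logged)
        return self._run_and_capture(command)

    def get_security_domain_data(self, host, port):
        """Fetch the security-domain info from the CA at ``host:port``.

        Every host listed in the domain is appended to the master or clone
        list of its subsystem type (a host with ``Clone == 'TRUE'`` is a
        clone); subsystem ids other than CA/KRA/OCSP/TPS/TKS are ignored.

        Args:
            host: hostname of the security-domain CA.
            port: its unsecure (HTTP) port.

        Returns:
            The domain info object, or None when the request failed (the
            error is logged).
        """
        # NOTE(review): the domain is fetched over plain HTTP (verify=False
        # has nothing to verify without TLS), so this data is not
        # authenticated — confirm that is acceptable for the checks using it.
        try:
            connection = PKIConnection(protocol='http',
                                       hostname=host,
                                       port=port,
                                       verify=False)
            domain_data = SecurityDomainClient(connection).get_domain_info()
        except Exception as e:
            # Was BaseException, which also swallowed KeyboardInterrupt and
            # SystemExit; only ordinary errors should be reported and skipped.
            logger.error("Internal server error %s", e)
            return None

        # subsystem id -> (master list, clone list)
        buckets = {
            'CA': (self.master_cas, self.clone_cas),
            'KRA': (self.master_kras, self.clone_kras),
            'OCSP': (self.master_ocsps, self.clone_ocsps),
            'TPS': (self.master_tpss, self.clone_tpss),
            'TKS': (self.master_tkss, self.clone_tkss),
        }
        for subsystem in domain_data.subsystems.values():
            lists = buckets.get(subsystem.id)
            if lists is None:
                continue
            masters, clones = lists
            for h in subsystem.hosts.values():
                target = clones if h.Clone == 'TRUE' else masters
                target.append(h)
        return domain_data

    def get_security_domain_ca(self):
        """Locate the local CA if it is the security-domain CA.

        The local CA qualifies when its configured security-domain host and
        HTTP port equal its own service host and unsecure port. On success
        ``self.security_domain`` is set and ``db_dir``, ``subsystem_token``
        and ``passwd`` are filled from its config and the instance's token
        password store.

        Returns:
            ``(sec_domain, sechost, secport)``. ``sec_domain`` is None when
            there is no CA subsystem or the CA is not the security domain;
            ``sechost``/``secport`` are None when there is no CA at all.
        """
        sec_domain = None
        sechost = None
        secport = None
        ca_subsystem = self.instance.get_subsystem('ca')
        if ca_subsystem:
            # make sure this CA is the security domain: it must point at itself
            service_host = ca_subsystem.config.get('service.machineName')
            service_port = ca_subsystem.config.get('service.unsecurePort')
            sechost = ca_subsystem.config.get('securitydomain.host')
            secport = ca_subsystem.config.get('securitydomain.httpport')
            if sechost == service_host and secport == service_port:
                sec_domain = ca_subsystem
        if sec_domain:
            self.security_domain = sec_domain
            # Cached for the subprocess helpers above
            self.db_dir = sec_domain.config.get('jss.configDir')
            self.subsystem_token = sec_domain.config.get('ca.subsystem.tokenname')
            # NOTE: keeps the token password in memory for the plugin's lifetime
            self.passwd = self.instance.get_token_password(self.subsystem_token)
        return sec_domain, sechost, secport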
class ClonesRegistry(Registry):
    """Plugin registry that merges Dogtag config before the base setup."""

    def initialize(self, framework, config, options=None):
        # Read dogtag specific config values and merge with already existing
        # config before adding it to the registry.
        merge_dogtag_config(config)
        # NOTE(review): ``options`` is accepted but not forwarded to the base
        # initialize — confirm that is intended.
        super().initialize(framework, config)
# Module-level registry instance exported by this module.
# NOTE(review): presumably discovered by the healthcheck framework's plugin
# loading — confirm against the framework entry point.
registry = ClonesRegistry()