-
Notifications
You must be signed in to change notification settings - Fork 11
/
auditpolcis.py
executable file
·302 lines (238 loc) · 11.1 KB
/
auditpolcis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/env python3
import paramiko
import re
import socket
from dotenv import load_dotenv
from tabulate import tabulate
from colorama import Fore, Style, init
def load_target_variables():
    """Read the SSH target's connection details from the local ``.env`` file.

    The file ``.env`` in the current working directory is loaded into the
    process environment, then the keys ``HOSTNAME``, ``USERNAME`` and
    ``PASSWORD`` are read back from it.

    Returns:
        list: ``[hostname, username, password]``, in that order.

    Raises:
        SystemExit: with status 1, after printing an error, when ``.env`` does
            not exist or when one of the three keys is not set.
    """
    import os
    import sys

    def load_env_file(env_file_path):
        """Load *env_file_path* into ``os.environ`` or exit if it is missing."""
        if not os.path.exists(env_file_path):
            print(f"Error: Unable to find or open {env_file_path}")
            sys.exit(1)
        # override=True: HOSTNAME and USERNAME are frequently already defined
        # by the operating system or shell. Without the override python-dotenv
        # keeps the pre-existing value, so the target configured in .env would
        # be silently ignored.
        load_dotenv(dotenv_path=env_file_path, override=True)

    def get_env_var(key):
        """Return the value of environment variable *key* or exit if unset."""
        try:
            return os.environ[key]
        except KeyError:
            print(f"Error: {key} not found in .env file")
            sys.exit(1)

    # Load variables from .env file
    load_env_file(".env")

    # Read variables with error handling (same order as the returned list)
    return [get_env_var(key) for key in ("HOSTNAME", "USERNAME", "PASSWORD")]
def retention(ssh, secrets):
    """Print size and retention settings of the System, Application and Security logs.

    Connects with *ssh*, runs ``wevtutil.exe gl <log>`` for each of the three
    logs and prints only the output lines that mention ``logFileName``,
    ``retention``, ``autoBackup`` or ``maxSize``. Connection failures are
    reported on stdout and are not re-raised.

    Args:
        ssh: SSH client object; ``connect``, ``exec_command`` and ``close``
            are called on it.
        secrets: sequence unpacked as ``(hostname, username, password)``.

    Returns:
        None. All results are printed.
    """
    hostname, username, password = secrets
    commands = [
        'C:\\Windows\\System32\\wevtutil.exe gl System',
        'C:\\Windows\\System32\\wevtutil.exe gl Application',
        'C:\\Windows\\System32\\wevtutil.exe gl Security'
    ]
    # Only lines containing one of these substrings are echoed.
    keywords = ("logFileName", "retention", "autoBackup", "maxSize")

    try:
        # Authenticate to Windows Server
        ssh.connect(hostname, username=username, password=password, timeout=10)
    except paramiko.AuthenticationException:
        print("[" + Fore.RED + "fail" + Style.RESET_ALL + "] Error: SSH Authentication failed. Please check your "
              "username and password.")
    except paramiko.SSHException:
        print("[" + Fore.RED + "fail" + Style.RESET_ALL + "] Error: An SSH-related error occurred.")
    except Exception as e:
        print("[" + Fore.RED + "fail" + Style.RESET_ALL + "] " + str(e))
    else:
        for command in commands:
            print(f"Executing command: {command}")
            print("Here's the highlights...")
            print("\n")

            # Execute the command
            stdin, stdout, stderr = ssh.exec_command(command)

            # Get the output and error; undecodable bytes are replaced rather
            # than aborting the whole report.
            output = stdout.read().decode(errors="replace")
            error = stderr.read().decode(errors="replace").strip()

            if error:
                print(f"An error occurred: {error}")
            else:
                for line in output.split('\n'):
                    if any(keyword in line for keyword in keywords):
                        print(line.strip())
            print("\n")
    finally:
        # Close the SSH connection even when connecting or a command failed,
        # so the session is not left open.
        ssh.close()
def get_dict_from_yaml():
    """Load the CIS benchmark definitions from ``cis-benchmarks.yaml``.

    The file is read from the current working directory and parsed with
    ``yaml.safe_load``.

    Returns:
        dict: the parsed top-level mapping. An empty dict is returned, after
        printing an error, when the file is missing, is not valid YAML, or
        does not contain a mapping at the top level (including an empty
        file), so callers can iterate the result without a ``None`` check.
    """
    import yaml

    file_path = 'cis-benchmarks.yaml'
    try:
        # Explicit encoding: do not depend on the platform's locale default.
        with open(file_path, 'r', encoding='utf-8') as file:
            loaded = yaml.safe_load(file)
    except FileNotFoundError:
        print(f"Error: File not found - {file_path}")
        return {}
    except yaml.YAMLError as e:
        print(f"Error: Invalid YAML format in {file_path} - {e}")
        return {}

    if not isinstance(loaded, dict):
        print(f"Error: Expected a mapping at the top level of {file_path}")
        return {}
    return loaded
def filter_trash(cis_dict, auditpol_dict):
    """Reduce both nested dicts to the category/subcategory pairs they share.

    Both arguments map ``category -> {subcategory: value}``. Entries present
    on only one side are recorded and then deleted, so *cis_dict* and
    *auditpol_dict* are modified in place.

    Returns:
        list: ``[auditpol_dict, cis_dict, missing_in_auditpol, missing_in_cis]``
        where the first two are the (mutated) inputs and the last two map a
        category to the set of subcategory names that the other side lacked.
    """

    def _only_in(reference, other):
        # Collect what `reference` has and `other` does not.
        absent = {}
        for cat, subs in reference.items():
            if cat not in other:
                absent[cat] = set(subs.keys())
                continue
            for sub in subs:
                if sub not in other[cat]:
                    absent.setdefault(cat, set()).add(sub)
        return absent

    def _drop_unshared(target, other):
        # Delete from `target` whatever `other` does not contain; iterate over
        # snapshots because entries are removed while walking.
        for cat, subs in list(target.items()):
            if cat not in other:
                del target[cat]
                continue
            for sub in list(subs.keys()):
                if sub not in other[cat]:
                    del target[cat][sub]

    # Record the differences first, while both inputs are still complete.
    missing_in_auditpol = _only_in(cis_dict, auditpol_dict)
    missing_in_cis = _only_in(auditpol_dict, cis_dict)

    _drop_unshared(cis_dict, auditpol_dict)
    _drop_unshared(auditpol_dict, cis_dict)

    return [auditpol_dict, cis_dict, missing_in_auditpol, missing_in_cis]
def _parse_auditpol_output(output):
    """Parse ``auditpol /get /category:*`` text into ``{category: {subcategory: setting}}``."""
    # A category line starts in column 0 with a word character; a trailing
    # carriage return is excluded from the captured name.
    category_pattern = re.compile(r'^(\w+.*?)(\r)?$')
    # A subcategory line starts with exactly two spaces; name and setting are
    # separated by a run of at least three whitespace characters.
    subcategory_pattern = re.compile(r'^( {2})([^ ]+.*?)(?=\s{3,})(.*\S)')

    auditpol_dict = {}
    current_category = None
    # The first two lines are skipped, as in the original implementation.
    for line in output.split('\n')[2:]:
        category_match = category_pattern.match(line)
        if category_match:
            current_category = category_match.group(1)
            auditpol_dict[current_category] = {}
            continue
        subcategory_match = subcategory_pattern.match(line)
        # Ignore a subcategory that appears before any category instead of
        # failing with KeyError on auditpol_dict[None].
        if subcategory_match and current_category is not None:
            subcategory = subcategory_match.group(2).strip()
            setting = subcategory_match.group(3).strip()
            auditpol_dict[current_category][subcategory] = setting
    return auditpol_dict


def _print_unmatched(note, unmatched):
    """Print *note*, then one ``category --> subcategory`` line per unmatched entry."""
    print("\n")
    print(note)
    for category, subcategories in unmatched.items():
        for subcategory in subcategories:
            print(f"\t{category} --> {subcategory}")


def test_audit_policy(ssh, secrets):
    """Compare the remote host's audit policy with the CIS benchmark YAML and print a report.

    Connects with *ssh*, runs ``auditpol /get /category:*``, parses the output
    and compares every subcategory setting with the ``'CIS Recommended'``
    value from ``get_dict_from_yaml()``. A grid table, a pass/total score for
    entries whose ``'CIS Benchmark'`` value is truthy, and notes about entries
    that exist on only one side are printed. Connection failures are printed
    and not re-raised. The SSH connection is always closed on exit.

    Args:
        ssh: SSH client object; ``connect``, ``exec_command`` and ``close``
            are called on it.
        secrets: sequence unpacked as ``(hostname, username, password)``.

    Raises:
        SystemExit: with status 1 when the remote command writes to stderr.
    """
    hostname, username, password = secrets

    try:
        # Authenticate to Windows Server
        ssh.connect(hostname, username=username, password=password, timeout=10)
    except paramiko.AuthenticationException:
        print("[" + Fore.RED + "fail" + Style.RESET_ALL + "] Error: SSH Authentication failed. Please check your "
              "username and password.")
    except paramiko.SSHException:
        print("[" + Fore.RED + "fail" + Style.RESET_ALL + "] Error: An SSH-related error occurred.")
    except Exception as e:
        print("[" + Fore.RED + "fail" + Style.RESET_ALL + "] " + str(e))
    else:
        # Run the 'auditpol /get /category:*' command over SSH on the remote server
        stdin, stdout, stderr = ssh.exec_command('auditpol /get /category:*')

        error = stderr.read()
        if error:
            print(f"Error encountered: {error.decode('utf-8', errors='replace')}")
            # Same effect as exit(1) but does not rely on the site-provided builtin.
            raise SystemExit(1)

        auditpol_dict = _parse_auditpol_output(stdout.read().decode('utf-8', errors='replace'))

        # A falsy result (nothing usable loaded) is treated as "no benchmarks"
        # so the comparison below reports everything as unmatched rather than
        # crashing.
        cis_dict = get_dict_from_yaml() or {}

        (auditpol_dict,
         cis_dict,
         in_cisyaml_but_missing_from_auditpol,
         in_auditpol_but_missing_from_cisyaml) = filter_trash(cis_dict, auditpol_dict)

        init(autoreset=True)  # Automatically reset colorama styles after each print

        headers = ["Category", "Subcategory", "CIS Benchmark?", "Test Result (result [expected])", "Verdict"]
        verdict_colors = {'pass': Fore.GREEN, 'fail': Fore.RED}
        table_data = []
        cis_benchmark_count = 0
        cis_benchmark_pass_count = 0

        for category, subcategories in auditpol_dict.items():
            for subcategory, result in subcategories.items():
                benchmark = cis_dict[category][subcategory]
                result_expected = benchmark['CIS Recommended']
                cis_included = benchmark['CIS Benchmark']
                verdict = 'pass' if result == result_expected else 'fail'

                # Only entries flagged as CIS benchmarks count towards the
                # score, so passes are counted inside the same condition.
                if cis_included:
                    cis_benchmark_count += 1
                    if verdict == 'pass':
                        cis_benchmark_pass_count += 1

                colored_verdict = verdict_colors[verdict] + verdict + Style.RESET_ALL
                table_data.append(
                    [category, subcategory, cis_included, f"{result} [{result_expected}]", colored_verdict])

        print(tabulate(table_data, headers=headers, tablefmt='grid'))

        # Print the final score
        print("\n")
        print(f"CIS Benchmarks Test Score: {cis_benchmark_pass_count} / {cis_benchmark_count}")

        if in_cisyaml_but_missing_from_auditpol:
            _print_unmatched(
                "Note: some [Sub]categories were found to be in the CIS Benchmarks YAML file, but could not"
                " be matched with output [Sub]categories from the auditpol command:",
                in_cisyaml_but_missing_from_auditpol)

        if in_auditpol_but_missing_from_cisyaml:
            _print_unmatched(
                "Note: some [Sub]categories were found to be in the auditpol command output, but could not be "
                "matched with output [Sub]categories from the CIS Benchmarks YAML file:",
                in_auditpol_but_missing_from_cisyaml)
    finally:
        # Close the SSH connection
        ssh.close()
if __name__ == '__main__':
    def check_ssh_availability(host, port=22, timeout=10):
        """Return True if a TCP connection to ``host:port`` opens within *timeout* seconds."""
        # NOTE(review): this helper is not called anywhere in the visible
        # script; confirm whether a pre-flight check was intended.
        try:
            socket.create_connection((host, port), timeout).close()
        except socket.error:
            return False
        return True

    # Set up SSH client
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts host keys that are not yet known;
    # confirm this is acceptable for the environment being audited.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    secrets = load_target_variables()

    # Part 1: audit policy versus CIS benchmarks.
    test_audit_policy(ssh, secrets)

    # Part 2: event log size/retention report, introduced by a small banner.
    for banner_line in ("\n",
                        "Log Size and Retention Settings",
                        "-------------------------------",
                        "\n"):
        print(banner_line)
    retention(ssh, secrets)