diff --git a/.pylintrc b/.pylintrc
index 4c23e84..9c51a70 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -24,7 +24,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods
+disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods, duplicate-code, consider-using-f-string
 
 [FORMAT]
diff --git a/cbw_api_toolbox/cbw_api.py b/cbw_api_toolbox/cbw_api.py
index 1323be5..383e642 100644
--- a/cbw_api_toolbox/cbw_api.py
+++ b/cbw_api_toolbox/cbw_api.py
@@ -38,11 +38,11 @@ class CBWApi: # pylint: disable=R0904
     """Class used to communicate with the CBW API"""
 
     def __init__(
-        self,
-        api_url=None,
-        api_key=None,
-        secret_key=None,
-        verify_ssl=False,
+            self,
+            api_url=None,
+            api_key=None,
+            secret_key=None,
+            verify_ssl=False,
     ):
         self.verify_ssl = verify_ssl
diff --git a/cli/airgap/download_compliance_scripts.py b/cli/airgap/download_compliance_scripts.py
index 0f5e9fd..67472bd 100644
--- a/cli/airgap/download_compliance_scripts.py
+++ b/cli/airgap/download_compliance_scripts.py
@@ -89,7 +89,7 @@ def download_individual_script(script_object, base_directory):
     script_filename = "".join((base_directory, "/", script.filename))
     os.makedirs(dirname(script_filename), exist_ok=True)
 
-    with open(script_filename, "w") as filestream:
+    with open(script_filename, "w", encoding="utf-8") as filestream:
         filestream.write(script.script_content)
 
     if ".ps1" in script_object[0].filename:
@@ -105,10 +105,10 @@ def create_run_scripts(os_target, base_directory):
 
     if os_target in "Windows":
         run_script = join(base_directory, "run.ps1")
-        with open(run_script, "w") as file_stream:
+        with open(run_script, "w", encoding="utf-8") as file_stream:
             file_stream.write(PS1_EXECUTE_SCRIPT)
     else:
         run_script = join(base_directory, "run")
-        with open(run_script, "w") as file_stream:
+        with open(run_script, "w", encoding="utf-8") as file_stream:
             file_stream.write(SH_EXECUTE_SCRIPT)
         os.chmod(run_script, 0o755)
diff --git a/cli/airgap/download_scripts.py b/cli/airgap/download_scripts.py
index 7464ba0..01b2ad4 100644
--- a/cli/airgap/download_scripts.py
+++ b/cli/airgap/download_scripts.py
@@ -64,9 +64,7 @@ def subcommand(args, api: CBWApi):
     print("INFO: Script saved in {}".format(script_dir))
 
 
-def download_individual_script(
-    script_object, base_directory, api: CBWApi, with_attachment=False
-):
+def download_individual_script(script_object, base_directory, api: CBWApi, with_attachment=False):
     """Get each script and put it in the correct category"""
     script = api.fetch_airgapped_script(str(script_object.id), params={"pristine": "1"})
     if script is None or script.type is None:
@@ -77,7 +75,7 @@ def download_individual_script(
     os.makedirs(dirname(script_filename), exist_ok=True)
     script_filename = append_extension(script_filename)
 
-    with open(script_filename, "w") as filestream:
+    with open(script_filename, "w", encoding="utf-8") as filestream:
         filestream.write(script.contents)
 
     if script.attachment and with_attachment:
@@ -118,7 +116,7 @@ def create_run_scripts(script_os_association, base_directory):
 def add_sh_run_script(os_and_scripts, directory):
     """Create a shell run script in directory"""
     run_script = join(directory, "run")
-    with open(run_script, "w") as file_stream:
+    with open(run_script, "w", encoding="utf-8") as file_stream:
         file_stream.write(
             SH_EXECUTE_SCRIPT.format(" ".join(script for (_, script) in os_and_scripts))
         )
@@ -128,7 +126,7 @@ def add_sh_run_script(os_and_scripts, directory):
 def add_pwsh_run_script(os_and_scripts, directory):
     """Creates a "windows launch all" powershell script"""
     run_script_filename = join(directory, "run.ps1")
-    with open(run_script_filename, "w") as file_stream:
+    with open(run_script_filename, "w", encoding="utf-8") as file_stream:
         file_stream.write("$ScriptDir = Split-Path $MyInvocation.MyCommand.Path\n")
         for _, script in os_and_scripts:
             file_stream.write(f'& "$ScriptDir/{script}"\n')
diff --git a/cli/airgap/upload.py b/cli/airgap/upload.py
index 4757b04..f8e2020 100644
--- a/cli/airgap/upload.py
+++ b/cli/airgap/upload.py
@@ -48,7 +48,7 @@ def upload_file(result_script_filename, api: CBWApi):
 
 def read_file_all_encodings(filename):
     """Return the content of `filename`. Detects the encoding used by the file."""
-    with open(filename, "rb") as file_stream:
+    with open(filename, "rb") as file_stream:  # binary mode: no encoding argument, chardet detects it below
         raw_content = file_stream.read()
         detection = chardet.detect(raw_content)
         return raw_content.decode(detection["encoding"])
diff --git a/cli/airgap/upload_compliance.py b/cli/airgap/upload_compliance.py
index 0c49f5a..3774ace 100644
--- a/cli/airgap/upload_compliance.py
+++ b/cli/airgap/upload_compliance.py
@@ -48,7 +48,7 @@ def upload_file(result_script_filename, api: CBWApi):
 
 def read_file_all_encodings(filename):
     """Return the content of `filename`. Detects the encoding used by the file."""
-    with open(filename, "rb") as file_stream:
+    with open(filename, "rb") as file_stream:  # binary mode: no encoding argument, chardet detects it below
         raw_content = file_stream.read()
         detection = chardet.detect(raw_content)
         return raw_content.decode(detection["encoding"])
diff --git a/examples/air_gapped_scans/download_airgapped_scripts.py b/examples/air_gapped_scans/download_airgapped_scripts.py
index ba4df91..dabfcc8 100644
--- a/examples/air_gapped_scans/download_airgapped_scripts.py
+++ b/examples/air_gapped_scans/download_airgapped_scripts.py
@@ -54,7 +54,7 @@ def download_scripts(parsed_args, scripts, client):
         elif "Windows" in file_name:
             file_name[-1] += '.ps1'
         path = os.path.join(os.path.dirname(__file__), "/".join(file_name))
-        with open(path, 'w') as filehandle:
+        with open(path, 'w', encoding="utf-8") as filehandle:
             filehandle.write(script.contents)
         if script.attachment and parsed_args.no_attachment:
             download_attachment(file_name, script.attachment)
@@ -66,7 +66,7 @@ def download_attachment(path, url):
     attachment = requests.get(url, allow_redirects=True, verify=False)
     location = os.path.join(os.path.dirname(__file__), "/".join(path[:-1]))
     name = url.split("/")[-1]
-    with open(os.path.join(location, name), 'wb') as file:
+    with open(os.path.join(location, name), 'wb') as file:  # binary mode: no encoding argument
         file.write(attachment.content)
 
 
@@ -83,7 +83,7 @@ def create_windows_launch_all():
     }"""
 
     path = os.path.join(os.path.dirname(__file__), "Scripts", "Windows", "cbw_launch_all.ps1")
-    with open(path, 'w') as filehandle:
+    with open(path, 'w', encoding="utf-8") as filehandle:
         filehandle.write(launch_all_powershell)
diff --git a/examples/air_gapped_scans/upload_airgapped_results.py b/examples/air_gapped_scans/upload_airgapped_results.py
index 9554252..33eea4b 100644
--- a/examples/air_gapped_scans/upload_airgapped_results.py
+++ b/examples/air_gapped_scans/upload_airgapped_results.py
@@ -20,13 +20,13 @@ def connect_api():
 
 def upload(client):
     """Upload results from the folder 'Uploads' to Cyberwatch"""
     print("INFO: Searching for available results...")
-    files = ( file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))) )
+    files = (file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))))
 
     for file in files:
         file_path = os.path.join(os.path.dirname(__file__), 'Uploads', file)
         if os.path.isfile(file_path):
-            with open(file_path, 'r') as filehandle:
+            with open(file_path, 'r', encoding="utf-8") as filehandle:
                 filecontent = filehandle.read()
-            content = {'output': filecontent , 'groups': 'my_group_1, my_group_2'}
+            content = {'output': filecontent, 'groups': 'my_group_1, my_group_2'}
             print('INFO: Sending {} content to the API...'.format(file))
             client.upload_airgapped_results(content)
diff --git a/examples/apache_libcloud/public-cloud-modular_instance.py b/examples/apache_libcloud/public_cloud_modular_instance.py
similarity index 84%
rename from examples/apache_libcloud/public-cloud-modular_instance.py
rename to examples/apache_libcloud/public_cloud_modular_instance.py
index 7d9de30..474c97a 100644
--- a/examples/apache_libcloud/public-cloud-modular_instance.py
+++ b/examples/apache_libcloud/public_cloud_modular_instance.py
@@ -5,9 +5,12 @@
 
 # Prerequisites :
 # - Install libcloud with command "pip3 install apache-libcloud"
-# - If you are not using the default credentials for agentless connections configured in Cyberwatch, set up SERVER_LOGIN and/or WINRM_password variables
-# - Set the constant variables on the first lines of the script depending on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
-# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
+# - If you are not using the default credentials for agentless connections configured in Cyberwatch,
+#   set up SERVER_LOGIN and/or WINRM_password variables
+# - Set the constant variables on the first lines of the script depending
+#   on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
+# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example:
+#   https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
 # - SSH key file of servers to import named "id_rsa"
 # Notes :
 # - All servers will be imported with group "cloud_crawling" + zone (ex: "europe-west4-a")
@@ -15,13 +18,13 @@
 
 import argparse
 import os
 import socket
-
+# pylint: disable=E0401, R1705
 from configparser import ConfigParser
 from libcloud.compute.types import Provider
 from libcloud.compute.providers import get_driver
 from cbw_api_toolbox.cbw_api import CBWApi
 
-SSH_KEY_SERVERS = open(os.path.expanduser('id_rsa')).read()
+SSH_KEY_SERVERS = open(os.path.expanduser('id_rsa'), encoding="utf-8").read()
 
 SERVER_LOGIN = ""
 WINRM_PASSWORD_SERVERS = ""
@@ -49,7 +52,7 @@ def connect_api():
 def get_node():
     '''Get list of available nodes and prompt user to choose'''
     nodes = API.nodes()
-    if len(nodes) > 1 :
+    if len(nodes) > 1:
         print("Which Cyberwatch node do you want to use to import?")
         for node in nodes:
             print("ID: {}, name: {}".format(node.id, node.name))
@@ -60,7 +63,7 @@ def get_node():
             return node_id
         else:
             raise ValueError("Please provide valid node id")
-    else:
+    else:
         return nodes[0].id
 
 
@@ -98,15 +101,15 @@ def retrieve_ec2_servers():
     return running
 
 
-def port_checker(ip, port):
+def port_checker(ip_address, port):
     '''Check if a specific port is open on an ip address'''
-    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    s.settimeout(5)
+    socket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    socket1.settimeout(5)
     try:
-        s.connect((ip, int(port)))
-        s.shutdown(2)
+        socket1.connect((ip_address, int(port)))
+        socket1.shutdown(2)
         return True
-    except:
+    except Exception:  # pylint: disable=broad-except
         return False
 
 
@@ -131,7 +134,8 @@ def check_add_server(servers, cloud_servers, node_id):
                     "key": SSH_KEY_SERVERS})
             to_add.append(info)
         else:
-            print('The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985) so an agentless connection with Cyberwatch is not possible')
+            print('The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985)'
+                  ' so an agentless connection with Cyberwatch is not possible')
     return to_add
 
 
@@ -150,8 +154,7 @@ def check_delete_server(cloud_servers):
 
 def display_and_import(to_import_list, apply=False):
     '''Display to_import servers then import them'''
-    print('\n\n================= Total of {} cloud servers to import (apply={}) ================='.format(len(to_import_list),
-                                                                                                           apply))
+    print('\n\n===== Total of {} cloud servers to import (apply={}) ====='.format(len(to_import_list), apply))
     for to_add_server in to_import_list:
         print('{} --- {} --- {}'.format(to_add_server["address"], to_add_server["server_groups"],
                                         to_add_server["type"]))
@@ -161,8 +164,7 @@
 
 def display_and_delete(to_delete_list, apply=False):
     '''Display to_delete servers then delete them'''
-    print('\n\n================= Total of {} servers on Cyberwatch to delete (apply={}) ================='.format(len(to_delete_list),
-                                                                                                                   apply))
+    print('\n\n===== Total of {} servers on Cyberwatch to delete (apply={}) ====='.format(len(to_delete_list), apply))
     for server in to_delete_list:
         print('{} --- {} --- {}'.format(server.remote_ip, server.hostname, server.id))
     if apply is True:
@@ -202,7 +204,8 @@ def main(args=None):
     '''Main function'''
 
     parser = argparse.ArgumentParser(
-        description='Script using Cyberwatch API to import not monitored cloud servers and delete terminated cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.')
+        description="""Script using Cyberwatch API to import not monitored cloud servers and delete terminated
+        cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.""")
 
     parser.add_argument(
         '-i',
diff --git a/examples/clean_discovered_docker.py b/examples/clean_discovered_docker.py
index a3dd100..391b976 100644
--- a/examples/clean_discovered_docker.py
+++ b/examples/clean_discovered_docker.py
@@ -39,8 +39,8 @@ def find_discoveries(client):
     discoveries_details = client.hosts()
     for host in discoveries_details:
         if host.discovery.type == "CbwAssets::Discovery::DockerRegistry":
-            for id in host.server_ids:
-                ids.append(str(id))
+            for id_server in host.server_ids:
+                ids.append(str(id_server))
     return ids
 
 
diff --git a/examples/cleanup_duplicates.py b/examples/cleanup_duplicates.py
index a5c21f6..709f4ab 100644
--- a/examples/cleanup_duplicates.py
+++ b/examples/cleanup_duplicates.py
@@ -3,7 +3,7 @@
 import os
 from configparser import ConfigParser
 from cbw_api_toolbox.cbw_api import CBWApi
-
+# pylint: disable=duplicate-code
 
 def connect_api():
     '''Connect to the API and test connection'''
diff --git a/examples/cleanup_initialization_duplicates.py b/examples/cleanup_initialization_duplicates.py
index 5711dd0..fae3cbe 100644
--- a/examples/cleanup_initialization_duplicates.py
+++
b/examples/cleanup_initialization_duplicates.py @@ -9,6 +9,7 @@ from datetime import datetime from dateutil.relativedelta import relativedelta # pylint: disable=import-error from cbw_api_toolbox.cbw_api import CBWApi +# pylint: disable=duplicate-code def connect_api(): '''Connect ot the API''' diff --git a/examples/cleanup_lost_com_filters.py b/examples/cleanup_lost_com_filters.py index 74efe72..23cb674 100644 --- a/examples/cleanup_lost_com_filters.py +++ b/examples/cleanup_lost_com_filters.py @@ -39,11 +39,11 @@ def find_lost_com_servers(servers): def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS): '''Display servers then delete them''' print('\n\n================ Total of {} {} to delete (delete={}) ================'.format(len(delete_list), - server_type, - delete)) + server_type, + delete)) for delete_server in delete_list: print('{} -- {} -- {} -- {}'.format(delete_server.id, delete_server.hostname, - delete_server.cve_announcements_count, delete_server.created_at)) + delete_server.cve_announcements_count, delete_server.created_at)) if delete is True: client.delete_server(str(delete_server.id)) @@ -52,9 +52,7 @@ def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS): def launch_script(): '''Launch script''' client = connect_api() - filters = { - "communication_failed": "true" - } + filters = {"communication_failed": "true"} servers = client.servers(filters) lost_com_servers = find_lost_com_servers(servers) diff --git a/examples/cve_published_last_month_export_xlsx.py b/examples/cve_published_last_month_export_xlsx.py index e5e9aab..589b96b 100644 --- a/examples/cve_published_last_month_export_xlsx.py +++ b/examples/cve_published_last_month_export_xlsx.py @@ -120,10 +120,10 @@ def export_xls(cve_list, xls_export): xls_export.close() # Defines date to retrieve CVEs published last month -today = datetime.date.today() -firstDayOfLastMonth = (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1) -firstDayOfCurrentMonth = today.replace(day=1) +TODAY = datetime.date.today() +FIRSTDAYOFLASTMONTH = (TODAY.replace(day=1) - datetime.timedelta(days=1)).replace(day=1) +FIRSTDAYOFCURRENTMONTH = TODAY.replace(day=1) -print("Exporting vulnerabilities published between {} and {}.".format(firstDayOfLastMonth, firstDayOfCurrentMonth)) -export_xls(get_cyberwatch_cves(firstDayOfLastMonth, firstDayOfCurrentMonth), - instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(firstDayOfLastMonth, firstDayOfCurrentMonth))) +print("Exporting vulnerabilities published between {} and {}.".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH)) +export_xls(get_cyberwatch_cves(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH), + instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH))) diff --git a/examples/detail_servers.py b/examples/detail_servers.py index 5a7bf08..b024fe0 100644 --- a/examples/detail_servers.py +++ b/examples/detail_servers.py @@ -40,7 +40,7 @@ def to_csv_lines(cve_catalog): def to_csv(csv_lines, name_csv='just_generated.csv', path=""): """Write objects in csv_lines into a csv file""" - with open(os.path.join(path, name_csv), 'w', newline='') as csvfile: + with open(os.path.join(path, name_csv), 'w', newline='', encoding="utf-8") as csvfile: spamwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL) spamwriter.writerow(['"sep=,"']) @@ -52,11 +52,11 @@ def to_csv(csv_lines, name_csv='just_generated.csv', path=""): # Fetch active CVE if an exploit is 
available logging.info('Fetching active CVE') -cve_list = CLIENT.cve_announcements({"exploitable": "true", "active": "true"}) +CVE_LIST = CLIENT.cve_announcements({"exploitable": "true", "active": "true"}) # Formating lines for the csv logging.info('Formating lines for the csv file') -csv_lines_list = to_csv_lines(cve_list) +CSV_LINES_LIST = to_csv_lines(CVE_LIST) # Exporting csv file -to_csv(csv_lines_list, path="") +to_csv(CSV_LINES_LIST, path="") diff --git a/examples/email_report_filters.py b/examples/email_report_filters.py index 5260383..8429519 100644 --- a/examples/email_report_filters.py +++ b/examples/email_report_filters.py @@ -7,6 +7,7 @@ from datetime import datetime from email.mime.text import MIMEText from cbw_api_toolbox.cbw_api import CBWApi +# pylint: disable=duplicate-code CONF = ConfigParser() CONF.read(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', 'api.conf')) @@ -35,11 +36,11 @@ # Filters to use, please comment unused parameters CVE_FILTERS = { - "level": "level_critical", #level_critical = CVSS score > 9, level_high = 7 < 9, level_medium = 4 < 7 - "active": "true", - # "technology_product": "", - "groups": ["", ""] # ( ["group"] or ["groupA", "groupB", "groupC"]...) - } + "level": "level_critical", #level_critical = CVSS score > 9, level_high = 7 < 9, level_medium = 4 < 7 + "active": "true", + # "technology_product": "", + "groups": ["", ""] # ( ["group"] or ["groupA", "groupB", "groupC"]...) + } ############################################################ @@ -143,7 +144,7 @@ def build_email(active_cves): """ - if filtered_active_cves == []: + if not FILTERED_ACTIVE_CVES: html = '
Aucun serveur avec une CVE active correspondant aux critères définis a été remonté
' data = html_start + html + html_end return data @@ -178,24 +179,24 @@ def build_email(active_cves): return html_start -filtered_active_cves = sort_cves() +FILTERED_ACTIVE_CVES = sort_cves() -HTML = build_email(filtered_active_cves) +HTML = build_email(FILTERED_ACTIVE_CVES) print("! Testing communication with SMTP server") -context = ssl.create_default_context() -smtpserver = smtplib.SMTP(SMTP_SETTINGS["server"], SMTP_SETTINGS["port"]) -smtpserver.ehlo() # Can be omitted -smtpserver.starttls(context=context) # Secure the connection -smtpserver.ehlo() # Can be omitted -smtpserver.login(SMTP_SETTINGS["username"], SMTP_SETTINGS["password"]) +CONTEXT = ssl.create_default_context() +SMTPSERVER = smtplib.SMTP(SMTP_SETTINGS["server"], SMTP_SETTINGS["port"]) +SMTPSERVER.ehlo() # Can be omitted +SMTPSERVER.starttls(context=CONTEXT) # Secure the connection +SMTPSERVER.ehlo() # Can be omitted +SMTPSERVER.login(SMTP_SETTINGS["username"], SMTP_SETTINGS["password"]) print("INFO:OK") -today = datetime.now().strftime("%Y-%m-%d %H:%M") -msg = MIMEText(HTML, 'html', 'utf-8') -msg['Subject'] = 'Cyberwatch - Bilan du '+ today -msg['From'] = SMTP_SETTINGS["sender"] -msg['To'] = SMTP_SETTINGS["recipient"] -smtpserver.send_message(msg) +TODAY = datetime.now().strftime("%Y-%m-%d %H:%M") +MSG = MIMEText(HTML, 'html', 'utf-8') +MSG['Subject'] = 'Cyberwatch - Bilan du '+ TODAY +MSG['From'] = SMTP_SETTINGS["sender"] +MSG['To'] = SMTP_SETTINGS["recipient"] +SMTPSERVER.send_message(MSG) -smtpserver.quit() +SMTPSERVER.quit() diff --git a/examples/fetch_daily_cves_to_redmine.py b/examples/fetch_daily_cves_to_redmine.py index debe529..54e6e14 100644 --- a/examples/fetch_daily_cves_to_redmine.py +++ b/examples/fetch_daily_cves_to_redmine.py @@ -22,12 +22,12 @@ def send_redmine(cve_list, project_id, tracker_id): message += "\n\n* \""+server.hostname+"\":"+CONF.get('cyberwatch', 'url')+"/servers/"+str(server.id) if cve.level is not None: - redmine_priority_id = redmine_priorities[cve.level[6:]] + redmine_priority_id = REDMINE_PRIORITIES[cve.level[6:]] else: - redmine_priority_id = redmine_priorities["unknown"] + redmine_priority_id = REDMINE_PRIORITIES["unknown"] - with redmine.session(return_response=False): - redmine.issue.create(project_id=project_id, subject='Cyberwatch new CVE : {}'.format(cve.cve_code), \ + with REDMINE.session(return_response=False): + REDMINE.issue.create(project_id=project_id, subject='Cyberwatch new CVE : {}'.format(cve.cve_code), \ priority_id=redmine_priority_id, description=message, tracker_id=tracker_id) def get_cves_today(): @@ -47,7 +47,7 @@ def get_cves_today(): today_date = datetime.strftime(datetime.now(), '%Y-%m-%d') print("* Saving the results to data/"+today_date+".txt") - with open("data/"+today_date+".txt", "w") as outfile: + with open("data/"+today_date+".txt", "w", encoding="utf-8") as outfile: outfile.write("\n".join(cve_list_today)) print("! 
Done.") @@ -61,7 +61,7 @@ def get_cves_yesterday(): print("* Yesterday: " + yesterday_date) cve_list_yesterday = [] - with open("data/"+yesterday_date+".txt") as infile: + with open("data/"+yesterday_date+".txt", encoding="utf-8") as infile: for line in infile: cve_list_yesterday.append(line.strip()) @@ -77,13 +77,13 @@ def get_cves_yesterday(): CLIENT.ping() # Redmine API informations -redmine = Redmine(CONF.get('redmine', 'url'), version=CONF.get('redmine', 'version'), key=CONF.get('redmine', 'key')) +REDMINE = Redmine(CONF.get('redmine', 'url'), version=CONF.get('redmine', 'version'), key=CONF.get('redmine', 'key')) # id of the Redmine project to affect newly created issues to REDMINE_PROJECT_ID = 2 # id of the tracker ; optional if a default tracker is defined in Redmine REDMINE_TRACKER_ID = 1 # dict of priorities and their ids in Redmine, available through admin interface : http://[redmine-url]/enumerations -redmine_priorities = { +REDMINE_PRIORITIES = { "low": 5, "medium": 4, "high": 3, @@ -93,12 +93,12 @@ def get_cves_yesterday(): # Finding the differences between yesterday and today print("! Computing the difference...") -diff = list(set(get_cves_today()) - set(get_cves_yesterday())) -diff.sort() -print(diff) +DIFF = list(set(get_cves_today()) - set(get_cves_yesterday())) +DIFF.sort() +print(DIFF) -if len(diff) == 0: +if len(DIFF) == 0: print("No new CVEs found between yesterday and today: nothing to send!") sys.exit(0) -send_redmine(diff, REDMINE_PROJECT_ID, REDMINE_TRACKER_ID) +send_redmine(DIFF, REDMINE_PROJECT_ID, REDMINE_TRACKER_ID) diff --git a/examples/find_outdated_last_detection.py b/examples/find_outdated_last_detection.py index bb1deb1..910c9e3 100644 --- a/examples/find_outdated_last_detection.py +++ b/examples/find_outdated_last_detection.py @@ -56,7 +56,8 @@ def display(server_list, what): for outdated_server in server_list: server = outdated_server["server"] print('{} --- {} --- {} --- Last Detection: {} days ago'.format(server.id, server.hostname, - server.cve_announcements_count, outdated_server["last_detection"])) + server.cve_announcements_count, + outdated_server["last_detection"])) def send_email(subject, sender, receiver, content, login, password, smtp, port): @@ -86,7 +87,8 @@ def build_email(server_list): for outdated_server in server_list: server = outdated_server["server"] content += '\n{} --- {} --- {} --- Dernière Détection : {} jours'.format(server.id, server.hostname, - server.cve_announcements_count, outdated_server["last_detection"]) + server.cve_announcements_count, + outdated_server["last_detection"]) mail_content = """ Bonjour, diff --git a/examples/groups_from_csv.py b/examples/groups_from_csv.py index 6386739..8a760dc 100644 --- a/examples/groups_from_csv.py +++ b/examples/groups_from_csv.py @@ -4,15 +4,11 @@ csv_file = PATH/TO/FILE """ -import argparse -from argparse import RawTextHelpFormatter +import csv import os -import json from configparser import ConfigParser -import requests from cbw_api_toolbox.cbw_api import CBWApi -from requests.auth import HTTPBasicAuth -import csv +# pylint: disable=W0621 def connect_api(): '''Connect to the API and test connection''' @@ -35,55 +31,58 @@ def get_file_name(): return file def get_groups_by_name_id(client): + """get_groups_by_name_id""" groups_by_name_id = {} for group in client.groups(): groups_by_name_id[group.name] = group.id return groups_by_name_id def get_assets_by_hostname_id(client): + """get_assets_by_hostname_id""" assets_by_hostname_id = {} for asset in client.assets(): 
         assets_by_hostname_id[asset.hostname] = asset.id
     return assets_by_hostname_id
 
-def get_asset_groups_by_id(client, id):
+def get_asset_groups_by_id(client, asset_id):
+    """get_asset_groups_by_id"""
     groups = []
-    for group in client.asset(id).groups:
-        groups.append(group.id)
+    for group in client.asset(asset_id).groups:
+        groups.append(group.id)
     return groups
 
 # Connect to Cyberwatch API
-client = connect_api()
+CLIENT = connect_api()
 
 # Read the CSV file and extract elements
-file = open(get_file_name())
-csvreader = csv.reader(file)
-next(csvreader) #comment this line if the CSV file doesn't have headers
+FILE = open(get_file_name(), encoding="utf-8")
+CSVREADER = csv.reader(FILE)
+next(CSVREADER) #comment this line if the CSV file doesn't have headers
 
 # Associate each asset in Cyberwatch to its ID
-hostname_id = get_assets_by_hostname_id(client)
+HOSTNAME_ID = get_assets_by_hostname_id(CLIENT)
 
 # Associate each group in Cyberwatch to its ID
-group_id = get_groups_by_name_id(client)
+GROUP_ID = get_groups_by_name_id(CLIENT)
 
-for element in csvreader:
+for element in CSVREADER:
     #Separate hostname and groups from the extracted data
     elements = str(element[0]).split(";")
 
     #Get hostname and ID
     hostname = elements[0]
-    ID = hostname_id[hostname]
+    ID = HOSTNAME_ID[hostname]
 
     #Get groups that are already associated to the asset
-    total_groups = get_asset_groups_by_id(client, str(ID))
+    total_groups = get_asset_groups_by_id(CLIENT, str(ID))
 
-    # Append new groups to the total 
+    # Append new groups to the total
     groups = str(elements[1]).split(":")
     for group in groups:
-        total_groups.append(group_id[group])
+        total_groups.append(GROUP_ID[group])
 
     # Update asset information in Cyberwatch
     PARAMS = { "groups": total_groups }
-    client.update_server(str(ID), PARAMS)
+    CLIENT.update_server(str(ID), PARAMS)
diff --git a/examples/ignore_spooler_cves_after_mitigation.py b/examples/ignore_spooler_cves_after_mitigation.py
index afb5f98..f5237a0 100644
--- a/examples/ignore_spooler_cves_after_mitigation.py
+++ b/examples/ignore_spooler_cves_after_mitigation.py
@@ -9,14 +9,14 @@
 
 
-CVE_CODES = ['CVE-2021-1675','CVE-2021-34527']
+CVE_CODES = ['CVE-2021-1675', 'CVE-2021-34527']
 
 def disabled_spooler_assets():
     '''finds assets with a disabled startup spooler service'''
     params = {
-        "service_name": "spooler",
-        "service_status": "disabled"
-    }
+            "service_name": "spooler",
+            "service_status": "disabled"
+            }
     return CLIENT.assets(params)
 
@@ -47,18 +47,18 @@ def ignore():
         "ignored": "true"
     }
 
-    disabled = disabled_spooler_assets()
+    disabled = disabled_spooler_assets()
 
-    for asset in vulnerable_assets() :
+    for asset in vulnerable_assets():
         if any(d.id == asset.id for d in disabled):
-            for code in CVE_CODES :
+            for code in CVE_CODES:
                 CLIENT.update_server_cve(str(asset.id), code, params)
-                ignored.append([code,asset])
+                ignored.append([code, asset])
 
     return ignored
 
-result = ignore()
-print('\n=========== Total of {} CVEs have been ingored on disabled spooler assets ==========='.format(len(result)))
+RESULT = ignore()
+print('\n=========== Total of {} CVEs have been ignored on disabled spooler assets ==========='.format(len(RESULT)))
 
-for item in result:
+for item in RESULT:
     print('{} --- {} --- {}'.format(item[1].id, item[1].hostname, item[0]))
diff --git a/examples/json_import_example/import_json.py b/examples/json_import_example/import_json.py
index e1b706f..59a5daf 100644
--- a/examples/json_import_example/import_json.py
+++ b/examples/json_import_example/import_json.py
@@ -34,7 +34,7 @@ def
parse_json_file(json_file_path): } # Parse the json file, we assume "host" and "system" are mandatory values - with open(json_file_path) as json_data: + with open(json_file_path, encoding="utf-8") as json_data: data = json.load(json_data) for json_dict in data: remote_access_infos["address"] = json_dict["host"] @@ -46,11 +46,11 @@ def parse_json_file(json_file_path): remote_access_infos["credential_id"] = json_dict.get("credential_id", remote_access_infos["credential_id"]) remote_access_infos["node_id"] = json_dict.get("node_id", remote_access_infos["node_id"]) remote_access_infos["server_groups"] = json_dict.get("cyberwatch_groups", - remote_access_infos["server_groups"]) + remote_access_infos["server_groups"]) print("Trying to create Cyberwatch remote access with the following information : {}" - .format(remote_access_infos)) + .format(remote_access_infos)) CLIENT.create_remote_access(remote_access_infos) -json_file = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'example.json') -parse_json_file(json_file) +JSON_FILE = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'example.json') +parse_json_file(JSON_FILE) diff --git a/examples/new_high_priority_cve/find_new_cve.py b/examples/new_high_priority_cve/find_new_cve.py index 41d44d9..77e6ee8 100644 --- a/examples/new_high_priority_cve/find_new_cve.py +++ b/examples/new_high_priority_cve/find_new_cve.py @@ -9,6 +9,7 @@ from configparser import ConfigParser from datetime import datetime, timedelta from cbw_api_toolbox.cbw_api import CBWApi +# pylint: disable=duplicate-code ############################################################ # CONFIGURATION - USE THIS SECTION TO CONFIGURE SCRIPT @@ -43,7 +44,7 @@ def compare_for_new_cve(new_set): # Get latest backup of high-priority CVEs list_of_files = glob.glob((os.path.join(os.path.abspath( os.path.dirname(__file__)), '*new_cves.json'))) - old_backup = open(max(list_of_files, key=os.path.getctime), "r") + old_backup = open(max(list_of_files, key=os.path.getctime), "r", encoding="utf-8") old_list = json.load(old_backup) old_backup.close() @@ -53,7 +54,7 @@ def compare_for_new_cve(new_set): # Write new backup file with all high-priority CVEs new_backup = open((os.path.join(os.path.abspath( - os.path.dirname(__file__)), datetime.strftime(datetime.now(), '%d-%m-%Y') + "_new_cves.json")), "w") + os.path.dirname(__file__)), datetime.strftime(datetime.now(), '%d-%m-%Y') + "_new_cves.json")), "w", encoding="utf-8") new_backup.write(json.dumps({**old_high_priority_cves, **new_set})) new_backup.close() diff --git a/examples/ocs_inventory/import_ocs_inventory.py b/examples/ocs_inventory/import_ocs_inventory.py index 71048cf..0063523 100644 --- a/examples/ocs_inventory/import_ocs_inventory.py +++ b/examples/ocs_inventory/import_ocs_inventory.py @@ -7,13 +7,13 @@ import requests from cbw_api_toolbox.cbw_api import CBWApi -conf = ConfigParser() -conf.read(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', '..', 'api.conf')) +CONF = ConfigParser() +CONF.read(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', '..', 'api.conf')) def connect_cyberwatch_api(): '''Connect to the Cyberwatch API and test the connection''' - client = CBWApi(conf.get('cyberwatch', 'url'), conf.get('cyberwatch', 'api_key'), - conf.get('cyberwatch', 'secret_key')) + client = CBWApi(CONF.get('cyberwatch', 'url'), CONF.get('cyberwatch', 'api_key'), + CONF.get('cyberwatch', 'secret_key')) client.ping() return client @@ -39,7 +39,7 @@ def create_infoscript(computer, computer_id): # Write in the 
infoscript
     with open(os.path.join(os.path.dirname(__file__), 'ocs_export', '{}_infoscript.txt'
-              .format(computer_id)), 'w') as infoscript:
+              .format(computer_id)), 'w', encoding="utf-8") as infoscript:
         infoscript.write('IDENTIFIER_SCRIPT:{}'.format(identifier_script_value)+'\n')
         infoscript.write('IDENTIFIER_HOSTNAME:{}'.format(computer[computer_id]['hardware']['NAME'])+'\n')
         infoscript.write('HOSTNAME:{}'.format(computer[computer_id]['hardware']['NAME'])+'\n')
@@ -51,7 +51,7 @@ def create_infoscript(computer, computer_id):
             arch = 'x86_64'
         infoscript.write('ARCH:{}'.format(arch)+'\n')
         infoscript.write('OS_PRETTYNAME:{}'.format(computer[computer_id]['hardware']['OSNAME'])+'\n')
-        if system_type == 'unix' or system_type == 'macos':
+        if system_type in ('unix', 'macos'):
             infoscript.write('KERNEL_VERSION:{}'.format(computer[computer_id]['hardware']['OSVERSION'])+'\n')
         elif system_type == 'windows':
             # Set OS_VERSION, OS_BUILD and WUA_VERSION as hardcoded values since OCS does not provide this information
@@ -67,7 +67,7 @@ def create_windows_packagesscript(computer, computer_id):
     # For Windows systems, create a packagesscript file
     filename = '{}_packagesscript.txt'.format(computer_id)
     softwares = computer[computer_id]['softwares']
-    with open(os.path.join(os.path.dirname(__file__), 'ocs_export', filename), 'w') as packagesscript:
+    with open(os.path.join(os.path.dirname(__file__), 'ocs_export', filename), 'w', encoding="utf-8") as packagesscript:
         packagesscript.write('IDENTIFIER_SCRIPT:2'+'\n')
         packagesscript.write('IDENTIFIER_HOSTNAME:{}'.format(computer[computer_id]['hardware']['NAME'])+'\n')
 
@@ -86,7 +86,7 @@ def create_general_packagesscript(computer, computer_id):
     # If system is macos or unix, we write packages in infoscript
     filename = '{}_infoscript.txt'.format(computer_id)
     softwares = computer[computer_id]['softwares']
-    with open(os.path.join(os.path.dirname(__file__), 'ocs_export', filename), 'a') as packagesscript:
+    with open(os.path.join(os.path.dirname(__file__), 'ocs_export', filename), 'a', encoding="utf-8") as packagesscript:
         for software in softwares:
             packagesscript.write('PACKAGE:{}|{}'.format(software['NAME'], software['VERSION'])+'\n')
 
@@ -95,14 +95,14 @@ def create_general_packagesscript(computer, computer_id):
 def create_empty_portsscript(computer, computer_id, identifier_id):
     '''Generates empty Cyberwatch portsscript to avoid status awaiting analysis'''
     filename = '{}_portsscript.txt'.format(computer_id)
-    with open(os.path.join(os.path.dirname(__file__), 'ocs_export', filename), 'w') as portsscript:
+    with open(os.path.join(os.path.dirname(__file__), 'ocs_export', filename), 'w', encoding="utf-8") as portsscript:
         portsscript.write('IDENTIFIER_SCRIPT:{}'.format(identifier_id)+'\n')
         portsscript.write('IDENTIFIER_HOSTNAME:{}'.format(computer[computer_id]['hardware']['NAME'])+'\n')
         #portsscript.write('TCP:135'+'\n')
 
 def export_ocs():
     '''Create text files to be imported in Cyberwatch using all OCS Inventory'''
-    ocs_url = conf.get('ocs_inventory', 'url')
+    ocs_url = CONF.get('ocs_inventory', 'url')
 
     # Get a list of all the computer IDs
     list_id = request_to_json(ocs_url+'computers/listID')
@@ -124,13 +124,13 @@ def export_ocs():
 def upload_cyberwatch(client):
     """Upload results from the folder 'ocs_export' to Cyberwatch"""
     print('INFO: Searching for available results...')
-    files = ( file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'ocs_export'))) )
+    files = (file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__),
'ocs_export')))) for file in files: file_path = os.path.join(os.path.dirname(__file__), 'ocs_export', file) if os.path.isfile(file_path): - with open(file_path, 'r') as filehandle: + with open(file_path, 'r', encoding="utf-8") as filehandle: filecontent = filehandle.read() - content = {'output': filecontent , 'groups': 'OCS_Inventory'} + content = {'output': filecontent, 'groups': 'OCS_Inventory'} print('INFO: Sending {} content to the API...'.format(file)) client.upload_airgapped_results(content) print('INFO: Done.') diff --git a/examples/recovered_servers_script/communication_failure_recovered.py b/examples/recovered_servers_script/communication_failure_recovered.py index 0eef7f3..08c5315 100644 --- a/examples/recovered_servers_script/communication_failure_recovered.py +++ b/examples/recovered_servers_script/communication_failure_recovered.py @@ -58,7 +58,7 @@ def replace_file(servers): def find_communication_failure_servers(servers): """Find servers with status "Communication failure" and save them to a file""" print('INFO: Finding servers with "Communication failure" status and saving result in file') - with open(os.path.dirname(__file__) + '/communication_failure_list.txt', 'w+') as file: + with open(os.path.dirname(__file__) + '/communication_failure_list.txt', 'w+', encoding="utf-8") as file: for server in servers: if server.status == "server_update_comm_fail": json.dump({"id": server.id}, file) @@ -73,7 +73,7 @@ def find_recovered_servers(client): if server.status == "server_update_comm_fail": current_servers_list.append({"id": server.id}) - with open(os.path.dirname(__file__) + '/communication_failure_list.txt') as file: + with open(os.path.dirname(__file__) + '/communication_failure_list.txt', encoding="utf-8") as file: server_list = [json.loads(line) for line in file] diff = [i for i in current_servers_list + diff --git a/examples/servers_cves_csv.py b/examples/servers_cves_csv.py index b6dcdbf..494debd 100644 --- a/examples/servers_cves_csv.py +++ b/examples/servers_cves_csv.py @@ -2,7 +2,6 @@ import os import csv -import logging from configparser import ConfigParser from cbw_api_toolbox.cbw_api import CBWApi @@ -16,7 +15,7 @@ def to_csv(csv_lines, name_csv, path=""): """Write objects in csv_lines into a csv file""" - with open(os.path.join(path, name_csv), 'w', newline='') as csvfile: + with open(os.path.join(path, name_csv), 'w', newline='', encoding="utf-8") as csvfile: spamwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL) spamwriter.writerow(['"sep=,"']) @@ -33,24 +32,21 @@ def to_csv_lines(server_list): csv_lines = [] index_cve = 0 if server.cve_announcements_count > 0: - Z = CLIENT.server(str(server.id)) + server_details = CLIENT.server(str(server.id)) index_cve += 1 - for cve in Z.cve_announcements: + for cve in server_details.cve_announcements: csv_lines.append({"Vulnérabilité": cve.cve_code, - "Dernière analyse de l'actif": Z.analyzed_at, - "Score CVSS": cve.score, - "Vulnérabilité prioritaire": cve.prioritized, - "Date de détection": cve.detected_at, - "Ignorée": cve.ignored, - "Commentaire": cve.comment - }) - - logging.info('Generating ' + server.hostname + '.csv') - to_csv(csv_lines,name_csv=server.hostname + ".csv", path="") - - return None - -logging.info('Fetching Servers') -servers_in_group = CLIENT.servers({"group_id": GROUP_ID}) -to_csv_lines(servers_in_group) - + "Dernière analyse de l'actif": server_details.analyzed_at, + "Score CVSS": cve.score, + "Vulnérabilité prioritaire": cve.prioritized, + "Date de détection": 
cve.detected_at, + "Ignorée": cve.ignored, + "Commentaire": cve.comment + }) + + print('Generating ' + server.hostname + '.csv') + to_csv(csv_lines, name_csv=server.hostname + ".csv", path="") + +print('Fetching Servers') +SERVERS_IN_GROUP = CLIENT.servers({"group_id": GROUP_ID}) +to_csv_lines(SERVERS_IN_GROUP) diff --git a/examples/stored_credential_create.py b/examples/stored_credential_create.py index d94dff0..a83e833 100644 --- a/examples/stored_credential_create.py +++ b/examples/stored_credential_create.py @@ -19,6 +19,6 @@ "client_key": "", # client certificate for TLS credentials like a Docker engine "auth_password": "", # for SNMP, authentication password for SNMP credentials "priv_password": "" # for SNMP, encryption password for SNMP credentials -} + } CLIENT.create_stored_credential(INFO) diff --git a/examples/stored_credential_update.py b/examples/stored_credential_update.py index 14b21d5..c781c7f 100644 --- a/examples/stored_credential_update.py +++ b/examples/stored_credential_update.py @@ -19,7 +19,7 @@ "ca_cert": "", # certificate of the certificate authority for TLS credentials like a docker engine "client_cert": "", # specifies client private key for TLS credentials like a Docker engine "client_key": "" # client certificate for TLS credentials like a Docker engine -} + } STORED_CREDENTIALS_ID = '' diff --git a/setup.py b/setup.py index 79745df..27fa929 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,4 @@ +"""setup.py""" from setuptools import setup, find_packages setup( diff --git a/spec/__init__.py b/spec/__init__.py index e69de29..d4291dd 100644 --- a/spec/__init__.py +++ b/spec/__init__.py @@ -0,0 +1 @@ +# pylint: disable=R0801 diff --git a/spec/test_cbw_api.py b/spec/test_cbw_api.py index 6b567a4..ec76b27 100644 --- a/spec/test_cbw_api.py +++ b/spec/test_cbw_api.py @@ -3,6 +3,7 @@ import vcr # pylint: disable=import-error import pytest # pylint: disable=import-error from cbw_api_toolbox.cbw_api import CBWApi +# pylint: disable=duplicate-code # To generate a new vcr cassette: # - DO NOT CHANGE THE API_URL @@ -709,14 +710,14 @@ def test_create_compliance_rule(): client = CBWApi(API_URL, API_KEY, SECRET_KEY) info = { - "audit": "rule audit", - "code": "SBP-Custom-001", - "description": "rule description", - "equation": "(1 && 2)", - "name": "rule name", - "rationale": "rule rationale", - "remediation": "rule remediation", - "checks": [ + "audit": "rule audit", + "code": "SBP-Custom-001", + "description": "rule description", + "equation": "(1 && 2)", + "name": "rule name", + "rationale": "rule rationale", + "remediation": "rule remediation", + "checks": [ { "order": 1, "content": "check content", diff --git a/spec/test_cbw_files_xlsx.py b/spec/test_cbw_files_xlsx.py index 7332a50..a0df8c6 100644 --- a/spec/test_cbw_files_xlsx.py +++ b/spec/test_cbw_files_xlsx.py @@ -3,6 +3,7 @@ import openpyxl # pylint: disable=import-error import vcr # pylint: disable=import-error from cbw_api_toolbox.cbw_file_xlsx import CBWXlsx +# pylint: disable=duplicate-code # To generate a new vcr cassette: # - DO NOT CHANGE THE API_URL
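
Note on the open() calls touched by this patch: pylint's unspecified-encoding warning (W1514) only concerns text-mode opens. A binary-mode open must not be given an encoding argument at all (Python raises ValueError at runtime), which is why the "rb"/'wb' call sites keep decoding through chardet instead of passing encoding="utf-8". A minimal sketch, assuming a throwaway file named example.txt (placeholder, not part of the patch):

# illustrative only -- example.txt is a placeholder file name
import chardet

with open("example.txt", "w", encoding="utf-8") as stream:  # text mode: encoding applies here
    stream.write("demo content\n")

try:
    open("example.txt", "rb", encoding="utf-8")  # binary mode rejects the encoding argument
except ValueError as error:
    print(error)  # "binary mode doesn't take an encoding argument"

with open("example.txt", "rb") as stream:  # binary mode: read raw bytes, no encoding
    raw_content = stream.read()
detection = chardet.detect(raw_content)  # detect the encoding, as read_file_all_encodings() does
print(raw_content.decode(detection["encoding"]))

Running the sketch prints the ValueError message followed by the decoded file content.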