Skip to content

Commit

Permalink
Fix pylint errors for pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
fenrisl committed Jul 19, 2022
1 parent 0ba478d commit c6ce727
Show file tree
Hide file tree
Showing 26 changed files with 159 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ confidence=
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods
disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods, duplicate-code, consider-using-f-string

[FORMAT]

Expand Down
10 changes: 5 additions & 5 deletions cbw_api_toolbox/cbw_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class CBWApi: # pylint: disable=R0904
"""Class used to communicate with the CBW API"""

def __init__(
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
):

self.verify_ssl = verify_ssl
Expand Down
4 changes: 1 addition & 3 deletions cli/airgap/download_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ def subcommand(args, api: CBWApi):
print("INFO: Script saved in {}".format(script_dir))


def download_individual_script(
script_object, base_directory, api: CBWApi, with_attachment=False
):
def download_individual_script(script_object, base_directory, api: CBWApi, with_attachment=False):
"""Get each script and put it in the correct category"""
script = api.fetch_airgapped_script(str(script_object.id), params={"pristine": "1"})
if script is None or script.type is None:
Expand Down
4 changes: 2 additions & 2 deletions examples/air_gapped_scans/upload_airgapped_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ def connect_api():
def upload(client):
"""Upload results from the folder 'Uploads' to Cyberwatch"""
print("INFO: Searching for available results...")
files = ( file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))) )
files = (file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))))
for file in files:
file_path = os.path.join(os.path.dirname(__file__), 'Uploads', file)
if os.path.isfile(file_path):
with open(file_path, 'r') as filehandle:
filecontent = filehandle.read()
content = {'output': filecontent , 'groups': 'my_group_1, my_group_2'}
content = {'output': filecontent, 'groups': 'my_group_1, my_group_2'}
print('INFO: Sending {} content to the API...'.format(file))
client.upload_airgapped_results(content)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

# Prerequisites :
# - Install libcloud with command "pip3 install apache-libcloud"
# - If you are not using the default credentials for agentless connections configured in Cyberwatch, set up SERVER_LOGIN and/or WINRM_password variables
# - Set the constant variables on the first lines of the script depending on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
# - If you are not using the default credentials for agentless connections configured in Cyberwatch,
# set up SERVER_LOGIN and/or WINRM_password variables
# - Set the constant variables on the first lines of the script depending
# on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example:
# https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
# - SSH key file of servers to import named "id_rsa"
# Notes :
# - All servers will be imported with group "cloud_crawling" + zone (ex: "europe-west4-a")

import argparse
import os
import socket

# pylint: disable=E0401, R1705
from configparser import ConfigParser
from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver
Expand Down Expand Up @@ -49,7 +52,7 @@ def connect_api():
def get_node():
'''Get list of available nodes and prompt user to choose'''
nodes = API.nodes()
if len(nodes) > 1 :
if len(nodes) > 1:
print("Which Cyberwatch node do you want to use to import?")
for node in nodes:
print("ID: {}, name: {}".format(node.id, node.name))
Expand All @@ -60,7 +63,7 @@ def get_node():
return node_id
else:
raise ValueError("Please provide valid node id")
else:
else:
return nodes[0].id


Expand Down Expand Up @@ -98,15 +101,15 @@ def retrieve_ec2_servers():
return running


def port_checker(ip, port):
def port_checker(ip_address, port):
'''Check if a specific port is open on an ip address'''
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
socket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket1.settimeout(5)
try:
s.connect((ip, int(port)))
s.shutdown(2)
socket1.connect((ip_address, int(port)))
socket1.shutdown(2)
return True
except:
except Exception: # pylint: disable=broad-except
return False


Expand All @@ -131,7 +134,8 @@ def check_add_server(servers, cloud_servers, node_id):
"key": SSH_KEY_SERVERS})
to_add.append(info)
else:
print('The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985) so an agentless connection with Cyberwatch is not possible')
print("""The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985)
so an agentless connection with Cyberwatch is not possible""")
return to_add


Expand All @@ -150,8 +154,7 @@ def check_delete_server(cloud_servers):
def display_and_import(to_import_list, apply=False):
'''Display to_import servers then import them'''

print('\n\n================= Total of {} cloud servers to import (apply={}) ================='.format(len(to_import_list),
apply))
print('\n\n===== Total of {} cloud servers to import (apply={}) ====='.format(len(to_import_list), apply))
for to_add_server in to_import_list:
print('{} --- {} --- {}'.format(to_add_server["address"],
to_add_server["server_groups"], to_add_server["type"]))
Expand All @@ -161,8 +164,7 @@ def display_and_import(to_import_list, apply=False):

def display_and_delete(to_delete_list, apply=False):
'''Display to_delete servers then delete them'''
print('\n\n================= Total of {} servers on Cyberwatch to delete (apply={}) ================='.format(len(to_delete_list),
apply))
print('\n\n===== Total of {} servers on Cyberwatch to delete (apply={}) ====='.format(len(to_delete_list), apply))
for server in to_delete_list:
print('{} --- {} --- {}'.format(server.remote_ip, server.hostname, server.id))
if apply is True:
Expand Down Expand Up @@ -202,7 +204,8 @@ def main(args=None):
'''Main function'''

parser = argparse.ArgumentParser(
description='Script using Cyberwatch API to import not monitored cloud servers and delete terminated cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.')
description="""Script using Cyberwatch API to import not monitored cloud servers and delete terminated
cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.""")

parser.add_argument(
'-i',
Expand Down
4 changes: 2 additions & 2 deletions examples/clean_discovered_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def find_discoveries(client):
discoveries_details = client.hosts()
for host in discoveries_details:
if host.discovery.type == "CbwAssets::Discovery::DockerRegistry":
for id in host.server_ids:
ids.append(str(id))
for id_server in host.server_ids:
ids.append(str(id_server))
return ids


Expand Down
2 changes: 1 addition & 1 deletion examples/cleanup_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from configparser import ConfigParser
from cbw_api_toolbox.cbw_api import CBWApi

# pylint: disable=duplicate-code

def connect_api():
'''Connect to the API and test connection'''
Expand Down
1 change: 1 addition & 0 deletions examples/cleanup_initialization_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime
from dateutil.relativedelta import relativedelta # pylint: disable=import-error
from cbw_api_toolbox.cbw_api import CBWApi
# pylint: disable=duplicate-code

def connect_api():
'''Connect ot the API'''
Expand Down
10 changes: 4 additions & 6 deletions examples/cleanup_lost_com_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def find_lost_com_servers(servers):
def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS):
'''Display servers then delete them'''
print('\n\n================ Total of {} {} to delete (delete={}) ================'.format(len(delete_list),
server_type,
delete))
server_type,
delete))
for delete_server in delete_list:
print('{} -- {} -- {} -- {}'.format(delete_server.id, delete_server.hostname,
delete_server.cve_announcements_count, delete_server.created_at))
delete_server.cve_announcements_count, delete_server.created_at))

if delete is True:
client.delete_server(str(delete_server.id))
Expand All @@ -52,9 +52,7 @@ def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS):
def launch_script():
'''Launch script'''
client = connect_api()
filters = {
"communication_failed": "true"
}
filters = {"communication_failed": "true"}
servers = client.servers(filters)

lost_com_servers = find_lost_com_servers(servers)
Expand Down
12 changes: 6 additions & 6 deletions examples/cve_published_last_month_export_xlsx.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ def export_xls(cve_list, xls_export):
xls_export.close()

# Defines date to retrieve CVEs published last month
today = datetime.date.today()
firstDayOfLastMonth = (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
firstDayOfCurrentMonth = today.replace(day=1)
TODAY = datetime.date.today()
FIRSTDAYOFLASTMONTH = (TODAY.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
FIRSTDAYOFCURRENTMONTH = TODAY.replace(day=1)

print("Exporting vulnerabilities published between {} and {}.".format(firstDayOfLastMonth, firstDayOfCurrentMonth))
export_xls(get_cyberwatch_cves(firstDayOfLastMonth, firstDayOfCurrentMonth),
instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(firstDayOfLastMonth, firstDayOfCurrentMonth)))
print("Exporting vulnerabilities published between {} and {}.".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH))
export_xls(get_cyberwatch_cves(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH),
instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH)))
6 changes: 3 additions & 3 deletions examples/detail_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ def to_csv(csv_lines, name_csv='just_generated.csv', path=""):

# Fetch active CVE if an exploit is available
logging.info('Fetching active CVE')
cve_list = CLIENT.cve_announcements({"exploitable": "true", "active": "true"})
CVE_LIST = CLIENT.cve_announcements({"exploitable": "true", "active": "true"})

# Formating lines for the csv
logging.info('Formating lines for the csv file')
csv_lines_list = to_csv_lines(cve_list)
CSV_LINES_LIST = to_csv_lines(CVE_LIST)

# Exporting csv file
to_csv(csv_lines_list, path="")
to_csv(CSV_LINES_LIST, path="")
43 changes: 22 additions & 21 deletions examples/email_report_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from email.mime.text import MIMEText
from cbw_api_toolbox.cbw_api import CBWApi
# pylint: disable=duplicate-code

CONF = ConfigParser()
CONF.read(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', 'api.conf'))
Expand Down Expand Up @@ -35,11 +36,11 @@

# Filters to use, please comment unused parameters
CVE_FILTERS = {
"level": "level_critical", #level_critical = CVSS score > 9, level_high = 7 < 9, level_medium = 4 < 7
"active": "true",
# "technology_product": "",
"groups": ["", ""] # ( ["group"] or ["groupA", "groupB", "groupC"]...)
}
"level": "level_critical", #level_critical = CVSS score > 9, level_high = 7 < 9, level_medium = 4 < 7
"active": "true",
# "technology_product": "",
"groups": ["", ""] # ( ["group"] or ["groupA", "groupB", "groupC"]...)
}

############################################################

Expand Down Expand Up @@ -143,7 +144,7 @@ def build_email(active_cves):
</body>
</html>
"""
if filtered_active_cves == []:
if FILTERED_ACTIVE_CVES == []:
html = '<p>Aucun serveur avec une CVE active correspondant aux critères définis a été remonté</p>'
data = html_start + html + html_end
return data
Expand Down Expand Up @@ -178,24 +179,24 @@ def build_email(active_cves):

return html_start

filtered_active_cves = sort_cves()
FILTERED_ACTIVE_CVES = sort_cves()

HTML = build_email(filtered_active_cves)
HTML = build_email(FILTERED_ACTIVE_CVES)

print("! Testing communication with SMTP server")
context = ssl.create_default_context()
smtpserver = smtplib.SMTP(SMTP_SETTINGS["server"], SMTP_SETTINGS["port"])
smtpserver.ehlo() # Can be omitted
smtpserver.starttls(context=context) # Secure the connection
smtpserver.ehlo() # Can be omitted
smtpserver.login(SMTP_SETTINGS["username"], SMTP_SETTINGS["password"])
CONTEXT = ssl.create_default_context()
SMTPSERVER = smtplib.SMTP(SMTP_SETTINGS["server"], SMTP_SETTINGS["port"])
SMTPSERVER.ehlo() # Can be omitted
SMTPSERVER.starttls(context=CONTEXT) # Secure the connection
SMTPSERVER.ehlo() # Can be omitted
SMTPSERVER.login(SMTP_SETTINGS["username"], SMTP_SETTINGS["password"])
print("INFO:OK")

today = datetime.now().strftime("%Y-%m-%d %H:%M")
msg = MIMEText(HTML, 'html', 'utf-8')
msg['Subject'] = 'Cyberwatch - Bilan du '+ today
msg['From'] = SMTP_SETTINGS["sender"]
msg['To'] = SMTP_SETTINGS["recipient"]
smtpserver.send_message(msg)
TODAY = datetime.now().strftime("%Y-%m-%d %H:%M")
MSG = MIMEText(HTML, 'html', 'utf-8')
MSG['Subject'] = 'Cyberwatch - Bilan du '+ TODAY
MSG['From'] = SMTP_SETTINGS["sender"]
MSG['To'] = SMTP_SETTINGS["recipient"]
SMTPSERVER.send_message(MSG)

smtpserver.quit()
SMTPSERVER.quit()
22 changes: 11 additions & 11 deletions examples/fetch_daily_cves_to_redmine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ def send_redmine(cve_list, project_id, tracker_id):
message += "\n\n* \""+server.hostname+"\":"+CONF.get('cyberwatch', 'url')+"/servers/"+str(server.id)

if cve.level is not None:
redmine_priority_id = redmine_priorities[cve.level[6:]]
redmine_priority_id = REDMINE_PRIORITIES[cve.level[6:]]
else:
redmine_priority_id = redmine_priorities["unknown"]
redmine_priority_id = REDMINE_PRIORITIES["unknown"]

with redmine.session(return_response=False):
redmine.issue.create(project_id=project_id, subject='Cyberwatch new CVE : {}'.format(cve.cve_code), \
with REDMINE.session(return_response=False):
REDMINE.issue.create(project_id=project_id, subject='Cyberwatch new CVE : {}'.format(cve.cve_code), \
priority_id=redmine_priority_id, description=message, tracker_id=tracker_id)

def get_cves_today():
Expand Down Expand Up @@ -77,13 +77,13 @@ def get_cves_yesterday():
CLIENT.ping()

# Redmine API informations
redmine = Redmine(CONF.get('redmine', 'url'), version=CONF.get('redmine', 'version'), key=CONF.get('redmine', 'key'))
REDMINE = Redmine(CONF.get('redmine', 'url'), version=CONF.get('redmine', 'version'), key=CONF.get('redmine', 'key'))
# id of the Redmine project to affect newly created issues to
REDMINE_PROJECT_ID = 2
# id of the tracker ; optional if a default tracker is defined in Redmine
REDMINE_TRACKER_ID = 1
# dict of priorities and their ids in Redmine, available through admin interface : http://[redmine-url]/enumerations
redmine_priorities = {
REDMINE_PRIORITIES = {
"low": 5,
"medium": 4,
"high": 3,
Expand All @@ -93,12 +93,12 @@ def get_cves_yesterday():

# Finding the differences between yesterday and today
print("! Computing the difference...")
diff = list(set(get_cves_today()) - set(get_cves_yesterday()))
diff.sort()
print(diff)
DIFF = list(set(get_cves_today()) - set(get_cves_yesterday()))
DIFF.sort()
print(DIFF)

if len(diff) == 0:
if len(DIFF) == 0:
print("No new CVEs found between yesterday and today: nothing to send!")
sys.exit(0)

send_redmine(diff, REDMINE_PROJECT_ID, REDMINE_TRACKER_ID)
send_redmine(DIFF, REDMINE_PROJECT_ID, REDMINE_TRACKER_ID)
6 changes: 4 additions & 2 deletions examples/find_outdated_last_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def display(server_list, what):
for outdated_server in server_list:
server = outdated_server["server"]
print('{} --- {} --- {} --- Last Detection: {} days ago'.format(server.id, server.hostname,
server.cve_announcements_count, outdated_server["last_detection"]))
server.cve_announcements_count,
outdated_server["last_detection"]))


def send_email(subject, sender, receiver, content, login, password, smtp, port):
Expand Down Expand Up @@ -86,7 +87,8 @@ def build_email(server_list):
for outdated_server in server_list:
server = outdated_server["server"]
content += '\n{} --- {} --- {} --- Dernière Détection : {} jours'.format(server.id, server.hostname,
server.cve_announcements_count, outdated_server["last_detection"])
server.cve_announcements_count,
outdated_server["last_detection"])

mail_content = """
Bonjour,
Expand Down
Loading

0 comments on commit c6ce727

Please sign in to comment.