Skip to content

Commit

Permalink
Fix pylint errors for pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
fenrisl committed Jul 19, 2022
1 parent 0ba478d commit 9e70ee7
Show file tree
Hide file tree
Showing 31 changed files with 185 additions and 182 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ confidence=
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods
disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods, duplicate-code, consider-using-f-string

[FORMAT]

Expand Down
10 changes: 5 additions & 5 deletions cbw_api_toolbox/cbw_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class CBWApi: # pylint: disable=R0904
"""Class used to communicate with the CBW API"""

def __init__(
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
):

self.verify_ssl = verify_ssl
Expand Down
6 changes: 3 additions & 3 deletions cli/airgap/download_compliance_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def download_individual_script(script_object, base_directory):
script_filename = "".join((base_directory, "/", script.filename))

os.makedirs(dirname(script_filename), exist_ok=True)
with open(script_filename, "w") as filestream:
with open(script_filename, "w", encoding="utf-8") as filestream:
filestream.write(script.script_content)

if ".ps1" in script_object[0].filename:
Expand All @@ -105,10 +105,10 @@ def create_run_scripts(os_target, base_directory):

if os_target in "Windows":
run_script = join(base_directory, "run.ps1")
with open(run_script, "w") as file_stream:
with open(run_script, "w", encoding="utf-8") as file_stream:
file_stream.write(PS1_EXECUTE_SCRIPT)
else:
run_script = join(base_directory, "run")
with open(run_script, "w") as file_stream:
with open(run_script, "w", encoding="utf-8") as file_stream:
file_stream.write(SH_EXECUTE_SCRIPT)
os.chmod(run_script, 0o755)
10 changes: 4 additions & 6 deletions cli/airgap/download_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ def subcommand(args, api: CBWApi):
print("INFO: Script saved in {}".format(script_dir))


def download_individual_script(
script_object, base_directory, api: CBWApi, with_attachment=False
):
def download_individual_script(script_object, base_directory, api: CBWApi, with_attachment=False):
"""Get each script and put it in the correct category"""
script = api.fetch_airgapped_script(str(script_object.id), params={"pristine": "1"})
if script is None or script.type is None:
Expand All @@ -77,7 +75,7 @@ def download_individual_script(
os.makedirs(dirname(script_filename), exist_ok=True)
script_filename = append_extension(script_filename)

with open(script_filename, "w") as filestream:
with open(script_filename, "w", encoding="utf-8") as filestream:
filestream.write(script.contents)

if script.attachment and with_attachment:
Expand Down Expand Up @@ -118,7 +116,7 @@ def create_run_scripts(script_os_association, base_directory):
def add_sh_run_script(os_and_scripts, directory):
"""Create a shell run script in directory"""
run_script = join(directory, "run")
with open(run_script, "w") as file_stream:
with open(run_script, "w", encoding="utf-8") as file_stream:
file_stream.write(
SH_EXECUTE_SCRIPT.format(" ".join(script for (_, script) in os_and_scripts))
)
Expand All @@ -128,7 +126,7 @@ def add_sh_run_script(os_and_scripts, directory):
def add_pwsh_run_script(os_and_scripts, directory):
"""Creates a "windows launch all" powershell script"""
run_script_filename = join(directory, "run.ps1")
with open(run_script_filename, "w") as file_stream:
with open(run_script_filename, "w", encoding="utf-8") as file_stream:
file_stream.write("$ScriptDir = Split-Path $MyInvocation.MyCommand.Path\n")
for _, script in os_and_scripts:
file_stream.write(f'& "$ScriptDir/{script}"\n')
2 changes: 1 addition & 1 deletion cli/airgap/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def upload_file(result_script_filename, api: CBWApi):
def read_file_all_encodings(filename):
"""Return the content of `filename`. Detects the encoding used by the
file."""
with open(filename, "rb") as file_stream:
with open(filename, "rb", encoding="utf-8") as file_stream:
raw_content = file_stream.read()
detection = chardet.detect(raw_content)
return raw_content.decode(detection["encoding"])
2 changes: 1 addition & 1 deletion cli/airgap/upload_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def upload_file(result_script_filename, api: CBWApi):
def read_file_all_encodings(filename):
"""Return the content of `filename`. Detects the encoding used by the
file."""
with open(filename, "rb") as file_stream:
with open(filename, "rb", encoding="utf-8") as file_stream:
raw_content = file_stream.read()
detection = chardet.detect(raw_content)
return raw_content.decode(detection["encoding"])
6 changes: 3 additions & 3 deletions examples/air_gapped_scans/download_airgapped_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def download_scripts(parsed_args, scripts, client):
elif "Windows" in file_name:
file_name[-1] += '.ps1'
path = os.path.join(os.path.dirname(__file__), "/".join(file_name))
with open(path, 'w') as filehandle:
with open(path, 'w' , encoding="utf-8") as filehandle:
filehandle.write(script.contents)
if script.attachment and parsed_args.no_attachment:
download_attachment(file_name, script.attachment)
Expand All @@ -66,7 +66,7 @@ def download_attachment(path, url):
attachment = requests.get(url, allow_redirects=True, verify=False)
location = os.path.join(os.path.dirname(__file__), "/".join(path[:-1]))
name = url.split("/")[-1]
with open(os.path.join(location, name), 'wb') as file:
with open(os.path.join(location, name), 'wb', encoding="utf-8") as file:
file.write(attachment.content)


Expand All @@ -83,7 +83,7 @@ def create_windows_launch_all():
}"""

path = os.path.join(os.path.dirname(__file__), "Scripts", "Windows", "cbw_launch_all.ps1")
with open(path, 'w') as filehandle:
with open(path, 'w' , encoding="utf-8") as filehandle:
filehandle.write(launch_all_powershell)


Expand Down
6 changes: 3 additions & 3 deletions examples/air_gapped_scans/upload_airgapped_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ def connect_api():
def upload(client):
"""Upload results from the folder 'Uploads' to Cyberwatch"""
print("INFO: Searching for available results...")
files = ( file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))) )
files = (file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))))
for file in files:
file_path = os.path.join(os.path.dirname(__file__), 'Uploads', file)
if os.path.isfile(file_path):
with open(file_path, 'r') as filehandle:
with open(file_path, 'r', encoding="utf-8") as filehandle:
filecontent = filehandle.read()
content = {'output': filecontent , 'groups': 'my_group_1, my_group_2'}
content = {'output': filecontent, 'groups': 'my_group_1, my_group_2'}
print('INFO: Sending {} content to the API...'.format(file))
client.upload_airgapped_results(content)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,26 @@

# Prerequisites :
# - Install libcloud with command "pip3 install apache-libcloud"
# - If you are not using the default credentials for agentless connections configured in Cyberwatch, set up SERVER_LOGIN and/or WINRM_password variables
# - Set the constant variables on the first lines of the script depending on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
# - If you are not using the default credentials for agentless connections configured in Cyberwatch,
# set up SERVER_LOGIN and/or WINRM_password variables
# - Set the constant variables on the first lines of the script depending
# on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example:
# https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
# - SSH key file of servers to import named "id_rsa"
# Notes :
# - All servers will be imported with group "cloud_crawling" + zone (ex: "europe-west4-a")

import argparse
import os
import socket

# pylint: disable=E0401, R1705
from configparser import ConfigParser
from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver
from cbw_api_toolbox.cbw_api import CBWApi

SSH_KEY_SERVERS = open(os.path.expanduser('id_rsa')).read()
SSH_KEY_SERVERS = open(os.path.expanduser('id_rsa'), encoding="utf-8").read()
SERVER_LOGIN = ""
WINRM_PASSWORD_SERVERS = ""

Expand Down Expand Up @@ -49,7 +52,7 @@ def connect_api():
def get_node():
'''Get list of available nodes and prompt user to choose'''
nodes = API.nodes()
if len(nodes) > 1 :
if len(nodes) > 1:
print("Which Cyberwatch node do you want to use to import?")
for node in nodes:
print("ID: {}, name: {}".format(node.id, node.name))
Expand All @@ -60,7 +63,7 @@ def get_node():
return node_id
else:
raise ValueError("Please provide valid node id")
else:
else:
return nodes[0].id


Expand Down Expand Up @@ -98,15 +101,15 @@ def retrieve_ec2_servers():
return running


def port_checker(ip, port):
def port_checker(ip_address, port):
'''Check if a specific port is open on an ip address'''
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
socket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket1.settimeout(5)
try:
s.connect((ip, int(port)))
s.shutdown(2)
socket1.connect((ip_address, int(port)))
socket1.shutdown(2)
return True
except:
except Exception: # pylint: disable=broad-except
return False


Expand All @@ -131,7 +134,8 @@ def check_add_server(servers, cloud_servers, node_id):
"key": SSH_KEY_SERVERS})
to_add.append(info)
else:
print('The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985) so an agentless connection with Cyberwatch is not possible')
print("""The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985)
so an agentless connection with Cyberwatch is not possible""")
return to_add


Expand All @@ -150,8 +154,7 @@ def check_delete_server(cloud_servers):
def display_and_import(to_import_list, apply=False):
'''Display to_import servers then import them'''

print('\n\n================= Total of {} cloud servers to import (apply={}) ================='.format(len(to_import_list),
apply))
print('\n\n===== Total of {} cloud servers to import (apply={}) ====='.format(len(to_import_list), apply))
for to_add_server in to_import_list:
print('{} --- {} --- {}'.format(to_add_server["address"],
to_add_server["server_groups"], to_add_server["type"]))
Expand All @@ -161,8 +164,7 @@ def display_and_import(to_import_list, apply=False):

def display_and_delete(to_delete_list, apply=False):
'''Display to_delete servers then delete them'''
print('\n\n================= Total of {} servers on Cyberwatch to delete (apply={}) ================='.format(len(to_delete_list),
apply))
print('\n\n===== Total of {} servers on Cyberwatch to delete (apply={}) ====='.format(len(to_delete_list), apply))
for server in to_delete_list:
print('{} --- {} --- {}'.format(server.remote_ip, server.hostname, server.id))
if apply is True:
Expand Down Expand Up @@ -202,7 +204,8 @@ def main(args=None):
'''Main function'''

parser = argparse.ArgumentParser(
description='Script using Cyberwatch API to import not monitored cloud servers and delete terminated cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.')
description="""Script using Cyberwatch API to import not monitored cloud servers and delete terminated
cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.""")

parser.add_argument(
'-i',
Expand Down
4 changes: 2 additions & 2 deletions examples/clean_discovered_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def find_discoveries(client):
discoveries_details = client.hosts()
for host in discoveries_details:
if host.discovery.type == "CbwAssets::Discovery::DockerRegistry":
for id in host.server_ids:
ids.append(str(id))
for id_server in host.server_ids:
ids.append(str(id_server))
return ids


Expand Down
2 changes: 1 addition & 1 deletion examples/cleanup_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from configparser import ConfigParser
from cbw_api_toolbox.cbw_api import CBWApi

# pylint: disable=duplicate-code

def connect_api():
'''Connect to the API and test connection'''
Expand Down
1 change: 1 addition & 0 deletions examples/cleanup_initialization_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime
from dateutil.relativedelta import relativedelta # pylint: disable=import-error
from cbw_api_toolbox.cbw_api import CBWApi
# pylint: disable=duplicate-code

def connect_api():
'''Connect ot the API'''
Expand Down
10 changes: 4 additions & 6 deletions examples/cleanup_lost_com_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def find_lost_com_servers(servers):
def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS):
'''Display servers then delete them'''
print('\n\n================ Total of {} {} to delete (delete={}) ================'.format(len(delete_list),
server_type,
delete))
server_type,
delete))
for delete_server in delete_list:
print('{} -- {} -- {} -- {}'.format(delete_server.id, delete_server.hostname,
delete_server.cve_announcements_count, delete_server.created_at))
delete_server.cve_announcements_count, delete_server.created_at))

if delete is True:
client.delete_server(str(delete_server.id))
Expand All @@ -52,9 +52,7 @@ def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS):
def launch_script():
'''Launch script'''
client = connect_api()
filters = {
"communication_failed": "true"
}
filters = {"communication_failed": "true"}
servers = client.servers(filters)

lost_com_servers = find_lost_com_servers(servers)
Expand Down
12 changes: 6 additions & 6 deletions examples/cve_published_last_month_export_xlsx.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ def export_xls(cve_list, xls_export):
xls_export.close()

# Defines date to retrieve CVEs published last month
today = datetime.date.today()
firstDayOfLastMonth = (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
firstDayOfCurrentMonth = today.replace(day=1)
TODAY = datetime.date.today()
FIRSTDAYOFLASTMONTH = (TODAY.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
FIRSTDAYOFCURRENTMONTH = TODAY.replace(day=1)

print("Exporting vulnerabilities published between {} and {}.".format(firstDayOfLastMonth, firstDayOfCurrentMonth))
export_xls(get_cyberwatch_cves(firstDayOfLastMonth, firstDayOfCurrentMonth),
instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(firstDayOfLastMonth, firstDayOfCurrentMonth)))
print("Exporting vulnerabilities published between {} and {}.".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH))
export_xls(get_cyberwatch_cves(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH),
instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH)))
8 changes: 4 additions & 4 deletions examples/detail_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def to_csv_lines(cve_catalog):

def to_csv(csv_lines, name_csv='just_generated.csv', path=""):
"""Write objects in csv_lines into a csv file"""
with open(os.path.join(path, name_csv), 'w', newline='') as csvfile:
with open(os.path.join(path, name_csv), 'w', newline='', encoding="utf-8") as csvfile:
spamwriter = csv.writer(csvfile, delimiter=' ',
quotechar='|', quoting=csv.QUOTE_MINIMAL)
spamwriter.writerow(['"sep=,"'])
Expand All @@ -52,11 +52,11 @@ def to_csv(csv_lines, name_csv='just_generated.csv', path=""):

# Fetch active CVE if an exploit is available
logging.info('Fetching active CVE')
cve_list = CLIENT.cve_announcements({"exploitable": "true", "active": "true"})
CVE_LIST = CLIENT.cve_announcements({"exploitable": "true", "active": "true"})

# Formating lines for the csv
logging.info('Formating lines for the csv file')
csv_lines_list = to_csv_lines(cve_list)
CSV_LINES_LIST = to_csv_lines(CVE_LIST)

# Exporting csv file
to_csv(csv_lines_list, path="")
to_csv(CSV_LINES_LIST, path="")
Loading

0 comments on commit 9e70ee7

Please sign in to comment.