Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pylint errors for pipeline #292

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ confidence=
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods
disable=too-few-public-methods,too-many-instance-attributes,too-many-arguments,too-many-locals,logging-format-interpolation,not-an-iterable, too-many-public-methods, duplicate-code, consider-using-f-string

[FORMAT]

Expand Down
10 changes: 5 additions & 5 deletions cbw_api_toolbox/cbw_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class CBWApi: # pylint: disable=R0904
"""Class used to communicate with the CBW API"""

def __init__(
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
self,
api_url=None,
api_key=None,
secret_key=None,
verify_ssl=False,
):

self.verify_ssl = verify_ssl
Expand Down
6 changes: 3 additions & 3 deletions cli/airgap/download_compliance_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def download_individual_script(script_object, base_directory):
script_filename = "".join((base_directory, "/", script.filename))

os.makedirs(dirname(script_filename), exist_ok=True)
with open(script_filename, "w") as filestream:
with open(script_filename, "w", encoding="utf-8") as filestream:
filestream.write(script.script_content)

if ".ps1" in script_object[0].filename:
Expand All @@ -105,10 +105,10 @@ def create_run_scripts(os_target, base_directory):

if os_target in "Windows":
run_script = join(base_directory, "run.ps1")
with open(run_script, "w") as file_stream:
with open(run_script, "w", encoding="utf-8") as file_stream:
file_stream.write(PS1_EXECUTE_SCRIPT)
else:
run_script = join(base_directory, "run")
with open(run_script, "w") as file_stream:
with open(run_script, "w", encoding="utf-8") as file_stream:
file_stream.write(SH_EXECUTE_SCRIPT)
os.chmod(run_script, 0o755)
10 changes: 4 additions & 6 deletions cli/airgap/download_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ def subcommand(args, api: CBWApi):
print("INFO: Script saved in {}".format(script_dir))


def download_individual_script(
script_object, base_directory, api: CBWApi, with_attachment=False
):
def download_individual_script(script_object, base_directory, api: CBWApi, with_attachment=False):
"""Get each script and put it in the correct category"""
script = api.fetch_airgapped_script(str(script_object.id), params={"pristine": "1"})
if script is None or script.type is None:
Expand All @@ -77,7 +75,7 @@ def download_individual_script(
os.makedirs(dirname(script_filename), exist_ok=True)
script_filename = append_extension(script_filename)

with open(script_filename, "w") as filestream:
with open(script_filename, "w", encoding="utf-8") as filestream:
filestream.write(script.contents)

if script.attachment and with_attachment:
Expand Down Expand Up @@ -118,7 +116,7 @@ def create_run_scripts(script_os_association, base_directory):
def add_sh_run_script(os_and_scripts, directory):
"""Create a shell run script in directory"""
run_script = join(directory, "run")
with open(run_script, "w") as file_stream:
with open(run_script, "w", encoding="utf-8") as file_stream:
file_stream.write(
SH_EXECUTE_SCRIPT.format(" ".join(script for (_, script) in os_and_scripts))
)
Expand All @@ -128,7 +126,7 @@ def add_sh_run_script(os_and_scripts, directory):
def add_pwsh_run_script(os_and_scripts, directory):
"""Creates a "windows launch all" powershell script"""
run_script_filename = join(directory, "run.ps1")
with open(run_script_filename, "w") as file_stream:
with open(run_script_filename, "w", encoding="utf-8") as file_stream:
file_stream.write("$ScriptDir = Split-Path $MyInvocation.MyCommand.Path\n")
for _, script in os_and_scripts:
file_stream.write(f'& "$ScriptDir/{script}"\n')
2 changes: 1 addition & 1 deletion cli/airgap/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def upload_file(result_script_filename, api: CBWApi):
def read_file_all_encodings(filename):
"""Return the content of `filename`. Detects the encoding used by the
file."""
with open(filename, "rb") as file_stream:
with open(filename, "rb", encoding="utf-8") as file_stream:
raw_content = file_stream.read()
detection = chardet.detect(raw_content)
return raw_content.decode(detection["encoding"])
2 changes: 1 addition & 1 deletion cli/airgap/upload_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def upload_file(result_script_filename, api: CBWApi):
def read_file_all_encodings(filename):
"""Return the content of `filename`. Detects the encoding used by the
file."""
with open(filename, "rb") as file_stream:
with open(filename, "rb", encoding="utf-8") as file_stream:
raw_content = file_stream.read()
detection = chardet.detect(raw_content)
return raw_content.decode(detection["encoding"])
6 changes: 3 additions & 3 deletions examples/air_gapped_scans/download_airgapped_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def download_scripts(parsed_args, scripts, client):
elif "Windows" in file_name:
file_name[-1] += '.ps1'
path = os.path.join(os.path.dirname(__file__), "/".join(file_name))
with open(path, 'w') as filehandle:
with open(path, 'w' , encoding="utf-8") as filehandle:
filehandle.write(script.contents)
if script.attachment and parsed_args.no_attachment:
download_attachment(file_name, script.attachment)
Expand All @@ -66,7 +66,7 @@ def download_attachment(path, url):
attachment = requests.get(url, allow_redirects=True, verify=False)
location = os.path.join(os.path.dirname(__file__), "/".join(path[:-1]))
name = url.split("/")[-1]
with open(os.path.join(location, name), 'wb') as file:
with open(os.path.join(location, name), 'wb', encoding="utf-8") as file:
file.write(attachment.content)


Expand All @@ -83,7 +83,7 @@ def create_windows_launch_all():
}"""

path = os.path.join(os.path.dirname(__file__), "Scripts", "Windows", "cbw_launch_all.ps1")
with open(path, 'w') as filehandle:
with open(path, 'w' , encoding="utf-8") as filehandle:
filehandle.write(launch_all_powershell)


Expand Down
6 changes: 3 additions & 3 deletions examples/air_gapped_scans/upload_airgapped_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ def connect_api():
def upload(client):
"""Upload results from the folder 'Uploads' to Cyberwatch"""
print("INFO: Searching for available results...")
files = ( file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))) )
files = (file for file in sorted(os.listdir(os.path.join(os.path.dirname(__file__), 'Uploads'))))
for file in files:
file_path = os.path.join(os.path.dirname(__file__), 'Uploads', file)
if os.path.isfile(file_path):
with open(file_path, 'r') as filehandle:
with open(file_path, 'r', encoding="utf-8") as filehandle:
filecontent = filehandle.read()
content = {'output': filecontent , 'groups': 'my_group_1, my_group_2'}
content = {'output': filecontent, 'groups': 'my_group_1, my_group_2'}
print('INFO: Sending {} content to the API...'.format(file))
client.upload_airgapped_results(content)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,26 @@

# Prerequisites :
# - Install libcloud with command "pip3 install apache-libcloud"
# - If you are not using the default credentials for agentless connections configured in Cyberwatch, set up SERVER_LOGIN and/or WINRM_password variables
# - Set the constant variables on the first lines of the script depending on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
# - If you are not using the default credentials for agentless connections configured in Cyberwatch,
# set up SERVER_LOGIN and/or WINRM_password variables
# - Set the constant variables on the first lines of the script depending
# on which cloud provider you use (https://libcloud.readthedocs.io/en/stable/compute/drivers/)
# - Set up your Cyberwatch API key in api.conf in the same folder as the script, for an example:
# https://github.com/Cyberwatch/cyberwatch_api_toolbox#configuration
# - SSH key file of servers to import named "id_rsa"
# Notes :
# - All servers will be imported with group "cloud_crawling" + zone (ex: "europe-west4-a")

import argparse
import os
import socket

# pylint: disable=E0401, R1705
from configparser import ConfigParser
from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver
from cbw_api_toolbox.cbw_api import CBWApi

SSH_KEY_SERVERS = open(os.path.expanduser('id_rsa')).read()
SSH_KEY_SERVERS = open(os.path.expanduser('id_rsa'), encoding="utf-8").read()
SERVER_LOGIN = ""
WINRM_PASSWORD_SERVERS = ""

Expand Down Expand Up @@ -49,7 +52,7 @@ def connect_api():
def get_node():
'''Get list of available nodes and prompt user to choose'''
nodes = API.nodes()
if len(nodes) > 1 :
if len(nodes) > 1:
print("Which Cyberwatch node do you want to use to import?")
for node in nodes:
print("ID: {}, name: {}".format(node.id, node.name))
Expand All @@ -60,7 +63,7 @@ def get_node():
return node_id
else:
raise ValueError("Please provide valid node id")
else:
else:
return nodes[0].id


Expand Down Expand Up @@ -98,15 +101,15 @@ def retrieve_ec2_servers():
return running


def port_checker(ip, port):
def port_checker(ip_address, port):
'''Check if a specific port is open on an ip address'''
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
socket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket1.settimeout(5)
try:
s.connect((ip, int(port)))
s.shutdown(2)
socket1.connect((ip_address, int(port)))
socket1.shutdown(2)
return True
except:
except Exception: # pylint: disable=broad-except
return False


Expand All @@ -131,7 +134,8 @@ def check_add_server(servers, cloud_servers, node_id):
"key": SSH_KEY_SERVERS})
to_add.append(info)
else:
print('The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985) so an agentless connection with Cyberwatch is not possible')
print("""The server ' + cloud_server_ip + ' has no default port exposed (SSH/22 or WINRM/5985)
so an agentless connection with Cyberwatch is not possible""")
return to_add


Expand All @@ -150,8 +154,7 @@ def check_delete_server(cloud_servers):
def display_and_import(to_import_list, apply=False):
'''Display to_import servers then import them'''

print('\n\n================= Total of {} cloud servers to import (apply={}) ================='.format(len(to_import_list),
apply))
print('\n\n===== Total of {} cloud servers to import (apply={}) ====='.format(len(to_import_list), apply))
for to_add_server in to_import_list:
print('{} --- {} --- {}'.format(to_add_server["address"],
to_add_server["server_groups"], to_add_server["type"]))
Expand All @@ -161,8 +164,7 @@ def display_and_import(to_import_list, apply=False):

def display_and_delete(to_delete_list, apply=False):
'''Display to_delete servers then delete them'''
print('\n\n================= Total of {} servers on Cyberwatch to delete (apply={}) ================='.format(len(to_delete_list),
apply))
print('\n\n===== Total of {} servers on Cyberwatch to delete (apply={}) ====='.format(len(to_delete_list), apply))
for server in to_delete_list:
print('{} --- {} --- {}'.format(server.remote_ip, server.hostname, server.id))
if apply is True:
Expand Down Expand Up @@ -202,7 +204,8 @@ def main(args=None):
'''Main function'''

parser = argparse.ArgumentParser(
description='Script using Cyberwatch API to import not monitored cloud servers and delete terminated cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.')
description="""Script using Cyberwatch API to import not monitored cloud servers and delete terminated
cloud servers in Cyberwatch.\nBy default this script is run in read-only mode.""")

parser.add_argument(
'-i',
Expand Down
4 changes: 2 additions & 2 deletions examples/clean_discovered_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def find_discoveries(client):
discoveries_details = client.hosts()
for host in discoveries_details:
if host.discovery.type == "CbwAssets::Discovery::DockerRegistry":
for id in host.server_ids:
ids.append(str(id))
for id_server in host.server_ids:
ids.append(str(id_server))
return ids


Expand Down
2 changes: 1 addition & 1 deletion examples/cleanup_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from configparser import ConfigParser
from cbw_api_toolbox.cbw_api import CBWApi

# pylint: disable=duplicate-code

def connect_api():
'''Connect to the API and test connection'''
Expand Down
1 change: 1 addition & 0 deletions examples/cleanup_initialization_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime
from dateutil.relativedelta import relativedelta # pylint: disable=import-error
from cbw_api_toolbox.cbw_api import CBWApi
# pylint: disable=duplicate-code

def connect_api():
'''Connect ot the API'''
Expand Down
10 changes: 4 additions & 6 deletions examples/cleanup_lost_com_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def find_lost_com_servers(servers):
def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS):
'''Display servers then delete them'''
print('\n\n================ Total of {} {} to delete (delete={}) ================'.format(len(delete_list),
server_type,
delete))
server_type,
delete))
for delete_server in delete_list:
print('{} -- {} -- {} -- {}'.format(delete_server.id, delete_server.hostname,
delete_server.cve_announcements_count, delete_server.created_at))
delete_server.cve_announcements_count, delete_server.created_at))

if delete is True:
client.delete_server(str(delete_server.id))
Expand All @@ -52,9 +52,7 @@ def display_and_delete(delete_list, server_type, client, delete=DELETE_SERVERS):
def launch_script():
'''Launch script'''
client = connect_api()
filters = {
"communication_failed": "true"
}
filters = {"communication_failed": "true"}
servers = client.servers(filters)

lost_com_servers = find_lost_com_servers(servers)
Expand Down
12 changes: 6 additions & 6 deletions examples/cve_published_last_month_export_xlsx.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ def export_xls(cve_list, xls_export):
xls_export.close()

# Defines date to retrieve CVEs published last month
today = datetime.date.today()
firstDayOfLastMonth = (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
firstDayOfCurrentMonth = today.replace(day=1)
TODAY = datetime.date.today()
FIRSTDAYOFLASTMONTH = (TODAY.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
FIRSTDAYOFCURRENTMONTH = TODAY.replace(day=1)

print("Exporting vulnerabilities published between {} and {}.".format(firstDayOfLastMonth, firstDayOfCurrentMonth))
export_xls(get_cyberwatch_cves(firstDayOfLastMonth, firstDayOfCurrentMonth),
instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(firstDayOfLastMonth, firstDayOfCurrentMonth)))
print("Exporting vulnerabilities published between {} and {}.".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH))
export_xls(get_cyberwatch_cves(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH),
instantiate_export("active_CVEs_{}_to_{}_export.xlsx".format(FIRSTDAYOFLASTMONTH, FIRSTDAYOFCURRENTMONTH)))
8 changes: 4 additions & 4 deletions examples/detail_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def to_csv_lines(cve_catalog):

def to_csv(csv_lines, name_csv='just_generated.csv', path=""):
"""Write objects in csv_lines into a csv file"""
with open(os.path.join(path, name_csv), 'w', newline='') as csvfile:
with open(os.path.join(path, name_csv), 'w', newline='', encoding="utf-8") as csvfile:
spamwriter = csv.writer(csvfile, delimiter=' ',
quotechar='|', quoting=csv.QUOTE_MINIMAL)
spamwriter.writerow(['"sep=,"'])
Expand All @@ -52,11 +52,11 @@ def to_csv(csv_lines, name_csv='just_generated.csv', path=""):

# Fetch active CVE if an exploit is available
logging.info('Fetching active CVE')
cve_list = CLIENT.cve_announcements({"exploitable": "true", "active": "true"})
CVE_LIST = CLIENT.cve_announcements({"exploitable": "true", "active": "true"})

# Formating lines for the csv
logging.info('Formating lines for the csv file')
csv_lines_list = to_csv_lines(cve_list)
CSV_LINES_LIST = to_csv_lines(CVE_LIST)

# Exporting csv file
to_csv(csv_lines_list, path="")
to_csv(CSV_LINES_LIST, path="")
Loading