Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/kics driven adapter #174

Merged
merged 18 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ devsecops-engine-tools --platform_devops ["local","azure","github"] --remote_con
<th>Type</th>
</tr>
<tr>
<td rowspan="2">ENGINE_IAC</td>
<td rowspan="3">ENGINE_IAC</td>
<td><a href="https://www.checkov.io/">CHECKOV</a></td>
<td>Free</td>
</tr>
<tr>
<td><a href="https://kubescape.io/">KUBESCAPE</a></td>
<td>Free</td>
</tr>
<tr>
<td><a href="https://www.kics.io/">KICS</a></td>
<td>Free</td>
</tr>
<tr>
<td>ENGINE_DAST</td>
Expand Down
2 changes: 1 addition & 1 deletion example_remote_config_local/engine_core/ConfigTool.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
},
"ENGINE_IAC": {
"ENABLED": "true",
"TOOL": "CHECKOV|KUBESCAPE"
"TOOL": "CHECKOV|KUBESCAPE|KICS"
},
"ENGINE_CONTAINER": {
"ENABLED": "true",
Expand Down
18 changes: 18 additions & 0 deletions example_remote_config_local/engine_sast/engine_iac/ConfigTool.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@
"EXTERNAL_DIR_REPOSITORY": "",
"RULES": {

}
},
"KICS": {
"VERSION": "2.1.0",
"USE_EXTERNAL_CHECKS_GIT": "False",
"EXTERNAL_CHECKS_GIT": "",
"EXTERNAL_GIT_SSH_HOST": "",
"EXTERNAL_GIT_PUBLIC_KEY_FINGERPRINT": "",
"USE_EXTERNAL_CHECKS_DIR": "",
"EXTERNAL_DIR_OWNER": "",
"EXTERNAL_DIR_REPOSITORY": "",
"KICS_LINUX": "https://example.url/kics/bin/linux/kics.zip",
"KICS_WINDOWS": "https://example.url/kics/bin/windows/kics.zip",
"KICS_MAC": "https://example.url/kics/bin/mac/kics.zip",
"PATH_KICS": "./kics/kics",
"DOWNLOAD_KICS_ASSETS": false,
"RULES": {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def send_vulnerability_management(
"XRAY": "JFrog Xray On Demand Binary Scan",
"TRUFFLEHOG": "Trufflehog Scan",
"TRIVY": "Trivy Scan",
"KUBESCAPE": "Kubescape Scanner",
"KICS": "KICS Scanner"
}

if any(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from devsecops_engine_tools.engine_sast.engine_iac.src.infrastructure.driven_adapters.kubescape.kubescape_tool import (
KubescapeTool
)
from devsecops_engine_tools.engine_sast.engine_iac.src.infrastructure.driven_adapters.kics.kics_tool import (
KicsTool
)


def runner_engine_iac(dict_args, tool, secret_tool, devops_platform_gateway, env):
Expand All @@ -17,6 +20,7 @@ def runner_engine_iac(dict_args, tool, secret_tool, devops_platform_gateway, env
tools = {
"CHECKOV": CheckovTool(),
"KUBESCAPE": KubescapeTool(),
"KICS": KicsTool()
}

if tool in tools:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ def __init__(self, json_data, tool):
self.exclusions_all = None
self.exclusions_scope = None
self.rules_all = {}
if "KICS_LINUX" in json_data[tool]:
self.kics_linux = json_data[tool]["KICS_LINUX"]
if "KICS_WINDOWS" in json_data[tool]:
self.kics_windows = json_data[tool]["KICS_WINDOWS"]
if "KICS_MAC" in json_data[tool]:
self.kics_mac = json_data[tool]["KICS_MAC"]
if "PATH_KICS" in json_data[tool]:
self.path_kics = json_data[tool]["PATH_KICS"]
if "DOWNLOAD_KICS_ASSETS" in json_data[tool]:
self.download_kics_assets = json_data[tool]["DOWNLOAD_KICS_ASSETS"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from devsecops_engine_tools.engine_core.src.domain.model.finding import (
Category,
Finding,
)
from datetime import datetime
from dataclasses import dataclass


@dataclass
class KicsDeserealizator:
def get_list_finding(self, results_scan_list: list) -> "list[Finding]":
list_open_findings = []

for result in results_scan_list:
finding_open = Finding(
id=result.get("id"),
cvss=None,
where=result.get("file_name"),
description=result.get("description"),
severity=result.get("severity").lower(),
identification_date=datetime.now().strftime("%d%m%Y"),
published_date_cve=None,
module="engine_iac",
category=Category.VULNERABILITY,
requirements=None,
tool="Kics"
)
list_open_findings.append(finding_open)

return list_open_findings

def get_findings(self, data):
filtered_results = []
for query in data.get("queries", []):
severity = query.get("severity", "").upper()
if severity in {"LOW", "MEDIUM", "HIGH", "CRITICAL"}:
description = query.get("query_name", "")
query_id = query.get("query_id", "")
for file in query.get("files", []):
file_name = file.get("file_name", "")
filtered_results.append({
"severity": severity,
"description": description,
"file_name": file_name,
"id": query_id
})
return filtered_results

def calculate_total_vulnerabilities(self, data):
severity_counters = data.get("severity_counters", {})

critical = severity_counters.get("CRITICAL", 0)
high = severity_counters.get("HIGH", 0)
medium = severity_counters.get("MEDIUM", 0)
low = severity_counters.get("LOW", 0)

return critical + high + medium + low
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import subprocess
import json
import platform
import requests
import os
from devsecops_engine_tools.engine_sast.engine_iac.src.domain.model.gateways.tool_gateway import (
ToolGateway,
)
from devsecops_engine_tools.engine_sast.engine_iac.src.domain.model.config_tool import (
ConfigTool,
)
from devsecops_engine_tools.engine_sast.engine_iac.src.infrastructure.driven_adapters.kics.kics_deserealizator import (
KicsDeserealizator
)
from devsecops_engine_tools.engine_utilities.utils.logger_info import MyLogger
from devsecops_engine_tools.engine_utilities import settings
from devsecops_engine_tools.engine_utilities.github.infrastructure.github_api import GithubApi

logger = MyLogger.__call__(**settings.SETTING_LOGGER).get_logger()


class KicsTool(ToolGateway):

def download(self, file, url):
try:
response = requests.get(url)
with open(file, "wb") as f:
f.write(response.content)
except Exception as ex:
logger.error(f"An error ocurred downloading {file} {ex}")

def install_tool(self, file, url, command_prefix):
github_api = GithubApi()
kics = f"./{command_prefix}/kics"
installed = subprocess.run(
["which", command_prefix],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if installed.returncode == 1:
try:
self.download(file, url)
github_api.unzip_file(file, command_prefix)
subprocess.run(["chmod", "+x", kics])
return kics
except Exception as e:
logger.error(f"Error installing KICS: {e}")
else:
return command_prefix

def install_tool_windows(self, file, url, command_prefix):
try:
subprocess.run(
[command_prefix, "version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return command_prefix
except:
try:
github_api = GithubApi()
self.download(file, url)
github_api.unzip_file(file, command_prefix)
return f"./{command_prefix}/kics"

except Exception as e:
logger.error(f"Error installing KICS: {e}")

def execute_kics(self, folders_to_scan, prefix):
folders = ','.join(folders_to_scan)
command = [prefix, "scan", "-p", folders, "-q", "./kics_assets/assets", "--report-formats", "json", "-o", "./"]
try:
subprocess.run(command, capture_output=True)
except subprocess.CalledProcessError as e:
logger.error(f"Error during KICS execution: {e}")

def load_results(self):
try:
with open('results.json') as f:
data = json.load(f)
return data
except Exception as ex:
logger.error(f"An error ocurred loading KICS results {ex}")
return None

def select_operative_system(self, os_platform, folders_to_scan, config_tool: ConfigTool, path_kics):
command_prefix = path_kics
if os_platform == "Linux":
MikeHV14 marked this conversation as resolved.
Show resolved Hide resolved
kics_zip = "kics_linux.zip"
url_kics = config_tool.kics_linux
command_prefix = self.install_tool(kics_zip, url_kics, command_prefix)
elif os_platform == "Windows":
kics_zip = "kics_windows.zip"
url_kics = config_tool.kics_windows
command_prefix = self.install_tool_windows(kics_zip, url_kics, command_prefix)
elif os_platform == "Darwin":
kics_zip = "kics_macos.zip"
url_kics = config_tool.kics_mac
command_prefix = self.install_tool(kics_zip, url_kics, command_prefix)
else:
logger.warning(f"{os_platform} is not supported.")
return [], None

self.execute_kics(folders_to_scan, command_prefix)

def get_assets(self, kics_version):
name_zip = "assets_compressed.zip"
assets_url = f"https://github.com/Checkmarx/kics/releases/download/v{kics_version}/extracted-info.zip"
self.download(name_zip, assets_url)

directory_assets = "kics_assets"
github_api = GithubApi()
github_api.unzip_file(name_zip, directory_assets)

def run_tool(
self, config_tool: ConfigTool, folders_to_scan, environment, platform_to_scan, secret_tool
):
kics_version = config_tool.version
path_kics = config_tool.path_kics
download_kics_assets = config_tool.download_kics_assets
if download_kics_assets:
self.get_assets(kics_version)

os_platform = platform.system()
self.select_operative_system(os_platform, folders_to_scan, config_tool, path_kics)

data = self.load_results()
if data:
kics_deserealizator = KicsDeserealizator()
MikeHV14 marked this conversation as resolved.
Show resolved Hide resolved
total_vulnerabilities = kics_deserealizator.calculate_total_vulnerabilities(data)
path_file = os.path.abspath("results.json")

if total_vulnerabilities == 0:
return [], path_file

filtered_results = kics_deserealizator.get_findings(data)
finding_list = kics_deserealizator.get_list_finding(filtered_results)

return finding_list, path_file
return [], None
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,32 @@ def test_runner_engine_iac_kubescape(mock_entry_point_tool):

# Assert the expected behavior
assert input_output == input_core

@mock.patch(
"devsecops_engine_tools.engine_sast.engine_iac.src.applications.runner_iac_scan.init_engine_sast_rm"
)
def test_runner_engine_iac_kics(mock_entry_point_tool):
# Mock the output

input_core = InputCore(
totalized_exclusions=[],
threshold_defined=Threshold,
path_file_results="test/file",
custom_message_break_build="message",
scope_pipeline="pipeline",
stage_pipeline="Release",
)

mock_entry_point_tool.return_value = [] , input_core

# Define the input arguments
dict_args = {}
tool = "KICS"
secret_tool = "secret"
devops_platform_gateway = None

# Call the function
[] , input_output = runner_engine_iac(dict_args, tool, secret_tool, devops_platform_gateway, "qa")

# Assert the expected behavior
assert input_output == input_core
Loading