Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gitlab): GitlabCI ImageReferencer #3544

Merged
merged 17 commits into from
Sep 22, 2022
2 changes: 1 addition & 1 deletion checkov/cloudformation/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def get_graph_checks_report(self, root_folder: str, runner_filter: RunnerFilter)
return report

def extract_images(
self, graph_connector: DiGraph | None = None, resources: list[dict[str, Any]] | None = None
self, graph_connector: DiGraph | None = None, definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = None
) -> list[Image]:
if not graph_connector:
# should not happen
Expand Down
10 changes: 7 additions & 3 deletions checkov/common/images/image_referencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ class ImageReferencerMixin:

def check_container_image_references(
self,
graph_connector: DiGraph,
graph_connector: DiGraph | None,
root_path: str | Path | None,
runner_filter: RunnerFilter,
definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = None
) -> Report | None:
"""Tries to find image references in graph based IaC templates"""
from checkov.common.bridgecrew.platform_integration import bc_integration
Expand All @@ -128,7 +129,7 @@ def check_container_image_references(
if not should_run_scan(runner_filter.checks):
return None

images = self.extract_images(graph_connector=graph_connector)
images = self.extract_images(graph_connector=graph_connector, definitions=definitions)
if not images:
return None

Expand Down Expand Up @@ -208,6 +209,9 @@ def add_image_records(
runner = sca_image_runner()

image_id = ImageReferencer.inspect(image.name)
if not image_id:
return None

scan_result = runner.scan(image_id, dockerfile_path, runner_filter)
if scan_result is None:
return None
Expand Down Expand Up @@ -283,7 +287,7 @@ def add_vulnerability_records(

@abstractmethod
def extract_images(
self, graph_connector: DiGraph | None = None, resources: list[dict[str, Any]] | None = None
self, graph_connector: DiGraph | None = None, definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = None
) -> list[Image]:
"""Tries to find image references in the graph or supported resource"""

Expand Down
4 changes: 2 additions & 2 deletions checkov/common/runners/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def strtobool(val: str) -> int:

class BaseRunner(ABC, Generic[_GraphManager]):
check_type = ""
definitions = None
definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = {}
Eliran-Turgeman marked this conversation as resolved.
Show resolved Hide resolved
context: dict[str, dict[str, Any]] | None = None
breadcrumbs = None
external_registries: list[BaseRegistry] | None = None
Expand Down Expand Up @@ -87,7 +87,7 @@ def should_scan_file(self, filename: str) -> bool:

def set_external_data(
self,
definitions: dict[str, dict[str, Any]] | None,
definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None,
context: dict[str, dict[str, Any]] | None,
breadcrumbs: dict[str, dict[str, Any]] | None,
**kwargs: Any,
Expand Down
17 changes: 8 additions & 9 deletions checkov/common/runners/object_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ class GhaMetadata(TypedDict):
class Runner(BaseRunner[None]): # if a graph is added, Any needs to replaced
def __init__(self) -> None:
super().__init__()
self.definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] = {}
self.map_file_path_to_gha_metadata_dict: dict[str, GhaMetadata] = {}

def _load_files(
self,
files_to_load: list[str],
definitions: dict[str, dict[str, Any] | list[dict[str, Any]]],
definitions_raw: dict[str, list[tuple[int, str]]],
filename_fn: Callable[[str], str] | None = None,
) -> None:
files_to_load = [filename_fn(file) if filename_fn else file for file in files_to_load]
results = parallel_runner.run_function(lambda f: (f, self._parse_file(f)), files_to_load)
for file, result in results:
if result:
(definitions[file], definitions_raw[file]) = result
(self.definitions[file], definitions_raw[file]) = result
definition = result[0]
if self.check_type == CheckType.GITHUB_ACTIONS and isinstance(definition, dict):
workflow_name = definition.get('name', '')
Expand Down Expand Up @@ -73,7 +73,6 @@ def run(

registry = self.import_registry()

definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] = {}
definitions_raw: dict[str, list[tuple[int, str]]] = {}

report = Report(self.check_type)
Expand All @@ -90,20 +89,20 @@ def run(
registry.load_external_checks(directory)

if files:
self._load_files(files, definitions, definitions_raw)
self._load_files(files, definitions_raw)

if root_folder:
for root, d_names, f_names in os.walk(root_folder):
filter_ignored_paths(root, d_names, runner_filter.excluded_paths, self.included_paths())
filter_ignored_paths(root, f_names, runner_filter.excluded_paths, self.included_paths())
files_to_load = [os.path.join(root, f_name) for f_name in f_names]
self._load_files(files_to_load=files_to_load, definitions=definitions, definitions_raw=definitions_raw)
self._load_files(files_to_load=files_to_load, definitions_raw=definitions_raw)

self.pbar.initiate(len(definitions))
for file_path in definitions.keys():
self.pbar.initiate(len(self.definitions))
for file_path in self.definitions.keys():
self.pbar.set_additional_data({'Current File Scanned': os.path.relpath(file_path, root_folder)})
skipped_checks = collect_suppressions_for_context(definitions_raw[file_path])
results = registry.scan(file_path, definitions[file_path], skipped_checks, runner_filter) # type:ignore[arg-type] # this is overridden in the subclass
results = registry.scan(file_path, self.definitions[file_path], skipped_checks, runner_filter) # type:ignore[arg-type] # this is overridden in the subclass
for key, result in results.items():
result_config = result["results_configuration"]
start = 0
Expand All @@ -127,7 +126,7 @@ def run(
code_block=definitions_raw[file_path][start - 1:end + 1],
file_path=f"/{os.path.relpath(file_path, root_folder)}",
file_line_range=[start, end + 1],
resource=self.get_resource(file_path, key, check.supported_entities, definitions[file_path]), # type:ignore[arg-type] # key is str not BaseCheck
resource=self.get_resource(file_path, key, check.supported_entities, self.definitions[file_path]), # type:ignore[arg-type] # key is str not BaseCheck
evaluations=None,
check_class=check.__class__.__module__,
file_abs_path=os.path.abspath(file_path),
Expand Down
Empty file.
57 changes: 57 additions & 0 deletions checkov/gitlab_ci/image_referencer/base_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

from typing import Any

from checkov.common.images.image_referencer import Image


class BaseGitlabCiProvider:
    """Collects container image references from a parsed GitLab CI workflow.

    The workflow is expected to be a mapping of top-level keys (jobs, ``default``, ...)
    to their parsed content. Every mapping value is searched for the configured
    ``supported_keys``, whose value may be

    * a plain string holding the image name,
    * a mapping with a ``name`` entry, or
    * a list mixing both of the above.
    """

    __slots__ = ("supported_keys", "workflow_config", "file_path")

    def __init__(self, supported_keys: tuple[str, str], workflow_config: dict[str, Any], file_path: str):
        """Store the keys to look for, the parsed workflow and the path it was loaded from."""
        self.supported_keys = supported_keys
        self.workflow_config = workflow_config
        self.file_path = file_path

    @staticmethod
    def _get_start_end_lines(entity: dict[str, Any]) -> tuple[int, int]:
        """Return the ``__startline__``/``__endline__`` markers of ``entity``, defaulting to 0 each."""
        return entity.get('__startline__', 0), entity.get('__endline__', 0)

    def extract_images_from_workflow(self) -> list[Image]:
        """Return one ``Image`` per image reference found in the workflow.

        Entries without a usable name (a mapping lacking ``name``, an empty string or
        an unsupported value type) are skipped instead of raising. A workflow which is
        not a mapping yields an empty list.
        """
        images: list[Image] = []
        if not isinstance(self.workflow_config, dict):
            # definitions may also be list shaped; there are no named jobs to inspect then
            return images

        for job_object in self.workflow_config.values():
            if not isinstance(job_object, dict):
                continue

            # Plain string entries carry no line markers of their own, so they reuse the
            # most recently seen range: the job's range at first, afterwards the range of
            # the last mapping entry handled within this job.
            start_line, end_line = self._get_start_end_lines(job_object)
            for key, value in job_object.items():
                if key not in self.supported_keys:
                    continue

                # treat a single entry like a one element list to share the handling below
                entries = value if isinstance(value, list) else [value]
                for entry in entries:
                    if isinstance(entry, dict):
                        start_line, end_line = self._get_start_end_lines(entry)
                        # tolerate a mapping without 'name' instead of failing the whole scan
                        image_name = entry.get('name')
                    elif isinstance(entry, str):
                        image_name = entry
                    else:
                        continue

                    if image_name:
                        images.append(
                            Image(
                                file_path=self.file_path,
                                name=image_name,
                                start_line=start_line,
                                end_line=end_line,
                            )
                        )
        return images
23 changes: 23 additions & 0 deletions checkov/gitlab_ci/image_referencer/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

from typing import Any

from checkov.common.images.image_referencer import Image
from checkov.gitlab_ci.image_referencer.base_provider import BaseGitlabCiProvider


class GitlabCiImageReferencerManager:
    """Entry point for extracting image references from a single GitLab CI workflow.

    Holds the keys to search for, the parsed workflow and its file path, and hands
    the actual extraction over to ``BaseGitlabCiProvider``.
    """

    __slots__ = ("supported_keys", "workflow_config", "file_path")

    def __init__(self, supported_keys: tuple[str, str], workflow_config: dict[str, Any], file_path: str):
        """Keep the given arguments unchanged for the later extraction."""
        self.file_path = file_path
        self.workflow_config = workflow_config
        self.supported_keys = supported_keys

    def extract_images_from_workflow(self) -> list[Image]:
        """Return the images found by the base provider for the stored workflow."""
        provider = BaseGitlabCiProvider(
            supported_keys=self.supported_keys,
            workflow_config=self.workflow_config,
            file_path=self.file_path,
        )
        return provider.extract_images_from_workflow()
123 changes: 53 additions & 70 deletions checkov/gitlab_ci/runner.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

from checkov.common.images.image_referencer import ImageReferencer, Image
from checkov.common.output.report import Report

from checkov.runner_filter import RunnerFilter

from checkov.common.images.image_referencer import Image, ImageReferencerMixin
from checkov.common.bridgecrew.check_type import CheckType
from checkov.gitlab_ci.checks.registry import registry
from checkov.gitlab_ci.image_referencer.manager import GitlabCiImageReferencerManager
from checkov.yaml_doc.runner import Runner as YamlRunner

if TYPE_CHECKING:
from checkov.common.checks.base_check_registry import BaseCheckRegistry
from collections.abc import Iterable
from networkx import DiGraph


class Runner(YamlRunner, ImageReferencer):
class Runner(ImageReferencerMixin, YamlRunner):
check_type = CheckType.GITLAB_CI # noqa: CCE003 # a static attribute

def __init__(self) -> None:
super().__init__()
self.file_path = ''
Eliran-Turgeman marked this conversation as resolved.
Show resolved Hide resolved

def require_external_checks(self) -> bool:
return False
Expand All @@ -27,9 +35,12 @@ def import_registry(self) -> BaseCheckRegistry:
def _parse_file(
self, f: str, file_content: str | None = None
) -> tuple[dict[str, Any] | list[dict[str, Any]], list[tuple[int, str]]] | None:
"""Parse ``f`` through the parent runner, but only when it is a GitLab CI workflow file.

Returns the parent's parse result for workflow files and ``None`` for every other file.
"""
# Remember the path handed to the latest call on the instance.
# NOTE(review): if files are parsed concurrently this only reflects whichever call ran last - confirm nothing relies on it
self.file_path = f
if self.is_workflow_file(f):
return super()._parse_file(f=f, file_content=file_content)

# not a workflow file, nothing to parse
return None

def is_workflow_file(self, file_path: str) -> bool:
"""
:return: True if the file mentioned is in the gitlab workflow name .gitlab-ci.yml. Otherwise: False
Expand All @@ -39,73 +50,45 @@ def is_workflow_file(self, file_path: str) -> bool:
def included_paths(self) -> Iterable[str]:
return (".gitlab-ci.yml", ".gitlab-ci.yaml")

def get_images(self, file_path: str) -> set[Image]:
"""
Get container images mentioned in a file
:param file_path: File to be inspected
GitLab a workflow file can have a job and services run within a container.

in the following sample file we can see a node:14.16 image:

default:
image:
name: ruby:2.6
entrypoint: ["/bin/bash"]

image: nginx:1.18

services:
- name: privateregistry/stuff/my-postgres:11.7
alias: db-postgres
- name: redis:latest
- nginx:1.17
Source: https://docs.gitlab.com/ee/ci/docker/using_docker_images.html

:return: List of container image short ids mentioned in the file.
Example return value for a file with node:14.16 image: ['sha256:6a353e22ce']
"""
def run(
self,
root_folder: str | None = None,
external_checks_dir: list[str] | None = None,
files: list[str] | None = None,
runner_filter: RunnerFilter | None = None,
collect_skip_comments: bool = True,
) -> Report | list[Report]:
"""Run the parent scan and, when enabled, an additional container image scan.

Returns the parent runner's report on its own, or a two element list
``[report, image_report]`` when ``runner_filter.run_image_referencer`` is set and
``check_container_image_references`` produced a report.
"""
# fall back to a default filter, so the attribute access below always works
runner_filter = runner_filter or RunnerFilter()
report = super().run(root_folder=root_folder, external_checks_dir=external_checks_dir,
files=files, runner_filter=runner_filter, collect_skip_comments=collect_skip_comments)
if runner_filter.run_image_referencer:
if files:
# 'root_folder' shouldn't be empty to remove the whole path later and only leave the shortened form
# (this replaces any 'root_folder' passed by the caller with the directory part of the files' common prefix)
root_folder = os.path.split(os.path.commonprefix(files))[0]

# no graph is passed here; the images are looked up in the parsed definitions instead
image_report = self.check_container_image_references(
graph_connector=None,
root_path=root_folder,
runner_filter=runner_filter,
definitions=self.definitions
)

if image_report:
# NOTE(review): if the parent 'run' ever returns a list, it ends up nested here - confirm it returns a single Report
return [report, image_report]

return report

def extract_images(
self, graph_connector: DiGraph | None = None, definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = None
) -> list[Image]:
images: list[Image] = []
if not definitions:
return images

for file, config in definitions.items():
manager = GitlabCiImageReferencerManager(supported_keys=("image", "services"),
Eliran-Turgeman marked this conversation as resolved.
Show resolved Hide resolved
workflow_config=config,
file_path=file)
images.extend(manager.extract_images_from_workflow())

images = set()
imagesKeys = ("image", "services")
workflow, workflow_line_numbers = self._parse_file(file_path)

for job_object in workflow.values():
if isinstance(job_object, dict):
start_line = job_object.get('__startline__', 0)
end_line = job_object.get('__endline__', 0)
for key, subjob in job_object.items():
if key in imagesKeys:
imagename = ""
if isinstance(subjob, dict):
start_line = subjob.get('__startline__', 0)
end_line = subjob.get('__endline__', 0)
imagename = subjob['name']
elif isinstance(subjob, str):
imagename = subjob
elif isinstance(subjob, list):
for service in subjob:
if isinstance(service, dict):
start_line = service.get('__startline__', 0)
end_line = service.get('__endline__', 0)
imagename = service['name']
elif isinstance(service, str):
imagename = service
if imagename:
image_obj = Image(
file_path=file_path,
name=imagename,
start_line=start_line,
end_line=end_line,
)
images.add(image_obj)
imagename = ""
if imagename:
image_obj = Image(
file_path=file_path,
name=imagename,
start_line=start_line,
end_line=end_line,
)
images.add(image_obj)
imagename = ""
return images
3 changes: 3 additions & 0 deletions checkov/sca_image/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ def get_image_report(self, dockerfile_path: str, image: Image, runner_filter: Ru
elif strtobool(os.getenv("CHECKOV_EXPERIMENTAL_IMAGE_REFERENCING", "False")):
# experimental flag on running image referencers via local twistcli
image_id = ImageReferencer.inspect(image.name)
if not image_id:
logging.info(f"Unable to extract image id from {image.name}")
return Report(self.check_type)
scan_result = self.scan(image_id, dockerfile_path, runner_filter)
if scan_result is None:
return Report(self.check_type)
Expand Down
2 changes: 1 addition & 1 deletion checkov/terraform/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def __cache_file_content(file_name: str, file_modules: list[dict[str, Any]]) ->
__cache_file_content(file_name=file, file_modules=file_content["module"])

def extract_images(
self, graph_connector: DiGraph | None = None, resources: list[dict[str, Any]] | None = None
self, graph_connector: DiGraph | None = None, definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = None
) -> list[Image]:
if not graph_connector:
# should not happen
Expand Down
Empty file.
Loading