/
docker.py
166 lines (136 loc) · 5.16 KB
/
docker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set
import click
from pygitguardian.client import GGClient
from yaspin import yaspin
from ggshield.config import Cache
from ggshield.output import OutputHandler
from ggshield.scan import ScanCollection, get_files_from_docker_archive
from ggshield.utils import SupportedScanMode, handle_exception
# bailout if docker command takes longer than 6 minutes
DOCKER_COMMAND_TIMEOUT = 360
class DockerArchiveCreationError(Exception):
pass
def docker_pull_image(image_name: str) -> None:
"""
Pull docker image and raise exception on timeout or failed to find image
"""
command = ["docker", "pull", image_name]
try:
subprocess.run(
command,
check=True,
timeout=DOCKER_COMMAND_TIMEOUT,
)
except subprocess.CalledProcessError:
raise click.ClickException(f'Image "{image_name}" not found')
except subprocess.TimeoutExpired:
raise click.ClickException('Command "{}" timed out'.format(" ".join(command)))
def docker_save_to_tmp(image_name: str, temporary_path: str) -> Path:
"""
Do a `docker save <image_name> -o <temporary_path>` and return the
`temporary_path`.
"""
temp_archive_filename = Path(temporary_path) / (
image_name.replace("/", "--") + ".tar"
)
command = ["docker", "save", image_name, "-o", str(temp_archive_filename)]
try:
with yaspin(text="Saving docker image"):
subprocess.run(
command,
check=True,
stderr=subprocess.PIPE,
timeout=DOCKER_COMMAND_TIMEOUT,
)
except subprocess.CalledProcessError as exc:
err_string = str(exc.stderr)
if "No such image" in err_string or "reference does not exist" in err_string:
docker_pull_image(image_name)
return docker_save_to_tmp(image_name, temporary_path)
raise click.ClickException("Unable to save docker archive")
except subprocess.TimeoutExpired:
raise click.ClickException('Command "{}" timed out'.format(" ".join(command)))
return temp_archive_filename
def docker_scan_archive(
archive: str,
client: GGClient,
cache: Cache,
verbose: bool,
matches_ignore: Iterable[str],
all_policies: bool,
scan_id: str,
banlisted_detectors: Optional[Set[str]] = None,
) -> ScanCollection:
files = get_files_from_docker_archive(archive)
with click.progressbar(length=len(files.files), label="Scanning") as progressbar:
def update_progress(chunk: List[Dict[str, Any]]) -> None:
progressbar.update(len(chunk))
results = files.scan(
client=client,
cache=cache,
matches_ignore=matches_ignore,
all_policies=all_policies,
verbose=verbose,
mode_header=SupportedScanMode.DOCKER.value,
on_file_chunk_scanned=update_progress,
banlisted_detectors=banlisted_detectors,
)
return ScanCollection(id=scan_id, type="scan_docker_archive", results=results)
@click.command()
@click.argument("name", nargs=1, type=click.STRING, required=True)
@click.pass_context
def docker_name_cmd(ctx: click.Context, name: str) -> int:
"""
scan a docker image <NAME>.
ggshield will try to pull the image if it's not available locally.
"""
with tempfile.TemporaryDirectory(suffix="ggshield") as temporary_dir:
config = ctx.obj["config"]
output_handler: OutputHandler = ctx.obj["output_handler"]
try:
archive = str(docker_save_to_tmp(name, temporary_dir))
scan = docker_scan_archive(
archive=archive,
client=ctx.obj["client"],
cache=ctx.obj["cache"],
verbose=config.verbose,
matches_ignore=config.matches_ignore,
all_policies=config.all_policies,
scan_id=name,
banlisted_detectors=config.banlisted_detectors,
)
return output_handler.process_scan(scan)
except Exception as error:
return handle_exception(error, config.verbose)
@click.command(hidden=True)
@click.argument(
"archive", nargs=1, type=click.Path(exists=True, resolve_path=True), required=True
)
@click.pass_context
def docker_archive_cmd(
ctx: click.Context,
archive: str,
) -> int: # pragma: no cover
"""
scan a docker archive <ARCHIVE> without attempting to save or pull the image.
Hidden command `ggshield scan docker-archive`
"""
config = ctx.obj["config"]
output_handler: OutputHandler = ctx.obj["output_handler"]
try:
scan = docker_scan_archive(
archive=archive,
client=ctx.obj["client"],
cache=ctx.obj["cache"],
verbose=config.verbose,
matches_ignore=config.matches_ignore,
all_policies=config.all_policies,
scan_id=archive,
banlisted_detectors=config.banlisted_detectors,
)
return output_handler.process_scan(scan)
except Exception as error:
return handle_exception(error, config.verbose)