Skip to content

Commit

Permalink
Backport #54504 to 23.8: S3 artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
robot-clickhouse committed Sep 16, 2023
1 parent 2872992 commit 701fa79
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 39 deletions.
192 changes: 192 additions & 0 deletions tests/ci/artifacts_helper.py
@@ -0,0 +1,192 @@
#!/usr/bin/env python

"""Manages artifacts similar to GH actions, but in S3"""

from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
from os import path as op
from pathlib import Path
from shutil import copy2
from typing import List, Optional, Union

from github.Commit import Commit

from build_download_helper import download_build_with_progress
from commit_status_helper import post_commit_status
from compress_files import SUFFIX, compress_fast, decompress_fast
from env_helper import CI, RUNNER_TEMP, S3_BUILDS_BUCKET
from git_helper import SHA_REGEXP
from report import HEAD_HTML_TEMPLATE, FOOTER_HTML_TEMPLATE
from s3_helper import S3Helper

# Local working directory under RUNNER_TEMP: ArtifactsHelper builds and downloads
# archives here, and it is the default extraction target of
# ArtifactsHelper.download
ARTIFACTS_PATH = Path(RUNNER_TEMP) / "artifacts"


@dataclass
class S3Object:
    """A single row of the artifacts index: one stored file and its metadata"""

    # Link target used in the index (a key relative to the prefix, or a file URI)
    key: str
    # Modification time as an ISO 8601 formatted string
    last_modified: str
    # File size in bytes
    size: int


class ArtifactsHelper:
    """Manages per-commit artifacts in S3, similar to GH actions artifacts

    Every artifact is a directory packed to `<name>.tar{SUFFIX}` and stored
    under the key `<s3_prefix>/<commit sha>/`. An `index.html` listing all the
    stored files is regenerated next to them after each upload."""

    INDEX = "index.html"
    # Characters that are not allowed in an artifact name
    RESTRICTED_SYMBOLS = r"\/':<>|*?\""

    def __init__(
        self,
        s3_helper: S3Helper,
        commit: Union[str, Commit],
        s3_prefix: str = "artifacts",
    ):
        """The helper to compress+upload and download+decompress artifacts

        If `commit` is a github.Commit.Commit instance, the status "Artifacts"
        for the given commit will be updated on every upload. If it is a
        string, it must be a full 40 hex characters SHA, and no status is
        posted."""
        self._commit = commit
        assert SHA_REGEXP.match(self.commit)
        self.temp_path = ARTIFACTS_PATH
        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.s3_helper = s3_helper
        # The s3 prefix is done with trailing slash!
        self._s3_prefix = op.join(s3_prefix, self.commit, "")
        self._s3_index_key = f"{self.s3_prefix}{self.INDEX}"
        self._s3_index_url = None  # type: Optional[str]

    @property
    def commit(self) -> str:
        """string of the commit SHA"""
        if isinstance(self._commit, str):
            return self._commit
        return self._commit.sha

    @property
    def s3_prefix(self) -> str:
        """Prefix with the trailing slash"""
        return self._s3_prefix

    @property
    def s3_index_key(self) -> str:
        """S3 key of the index file, i.e. the prefix followed by INDEX"""
        return self._s3_index_key

    @property
    def s3_index_url(self) -> str:
        """URL of the index file, requested from the S3 helper on first access"""
        if self._s3_index_url is None:
            self._s3_index_url = self.s3_helper.get_url(
                S3_BUILDS_BUCKET, self.s3_index_key
            )
        return self._s3_index_url

    def upload(self, artifact_name: str, artifact_path: Path) -> None:
        """Creates archive 'artifact_name.tar{compress_files.SUFFIX}' with the
        content of `artifact_path`, uploads it to S3 under the commit prefix and
        regenerates the index

        `artifact_name` must not contain any of RESTRICTED_SYMBOLS"""
        assert not any(s in artifact_name for s in self.RESTRICTED_SYMBOLS)
        archive_path = self.temp_path / f"{artifact_name}.tar{SUFFIX}"
        s3_artifact_key = f"{self.s3_prefix}{archive_path.name}"
        compress_fast(artifact_path, archive_path)
        self.s3_helper.upload_build_file_to_s3(archive_path, s3_artifact_key)
        self._regenerate_index()

    def download(
        self,
        artifact_name: str,
        extract_directory: Path = ARTIFACTS_PATH,
        keep_archive: bool = False,
    ) -> Path:
        """Downloads artifact, if exists, and extracts it into
        `extract_directory / artifact_name`

        The path `extract_directory / artifact_name` is returned in any case.
        When the artifact does not exist in S3, nothing is downloaded and the
        directory is not created by this method, so the caller should check
        the returned path before using it.

        The downloaded archive is deleted after extraction unless
        `keep_archive` is True."""
        assert not any(s in artifact_name for s in self.RESTRICTED_SYMBOLS)
        assert extract_directory.is_dir()
        archive_path = self.temp_path / f"{artifact_name}.tar{SUFFIX}"
        artifact_path = extract_directory / artifact_name
        s3_artifact_key = f"{self.s3_prefix}{archive_path.name}"
        url = self.s3_helper.url_if_exists(s3_artifact_key, S3_BUILDS_BUCKET)
        if not url:
            return artifact_path

        local_scheme = "file://"
        if url.startswith(local_scheme):
            # The "bucket" is a local directory, a plain copy is enough
            copy2(Path(url[len(local_scheme) :]), archive_path)
        else:
            download_build_with_progress(url, archive_path)
        artifact_path.mkdir(parents=True, exist_ok=True)
        decompress_fast(archive_path, artifact_path)
        if not keep_archive:
            archive_path.unlink()
        return artifact_path

    def list_artifacts(self, glob: str = "") -> List[str]:
        """return the list of artifacts existing for a commit

        The index file is never included. If `glob` is not empty, only the keys
        matching it (fnmatch rules, applied to the key as returned by
        `S3Helper.list_prefix`) are returned."""

        def keep(key: str) -> bool:
            if key == self.s3_index_key:
                return False
            if glob:
                return fnmatch(key, glob)
            return True

        keys = self.s3_helper.list_prefix(self.s3_prefix, S3_BUILDS_BUCKET)
        return [key for key in keys if keep(key)]

    @staticmethod
    def post_commit_status(commit: Commit, url: str) -> None:
        """Sets the successful "Artifacts" status with a link to `url`"""
        # The name resolves to the module-level function imported from
        # commit_status_helper, not to this method
        post_commit_status(
            commit, "success", url, "Artifacts for workflow", "Artifacts"
        )

    def _regenerate_index(self) -> None:
        """Builds index.html with the table of stored files and uploads it

        When the helper was created with a Commit instance, the commit status
        is updated with the URL of the uploaded index."""
        if CI:
            files = self._get_s3_objects()
        else:
            files = self._get_local_s3_objects()

        links = [
            f'<tr><td><a href="{f.key}">{Path(f.key).name}</a></td><td>{f.size}</td>'
            f"<td>{f.last_modified}</td></tr>"
            for f in files
        ]
        index_path = self.temp_path / self.INDEX
        title = f"Artifacts for workflow commit {self.commit}"
        index_content = (
            HEAD_HTML_TEMPLATE.format(title=title, header=title)
            + "<table><tr><th>Artifact</th><th>Size</th><th>Modified</th></tr>"
            + "\n".join(links)
            + "</table>"
            + FOOTER_HTML_TEMPLATE
        )
        index_path.write_text(index_content, encoding="utf-8")
        url = self.s3_helper.upload_build_file_to_s3(index_path, self.s3_index_key)
        if isinstance(self._commit, Commit):
            self.post_commit_status(self._commit, url)

    def _get_s3_objects(self) -> List[S3Object]:
        """Lists every object under the commit prefix, keys are relative to it"""
        files = []  # type: List[S3Object]
        prefix_length = len(self.s3_prefix)
        request = {"Bucket": S3_BUILDS_BUCKET, "Prefix": self.s3_prefix}
        # A single list_objects_v2 response is limited to 1000 keys, so follow
        # the continuation tokens to not lose files in the index
        while True:
            response = self.s3_helper.client.list_objects_v2(**request)
            files.extend(
                S3Object(
                    obj["Key"][prefix_length:],
                    obj["LastModified"].isoformat(),
                    obj["Size"],
                )
                for obj in response.get("Contents", [])
            )
            if not response.get("IsTruncated"):
                break
            request["ContinuationToken"] = response["NextContinuationToken"]
        return files

    def _get_local_s3_objects(self) -> List[S3Object]:
        """Lists the files of the local directory that replaces S3 outside of CI,
        keys are file URIs"""
        local_dir = self.s3_helper.local_path(
            S3_BUILDS_BUCKET, self.s3_prefix
        ).absolute()
        files = []  # type: List[S3Object]
        for fp in local_dir.iterdir():
            # One stat() call serves both the mtime and the size
            file_stat = fp.stat()
            files.append(
                S3Object(
                    fp.as_uri(),
                    datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                    file_stat.st_size,
                )
            )
        return files
9 changes: 6 additions & 3 deletions tests/ci/compress_files.py
Expand Up @@ -7,10 +7,11 @@


PIGZ = Path("/usr/bin/pigz")
SUFFIX = ".zst"


def compress_file_fast(path: Path, archive_path: Path) -> None:
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
subprocess.check_call(f"zstd < {path} > {archive_path}", shell=True)
elif PIGZ.exists():
subprocess.check_call(f"pigz < {path} > {archive_path}", shell=True)
Expand All @@ -22,7 +23,7 @@ def compress_fast(
path: Path, archive_path: Path, exclude: Optional[Path] = None
) -> None:
program_part = ""
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
logging.info("zstd will be used for compression")
program_part = "--use-compress-program='zstd --threads=0'"
elif PIGZ.exists():
Expand All @@ -39,6 +40,7 @@ def compress_fast(
else:
exclude_part = f"--exclude {exclude}"

archive_path.parent.mkdir(parents=True, exist_ok=True)
fname = path.name

cmd = (
Expand All @@ -50,7 +52,7 @@ def compress_fast(

def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> None:
program_part = ""
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
logging.info(
"zstd will be used for decompression ('%s' -> '%s')",
archive_path,
Expand All @@ -75,6 +77,7 @@ def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> N
if result_path is None:
subprocess.check_call(f"tar {program_part} -xf {archive_path}", shell=True)
else:
result_path.mkdir(parents=True, exist_ok=True)
subprocess.check_call(
f"tar {program_part} -xf {archive_path} -C {result_path}",
shell=True,
Expand Down
2 changes: 1 addition & 1 deletion tests/ci/docker_images_check.py
Expand Up @@ -149,7 +149,7 @@ def gen_versions(
pr_commit_version = str(pr_info.number) + "-" + pr_info.sha
# The order is important, PR number is used as cache during the build
versions = [str(pr_info.number), pr_commit_version]
result_version = pr_commit_version
result_version = pr_commit_version # type: Union[str, List[str]]
if pr_info.number == 0 and pr_info.base_ref == "master":
# First get the latest for cache
versions.insert(0, "latest")
Expand Down
5 changes: 3 additions & 2 deletions tests/ci/docker_manifests_merge.py
Expand Up @@ -272,16 +272,17 @@ def main():
if test_result != "OK":
status = "failure"

enriched_images = changed_images.copy()
try:
# changed_images now contains all the images that are changed in this PR. Let's find the latest tag for the images that are not changed.
enrich_images(changed_images)
enrich_images(enriched_images)
except CHException as ex:
logging.warning("Couldn't get proper tags for not changed images: %s", ex)

with open(
os.path.join(args.path, "changed_images.json"), "w", encoding="utf-8"
) as ci:
json.dump(changed_images, ci)
json.dump(enriched_images, ci)

pr_info = PRInfo()
s3_helper = S3Helper()
Expand Down
22 changes: 13 additions & 9 deletions tests/ci/git_helper.py
Expand Up @@ -14,7 +14,7 @@
TAG_REGEXP = (
r"\Av\d{2}[.][1-9]\d*[.][1-9]\d*[.][1-9]\d*-(testing|prestable|stable|lts)\Z"
)
SHA_REGEXP = r"\A([0-9]|[a-f]){40}\Z"
SHA_REGEXP = re.compile(r"\A([0-9]|[a-f]){40}\Z")

CWD = p.dirname(p.realpath(__file__))
TWEAK = 1
Expand All @@ -34,8 +34,7 @@ def removesuffix(string: str, suffix: str) -> str:


def commit(name: str) -> str:
r = re.compile(SHA_REGEXP)
if not r.match(name):
if not SHA_REGEXP.match(name):
raise argparse.ArgumentTypeError(
"commit hash should contain exactly 40 hex characters"
)
Expand All @@ -52,8 +51,11 @@ def release_branch(name: str) -> str:
class Runner:
"""lightweight check_output wrapper with stripping last NEW_LINE"""

def __init__(self, cwd: str = CWD):
def __init__(self, cwd: str = CWD, set_cwd_to_git_root: bool = False):
self._cwd = cwd
# delayed set cwd to the repo's root, to not do it at the import stage
self._git_root = None # type: Optional[str]
self._set_cwd_to_git_root = set_cwd_to_git_root

def run(self, cmd: str, cwd: Optional[str] = None, **kwargs: Any) -> str:
if cwd is None:
Expand All @@ -68,6 +70,12 @@ def run(self, cmd: str, cwd: Optional[str] = None, **kwargs: Any) -> str:

@property
def cwd(self) -> str:
if self._set_cwd_to_git_root:
if self._git_root is None:
self._git_root = p.realpath(
p.join(self._cwd, self.run("git rev-parse --show-cdup", self._cwd))
)
return self._git_root
return self._cwd

@cwd.setter
Expand All @@ -81,11 +89,7 @@ def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)


git_runner = Runner()
# Set cwd to abs path of git root
git_runner.cwd = p.relpath(
p.join(git_runner.cwd, git_runner.run("git rev-parse --show-cdup"))
)
git_runner = Runner(set_cwd_to_git_root=True)


def is_shallow() -> bool:
Expand Down

0 comments on commit 701fa79

Please sign in to comment.