Skip to content

Commit

Permalink
Merge pull request #54719 from ClickHouse/backport/23.8/54504
Browse files Browse the repository at this point in the history
Backport #54504 to 23.8: S3 artifacts
  • Loading branch information
Felixoid committed Oct 9, 2023
2 parents 6deedff + e1f86de commit 39bff08
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 35 deletions.
192 changes: 192 additions & 0 deletions tests/ci/artifacts_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#!/usr/bin/env python

"""Manages artifacts similar to GH actions, but in S3"""

from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
from os import path as op
from pathlib import Path
from shutil import copy2
from typing import List, Optional, Union

from github.Commit import Commit

from build_download_helper import download_build_with_progress
from commit_status_helper import post_commit_status
from compress_files import SUFFIX, compress_fast, decompress_fast
from env_helper import CI, RUNNER_TEMP, S3_BUILDS_BUCKET
from git_helper import SHA_REGEXP
from report import HEAD_HTML_TEMPLATE, FOOTER_HTML_TEMPLATE
from s3_helper import S3Helper

ARTIFACTS_PATH = Path(RUNNER_TEMP) / "artifacts"


@dataclass
class S3Object:
key: str
last_modified: str
size: int


class ArtifactsHelper:
INDEX = "index.html"
RESTRICTED_SYMBOLS = r"\/':<>|*?\""

def __init__(
self,
s3_helper: S3Helper,
commit: Union[str, Commit],
s3_prefix: str = "artifacts",
):
"""The helper to compress+upload and download+decompress artifacts
If `commit` is github.Commit.Commit instance, the status Artifacts for a
given commit will be updated on an uploading"""
self._commit = commit
assert SHA_REGEXP.match(self.commit)
self.temp_path = ARTIFACTS_PATH
self.temp_path.mkdir(parents=True, exist_ok=True)
self.s3_helper = s3_helper
# The s3 prefix is done with trailing slash!
self._s3_prefix = op.join(s3_prefix, self.commit, "")
self._s3_index_key = f"{self.s3_prefix}{self.INDEX}"
self._s3_index_url = None # type: Optional[str]

@property
def commit(self) -> str:
"""string of the commit SHA"""
if isinstance(self._commit, str):
return self._commit
return self._commit.sha

@property
def s3_prefix(self) -> str:
"""Prefix with the trailing slash"""
return self._s3_prefix

@property
def s3_index_key(self) -> str:
"""Prefix with the trailing slash"""
return self._s3_index_key

@property
def s3_index_url(self) -> str:
if self._s3_index_url is None:
self._s3_index_url = self.s3_helper.get_url(
S3_BUILDS_BUCKET, self.s3_index_key
)
return self._s3_index_url

def upload(self, artifact_name: str, artifact_path: Path) -> None:
"""Creates archive 'artifact_name.tar{compress_files.SUFFIX} with directory of"""
assert not any(s in artifact_name for s in self.RESTRICTED_SYMBOLS)
archive_path = self.temp_path / f"{artifact_name}.tar{SUFFIX}"
s3_artifact_key = f"{self.s3_prefix}{archive_path.name}"
compress_fast(artifact_path, archive_path)
self.s3_helper.upload_build_file_to_s3(archive_path, s3_artifact_key)
self._regenerate_index()

def download(
self,
artifact_name: str,
extract_directory: Path = ARTIFACTS_PATH,
keep_archive: bool = False,
) -> Path:
"""Downloads artifact, if exists, and extracts it. If not, returns False"""
assert not any(s in artifact_name for s in self.RESTRICTED_SYMBOLS)
assert extract_directory.is_dir()
archive_path = self.temp_path / f"{artifact_name}.tar{SUFFIX}"
artifact_path = extract_directory / artifact_name
s3_artifact_key = f"{self.s3_prefix}{archive_path.name}"
url = self.s3_helper.url_if_exists(s3_artifact_key, S3_BUILDS_BUCKET)
if not url:
return artifact_path

if url.startswith("file://"):
copy2(Path(url[7:]), archive_path)
else:
download_build_with_progress(url, archive_path)
artifact_path.mkdir(parents=True, exist_ok=True)
decompress_fast(archive_path, artifact_path)
if not keep_archive:
archive_path.unlink()
return artifact_path

def list_artifacts(self, glob: str = "") -> List[str]:
"""return the list of artifacts existing for a commit"""

def ignore(key: str) -> bool:
if key == self.s3_index_key:
return False
if glob:
return fnmatch(key, glob)
return True

results = filter(
ignore, self.s3_helper.list_prefix(self.s3_prefix, S3_BUILDS_BUCKET)
)
return list(results)

@staticmethod
def post_commit_status(commit: Commit, url: str) -> None:
post_commit_status(
commit, "success", url, "Artifacts for workflow", "Artifacts"
)

def _regenerate_index(self) -> None:
if CI:
files = self._get_s3_objects()
else:
files = self._get_local_s3_objects()

def name(uri: str) -> str:
return Path(uri).name

links = [
f'<tr><td><a href="{f.key}">{name(f.key)}</a></td><td>{f.size}</td>'
f"<td>{f.last_modified}</td></tr>"
for f in files
]
index_path = self.temp_path / self.INDEX
title = f"Artifacts for workflow commit {self.commit}"
index_content = (
HEAD_HTML_TEMPLATE.format(title=title, header=title)
+ "<table><tr><th>Artifact</th><th>Size</th><th>Modified</th></tr>"
+ "\n".join(links)
+ "</table>"
+ FOOTER_HTML_TEMPLATE
)
index_path.write_text(index_content, encoding="utf-8")
url = self.s3_helper.upload_build_file_to_s3(index_path, self.s3_index_key)
if isinstance(self._commit, Commit):
self.post_commit_status(self._commit, url)

def _get_s3_objects(self) -> List[S3Object]:
objects = self.s3_helper.client.list_objects_v2(
Bucket=S3_BUILDS_BUCKET, Prefix=self.s3_prefix
)
files = [] # type: List[S3Object]
if "Contents" in objects:
files = [
S3Object(
obj["Key"][len(self.s3_prefix) :],
obj["LastModified"].isoformat(),
obj["Size"],
)
for obj in objects["Contents"]
]
return files

def _get_local_s3_objects(self) -> List[S3Object]:
files = [
S3Object(
fp.as_uri(),
datetime.fromtimestamp(fp.stat().st_mtime).isoformat(),
fp.stat().st_size,
)
for fp in self.s3_helper.local_path(S3_BUILDS_BUCKET, self.s3_prefix)
.absolute()
.iterdir()
]
return files
9 changes: 6 additions & 3 deletions tests/ci/compress_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@


PIGZ = Path("/usr/bin/pigz")
SUFFIX = ".zst"


def compress_file_fast(path: Path, archive_path: Path) -> None:
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
subprocess.check_call(f"zstd < {path} > {archive_path}", shell=True)
elif PIGZ.exists():
subprocess.check_call(f"pigz < {path} > {archive_path}", shell=True)
Expand All @@ -22,7 +23,7 @@ def compress_fast(
path: Path, archive_path: Path, exclude: Optional[Path] = None
) -> None:
program_part = ""
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
logging.info("zstd will be used for compression")
program_part = "--use-compress-program='zstd --threads=0'"
elif PIGZ.exists():
Expand All @@ -39,6 +40,7 @@ def compress_fast(
else:
exclude_part = f"--exclude {exclude}"

archive_path.parent.mkdir(parents=True, exist_ok=True)
fname = path.name

cmd = (
Expand All @@ -50,7 +52,7 @@ def compress_fast(

def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> None:
program_part = ""
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
logging.info(
"zstd will be used for decompression ('%s' -> '%s')",
archive_path,
Expand All @@ -75,6 +77,7 @@ def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> N
if result_path is None:
subprocess.check_call(f"tar {program_part} -xf {archive_path}", shell=True)
else:
result_path.mkdir(parents=True, exist_ok=True)
subprocess.check_call(
f"tar {program_part} -xf {archive_path} -C {result_path}",
shell=True,
Expand Down
2 changes: 1 addition & 1 deletion tests/ci/docker_images_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def gen_versions(
pr_commit_version = str(pr_info.number) + "-" + pr_info.sha
# The order is important, PR number is used as cache during the build
versions = [str(pr_info.number), pr_commit_version]
result_version = pr_commit_version
result_version = pr_commit_version # type: Union[str, List[str]]
if pr_info.number == 0 and pr_info.base_ref == "master":
# First get the latest for cache
versions.insert(0, "latest")
Expand Down
22 changes: 13 additions & 9 deletions tests/ci/git_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
TAG_REGEXP = (
r"\Av\d{2}[.][1-9]\d*[.][1-9]\d*[.][1-9]\d*-(testing|prestable|stable|lts)\Z"
)
SHA_REGEXP = r"\A([0-9]|[a-f]){40}\Z"
SHA_REGEXP = re.compile(r"\A([0-9]|[a-f]){40}\Z")

CWD = p.dirname(p.realpath(__file__))
TWEAK = 1
Expand All @@ -34,8 +34,7 @@ def removesuffix(string: str, suffix: str) -> str:


def commit(name: str) -> str:
r = re.compile(SHA_REGEXP)
if not r.match(name):
if not SHA_REGEXP.match(name):
raise argparse.ArgumentTypeError(
"commit hash should contain exactly 40 hex characters"
)
Expand All @@ -52,8 +51,11 @@ def release_branch(name: str) -> str:
class Runner:
"""lightweight check_output wrapper with stripping last NEW_LINE"""

def __init__(self, cwd: str = CWD):
def __init__(self, cwd: str = CWD, set_cwd_to_git_root: bool = False):
self._cwd = cwd
# delayed set cwd to the repo's root, to not do it at the import stage
self._git_root = None # type: Optional[str]
self._set_cwd_to_git_root = set_cwd_to_git_root

def run(self, cmd: str, cwd: Optional[str] = None, **kwargs: Any) -> str:
if cwd is None:
Expand All @@ -68,6 +70,12 @@ def run(self, cmd: str, cwd: Optional[str] = None, **kwargs: Any) -> str:

@property
def cwd(self) -> str:
if self._set_cwd_to_git_root:
if self._git_root is None:
self._git_root = p.realpath(
p.join(self._cwd, self.run("git rev-parse --show-cdup", self._cwd))
)
return self._git_root
return self._cwd

@cwd.setter
Expand All @@ -81,11 +89,7 @@ def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)


git_runner = Runner()
# Set cwd to abs path of git root
git_runner.cwd = p.relpath(
p.join(git_runner.cwd, git_runner.run("git rev-parse --show-cdup"))
)
git_runner = Runner(set_cwd_to_git_root=True)


def is_shallow() -> bool:
Expand Down
28 changes: 17 additions & 11 deletions tests/ci/pr_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __init__(
self.event = github_event
self.changed_files = set() # type: Set[str]
self.body = ""
self.diff_urls = []
self.diff_urls = [] # type: List[str]
# release_pr and merged_pr are used for docker images additional cache
self.release_pr = 0
self.merged_pr = 0
Expand All @@ -104,7 +104,7 @@ def __init__(

# workflow completed event, used for PRs only
if "action" in github_event and github_event["action"] == "completed":
self.sha = github_event["workflow_run"]["head_sha"]
self.sha = github_event["workflow_run"]["head_sha"] # type: str
prs_for_sha = get_gh_api(
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/commits/{self.sha}"
"/pulls",
Expand All @@ -114,7 +114,7 @@ def __init__(
github_event["pull_request"] = prs_for_sha[0]

if "pull_request" in github_event: # pull request and other similar events
self.number = github_event["pull_request"]["number"]
self.number = github_event["pull_request"]["number"] # type: int
if pr_event_from_api:
try:
response = get_gh_api(
Expand Down Expand Up @@ -144,20 +144,24 @@ def __init__(
self.pr_html_url = f"{repo_prefix}/pull/{self.number}"

# master or backport/xx.x/xxxxx - where the PR will be merged
self.base_ref = github_event["pull_request"]["base"]["ref"]
self.base_ref = github_event["pull_request"]["base"]["ref"] # type: str
# ClickHouse/ClickHouse
self.base_name = github_event["pull_request"]["base"]["repo"]["full_name"]
self.base_name = github_event["pull_request"]["base"]["repo"][
"full_name"
] # type: str
# any_branch-name - the name of working branch name
self.head_ref = github_event["pull_request"]["head"]["ref"]
self.head_ref = github_event["pull_request"]["head"]["ref"] # type: str
# UserName/ClickHouse or ClickHouse/ClickHouse
self.head_name = github_event["pull_request"]["head"]["repo"]["full_name"]
self.head_name = github_event["pull_request"]["head"]["repo"][
"full_name"
] # type: str
self.body = github_event["pull_request"]["body"]
self.labels = {
label["name"] for label in github_event["pull_request"]["labels"]
} # type: Set[str]

self.user_login = github_event["pull_request"]["user"]["login"]
self.user_orgs = set([])
self.user_login = github_event["pull_request"]["user"]["login"] # type: str
self.user_orgs = set() # type: Set[str]
if need_orgs:
user_orgs_response = get_gh_api(
github_event["pull_request"]["user"]["organizations_url"],
Expand All @@ -170,7 +174,7 @@ def __init__(
self.diff_urls.append(github_event["pull_request"]["diff_url"])
elif "commits" in github_event:
# `head_commit` always comes with `commits`
commit_message = github_event["head_commit"]["message"]
commit_message = github_event["head_commit"]["message"] # type: str
if commit_message.startswith("Merge pull request #"):
merged_pr = commit_message.split(maxsplit=4)[3]
try:
Expand Down Expand Up @@ -234,7 +238,9 @@ def __init__(
else:
print("event.json does not match pull_request or push:")
print(json.dumps(github_event, sort_keys=True, indent=4))
self.sha = os.getenv("GITHUB_SHA")
self.sha = os.getenv(
"GITHUB_SHA", "0000000000000000000000000000000000000000"
)
self.number = 0
self.labels = set()
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"
Expand Down

0 comments on commit 39bff08

Please sign in to comment.