diff --git a/cli/dstack/_internal/backend/base/__init__.py b/cli/dstack/_internal/backend/base/__init__.py index 9cb56d735..f6ab4f06c 100644 --- a/cli/dstack/_internal/backend/base/__init__.py +++ b/cli/dstack/_internal/backend/base/__init__.py @@ -2,7 +2,9 @@ from datetime import datetime from typing import Generator, List, Optional +import dstack._internal.core.build from dstack._internal.backend.base import artifacts as base_artifacts +from dstack._internal.backend.base import build as base_build from dstack._internal.backend.base import cache as base_cache from dstack._internal.backend.base import jobs as base_jobs from dstack._internal.backend.base import repos as base_repos @@ -14,6 +16,7 @@ from dstack._internal.backend.base.secrets import SecretsManager from dstack._internal.backend.base.storage import Storage from dstack._internal.core.artifact import Artifact +from dstack._internal.core.build import BuildPlan from dstack._internal.core.instance import InstanceType from dstack._internal.core.job import Job, JobHead, JobStatus from dstack._internal.core.log_event import LogEvent @@ -230,6 +233,10 @@ def get_signed_download_url(self, object_key: str) -> str: def get_signed_upload_url(self, object_key: str) -> str: pass + @abstractmethod + def predict_build_plan(self, job: Job) -> BuildPlan: + pass + class ComponentBasedBackend(Backend): @abstractmethod @@ -264,6 +271,7 @@ def list_jobs(self, repo_id: str, run_name: str) -> List[Job]: return base_jobs.list_jobs(self.storage(), repo_id, run_name) def run_job(self, job: Job, failed_to_start_job_new_status: JobStatus): + self.predict_build_plan(job) # raises exception on missing build base_jobs.run_job(self.storage(), self.compute(), job, failed_to_start_job_new_status) def stop_job(self, repo_id: str, abort: bool, job_id: str): @@ -435,3 +443,8 @@ def delete_configuration_cache( base_cache.delete_configuration_cache( self.storage(), repo_id, hub_user_name, configuration_path ) + + def predict_build_plan(self, job: Job) -> BuildPlan: + return base_build.predict_build_plan( + self.storage(), job, dstack._internal.core.build.DockerPlatform.amd64 + ) diff --git a/cli/dstack/_internal/backend/base/build.py b/cli/dstack/_internal/backend/base/build.py new file mode 100644 index 000000000..c3821badf --- /dev/null +++ b/cli/dstack/_internal/backend/base/build.py @@ -0,0 +1,63 @@ +from pathlib import Path +from platform import uname as platform_uname +from typing import Optional + +import cpuinfo + +from dstack._internal.backend.base.storage import Storage +from dstack._internal.core.build import BuildNotFoundError, BuildPlan, DockerPlatform +from dstack._internal.core.job import Job +from dstack._internal.utils.escape import escape_head + + +def predict_build_plan( + storage: Storage, job: Job, platform: Optional[DockerPlatform] +) -> BuildPlan: + if job.build_policy in ["force-build", "build-only"]: + return BuildPlan.yes + + if platform is None: + platform = guess_docker_platform() + if build_exists(storage, job, platform): + return BuildPlan.use + + if job.build_commands: + if job.build_policy == "use-build": + raise BuildNotFoundError("Build not found. Run `dstack build` or add `--build` flag") + return BuildPlan.yes + + if job.optional_build_commands and job.build_policy == "build": + return BuildPlan.yes + return BuildPlan.no + + +def build_exists(storage: Storage, job: Job, platform: DockerPlatform) -> bool: + prefix = _get_build_head_prefix(job, platform) + return len(storage.list_objects(prefix)) > 0 + + +def _get_build_head_prefix(job: Job, platform: DockerPlatform) -> str: + parts = [ + job.configuration_type.value, + job.configuration_path or "", + (Path("/workflow") / (job.working_dir or "")).as_posix(), + job.image_name, + platform.value, + # digest + # timestamp_utc + ] + parts = ";".join(escape_head(p) for p in parts) + return f"builds/{job.repo_ref.repo_id}/{parts};" + + +def guess_docker_platform() -> DockerPlatform: + uname = platform_uname() + if uname.system == "Darwin": + brand = cpuinfo.get_cpu_info().get("brand_raw") + m_arch = "m1" in brand.lower() or "m2" in brand.lower() + arch = "arm64" if m_arch else "x86_64" + else: + arch = uname.machine + if uname.system == "Darwin" and arch in ["arm64", "aarch64"]: + return DockerPlatform.arm64 + return DockerPlatform.amd64 diff --git a/cli/dstack/_internal/backend/base/jobs.py b/cli/dstack/_internal/backend/base/jobs.py index b793301a6..06dc3e9a9 100644 --- a/cli/dstack/_internal/backend/base/jobs.py +++ b/cli/dstack/_internal/backend/base/jobs.py @@ -3,8 +3,10 @@ import yaml from dstack._internal.backend.base import runners +from dstack._internal.backend.base.build import predict_build_plan from dstack._internal.backend.base.compute import Compute, NoCapacityError from dstack._internal.backend.base.storage import Storage +from dstack._internal.core.build import DockerPlatform from dstack._internal.core.error import NoMatchingInstanceError from dstack._internal.core.instance import InstanceType from dstack._internal.core.job import Job, JobErrorCode, JobHead, JobStatus, SpotPolicy diff --git a/cli/dstack/_internal/backend/local/__init__.py b/cli/dstack/_internal/backend/local/__init__.py index 4c791db82..8a5b99b14 100644 --- a/cli/dstack/_internal/backend/local/__init__.py +++ b/cli/dstack/_internal/backend/local/__init__.py @@ -1,11 +1,14 @@ from typing import Optional from dstack._internal.backend.base import ComponentBasedBackend +from dstack._internal.backend.base import build as base_build from dstack._internal.backend.local.compute import LocalCompute from dstack._internal.backend.local.config import LocalConfig from dstack._internal.backend.local.logs import LocalLogging from dstack._internal.backend.local.secrets import LocalSecretsManager from dstack._internal.backend.local.storage import LocalStorage +from dstack._internal.core.build import BuildPlan +from dstack._internal.core.job import Job class LocalBackend(ComponentBasedBackend): @@ -39,3 +42,7 @@ def secrets_manager(self) -> LocalSecretsManager: def logging(self) -> LocalLogging: return self._logging + + def predict_build_plan(self, job: Job) -> BuildPlan: + # guess platform from uname + return base_build.predict_build_plan(self.storage(), job, platform=None) diff --git a/cli/dstack/_internal/cli/commands/build/__init__.py b/cli/dstack/_internal/cli/commands/build/__init__.py index 313fc9805..99ee3d591 100644 --- a/cli/dstack/_internal/cli/commands/build/__init__.py +++ b/cli/dstack/_internal/cli/commands/build/__init__.py @@ -51,7 +51,10 @@ def _command(self, args: argparse.Namespace): ssh_pub_key = _read_ssh_key_pub(config.repo_user_config.ssh_key_path) run_plan = hub_client.get_run_plan( - provider_name=provider_name, provider_data=provider_data, args=args + configuration_path=configuration_path, + provider_name=provider_name, + provider_data=provider_data, + args=args, ) console.print("dstack will execute the following plan:\n") _print_run_plan(configuration_path, run_plan) @@ -69,9 +72,6 @@ def _command(self, args: argparse.Namespace): ) runs = list_runs_hub(hub_client, run_name=run_name) run = runs[0] - if run.status == JobStatus.FAILED: - console.print("\nProvisioning failed\n") - exit(1) _poll_run( hub_client, run, diff --git a/cli/dstack/_internal/cli/commands/run/__init__.py b/cli/dstack/_internal/cli/commands/run/__init__.py index cc1fb0007..fbda9d4bd 100644 --- a/cli/dstack/_internal/cli/commands/run/__init__.py +++ b/cli/dstack/_internal/cli/commands/run/__init__.py @@ -134,7 +134,10 @@ def _command(self, args: Namespace): ssh_pub_key = _read_ssh_key_pub(config.repo_user_config.ssh_key_path) run_plan = hub_client.get_run_plan( - provider_name=provider_name, provider_data=provider_data, args=args + configuration_path=configuration_path, + provider_name=provider_name, + provider_data=provider_data, + args=args, ) console.print("dstack will execute the following plan:\n") _print_run_plan(configuration_path, run_plan) @@ -184,12 +187,20 @@ def _print_run_plan(configuration_file: str, run_plan: RunPlan): table.add_column("INSTANCE") table.add_column("RESOURCES") table.add_column("SPOT POLICY") + table.add_column("BUILD") job_plan = run_plan.job_plans[0] instance = job_plan.instance_type.instance_name or "-" instance_info = _format_resources(job_plan.instance_type) spot = job_plan.job.spot_policy.value + build_plan = job_plan.build_plan.value.title() table.add_row( - configuration_file, run_plan.hub_user_name, run_plan.project, instance, instance_info, spot + configuration_file, + run_plan.hub_user_name, + run_plan.project, + instance, + instance_info, + spot, + build_plan, ) console.print(table) console.print() diff --git a/cli/dstack/_internal/cli/commands/run/configurations.py b/cli/dstack/_internal/cli/commands/run/configurations.py index 9e4046ce8..763521f49 100644 --- a/cli/dstack/_internal/cli/commands/run/configurations.py +++ b/cli/dstack/_internal/cli/commands/run/configurations.py @@ -46,7 +46,8 @@ def _parse_dev_environment_configuration_data( "sea_green3]Command Palette[/sea_green3], executing [sea_green3]Shell Command: Install 'code' command in " "PATH[/sea_green3], and restarting terminal.[/]\n" ) - provider_data["optional_build"].append("pip install -q --no-cache-dir ipykernel") + for key in ["optional_build", "commands"]: + provider_data[key].append("pip install -q --no-cache-dir ipykernel") provider_data["commands"].extend(configuration_data.get("init") or []) return provider_name, provider_data diff --git a/cli/dstack/_internal/core/build.py b/cli/dstack/_internal/core/build.py new file mode 100644 index 000000000..ff712fbad --- /dev/null +++ b/cli/dstack/_internal/core/build.py @@ -0,0 +1,18 @@ +from enum import Enum + +from dstack._internal.core.error import DstackError + + +class DockerPlatform(str, Enum): + amd64 = "amd64" + arm64 = "arm64" + + +class BuildPlan(str, Enum): + no = "no" + use = "use" + yes = "yes" + + +class BuildNotFoundError(DstackError): + code = "build_not_found" diff --git a/cli/dstack/_internal/core/plan.py b/cli/dstack/_internal/core/plan.py index 5f2cf36af..a5ec13ab4 100644 --- a/cli/dstack/_internal/core/plan.py +++ b/cli/dstack/_internal/core/plan.py @@ -2,6 +2,7 @@ from pydantic import BaseModel +from dstack._internal.core.build import BuildPlan from dstack._internal.core.instance import InstanceType from dstack._internal.core.job import Job @@ -9,6 +10,7 @@ class JobPlan(BaseModel): job: Job instance_type: InstanceType + build_plan: BuildPlan class RunPlan(BaseModel): diff --git a/cli/dstack/_internal/hub/routers/runners.py b/cli/dstack/_internal/hub/routers/runners.py index 2450a0a4d..be196f3dc 100644 --- a/cli/dstack/_internal/hub/routers/runners.py +++ b/cli/dstack/_internal/hub/routers/runners.py @@ -1,5 +1,6 @@ from fastapi import APIRouter, Depends, HTTPException, status +from dstack._internal.core.build import BuildNotFoundError from dstack._internal.core.error import NoMatchingInstanceError from dstack._internal.core.job import Job, JobStatus from dstack._internal.hub.models import StopRunners @@ -29,6 +30,11 @@ async def run_runners(project_name: str, job: Job): NoMatchingInstanceError.message, code=NoMatchingInstanceError.code ), ) + except BuildNotFoundError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=error_detail(msg=e.message, code=e.code), + ) @router.post("/{project_name}/runners/stop") diff --git a/cli/dstack/_internal/hub/routers/runs.py b/cli/dstack/_internal/hub/routers/runs.py index 5eb9b2263..29043242d 100644 --- a/cli/dstack/_internal/hub/routers/runs.py +++ b/cli/dstack/_internal/hub/routers/runs.py @@ -4,6 +4,7 @@ from fastapi.responses import PlainTextResponse from dstack._internal.backend.base import Backend +from dstack._internal.core.build import BuildNotFoundError from dstack._internal.core.error import NoMatchingInstanceError from dstack._internal.core.job import Job, JobStatus from dstack._internal.core.plan import JobPlan, RunPlan @@ -35,7 +36,14 @@ async def get_run_plan( msg=NoMatchingInstanceError.message, code=NoMatchingInstanceError.code ), ) - job_plans.append(JobPlan(job=job, instance_type=instance_type)) + try: + build = backend.predict_build_plan(job) + except BuildNotFoundError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=error_detail(msg=e.message, code=e.code), + ) + job_plans.append(JobPlan(job=job, instance_type=instance_type, build_plan=build)) run_plan = RunPlan(project=project_name, hub_user_name=user.name, job_plans=job_plans) return run_plan diff --git a/cli/dstack/api/hub/_api_client.py b/cli/dstack/api/hub/_api_client.py index 1a65add4b..788211446 100644 --- a/cli/dstack/api/hub/_api_client.py +++ b/cli/dstack/api/hub/_api_client.py @@ -5,6 +5,7 @@ import requests from dstack._internal.core.artifact import Artifact +from dstack._internal.core.build import BuildNotFoundError from dstack._internal.core.error import NoMatchingInstanceError from dstack._internal.core.job import Job, JobHead from dstack._internal.core.log_event import LogEvent @@ -83,6 +84,8 @@ def get_run_plan(self, jobs: List[Job]) -> RunPlan: body = resp.json() if body["detail"]["code"] == NoMatchingInstanceError.code: raise HubClientError(body["detail"]["msg"]) + elif body["detail"]["code"] == BuildNotFoundError.code: + raise HubClientError(body["detail"]["msg"]) resp.raise_for_status() def create_run(self) -> str: @@ -168,6 +171,8 @@ def run_job(self, job: Job): body = resp.json() if body["detail"]["code"] == NoMatchingInstanceError.code: raise HubClientError(body["detail"]["msg"]) + elif body["detail"]["code"] == BuildNotFoundError.code: + raise HubClientError(body["detail"]["msg"]) resp.raise_for_status() def stop_job(self, job_id: str, abort: bool): diff --git a/cli/dstack/api/hub/_client.py b/cli/dstack/api/hub/_client.py index baaac4245..95cb87588 100644 --- a/cli/dstack/api/hub/_client.py +++ b/cli/dstack/api/hub/_client.py @@ -262,6 +262,7 @@ def delete_configuration_cache(self, configuration_path: str): def get_run_plan( self, + configuration_path: str, provider_name: str, provider_data: Optional[Dict[str, Any]] = None, args: Optional[argparse.Namespace] = None, @@ -277,7 +278,7 @@ def get_run_plan( run_name="dry-run", ssh_key_pub="", ) - jobs = provider.get_jobs(repo=self.repo) + jobs = provider.get_jobs(repo=self.repo, configuration_path=configuration_path) run_plan = self._api_client.get_run_plan(jobs) return run_plan diff --git a/runner/internal/backend/aws/backend.go b/runner/internal/backend/aws/backend.go index a7bc9d142..359a431cc 100644 --- a/runner/internal/backend/aws/backend.go +++ b/runner/internal/backend/aws/backend.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/dstackai/dstack/runner/internal/container" "io" "io/ioutil" "path" @@ -321,13 +322,20 @@ func (s *AWSBackend) GetRepoArchive(ctx context.Context, path, dir string) error return gerrors.Wrap(base.GetRepoArchive(ctx, s.storage, path, dir)) } +func (s *AWSBackend) GetBuildDiffInfo(ctx context.Context, spec *container.BuildSpec) (*base.StorageObject, error) { + obj, err := base.GetBuildDiffInfo(ctx, s.storage, spec) + if err != nil { + return nil, gerrors.Wrap(err) + } + return obj, nil +} + func (s *AWSBackend) GetBuildDiff(ctx context.Context, key, dst string) error { - _ = base.DownloadFile(ctx, s.storage, key, dst) - return nil + return gerrors.Wrap(base.DownloadFile(ctx, s.storage, key, dst)) } -func (s *AWSBackend) PutBuildDiff(ctx context.Context, src, key string) error { - return gerrors.Wrap(base.UploadFile(ctx, s.storage, src, key)) +func (s *AWSBackend) PutBuildDiff(ctx context.Context, src string, spec *container.BuildSpec) error { + return gerrors.Wrap(base.PutBuildDiff(ctx, s.storage, src, spec)) } func (s *AWSBackend) GetTMPDir(ctx context.Context) string { diff --git a/runner/internal/backend/azure/backend.go b/runner/internal/backend/azure/backend.go index 8164e1cd2..8987ad2d4 100644 --- a/runner/internal/backend/azure/backend.go +++ b/runner/internal/backend/azure/backend.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/dstackai/dstack/runner/internal/backend/base" + "github.com/dstackai/dstack/runner/internal/container" "io" "os" "path" @@ -236,13 +237,20 @@ func (azbackend *AzureBackend) GetRepoArchive(ctx context.Context, path, dir str return gerrors.Wrap(base.GetRepoArchive(ctx, azbackend.storage, path, dir)) } +func (azbackend *AzureBackend) GetBuildDiffInfo(ctx context.Context, spec *container.BuildSpec) (*base.StorageObject, error) { + obj, err := base.GetBuildDiffInfo(ctx, azbackend.storage, spec) + if err != nil { + return nil, gerrors.Wrap(err) + } + return obj, nil +} + func (azbackend *AzureBackend) GetBuildDiff(ctx context.Context, key, dst string) error { - _ = base.DownloadFile(ctx, azbackend.storage, key, dst) - return nil + return gerrors.Wrap(base.DownloadFile(ctx, azbackend.storage, key, dst)) } -func (azbackend *AzureBackend) PutBuildDiff(ctx context.Context, src, key string) error { - return gerrors.Wrap(base.UploadFile(ctx, azbackend.storage, src, key)) +func (azbackend *AzureBackend) PutBuildDiff(ctx context.Context, src string, spec *container.BuildSpec) error { + return gerrors.Wrap(base.PutBuildDiff(ctx, azbackend.storage, src, spec)) } func (azbackend *AzureBackend) GetTMPDir(ctx context.Context) string { diff --git a/runner/internal/backend/backend.go b/runner/internal/backend/backend.go index 071aeb347..72d66cf09 100644 --- a/runner/internal/backend/backend.go +++ b/runner/internal/backend/backend.go @@ -4,6 +4,7 @@ import ( "context" "errors" "github.com/dstackai/dstack/runner/internal/backend/base" + "github.com/dstackai/dstack/runner/internal/container" "io" "io/ioutil" "os" @@ -39,8 +40,9 @@ type Backend interface { GetJobByPath(ctx context.Context, path string) (*models.Job, error) GetRepoDiff(ctx context.Context, path string) (string, error) GetRepoArchive(ctx context.Context, path, dst string) error + GetBuildDiffInfo(ctx context.Context, spec *container.BuildSpec) (*base.StorageObject, error) GetBuildDiff(ctx context.Context, key, dst string) error - PutBuildDiff(ctx context.Context, src, key string) error + PutBuildDiff(ctx context.Context, src string, spec *container.BuildSpec) error GetTMPDir(ctx context.Context) string GetDockerBindings(ctx context.Context) []mount.Mount } diff --git a/runner/internal/backend/base/backend.go b/runner/internal/backend/base/backend.go index b0c79ceeb..66d4ff5d0 100644 --- a/runner/internal/backend/base/backend.go +++ b/runner/internal/backend/base/backend.go @@ -2,15 +2,17 @@ package base import ( "context" + "errors" "fmt" - "os" - "strings" - + "github.com/dstackai/dstack/runner/internal/container" "github.com/dstackai/dstack/runner/internal/gerrors" "github.com/dstackai/dstack/runner/internal/log" "github.com/dstackai/dstack/runner/internal/models" "github.com/dstackai/dstack/runner/internal/repo" "gopkg.in/yaml.v2" + "os" + "strings" + "time" ) func LoadRunnerState(ctx context.Context, storage Storage, id string, out interface{}) error { @@ -93,3 +95,58 @@ func GetRepoArchive(ctx context.Context, storage Storage, path, dir string) erro } return nil } + +var ErrBuildNotFound = errors.New("build not found") + +func GetBuildDiffInfo(ctx context.Context, storage Storage, spec *container.BuildSpec) (*StorageObject, error) { + prefix := getBuildDiffPrefix(spec) + builds := make([]*StorageObject, 0) + ch, errCh := storage.List(ctx, prefix) + for item := range ch { + item.Key = prefix + item.Key + builds = append(builds, item) + } + if err := <-errCh; err != nil { + return nil, gerrors.Wrap(err) + } + if len(builds) == 1 { + return builds[0], nil + } + return nil, gerrors.Wrap(ErrBuildNotFound) +} + +func PutBuildDiff(ctx context.Context, storage Storage, src string, spec *container.BuildSpec) error { + newDiffKey := getBuildDiffName(spec) + oldDiff, err := GetBuildDiffInfo(ctx, storage, spec) + if err == nil { + log.Trace(ctx, "Deleting old build diff", "key", oldDiff.Key) + if err = storage.Delete(ctx, oldDiff.Key); err != nil { + return gerrors.Wrap(err) + } + } else if !errors.Is(err, ErrBuildNotFound) { + return gerrors.Wrap(err) + } + log.Trace(ctx, "Uploading new build diff", "key", newDiffKey) + return gerrors.Wrap(UploadFile(ctx, storage, src, newDiffKey)) +} + +func getBuildDiffPrefix(spec *container.BuildSpec) string { + return fmt.Sprintf( + "builds/%s/%s;%s;%s;%s;%s;", + spec.RepoId, + models.EscapeHead(spec.ConfigurationType), + models.EscapeHead(spec.ConfigurationPath), + models.EscapeHead(spec.WorkDir), + models.EscapeHead(spec.BaseImageName), + models.EscapeHead(spec.Platform), + ) +} + +func getBuildDiffName(spec *container.BuildSpec) string { + return fmt.Sprintf( + "%s%s;%d.tar", + getBuildDiffPrefix(spec), + models.EscapeHead(spec.Hash()), + time.Now().Unix(), // created timestamp + ) +} diff --git a/runner/internal/backend/gcp/backend.go b/runner/internal/backend/gcp/backend.go index 08b191e9d..60884183b 100644 --- a/runner/internal/backend/gcp/backend.go +++ b/runner/internal/backend/gcp/backend.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/dstackai/dstack/runner/internal/backend/base" + "github.com/dstackai/dstack/runner/internal/container" "io" "os" "path" @@ -236,14 +237,20 @@ func (gbackend *GCPBackend) GetRepoArchive(ctx context.Context, path, dir string return gerrors.Wrap(base.GetRepoArchive(ctx, gbackend.storage, path, dir)) } +func (gbackend *GCPBackend) GetBuildDiffInfo(ctx context.Context, spec *container.BuildSpec) (*base.StorageObject, error) { + obj, err := base.GetBuildDiffInfo(ctx, gbackend.storage, spec) + if err != nil { + return nil, gerrors.Wrap(err) + } + return obj, nil +} + func (gbackend *GCPBackend) GetBuildDiff(ctx context.Context, key, dst string) error { - _ = base.DownloadFile(ctx, gbackend.storage, key, dst) - return nil + return gerrors.Wrap(base.DownloadFile(ctx, gbackend.storage, key, dst)) } -func (gbackend *GCPBackend) PutBuildDiff(ctx context.Context, src, key string) error { - err := base.UploadFile(ctx, gbackend.storage, src, key) - return gerrors.Wrap(err) +func (gbackend *GCPBackend) PutBuildDiff(ctx context.Context, src string, spec *container.BuildSpec) error { + return gerrors.Wrap(base.PutBuildDiff(ctx, gbackend.storage, src, spec)) } func (gbackend *GCPBackend) GetTMPDir(ctx context.Context) string { diff --git a/runner/internal/backend/lambda/backend.go b/runner/internal/backend/lambda/backend.go index b78ee6457..9a20dbf91 100644 --- a/runner/internal/backend/lambda/backend.go +++ b/runner/internal/backend/lambda/backend.go @@ -3,6 +3,7 @@ package lambda import ( "context" "github.com/dstackai/dstack/runner/internal/backend/base" + "github.com/dstackai/dstack/runner/internal/container" "io" "io/ioutil" "os" @@ -140,12 +141,20 @@ func (l *LambdaBackend) GetRepoArchive(ctx context.Context, path, dir string) er return l.storageBackend.GetRepoArchive(ctx, path, dir) } +func (l *LambdaBackend) GetBuildDiffInfo(ctx context.Context, spec *container.BuildSpec) (*base.StorageObject, error) { + obj, err := l.storageBackend.GetBuildDiffInfo(ctx, spec) + if err != nil { + return nil, gerrors.Wrap(err) + } + return obj, nil +} + func (l *LambdaBackend) GetBuildDiff(ctx context.Context, key, dst string) error { return l.storageBackend.GetBuildDiff(ctx, key, dst) } -func (l *LambdaBackend) PutBuildDiff(ctx context.Context, src, key string) error { - return l.storageBackend.PutBuildDiff(ctx, src, key) +func (l *LambdaBackend) PutBuildDiff(ctx context.Context, src string, spec *container.BuildSpec) error { + return l.storageBackend.PutBuildDiff(ctx, src, spec) } func (l *LambdaBackend) GetTMPDir(ctx context.Context) string { diff --git a/runner/internal/backend/local/backend.go b/runner/internal/backend/local/backend.go index 07f5d1bb6..e60b49816 100644 --- a/runner/internal/backend/local/backend.go +++ b/runner/internal/backend/local/backend.go @@ -2,9 +2,9 @@ package local import ( "context" - "errors" "fmt" "github.com/dstackai/dstack/runner/internal/backend/base" + "github.com/dstackai/dstack/runner/internal/container" "io" "io/ioutil" "os" @@ -70,14 +70,7 @@ func New(namespace string) *Local { func (l *Local) Init(ctx context.Context, ID string) error { log.Trace(ctx, "Initialize backend with ID runner", "runner ID", ID) l.runnerID = ID - pathRunner := filepath.Join("runners", fmt.Sprintf("%s.yaml", ID)) - log.Trace(ctx, "Fetch local runner state", "path", pathRunner) - contents, err := l.storage.GetFile(pathRunner) - if err != nil { - return gerrors.Wrap(err) - } - err = yaml.Unmarshal(contents, &l.state) - if err != nil { + if err := base.LoadRunnerState(ctx, l.storage, ID, &l.state); err != nil { return gerrors.Wrap(err) } return nil @@ -89,20 +82,14 @@ func (l *Local) Job(ctx context.Context) *models.Job { } func (l *Local) RefetchJob(ctx context.Context) (*models.Job, error) { - log.Trace(ctx, "Refetching job from state", "ID", l.state.Job.JobID) - contents, err := l.storage.GetFile(l.state.Job.JobFilepath()) - if err != nil { - return nil, gerrors.Wrap(err) - } - err = yaml.Unmarshal(contents, &l.state.Job) - if err != nil { + if err := base.RefetchJob(ctx, l.storage, l.state.Job); err != nil { return nil, gerrors.Wrap(err) } return l.state.Job, nil } func (l *Local) MasterJob(ctx context.Context) *models.Job { - contents, err := l.storage.GetFile(filepath.Join("jobs", l.state.Job.RepoUserName, l.state.Job.RepoName, fmt.Sprintf("%s.yaml", l.state.Job.MasterJobID))) + contents, err := base.GetObject(ctx, l.storage, filepath.Join("jobs", l.state.Job.RepoUserName, l.state.Job.RepoName, fmt.Sprintf("%s.yaml", l.state.Job.MasterJobID))) if err != nil { return nil } @@ -120,32 +107,7 @@ func (l *Local) Requirements(ctx context.Context) models.Requirements { } func (l *Local) UpdateState(ctx context.Context) error { - log.Trace(ctx, "Start update state") - log.Trace(ctx, "Marshaling job") - contents, err := yaml.Marshal(&l.state.Job) - if err != nil { - return gerrors.Wrap(err) - } - jobPath := l.state.Job.JobFilepath() - log.Trace(ctx, "Write to file job", "Path", jobPath) - err = l.storage.PutFile(jobPath, contents) - if err != nil { - return gerrors.Wrap(err) - } - log.Trace(ctx, "Fetching list jobs", "Repo username", l.state.Job.RepoUserName, "Repo name", l.state.Job.RepoName, "Job ID", l.state.Job.JobID) - files, err := l.storage.ListFile(l.state.Job.JobHeadFilepathPrefix()) - if err != nil { - return gerrors.Wrap(err) - } - jobHeadFilepath := l.state.Job.JobHeadFilepath() - for _, file := range files[:1] { - log.Trace(ctx, "Renaming file job", "From", file, "To", jobHeadFilepath) - err = l.storage.RenameFile(file, jobHeadFilepath) - if err != nil { - return gerrors.Wrap(err) - } - } - return nil + return gerrors.Wrap(base.UpdateState(ctx, l.storage, l.state.Job)) } func (l *Local) CheckStop(ctx context.Context) (bool, error) { @@ -209,12 +171,8 @@ func (l *Local) CreateLogger(ctx context.Context, logGroup, logName string) io.W } func (l *Local) GetJobByPath(ctx context.Context, path string) (*models.Job, error) { - contents, err := l.storage.GetFile(path) - if err != nil { - return nil, gerrors.Wrap(err) - } job := new(models.Job) - if err = yaml.Unmarshal(contents, job); err != nil { + if err := base.GetJobByPath(ctx, l.storage, path, job); err != nil { return nil, gerrors.Wrap(err) } return job, nil @@ -256,11 +214,11 @@ func (l *Local) Bucket(ctx context.Context) string { func (l *Local) ListSubDir(ctx context.Context, dir string) ([]string, error) { log.Trace(ctx, "Fetching list sub dir") - return l.storage.ListFile(dir) + return base.ListObjects(ctx, l.storage, dir) } func (l *Local) GetRepoDiff(ctx context.Context, path string) (string, error) { - diff, err := l.storage.GetFile(path) + diff, err := base.GetObject(ctx, l.storage, path) if err != nil { return "", gerrors.Wrap(err) } @@ -275,12 +233,20 @@ func (l *Local) GetRepoArchive(ctx context.Context, path, dir string) error { return nil } +func (l *Local) GetBuildDiffInfo(ctx context.Context, spec *container.BuildSpec) (*base.StorageObject, error) { + obj, err := base.GetBuildDiffInfo(ctx, l.storage, spec) + if err != nil { + return nil, gerrors.Wrap(err) + } + return obj, nil +} + func (l *Local) GetBuildDiff(ctx context.Context, key, dst string) error { - return errors.New("not implemented") + return gerrors.New("not implemented") } -func (l *Local) PutBuildDiff(ctx context.Context, src, key string) error { - return errors.New("not implemented") +func (l *Local) PutBuildDiff(ctx context.Context, src string, spec *container.BuildSpec) error { + return gerrors.Wrap(base.PutBuildDiff(ctx, l.storage, src, spec)) } func (l *Local) GetTMPDir(ctx context.Context) string { diff --git a/runner/internal/backend/local/storage.go b/runner/internal/backend/local/storage.go index c366671cf..022fb6618 100644 --- a/runner/internal/backend/local/storage.go +++ b/runner/internal/backend/local/storage.go @@ -1,6 +1,11 @@ package local import ( + "context" + "errors" + "github.com/dstackai/dstack/runner/internal/backend/base" + "io" + "io/fs" "os" "path/filepath" "strings" @@ -16,74 +21,120 @@ func NewLocalStorage(path string) *LocalStorage { return &LocalStorage{basepath: path} } -func (lstorage *LocalStorage) GetFile(path string) ([]byte, error) { - fullpath := filepath.Join(lstorage.basepath, path) - contents, err := os.ReadFile(fullpath) +func (s *LocalStorage) Download(ctx context.Context, key string, writer io.Writer) error { + path := filepath.Join(s.basepath, key) + file, err := os.Open(path) if err != nil { - return nil, gerrors.Wrap(err) + return gerrors.Wrap(err) } - return contents, nil + defer func() { _ = file.Close() }() + _, err = io.Copy(writer, file) + return gerrors.Wrap(err) } -func (lstorage *LocalStorage) PutFile(path string, contents []byte) error { - tmpfile, err := os.CreateTemp(filepath.Join(lstorage.basepath, "tmp"), "job") +func (s *LocalStorage) Upload(ctx context.Context, reader io.Reader, key string) error { + tmpfile, err := os.CreateTemp(filepath.Join(s.basepath, "tmp"), "job") if err != nil { return gerrors.Wrap(err) } - _, err = tmpfile.Write(contents) + defer func() { _ = os.Remove(tmpfile.Name()) }() + _, err = io.Copy(tmpfile, reader) if err != nil { return gerrors.Wrap(err) } - err = os.Rename(tmpfile.Name(), filepath.Join(lstorage.basepath, path)) - if err != nil { + dstPath := filepath.Join(s.basepath, key) + if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { return gerrors.Wrap(err) } - return nil + return gerrors.Wrap(os.Rename(tmpfile.Name(), dstPath)) +} + +func (s *LocalStorage) Delete(ctx context.Context, key string) error { + return gerrors.Wrap(os.Remove(filepath.Join(s.basepath, key))) } -func (lstorage *LocalStorage) RenameFile(oldKey, newKey string) error { +func (s *LocalStorage) Rename(ctx context.Context, oldKey, newKey string) error { if oldKey == newKey { return nil } - tmpfile, err := os.CreateTemp(filepath.Join(lstorage.basepath, "tmp"), "jobhead") - if err != nil { - return gerrors.Wrap(err) - } - contents, err := lstorage.GetFile(oldKey) - if err != nil { - return gerrors.Wrap(err) - } - _, err = tmpfile.Write(contents) - if err != nil { - return gerrors.Wrap(err) - } - err = tmpfile.Close() - if err != nil { - return gerrors.Wrap(err) - } - err = os.Rename(tmpfile.Name(), filepath.Join(lstorage.basepath, newKey)) + + reader, err := os.Open(filepath.Join(s.basepath, oldKey)) if err != nil { return gerrors.Wrap(err) } - err = os.Remove(filepath.Join(lstorage.basepath, oldKey)) - if err != nil { + defer func() { _ = reader.Close() }() + + // Upload will create temp file on the same device and afterward replace newKey with oldKey copy + if err = s.Upload(ctx, reader, newKey); err != nil { return gerrors.Wrap(err) } - return nil + return gerrors.Wrap(os.Remove(filepath.Join(s.basepath, oldKey))) } -func (lstorage *LocalStorage) ListFile(prefix string) ([]string, error) { - dirpath := filepath.Dir(prefix) - filePrefix := filepath.Base(prefix) - entries, err := os.ReadDir(filepath.Join(lstorage.basepath, dirpath)) - if err != nil { - return nil, gerrors.Wrap(err) - } - fileNames := make([]string, 0) - for _, entry := range entries { - if strings.HasPrefix(entry.Name(), filePrefix) { - fileNames = append(fileNames, filepath.Join(dirpath, entry.Name())) +func (s *LocalStorage) CreateSymlink(ctx context.Context, key, symlink string) error { + return gerrors.Wrap(os.Symlink(symlink, key)) +} + +func (s *LocalStorage) GetMetadata(ctx context.Context, key, tag string) (string, error) { + return "", gerrors.New("not implemented") +} + +func (s *LocalStorage) List(ctx context.Context, prefix string) (<-chan *base.StorageObject, <-chan error) { + dirpath := filepath.Dir(filepath.Join(s.basepath, prefix)) + ch := make(chan *base.StorageObject) + errCh := make(chan error, 1) + go func() { + defer close(ch) + defer close(errCh) + if _, err := os.Stat(dirpath); errors.Is(err, os.ErrNotExist) { + return } - } - return fileNames, nil + if err := filepath.Walk(dirpath, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return gerrors.Wrap(err) + } + if dirpath == path { // not interested in parent directory itself + return nil + } + fullKey, err := filepath.Rel(s.basepath, path) + if err != nil { + return gerrors.Wrap(err) + } + if info.IsDir() { + fullKey += "/" + } + if !strings.HasPrefix(fullKey, prefix) { + if info.IsDir() { + return filepath.SkipDir + } + return nil + } + if info.IsDir() { + return nil + } + + symlink := "" + if info.Mode()&os.ModeSymlink == os.ModeSymlink { + symlink, err = os.Readlink(path) + if err != nil { + return gerrors.Wrap(err) + } + } + select { + case <-ctx.Done(): + return gerrors.New("context was canceled") + case ch <- &base.StorageObject{ + Key: strings.TrimPrefix(filepath.ToSlash(fullKey), prefix), + Size: info.Size(), + ModTime: info.ModTime(), + Symlink: symlink, + }: + } + return nil + }); err != nil { + errCh <- gerrors.Wrap(err) + return + } + }() + return ch, errCh } diff --git a/runner/internal/backend/local/storage_test.go b/runner/internal/backend/local/storage_test.go new file mode 100644 index 000000000..8ee84a9aa --- /dev/null +++ b/runner/internal/backend/local/storage_test.go @@ -0,0 +1,84 @@ +package local + +import ( + "context" + "github.com/dstackai/dstack/runner/internal/backend/base" + "github.com/stretchr/testify/assert" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestListEmptyRoot(t *testing.T) { + s, err := NewLocalTest(t.TempDir(), []string{}) + assert.Nil(t, err) + items, err := base.ListObjects(context.TODO(), s, "") + assert.Nil(t, err) + assert.ElementsMatch(t, items, []string{}) +} + +func TestListNotExist(t *testing.T) { + s, err := NewLocalTest(t.TempDir(), []string{}) + assert.Nil(t, err) + items, err := base.ListObjects(context.TODO(), s, "a/b/c") + assert.Nil(t, err) + assert.ElementsMatch(t, items, []string{}) +} + +func TestListRecursive(t *testing.T) { + s, err := NewLocalTest(t.TempDir(), []string{ + "a/b/c", + "a/d", + "b/c", + "a/x/", + }) + assert.Nil(t, err) + items, err := base.ListObjects(context.TODO(), s, "a") + assert.Nil(t, err) + assert.ElementsMatch(t, items, []string{"a/b/c", "a/d"}) +} + +func TestListPrefix(t *testing.T) { + s, err := NewLocalTest(t.TempDir(), []string{ + "a/1234", + "a/12qw", + "a/12/3x", + "a/1123", + "a/qwerty", + }) + assert.Nil(t, err) + items, err := base.ListObjects(context.TODO(), s, "a/12") + assert.Nil(t, err) + assert.ElementsMatch(t, items, []string{"a/1234", "a/12qw", "a/12/3x"}) +} + +func TestListPrefixWithSlash(t *testing.T) { + s, err := NewLocalTest(t.TempDir(), []string{ + "a/1234", + "a/12/qwe", + }) + assert.Nil(t, err) + items, err := base.ListObjects(context.TODO(), s, "a/12/") + assert.Nil(t, err) + assert.ElementsMatch(t, items, []string{"a/12/qwe"}) +} + +func NewLocalTest(root string, files []string) (*LocalStorage, error) { + for _, file := range files { + path := filepath.Join(root, file) + if strings.HasSuffix(file, "/") { // create dir + if err := os.MkdirAll(path, 0o755); err != nil { + return nil, err + } + } else { // create file + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, err + } + if err := os.WriteFile(path, []byte(file), 0o644); err != nil { + return nil, err + } + } + } + return &LocalStorage{basepath: root}, nil +} diff --git a/runner/internal/container/build.go b/runner/internal/container/build.go index f1007e505..c97def2c5 100644 --- a/runner/internal/container/build.go +++ b/runner/internal/container/build.go @@ -26,6 +26,8 @@ type BuildSpec struct { BaseImageName string RegistryAuthBase64 string RepoPath string + Platform string + RepoId string } func BuildImage(ctx context.Context, client docker.APIClient, spec *BuildSpec, imageName string, stoppedCh chan struct{}, logs io.Writer) error { diff --git a/runner/internal/container/engine.go b/runner/internal/container/engine.go index 9f535881b..52830d514 100644 --- a/runner/internal/container/engine.go +++ b/runner/internal/container/engine.go @@ -3,8 +3,9 @@ package container import ( "context" "fmt" + "github.com/dstackai/dstack/runner/internal/environment" + "github.com/dstackai/dstack/runner/internal/models" "io" - "io/ioutil" "os/exec" "runtime" "strings" @@ -285,7 +286,7 @@ func (r *Engine) pullImageIfAbsent(ctx context.Context, image string, registryAu return gerrors.Wrap(err) } defer func() { _ = reader.Close() }() - buf, err := ioutil.ReadAll(reader) + buf, err := io.ReadAll(reader) if err != nil { return gerrors.Wrap(err) } @@ -293,17 +294,45 @@ func (r *Engine) pullImageIfAbsent(ctx context.Context, image string, registryAu return nil } -func (r *Engine) GetBuildDigest(ctx context.Context, spec *BuildSpec) (string, error) { - err := r.pullImageIfAbsent(ctx, spec.BaseImageName, spec.RegistryAuthBase64) +func (r *Engine) NewBuildSpec(ctx context.Context, job *models.Job, spec *Spec, secrets map[string]string, repoPath string) (*BuildSpec, error) { + err := r.pullImageIfAbsent(ctx, spec.Image, spec.RegistryAuthBase64) if err != nil { - return "", gerrors.Wrap(err) + return nil, gerrors.Wrap(err) } - info, _, err := r.client.ImageInspectWithRaw(ctx, spec.BaseImageName) + baseImage, _, err := r.client.ImageInspectWithRaw(ctx, spec.Image) if err != nil { - return "", gerrors.Wrap(err) + return nil, gerrors.Wrap(err) } - spec.BaseImageID = info.ID - return spec.Hash(), nil + daemonInfo, err := r.client.Info(ctx) + if err != nil { + return nil, gerrors.Wrap(err) + } + + commands := append([]string{}, job.BuildCommands...) + commands = append(commands, job.OptionalBuildCommands...) + env := environment.New() + env.AddMapString(job.Environment) + env.AddMapString(secrets) + + buildSpec := &BuildSpec{ + BaseImageName: spec.Image, + BaseImageID: baseImage.ID, + WorkDir: spec.WorkDir, + ConfigurationPath: job.ConfigurationPath, + ConfigurationType: job.ConfigurationType, + Commands: ShellCommands(commands), + Entrypoint: spec.Entrypoint, + Env: env.ToSlice(), + RegistryAuthBase64: spec.RegistryAuthBase64, + RepoPath: repoPath, + RepoId: job.RepoId, + } + if daemonInfo.Architecture == "aarch64" { + buildSpec.Platform = "arm64" + } else { + buildSpec.Platform = "amd64" + } + return buildSpec, nil } func (r *Engine) Build(ctx context.Context, spec *BuildSpec, imageName string, stoppedCh chan struct{}, logs io.Writer) error { diff --git a/runner/internal/executor/executor.go b/runner/internal/executor/executor.go index 171d3556c..b59795726 100644 --- a/runner/internal/executor/executor.go +++ b/runner/internal/executor/executor.go @@ -710,105 +710,85 @@ func (ex *Executor) Shutdown(ctx context.Context) { func (ex *Executor) build(ctx context.Context, spec *container.Spec, stoppedCh chan struct{}, logs io.Writer) error { job := ex.backend.Job(ctx) - _, isLocalBackend := ex.backend.(*localbackend.Local) - commands := append([]string{}, job.BuildCommands...) - commands = append(commands, job.OptionalBuildCommands...) - - buildSpec := &container.BuildSpec{ - BaseImageName: spec.Image, - WorkDir: spec.WorkDir, - ConfigurationPath: job.ConfigurationPath, - ConfigurationType: job.ConfigurationType, - Commands: container.ShellCommands(commands), - Entrypoint: spec.Entrypoint, - Env: ex.environment(ctx, false), - RegistryAuthBase64: spec.RegistryAuthBase64, - RepoPath: path.Join(ex.backend.GetTMPDir(ctx), consts.RUNS_DIR, job.RunName, job.JobID), - } - buildName, err := ex.engine.GetBuildDigest(ctx, buildSpec) + if len(job.BuildCommands) == 0 && len(job.OptionalBuildCommands) == 0 { + return nil + } + secrets, err := ex.backend.Secrets(ctx) if err != nil { return gerrors.Wrap(err) } - - tempDir, err := os.MkdirTemp("", "build") + buildSpec, err := ex.engine.NewBuildSpec(ctx, job, spec, secrets, path.Join(ex.backend.GetTMPDir(ctx), consts.RUNS_DIR, job.RunName, job.JobID)) if err != nil { return gerrors.Wrap(err) } - defer func() { _ = os.RemoveAll(tempDir) }() - diffPath := filepath.Join(tempDir, "layer.tar") - key := fmt.Sprintf("builds/%s/%s.tar", job.RepoId, buildName) - imageName := fmt.Sprintf("dstackai/build:%s", buildName) + imageName := fmt.Sprintf("dstackai/build:%s", buildSpec.Hash()) + _, isLocalBackend := ex.backend.(*localbackend.Local) + diffPath, err := os.CreateTemp("", "layer*.tar") + if err != nil { + return gerrors.Wrap(err) + } + defer func() { _ = os.Remove(diffPath.Name()) }() if job.BuildPolicy == models.UseBuild || job.BuildPolicy == models.Build { - log.Trace(ctx, "Trying to fetch build image diff", "key", key, "image", imageName) - if _, err := fmt.Fprintf(ex.streamLogs, "Looking for the image...\n"); err != nil { - return gerrors.Wrap(err) - } - if isLocalBackend { - exists, err := ex.engine.ImageExists(ctx, imageName) - if err != nil { - return gerrors.Wrap(err) - } - if exists { - if _, err := fmt.Fprintf(ex.streamLogs, "Using the image from the cache\n\n"); err != nil { - return gerrors.Wrap(err) - } - spec.Image = imageName - return nil - } - } else { - if err := ex.backend.GetBuildDiff(ctx, key, diffPath); err != nil { - return gerrors.Wrap(err) - } - if stat, err := os.Stat(diffPath); err == nil { - if _, err = fmt.Fprintf(ex.streamLogs, "Loading the image (%s)...\n", humanize.Bytes(uint64(stat.Size()))); err != nil { + buildInfo, err := ex.backend.GetBuildDiffInfo(ctx, buildSpec) + if err == nil { + if isLocalBackend { + if exists, err := ex.engine.ImageExists(ctx, imageName); err != nil { return gerrors.Wrap(err) + } else if exists { + _, _ = fmt.Fprintf(ex.streamLogs, "The image is loaded\n") + spec.Image = imageName + return nil } - if err := ex.engine.ImportImageDiff(ctx, diffPath); err != nil { + err = gerrors.Wrap(base.ErrBuildNotFound) + } else { + _, _ = fmt.Fprintf(ex.streamLogs, "Downloading the image diff (%s)...\n", humanize.Bytes(uint64(buildInfo.Size))) + if err := ex.backend.GetBuildDiff(ctx, buildInfo.Key, diffPath.Name()); err != nil { return gerrors.Wrap(err) } - if _, err = fmt.Fprintf(ex.streamLogs, "The image is loaded\n\n"); err != nil { + _, _ = fmt.Fprintf(ex.streamLogs, "Loading the image diff...\n") + if err := ex.engine.ImportImageDiff(ctx, diffPath.Name()); err != nil { return gerrors.Wrap(err) } + _, _ = fmt.Fprintf(ex.streamLogs, "The image is loaded\n") spec.Image = imageName return nil } - } - if _, err = fmt.Fprintf(ex.streamLogs, "No image is found\n\n"); err != nil { + } else if !errors.Is(err, base.ErrBuildNotFound) { return gerrors.Wrap(err) } - if job.BuildPolicy == models.UseBuild && len(job.BuildCommands) > 0 { - job.ErrorCode = errorcodes.BuildNotFound - _ = ex.backend.UpdateState(ctx) - return gerrors.New("no build image is found") + // handle ErrBuildNotFound + if len(job.BuildCommands) > 0 { // if build is not optional + _, _ = fmt.Fprintf(ex.streamLogs, "No image is found\n") + if job.BuildPolicy == models.UseBuild { + job.ErrorCode = errorcodes.BuildNotFound + _ = ex.backend.UpdateState(ctx) + return gerrors.Wrap(err) + } } } if job.BuildPolicy == models.Build || job.BuildPolicy == models.ForceBuild || job.BuildPolicy == models.BuildOnly { - err := ex.engine.Build(ctx, buildSpec, imageName, stoppedCh, logs) - if err != nil { + if err := ex.engine.Build(ctx, buildSpec, imageName, stoppedCh, logs); err != nil { return gerrors.Wrap(err) } - // local backend: store image in daemon cache + // local backend: store image in daemon cache, put empty diff as head file if !isLocalBackend { - if _, err := fmt.Fprintf(ex.streamLogs, "Saving the image...\n"); err != nil { - return gerrors.Wrap(err) - } - if err := ex.engine.ExportImageDiff(ctx, imageName, diffPath); err != nil { + _, _ = fmt.Fprintf(ex.streamLogs, "Saving the image diff...\n") + if err := ex.engine.ExportImageDiff(ctx, imageName, diffPath.Name()); err != nil { return gerrors.Wrap(err) } - stat, err := os.Stat(diffPath) + diffStat, err := os.Stat(diffPath.Name()) if err != nil { return gerrors.Wrap(err) } - log.Trace(ctx, "Putting build image diff", "key", key, "image", imageName, "size", stat.Size()) - if _, err = fmt.Fprintf(ex.streamLogs, "Uploading the image (%s)...\n", humanize.Bytes(uint64(stat.Size()))); err != nil { - return gerrors.Wrap(err) - } - if err = ex.backend.PutBuildDiff(ctx, diffPath, key); err != nil { - return gerrors.Wrap(err) - } + log.Trace(ctx, "Putting build image diff", "image", imageName, "size", diffStat.Size()) + _, _ = fmt.Fprintf(ex.streamLogs, "Uploading the image diff (%s)...\n", humanize.Bytes(uint64(diffStat.Size()))) + } + if err := ex.backend.PutBuildDiff(ctx, diffPath.Name(), buildSpec); err != nil { + return gerrors.Wrap(err) } + _, _ = fmt.Fprintf(ex.streamLogs, "The image diff is saved\n") spec.Image = imageName }