Skip to content

Commit

Permalink
feature: support push image in daemon side
Browse files Browse the repository at this point in the history
implement push command in daemon side, since containerd v1.0.3
version push get some problem when push image with index.json which
have multiple platforms. But it works good with only one platform,
this feature is our need.

fixes: AliyunContainerService#2099

Signed-off-by: Ace-Tang <aceapril@126.com>
  • Loading branch information
Ace-Tang authored and allencloud committed Jan 17, 2019
1 parent 6ca29af commit ff8b8fb
Show file tree
Hide file tree
Showing 14 changed files with 364 additions and 13 deletions.
23 changes: 23 additions & 0 deletions apis/server/image_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,26 @@ func (s *Server) getImageHistory(ctx context.Context, rw http.ResponseWriter, re

return EncodeResponse(rw, http.StatusOK, history)
}

// pushImage will push an image to a specified registry.
func (s *Server) pushImage(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
name := mux.Vars(req)["name"]
tag := req.FormValue("tag")

// get registry auth from Request header
authStr := req.Header.Get("X-Registry-Auth")
authConfig := types.AuthConfig{}
if authStr != "" {
data := base64.NewDecoder(base64.URLEncoding, strings.NewReader(authStr))
if err := json.NewDecoder(data).Decode(&authConfig); err != nil {
return err
}
}

if err := s.ImageMgr.PushImage(ctx, name, tag, &authConfig, newWriteFlusher(rw)); err != nil {
logrus.Errorf("failed to push image %s with tag %s: %v", name, tag, err)
return err
}

return nil
}
1 change: 1 addition & 0 deletions apis/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func initRoute(s *Server) *mux.Router {
{Method: http.MethodPost, Path: "/images/load", HandlerFunc: withCancelHandler(s.loadImage)},
{Method: http.MethodGet, Path: "/images/save", HandlerFunc: withCancelHandler(s.saveImage)},
{Method: http.MethodGet, Path: "/images/{name:.*}/history", HandlerFunc: s.getImageHistory},
{Method: http.MethodPost, Path: "/images/{name:.*}/push", HandlerFunc: s.pushImage},

// volume
{Method: http.MethodGet, Path: "/volumes", HandlerFunc: s.listVolume},
Expand Down
27 changes: 27 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,33 @@ paths:
500:
$ref: "#/responses/500ErrorResponse"

/images/{imageid}/push:
post:
summary: "push an image"
description: "push an image on the registry"
operationId: "ImagePush"
consumes:
- "application/octet-stream"
produces:
- "application/json"
parameters:
- $ref: "#/parameters/imageid"
- name: "tag"
in: "query"
description: "the tag to associate with the image on the registry. This is optional."
type: "string"
- name: "X-Registry-Auth"
in: "header"
description: "A base64-encoded auth configuration. [See the authentication section for details.](#section/Authentication)"
type: "string"
responses:
200:
description: "no error"
404:
$ref: "#/responses/404ErrorResponse"
500:
$ref: "#/responses/500ErrorResponse"

/containers/create:
post:
summary: "Create a container"
Expand Down
2 changes: 1 addition & 1 deletion cli/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func displayImageReferenceProgress(output io.Writer, isTerminal bool, msgs []jso
current += msg.Detail.Current
}

status := jsonstream.PullReferenceStatus(!isTerminal, msg)
status := jsonstream.ProcessStatus(!isTerminal, msg)
if _, err := fmt.Fprint(tw, status); err != nil {
return err
}
Expand Down
42 changes: 42 additions & 0 deletions client/image_push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package client

import (
"context"
"errors"
"io"
"net/url"

"github.com/alibaba/pouch/pkg/reference"
)

// ImagePush requests daemon to push an image to registry.
func (client *APIClient) ImagePush(ctx context.Context, ref, encodedAuth string) (io.ReadCloser, error) {
namedRef, err := reference.Parse(ref)
if err != nil {
return nil, err
}

if reference.IsCanonicalDigested(namedRef) {
return nil, errors.New("cannot push a digest reference format image")
}

tag := ""
if v, ok := namedRef.(reference.Tagged); ok {
tag = v.Tag()
}

q := url.Values{}
if tag != "" {
q.Set("tag", tag)
}

headers := map[string][]string{}
if encodedAuth != "" {
headers["X-Registry-Auth"] = []string{encodedAuth}
}
resp, err := client.post(ctx, "/images/"+namedRef.Name()+"/push", q, nil, headers)
if err != nil {
return nil, err
}
return resp.Body, nil
}
51 changes: 51 additions & 0 deletions client/image_push_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package client

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
)

func TestImagePushServerError(t *testing.T) {
client := &APIClient{
HTTPCli: newMockClient(errorMockResponse(http.StatusInternalServerError, "Server error")),
}
_, err := client.ImagePush(context.Background(), "image", "auth")
if err == nil || !strings.Contains(err.Error(), "Server error") {
t.Fatalf("expected a Server Error, got %v", err)
}
}

func TestImagePush(t *testing.T) {
name := "test_image"
expectedURL := "/images/" + name + "/push"

httpClient := newMockClient(func(req *http.Request) (*http.Response, error) {
if !strings.HasPrefix(req.URL.Path, expectedURL) {
return nil, fmt.Errorf("expected URL '%s', got '%s'", expectedURL, req.URL)
}

if req.Method != "POST" {
return nil, fmt.Errorf("expected POST method, got %s", req.Method)
}

return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte(""))),
}, nil
})

client := &APIClient{
HTTPCli: httpClient,
}

_, err := client.ImagePush(context.Background(), name, "auth")
if err != nil {
t.Fatal(err)
}

}
1 change: 1 addition & 0 deletions client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ImageAPIClient interface {
ImageLoad(ctx context.Context, name string, r io.Reader) error
ImageSave(ctx context.Context, imageName string) (io.ReadCloser, error)
ImageHistory(ctx context.Context, name string) ([]types.HistoryResultItem, error)
ImagePush(ctx context.Context, ref, encodedAuth string) (io.ReadCloser, error)
}

// VolumeAPIClient defines methods of Volume client.
Expand Down
59 changes: 58 additions & 1 deletion ctrd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/containerd/containerd/errdefs"
ctrdmetaimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -175,14 +176,70 @@ func (c *Client) importImage(ctx context.Context, importer ctrdmetaimages.Import
return imgs, nil
}

// PushImage pushes image to registry
func (c *Client) PushImage(ctx context.Context, ref string, authConfig *types.AuthConfig, out io.Writer) error {
wrapperCli, err := c.Get(ctx)
if err != nil {
return fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

img, err := wrapperCli.client.GetImage(ctx, ref)
if err != nil {
return convertCtrdErr(err)
}

pushTracker := docker.NewInMemoryTracker()

resolver, err := resolver(authConfig, docker.ResolverOptions{
Tracker: pushTracker,
})
if err != nil {
return err
}

ongoing := jsonstream.NewPushJobs(pushTracker)
handler := ctrdmetaimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.Add(remotes.MakeRefKey(ctx, desc))
return nil, nil
})

// fetch progress status, then send to client via out channel.
stream := jsonstream.New(out, nil)
pctx, cancelProgress := context.WithCancel(ctx)
wait := make(chan struct{})
go func() {
jsonstream.PushProcess(pctx, ongoing, stream)
close(wait)
}()

err = wrapperCli.client.Push(ctx, ref, img.Target(),
containerd.WithResolver(resolver),
containerd.WithImageHandler(handler))

cancelProgress()
<-wait

defer func() {
stream.Close()
stream.Wait()
}()
if err != nil {
return err
}

logrus.Infof("push image %s successfully", ref)

return nil
}

// FetchImage fetches image content from the remote repository.
func (c *Client) FetchImage(ctx context.Context, ref string, authConfig *types.AuthConfig, stream *jsonstream.JSONStream) (containerd.Image, error) {
wrapperCli, err := c.Get(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

resolver, err := resolver(authConfig)
resolver, err := resolver(authConfig, docker.ResolverOptions{})
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type ImageAPIClient interface {
SaveImage(ctx context.Context, exporter ctrdmetaimages.Exporter, ref string) (io.ReadCloser, error)
// Commit commits an image from a container.
Commit(ctx context.Context, config *CommitConfig) (digest.Digest, error)
// PushImage pushes a image to registry
PushImage(ctx context.Context, ref string, authConfig *types.AuthConfig, out io.Writer) error
}

// SnapshotAPIClient provides access to containerd snapshot features
Expand Down
15 changes: 7 additions & 8 deletions ctrd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ import (
"github.com/pkg/errors"
)

func resolver(authConfig *types.AuthConfig) (remotes.Resolver, error) {
func resolver(authConfig *types.AuthConfig, resolverOpt docker.ResolverOptions) (remotes.Resolver, error) {
var (
// TODO
username = ""
secret = ""
plainHTTP = false
refresh = ""
insecure = false
username = ""
secret = ""
refresh = ""
insecure = false
)

if authConfig != nil {
Expand All @@ -35,8 +34,8 @@ func resolver(authConfig *types.AuthConfig) (remotes.Resolver, error) {
_ = refresh

options := docker.ResolverOptions{
PlainHTTP: plainHTTP,
Tracker: docker.NewInMemoryTracker(),
PlainHTTP: resolverOpt.PlainHTTP,
Tracker: resolverOpt.Tracker,
}
options.Credentials = func(host string) (string, string, error) {
// Only one host
Expand Down
19 changes: 19 additions & 0 deletions daemon/mgr/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type ImageMgr interface {
// PullImage pulls images from specified registry.
PullImage(ctx context.Context, ref string, authConfig *types.AuthConfig, out io.Writer) error

// PushImage pushes image to specified registry.
PushImage(ctx context.Context, name, tag string, authConfig *types.AuthConfig, out io.Writer) error

// GetImage returns imageInfo by reference or id.
GetImage(ctx context.Context, idOrRef string) (*types.ImageInfo, error)

Expand Down Expand Up @@ -194,6 +197,22 @@ func (mgr *ImageManager) PullImage(ctx context.Context, ref string, authConfig *
return mgr.StoreImageReference(ctx, img)
}

// PushImage pushes image to specified registry.
func (mgr *ImageManager) PushImage(ctx context.Context, name, tag string, authConfig *types.AuthConfig, out io.Writer) error {
ref, err := reference.Parse(name)
if err != nil {
return err
}

if tag == "" {
ref = reference.WithDefaultTagIfMissing(ref)
} else {
ref = reference.WithTag(ref, tag)
}

return mgr.client.PushImage(ctx, ref.String(), authConfig, out)
}

// GetImage returns imageInfo by reference.
func (mgr *ImageManager) GetImage(ctx context.Context, idOrRef string) (*types.ImageInfo, error) {
id, _, _, err := mgr.CheckReference(ctx, idOrRef)
Expand Down
9 changes: 6 additions & 3 deletions pkg/jsonstream/image_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ const (
PullStatusExists = "exists"
// PullStatusDone represents done status.
PullStatusDone = "done"

// PushStatusUploading represents uploading status.
PushStatusUploading = "uploading"
)

// PullReferenceStatus returns the status of pulling the image reference.
// ProcessStatus returns the status of download or upload image
//
// NOTE: if the stdout is not terminal, it should only show the reference and
// status without progress bar.
func PullReferenceStatus(short bool, msg JSONMessage) string {
func ProcessStatus(short bool, msg JSONMessage) string {
if short || msg.Detail == nil {
return fmt.Sprintf("%s:\t%s\n", msg.ID, msg.Status)
}

switch msg.Status {
case PullStatusResolving, PullStatusWaiting:
return fmt.Sprintf("%s:\t%s\t%40r\t\n", msg.ID, msg.Status, progress.Bar(0.0))
case PullStatusDownloading:
case PullStatusDownloading, PushStatusUploading:
bar := progress.Bar(0)
current, total := progress.Bytes(msg.Detail.Current), progress.Bytes(msg.Detail.Total)

Expand Down
Loading

0 comments on commit ff8b8fb

Please sign in to comment.