Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move client implementations for content store and snapshotter #2341

Merged
merged 4 commits into from May 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions client.go
Expand Up @@ -38,6 +38,7 @@ import (
versionservice "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
contentproxy "github.com/containerd/containerd/content/proxy"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/dialer"
"github.com/containerd/containerd/errdefs"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/containerd/containerd/snapshots"
snproxy "github.com/containerd/containerd/snapshots/proxy"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -459,15 +461,15 @@ func (c *Client) ContentStore() content.Store {
if c.contentStore != nil {
return c.contentStore
}
return NewContentStoreFromClient(contentapi.NewContentClient(c.conn))
return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn))
}

// SnapshotService returns the underlying snapshotter for the provided snapshotter name
func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter {
if c.snapshotters != nil {
return c.snapshotters[snapshotterName]
}
return NewSnapshotterFromClient(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
return snproxy.NewSnapshotter(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
}

// TaskService returns the underlying TasksClient
Expand Down
2 changes: 1 addition & 1 deletion content_reader.go → content/proxy/content_reader.go
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package containerd
package proxy

import (
"context"
Expand Down
53 changes: 27 additions & 26 deletions content_store.go → content/proxy/content_store.go
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package containerd
package proxy

import (
"context"
Expand All @@ -27,19 +27,20 @@ import (
digest "github.com/opencontainers/go-digest"
)

type remoteContent struct {
type proxyContentStore struct {
client contentapi.ContentClient
}

// NewContentStoreFromClient returns a new content store
func NewContentStoreFromClient(client contentapi.ContentClient) content.Store {
return &remoteContent{
// NewContentStore returns a new content store which communicates over a GRPC
// connection using the containerd content GRPC API.
func NewContentStore(client contentapi.ContentClient) content.Store {
return &proxyContentStore{
client: client,
}
}

func (rs *remoteContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
resp, err := rs.client.Info(ctx, &contentapi.InfoRequest{
func (pcs *proxyContentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
resp, err := pcs.client.Info(ctx, &contentapi.InfoRequest{
Digest: dgst,
})
if err != nil {
Expand All @@ -49,8 +50,8 @@ func (rs *remoteContent) Info(ctx context.Context, dgst digest.Digest) (content.
return infoFromGRPC(resp.Info), nil
}

func (rs *remoteContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
session, err := rs.client.List(ctx, &contentapi.ListContentRequest{
func (pcs *proxyContentStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
session, err := pcs.client.List(ctx, &contentapi.ListContentRequest{
Filters: filters,
})
if err != nil {
Expand All @@ -77,8 +78,8 @@ func (rs *remoteContent) Walk(ctx context.Context, fn content.WalkFunc, filters
return nil
}

func (rs *remoteContent) Delete(ctx context.Context, dgst digest.Digest) error {
if _, err := rs.client.Delete(ctx, &contentapi.DeleteContentRequest{
func (pcs *proxyContentStore) Delete(ctx context.Context, dgst digest.Digest) error {
if _, err := pcs.client.Delete(ctx, &contentapi.DeleteContentRequest{
Digest: dgst,
}); err != nil {
return errdefs.FromGRPC(err)
Expand All @@ -87,8 +88,8 @@ func (rs *remoteContent) Delete(ctx context.Context, dgst digest.Digest) error {
return nil
}

func (rs *remoteContent) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
i, err := rs.Info(ctx, dgst)
func (pcs *proxyContentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
i, err := pcs.Info(ctx, dgst)
if err != nil {
return nil, err
}
Expand All @@ -97,12 +98,12 @@ func (rs *remoteContent) ReaderAt(ctx context.Context, dgst digest.Digest) (cont
ctx: ctx,
digest: dgst,
size: i.Size,
client: rs.client,
client: pcs.client,
}, nil
}

func (rs *remoteContent) Status(ctx context.Context, ref string) (content.Status, error) {
resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{
func (pcs *proxyContentStore) Status(ctx context.Context, ref string) (content.Status, error) {
resp, err := pcs.client.Status(ctx, &contentapi.StatusRequest{
Ref: ref,
})
if err != nil {
Expand All @@ -120,8 +121,8 @@ func (rs *remoteContent) Status(ctx context.Context, ref string) (content.Status
}, nil
}

func (rs *remoteContent) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
resp, err := rs.client.Update(ctx, &contentapi.UpdateRequest{
func (pcs *proxyContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
resp, err := pcs.client.Update(ctx, &contentapi.UpdateRequest{
Info: infoToGRPC(info),
UpdateMask: &protobuftypes.FieldMask{
Paths: fieldpaths,
Expand All @@ -133,8 +134,8 @@ func (rs *remoteContent) Update(ctx context.Context, info content.Info, fieldpat
return infoFromGRPC(resp.Info), nil
}

func (rs *remoteContent) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{
func (pcs *proxyContentStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
resp, err := pcs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{
Filters: filters,
})
if err != nil {
Expand All @@ -156,8 +157,8 @@ func (rs *remoteContent) ListStatuses(ctx context.Context, filters ...string) ([
return statuses, nil
}

func (rs *remoteContent) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
wrclient, offset, err := rs.negotiate(ctx, ref, size, expected)
func (pcs *proxyContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
wrclient, offset, err := pcs.negotiate(ctx, ref, size, expected)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
Expand All @@ -170,8 +171,8 @@ func (rs *remoteContent) Writer(ctx context.Context, ref string, size int64, exp
}

// Abort implements asynchronous abort. It starts a new write session on the ref l
func (rs *remoteContent) Abort(ctx context.Context, ref string) error {
if _, err := rs.client.Abort(ctx, &contentapi.AbortRequest{
func (pcs *proxyContentStore) Abort(ctx context.Context, ref string) error {
if _, err := pcs.client.Abort(ctx, &contentapi.AbortRequest{
Ref: ref,
}); err != nil {
return errdefs.FromGRPC(err)
Expand All @@ -180,8 +181,8 @@ func (rs *remoteContent) Abort(ctx context.Context, ref string) error {
return nil
}

func (rs *remoteContent) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
wrclient, err := rs.client.Write(ctx)
func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
wrclient, err := pcs.client.Write(ctx)
if err != nil {
return nil, 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion content_writer.go → content/proxy/content_writer.go
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package containerd
package proxy

import (
"context"
Expand Down
68 changes: 34 additions & 34 deletions snapshot.go → snapshots/proxy/proxy.go
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package containerd
package proxy

import (
"context"
Expand All @@ -28,24 +28,24 @@ import (
protobuftypes "github.com/gogo/protobuf/types"
)

// NewSnapshotterFromClient returns a new Snapshotter which communicates
// over a GRPC connection.
func NewSnapshotterFromClient(client snapshotsapi.SnapshotsClient, snapshotterName string) snapshots.Snapshotter {
return &remoteSnapshotter{
// NewSnapshotter returns a new Snapshotter which communicates over a GRPC
// connection using the containerd snapshot GRPC API.
func NewSnapshotter(client snapshotsapi.SnapshotsClient, snapshotterName string) snapshots.Snapshotter {
return &proxySnapshotter{
client: client,
snapshotterName: snapshotterName,
}
}

type remoteSnapshotter struct {
type proxySnapshotter struct {
client snapshotsapi.SnapshotsClient
snapshotterName string
}

func (r *remoteSnapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
resp, err := r.client.Stat(ctx,
func (p *proxySnapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
resp, err := p.client.Stat(ctx,
&snapshotsapi.StatSnapshotRequest{
Snapshotter: r.snapshotterName,
Snapshotter: p.snapshotterName,
Key: key,
})
if err != nil {
Expand All @@ -54,10 +54,10 @@ func (r *remoteSnapshotter) Stat(ctx context.Context, key string) (snapshots.Inf
return toInfo(resp.Info), nil
}

func (r *remoteSnapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
resp, err := r.client.Update(ctx,
func (p *proxySnapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
resp, err := p.client.Update(ctx,
&snapshotsapi.UpdateSnapshotRequest{
Snapshotter: r.snapshotterName,
Snapshotter: p.snapshotterName,
Info: fromInfo(info),
UpdateMask: &protobuftypes.FieldMask{
Paths: fieldpaths,
Expand All @@ -69,9 +69,9 @@ func (r *remoteSnapshotter) Update(ctx context.Context, info snapshots.Info, fie
return toInfo(resp.Info), nil
}

func (r *remoteSnapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
resp, err := r.client.Usage(ctx, &snapshotsapi.UsageRequest{
Snapshotter: r.snapshotterName,
func (p *proxySnapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
resp, err := p.client.Usage(ctx, &snapshotsapi.UsageRequest{
Snapshotter: p.snapshotterName,
Key: key,
})
if err != nil {
Expand All @@ -80,9 +80,9 @@ func (r *remoteSnapshotter) Usage(ctx context.Context, key string) (snapshots.Us
return toUsage(resp), nil
}

func (r *remoteSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
resp, err := r.client.Mounts(ctx, &snapshotsapi.MountsRequest{
Snapshotter: r.snapshotterName,
func (p *proxySnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
resp, err := p.client.Mounts(ctx, &snapshotsapi.MountsRequest{
Snapshotter: p.snapshotterName,
Key: key,
})
if err != nil {
Expand All @@ -91,15 +91,15 @@ func (r *remoteSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mou
return toMounts(resp.Mounts), nil
}

func (r *remoteSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
func (p *proxySnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
var local snapshots.Info
for _, opt := range opts {
if err := opt(&local); err != nil {
return nil, err
}
}
resp, err := r.client.Prepare(ctx, &snapshotsapi.PrepareSnapshotRequest{
Snapshotter: r.snapshotterName,
resp, err := p.client.Prepare(ctx, &snapshotsapi.PrepareSnapshotRequest{
Snapshotter: p.snapshotterName,
Key: key,
Parent: parent,
Labels: local.Labels,
Expand All @@ -110,15 +110,15 @@ func (r *remoteSnapshotter) Prepare(ctx context.Context, key, parent string, opt
return toMounts(resp.Mounts), nil
}

func (r *remoteSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
func (p *proxySnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
var local snapshots.Info
for _, opt := range opts {
if err := opt(&local); err != nil {
return nil, err
}
}
resp, err := r.client.View(ctx, &snapshotsapi.ViewSnapshotRequest{
Snapshotter: r.snapshotterName,
resp, err := p.client.View(ctx, &snapshotsapi.ViewSnapshotRequest{
Snapshotter: p.snapshotterName,
Key: key,
Parent: parent,
Labels: local.Labels,
Expand All @@ -129,33 +129,33 @@ func (r *remoteSnapshotter) View(ctx context.Context, key, parent string, opts .
return toMounts(resp.Mounts), nil
}

func (r *remoteSnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
func (p *proxySnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
var local snapshots.Info
for _, opt := range opts {
if err := opt(&local); err != nil {
return err
}
}
_, err := r.client.Commit(ctx, &snapshotsapi.CommitSnapshotRequest{
Snapshotter: r.snapshotterName,
_, err := p.client.Commit(ctx, &snapshotsapi.CommitSnapshotRequest{
Snapshotter: p.snapshotterName,
Name: name,
Key: key,
Labels: local.Labels,
})
return errdefs.FromGRPC(err)
}

func (r *remoteSnapshotter) Remove(ctx context.Context, key string) error {
_, err := r.client.Remove(ctx, &snapshotsapi.RemoveSnapshotRequest{
Snapshotter: r.snapshotterName,
func (p *proxySnapshotter) Remove(ctx context.Context, key string) error {
_, err := p.client.Remove(ctx, &snapshotsapi.RemoveSnapshotRequest{
Snapshotter: p.snapshotterName,
Key: key,
})
return errdefs.FromGRPC(err)
}

func (r *remoteSnapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
sc, err := r.client.List(ctx, &snapshotsapi.ListSnapshotsRequest{
Snapshotter: r.snapshotterName,
func (p *proxySnapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
sc, err := p.client.List(ctx, &snapshotsapi.ListSnapshotsRequest{
Snapshotter: p.snapshotterName,
})
if err != nil {
return errdefs.FromGRPC(err)
Expand All @@ -179,7 +179,7 @@ func (r *remoteSnapshotter) Walk(ctx context.Context, fn func(context.Context, s
}
}

func (r *remoteSnapshotter) Close() error {
func (p *proxySnapshotter) Close() error {
return nil
}

Expand Down