
Transfer service #7320

Merged: 34 commits, Dec 1, 2022

Conversation

@dmcgowan (Member) commented Aug 25, 2022

Adds transfer service and associated streaming service

See #7592

The initial version is ready for review. For requested changes which do not change the API, follow-ups will be tracked in the transfer service issue.

@k8s-ci-robot

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@dmcgowan force-pushed the transfer-service branch 3 times, most recently from 4abab3b to fca4bd4, on August 25, 2022 05:59
@dmcgowan dmcgowan added this to the 1.7 milestone Aug 25, 2022
@akhilerm (Member) left a comment

Have a few questions/comments on the new services.

rpc Stream(stream google.protobuf.Any) returns (stream google.protobuf.Any);
}

message StreamInit {
Member

What is the use of the StreamInit message? I could find only one usage in the code below. A comment here explaining it would be good.

Newbie question: if we use a message for stream init, shouldn't we need a stream terminate as well? Or is it somehow handled internally?

Member

I believe this is a message sent on init to communicate the ID.


// Content filters

repeated string platforms = 3; // Does this need a separate type? MatchComparer
Member

Can't we have the platforms as a separate message type of OS/Arch/Variant? Since that is how we denote platforms everywhere else in containerd, I think that would be more suitable here.
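
For reference, a minimal Go sketch of the structured platform representation this refers to, using containerd's existing platforms package to parse a specifier string into its OS/Architecture/Variant parts:

package main

import (
	"fmt"

	"github.com/containerd/containerd/platforms"
)

func main() {
	// "linux/arm/v7" parses into OS=linux, Architecture=arm, Variant=v7,
	// the same structured form used elsewhere in containerd.
	p, err := platforms.Parse("linux/arm/v7")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.OS, p.Architecture, p.Variant)
}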


// Unpack Configuration

repeated string unpack_platforms = 6;
Member

What's the difference between platforms and unpack_platforms?

Member Author (dmcgowan)

I should probably just change unpack_platforms into a separate type. What it is really indicating is different unpack configurations, which may be done in parallel. Normally this would be keyed on platform, and the snapshotter would be inferred from the platform. The client may want control over that, to target both the platform and the snapshotter.
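
A rough Go sketch of what such a separate type might look like; the names here are illustrative, not the final API, assuming an unpack configuration that pairs a platform with an explicit snapshotter:

package transfer // illustrative placement, not the actual package layout

import ocispec "github.com/opencontainers/image-spec/specs-go/v1"

// UnpackConfiguration describes one unpack operation; several of these
// could be processed in parallel for a single transfer.
type UnpackConfiguration struct {
	// Platform the content is unpacked for.
	Platform ocispec.Platform

	// Snapshotter to unpack into; if empty, it could be inferred
	// from the platform and the daemon defaults.
	Snapshotter string
}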

@@ -0,0 +1,29 @@
/*
Member

The file name needs to be changed, as we have both import and export messages being defined here.


option go_package = "github.com/containerd/containerd/api/types/transfer";

message Data {
Member

Are these the messages that will be used by the api/streaming service?

Member Author (dmcgowan)

I need to document this a little more. The streaming API is only responsible for managing and identifying the stream; the protocol of the stream, including the messages sent across it, is determined by the APIs which use it. Inside the API messages there is just a stream ID string, and the context where it is used determines the protocol. At the very least, the protocol should be defined or mentioned next to where the stream ID message appears in the proto files.
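
A small Go sketch of that idea, using hypothetical interface names: the stream manager only registers and looks up streams by ID, while the meaning of the messages flowing over a stream is left to the API that referenced the ID.

package streaming // hypothetical sketch, not the actual package

import (
	"context"

	"github.com/containerd/typeurl/v2"
)

// Stream carries opaque Any messages; the protocol is defined by the
// API that handed out the stream ID, not by the streaming service.
type Stream interface {
	Send(typeurl.Any) error
	Recv() (typeurl.Any, error)
	Close() error
}

// StreamManager only manages and identifies streams.
type StreamManager interface {
	Register(ctx context.Context, id string, s Stream) error
	Get(ctx context.Context, id string) (Stream, error)
}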

docs/transfer.md Outdated
@@ -0,0 +1,32 @@
# Transfer Service

The transfer service is a simple flexible service which can be used to transfer artifact objects between a source and destination. The service determines whether the transfer between the source and destination is possible rather than the API.
Member

The service determines whether the transfer between the source and destination is possible rather than the API.

Can we have a little more detailed explanation of what is really meant here?

docs/transfer.md Outdated
| Image Store | Object stream (Archive) | "export" |
| Object stream (Layer) | Mount/Snapshot | "unpack" |
| Mount/Snapshot | Object stream (Layer) | "diff" |
| Image Store | Image Store | "tag" |
Member

Does retagging an image involve using the transfer service?

Member Author (dmcgowan)

You could; today tagging is two API calls, Get and Create. The point is to highlight the flexibility. We will never have a Tag API call, but you could do the equivalent using the image API or the transfer API. The transfer API could be more useful if the image stores have separate content backends, or if one of the image stores is actually inside a shim or some other sandboxed environment.
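
For illustration, a minimal Go sketch of that "tag" equivalent using today's image API: a Get followed by a Create pointing at the same target descriptor. The image names are only examples.

package main

import (
	"context"

	"github.com/containerd/containerd"
)

// retag creates a second image record pointing at the same target,
// the equivalent of a Tag call built from the existing image store API.
func retag(ctx context.Context, client *containerd.Client, name, newName string) error {
	store := client.ImageService()
	img, err := store.Get(ctx, name)
	if err != nil {
		return err
	}
	img.Name = newName
	_, err = store.Create(ctx, img)
	return err
}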

docs/transfer.md Outdated
| Object stream (Layer) | Mount/Snapshot | "unpack" |
| Mount/Snapshot | Object stream (Layer) | "diff" |
| Image Store | Image Store | "tag" |
| Registry | Registry | mirror registry image |
Member

When mirroring registry images, will it be a two-step process that is abstracted away,

registry1 -> image store
image store -> registry2

or a direct registry -> registry transfer?

Member Author (dmcgowan)

The idea is that the transfer interface supports it; how it is implemented is up to the transfer plugin which implements it. The caller would just give two registry arguments, though.
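
From the caller's side this might look roughly like the following Go sketch, assuming the OCI registry source/destination helper added in this PR's pkg/transfer/registry package (constructor details may differ); the plugin decides how the copy is actually staged.

package main

import (
	"context"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/pkg/transfer/registry"
)

// mirror asks the transfer service to copy an image between two
// registries; whether it goes direct or through a local store is up
// to the transfer plugin.
func mirror(ctx context.Context, client *containerd.Client) error {
	src := registry.NewOCIRegistry("registry1.example.com/library/app:v1", nil, nil)
	dst := registry.NewOCIRegistry("registry2.example.com/library/app:v1", nil, nil)
	return client.Transfer(ctx, src, dst)
}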

@@ -66,6 +73,10 @@ command. As part of this process, we do the following:
Name: "max-concurrent-downloads",
Usage: "Set the max concurrent downloads for each pull",
},
cli.BoolFlag{
Name: "local",
Usage: "Print the resulting image's chain ID",
Member

Is this a typo??

docs/transfer.md Outdated

The transfer service is a simple flexible service which can be used to transfer artifact objects between a source and destination. The flexible API allows each implementation of the transfer interface to determines whether the transfer between the source and destination is possible. This allows new functionality to be added directly by implementations without versioning the API or requiring other implementations to handle an interface change.

The transfer service if built upon the core ideas put forth by the libchan project, that an API with binary streams and data channels as first class objects is more flexible and opens a wider variety of use cases without requiring constant protocol and API updates. To accomplish this, the transfer service makes use of the streaming service to allow binary and object streams to be accessible by transfer objects even when using grpc and ttrpc.
Member

Suggested change
The transfer service if built upon the core ideas put forth by the libchan project, that an API with binary streams and data channels as first class objects is more flexible and opens a wider variety of use cases without requiring constant protocol and API updates. To accomplish this, the transfer service makes use of the streaming service to allow binary and object streams to be accessible by transfer objects even when using grpc and ttrpc.
The transfer service is built upon the core ideas put forth by the libchan project, that an API with binary streams and data channels as first class objects is more flexible and opens a wider variety of use cases without requiring constant protocol and API updates. To accomplish this, the transfer service makes use of the streaming service to allow binary and object streams to be accessible by transfer objects even when using grpc and ttrpc.

(very minor typo 👀)

@cpuguy83 (Member) left a comment

Overall this looks great; my only comments are on fit/finish sort of things.
I'd be for bringing this in sooner rather than later so we can iterate on it more easily and start integrating with cri.

transfer.go Outdated
)

func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
return proxy.NewTransferer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...)
Member

Can we make the transfer service something that can be customized in the client like other services? e.g. containerd.New("", containerd.WithServices(containerd.WithTransferService(...))), and a way to get at just the transfer service (client.TransferService())

Member

I guess we likely need the same for the stream service?

@anakrish

anakrish commented Oct 4, 2022

@dmcgowan In our use case, we need to customize image management for certain pods, while using containerd's default implementation for all other pods. To this end, we've currently written our own snapshotter leveraging the "Remote Snapshotter" contract/interface. We still need #6899, so that we can specify a different snapshotter per runtime, to complete our implementation.

In the future, do you think our requirements could be better implemented as a Transfer Service? Can we specify a separate service per pod/runtime?

@dmcgowan dmcgowan marked this pull request as ready for review October 7, 2022 20:47
@wllenyj
Contributor

wllenyj commented Oct 13, 2022

In the future, do you think our requirements could be better implemented as a Transfer Service? Can we specify a separate service per pod/runtime?

Confidential computing has the same needs. In the future, can we use the Transfer Service to support image pulling inside a sandbox? And is it possible to apply it to #5742? Or is this just the beginning of the next step?
This PR refactored the Images function using the Transfer Service.
Sorry for asking so many questions.

@estesp
Member

estesp commented Oct 19, 2022

Needs rebase due to merge conflict

@dmcgowan dmcgowan mentioned this pull request Oct 27, 2022
images images.Store

// semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
limiter *semaphore.Weighted
Contributor

How about making this overridable on a per-Transfer basis (with transfer.Opt)?

This would allow the API client to choose a desired limit scope, for example (see the sketch after this list):

  • Single operation - for example, in one pull operation a user may not want more than 3 layers being downloaded concurrently. To achieve this, the API client would pass a fresh semaphore.Weighted for each transfer. This would be the equivalent of how containerd's current WithMaxConcurrentDownloads and WithMaxConcurrentUploadedLayers behave.

  • Global - a user may want the concurrent download/upload limits to be shared among all operations (within the Transfer service) and possibly with other components in their code. The API consumer would pass a semaphore.Weighted shared with those other components. For instance, this is what the Moby engine would do if it used this service in its containerd integration to support the max-concurrent-downloads and max-concurrent-uploads daemon configuration options.

This also allows more specific limits to be applied; for example, the API user might dynamically decide the limits, or whether to apply them at all, depending on other conditions such as whether the repository is remote or local.
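
A short Go sketch of the two scopes described above, assuming a hypothetical transfer.WithConcurrencyLimiter option that accepts a *semaphore.Weighted (no such option exists in this PR; it is only illustrative):

// semaphore is golang.org/x/sync/semaphore

// Per-operation scope: pass a fresh limiter to each transfer, the
// equivalent of today's WithMaxConcurrentDownloads behavior.
perPull := semaphore.NewWeighted(3)
err := client.Transfer(ctx, src, dst, transfer.WithConcurrencyLimiter(perPull)) // hypothetical option

// Global scope: share one limiter across all transfers (and possibly
// other components), like a daemon-wide max-concurrent-downloads setting.
shared := semaphore.NewWeighted(8)
err = client.Transfer(ctx, src, dst, transfer.WithConcurrencyLimiter(shared)) // hypothetical option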

@AkihiroSuda
Member

Is this experimental?
If so, that status should be documented somewhere (RELEASES.md?)

tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
//Digest: img.Target.Digest.String(),
Member

?

Event: j.transferState,
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Member

?

// Higher level implementation just takes strings and options
// Lower level implementation takes pusher/fetcher?

*/
Member

Probably removable?

Member Author (dmcgowan)

Yeah, can remove these now and refer to the docs

@@ -78,8 +78,12 @@ const (
EventPlugin Type = "io.containerd.event.v1"
// LeasePlugin implements lease manager
LeasePlugin Type = "io.containerd.lease.v1"
// Streaming implements a stream manager
StreamingPlugin Type = "io.containerd.streaming.v1"
Member

Will this cause confusion with stream processors?
I can't come up with a better name though 😅

Member Author (dmcgowan)

I think if you are deep enough into containerd's architecture to be using stream processors, it won't be too hard to understand the difference and find where it is documented. Confusion is hard to avoid, but we can always attempt to address it in documentation if it becomes a frequently asked question. Also interesting here: you could potentially use the streaming API to do client-side stream processors, so maybe there is some relationship in the future.


func (ss *serviceStream) Recv() (a typeurl.Any, err error) {
a, err = ss.s.Recv()
if err != io.EOF {
Member

errors.Is can be used here?
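
For context, a tiny runnable Go example of why errors.Is is preferable to a direct comparison here: it still matches io.EOF after the error has been wrapped.

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	wrapped := fmt.Errorf("recv failed: %w", io.EOF)
	fmt.Println(wrapped == io.EOF)          // false: direct comparison misses wrapped errors
	fmt.Println(errors.Is(wrapped, io.EOF)) // true: errors.Is unwraps the chain
}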

@fuweid (Member) left a comment

LGTM basically; leaving some comments.

@estesp
Member

estesp commented Nov 16, 2022

I'm good with getting this in and making improvements as needed in follow-ups; looks like it needs one last rebase, though.

Disable using transfer service by default for now

Signed-off-by: Derek McGowan <derek@mcg.dev>
@mikebrow (Member) left a comment

LGTM
agree with Phil this is good to merge and iterate on..

Nice!

@mikebrow
Member

mikebrow commented Dec 1, 2022

/ok-to-test

@estesp (Member) left a comment

LGTM
