Skip to content

Commit

Permalink
Use named pipes for windows processors
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
  • Loading branch information
crosbymichael committed Aug 7, 2019
1 parent 134d3c8 commit e1489f9
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 91 deletions.
91 changes: 0 additions & 91 deletions diff/stream.go
Expand Up @@ -17,16 +17,12 @@
package diff

import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"

"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/images"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -189,90 +185,3 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string
}

const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"

// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)

var payloadC io.Closer
if payload != nil {
data, err := proto.Marshal(payload)
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
go func() {
io.Copy(w, bytes.NewReader(data))
w.Close()
}()

cmd.ExtraFiles = append(cmd.ExtraFiles, r)
payloadC = r
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w

if err := cmd.Start(); err != nil {
return nil, err
}
go cmd.Wait()

// close after start and dup
w.Close()
if closer != nil {
closer()
}
if payloadC != nil {
payloadC.Close()
}
return &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
}, nil
}

type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
}

func (c *binaryProcessor) File() *os.File {
return c.r
}

func (c *binaryProcessor) MediaType() string {
return c.mt
}

func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}

func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}
118 changes: 118 additions & 0 deletions diff/stream_unix.go
@@ -0,0 +1,118 @@
// +build !windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package diff

import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"

"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
)

// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)

var payloadC io.Closer
if payload != nil {
data, err := proto.Marshal(payload)
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
go func() {
io.Copy(w, bytes.NewReader(data))
w.Close()
}()

cmd.ExtraFiles = append(cmd.ExtraFiles, r)
payloadC = r
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w

if err := cmd.Start(); err != nil {
return nil, err
}
go cmd.Wait()

// close after start and dup
w.Close()
if closer != nil {
closer()
}
if payloadC != nil {
payloadC.Close()
}
return &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
}, nil
}

type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
}

func (c *binaryProcessor) File() *os.File {
return c.r
}

func (c *binaryProcessor) MediaType() string {
return c.mt
}

func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}

func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}
138 changes: 138 additions & 0 deletions diff/stream_windows.go
@@ -0,0 +1,138 @@
// +build windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package diff

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"

winio "github.com/Microsoft/go-winio"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/sirupsen/logrus"
)

const processorPipe = "STREAM_PROCESSOR_PIPE"

// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)

if payload != nil {
data, err := proto.Marshal(payload)
if err != nil {
return nil, err
}
up, err := getUiqPath()
if err != nil {
return nil, err
}
path := fmt.Sprintf("\\\\.\\pipe\\containerd-processor-%s-pipe", up)
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}
go func() {
defer l.Close()
conn, err := l.Accept()
if err != nil {
logrus.WithError(err).Error("accept npipe connection")
return
}
io.Copy(conn, bytes.NewReader(data))
conn.Close()
}()
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", processorPipe, path))
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w

if err := cmd.Start(); err != nil {
return nil, err
}
go cmd.Wait()

// close after start and dup
w.Close()
if closer != nil {
closer()
}
return &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
}, nil
}

type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
}

func (c *binaryProcessor) File() *os.File {
return c.r
}

func (c *binaryProcessor) MediaType() string {
return c.mt
}

func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}

func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}

func getUiqPath() (string, error) {
dir, err := ioutil.TempDir("", "")
if err != nil {
return "", err
}
os.Remove(dir)
return filepath.Base(dir), nil
}

0 comments on commit e1489f9

Please sign in to comment.