Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watch: batch & de-duplicate file events #10865

Merged
merged 1 commit into from Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/sync/docker_cp.go
Expand Up @@ -71,19 +71,19 @@
if fi.IsDir() {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: pathMapping.Service,
Service: service.Name,

Check warning on line 74 in internal/sync/docker_cp.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/docker_cp.go#L74

Added line #L74 was not covered by tests
Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err)

Check warning on line 79 in internal/sync/docker_cp.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/docker_cp.go#L79

Added line #L79 was not covered by tests
}
}
fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
} else {
err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
Source: pathMapping.HostPath,
Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath),
Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath),

Check warning on line 86 in internal/sync/docker_cp.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/docker_cp.go#L86

Added line #L86 was not covered by tests
})
if err != nil {
return err
Expand All @@ -93,12 +93,12 @@
} else if errors.Is(statErr, fs.ErrNotExist) {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: pathMapping.Service,
Service: service.Name,

Check warning on line 96 in internal/sync/docker_cp.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/docker_cp.go#L96

Added line #L96 was not covered by tests
Command: []string{"rm", "-rf", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err)

Check warning on line 101 in internal/sync/docker_cp.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/docker_cp.go#L101

Added line #L101 was not covered by tests
}
}
fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
Expand Down
2 changes: 0 additions & 2 deletions internal/sync/shared.go
Expand Up @@ -22,8 +22,6 @@ import (

// PathMapping contains the Compose service and modified host system path.
type PathMapping struct {
// Service that the file event is for.
Service string
// HostPath that was created/modified/deleted outside the container.
//
// This is the path as seen from the user's perspective, e.g.
Expand Down
5 changes: 1 addition & 4 deletions internal/sync/tar.go
Expand Up @@ -57,133 +57,130 @@

var _ Syncer = &Tar{}

func NewTar(projectName string, client LowLevelClient) *Tar {
return &Tar{
projectName: projectName,
client: client,
}

Check warning on line 64 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L60-L64

Added lines #L60 - L64 were not covered by tests
}

func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
containers, err := t.client.ContainersForService(ctx, t.projectName, service.Name)
if err != nil {
return err
}

Check warning on line 71 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L67-L71

Added lines #L67 - L71 were not covered by tests

var pathsToCopy []PathMapping
var pathsToDelete []string
for _, p := range paths {
if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) {
pathsToDelete = append(pathsToDelete, p.ContainerPath)
} else {
pathsToCopy = append(pathsToCopy, p)
}

Check warning on line 80 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L73-L80

Added lines #L73 - L80 were not covered by tests
}

// TODO: this can't be read from multiple times
tarReader := tarArchive(pathsToCopy)

var deleteCmd []string
if len(pathsToDelete) != 0 {
deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...)
}
copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"}

var eg multierror.Group
for i := range containers {
containerID := containers[i].ID
eg.Go(func() error {
if len(deleteCmd) != 0 {
if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil {
return fmt.Errorf("deleting paths in %s: %w", containerID, err)
}

Check warning on line 99 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L84-L99

Added lines #L84 - L99 were not covered by tests
}
if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil {
return fmt.Errorf("copying files to %s: %w", containerID, err)
}
return nil

Check warning on line 104 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L101-L104

Added lines #L101 - L104 were not covered by tests
})
}
return eg.Wait().ErrorOrNil()

Check warning on line 107 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L107

Added line #L107 was not covered by tests
}

type ArchiveBuilder struct {
tw *tar.Writer
paths []string // local paths archived

tw *tar.Writer
// A shared I/O buffer to help with file copying.
copyBuf *bytes.Buffer
}

func NewArchiveBuilder(writer io.Writer) *ArchiveBuilder {
tw := tar.NewWriter(writer)
return &ArchiveBuilder{
tw: tw,
copyBuf: &bytes.Buffer{},
}

Check warning on line 121 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L116-L121

Added lines #L116 - L121 were not covered by tests
}

func (a *ArchiveBuilder) Close() error {
return a.tw.Close()

Check warning on line 125 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L124-L125

Added lines #L124 - L125 were not covered by tests
}

// ArchivePathsIfExist creates a tar archive of all local files in `paths`. It quietly skips any paths that don't exist.
func (a *ArchiveBuilder) ArchivePathsIfExist(paths []PathMapping) error {
// In order to handle overlapping syncs, we
// 1) collect all the entries,
// 2) de-dupe them, with last-one-wins semantics
// 3) write all the entries
//
// It's not obvious that this is the correct behavior. A better approach
// (that's more in-line with how syncs work) might ignore files in earlier
// path mappings when we know they're going to be "synced" over.
// There's a bunch of subtle product decisions about how overlapping path
// mappings work that we're not sure about.
var entries []archiveEntry
for _, p := range paths {
newEntries, err := a.entriesForPath(p.HostPath, p.ContainerPath)
if err != nil {
return fmt.Errorf("inspecting %q: %w", p.HostPath, err)
}

Check warning on line 145 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L129-L145

Added lines #L129 - L145 were not covered by tests

entries = append(entries, newEntries...)

Check warning on line 147 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L147

Added line #L147 was not covered by tests
}

entries = dedupeEntries(entries)
for _, entry := range entries {
err := a.writeEntry(entry)
if err != nil {
return fmt.Errorf("archiving %q: %w", entry.path, err)
}

Check warning on line 155 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L150-L155

Added lines #L150 - L155 were not covered by tests
a.paths = append(a.paths, entry.path)
}
return nil

Check warning on line 157 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L157

Added line #L157 was not covered by tests
}

func (a *ArchiveBuilder) writeEntry(entry archiveEntry) error {
pathInTar := entry.path
header := entry.header

if header.Typeflag != tar.TypeReg {
// anything other than a regular file (e.g. dir, symlink) just needs the header
if err := a.tw.WriteHeader(header); err != nil {
return fmt.Errorf("writing %q header: %w", pathInTar, err)
}
return nil

Check warning on line 169 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L160-L169

Added lines #L160 - L169 were not covered by tests
}

file, err := os.Open(pathInTar)
if err != nil {
// In case the file has been deleted since we last looked at it.
if os.IsNotExist(err) {
return nil
}
return err

Check warning on line 178 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L172-L178

Added lines #L172 - L178 were not covered by tests
}

defer func() {
_ = file.Close()
}()

Check warning on line 183 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L181-L183

Added lines #L181 - L183 were not covered by tests

// The size header must match the number of contents bytes.
//
Expand All @@ -198,147 +195,147 @@
// will lead to a spurious error when the tar writer validates the sizes.
// That error will be disruptive but will be handled as best as we
// can downstream.
useBuf := header.Size < 5000000
if useBuf {
a.copyBuf.Reset()
_, err = io.Copy(a.copyBuf, file)
if err != nil && err != io.EOF {
return fmt.Errorf("copying %q: %w", pathInTar, err)
}
header.Size = int64(len(a.copyBuf.Bytes()))

Check warning on line 205 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L198-L205

Added lines #L198 - L205 were not covered by tests
}

// wait to write the header until _after_ the file is successfully opened
// to avoid generating an invalid tar entry that has a header but no contents
// in the case the file has been deleted
err = a.tw.WriteHeader(header)
if err != nil {
return fmt.Errorf("writing %q header: %w", pathInTar, err)
}

Check warning on line 214 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L211-L214

Added lines #L211 - L214 were not covered by tests

if useBuf {
_, err = io.Copy(a.tw, a.copyBuf)
} else {
_, err = io.Copy(a.tw, file)
}

Check warning on line 220 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L216-L220

Added lines #L216 - L220 were not covered by tests

if err != nil && err != io.EOF {
return fmt.Errorf("copying %q: %w", pathInTar, err)
}

Check warning on line 224 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L222-L224

Added lines #L222 - L224 were not covered by tests

// explicitly flush so that if the entry is invalid we will detect it now and
// provide a more meaningful error
if err := a.tw.Flush(); err != nil {
return fmt.Errorf("finalizing %q: %w", pathInTar, err)
}
return nil

Check warning on line 231 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L228-L231

Added lines #L228 - L231 were not covered by tests
}

// tarPath writes the given source path into tarWriter at the given dest (recursively for directories).
// e.g. tarring my_dir --> dest d: d/file_a, d/file_b
// If source path does not exist, quietly skips it and returns no err
func (a *ArchiveBuilder) entriesForPath(localPath, containerPath string) ([]archiveEntry, error) {
localInfo, err := os.Stat(localPath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err

Check warning on line 243 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L237-L243

Added lines #L237 - L243 were not covered by tests
}

localPathIsDir := localInfo.IsDir()
if localPathIsDir {
// Make sure we can trim this off filenames to get valid relative filepaths
if !strings.HasSuffix(localPath, string(filepath.Separator)) {
localPath += string(filepath.Separator)
}

Check warning on line 251 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L246-L251

Added lines #L246 - L251 were not covered by tests
}

containerPath = strings.TrimPrefix(containerPath, "/")

result := make([]archiveEntry, 0)
err = filepath.Walk(localPath, func(curLocalPath string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("walking %q: %w", curLocalPath, err)
}

Check warning on line 260 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L254-L260

Added lines #L254 - L260 were not covered by tests

linkname := ""
if info.Mode()&os.ModeSymlink != 0 {
var err error
linkname, err = os.Readlink(curLocalPath)
if err != nil {
return err
}

Check warning on line 268 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L262-L268

Added lines #L262 - L268 were not covered by tests
}

var name string
//nolint:gocritic
if localPathIsDir {
// Name of file in tar should be relative to source directory...
tmp, err := filepath.Rel(localPath, curLocalPath)
if err != nil {
return fmt.Errorf("making %q relative to %q: %w", curLocalPath, localPath, err)
}

Check warning on line 278 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L271-L278

Added lines #L271 - L278 were not covered by tests
// ...and live inside `dest`
name = path.Join(containerPath, filepath.ToSlash(tmp))
} else if strings.HasSuffix(containerPath, "/") {
name = containerPath + filepath.Base(curLocalPath)
} else {
name = containerPath
}

Check warning on line 285 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L280-L285

Added lines #L280 - L285 were not covered by tests

header, err := archive.FileInfoHeader(name, info, linkname)
if err != nil {
// Not all types of files are allowed in a tarball. That's OK.
// Mimic the Docker behavior and just skip the file.
return nil
}

Check warning on line 292 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L287-L292

Added lines #L287 - L292 were not covered by tests

result = append(result, archiveEntry{
path: curLocalPath,
info: info,
header: header,
})

return nil

Check warning on line 300 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L294-L300

Added lines #L294 - L300 were not covered by tests
})
if err != nil {
return nil, err
}
return result, nil

Check warning on line 305 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L302-L305

Added lines #L302 - L305 were not covered by tests
}

func tarArchive(ops []PathMapping) io.ReadCloser {
pr, pw := io.Pipe()
go func() {
ab := NewArchiveBuilder(pw)
err := ab.ArchivePathsIfExist(ops)
if err != nil {
_ = pw.CloseWithError(fmt.Errorf("adding files to tar: %w", err))
} else {
// propagate errors from the TarWriter::Close() because it performs a final
// Flush() and any errors mean the tar is invalid
if err := ab.Close(); err != nil {
_ = pw.CloseWithError(fmt.Errorf("closing tar: %w", err))
} else {
_ = pw.Close()
}

Check warning on line 322 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L308-L322

Added lines #L308 - L322 were not covered by tests
}
}()
return pr

Check warning on line 325 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L325

Added line #L325 was not covered by tests
}

// Dedupe the entries with last-entry-wins semantics.
func dedupeEntries(entries []archiveEntry) []archiveEntry {
seenIndex := make(map[string]int, len(entries))
result := make([]archiveEntry, 0, len(entries))
for i, entry := range entries {
seenIndex[entry.header.Name] = i
}
for i, entry := range entries {
if seenIndex[entry.header.Name] == i {
result = append(result, entry)
}

Check warning on line 338 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L329-L338

Added lines #L329 - L338 were not covered by tests
}
return result

Check warning on line 340 in internal/sync/tar.go

View check run for this annotation

Codecov / codecov/patch

internal/sync/tar.go#L340

Added line #L340 was not covered by tests
}
4 changes: 4 additions & 0 deletions pkg/compose/compose.go
Expand Up @@ -26,6 +26,8 @@ import (
"strings"
"sync"

"github.com/jonboulle/clockwork"

"github.com/docker/docker/api/types/volume"

"github.com/compose-spec/compose-go/types"
Expand Down Expand Up @@ -58,13 +60,15 @@ func init() {
func NewComposeService(dockerCli command.Cli) api.Service {
return &composeService{
dockerCli: dockerCli,
clock: clockwork.NewRealClock(),
maxConcurrency: -1,
dryRun: false,
}
}

type composeService struct {
dockerCli command.Cli
clock clockwork.Clock
maxConcurrency int
dryRun bool
}
Expand Down