Skip to content

Commit

Permalink
pushWriter: refactor reset pipe logic into separate function
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Chadwell <me@jedevc.com>
  • Loading branch information
jedevc committed Mar 4, 2024
1 parent 2577207 commit 0465472
Showing 1 changed file with 25 additions and 40 deletions.
65 changes: 25 additions & 40 deletions remotes/docker/pusher.go
Expand Up @@ -384,6 +384,27 @@ func (pw *pushWriter) setResponse(resp *http.Response) {
}
}

func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
if pw.pipe == nil {
pw.pipe = p
return nil
}

pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}

func (pw *pushWriter) Write(p []byte) (n int, err error) {
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
Expand All @@ -395,26 +416,14 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.pipe = p
pw.replacePipe(p)
}
} else {
select {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return 0, err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
return 0, pw.replacePipe(p)
default:
}
}
Expand All @@ -427,19 +436,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
case <-pw.done:
case err = <-pw.errC:
case p := <-pw.pipeC:
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return 0, err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
return 0, pw.replacePipe(p)
}
}
status.Offset += int64(n)
Expand Down Expand Up @@ -502,19 +499,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
// check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset.
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
return pw.replacePipe(p)
}

// 201 is specified return status, some registries return
Expand Down

0 comments on commit 0465472

Please sign in to comment.