Skip to content

Commit

Permalink
Add pull command and client implementation
Browse files Browse the repository at this point in the history
 * Implement client.Pull().
 * Implement `pebble pull` command.
 * Patch `pebble push` command to correctly display byte units after transfer.
 * Patch server-side files API to include Content-Length on multipart payload.
   This is required for the pull command to determine transfer progress.
  • Loading branch information
anpep committed Sep 6, 2022
1 parent b49f048 commit 2c4f24a
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 3 deletions.
112 changes: 112 additions & 0 deletions client/files.go
Expand Up @@ -16,12 +16,18 @@ package client

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"strconv"
"strings"
"time"
)

// PushOptions contains the options for a call to Push.
Expand Down Expand Up @@ -64,6 +70,21 @@ type PushOptions struct {
Group string
}

// PullOptions contains the options for a call to Pull.
type PullOptions struct {
// Path indicates the absolute path of the file in the remote system
// (required).
Path string
}

// PullResult contains information about the result of a call to Pull.
type PullResult struct {
// Reader is an io.ReadCloser for the file retrieved from the remote system.
Reader io.ReadCloser
// Size is the length in bytes of the file retrieved from the remote system.
Size int64
}

type writeFilesPayload struct {
Action string `json:"action"`
Files []writeFilesItem `json:"files"`
Expand Down Expand Up @@ -161,3 +182,94 @@ func (client *Client) Push(opts *PushOptions) error {

return nil
}

// Pull retrieves a file from the remote system.
func (client *Client) Pull(opts *PullOptions) (*PullResult, error) {
query := url.Values{
"action": {"read"},
"path": {opts.Path},
}
headers := map[string]string{
"Accept": "multipart/form-data",
}

retry := time.NewTicker(doRetry)
defer retry.Stop()
timeout := time.After(doTimeout)
var rsp *http.Response
var err error
for {
rsp, err = client.raw(context.Background(), "GET", "/v1/files", query, headers, nil)
if err == nil {
break
}
select {
case <-retry.C:
continue
case <-timeout:
}
break
}
if err != nil {
return nil, err
}

// Obtain Content-Type to check for a multipart payload and parse its value
// in order to obtain the multipart boundary
contentType := rsp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, err
}

if mediaType != "multipart/form-data" {
// Probably JSON-encoded error response
var res response
var fr []fileResult

if err := decodeInto(rsp.Body, &res); err != nil {
return nil, err
}
if err := res.err(client); err != nil {
return nil, err
}
if res.Type != "sync" {
return nil, fmt.Errorf("expected sync response, got %q", res.Type)
}
if err := decodeWithNumber(bytes.NewReader(res.Result), &fr); err != nil {
return nil, fmt.Errorf("cannot unmarshal: %w", err)
}

if len(fr) != 1 {
return nil, fmt.Errorf("expected exactly one result from API, got %d", len(fr))
}

if fr[0].Error != nil {
return nil, &Error{
Kind: fr[0].Error.Kind,
Value: fr[0].Error.Value,
Message: fr[0].Error.Message,
}
}

// Not an error response after all
return nil, fmt.Errorf("expected a multipart response but didn't get one")
}

// Obtain the file from the multipart payload
mr := multipart.NewReader(rsp.Body, params["boundary"])
part, err := mr.NextPart()
if err != nil {
return nil, err
}
// Obtain the file size from the Content-Length header
size, err := strconv.ParseInt(part.Header.Get("Content-Length"), 10, 64)
if err != nil {
return nil, err
}

return &PullResult{
Reader: part,
Size: size,
}, nil
}
2 changes: 1 addition & 1 deletion cmd/pebble/cmd_help.go
Expand Up @@ -180,7 +180,7 @@ var helpCategories = []helpCategory{{
}, {
Label: "Files",
Description: "work with files and execute commands",
Commands: []string{"push", "exec"},
Commands: []string{"push", "pull", "exec"},
}, {
Label: "Changes",
Description: "manage changes and their tasks",
Expand Down
125 changes: 125 additions & 0 deletions cmd/pebble/cmd_pull.go
@@ -0,0 +1,125 @@
// Copyright (c) 2022 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/jessevdk/go-flags"

"github.com/canonical/pebble/client"
"github.com/canonical/pebble/internal/progress"
"github.com/canonical/pebble/internal/strutil/quantity"
)

type cmdPull struct {
clientMixin

Positional struct {
RemotePath string `positional-arg-name:"<remote-path>" required:"1"`
LocalPath string `positional-arg-name:"<local-path>" required:"1"`
} `positional-args:"yes"`
}

var shortPullHelp = "Retrieve a file from the remote system"
var longPullHelp = `
The pull command retrieves a file from the remote system.
`

type pullProgress struct {
file *os.File
size int64
current int64
pb progress.Meter
started time.Time
msg string
}

func (p *pullProgress) Write(b []byte) (n int, err error) {
n, err = p.file.Write(b)
if err != nil {
return
}

p.current += int64(n)
p.pb.Set(float64(p.current))

if p.current == p.size {
p.pb.Finished()

size := quantity.FormatAmount(uint64(p.size), 0)
duration := quantity.FormatDuration(time.Since(p.started).Seconds())
p.pb.Notify(fmt.Sprintf("Transferred %sB in %s", size, duration))
}

return
}

func newPullProgress(f *os.File, remotePath string) (*pullProgress, error) {
p := &pullProgress{
file: f,
pb: progress.MakeProgressBar(),
started: time.Now(),
}

p.msg = fmt.Sprintf("Transferring %s -> %s", remotePath, filepath.Base(f.Name()))
p.pb.Spin(p.msg)

return p, nil
}

func (p *pullProgress) setSize(size int64) {
p.size = size
p.pb.Start(p.msg, float64(p.size))
}

func (cmd *cmdPull) Execute(args []string) error {
if len(args) > 0 {
return ErrExtraArgs
}

f, err := os.Create(cmd.Positional.LocalPath)
if err != nil {
return err
}
defer f.Close()

p, err := newPullProgress(f, cmd.Positional.RemotePath)
if err != nil {
return err
}

res, err := cmd.client.Pull(&client.PullOptions{
Path: cmd.Positional.RemotePath,
})
if err != nil {
return err
}
p.setSize(res.Size)

if _, err := io.Copy(p, res.Reader); err != nil {
return err
}

return nil
}

func init() {
addCommand("pull", shortPullHelp, longPullHelp, func() flags.Commander { return &cmdPull{} }, nil, nil)
}
2 changes: 1 addition & 1 deletion cmd/pebble/cmd_push.go
Expand Up @@ -79,7 +79,7 @@ func (p *pushProgress) Read(b []byte) (n int, err error) {

size := quantity.FormatAmount(uint64(p.size), 0)
duration := quantity.FormatDuration(time.Since(p.started).Seconds())
p.pb.Notify(fmt.Sprintf("Transferred %s in %s", size, duration))
p.pb.Notify(fmt.Sprintf("Transferred %sB in %s", size, duration))
}

return
Expand Down
9 changes: 8 additions & 1 deletion internal/daemon/api_files.go
Expand Up @@ -149,14 +149,21 @@ func readFile(path string, mw *multipart.Writer) error {
}
defer f.Close()

fw, err := mw.CreateFormFile("files", path)
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition",
fmt.Sprintf(`form-data; name="files"; filename=%q`, path))
h.Set("Content-Type", "application/octet-stream")
h.Set("Content-Length", fmt.Sprint(info.Size()))

fw, err := mw.CreatePart(h)
if err != nil {
return err
}
_, err = io.Copy(fw, f)
if err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit 2c4f24a

Please sign in to comment.