Skip to content

Commit

Permalink
Merge pull request #220 from danielgtaylor/bulk-improvements
Browse files Browse the repository at this point in the history
feat: bulk command resiliency improvements
  • Loading branch information
danielgtaylor committed Oct 3, 2023
2 parents c6debe7 + aeaab4a commit 2c85909
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 25 deletions.
12 changes: 9 additions & 3 deletions bulk/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,19 @@ func diff(originalPath, modifiedPath string, original, modified []byte) {
var err error

if len(original) > 0 {
json.Unmarshal(original, &parsedOrig)
if err := json.Unmarshal(original, &parsedOrig); err != nil {
cli.LogWarning("Unable to parse %s: %s", originalPath, err)
return
}
original, err = cli.MarshalShort("json", true, parsedOrig)
panicOnErr(err)
}

if len(modified) > 0 {
json.Unmarshal(modified, &parsedMod)
if err := json.Unmarshal(modified, &parsedMod); err != nil {
cli.LogWarning("Unable to parse %s: %s", modifiedPath, err)
return
}
modified, err = cli.MarshalShort("json", true, parsedMod)
panicOnErr(err)
}
Expand Down Expand Up @@ -429,7 +435,7 @@ func Init(cmd *cobra.Command) {
meta := mustLoadMeta()
match, _ := cmd.Flags().GetString("match")
for _, name := range collectFiles(meta, args, match, true) {
if f, ok := meta.Files[name]; ok {
if f, ok := meta.Files[name]; ok && f.VersionLocal != "" {
panicOnErr(f.Reset())
}
}
Expand Down
140 changes: 140 additions & 0 deletions bulk/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,146 @@ func TestWorkflow(t *testing.T) {
mustHaveCalledAllHTTPMocks(t)
}

// TestPullFailure simulates a partial pull failure. The command should still
// complete successfully, but the failed file should be left alone. The state
// should be in a good place to retry the pull.
func TestPullFailure(t *testing.T) {
	defer gock.Off()

	// Remote index lists three items; fetch:false for a2 means no successful
	// GET mock is registered for it below — only the explicit 500 response.
	expectRemote([]remoteFile{
		{User: "a", ID: "a1", Version: "a11", fetch: true},
		{User: "a", ID: "a2", Version: "a21", fetch: false},
		{User: "b", ID: "b1", Version: "b11", fetch: true},
	})

	// Simulate a pull failure on the server.
	gock.New("https://example.com").
		Get("/users/a/items/a2").
		Reply(http.StatusInternalServerError)

	// Fresh in-memory filesystem so the test never touches the real disk.
	afs = afero.NewMemMapFs()

	cli.Init("test", "1.0.0")
	cli.Defaults()
	Init(cli.Root)

	// Init
	// ====
	run("bulk", "init", "example.com/all-items", "--url-template=/users/{user}/items/{id}")

	mustExist(t, ".rshbulk")
	mustExist(t, ".rshbulk/meta")
	mustEqualJSON(t, "a/items/a1.json", `{"id": "a1"}`)
	// The file whose fetch returned a 500 must not have been written.
	_, err := afs.Stat("a/items/a2.json")
	require.Error(t, err)
	mustEqualJSON(t, "b/items/b1.json", `{"id": "b1"}`)
	mustHaveCalledAllHTTPMocks(t)

	// Status should show the one failed file as needing to be pulled.
	// ----------------------------------------------------------------
	gock.Flush()

	// Re-register the index mock only; status doesn't fetch file contents.
	expectRemote([]remoteFile{
		{User: "a", ID: "a1", Version: "a11", fetch: false},
		{User: "a", ID: "a2", Version: "a21", fetch: false},
		{User: "b", ID: "b1", Version: "b11", fetch: false},
	})

	out, err := run("bulk", "status")
	require.NoError(t, err)
	require.Contains(t, out, "Remote changes")
	// a1 and b1 were pulled successfully, so only a2 should be listed.
	require.NotContains(t, out, "a/items/a1.json")
	require.Contains(t, out, "added: a/items/a2.json")
	require.NotContains(t, out, "b/items/b1.json")
	mustHaveCalledAllHTTPMocks(t)
}

// TestPushFailure simulates a partial push failure: one of three modified
// files fails to upload. The push should still complete, the two successful
// files should track the new remote versions, and a subsequent status should
// show only the failed file as still locally modified (ready to retry).
func TestPushFailure(t *testing.T) {
	defer gock.Off()

	expectRemote([]remoteFile{
		{User: "a", ID: "a1", Version: "a11", fetch: true},
		{User: "a", ID: "a2", Version: "a21", fetch: true},
		{User: "b", ID: "b1", Version: "b11", fetch: true},
	})

	// Fresh in-memory filesystem so the test never touches the real disk.
	afs = afero.NewMemMapFs()

	cli.Init("test", "1.0.0")
	cli.Defaults()
	Init(cli.Root)

	// Init
	// ====
	run("bulk", "init", "example.com/all-items", "--url-template=/users/{user}/items/{id}")

	mustExist(t, ".rshbulk")
	mustExist(t, ".rshbulk/meta")
	mustEqualJSON(t, "a/items/a1.json", `{"id": "a1"}`)
	mustEqualJSON(t, "a/items/a2.json", `{"id": "a2"}`)
	mustEqualJSON(t, "b/items/b1.json", `{"id": "b1"}`)
	mustHaveCalledAllHTTPMocks(t)

	// Modify files
	// ------------

	afero.WriteFile(afs, "a/items/a1.json", []byte(`{"id": "a1", "labels": ["one"]}`), 0600)
	afero.WriteFile(afs, "a/items/a2.json", []byte(`{"id": "a2", "labels": ["two"]}`), 0600)
	afero.WriteFile(afs, "b/items/b1.json", []byte(`{"id": "b1", "labels": ["three"]}`), 0600)

	// Push with partial failure
	// -------------------------
	gock.Flush()

	expectRemote([]remoteFile{
		{User: "a", ID: "a1", Version: "a11"},
		{User: "a", ID: "a2", Version: "a21"},
		{User: "b", ID: "b1", Version: "b11"},
	})

	gock.New("https://example.com").
		Put("/users/a/items/a1").
		Reply(http.StatusOK)

	gock.New("https://example.com").
		Put("/users/a/items/a2").
		Reply(http.StatusBadRequest) // <--- simulate invalid input

	gock.New("https://example.com").
		Put("/users/b/items/b1").
		Reply(http.StatusOK)

	// Remote has changed after push! a2's upload failed, so only a1/b1 get
	// re-fetched to pick up server-side changes.
	expectRemote([]remoteFile{
		{User: "a", ID: "a1", Version: "a12", fetch: true},
		{User: "a", ID: "a2", Version: "a22", fetch: false},
		{User: "b", ID: "b1", Version: "b12", fetch: true},
	})

	// The partial failure must not abort the command as a whole.
	out, err := run("bulk", "push")
	require.NoError(t, err)
	require.Contains(t, out, "Push complete")
	mustHaveCalledAllHTTPMocks(t)

	// Status should show the one failed file as needing to still be pushed.
	// ---------------------------------------------------------------------
	gock.Flush()

	expectRemote([]remoteFile{
		{User: "a", ID: "a1", Version: "a12"},
		{User: "a", ID: "a2", Version: "a22"},
		{User: "b", ID: "b1", Version: "b12"},
	})

	out, err = run("bulk", "status")
	require.NoError(t, err)
	require.Contains(t, out, "Local changes")
	require.NotContains(t, out, "a/items/a1.json")
	require.Contains(t, out, "modified: a/items/a2.json")
	require.NotContains(t, out, "b/items/b1.json")
	mustHaveCalledAllHTTPMocks(t)
}

func TestFalsey(t *testing.T) {
for _, item := range []any{false, 0, 0.0, "", []byte{}, []any{}, map[string]any{}, map[any]any{}} {
t.Run(fmt.Sprintf("%T-%+v", item, item), func(t *testing.T) {
Expand Down
22 changes: 16 additions & 6 deletions bulk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ import (
"github.com/zeebo/xxh3"
)

// reformat returns the standardized/formatted JSON representation given JSON
// byte data as input. It returns an error if the input cannot be parsed as
// JSON or if re-marshaling fails.
func reformat(data []byte) ([]byte, error) {
	// Round-trip to get consistent formatting. This is inefficient but a much
	// nicer experience for people with auto-formatters set up in their editor
	// or who may try to undo changes and get the formatting slightly off.
	var tmp any
	if err := json.Unmarshal(data, &tmp); err != nil {
		// Propagate parse failures instead of silently formatting the zero
		// value, which would turn any invalid input into the literal `null`.
		return nil, err
	}
	return cli.MarshalShort("json", true, tmp)
}

// hash returns a new fast 128-bit hash of the given bytes.
func hash(b []byte) []byte {
tmp := xxh3.Hash128(b).Bytes()
Expand Down Expand Up @@ -63,12 +74,11 @@ func (f *File) IsChangedLocal(ignoreDeleted bool) bool {
return !ignoreDeleted
}

// Round-trip to get consistent formatting. This is inefficient but a much
// nicer experience for people with auto-formatters set up in their editor
// or who may try to undo changes and get the formatting slightly off.
var tmp any
json.Unmarshal(b, &tmp)
b, _ = cli.MarshalShort("json", true, tmp)
b, err = reformat(b)
if err != nil {
cli.LogWarning("Warning unable to format %s: %s\n", f.Path, err)
return false
}

return !bytes.Equal(hash(b), f.Hash)
}
Expand Down
76 changes: 60 additions & 16 deletions bulk/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func getFirstKey(item any, keys ...string) string {
return ""
}

// fileMsg prints an error message and optional response to the terminal,
// making sure to clear the progress bar first and then increment it by one
// after printing the message. Ordering matters: clearing first keeps the
// message from being rendered on top of the in-progress bar line.
func fileMsg(bar *progressbar.ProgressBar, resp *cli.Response, format string, args ...any) {
	// Wipe the current bar line so the message starts on a clean line.
	bar.Clear()
	fmt.Fprintf(cli.Stdout, format, args...)
	if resp != nil {
		// Also render the response (presumably for debugging the failure —
		// callers pass it on HTTP error statuses).
		cli.Formatter.Format(*resp)
	}
	// Still advance the bar so the total count reaches 100% even for
	// files that failed.
	bar.Add(1)
}

// listEntry represents a response from a list resources call.
type listEntry struct {
URL string `json:"url"`
Expand Down Expand Up @@ -291,23 +303,31 @@ func (m *Meta) Pull() error {
if f.VersionRemote == "" {
// This was removed on the remote!
delete(m.Files, f.Path)
m.Save()
if !f.IsChangedLocal(true) {
afs.Remove(f.Path)
if err := afs.Remove(f.Path); err != nil {
fileMsg(bar, nil, "Error removing file %s: %s\n", f.Path, err)
continue
}
}
bar.Add(1)
continue
}

b, err := f.Fetch()
if err != nil {
return err
fileMsg(bar, nil, "Error fetching %s from %s: %s\n", f.Path, f.URL, err)
continue
}

// Best effort to save the metadata between files in case the app crashes
// or is killed. This leaves us in a better state for the next run. We
// are trading speed and some disk churn for safety.
m.Save()

// Don't overwrite local edits!
if f.IsChangedLocal(true) {
bar.Clear()
fmt.Fprintln(cli.Stdout, "Skipping due to local edits:", f.Path)
bar.Add(1)
fileMsg(bar, nil, "Skipping due to local edits: %s\n", f.Path)
continue
}

Expand Down Expand Up @@ -380,6 +400,11 @@ func (m *Meta) GetChanged(files []string) ([]changedFile, []changedFile, error)
}
}

// Sort by path for consistent output.
sort.Slice(remote, func(i, j int) bool {
return remote[i].File.Path < remote[j].File.Path
})

// Because deleted files would be appended, we need to sort!
sort.Slice(local, func(i, j int) bool {
return local[i].File.Path < local[j].File.Path
Expand All @@ -402,6 +427,10 @@ func (m *Meta) Push() error {
progressbar.OptionSetDescription("Pushing resources..."),
)

// Keep track of which files were successfully pushed so we can update the
// metadata for them.
success := []changedFile{}

for _, changed := range local {
f := changed.File
if changed.Status == statusModified || changed.Status == statusAdded {
Expand All @@ -416,26 +445,39 @@ func (m *Meta) Push() error {

resp, err := cli.GetParsedResponse(req)
if err != nil {
return err
fileMsg(bar, nil, "Error uploading %s to %s: %s\n", f.Path, f.URL, err)
continue
}
if resp.Status >= 400 {
fmt.Fprintf(cli.Stdout, "Error uploading %s to %s\n", f.Path, f.URL)
cli.Formatter.Format(resp)
return err
fileMsg(bar, &resp, "Error uploading %s to %s\n", f.Path, f.URL)
continue
}

if changed.Status == statusAdded {
// Add the file to the metadata
m.Files[changed.File.Path] = changed.File
}

// In case of fetch or write errors, first mark this file as unmodified
// now that the push was successful and the updated data is on the server,
// making it not show as locally modified for subsequent commands. If the
// write is successful, this hash is overwritten with the updated
// contents, including any fields computed on the server at write time.
// This is best effort, so if it fails we just ignore it.
if formatted, err := reformat(body); err == nil {
changed.File.Hash = hash(formatted)
m.Save()
}

// Fetch and write the updated metadata/file to disk.
b, err := f.Fetch()
if err != nil {
return err
fileMsg(bar, nil, "Error fetching %s from %s: %s\n", f.Path, f.URL, err)
continue
}
if err := f.Write(b); err != nil {
return err
fileMsg(bar, nil, "Error writing file %s: %s\n", f.Path, err)
continue
}
} else {
req, _ := http.NewRequest(http.MethodDelete, f.URL, nil)
Expand All @@ -448,15 +490,17 @@ func (m *Meta) Push() error {

resp, err := cli.GetParsedResponse(req)
if err != nil {
return err
fileMsg(bar, nil, "Error deleting %s from %s: %s\n", f.Path, f.URL, err)
continue
}
if resp.Status >= 400 {
fmt.Fprintf(cli.Stdout, "Error uploading %s to %s\n", f.Path, f.URL)
cli.Formatter.Format(resp)
return err
fileMsg(bar, &resp, "Error deleting %s from %s\n", f.Path, f.URL)
continue
}
delete(m.Files, f.Path)
m.Save()
}
success = append(success, changed)
bar.Add(1)
}

Expand All @@ -466,7 +510,7 @@ func (m *Meta) Push() error {
return err
}

for _, changed := range local {
for _, changed := range success {
// Mark all the changed files as matching the new remote version. The
// file contents were already updated above. This code can't be run until
// after we pull the index again to get the updated remote versions.
Expand Down

0 comments on commit 2c85909

Please sign in to comment.