diff --git a/bulk/commands.go b/bulk/commands.go index 66483a6..1ccbc83 100644 --- a/bulk/commands.go +++ b/bulk/commands.go @@ -229,13 +229,19 @@ func diff(originalPath, modifiedPath string, original, modified []byte) { var err error if len(original) > 0 { - json.Unmarshal(original, &parsedOrig) + if err := json.Unmarshal(original, &parsedOrig); err != nil { + cli.LogWarning("Unable to parse %s: %s", originalPath, err) + return + } original, err = cli.MarshalShort("json", true, parsedOrig) panicOnErr(err) } if len(modified) > 0 { - json.Unmarshal(modified, &parsedMod) + if err := json.Unmarshal(modified, &parsedMod); err != nil { + cli.LogWarning("Unable to parse %s: %s", modifiedPath, err) + return + } modified, err = cli.MarshalShort("json", true, parsedMod) panicOnErr(err) } @@ -429,7 +435,7 @@ func Init(cmd *cobra.Command) { meta := mustLoadMeta() match, _ := cmd.Flags().GetString("match") for _, name := range collectFiles(meta, args, match, true) { - if f, ok := meta.Files[name]; ok { + if f, ok := meta.Files[name]; ok && f.VersionLocal != "" { panicOnErr(f.Reset()) } } diff --git a/bulk/commands_test.go b/bulk/commands_test.go index ddc33bf..2227d46 100644 --- a/bulk/commands_test.go +++ b/bulk/commands_test.go @@ -355,6 +355,146 @@ func TestWorkflow(t *testing.T) { mustHaveCalledAllHTTPMocks(t) } +// TestPullFailure simulates a partial pull failure. The command should still +// complete successfully, but the failed file should be left alone. The state +// should be in a good place to retry the pull. +func TestPullFailure(t *testing.T) { + defer gock.Off() + + expectRemote([]remoteFile{ + {User: "a", ID: "a1", Version: "a11", fetch: true}, + {User: "a", ID: "a2", Version: "a21", fetch: false}, + {User: "b", ID: "b1", Version: "b11", fetch: true}, + }) + + // Simulate a pull failure on the server. + gock.New("https://example.com"). + Get("/users/a/items/a2"). + Reply(http.StatusInternalServerError) + + afs = afero.NewMemMapFs() + + cli.Init("test", "1.0.0") + cli.Defaults() + Init(cli.Root) + + // Init + // ==== + run("bulk", "init", "example.com/all-items", "--url-template=/users/{user}/items/{id}") + + mustExist(t, ".rshbulk") + mustExist(t, ".rshbulk/meta") + mustEqualJSON(t, "a/items/a1.json", `{"id": "a1"}`) + _, err := afs.Stat("a/items/a2.json") + require.Error(t, err) + mustEqualJSON(t, "b/items/b1.json", `{"id": "b1"}`) + mustHaveCalledAllHTTPMocks(t) + + // Status should show the one failed file as needing to be pulled. + // ---------------------------------------------------------------- + gock.Flush() + + expectRemote([]remoteFile{ + {User: "a", ID: "a1", Version: "a11", fetch: false}, + {User: "a", ID: "a2", Version: "a21", fetch: false}, + {User: "b", ID: "b1", Version: "b11", fetch: false}, + }) + + out, err := run("bulk", "status") + require.NoError(t, err) + require.Contains(t, out, "Remote changes") + require.NotContains(t, out, "a/items/a1.json") + require.Contains(t, out, "added: a/items/a2.json") + require.NotContains(t, out, "b/items/b1.json") + mustHaveCalledAllHTTPMocks(t) +} + +func TestPushFailure(t *testing.T) { + defer gock.Off() + + expectRemote([]remoteFile{ + {User: "a", ID: "a1", Version: "a11", fetch: true}, + {User: "a", ID: "a2", Version: "a21", fetch: true}, + {User: "b", ID: "b1", Version: "b11", fetch: true}, + }) + + afs = afero.NewMemMapFs() + + cli.Init("test", "1.0.0") + cli.Defaults() + Init(cli.Root) + + // Init + // ==== + run("bulk", "init", "example.com/all-items", "--url-template=/users/{user}/items/{id}") + + mustExist(t, ".rshbulk") + mustExist(t, ".rshbulk/meta") + mustEqualJSON(t, "a/items/a1.json", `{"id": "a1"}`) + mustEqualJSON(t, "a/items/a2.json", `{"id": "a2"}`) + mustEqualJSON(t, "b/items/b1.json", `{"id": "b1"}`) + mustHaveCalledAllHTTPMocks(t) + + // Modify files + // ------------ + + afero.WriteFile(afs, "a/items/a1.json", []byte(`{"id": "a1", "labels": ["one"]}`), 0600) + afero.WriteFile(afs, "a/items/a2.json", []byte(`{"id": "a2", "labels": ["two"]}`), 0600) + afero.WriteFile(afs, "b/items/b1.json", []byte(`{"id": "b1", "labels": ["three"]}`), 0600) + + // Push with partial failure + // ------------------------- + gock.Flush() + + expectRemote([]remoteFile{ + {User: "a", ID: "a1", Version: "a11"}, + {User: "a", ID: "a2", Version: "a21"}, + {User: "b", ID: "b1", Version: "b11"}, + }) + + gock.New("https://example.com"). + Put("/users/a/items/a1"). + Reply(http.StatusOK) + + gock.New("https://example.com"). + Put("/users/a/items/a2"). + Reply(http.StatusBadRequest) // <--- simulate invalid input + + gock.New("https://example.com"). + Put("/users/b/items/b1"). + Reply(http.StatusOK) + + // Remote has changed after push! + expectRemote([]remoteFile{ + {User: "a", ID: "a1", Version: "a12", fetch: true}, + {User: "a", ID: "a2", Version: "a22", fetch: false}, + {User: "b", ID: "b1", Version: "b12", fetch: true}, + }) + + out, err := run("bulk", "push") + require.NoError(t, err) + require.Contains(t, out, "Push complete") + mustHaveCalledAllHTTPMocks(t) + + // Status should show the one failed file as needing to still be pushed. + // --------------------------------------------------------------------- + gock.Flush() + + expectRemote([]remoteFile{ + {User: "a", ID: "a1", Version: "a12"}, + {User: "a", ID: "a2", Version: "a22"}, + {User: "b", ID: "b1", Version: "b12"}, + }) + + out, err = run("bulk", "status") + require.NoError(t, err) + require.Contains(t, out, "Local changes") + require.NotContains(t, out, "a/items/a1.json") + require.Contains(t, out, "modified: a/items/a2.json") + require.NotContains(t, out, "b/items/b1.json") + mustHaveCalledAllHTTPMocks(t) +} + func TestFalsey(t *testing.T) { for _, item := range []any{false, 0, 0.0, "", []byte{}, []any{}, map[string]any{}, map[any]any{}} { t.Run(fmt.Sprintf("%T-%+v", item, item), func(t *testing.T) { diff --git a/bulk/file.go b/bulk/file.go index 27ecb44..611ae3f 100644 --- a/bulk/file.go +++ b/bulk/file.go @@ -15,6 +15,17 @@ import ( "github.com/zeebo/xxh3" ) +// reformat returns the standardized/formatted JSON representation given JSON +// byte data as input. +func reformat(data []byte) ([]byte, error) { + // Round-trip to get consistent formatting. This is inefficient but a much + // nicer experience for people with auto-formatters set up in their editor + // or who may try to undo changes and get the formatting slightly off. + var tmp any + json.Unmarshal(data, &tmp) + return cli.MarshalShort("json", true, tmp) +} + // hash returns a new fast 128-bit hash of the given bytes. func hash(b []byte) []byte { tmp := xxh3.Hash128(b).Bytes() @@ -63,12 +74,11 @@ func (f *File) IsChangedLocal(ignoreDeleted bool) bool { return !ignoreDeleted } - // Round-trip to get consistent formatting. This is inefficient but a much - // nicer experience for people with auto-formatters set up in their editor - // or who may try to undo changes and get the formatting slightly off. - var tmp any - json.Unmarshal(b, &tmp) - b, _ = cli.MarshalShort("json", true, tmp) + b, err = reformat(b) + if err != nil { + cli.LogWarning("Warning unable to format %s: %s\n", f.Path, err) + return false + } return !bytes.Equal(hash(b), f.Hash) } diff --git a/bulk/metadata.go b/bulk/metadata.go index 4454436..71065a3 100644 --- a/bulk/metadata.go +++ b/bulk/metadata.go @@ -69,6 +69,18 @@ func getFirstKey(item any, keys ...string) string { return "" } +// fileMsg prints an error message and optional response to the terminal +// making sure to clear the progress bar first and then increment it by one +// after printing the message. +func fileMsg(bar *progressbar.ProgressBar, resp *cli.Response, format string, args ...any) { + bar.Clear() + fmt.Fprintf(cli.Stdout, format, args...) + if resp != nil { + cli.Formatter.Format(*resp) + } + bar.Add(1) +} + // listEntry represents a response from a list resources call. type listEntry struct { URL string `json:"url"` @@ -291,8 +303,12 @@ func (m *Meta) Pull() error { if f.VersionRemote == "" { // This was removed on the remote! delete(m.Files, f.Path) + m.Save() if !f.IsChangedLocal(true) { - afs.Remove(f.Path) + if err := afs.Remove(f.Path); err != nil { + fileMsg(bar, nil, "Error removing file %s: %s\n", f.Path, err) + continue + } } bar.Add(1) continue @@ -300,14 +316,18 @@ func (m *Meta) Pull() error { b, err := f.Fetch() if err != nil { - return err + fileMsg(bar, nil, "Error fetching %s from %s: %s\n", f.Path, f.URL, err) + continue } + // Best effort to save the metadata between files in case the app crashes + // or is killed. This leaves us in a better state for the next run. We + // are trading speed and some disk churn for safety. + m.Save() + // Don't overwrite local edits! if f.IsChangedLocal(true) { - bar.Clear() - fmt.Fprintln(cli.Stdout, "Skipping due to local edits:", f.Path) - bar.Add(1) + fileMsg(bar, nil, "Skipping due to local edits: %s\n", f.Path) continue } @@ -380,6 +400,11 @@ func (m *Meta) GetChanged(files []string) ([]changedFile, []changedFile, error) } } + // Sort by path for consistent output. + sort.Slice(remote, func(i, j int) bool { + return remote[i].File.Path < remote[j].File.Path + }) + // Because deleted files would be appended, we need to sort! sort.Slice(local, func(i, j int) bool { return local[i].File.Path < local[j].File.Path @@ -402,6 +427,10 @@ func (m *Meta) Push() error { progressbar.OptionSetDescription("Pushing resources..."), ) + // Keep track of which files were successfully pushed so we can update the + // metadata for them. + success := []changedFile{} + for _, changed := range local { f := changed.File if changed.Status == statusModified || changed.Status == statusAdded { @@ -416,12 +445,12 @@ func (m *Meta) Push() error { resp, err := cli.GetParsedResponse(req) if err != nil { - return err + fileMsg(bar, nil, "Error uploading %s to %s: %s\n", f.Path, f.URL, err) + continue } if resp.Status >= 400 { - fmt.Fprintf(cli.Stdout, "Error uploading %s to %s\n", f.Path, f.URL) - cli.Formatter.Format(resp) - return err + fileMsg(bar, &resp, "Error uploading %s to %s\n", f.Path, f.URL) + continue } if changed.Status == statusAdded { @@ -429,13 +458,26 @@ func (m *Meta) Push() error { m.Files[changed.File.Path] = changed.File } + // In case of fetch or write errors, first mark this file as unmodified + // now that the push was successful and the updated data is on the server, + // making it not show as locally modified for subsequent commands. If the + // write is successful, this hash is overwritten with the updated + // contents, including any fields computed on the server at write time. + // This is best effort, so if it fails we just ignore it. + if formatted, err := reformat(body); err == nil { + changed.File.Hash = hash(formatted) + m.Save() + } + // Fetch and write the updated metadata/file to disk. b, err := f.Fetch() if err != nil { - return err + fileMsg(bar, nil, "Error fetching %s from %s: %s\n", f.Path, f.URL, err) + continue } if err := f.Write(b); err != nil { - return err + fileMsg(bar, nil, "Error writing file %s: %s\n", f.Path, err) + continue } } else { req, _ := http.NewRequest(http.MethodDelete, f.URL, nil) @@ -448,15 +490,17 @@ func (m *Meta) Push() error { resp, err := cli.GetParsedResponse(req) if err != nil { - return err + fileMsg(bar, nil, "Error deleting %s from %s: %s\n", f.Path, f.URL, err) + continue } if resp.Status >= 400 { - fmt.Fprintf(cli.Stdout, "Error uploading %s to %s\n", f.Path, f.URL) - cli.Formatter.Format(resp) - return err + fileMsg(bar, &resp, "Error deleting %s from %s\n", f.Path, f.URL) + continue } delete(m.Files, f.Path) + m.Save() } + success = append(success, changed) bar.Add(1) } @@ -466,7 +510,7 @@ func (m *Meta) Push() error { return err } - for _, changed := range local { + for _, changed := range success { // Mark all the changed files as matching the new remote version. The // file contents were already updated above. This code can't be run until // after we pull the index again to get the updated remote versions.