Skip to content

Commit

Permalink
Send offset in FS.Put requests
Browse files Browse the repository at this point in the history
CL: mos: Send offset in FS.Put requests

PUBLISHED_FROM=e59249ccc798670c125f3fc55ee6f7c9ca714251
  • Loading branch information
Deomid Ryabkov authored and cesantabot committed Dec 6, 2018
1 parent 89b449a commit ab8a56b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
19 changes: 13 additions & 6 deletions mos/dev/dev_conn.go
Expand Up @@ -205,16 +205,23 @@ func isJSON(s string) bool {
return json.Unmarshal([]byte(s), &js) == nil
}

func CallDeviceService(
ctx context.Context, devConn *DevConn, method string, args string,
) (string, error) {
if args != "" && !isJSON(args) {
return "", errors.Errorf("Args [%s] is not a valid JSON string", args)
func CallDeviceService(ctx context.Context, devConn *DevConn, method string, args interface{}) (string, error) {
argsJSON, ok := args.(string)
if !ok {
b, err := json.Marshal(args)
if err != nil {
return "", errors.Annotatef(err, "failed to serialize args")
}
argsJSON = string(b)
} else {
if !isJSON(argsJSON) {
return "", errors.Errorf("Args [%s] is not a valid JSON string", args)
}
}

cmd := &frame.Command{Cmd: method}
if args != "" {
cmd.Args = ourjson.RawJSON([]byte(args))
cmd.Args = ourjson.RawJSON([]byte(argsJSON))
}

resp, err := devConn.RPC.Call(ctx, devConn.Dest, cmd, rpccreds.GetRPCCreds)
Expand Down
20 changes: 14 additions & 6 deletions mos/fs/fs.go
Expand Up @@ -162,6 +162,7 @@ func PutData(ctx context.Context, devConn *dev.DevConn, r io.Reader, devFilename
data := make([]byte, *ChunkSizeFlag)
appendFlag := false

offset := 0
attempts := fsOpAttempts
for {
// Read the next chunk from the file.
Expand All @@ -171,12 +172,18 @@ func PutData(ctx context.Context, devConn *dev.DevConn, r io.Reader, devFilename
ctx2, cancel := context.WithTimeout(ctx, devConn.GetTimeout())
defer cancel()
glog.V(1).Infof("Sending %s %d (attempts %d)", devFilename, n, attempts)
err := devConn.CFilesystem.Put(ctx2, &fwfs.PutArgs{
Filename: &devFilename,
Data: lptr.String(base64.StdEncoding.EncodeToString(data[:n])),
Append: lptr.Bool(appendFlag),
})
if err != nil {
putArgs := &struct {
Filename string `json:"filename"`
Offset int `json:"offset"`
Append bool `json:"append"`
Data string `json:"data"`
}{
Filename: devFilename,
Offset: offset,
Append: appendFlag,
Data: base64.StdEncoding.EncodeToString(data[:n]),
}
if _, err := dev.CallDeviceService(ctx2, devConn, "FS.Put", putArgs); err != nil {
attempts -= 1
if attempts > 0 {
glog.Warningf("Error: %s", err)
Expand All @@ -200,6 +207,7 @@ func PutData(ctx context.Context, devConn *dev.DevConn, r io.Reader, devFilename

// All subsequent writes to this file will append the chunk.
appendFlag = true
offset += n
}

return nil
Expand Down

0 comments on commit ab8a56b

Please sign in to comment.