Skip to content

Commit 9316804

Browse files
committed
[API change] add client-side streaming get-batch; add test
* new API: api.GetBatchStream
  - requires req.StreamingGet == true
  - rejects zip
  - no client-side writing: the caller gets the stream and is responsible for closing it
* tools: extend the `tarch` sub-package with two draining flavors
* add an integration smoke test
Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent a19b22c commit 9316804

File tree

4 files changed

+195
-22
lines changed

4 files changed

+195
-22
lines changed

ais/test/moss_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"os"
13+
"path"
1314
"strings"
1415
"sync"
1516
"testing"
@@ -903,3 +904,56 @@ func validateTarStreamingWithArchive(t *testing.T, m *ioContext, req *apc.MossRe
903904
tlog.Logfln("Streaming archive validation passed: %d entries, correct order (format: %s)",
904905
len(callback.entries), format)
905906
}
907+
908+
func TestGetBatchStream_DrainTar(t *testing.T) {
909+
proxyURL := tools.GetPrimaryURL()
910+
bp := tools.BaseAPIParams(proxyURL)
911+
912+
bck := cmn.Bck{Name: trand.String(10), Provider: apc.AIS}
913+
tools.CreateBucket(t, proxyURL, bck, nil, true /*cleanup*/)
914+
915+
// few objects
916+
names := []string{"s1", "s2", "sbig", "s3"}
917+
sizes := []int64{8*cos.KiB + 123, 12*cos.KiB + 456, 3*cos.MiB + 789, 4*cos.KiB + 0xabc}
918+
for i, n := range names {
919+
reader, err := readers.NewRand(sizes[i], cos.ChecksumNone)
920+
tassert.CheckFatal(t, err)
921+
922+
_, err = api.PutObject(&api.PutArgs{
923+
BaseParams: bp,
924+
Bck: bck,
925+
ObjName: n,
926+
Reader: reader,
927+
})
928+
tassert.CheckFatal(t, err)
929+
}
930+
931+
// Build Moss request (streaming tar).
932+
in := make([]apc.MossIn, 0, len(names))
933+
for _, n := range names {
934+
in = append(in, apc.MossIn{ObjName: n})
935+
}
936+
req := &apc.MossReq{In: in, StreamingGet: true}
937+
938+
rc, _, err := api.GetBatchStream(bp, bck, req)
939+
tassert.CheckFatal(t, err)
940+
defer rc.Close()
941+
942+
// TAR is the default output format
943+
ar, err := archive.NewReader(archive.ExtTar, rc)
944+
tassert.CheckFatal(t, err)
945+
946+
// prep. tarch.DrainVerify
947+
var (
948+
wantNames = make([]string, len(in))
949+
sum int64
950+
)
951+
for i, n := range names {
952+
wantNames[i] = path.Join(bck.Name, n) // NOTE: must be api/ml.go documented naming
953+
sum += sizes[i]
954+
}
955+
956+
drain := tarch.NewDrainVerify(t, wantNames, sizes)
957+
err = ar.ReadUntil(drain, "" /*match all*/, cos.EmptyMatchAll)
958+
tassert.CheckFatal(t, err)
959+
}

api/client.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,8 @@ func (reqParams *ReqParams) doReqStr(out *string) (int, error) {
155155
return resp.StatusCode, err
156156
}
157157

158-
// Makes request via do() and uses provided writer to write `resp.Body`
159-
// (which is also closes)
160-
//
161-
// Returns the entire wrapped response.
158+
// do() and use provided writer (to write `resp.Body`)
159+
// return the entire wrapped response
162160
func (reqParams *ReqParams) doWriter(w io.Writer) (wresp *wrappedResp, err error) {
163161
var resp *http.Response
164162
resp, err = reqParams.do()
@@ -171,6 +169,20 @@ func (reqParams *ReqParams) doWriter(w io.Writer) (wresp *wrappedResp, err error
171169
return
172170
}
173171

172+
// do() and return a checked response *with* the `resp.Body` as is
173+
// and *without* draining or closing the latter
174+
func (reqParams *ReqParams) doStream() (wresp *wrappedResp, body io.ReadCloser, err error) {
175+
resp, err := reqParams.do()
176+
if err != nil {
177+
return nil, nil, err
178+
}
179+
if err = reqParams.checkResp(resp); err != nil {
180+
resp.Body.Close()
181+
return nil, nil, err
182+
}
183+
return &wrappedResp{Response: resp}, resp.Body, nil
184+
}
185+
174186
// same as above except that it returns response body (as io.ReadCloser) for subsequent reading
175187
func (reqParams *ReqParams) doReader() (io.ReadCloser, int64, error) {
176188
resp, err := reqParams.do()

api/ml.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package api
66

77
import (
8+
"errors"
89
"fmt"
910
"io"
1011
"net/http"
@@ -48,36 +49,83 @@ import (
4849
// - but when read range is defined: MossOut.Size = (length of this range)
4950

5051
func GetBatch(bp BaseParams, bck cmn.Bck, req *apc.MossReq, w io.Writer) (resp apc.MossResp, err error) {
51-
bp.Method = http.MethodGet
52-
53-
q, path := _optionalBucket(&bck)
54-
reqParams := AllocRp()
55-
{
56-
reqParams.BaseParams = bp
57-
reqParams.Path = path
58-
reqParams.Body = cos.MustMarshal(req)
59-
reqParams.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
60-
reqParams.Query = q
61-
}
52+
rp, q := _makeMossReq(bp, bck, req)
6253
if req.StreamingGet {
6354
var wresp *wrappedResp
64-
wresp, err = reqParams.doWriter(w)
55+
wresp, err = rp.doWriter(w)
6556
if err == nil {
66-
ct := wresp.Header.Get(cos.HdrContentType)
67-
if !strings.HasPrefix(ct, "application/") {
68-
err = fmt.Errorf("unexpected Content-Type %q", ct)
69-
}
57+
err = _checkMossResp(wresp.Header)
7058
}
7159
} else {
72-
_, err = reqParams.readMultipart(&resp, w)
60+
_, err = rp.readMultipart(&resp, w)
7361
}
74-
FreeRp(reqParams)
62+
FreeRp(rp)
7563
if q != nil {
7664
qfree(q)
7765
}
7866
return resp, err
7967
}
8068

69+
// GetBatchStream starts a streaming GetBatch and returns the response body _as is_
70+
// and response headers:
71+
// - the returned body is forward-only (non-seekable)
72+
// - supported streaming formats: .tar/.tgz/.tar.lz4; zip is excepted as non-streamable
73+
// - it is the caller's responsibility to close the body
74+
// - compare with GetBatch() above
75+
76+
const zipext = ".zip"
77+
78+
func GetBatchStream(bp BaseParams, bck cmn.Bck, req *apc.MossReq) (io.ReadCloser, http.Header, error) {
79+
if !req.StreamingGet {
80+
return nil, nil, errors.New("GetBatchStream: expecting req.StreamingGet to be set")
81+
}
82+
if req.OutputFormat != "" {
83+
if req.OutputFormat == zipext || strings.Contains(req.OutputFormat, zipext[1:]) {
84+
return nil, nil, fmt.Errorf("GetBatchStream: output format %q is not streamable", req.OutputFormat)
85+
}
86+
}
87+
rp, q := _makeMossReq(bp, bck, req)
88+
89+
wresp, body, err := rp.doStream()
90+
FreeRp(rp)
91+
if q != nil {
92+
qfree(q)
93+
}
94+
if err != nil {
95+
return nil, nil, err
96+
}
97+
if err := _checkMossResp(wresp.Header); err != nil {
98+
body.Close()
99+
return nil, nil, err
100+
}
101+
return body, wresp.Header, nil
102+
}
103+
104+
//
105+
// misc. helpers
106+
//
107+
108+
// not a very strict check for: application/x-tar | application/gzip (tgz) | application/x-gtar | ...
109+
func _checkMossResp(hdr http.Header) error {
110+
ct := hdr.Get(cos.HdrContentType)
111+
if !strings.HasPrefix(ct, "application/") {
112+
return fmt.Errorf("unexpected Content-Type %q", ct)
113+
}
114+
return nil
115+
}
116+
117+
func _makeMossReq(bp BaseParams, bck cmn.Bck, req *apc.MossReq) (*ReqParams, url.Values) {
118+
bp.Method = http.MethodGet
119+
q, path := _optionalBucket(&bck)
120+
rp := AllocRp()
121+
rp.BaseParams = bp
122+
rp.Path = path
123+
rp.Body = cos.MustMarshal(req)
124+
rp.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
125+
rp.Query = q
126+
return rp, q
127+
}
128+
81129
func _optionalBucket(bck *cmn.Bck) (q url.Values, path string) {
82130
if bck.IsEmpty() {
83131
path = apc.URLPathML.Join(apc.Moss)

tools/tarch/archive.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
"os"
1515
"strconv"
1616
"sync"
17+
"testing"
1718
"time"
1819

1920
"github.com/NVIDIA/aistore/cmn/archive"
2021
"github.com/NVIDIA/aistore/cmn/cos"
2122
"github.com/NVIDIA/aistore/cmn/debug"
2223
"github.com/NVIDIA/aistore/cmn/mono"
2324
"github.com/NVIDIA/aistore/ext/dsort/shard"
25+
"github.com/NVIDIA/aistore/tools/tassert"
2426
"github.com/NVIDIA/aistore/tools/trand"
2527
)
2628

@@ -327,3 +329,60 @@ func newBuf32k() (buf []byte) {
327329
func freeBuf32k(buf []byte) {
328330
pool32k.Put(&buf)
329331
}
332+
333+
///////////
334+
// Drain //
335+
///////////
336+
337+
// Drain is an archive-entry callback (see Call) that discards entry content
// while keeping running totals.
type Drain struct {
	Total int64 // bytes read across all entries so far
	Num   int   // entries drained without a read error
}

// DrainVerify is an archive-entry callback (see Call) that, in addition to draining,
// checks each entry against the expected names and sizes, in order.
type DrainVerify struct {
	t         *testing.T
	wantNames []string // expected entry names, in archive order
	wantSizes []int64  // expected entry sizes; wantSizes[i] goes with wantNames[i]
	i         int      // index of the next expected entry
	total     int64    // bytes read across all entries so far
}
351+
352+
func (drain *Drain) Call(_ string, r cos.ReadCloseSizer, _ any) (bool, error) {
353+
n, err := io.Copy(io.Discard, r)
354+
drain.Total += n
355+
_ = r.Close()
356+
if err == nil {
357+
drain.Num++
358+
}
359+
return false, err
360+
}
361+
362+
func NewDrainVerify(t *testing.T, wantNames []string, wantSizes []int64) *DrainVerify {
363+
return &DrainVerify{
364+
t: t,
365+
wantNames: wantNames,
366+
wantSizes: wantSizes,
367+
}
368+
}
369+
370+
func (drain *DrainVerify) Call(name string, r cos.ReadCloseSizer, hdr any) (bool, error) {
371+
tarhdr, ok := hdr.(*tar.Header)
372+
if !ok {
373+
tassert.Fatalf(drain.t, false, "expected *tar.Header, got %T", hdr)
374+
}
375+
expSize := drain.wantSizes[drain.i]
376+
tassert.Errorf(drain.t, tarhdr.Size == expSize, "entry[%d] size mismatch: hdr=%d exp=%d", drain.i, tarhdr.Size, expSize)
377+
378+
expName := drain.wantNames[drain.i]
379+
tassert.Errorf(drain.t, name == expName, "entry[%d] name mismatch: got %q exp %q", drain.i, name, expName)
380+
381+
n, err := io.Copy(io.Discard, r)
382+
drain.total += n
383+
_ = r.Close()
384+
tassert.Errorf(drain.t, n == tarhdr.Size, "entry[%d] drained %d bytes != hdr.Size %d", drain.i, n, tarhdr.Size)
385+
386+
drain.i++
387+
return false, err
388+
}

0 commit comments

Comments
 (0)