Commit 0572fe9
backend: add GCP support for multipart uploads
* GCP's Go SDK doesn't natively support multipart uploads via the XML API.
* Implement a direct HTTP client that uses `cmn.AllocHra()` to issue raw HTTP requests to GCP's XML API endpoints.

Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent: 3f9da7b

7 files changed: +272 -19 lines
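For context, the raw exchange against GCP's XML API that the new backend code drives looks roughly like the sketch below. This is illustrative only and not part of the commit: it uses http.DefaultClient as a stand-in (real requests must carry OAuth2 credentials, which the commit obtains by reusing the SDK's authenticated transport), and the bucket and object names are hypothetical. The production code path is in ais/backend/gcpmpt.go further down.

package main

import (
    "encoding/xml"
    "fmt"
    "net/http"
    "strings"
)

const endpoint = "https://storage.googleapis.com"

func main() {
    client := http.DefaultClient // stand-in; real requests need OAuth2 credentials
    bucket, object := "my-bucket", "my-object"

    // 1. Initiate: POST /<bucket>/<object>?uploads returns <InitiateMultipartUploadResult> with an UploadId.
    resp, err := client.Post(fmt.Sprintf("%s/%s/%s?uploads", endpoint, bucket, object), "", http.NoBody)
    if err != nil {
        panic(err)
    }
    var init struct {
        UploadID string `xml:"UploadId"`
    }
    err = xml.NewDecoder(resp.Body).Decode(&init)
    resp.Body.Close()
    if err != nil {
        panic(err)
    }

    // 2. Upload a part: PUT /<bucket>/<object>?partNumber=1&uploadId=<id>; the part's ETag comes back in a response header.
    partURL := fmt.Sprintf("%s/%s/%s?partNumber=1&uploadId=%s", endpoint, bucket, object, init.UploadID)
    req, err := http.NewRequest(http.MethodPut, partURL, strings.NewReader("part-1 payload"))
    if err != nil {
        panic(err)
    }
    resp, err = client.Do(req)
    if err != nil {
        panic(err)
    }
    etag := resp.Header.Get("ETag")
    resp.Body.Close()

    // 3. Complete: POST /<bucket>/<object>?uploadId=<id> with an XML list of (PartNumber, ETag) pairs.
    completeXML := fmt.Sprintf(
        "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>%s</ETag></Part></CompleteMultipartUpload>", etag)
    req, err = http.NewRequest(http.MethodPost,
        fmt.Sprintf("%s/%s/%s?uploadId=%s", endpoint, bucket, object, init.UploadID), strings.NewReader(completeXML))
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Type", "application/xml")
    resp, err = client.Do(req)
    if err != nil {
        panic(err)
    }
    resp.Body.Close()
    fmt.Println("complete:", resp.Status)
}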

.gitlab-ci.yml

Lines changed: 1 addition & 0 deletions

@@ -521,6 +521,7 @@ test:short:gcp:
   <<: *test_short_skip_scheduled_def
   variables:
     BUCKET: "gs://ais-ci"
+  timeout: 1h
   rules:
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_MERGE_REQUEST_LABELS =~ /.*skip-ci.*/'
       when: manual

ais/backend/gcp.go

Lines changed: 7 additions & 3 deletions

@@ -34,6 +34,7 @@ import (
 )

 const (
+    gcpXMLEndpoint  = "https://storage.googleapis.com"
     gcpChecksumType = "x-goog-meta-ais-cksum-type"
     gcpChecksumVal  = "x-goog-meta-ais-cksum-val"

@@ -44,8 +45,9 @@ const (

 type (
     gsbp struct {
-        t         core.TargetPut
-        projectID string
+        t          core.TargetPut
+        projectID  string
+        httpClient *http.Client // for raw requests
         base
     }
 )
@@ -112,7 +114,9 @@ func (gsbp *gsbp) createClient(ctx context.Context) (*storage.Client, error) {
         }
         return nil, cmn.NewErrFailedTo(nil, "gcp-backend: create", "http transport", err)
     }
-    opts = append(opts, option.WithHTTPClient(tracing.NewTraceableClient(&http.Client{Transport: transport})))
+    // Store authenticated HTTP client for raw requests (e.g., multipart uploads)
+    gsbp.httpClient = tracing.NewTraceableClient(&http.Client{Transport: transport})
+    opts = append(opts, option.WithHTTPClient(gsbp.httpClient))
     // create HTTP client
     client, err := storage.NewClient(ctx, opts...)
     if err != nil {

ais/backend/gcpmpt.go

Lines changed: 233 additions & 0 deletions

@@ -0,0 +1,233 @@
+//go:build gcp
+
+// Package backend contains core/backend interface implementations for supported backend providers.
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+ */
+package backend
+
+import (
+    "encoding/xml"
+    "errors"
+    "fmt"
+    "io"
+    "net/http"
+    "net/url"
+    "strconv"
+
+    "github.com/NVIDIA/aistore/api/apc"
+    "github.com/NVIDIA/aistore/cmn"
+    "github.com/NVIDIA/aistore/cmn/cos"
+    "github.com/NVIDIA/aistore/cmn/nlog"
+    "github.com/NVIDIA/aistore/core"
+)
+
+// NOTE: Google's XML API is a compatibility layer for S3 clients and implements a subset of S3's control plane — buckets, ACLs, multipart, versioning.
+// This API exists separately from Google's Go client library (cloud.google.com/go/storage) that implements regular GET, PUT, HEAD, etc.
+
+type (
+    // XML response structures for GCP multipart upload
+    initiateMptUploadResult struct {
+        XMLName  xml.Name `xml:"InitiateMultipartUploadResult"`
+        Bucket   string   `xml:"Bucket"`
+        Key      string   `xml:"Key"`
+        UploadID string   `xml:"UploadId"`
+    }
+
+    completeMptUploadResult struct {
+        XMLName  xml.Name `xml:"CompleteMultipartUploadResult"`
+        Location string   `xml:"Location"`
+        Bucket   string   `xml:"Bucket"`
+        Key      string   `xml:"Key"`
+        ETag     string   `xml:"ETag"`
+    }
+
+    completedPart struct {
+        PartNumber int32  `xml:"PartNumber"`
+        ETag       string `xml:"ETag"`
+    }
+
+    completeMultipartUpload struct {
+        XMLName xml.Name        `xml:"CompleteMultipartUpload"`
+        Parts   []completedPart `xml:"Part"`
+    }
+)
+
+func (gsbp *gsbp) StartMpt(lom *core.LOM, _ *http.Request) (id string, ecode int, err error) {
+    cloudBck := lom.Bck().RemoteBck()
+
+    reqArgs := cmn.AllocHra()
+    {
+        reqArgs.Method = http.MethodPost
+        reqArgs.Base = gcpXMLEndpoint
+        reqArgs.Path = cos.JoinPath(cloudBck.Name, lom.ObjName)
+        reqArgs.Header = http.Header{
+            cos.HdrContentLength: []string{"0"},
+        }
+        reqArgs.Query = url.Values{
+            apc.QparamMptUploads: []string{""},
+        }
+    }
+
+    req, err := reqArgs.Req()
+    if err != nil {
+        cmn.FreeHra(reqArgs)
+        return "", http.StatusInternalServerError, err
+    }
+
+    resp, err := gsbp.httpClient.Do(req)
+    cmn.FreeHra(reqArgs)
+    cmn.HreqFree(req)
+
+    if err != nil {
+        return "", http.StatusInternalServerError, err
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        body, _ := io.ReadAll(resp.Body)
+        err = fmt.Errorf("gcp: failed to initiate multipart upload: %s (status: %d)", string(body), resp.StatusCode)
+        return "", resp.StatusCode, err
+    }
+
+    var result initiateMptUploadResult
+    if err = xml.NewDecoder(resp.Body).Decode(&result); err != nil {
+        return "", http.StatusInternalServerError, fmt.Errorf("failed to decode response: %w", err)
+    }
+
+    if cmn.Rom.V(5, cos.ModBackend) {
+        nlog.Infof("[start_mpt] %s, upload_id: %s", cloudBck.Cname(lom.ObjName), result.UploadID)
+    }
+
+    return result.UploadID, 0, nil
+}
+
+func (gsbp *gsbp) PutMptPart(lom *core.LOM, r io.ReadCloser, _ *http.Request, uploadID string, size int64, partNum int32) (string, int, error) {
+    cloudBck := lom.Bck().RemoteBck()
+
+    reqArgs := cmn.AllocHra()
+    {
+        reqArgs.Method = http.MethodPut
+        reqArgs.Base = gcpXMLEndpoint
+        reqArgs.Path = cos.JoinPath(cloudBck.Name, lom.ObjName)
+        reqArgs.BodyR = r
+        reqArgs.Query = url.Values{
+            apc.QparamMptPartNo:   []string{strconv.FormatInt(int64(partNum), 10)},
+            apc.QparamMptUploadID: []string{uploadID},
+        }
+        reqArgs.Header = http.Header{
+            cos.HdrContentLength: []string{strconv.FormatInt(size, 10)},
+        }
+    }
+
+    req, err := reqArgs.Req()
+    if err != nil {
+        cmn.FreeHra(reqArgs)
+        cos.Close(r)
+        return "", http.StatusInternalServerError, err
+    }
+    req.ContentLength = size
+
+    resp, err := gsbp.httpClient.Do(req)
+    cmn.FreeHra(reqArgs)
+    cmn.HreqFree(req)
+    cos.Close(r)
+
+    if err != nil {
+        return "", http.StatusInternalServerError, err
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        body, _ := io.ReadAll(resp.Body)
+        err = fmt.Errorf("gcp: failed to upload part: %s (status: %d)", string(body), resp.StatusCode)
+        return "", resp.StatusCode, err
+    }
+
+    etag := resp.Header.Get("ETag")
+    if etag == "" {
+        return "", http.StatusInternalServerError, errors.New("gcp: no ETag in response")
+    }
+
+    if cmn.Rom.V(5, cos.ModBackend) {
+        nlog.Infof("[put_mpt_part] %s, part: %d, etag: %s", cloudBck.Cname(lom.ObjName), partNum, etag)
+    }
+
+    return etag, 0, nil
+}
+
+func (gsbp *gsbp) CompleteMpt(lom *core.LOM, _ *http.Request, uploadID string, _ []byte, parts apc.MptCompletedParts) (version, etag string, _ int, _ error) {
+    cloudBck := lom.Bck().RemoteBck()
+
+    // Build XML body with completed parts
+    completeMpt := completeMultipartUpload{
+        Parts: make([]completedPart, len(parts)),
+    }
+    for i, part := range parts {
+        completeMpt.Parts[i] = completedPart{
+            PartNumber: int32(part.PartNumber),
+            ETag:       part.ETag,
+        }
+    }
+
+    xmlBody, err := xml.Marshal(completeMpt)
+    if err != nil {
+        return "", "", http.StatusInternalServerError, fmt.Errorf("failed to marshal XML: %w", err)
+    }
+
+    reqArgs := cmn.AllocHra()
+    {
+        reqArgs.Method = http.MethodPost
+        reqArgs.Base = gcpXMLEndpoint
+        reqArgs.Path = cos.JoinPath(cloudBck.Name, lom.ObjName)
+        reqArgs.Body = xmlBody
+        reqArgs.Header = http.Header{
+            cos.HdrContentType:   []string{cos.ContentXML},
+            cos.HdrContentLength: []string{strconv.Itoa(len(xmlBody))},
+        }
+        reqArgs.Query = url.Values{
+            apc.QparamMptUploadID: []string{uploadID},
+        }
+    }
+
+    req, err := reqArgs.Req()
+    if err != nil {
+        cmn.FreeHra(reqArgs)
+        return "", "", http.StatusInternalServerError, err
+    }
+
+    resp, err := gsbp.httpClient.Do(req)
+    cmn.FreeHra(reqArgs)
+    cmn.HreqFree(req)
+
+    if err != nil {
+        return "", "", http.StatusInternalServerError, err
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        body, _ := io.ReadAll(resp.Body)
+        err = fmt.Errorf("gcp: failed to complete multipart upload: %s (status: %d)", string(body), resp.StatusCode)
+        return "", "", resp.StatusCode, err
+    }
+
+    var result completeMptUploadResult
+    if err = xml.NewDecoder(resp.Body).Decode(&result); err != nil {
+        return "", "", http.StatusInternalServerError, fmt.Errorf("failed to decode response: %w", err)
+    }
+
+    // Decode ETag
+    if result.ETag != "" {
+        if e, ok := cmn.BackendHelpers.Google.EncodeETag(result.ETag); ok {
+            etag = e
+        } else {
+            etag = result.ETag
+        }
+    }
+
+    if cmn.Rom.V(5, cos.ModBackend) {
+        nlog.Infof("[complete_mpt] %s, version: %s, etag: %s", cloudBck.Cname(lom.ObjName), version, etag)
+    }
+
+    return version, etag, 0, nil
+}
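To make the wire format handled by this file concrete, here is a small standalone round trip through encoding/xml using the same struct shapes: it prints the request body that CompleteMpt marshals and decodes a sample initiate response the way StartMpt does. The sample UploadId and ETag values are made up for illustration.

package main

import (
    "encoding/xml"
    "fmt"
)

type completedPart struct {
    PartNumber int32  `xml:"PartNumber"`
    ETag       string `xml:"ETag"`
}

type completeMultipartUpload struct {
    XMLName xml.Name        `xml:"CompleteMultipartUpload"`
    Parts   []completedPart `xml:"Part"`
}

func main() {
    // Request body sent when completing the upload:
    body, _ := xml.MarshalIndent(completeMultipartUpload{
        Parts: []completedPart{{PartNumber: 1, ETag: `"abc123"`}, {PartNumber: 2, ETag: `"def456"`}},
    }, "", "  ")
    fmt.Println(string(body))

    // Response body returned by the initiate call, decoded the same way StartMpt does:
    sample := `<InitiateMultipartUploadResult>
  <Bucket>ais-ci</Bucket>
  <Key>obj-1</Key>
  <UploadId>ABCD1234</UploadId>
</InitiateMultipartUploadResult>`
    var result struct {
        Bucket   string `xml:"Bucket"`
        Key      string `xml:"Key"`
        UploadID string `xml:"UploadId"`
    }
    _ = xml.Unmarshal([]byte(sample), &result)
    fmt.Printf("bucket=%s key=%s uploadID=%s\n", result.Bucket, result.Key, result.UploadID)
}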

ais/test/common_test.go

Lines changed: 20 additions & 14 deletions

@@ -45,9 +45,9 @@ import (
 const rebalanceObjectDistributionTestCoef = 0.3

 const (
-    prefixDir      = "filter"
-    largeFileSize  = 4 * cos.MiB
-    awsMinPartSize = 5 * cos.MiB
+    prefixDir        = "filter"
+    largeFileSize    = 4 * cos.MiB
+    cloudMinPartSize = 5 * cos.MiB // AWS and GCP minimum part size for multipart uploads

     workerCnt = 10
 )
@@ -293,29 +293,38 @@ func (m *ioContext) puts(ignoreErrs ...bool) {
     tassert.CheckFatal(m.t, err)
 }

-// adjustFileSizeRange adjusts the file size range to avoid aws[EntityTooSmall] errors
-func (m *ioContext) adjustFileSizeRange(minSize uint64) {
+// adjustFileSizeRange adjusts the file size range to avoid cloud backend [EntityTooSmall] errors
+// for AWS and GCP backends which require minimum 5MiB per part in multipart uploads
+func (m *ioContext) adjustFileSizeRange() {
     m.t.Helper()

     if m.chunksConf == nil || !m.chunksConf.multipart {
         return
     }

-    minTotalSize := minSize * uint64(m.chunksConf.numChunks)
+    // Check if backend is AWS or GCP (both have 5MiB minimum part size)
+    provider := m.bck.Provider
+    if m.bck.Backend() != nil {
+        provider = m.bck.Backend().Provider
+    }
+    if provider != apc.AWS && provider != apc.GCP {
+        return
+    }
+
+    minTotalSize := cloudMinPartSize * uint64(m.chunksConf.numChunks)

     if m.fileSizeRange[0] >= minTotalSize && m.fileSizeRange[1] >= minTotalSize {
         return
     }

     m.fileSizeRange = [2]uint64{minTotalSize, minTotalSize * 2}
-    tlog.Logfln("AWS bucket detected, increase file size range to %s - %s to avoid aws[EntityTooSmall] errors", cos.ToSizeIEC(int64(m.fileSizeRange[0]), 0), cos.ToSizeIEC(int64(m.fileSizeRange[1]), 0))
+    tlog.Logfln("%s backend detected, increase file size range to %s - %s to avoid [EntityTooSmall] errors",
+        provider, cos.ToSizeIEC(int64(m.fileSizeRange[0]), 0), cos.ToSizeIEC(int64(m.fileSizeRange[1]), 0))
 }

 // update updates the object with a new random reader and returns the reader and the size; reader is used to validate the object after the update
 func (m *ioContext) update(objName, cksumType string) (readers.Reader, uint64) {
-    if m.bck.Provider == apc.AWS || (m.bck.Backend() != nil && m.bck.Backend().Provider == apc.AWS) {
-        m.adjustFileSizeRange(awsMinPartSize)
-    }
+    m.adjustFileSizeRange()
     var (
         size  = tools.GetRandSize(m.fileSizeRange, m.fileSize, m.fixedSize)
         errCh = make(chan error, 1)
@@ -366,10 +375,7 @@ func (m *ioContext) remotePuts(evict bool, overrides ...bool) {
         m.objNames = m.objNames[:0]
     }

-    if m.bck.Provider == apc.AWS || (m.bck.Backend() != nil && m.bck.Backend().Provider == apc.AWS) {
-        // increase the object size to avoid aws[EntityTooSmall] errors, since each part in a multipart upload to AWS must be at least 5MB.
-        m.adjustFileSizeRange(awsMinPartSize)
-    }
+    m.adjustFileSizeRange()

     m._remoteFill(m.num, evict, override)
 }
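A quick sanity check of the adjusted range computed above; the chunk count is hypothetical, and 5 MiB mirrors the cloudMinPartSize test constant:

package main

import "fmt"

func main() {
    const MiB = 1 << 20
    const cloudMinPartSize = 5 * MiB // mirrors the test constant above
    numChunks := uint64(4)           // hypothetical m.chunksConf.numChunks
    minTotal := uint64(cloudMinPartSize) * numChunks
    fmt.Printf("file size range becomes [%d MiB, %d MiB]\n", minTotal/MiB, 2*minTotal/MiB) // [20 MiB, 40 MiB]
}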

ais/tgtmpt.go

Lines changed: 2 additions & 2 deletions

@@ -465,8 +465,8 @@ func (ups *ups) complete(r *http.Request, lom *core.LOM, uploadID string, body [
     // call remote
     remote := lom.Bck().IsRemote()
     if remote {
-        // NOTE: only OCI and AWS backends require ETag in the part list
-        if lom.Bck().IsRemoteS3() || lom.Bck().IsRemoteOCI() {
+        // NOTE: only OCI, AWS and GCP backends require ETag in the part list
+        if lom.Bck().IsRemoteS3() || lom.Bck().IsRemoteOCI() || lom.Bck().IsRemoteGCP() {
             for i := range parts {
                 if parts[i].ETag == "" {
                     parts[i].ETag = manifest.GetChunk(parts[i].PartNumber, true).ETag

api/apc/query.go

Lines changed: 1 addition & 0 deletions

@@ -152,6 +152,7 @@ const (
     // Request to restore an object
     QparamECObject = "object"

+    QparamMptUploads  = "uploads"    // Start multipart upload
     QparamMptUploadID = "uploadId"   // Complete, abort, or list parts of specific multipart upload
     QparamMptPartNo   = "partNumber" // Part number for multipart upload
 )

core/meta/bck.go

Lines changed: 8 additions & 0 deletions

@@ -81,6 +81,14 @@ func (b *Bck) IsRemoteOCI() bool {
     return backend != nil && backend.Provider == apc.OCI
 }

+func (b *Bck) IsRemoteGCP() bool {
+    if b.Provider == apc.GCP {
+        return true
+    }
+    backend := b.Backend()
+    return backend != nil && backend.Provider == apc.GCP
+}
+
 // TODO: mem-pool
 func (b *Bck) NewQuery() (q url.Values) {
     q = make(url.Values, 4)
