Skip to content

Commit 04d7358

Browse files
committed
backend: add Azure support for multipart uploads
Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent 1963e91 commit 04d7358

File tree

5 files changed

+170
-0
lines changed

5 files changed

+170
-0
lines changed

.gitlab-ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,7 @@ test:short:azure:
536536
<<: *test_short_skip_scheduled_def
537537
variables:
538538
BUCKET: "az://ais-ci"
539+
timeout: 1h
539540
rules:
540541
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_MERGE_REQUEST_LABELS =~ /.*skip-ci.*/'
541542
when: manual

ais/backend/azurempt.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//go:build azure
2+
3+
// Package backend contains core/backend interface implementations for supported backend providers.
4+
/*
5+
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
6+
*/
7+
package backend
8+
9+
import (
10+
"context"
11+
"encoding/base64"
12+
"fmt"
13+
"io"
14+
"net/http"
15+
16+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
17+
18+
"github.com/NVIDIA/aistore/api/apc"
19+
"github.com/NVIDIA/aistore/cmn"
20+
"github.com/NVIDIA/aistore/cmn/cos"
21+
"github.com/NVIDIA/aistore/cmn/debug"
22+
"github.com/NVIDIA/aistore/cmn/nlog"
23+
"github.com/NVIDIA/aistore/core"
24+
)
25+
26+
// azureBlockIDFmt is the printf-style layout of a raw (not yet encoded) block ID:
// the upload ID, a dash, and the part number zero-padded to six digits,
// e.g. "upload-000001". The fixed-width part number keeps every block ID of a
// given upload the same length for part numbers up to 999999.
const azureBlockIDFmt = "%s-%06d"

// Multipart uploads are mapped onto Azure block blobs:
//   - StartMpt:    nothing to call remotely; an upload ID is generated locally
//   - PutMptPart:  StageBlock, using a block ID derived from the part number
//   - CompleteMpt: CommitBlockList over all block IDs of the upload
//   - AbortMpt:    nothing to call remotely; uncommitted blocks are garbage
//     collected by Azure after 7 days

// azureBlockID returns the block ID that identifies part partNum of the upload
// uploadID. The raw ID is rendered with azureBlockIDFmt and then encoded with
// standard (padded) base64, since Azure requires block IDs to be base64 strings.
func azureBlockID(uploadID string, partNum int) string {
	raw := fmt.Sprintf(azureBlockIDFmt, uploadID, partNum)
	return base64.StdEncoding.EncodeToString([]byte(raw))
}
41+
42+
func (*azbp) StartMpt(lom *core.LOM, _ *http.Request) (id string, ecode int, err error) {
43+
// Azure doesn't have an explicit "start multipart upload" operation.
44+
// Multipart upload is initiated by staging blocks, and we use a generated
45+
// upload ID to track the block IDs for this upload session.
46+
uploadID := cos.GenUUID() // Generate a unique ID for this multipart upload session
47+
48+
if cmn.Rom.V(5, cos.ModBackend) {
49+
cloudBck := lom.Bck().RemoteBck()
50+
nlog.Infof("[start_mpt] %s, upload_id: %s", cloudBck.Cname(lom.ObjName), uploadID)
51+
}
52+
53+
return uploadID, 0, nil
54+
}
55+
56+
func (azbp *azbp) PutMptPart(lom *core.LOM, r io.ReadCloser, _ *http.Request, uploadID string, _ int64, partNum int32) (string, int, error) {
57+
var (
58+
cloudBck = lom.Bck().RemoteBck()
59+
blURL = azbp.u + "/" + cloudBck.Name + "/" + lom.ObjName
60+
)
61+
62+
client, err := blockblob.NewClientWithSharedKeyCredential(blURL, azbp.creds, nil)
63+
if err != nil {
64+
cos.Close(r)
65+
ecode, err := azureErrorToAISError(err, cloudBck, lom.ObjName)
66+
return "", ecode, err
67+
}
68+
69+
blockID := azureBlockID(uploadID, int(partNum))
70+
71+
// StageBlock requires io.ReadSeekCloser
72+
rsc, ok := r.(io.ReadSeekCloser)
73+
debug.Assertf(ok, "Azure backend requires io.ReadSeekCloser, but got %T", r)
74+
75+
_, err = client.StageBlock(context.Background(), blockID, rsc, nil)
76+
77+
if err != nil {
78+
ecode, err := azureErrorToAISError(err, cloudBck, lom.ObjName)
79+
return "", ecode, err
80+
}
81+
82+
if cmn.Rom.V(5, cos.ModBackend) {
83+
nlog.Infof("[put_mpt_part] %s, part: %d, block_id: %s", cloudBck.Cname(lom.ObjName), partNum, blockID)
84+
}
85+
86+
// Return empty string - block ID can be reconstructed from part number in CompleteMpt
87+
return "", 0, nil
88+
}
89+
90+
func (azbp *azbp) CompleteMpt(lom *core.LOM, _ *http.Request, uploadID string, _ []byte, parts apc.MptCompletedParts) (version, etag string, _ int, _ error) {
91+
var (
92+
cloudBck = lom.Bck().RemoteBck()
93+
blURL = azbp.u + "/" + cloudBck.Name + "/" + lom.ObjName
94+
)
95+
96+
client, err := blockblob.NewClientWithSharedKeyCredential(blURL, azbp.creds, nil)
97+
if err != nil {
98+
ecode, err := azureErrorToAISError(err, cloudBck, lom.ObjName)
99+
return "", "", ecode, err
100+
}
101+
102+
// Build the list of block IDs by reconstructing them from part numbers
103+
blockIDs := make([]string, len(parts))
104+
for i, part := range parts {
105+
debug.Assertf(part.PartNumber == i+1, "parts must be sorted without gaps: expected part %d, got %d", i+1, part.PartNumber)
106+
blockIDs[i] = azureBlockID(uploadID, part.PartNumber)
107+
}
108+
109+
// Commit the block list to create the final blob
110+
resp, err := client.CommitBlockList(context.Background(), blockIDs, nil)
111+
if err != nil {
112+
ecode, err := azureErrorToAISError(err, cloudBck, lom.ObjName)
113+
return "", "", ecode, err
114+
}
115+
116+
// Get version and ETag from response
117+
if resp.ETag != nil {
118+
if e, ok := cmn.BackendHelpers.Azure.EncodeETag(string(*resp.ETag)); ok {
119+
etag = e
120+
version = e // Azure uses ETag as version
121+
}
122+
}
123+
124+
if cmn.Rom.V(5, cos.ModBackend) {
125+
nlog.Infof("[complete_mpt] %s, version: %s, etag: %s, blocks: %d",
126+
cloudBck.Cname(lom.ObjName), version, etag, len(blockIDs))
127+
}
128+
129+
return version, etag, 0, nil
130+
}
131+
132+
func (*azbp) AbortMpt(lom *core.LOM, _ *http.Request, uploadID string) (ecode int, err error) {
133+
// Azure doesn't have an explicit "abort multipart upload" operation.
134+
// Uncommitted blocks are automatically garbage collected after 7 days.
135+
// We could optionally try to list and delete uncommitted blocks here,
136+
// but it's not strictly necessary and adds complexity.
137+
//
138+
// For now, just return success and let Azure's GC handle cleanup.
139+
140+
if cmn.Rom.V(5, cos.ModBackend) {
141+
cloudBck := lom.Bck().RemoteBck()
142+
nlog.Infof("[abort_mpt] %s, upload_id: %s (uncommitted blocks will be GC'd by Azure)",
143+
cloudBck.Cname(lom.ObjName), uploadID)
144+
}
145+
146+
return 0, nil
147+
}

ais/tgtmpt.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,19 @@ func (ups *ups) _put(args *partArgs) (etag string, ecode int, err error) {
353353
buf, slab := t.gmm.AllocSize(r.ContentLength)
354354
expectedSize, err = io.CopyBuffer(mw, r.Body, buf)
355355
slab.Free(buf)
356+
case lom.Bck().IsRemoteAzure():
357+
// NOTE: Azure backend requires io.ReadSeekCloser for the `PutMptPart` method
358+
backend = t.Backend(lom.Bck())
359+
sgl := t.gmm.NewSGL(r.ContentLength)
360+
mw.Append(sgl)
361+
reader := memsys.NewReader(sgl)
362+
expectedSize, err = io.Copy(mw, r.Body)
363+
if err == nil {
364+
remoteStart := mono.NanoTime()
365+
etag, ecode, err = backend.PutMptPart(lom, reader, r, uploadID, expectedSize, int32(args.partNum))
366+
remotePutLatency = mono.SinceNano(remoteStart)
367+
}
368+
sgl.Free()
356369
case t.gmm.Pressure() < memsys.PressureHigh:
357370
// write 1) locally + sgl + checksums; 2) write sgl => backend
358371
backend = t.Backend(lom.Bck())

core/meta/bck.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ func (b *Bck) IsRemoteGCP() bool {
8989
return backend != nil && backend.Provider == apc.GCP
9090
}
9191

92+
func (b *Bck) IsRemoteAzure() bool {
93+
if b.Provider == apc.Azure {
94+
return true
95+
}
96+
backend := b.Backend()
97+
return backend != nil && backend.Provider == apc.Azure
98+
}
99+
92100
// TODO: mem-pool
93101
func (b *Bck) NewQuery() (q url.Values) {
94102
q = make(url.Values, 4)

memsys/iosgl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var (
3030

3131
_ cos.ReadOpenCloser = (*SGL)(nil)
3232
_ cos.ReadOpenCloser = (*Reader)(nil)
33+
_ io.ReadSeekCloser = (*Reader)(nil)
3334
)
3435

3536
type (

0 commit comments

Comments
 (0)