Skip to content

Commit

Permalink
stores: add object and multipart metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Dec 20, 2023
1 parent 505867b commit 908e727
Show file tree
Hide file tree
Showing 21 changed files with 756 additions and 381 deletions.
2 changes: 2 additions & 0 deletions api/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (
CreateMultipartOptions struct {
Key object.EncryptionKey
MimeType string
Metadata ObjectUserMetadata
}
)

Expand Down Expand Up @@ -84,6 +85,7 @@ type (
Path string `json:"path"`
Key object.EncryptionKey `json:"key"`
MimeType string `json:"mimeType"`
Metadata ObjectUserMetadata `json:"metadata"`
}

MultipartCreateResponse struct {
Expand Down
102 changes: 79 additions & 23 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"errors"
"fmt"
"io"
"mime"
"net/http"
"net/url"
Expand All @@ -13,6 +14,8 @@ import (
)

const (
ObjectMetaPrefix = "X-Amz-Meta-"

ObjectsRenameModeSingle = "single"
ObjectsRenameModeMulti = "multi"

Expand Down Expand Up @@ -44,6 +47,7 @@ var (
type (
// Object wraps an object.Object with its metadata.
Object struct {
Metadata ObjectUserMetadata `json:"metadata,omitempty"`
ObjectMetadata
object.Object
}
Expand All @@ -52,20 +56,15 @@ type (
ObjectMetadata struct {
ETag string `json:"eTag,omitempty"`
Health float64 `json:"health"`
MimeType string `json:"mimeType,omitempty"`
ModTime TimeRFC3339 `json:"modTime"`
Name string `json:"name"`
Size int64 `json:"size"`
MimeType string `json:"mimeType,omitempty"`
}

// ObjectAddRequest is the request type for the /bus/object/*key endpoint.
ObjectAddRequest struct {
Bucket string `json:"bucket"`
ContractSet string `json:"contractSet"`
Object object.Object `json:"object"`
MimeType string `json:"mimeType"`
ETag string `json:"eTag"`
}
// ObjectUserMetadata contains user-defined metadata about an object,
// usually provided through `X-Amz-Meta-` meta headers.
ObjectUserMetadata map[string]string

// ObjectsResponse is the response type for the /bus/objects endpoint.
ObjectsResponse struct {
Expand All @@ -74,15 +73,14 @@ type (
Object *Object `json:"object,omitempty"`
}

// ObjectsCopyRequest is the request type for the /bus/objects/copy endpoint.
ObjectsCopyRequest struct {
SourceBucket string `json:"sourceBucket"`
SourcePath string `json:"sourcePath"`

DestinationBucket string `json:"destinationBucket"`
DestinationPath string `json:"destinationPath"`

MimeType string `json:"mimeType"`
// GetObjectResponse is the response type for the /worker/object endpoint.
GetObjectResponse struct {
Content io.ReadCloser `json:"content"`
ContentType string `json:"contentType"`
LastModified string `json:"lastModified"`
Range *DownloadRange `json:"range,omitempty"`
Size int64 `json:"size"`
Metadata ObjectUserMetadata `json:"metadata"`
}

// ObjectsDeleteRequest is the request type for the /bus/objects/list endpoint.
Expand Down Expand Up @@ -123,6 +121,16 @@ type (
}
)

func ObjectUserMetadataFrom(metadata map[string]string) ObjectUserMetadata {
oum := make(map[string]string)
for k, v := range metadata {
if strings.HasPrefix(strings.ToLower(k), strings.ToLower(ObjectMetaPrefix)) {
oum[k[len(ObjectMetaPrefix):]] = v
}
}
return oum
}

// LastModified returns the object's ModTime formatted for use in the
// 'Last-Modified' header
func (o ObjectMetadata) LastModified() string {
Expand All @@ -144,14 +152,49 @@ func (o ObjectMetadata) ContentType() string {
return ""
}

func (o ObjectMetadata) Equals(other ObjectMetadata) bool {
return o.ETag == other.ETag &&
o.Health == other.Health &&
o.MimeType == other.MimeType &&
o.ModTime == other.ModTime &&
o.Name == other.Name &&
o.Size == other.Size
}

type (
// AddObjectOptions is the options type for the bus client.
AddObjectOptions struct {
MimeType string
ETag string
MimeType string
Metadata ObjectUserMetadata
}

// AddObjectRequest is the request type for the /bus/object/*key endpoint.
AddObjectRequest struct {
Bucket string `json:"bucket"`
ContractSet string `json:"contractSet"`
Object object.Object `json:"object"`
ETag string `json:"eTag"`
MimeType string `json:"mimeType"`
Metadata ObjectUserMetadata `json:"metadata"`
}

// CopyObjectOptions is the options type for the bus client.
CopyObjectOptions struct {
MimeType string
Metadata ObjectUserMetadata
}

// CopyObjectsRequest is the request type for the /bus/objects/copy endpoint.
CopyObjectsRequest struct {
SourceBucket string `json:"sourceBucket"`
SourcePath string `json:"sourcePath"`

DestinationBucket string `json:"destinationBucket"`
DestinationPath string `json:"destinationPath"`

MimeType string `json:"mimeType"`
Metadata ObjectUserMetadata `json:"metadata"`
}

DeleteObjectOptions struct {
Expand Down Expand Up @@ -191,24 +234,29 @@ type (
Limit int
}

// UploadObjectOptions is the options type for the worker client.
UploadObjectOptions struct {
Offset int
MinShards int
TotalShards int
ContractSet string
MimeType string
DisablePreshardingEncryption bool
ContentLength int64

// Metadata contains all object metadata and will contain things like
// the Content-Type as well as all user-defined metadata.
Metadata map[string]string
}

// TODO PJ: add meta (?)
UploadMultipartUploadPartOptions struct {
DisablePreshardingEncryption bool
EncryptionOffset int
ContentLength int64
}
)

func (opts UploadObjectOptions) Apply(values url.Values) {
func (opts UploadObjectOptions) ApplyValues(values url.Values) {
if opts.Offset != 0 {
values.Set("offset", fmt.Sprint(opts.Offset))
}
Expand All @@ -221,14 +269,22 @@ func (opts UploadObjectOptions) Apply(values url.Values) {
if opts.ContractSet != "" {
values.Set("contractset", opts.ContractSet)
}
if opts.MimeType != "" {
values.Set("mimetype", opts.MimeType)
if ct, ok := opts.Metadata["Content-Type"]; ok {
values.Set("mimetype", ct)
}
if opts.DisablePreshardingEncryption {
values.Set("disablepreshardingencryption", "true")
}
}

func (opts UploadObjectOptions) ApplyHeaders(h http.Header) {
for k, v := range opts.Metadata {
if strings.HasPrefix(strings.ToLower(k), strings.ToLower(ObjectMetaPrefix)) {
h.Set(k, v)
}
}
}

func (opts UploadMultipartUploadPartOptions) Apply(values url.Values) {
if opts.DisablePreshardingEncryption {
values.Set("disablepreshardingencryption", "true")
Expand Down
2 changes: 1 addition & 1 deletion api/param.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (t *TimeRFC3339) UnmarshalText(b []byte) error {

// MarshalJSON implements json.Marshaler.
func (t TimeRFC3339) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, (time.Time)(t).UTC().Format(time.RFC3339))), nil
return []byte(fmt.Sprintf(`"%s"`, (time.Time)(t).UTC().Format(time.RFC3339Nano))), nil
}

// String implements fmt.Stringer.
Expand Down
9 changes: 0 additions & 9 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"errors"
"fmt"
"io"
"strconv"
"strings"

Expand Down Expand Up @@ -217,14 +216,6 @@ type (
UploadMultipartUploadPartResponse struct {
ETag string `json:"etag"`
}

GetObjectResponse struct {
Content io.ReadCloser `json:"content"`
ContentType string `json:"contentType"`
ModTime TimeRFC3339 `json:"modTime"`
Range *DownloadRange `json:"range,omitempty"`
Size int64 `json:"size"`
}
)

type DownloadRange struct {
Expand Down
16 changes: 8 additions & 8 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type (
ListBuckets(_ context.Context) ([]api.Bucket, error)
UpdateBucketPolicy(ctx context.Context, bucketName string, policy api.BucketPolicy) error

CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string) (api.ObjectMetadata, error)
CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error)
ListObjects(ctx context.Context, bucketName, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error)
Object(ctx context.Context, bucketName, path string) (api.Object, error)
ObjectEntries(ctx context.Context, bucketName, path, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error)
Expand All @@ -146,12 +146,12 @@ type (
RenameObject(ctx context.Context, bucketName, from, to string, force bool) error
RenameObjects(ctx context.Context, bucketName, from, to string, force bool) error
SearchObjects(ctx context.Context, bucketName, substring string, offset, limit int) ([]api.ObjectMetadata, error)
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error

AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error)
CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error)
CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (api.MultipartCreateResponse, error)
MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error)
MultipartUploads(ctx context.Context, bucketName, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
MultipartUploadParts(ctx context.Context, bucketName, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)
Expand Down Expand Up @@ -1251,22 +1251,22 @@ func (b *bus) objectEntriesHandlerGET(jc jape.Context, path string) {
}

func (b *bus) objectsHandlerPUT(jc jape.Context) {
var aor api.ObjectAddRequest
var aor api.AddObjectRequest
if jc.Decode(&aor) != nil {
return
} else if aor.Bucket == "" {
aor.Bucket = api.DefaultBucketName
}
jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Object))
jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Metadata, aor.Object))
}

func (b *bus) objectsCopyHandlerPOST(jc jape.Context) {
var orr api.ObjectsCopyRequest
var orr api.CopyObjectsRequest
if jc.Decode(&orr) != nil {
return
}

om, err := b.ms.CopyObject(jc.Request.Context(), orr.SourceBucket, orr.DestinationBucket, orr.SourcePath, orr.DestinationPath, orr.MimeType)
om, err := b.ms.CopyObject(jc.Request.Context(), orr.SourceBucket, orr.DestinationBucket, orr.SourcePath, orr.DestinationPath, orr.MimeType, orr.Metadata)
if jc.Check("couldn't copy object", err) != nil {
return
}
Expand Down Expand Up @@ -2179,7 +2179,7 @@ func (b *bus) multipartHandlerCreatePOST(jc jape.Context) {
key = object.NoOpKey
}

resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, key, req.MimeType)
resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, key, req.MimeType, req.Metadata)
if jc.Check("failed to create multipart upload", err) != nil {
return
}
Expand Down
1 change: 1 addition & 0 deletions bus/client/multipart-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string,
Path: path,
Key: opts.Key,
MimeType: opts.MimeType,
Metadata: opts.Metadata,
}, &resp)
return
}
Expand Down
11 changes: 6 additions & 5 deletions bus/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,27 @@ import (
// AddObject stores the provided object under the given path.
func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) {
path = api.ObjectPathEscape(path)
err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.ObjectAddRequest{
err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.AddObjectRequest{
Bucket: bucket,
ContractSet: contractSet,
Object: o,
MimeType: opts.MimeType,
ETag: opts.ETag,
MimeType: opts.MimeType,
Metadata: opts.Metadata,
})
return
}

// CopyObject copies the object from the source bucket and path to the
// destination bucket and path.
func (c *Client) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath string, opts api.CopyObjectOptions) (om api.ObjectMetadata, err error) {
err = c.c.WithContext(ctx).POST("/objects/copy", api.ObjectsCopyRequest{
err = c.c.WithContext(ctx).POST("/objects/copy", api.CopyObjectsRequest{
SourceBucket: srcBucket,
DestinationBucket: dstBucket,
SourcePath: srcPath,
DestinationPath: dstPath,

MimeType: opts.MimeType,
MimeType: opts.MimeType,
Metadata: opts.Metadata,
}, &om)
return
}
Expand Down
4 changes: 2 additions & 2 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestObjectEntries(t *testing.T) {
}
assertMetadata(res.Entries)

if len(res.Entries) != 1 || res.Entries[0] != test.want[offset] {
if len(res.Entries) != 1 || !res.Entries[0].Equals(test.want[offset]) {
t.Errorf("\nlist: %v\nprefix: %v\ngot: %v\nwant: %v", test.path, test.prefix, res.Entries, test.want[offset])
}
moreRemaining := len(test.want)-offset-1 > 0
Expand All @@ -361,7 +361,7 @@ func TestObjectEntries(t *testing.T) {
}
assertMetadata(res.Entries)

if len(res.Entries) != 1 || res.Entries[0] != test.want[offset+1] {
if len(res.Entries) != 1 || !res.Entries[0].Equals(test.want[offset+1]) {
t.Errorf("\nlist: %v\nprefix: %v\nmarker: %v\ngot: %v\nwant: %v", test.path, test.prefix, test.want[offset].Name, res.Entries, test.want[offset+1])
}

Expand Down
Loading

0 comments on commit 908e727

Please sign in to comment.