diff --git a/api/multipart.go b/api/multipart.go index 2a60e1f75..955b78849 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -48,6 +48,7 @@ type ( CreateMultipartOptions struct { Key object.EncryptionKey MimeType string + Metadata ObjectUserMetadata } ) @@ -84,6 +85,7 @@ type ( Path string `json:"path"` Key object.EncryptionKey `json:"key"` MimeType string `json:"mimeType"` + Metadata ObjectUserMetadata `json:"metadata"` } MultipartCreateResponse struct { diff --git a/api/object.go b/api/object.go index 4548eae8a..4d68901a5 100644 --- a/api/object.go +++ b/api/object.go @@ -3,6 +3,7 @@ package api import ( "errors" "fmt" + "io" "mime" "net/http" "net/url" @@ -13,6 +14,8 @@ import ( ) const ( + ObjectMetaPrefix = "X-Amz-Meta-" + ObjectsRenameModeSingle = "single" ObjectsRenameModeMulti = "multi" @@ -44,6 +47,7 @@ var ( type ( // Object wraps an object.Object with its metadata. Object struct { + Metadata ObjectUserMetadata `json:"metadata,omitempty"` ObjectMetadata object.Object } @@ -52,20 +56,15 @@ type ( ObjectMetadata struct { ETag string `json:"eTag,omitempty"` Health float64 `json:"health"` - MimeType string `json:"mimeType,omitempty"` ModTime TimeRFC3339 `json:"modTime"` Name string `json:"name"` Size int64 `json:"size"` + MimeType string `json:"mimeType,omitempty"` } - // ObjectAddRequest is the request type for the /bus/object/*key endpoint. - ObjectAddRequest struct { - Bucket string `json:"bucket"` - ContractSet string `json:"contractSet"` - Object object.Object `json:"object"` - MimeType string `json:"mimeType"` - ETag string `json:"eTag"` - } + // ObjectUserMetadata contains user-defined metadata about an object, + // usually provided through `X-Amz-Meta-` meta headers. + ObjectUserMetadata map[string]string // ObjectsResponse is the response type for the /bus/objects endpoint. 
ObjectsResponse struct { @@ -74,15 +73,14 @@ type ( Object *Object `json:"object,omitempty"` } - // ObjectsCopyRequest is the request type for the /bus/objects/copy endpoint. - ObjectsCopyRequest struct { - SourceBucket string `json:"sourceBucket"` - SourcePath string `json:"sourcePath"` - - DestinationBucket string `json:"destinationBucket"` - DestinationPath string `json:"destinationPath"` - - MimeType string `json:"mimeType"` + // GetObjectResponse is the response type for the /worker/object endpoint. + GetObjectResponse struct { + Content io.ReadCloser `json:"content"` + ContentType string `json:"contentType"` + LastModified string `json:"lastModified"` + Range *DownloadRange `json:"range,omitempty"` + Size int64 `json:"size"` + Metadata ObjectUserMetadata `json:"metadata"` } // ObjectsDeleteRequest is the request type for the /bus/objects/list endpoint. @@ -123,6 +121,16 @@ type ( } ) +func ObjectUserMetadataFrom(metadata map[string]string) ObjectUserMetadata { + oum := make(map[string]string) + for k, v := range metadata { + if strings.HasPrefix(strings.ToLower(k), strings.ToLower(ObjectMetaPrefix)) { + oum[k[len(ObjectMetaPrefix):]] = v + } + } + return oum +} + // LastModified returns the object's ModTime formatted for use in the // 'Last-Modified' header func (o ObjectMetadata) LastModified() string { @@ -144,14 +152,49 @@ func (o ObjectMetadata) ContentType() string { return "" } +func (o ObjectMetadata) Equals(other ObjectMetadata) bool { + return o.ETag == other.ETag && + o.Health == other.Health && + o.MimeType == other.MimeType && + o.ModTime == other.ModTime && + o.Name == other.Name && + o.Size == other.Size +} + type ( + // AddObjectOptions is the options type for the bus client. AddObjectOptions struct { - MimeType string ETag string + MimeType string + Metadata ObjectUserMetadata + } + + // AddObjectRequest is the request type for the /bus/object/*key endpoint. 
+ AddObjectRequest struct { + Bucket string `json:"bucket"` + ContractSet string `json:"contractSet"` + Object object.Object `json:"object"` + ETag string `json:"eTag"` + MimeType string `json:"mimeType"` + Metadata ObjectUserMetadata `json:"metadata"` } + // CopyObjectOptions is the options type for the bus client. CopyObjectOptions struct { MimeType string + Metadata ObjectUserMetadata + } + + // CopyObjectsRequest is the request type for the /bus/objects/copy endpoint. + CopyObjectsRequest struct { + SourceBucket string `json:"sourceBucket"` + SourcePath string `json:"sourcePath"` + + DestinationBucket string `json:"destinationBucket"` + DestinationPath string `json:"destinationPath"` + + MimeType string `json:"mimeType"` + Metadata ObjectUserMetadata `json:"metadata"` } DeleteObjectOptions struct { @@ -191,16 +234,21 @@ type ( Limit int } + // UploadObjectOptions is the options type for the worker client. UploadObjectOptions struct { Offset int MinShards int TotalShards int ContractSet string - MimeType string DisablePreshardingEncryption bool ContentLength int64 + + // Metadata contains all object metadata and will contain things like + // the Content-Type as well as all user-defined metadata. + Metadata map[string]string } + // TODO PJ: add meta (?) 
UploadMultipartUploadPartOptions struct { DisablePreshardingEncryption bool EncryptionOffset int @@ -208,7 +256,7 @@ type ( } ) -func (opts UploadObjectOptions) Apply(values url.Values) { +func (opts UploadObjectOptions) ApplyValues(values url.Values) { if opts.Offset != 0 { values.Set("offset", fmt.Sprint(opts.Offset)) } @@ -221,14 +269,22 @@ func (opts UploadObjectOptions) Apply(values url.Values) { if opts.ContractSet != "" { values.Set("contractset", opts.ContractSet) } - if opts.MimeType != "" { - values.Set("mimetype", opts.MimeType) + if ct, ok := opts.Metadata["Content-Type"]; ok { + values.Set("mimetype", ct) } if opts.DisablePreshardingEncryption { values.Set("disablepreshardingencryption", "true") } } +func (opts UploadObjectOptions) ApplyHeaders(h http.Header) { + for k, v := range opts.Metadata { + if strings.HasPrefix(strings.ToLower(k), strings.ToLower(ObjectMetaPrefix)) { + h.Set(k, v) + } + } +} + func (opts UploadMultipartUploadPartOptions) Apply(values url.Values) { if opts.DisablePreshardingEncryption { values.Set("disablepreshardingencryption", "true") diff --git a/api/param.go b/api/param.go index 87048df6d..7e9ef6e75 100644 --- a/api/param.go +++ b/api/param.go @@ -105,7 +105,7 @@ func (t *TimeRFC3339) UnmarshalText(b []byte) error { // MarshalJSON implements json.Marshaler. func (t TimeRFC3339) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`"%s"`, (time.Time)(t).UTC().Format(time.RFC3339))), nil + return []byte(fmt.Sprintf(`"%s"`, (time.Time)(t).UTC().Format(time.RFC3339Nano))), nil } // String implements fmt.Stringer. 
diff --git a/api/worker.go b/api/worker.go index 061164592..d280698b7 100644 --- a/api/worker.go +++ b/api/worker.go @@ -3,7 +3,6 @@ package api import ( "errors" "fmt" - "io" "strconv" "strings" @@ -217,14 +216,6 @@ type ( UploadMultipartUploadPartResponse struct { ETag string `json:"etag"` } - - GetObjectResponse struct { - Content io.ReadCloser `json:"content"` - ContentType string `json:"contentType"` - ModTime TimeRFC3339 `json:"modTime"` - Range *DownloadRange `json:"range,omitempty"` - Size int64 `json:"size"` - } ) type DownloadRange struct { diff --git a/bus/bus.go b/bus/bus.go index eb5467398..aa7afe29a 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -135,7 +135,7 @@ type ( ListBuckets(_ context.Context) ([]api.Bucket, error) UpdateBucketPolicy(ctx context.Context, bucketName string, policy api.BucketPolicy) error - CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string) (api.ObjectMetadata, error) + CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) ListObjects(ctx context.Context, bucketName, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) Object(ctx context.Context, bucketName, path string) (api.Object, error) ObjectEntries(ctx context.Context, bucketName, path, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error) @@ -146,12 +146,12 @@ type ( RenameObject(ctx context.Context, bucketName, from, to string, force bool) error RenameObjects(ctx context.Context, bucketName, from, to string, force bool) error SearchObjects(ctx context.Context, bucketName, substring string, offset, limit int) ([]api.ObjectMetadata, error) - UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error + UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, metadata api.ObjectUserMetadata, o 
object.Object) error AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error) AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) - CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error) + CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (api.MultipartCreateResponse, error) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error) MultipartUploads(ctx context.Context, bucketName, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) MultipartUploadParts(ctx context.Context, bucketName, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) @@ -1251,22 +1251,22 @@ func (b *bus) objectEntriesHandlerGET(jc jape.Context, path string) { } func (b *bus) objectsHandlerPUT(jc jape.Context) { - var aor api.ObjectAddRequest + var aor api.AddObjectRequest if jc.Decode(&aor) != nil { return } else if aor.Bucket == "" { aor.Bucket = api.DefaultBucketName } - jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Object)) + jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Metadata, aor.Object)) } func (b *bus) objectsCopyHandlerPOST(jc jape.Context) { - var orr api.ObjectsCopyRequest + var orr api.CopyObjectsRequest if jc.Decode(&orr) != nil { return } - om, err := 
b.ms.CopyObject(jc.Request.Context(), orr.SourceBucket, orr.DestinationBucket, orr.SourcePath, orr.DestinationPath, orr.MimeType) + om, err := b.ms.CopyObject(jc.Request.Context(), orr.SourceBucket, orr.DestinationBucket, orr.SourcePath, orr.DestinationPath, orr.MimeType, orr.Metadata) if jc.Check("couldn't copy object", err) != nil { return } @@ -2179,7 +2179,7 @@ func (b *bus) multipartHandlerCreatePOST(jc jape.Context) { key = object.NoOpKey } - resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, key, req.MimeType) + resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, key, req.MimeType, req.Metadata) if jc.Check("failed to create multipart upload", err) != nil { return } diff --git a/bus/client/multipart-upload.go b/bus/client/multipart-upload.go index e12c1c43d..ffa4d8dc8 100644 --- a/bus/client/multipart-upload.go +++ b/bus/client/multipart-upload.go @@ -50,6 +50,7 @@ func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string, Path: path, Key: opts.Key, MimeType: opts.MimeType, + Metadata: opts.Metadata, }, &resp) return } diff --git a/bus/client/objects.go b/bus/client/objects.go index 8f9852689..38a7b14cd 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -12,12 +12,13 @@ import ( // AddObject stores the provided object under the given path. 
func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) { path = api.ObjectPathEscape(path) - err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.ObjectAddRequest{ + err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.AddObjectRequest{ Bucket: bucket, ContractSet: contractSet, Object: o, - MimeType: opts.MimeType, ETag: opts.ETag, + MimeType: opts.MimeType, + Metadata: opts.Metadata, }) return } @@ -25,13 +26,13 @@ func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string // CopyObject copies the object from the source bucket and path to the // destination bucket and path. func (c *Client) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath string, opts api.CopyObjectOptions) (om api.ObjectMetadata, err error) { - err = c.c.WithContext(ctx).POST("/objects/copy", api.ObjectsCopyRequest{ + err = c.c.WithContext(ctx).POST("/objects/copy", api.CopyObjectsRequest{ SourceBucket: srcBucket, DestinationBucket: dstBucket, SourcePath: srcPath, DestinationPath: dstPath, - - MimeType: opts.MimeType, + MimeType: opts.MimeType, + Metadata: opts.Metadata, }, &om) return } diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index ac599ebda..7c8244973 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -342,7 +342,7 @@ func TestObjectEntries(t *testing.T) { } assertMetadata(res.Entries) - if len(res.Entries) != 1 || res.Entries[0] != test.want[offset] { + if len(res.Entries) != 1 || !res.Entries[0].Equals(test.want[offset]) { t.Errorf("\nlist: %v\nprefix: %v\ngot: %v\nwant: %v", test.path, test.prefix, res.Entries, test.want[offset]) } moreRemaining := len(test.want)-offset-1 > 0 @@ -361,7 +361,7 @@ func TestObjectEntries(t *testing.T) { } assertMetadata(res.Entries) - if len(res.Entries) != 1 || res.Entries[0] != test.want[offset+1] { + if len(res.Entries) != 1 
|| !res.Entries[0].Equals(test.want[offset+1]) { t.Errorf("\nlist: %v\nprefix: %v\nmarker: %v\ngot: %v\nwant: %v", test.path, test.prefix, test.want[offset].Name, res.Entries, test.want[offset+1]) } diff --git a/internal/testing/s3_test.go b/internal/testing/s3_test.go index f5cb2fba8..204941aa4 100644 --- a/internal/testing/s3_test.go +++ b/internal/testing/s3_test.go @@ -15,6 +15,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/gofakes3" "go.sia.tech/renterd/api" + "go.uber.org/zap" "lukechampine.com/frand" ) @@ -168,6 +169,55 @@ func TestS3Basic(t *testing.T) { } } +func TestS3ObjectMeta(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + // create cluster + opts := testClusterOptions{ + hosts: testRedundancySettings.TotalShards, + logger: zap.NewNop(), + } + cluster := newTestCluster(t, opts) + defer cluster.Shutdown() + + // convenience variables + s3 := cluster.S3 + tt := cluster.tt + + // add object to the bucket + _, err := s3.PutObject(context.Background(), api.DefaultBucketName, t.Name(), bytes.NewReader([]byte(t.Name())), int64(len([]byte(t.Name()))), minio.PutObjectOptions{UserMetadata: map[string]string{ + "X-Amz-Meta-Foo": "bar", + "X-Amz-Meta-Baz": "qux", + }}) + tt.OK(err) + + // create helper to assert metadata is present + assertMetadata := func(metadata minio.StringMap) { + t.Helper() + if val1, ok1 := metadata["Foo"]; !ok1 || val1 != "bar" { + t.Fatal("expected metadata", metadata, ok1, val1) + } + if val2, ok2 := metadata["Baz"]; !ok2 || val2 != "qux" { + t.Fatal("expected metadata", metadata) + } + } + + // perform GET request + obj, err := s3.GetObject(context.Background(), api.DefaultBucketName, t.Name(), minio.GetObjectOptions{}) + tt.OK(err) + + // assert metadata is set + get, err := obj.Stat() + tt.OK(err) + assertMetadata(get.UserMetadata) + + // perform HEAD request + head, err := s3.StatObject(context.Background(), api.DefaultBucketName, t.Name(), minio.StatObjectOptions{}) + tt.OK(err) + 
assertMetadata(head.UserMetadata) +} + func TestS3Authentication(t *testing.T) { if testing.Short() { t.SkipNow() diff --git a/s3/backend.go b/s3/backend.go index 83f59c637..1563df5ac 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -3,18 +3,14 @@ package s3 import ( "bytes" "context" - "encoding/hex" "fmt" "io" - "mime" - "net/http" "strings" "go.sia.tech/gofakes3" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" "go.uber.org/zap" - "lukechampine.com/frand" ) // maxKeysDefault is the default maxKeys value used in the AWS SDK @@ -68,9 +64,6 @@ func (s *s3) ListBuckets(ctx context.Context) ([]gofakes3.BucketInfo, error) { // work fine if you ignore the pagination request, but this may not suit // your application. Not all backends bundled with gofakes3 correctly // support this pagination yet, but that will change. -// -// TODO: This implementation is not ideal because it fetches all objects. We -// will eventually want to support this type of pagination in the bus. func (s *s3) ListBucket(ctx context.Context, bucketName string, prefix *gofakes3.Prefix, page gofakes3.ListBucketPage) (*gofakes3.ObjectList, error) { if prefix == nil { prefix = &gofakes3.Prefix{} @@ -150,11 +143,10 @@ func (s *s3) ListBucket(ctx context.Context, bucketName string, prefix *gofakes3 response.AddPrefix(key) continue } - item := &gofakes3.Content{ Key: key, LastModified: gofakes3.NewContentTime(object.ModTime.Std()), - ETag: hex.EncodeToString(frand.Bytes(32)), // TODO: don't have that + ETag: object.ETag, Size: object.Size, StorageClass: gofakes3.StorageStandard, } @@ -241,35 +233,40 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range } opts.Range = api.DownloadRange{Offset: rangeRequest.Start, Length: length} } - res, err := s.w.GetObject(ctx, bucketName, objectName, opts) - if err != nil && strings.Contains(err.Error(), api.ErrBucketNotFound.Error()) { + + if res, err := s.w.GetObject(ctx, bucketName, objectName, opts); err != nil && 
strings.Contains(err.Error(), api.ErrBucketNotFound.Error()) { return nil, gofakes3.BucketNotFound(bucketName) } else if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) { return nil, gofakes3.KeyNotFound(objectName) } else if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) - } - var objectRange *gofakes3.ObjectRange - if res.Range != nil { - objectRange = &gofakes3.ObjectRange{ - Start: res.Range.Offset, - Length: res.Range.Length, + } else { + // build range + var objectRange *gofakes3.ObjectRange + if res.Range != nil { + objectRange = &gofakes3.ObjectRange{ + Start: res.Range.Offset, + Length: res.Range.Length, + } } - } - // TODO: When we support metadata we need to add it here. - metadata := map[string]string{ - "Content-Type": res.ContentType, - "Last-Modified": res.ModTime.Std().Format(http.TimeFormat), - } + // ensure metadata is not nil + if res.Metadata == nil { + res.Metadata = make(map[string]string) + } - return &gofakes3.Object{ - Name: gofakes3.URLEncode(objectName), - Metadata: metadata, - Size: res.Size, - Contents: res.Content, - Range: objectRange, - }, nil + // decorate metadata (TODO PJ: not sure if necessary) + res.Metadata["Content-Type"] = res.ContentType + res.Metadata["Last-Modified"] = res.LastModified + + return &gofakes3.Object{ + Name: gofakes3.URLEncode(objectName), + Metadata: res.Metadata, + Size: res.Size, + Contents: res.Content, + Range: objectRange, + }, nil + } } // HeadObject fetches the Object from the backend, but reading the Contents @@ -282,23 +279,28 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range // HeadObject should return a NotFound() error if the object does not // exist. 
func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*gofakes3.Object, error) { - res, err := s.b.Object(ctx, bucketName, objectName, api.GetObjectOptions{IgnoreDelim: true}) - if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) { + if res, err := s.b.Object(ctx, bucketName, objectName, api.GetObjectOptions{IgnoreDelim: true}); err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) { return nil, gofakes3.KeyNotFound(objectName) } else if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } else { + // set user metadata + metadata := make(map[string]string) + for k, v := range res.Object.Metadata { + metadata[fmt.Sprintf("%s%s", api.ObjectMetaPrefix, k)] = v + } + + // decorate metadata + metadata["Content-Type"] = res.Object.MimeType + metadata["Last-Modified"] = res.Object.LastModified() + + return &gofakes3.Object{ + Name: gofakes3.URLEncode(objectName), + Metadata: metadata, + Size: res.Object.Size, + Contents: io.NopCloser(bytes.NewReader(nil)), + }, nil } - // TODO: When we support metadata we need to add it here. - metadata := map[string]string{ - "Content-Type": mime.TypeByExtension(objectName), - "Last-Modified": res.Object.LastModified(), - } - return &gofakes3.Object{ - Name: gofakes3.URLEncode(objectName), - Metadata: metadata, - Size: res.Object.Size, - Contents: io.NopCloser(bytes.NewReader(nil)), - }, nil } // DeleteObject deletes an object from the bucket. @@ -334,25 +336,21 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectName string) (g // // The size can be used if the backend needs to read the whole reader; use // gofakes3.ReadAll() for this job rather than ioutil.ReadAll(). -// TODO: Metadata is currently ignored. The backend requires an update to -// support it. 
func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[string]string, input io.Reader, size int64) (gofakes3.PutObjectResult, error) { - opts := api.UploadObjectOptions{ + // forward the full metadata map (Content-Type and X-Amz-Meta-* alike) to the worker + if ur, err := s.w.UploadObject(ctx, input, bucketName, key, api.UploadObjectOptions{ ContentLength: size, - } - if ct, ok := meta["Content-Type"]; ok { - opts.MimeType = ct - } - ur, err := s.w.UploadObject(ctx, input, bucketName, key, opts) - if err != nil && strings.Contains(err.Error(), api.ErrBucketNotFound.Error()) { + Metadata: meta, + }); err != nil && strings.Contains(err.Error(), api.ErrBucketNotFound.Error()) { return gofakes3.PutObjectResult{}, gofakes3.BucketNotFound(bucketName) } else if err != nil { return gofakes3.PutObjectResult{}, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } else { + return gofakes3.PutObjectResult{ + ETag: ur.ETag, + VersionID: "", // not supported + }, nil } - return gofakes3.PutObjectResult{ - ETag: ur.ETag, - VersionID: "", // not supported - }, nil } func (s *s3) DeleteMulti(ctx context.Context, bucketName string, objects ...string) (gofakes3.MultiDeleteResult, error) { @@ -376,41 +374,40 @@ } func (s *s3) CopyObject(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string, meta map[string]string) (gofakes3.CopyObjectResult, error) { - var opts api.CopyObjectOptions - if ct, ok := meta["Content-Type"]; ok { - opts.MimeType = ct - } - obj, err := s.b.CopyObject(ctx, srcBucket, dstBucket, "/"+srcKey, "/"+dstKey, opts) - if err != nil { + if obj, err := s.b.CopyObject(ctx, srcBucket, dstBucket, "/"+srcKey, "/"+dstKey, api.CopyObjectOptions{ + MimeType: meta["Content-Type"], + Metadata: api.ObjectUserMetadataFrom(meta), + }); err != nil { return gofakes3.CopyObjectResult{}, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } else { + return 
gofakes3.CopyObjectResult{ + ETag: api.FormatETag(obj.ETag), + LastModified: gofakes3.NewContentTime(obj.ModTime.Std()), + }, nil } - return gofakes3.CopyObjectResult{ - ETag: api.FormatETag(obj.ETag), - LastModified: gofakes3.NewContentTime(obj.ModTime.Std()), - }, nil } func (s *s3) CreateMultipartUpload(ctx context.Context, bucket, key string, meta map[string]string) (gofakes3.UploadID, error) { - opts := api.CreateMultipartOptions{Key: object.NoOpKey} - if ct, ok := meta["Content-Type"]; ok { - opts.MimeType = ct - } - resp, err := s.b.CreateMultipartUpload(ctx, bucket, "/"+key, opts) - if err != nil { + if resp, err := s.b.CreateMultipartUpload(ctx, bucket, "/"+key, api.CreateMultipartOptions{ + Key: object.NoOpKey, + MimeType: meta["Content-Type"], + Metadata: api.ObjectUserMetadataFrom(meta), + }); err != nil { return "", gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } else { + return gofakes3.UploadID(resp.UploadID), nil } - return gofakes3.UploadID(resp.UploadID), nil } func (s *s3) UploadPart(ctx context.Context, bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (*gofakes3.UploadPartResult, error) { - res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{ + if res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{ DisablePreshardingEncryption: true, ContentLength: contentLength, - }) - if err != nil { + }); err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } else { + return &gofakes3.UploadPartResult{ETag: res.ETag}, nil } - return &gofakes3.UploadPartResult{ETag: res.ETag}, nil } func (s *s3) ListMultipartUploads(ctx context.Context, bucket string, marker *gofakes3.UploadListMarker, prefix gofakes3.Prefix, limit int64) (*gofakes3.ListMultipartUploadsResult, error) { diff --git a/stores/metadata.go 
b/stores/metadata.go index c86b9f81d..6e59eb0cf 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -108,15 +108,24 @@ type ( DBBucket dbBucket ObjectID string `gorm:"index;uniqueIndex:idx_object_bucket"` - Key secretKey - Slabs []dbSlice `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete slices too - Health float64 `gorm:"index;default:1.0; NOT NULL"` - Size int64 + Key secretKey + Slabs []dbSlice `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete slices too + Metadata []dbObjectUserMetadata `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete metadata too + Health float64 `gorm:"index;default:1.0; NOT NULL"` + Size int64 MimeType string `json:"index"` Etag string `gorm:"index"` } + dbObjectUserMetadata struct { + Model + + DBObjectID uint `gorm:"index:uniqueIndex:idx_object_meta_key"` + Key string `gorm:"index:uniqueIndex:idx_object_meta_key"` + Value string + } + dbBucket struct { Model @@ -286,6 +295,9 @@ func (dbContractSet) TableName() string { return "contract_sets" } // TableName implements the gorm.Tabler interface. func (dbObject) TableName() string { return "objects" } +// TableName implements the gorm.Tabler interface. +func (dbObjectUserMetadata) TableName() string { return "object_user_metadata" } + // TableName implements the gorm.Tabler interface. func (dbSector) TableName() string { return "sectors" } @@ -398,81 +410,6 @@ func (raw rawObjectMetadata) convert() api.ObjectMetadata { } } -func (raw rawObject) convert() (api.Object, error) { - if len(raw) == 0 { - return api.Object{}, errors.New("no slabs found") - } - - // parse object key - var key object.EncryptionKey - if err := key.UnmarshalBinary(raw[0].ObjectKey); err != nil { - return api.Object{}, err - } - - // filter out slabs without slab ID and buffered slabs - this is expected - // for an empty object or objects that end with a partial slab. 
- var filtered rawObject - minHealth := math.MaxFloat64 - for _, sector := range raw { - if sector.SlabID != 0 { - filtered = append(filtered, sector) - if sector.SlabHealth < minHealth { - minHealth = sector.SlabHealth - } - } - } - - // hydrate all slabs - slabs := make([]object.SlabSlice, 0, len(filtered)) - if len(filtered) > 0 { - var start int - // create a helper function to add a slab and update the state - addSlab := func(end int) error { - if slab, err := filtered[start:end].toSlabSlice(); err != nil { - return err - } else { - slabs = append(slabs, slab) - start = end - } - return nil - } - - curr := filtered[0] - for j, sector := range filtered { - if sector.ObjectIndex == 0 { - return api.Object{}, api.ErrObjectCorrupted - } else if sector.SectorIndex == 0 && !sector.SlabBuffered { - return api.Object{}, api.ErrObjectCorrupted - } - if sector.ObjectIndex != curr.ObjectIndex { - if err := addSlab(j); err != nil { - return api.Object{}, err - } - curr = sector - } - } - if err := addSlab(len(filtered)); err != nil { - return api.Object{}, err - } - } - - // return object - return api.Object{ - ObjectMetadata: api.ObjectMetadata{ - ETag: raw[0].ObjectETag, - Health: raw[0].ObjectHealth, - MimeType: raw[0].ObjectMimeType, - ModTime: api.TimeRFC3339(raw[0].ObjectModTime.UTC()), - Name: raw[0].ObjectName, - Size: raw[0].ObjectSize, - }, - Object: object.Object{ - Key: key, - Slabs: slabs, - }, - }, nil -} - func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { if len(raw) == 0 { return object.SlabSlice{}, errors.New("no sectors found") @@ -1116,7 +1053,7 @@ func (s *SQLStore) SearchObjects(ctx context.Context, bucket, substring string, var objects []api.ObjectMetadata err := s.db. - Select("o.object_id as name, o.size as size, o.health as health"). + Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as MimeType, o.etag as ETag, o.created_at as ModTime"). Model(&dbObject{}). Table("objects o"). 
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id"). @@ -1196,7 +1133,8 @@ func (s *SQLStore) ObjectEntries(ctx context.Context, bucket, path, prefix, sort objectsQuery := fmt.Sprintf(` SELECT ETag, ModTime, oname as Name, Size, Health, MimeType FROM ( - SELECT ANY_VALUE(etag) AS ETag, + SELECT + ANY_VALUE(etag) AS ETag, MAX(objects.created_at) AS ModTime, %s AS oname, SUM(size) AS Size, @@ -1305,17 +1243,12 @@ FROM ( return } -func (s *SQLStore) Object(ctx context.Context, bucket, path string) (api.Object, error) { - var obj api.Object - err := s.db.Transaction(func(tx *gorm.DB) error { - o, err := s.object(ctx, tx, bucket, path) - if err != nil { - return err - } - obj, err = o.convert() +func (s *SQLStore) Object(ctx context.Context, bucket, path string) (obj api.Object, err error) { + err = s.db.Transaction(func(tx *gorm.DB) error { + obj, err = s.object(ctx, tx, bucket, path) return err }) - return obj, err + return } func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.ContractSpendingRecord) error { @@ -1519,7 +1452,7 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t return s.slabBufferMgr.AddPartialSlab(ctx, data, minShards, totalShards, contractSetID) } -func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string) (om api.ObjectMetadata, err error) { +func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) { err = s.retryTransaction(func(tx *gorm.DB) error { var srcObj dbObject err = tx.Where("objects.object_id = ? AND DBBucket.name = ?", srcPath, srcBucket). 
@@ -1530,22 +1463,20 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath return fmt.Errorf("failed to fetch src object: %w", err) } - srcObjHealth, err := s.objectHealth(ctx, tx, srcObj.ID) - if err != nil { - return fmt.Errorf("failed to fetch src object health: %w", err) - } - if srcBucket == dstBucket && srcPath == dstPath { // No copying is happening. We just update the metadata on the src // object. srcObj.MimeType = mimeType om = api.ObjectMetadata{ - Health: srcObjHealth, + Health: srcObj.Health, MimeType: srcObj.MimeType, ModTime: api.TimeRFC3339(srcObj.CreatedAt.UTC()), Name: srcObj.ObjectID, Size: srcObj.Size, } + if err := s.createObjectMeta(tx, srcObj.ID, metadata); err != nil { + return fmt.Errorf("failed to create object metadata: %w", err) + } return tx.Save(&srcObj).Error } _, err = deleteObject(tx, dstBucket, dstPath) @@ -1589,7 +1520,7 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath om = api.ObjectMetadata{ MimeType: dstObj.MimeType, ETag: dstObj.Etag, - Health: srcObjHealth, + Health: srcObj.Health, ModTime: api.TimeRFC3339(dstObj.CreatedAt.UTC()), Name: dstObj.ObjectID, Size: dstObj.Size, @@ -1668,7 +1599,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo }) } -func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, o object.Object) error { +func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error { // Sanity check input. for _, s := range o.Slabs { for i, shard := range s.Shards { @@ -1693,9 +1624,13 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, // Try to delete. We want to get rid of the object and its slices if it // exists. 
// - // NOTE: please note that the object's created_at is currently used as - // its ModTime, if we ever stop recreating the object but update it - // instead we need to take this into account + // NOTE: the object's created_at is currently used as its ModTime, if we + // ever stop recreating the object but update it instead we need to take + // this into account + // + // NOTE: the metadata is not deleted because this delete will cascade, + // if we stop recreating the object we have to make sure to delete the + // object's metadata before trying to recreate it _, err := deleteObject(tx, bucket, path) if err != nil { return fmt.Errorf("failed to delete object: %w", err) } @@ -1737,6 +1672,10 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, if err := s.createSlices(tx, &obj.ID, nil, cs.ID, contracts, o.Slabs); err != nil { return fmt.Errorf("failed to create slices: %w", err) } + + // Create all object meta. + if err := s.createObjectMeta(tx, obj.ID, metadata); err != nil { + return fmt.Errorf("failed to create object metadata: %w", err) + } + return nil }) } @@ -2020,6 +1961,30 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set return slabs, nil } +func (s *SQLStore) createObjectMeta(tx *gorm.DB, objID uint, metadata api.ObjectUserMetadata) error { + entities := make([]*dbObjectUserMetadata, 0, len(metadata)) + for k, v := range metadata { + entities = append(entities, &dbObjectUserMetadata{ + DBObjectID: objID, + Key: k, + Value: v, + }) + } + return tx.Clauses(clause.OnConflict{UpdateAll: true}).CreateInBatches(&entities, 1000).Error +} + +func (s *SQLStore) createMultipartMeta(tx *gorm.DB, multipartUploadID uint, metadata api.ObjectUserMetadata) error { + entities := make([]*dbMultipartMetadata, 0, len(metadata)) + for k, v := range metadata { + entities = append(entities, &dbMultipartMetadata{ + DBMultipartUploadID: multipartUploadID, + Key: k,
+ Value: v, + }) + } + return tx.CreateInBatches(&entities, 1000).Error +} + func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error { if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) { return fmt.Errorf("either objID or multiPartID must be set") @@ -2093,14 +2060,123 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS return nil } -// object retrieves a raw object from the store. -func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path string) (rawObject, error) { +// object retrieves an object from the store. +func (s *SQLStore) object(ctx context.Context, tx *gorm.DB, bucket, path string) (api.Object, error) { + // fetch raw object data + raw, err := s.objectRaw(ctx, tx, bucket, path) + if errors.Is(err, gorm.ErrRecordNotFound) || len(raw) == 0 { + return api.Object{}, api.ErrObjectNotFound + } + + // hydrate raw object data + return s.objectHydrate(ctx, tx, bucket, path, raw) +} + +// objectHydrate hydrates a raw object and returns an api.Object. +func (s *SQLStore) objectHydrate(ctx context.Context, tx *gorm.DB, bucket, path string, obj rawObject) (api.Object, error) { + // parse object key + var key object.EncryptionKey + if err := key.UnmarshalBinary(obj[0].ObjectKey); err != nil { + return api.Object{}, err + } + + // filter out slabs without slab ID and buffered slabs - this is expected + // for an empty object or objects that end with a partial slab. 
+ var filtered rawObject + minHealth := math.MaxFloat64 + for _, sector := range obj { + if sector.SlabID != 0 { + filtered = append(filtered, sector) + if sector.SlabHealth < minHealth { + minHealth = sector.SlabHealth + } + } + } + + // hydrate all slabs + slabs := make([]object.SlabSlice, 0, len(filtered)) + if len(filtered) > 0 { + var start int + // create a helper function to add a slab and update the state + addSlab := func(end int) error { + if slab, err := filtered[start:end].toSlabSlice(); err != nil { + return err + } else { + slabs = append(slabs, slab) + start = end + } + return nil + } + + curr := filtered[0] + for j, sector := range filtered { + if sector.ObjectIndex == 0 { + return api.Object{}, api.ErrObjectCorrupted + } else if sector.SectorIndex == 0 && !sector.SlabBuffered { + return api.Object{}, api.ErrObjectCorrupted + } + if sector.ObjectIndex != curr.ObjectIndex { + if err := addSlab(j); err != nil { + return api.Object{}, err + } + curr = sector + } + } + if err := addSlab(len(filtered)); err != nil { + return api.Object{}, err + } + } + + // fetch object metadata + metadata, err := s.objectMetadata(ctx, tx, bucket, path) + if err != nil { + return api.Object{}, err + } + + // return object + return api.Object{ + Metadata: metadata, + ObjectMetadata: api.ObjectMetadata{ + ETag: obj[0].ObjectETag, + Health: obj[0].ObjectHealth, + MimeType: obj[0].ObjectMimeType, + ModTime: api.TimeRFC3339(obj[0].ObjectModTime.UTC()), + Name: obj[0].ObjectName, + Size: obj[0].ObjectSize, + }, + Object: object.Object{ + Key: key, + Slabs: slabs, + }, + }, nil +} + +func (s *SQLStore) objectMetadata(ctx context.Context, tx *gorm.DB, bucket, path string) (api.ObjectUserMetadata, error) { + var rows []dbObjectUserMetadata + err := tx. + Model(&dbObjectUserMetadata{}). + Table("object_user_metadata om"). + Joins("INNER JOIN objects o ON om.db_object_id = o.id"). + Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id"). + Where("o.object_id = ? 
AND b.name = ?", path, bucket). + Find(&rows). + Error + if err != nil { + return nil, err + } + metadata := make(api.ObjectUserMetadata) + for _, row := range rows { + metadata[row.Key] = row.Value + } + return metadata, nil +} + +func (s *SQLStore) objectRaw(ctx context.Context, txn *gorm.DB, bucket string, path string) (rows rawObject, err error) { // NOTE: we LEFT JOIN here because empty objects are valid and need to be // included in the result set, when we convert the rawObject before // returning it we'll check for SlabID and/or SectorID being 0 and act // accordingly - var rows rawObject - tx := s.db. + err = txn. Select("o.id as ObjectID, o.health as ObjectHealth, sli.object_index as ObjectIndex, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey"). Model(&dbObject{}). Table("objects o"). @@ -2115,12 +2191,9 @@ func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path Where("o.object_id = ? AND b.name = ?", path, bucket). Order("sli.object_index ASC"). Order("sec.slab_index ASC"). - Scan(&rows) - if errors.Is(tx.Error, gorm.ErrRecordNotFound) || len(rows) == 0 { - return nil, api.ErrObjectNotFound - } - - return rows, nil + Scan(&rows). + Error + return } func (s *SQLStore) objectHealth(ctx context.Context, tx *gorm.DB, objectID uint) (health float64, err error) { @@ -2540,13 +2613,13 @@ func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, sortBy, sort if err != nil { return api.ObjectsListResponse{}, err } - var rows []rawObjectMetadata if err := s.db.
Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as mimeType, o.created_at as ModTime"). Model(&dbObject{}). Table("objects o"). Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id"). + Group("o.object_id"). Where("b.name = ? AND ? AND ?", bucket, prefixExpr, markerExpr). Order(orderBy). Order(markerOrderBy). diff --git a/stores/metadata_test.go b/stores/metadata_test.go index ecf6849ee..d1ac1ba59 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -83,7 +83,7 @@ func TestObjectBasic(t *testing.T) { } // add the object - if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, want); err != nil { + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, testMetadata, want); err != nil { t.Fatal(err) } @@ -119,7 +119,7 @@ func TestObjectBasic(t *testing.T) { } // add the object - if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, want2); err != nil { + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, testMetadata, want2); err != nil { t.Fatal(err) } @@ -133,6 +133,73 @@ func TestObjectBasic(t *testing.T) { } } +func TestObjectMeta(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // create 2 hosts + hks, err := ss.addTestHosts(2) + if err != nil { + t.Fatal(err) + } + hk1, hk2 := hks[0], hks[1] + + // create 2 contracts + fcids, _, err := ss.addTestContracts(hks) + if err != nil { + t.Fatal(err) + } + fcid1, fcid2 := fcids[0], fcids[1] + + // create an object + want := object.Object{ + Key: object.GenerateEncryptionKey(), + Slabs: []object.SlabSlice{ + { + Slab: object.Slab{ + Health: 1.0, + Key: object.GenerateEncryptionKey(), + MinShards: 1, + Shards: newTestShards(hk1, fcid1, types.Hash256{1}), + }, + 
Offset: 10, + Length: 100, + }, + { + Slab: object.Slab{ + Health: 1.0, + Key: object.GenerateEncryptionKey(), + MinShards: 2, + Shards: newTestShards(hk2, fcid2, types.Hash256{2}), + }, + Offset: 20, + Length: 200, + }, + }, + } + + // add the object + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, testMetadata, want); err != nil { + t.Fatal(err) + } + + // fetch the object + got, err := ss.Object(context.Background(), api.DefaultBucketName, t.Name()) + if err != nil { + t.Fatal(err) + } + + // assert it matches + if !reflect.DeepEqual(got.Object, want) { + t.Log(got.Object) + t.Log(want) + t.Fatal("object mismatch", cmp.Diff(got.Object, want, cmp.AllowUnexported(object.EncryptionKey{}))) + } + if !reflect.DeepEqual(got.Metadata, testMetadata) { + t.Fatal("meta mismatch", cmp.Diff(got.Metadata, testMetadata)) + } +} + // TestSQLContractStore tests SQLContractStore functionality. func TestSQLContractStore(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) @@ -409,7 +476,7 @@ func TestContractRoots(t *testing.T) { } // add the object. - if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, t.Name(), testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -511,7 +578,7 @@ func TestRenewedContract(t *testing.T) { } // add the object. 
- if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -798,40 +865,6 @@ func TestArchiveContracts(t *testing.T) { } } -func (s *SQLStore) addTestContracts(keys []types.PublicKey) (fcids []types.FileContractID, contracts []api.ContractMetadata, err error) { - cnt, err := s.contractsCount() - if err != nil { - return nil, nil, err - } - for i, key := range keys { - fcids = append(fcids, types.FileContractID{byte(int(cnt) + i + 1)}) - contract, err := s.addTestContract(fcids[len(fcids)-1], key) - if err != nil { - return nil, nil, err - } - contracts = append(contracts, contract) - } - return -} - -func (s *SQLStore) addTestContract(fcid types.FileContractID, hk types.PublicKey) (api.ContractMetadata, error) { - rev := testContractRevision(fcid, hk) - return s.AddContract(context.Background(), rev, types.ZeroCurrency, types.ZeroCurrency, 0, api.ContractStatePending) -} - -func (s *SQLStore) addTestRenewedContract(fcid, renewedFrom types.FileContractID, hk types.PublicKey, startHeight uint64) (api.ContractMetadata, error) { - rev := testContractRevision(fcid, hk) - return s.AddRenewedContract(context.Background(), rev, types.ZeroCurrency, types.ZeroCurrency, startHeight, renewedFrom, api.ContractStatePending) -} - -func (s *SQLStore) contractsCount() (cnt int64, err error) { - err = s.db. - Model(&dbContract{}). - Count(&cnt). - Error - return -} - func testContractRevision(fcid types.FileContractID, hk types.PublicKey) rhpv2.ContractRevision { uc := generateMultisigUC(1, 2, "salt") uc.PublicKeys[1].Key = hk[:] @@ -933,7 +966,7 @@ func TestSQLMetadataStore(t *testing.T) { // Store it. 
ctx := context.Background() objID := "key1" - if err := ss.UpdateObject(ctx, api.DefaultBucketName, objID, testContractSet, testETag, testMimeType, obj1); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, objID, testContractSet, testETag, testMimeType, testMetadata, obj1); err != nil { t.Fatal(err) } @@ -995,7 +1028,7 @@ func TestSQLMetadataStore(t *testing.T) { } // Try to store it again. Should work. - if err := ss.UpdateObject(ctx, api.DefaultBucketName, objID, testContractSet, testETag, testMimeType, obj1); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, objID, testContractSet, testETag, testMimeType, testMetadata, obj1); err != nil { t.Fatal(err) } @@ -1138,7 +1171,7 @@ func TestSQLMetadataStore(t *testing.T) { // Remove the first slab of the object. obj1.Slabs = obj1.Slabs[1:] - if err := ss.UpdateObject(ctx, api.DefaultBucketName, objID, testContractSet, testETag, testMimeType, obj1); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, objID, testContractSet, testETag, testMimeType, testMetadata, obj1); err != nil { t.Fatal(err) } fullObj, err = ss.Object(ctx, api.DefaultBucketName, objID) @@ -1247,7 +1280,7 @@ func TestObjectHealth(t *testing.T) { }, } - if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, "/foo", testContractSet, testETag, testMimeType, add); err != nil { + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, "/foo", testContractSet, testETag, testMimeType, testMetadata, add); err != nil { t.Fatal(err) } @@ -1273,7 +1306,7 @@ func TestObjectHealth(t *testing.T) { } expectedHealth := float64(2) / float64(3) - // assert health + // assert object method obj, err = ss.Object(context.Background(), api.DefaultBucketName, "/foo") if err != nil { t.Fatal(err) @@ -1281,20 +1314,6 @@ func TestObjectHealth(t *testing.T) { t.Fatal("wrong health", obj.Health) } - // assert (raw) object and object health methods - raw, err := ss.object(context.Background(), 
ss.db, api.DefaultBucketName, "/foo") - if err != nil { - t.Fatal(err) - } else if len(raw) == 0 { - t.Fatal("object not found") - } - health, err := ss.objectHealth(context.Background(), ss.db, raw[0].ObjectID) - if err != nil { - t.Fatal(err) - } else if health != expectedHealth { - t.Fatal("wrong health", health) - } - // assert health is returned correctly by ObjectEntries entries, _, err := ss.ObjectEntries(context.Background(), api.DefaultBucketName, "/", "", "", "", "", 0, -1) if err != nil { @@ -1341,7 +1360,7 @@ func TestObjectHealth(t *testing.T) { Key: object.GenerateEncryptionKey(), Slabs: nil, } - if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, "/bar", testContractSet, testETag, testMimeType, add); err != nil { + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, "/bar", testContractSet, testETag, testMimeType, testMetadata, add); err != nil { t.Fatal(err) } @@ -1380,7 +1399,7 @@ func TestObjectEntries(t *testing.T) { obj := newTestObject(frand.Intn(9) + 1) obj.Slabs = obj.Slabs[:1] obj.Slabs[0].Length = uint32(o.size) - err := ss.UpdateObject(ctx, api.DefaultBucketName, o.path, testContractSet, testETag, testMimeType, obj) + err := ss.UpdateObject(ctx, api.DefaultBucketName, o.path, testContractSet, testETag, testMimeType, testMetadata, obj) if err != nil { t.Fatal(err) } @@ -1466,7 +1485,7 @@ func TestObjectEntries(t *testing.T) { } assertMetadata(got) - if len(got) != 1 || got[0] != test.want[offset] { + if len(got) != 1 || !got[0].Equals(test.want[offset]) { t.Fatalf("\noffset: %v\nlist: %v\nprefix: %v\ngot: %v\nwant: %v", offset, test.path, test.prefix, got, test.want[offset]) } @@ -1486,7 +1505,7 @@ func TestObjectEntries(t *testing.T) { } assertMetadata(got) - if len(got) != 1 || got[0] != test.want[offset+1] { + if len(got) != 1 || !got[0].Equals(test.want[offset+1]) { t.Fatalf("\noffset: %v\nlist: %v\nprefix: %v\nmarker: %v\ngot: %v\nwant: %v", offset+1, test.path, test.prefix, test.want[offset].Name, 
got, test.want[offset+1]) } @@ -1518,10 +1537,25 @@ func TestSearchObjects(t *testing.T) { obj := newTestObject(frand.Intn(9) + 1) obj.Slabs = obj.Slabs[:1] obj.Slabs[0].Length = uint32(o.size) - if err := ss.UpdateObject(ctx, api.DefaultBucketName, o.path, testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, o.path, testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } } + + assertEqual := func(got []api.ObjectMetadata, want []api.ObjectMetadata) { + if len(got) != len(want) { + t.Fatalf("unexpected result, we want %d items and we got %d items \ndiff: %v", len(want), len(got), cmp.Diff(got, want)) + } + for i := range got { + wantt := want[i] + if got[i].Name != wantt.Name || + got[i].Size != wantt.Size || + got[i].Health != wantt.Health { + t.Fatalf("unexpected result, got %v, want %v", got[i], wantt) + } + } + } + tests := []struct { path string want []api.ObjectMetadata @@ -1536,15 +1570,13 @@ if err != nil { t.Fatal(err) } - if !(len(got) == 0 && len(test.want) == 0) && !reflect.DeepEqual(got, test.want) { - t.Errorf("\nkey: %v\ngot: %v\nwant: %v", test.path, got, test.want) - } + assertEqual(got, test.want) for offset := 0; offset < len(test.want); offset++ { got, err := ss.SearchObjects(ctx, api.DefaultBucketName, test.path, offset, 1) if err != nil { t.Fatal(err) } - if len(got) != 1 || got[0] != test.want[offset] { + if len(got) != 1 || !got[0].Equals(test.want[offset]) { t.Errorf("\nkey: %v\ngot: %v\nwant: %v", test.path, got, test.want[offset]) } } @@ -1657,7 +1689,7 @@ func TestUnhealthySlabs(t *testing.T) { } ctx := context.Background() - if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -1757,7
+1789,7 @@ func TestUnhealthySlabsNegHealth(t *testing.T) { // add the object ctx := context.Background() - if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -1814,7 +1846,7 @@ func TestUnhealthySlabsNoContracts(t *testing.T) { // add the object ctx := context.Background() - if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -1906,7 +1938,7 @@ func TestUnhealthySlabsNoRedundancy(t *testing.T) { } ctx := context.Background() - if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -1965,7 +1997,7 @@ func TestContractSectors(t *testing.T) { }, } ctx := context.Background() - if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -1991,7 +2023,7 @@ func TestContractSectors(t *testing.T) { } // Add the object again. 
- if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -2048,7 +2080,7 @@ func TestUpdateSlab(t *testing.T) { }, } ctx := context.Background() - if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } @@ -2319,7 +2351,7 @@ func TestRenameObjects(t *testing.T) { ctx := context.Background() for _, path := range objects { obj := newTestObject(1) - if err := ss.UpdateObject(ctx, api.DefaultBucketName, path, testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } } @@ -2437,7 +2469,7 @@ func TestObjectsStats(t *testing.T) { } key := hex.EncodeToString(frand.Bytes(32)) - err := ss.UpdateObject(context.Background(), api.DefaultBucketName, key, testContractSet, testETag, testMimeType, obj) + err := ss.UpdateObject(context.Background(), api.DefaultBucketName, key, testContractSet, testETag, testMimeType, testMetadata, obj) if err != nil { t.Fatal(err) } @@ -2609,7 +2641,7 @@ func TestPartialSlab(t *testing.T) { return obj } obj := testObject(slabs) - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key", testContractSet, testETag, testMimeType, obj) + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key", testContractSet, testETag, testMimeType, testMetadata, obj) if err != nil { t.Fatal(err) } @@ -2618,7 +2650,7 @@ func TestPartialSlab(t *testing.T) { t.Fatal(err) } if !reflect.DeepEqual(obj, fetched.Object) { - t.Fatal("mismatch", cmp.Diff(obj, 
fetched.Object)) + t.Fatal("mismatch", cmp.Diff(obj, fetched.Object, cmp.AllowUnexported(object.EncryptionKey{}))) } // Add the second slab. @@ -2649,7 +2681,7 @@ func TestPartialSlab(t *testing.T) { // Create an object again. obj2 := testObject(slabs) - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key2", testContractSet, testETag, testMimeType, obj2) + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key2", testContractSet, testETag, testMimeType, testMetadata, obj2) if err != nil { t.Fatal(err) } @@ -2701,7 +2733,7 @@ func TestPartialSlab(t *testing.T) { // Create an object again. obj3 := testObject(slabs) - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key3", testContractSet, testETag, testMimeType, obj3) + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key3", testContractSet, testETag, testMimeType, testMetadata, obj3) if err != nil { t.Fatal(err) } @@ -2710,7 +2742,7 @@ func TestPartialSlab(t *testing.T) { t.Fatal(err) } if !reflect.DeepEqual(obj3, fetched.Object) { - t.Fatal("mismatch", cmp.Diff(obj3, fetched.Object)) + t.Fatal("mismatch", cmp.Diff(obj3, fetched.Object, cmp.AllowUnexported(object.EncryptionKey{}))) } // Fetch the buffer for uploading @@ -2783,7 +2815,7 @@ func TestPartialSlab(t *testing.T) { } // Associate them with an object. 
- err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, "", "", object.Object{ + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, object.Object{ Key: object.GenerateEncryptionKey(), Slabs: append(slices1, slices2...), }) @@ -2865,7 +2897,7 @@ func TestContractSizes(t *testing.T) { // add an object to both contracts for i := 0; i < 2; i++ { - if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("obj_%d", i+1), testContractSet, testETag, testMimeType, object.Object{ + if err := ss.UpdateObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("obj_%d", i+1), testContractSet, testETag, testMimeType, testMetadata, object.Object{ Key: object.GenerateEncryptionKey(), Slabs: []object.SlabSlice{ { @@ -3023,7 +3055,7 @@ func TestObjectsBySlabKey(t *testing.T) { } for _, name := range []string{"obj1", "obj2", "obj3"} { obj.Slabs[0].Length++ - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, name, testContractSet, testETag, testMimeType, obj) + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, name, testContractSet, testETag, testMimeType, testMetadata, obj) if err != nil { t.Fatal(err) } @@ -3095,7 +3127,7 @@ func TestBucketObjects(t *testing.T) { // Adding an object to a bucket that doesn't exist shouldn't work. 
obj := newTestObject(1) - err := ss.UpdateObject(context.Background(), "unknown-bucket", "foo", testContractSet, testETag, testMimeType, obj) + err := ss.UpdateObject(context.Background(), "unknown-bucket", "foo", testContractSet, testETag, testMimeType, testMetadata, obj) if !errors.Is(err, api.ErrBucketNotFound) { t.Fatal("expected ErrBucketNotFound", err) } @@ -3126,7 +3158,7 @@ func TestBucketObjects(t *testing.T) { obj := newTestObject(frand.Intn(9) + 1) obj.Slabs = obj.Slabs[:1] obj.Slabs[0].Length = uint32(o.size) - err := ss.UpdateObject(ctx, o.bucket, o.path, testContractSet, testETag, testMimeType, obj) + err := ss.UpdateObject(ctx, o.bucket, o.path, testContractSet, testETag, testMimeType, testMetadata, obj) if err != nil { t.Fatal(err) } @@ -3244,7 +3276,7 @@ func TestBucketObjects(t *testing.T) { // See if we can fetch the object by slab. var ec object.EncryptionKey - if obj, err := ss.object(context.Background(), ss.db, b1, "/bar"); err != nil { + if obj, err := ss.objectRaw(context.Background(), ss.db, b1, "/bar"); err != nil { t.Fatal(err) } else if err := ec.UnmarshalBinary(obj[0].SlabKey); err != nil { t.Fatal(err) @@ -3273,13 +3305,13 @@ func TestCopyObject(t *testing.T) { // Create one object. obj := newTestObject(1) - err := ss.UpdateObject(ctx, "src", "/foo", testContractSet, testETag, testMimeType, obj) + err := ss.UpdateObject(ctx, "src", "/foo", testContractSet, testETag, testMimeType, testMetadata, obj) if err != nil { t.Fatal(err) } // Copy it within the same bucket. - if om, err := ss.CopyObject(ctx, "src", "src", "/foo", "/bar", ""); err != nil { + if om, err := ss.CopyObject(ctx, "src", "src", "/foo", "/bar", "", nil); err != nil { t.Fatal(err) } else if entries, _, err := ss.ObjectEntries(ctx, "src", "/", "", "", "", "", 0, -1); err != nil { t.Fatal(err) @@ -3292,7 +3324,7 @@ func TestCopyObject(t *testing.T) { } // Copy it cross buckets. 
- if om, err := ss.CopyObject(ctx, "src", "dst", "/foo", "/bar", ""); err != nil { + if om, err := ss.CopyObject(ctx, "src", "dst", "/foo", "/bar", "", nil); err != nil { t.Fatal(err) } else if entries, _, err := ss.ObjectEntries(ctx, "dst", "/", "", "", "", "", 0, -1); err != nil { t.Fatal(err) @@ -3330,7 +3362,7 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { } // add it to an object to prevent it from getting pruned. - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, "", "", object.Object{ + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, object.Object{ Key: object.GenerateEncryptionKey(), Slabs: slabs, }) @@ -3414,7 +3446,7 @@ func TestListObjects(t *testing.T) { obj := newTestObject(frand.Intn(9) + 1) obj.Slabs = obj.Slabs[:1] obj.Slabs[0].Length = uint32(o.size) - if err := ss.UpdateObject(ctx, api.DefaultBucketName, o.path, testContractSet, testETag, testMimeType, obj); err != nil { + if err := ss.UpdateObject(ctx, api.DefaultBucketName, o.path, testContractSet, testETag, testMimeType, testMetadata, obj); err != nil { t.Fatal(err) } } @@ -3632,7 +3664,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) { } // set slab. 
- err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, object.Object{ + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, testETag, testMimeType, testMetadata, object.Object{ Key: object.GenerateEncryptionKey(), Slabs: []object.SlabSlice{{Slab: slab}}, }) @@ -3718,7 +3750,7 @@ func TestSlabHealthInvalidation(t *testing.T) { // prepare a slab with pieces on h1 and h2 s1 := object.GenerateEncryptionKey() - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "o1", testContractSet, testETag, testMimeType, object.Object{ + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "o1", testContractSet, testETag, testMimeType, testMetadata, object.Object{ Key: object.GenerateEncryptionKey(), Slabs: []object.SlabSlice{{Slab: object.Slab{ Key: s1, @@ -3734,7 +3766,7 @@ func TestSlabHealthInvalidation(t *testing.T) { // prepare a slab with pieces on h3 and h4 s2 := object.GenerateEncryptionKey() - err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "o2", testContractSet, testETag, testMimeType, object.Object{ + err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "o2", testContractSet, testETag, testMimeType, testMetadata, object.Object{ Key: object.GenerateEncryptionKey(), Slabs: []object.SlabSlice{{Slab: object.Slab{ Key: s2, @@ -3833,14 +3865,3 @@ func TestSlabHealthInvalidation(t *testing.T) { } } } -func (s *SQLStore) overrideSlabHealth(objectID string, health float64) (err error) { - err = s.db.Exec(fmt.Sprintf(` - UPDATE slabs SET health = %v WHERE id IN ( - SELECT sla.id - FROM objects o - INNER JOIN slices sli ON o.id = sli.db_object_id - INNER JOIN slabs sla ON sli.db_slab_id = sla.id - WHERE o.object_id = "%s" - )`, health, objectID)).Error - return -} diff --git a/stores/migrations.go b/stores/migrations.go index 94708a8d4..7fa905e83 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -5,6 +5,7 @@ 
import ( "encoding/json" "errors" "fmt" + "time" "github.com/go-gormigrate/gormigrate/v2" "go.sia.tech/renterd/api" @@ -26,6 +27,8 @@ var ( &dbSlab{}, &dbSector{}, &dbSlice{}, + &dbObjectUserMetadata{}, + &dbMultipartMetadata{}, // bus.HostDB tables &dbAnnouncement{}, @@ -317,6 +320,18 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { return performMigration00036_contractPruneCfg(tx, logger) }, }, + { + ID: "00037_objectUserMetadata", + Migrate: func(tx *gorm.DB) error { + return performMigration00037_objectUserMetadata(tx, logger) + }, + }, + { + ID: "00038_multipartUserMetadata", + Migrate: func(tx *gorm.DB) error { + return performMigration00038_multipartUserMetadata(tx, logger) + }, + }, } // Create migrator. m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) @@ -1444,3 +1459,39 @@ func performMigration00036_contractPruneCfg(txn *gorm.DB, logger *zap.SugaredLog logger.Info("migration 00036_contractPruneCfg complete") return nil } + +func performMigration00037_objectUserMetadata(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00037_objectUserMetadata") + + if err := txn.Table(dbObjectUserMetadata{}.TableName()).Migrator().AutoMigrate(&struct { + ID uint `gorm:"primarykey"` + CreatedAt time.Time + + DBObjectID uint `gorm:"index:uniqueIndex:idx_object_meta_key"` + Key string `gorm:"index:uniqueIndex:idx_object_meta_key"` + Value string + }{}); err != nil { + return err + } + + logger.Info("migration 00037_objectUserMetadata complete") + return nil +} + +func performMigration00038_multipartUserMetadata(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00038_multipartUserMetadata") + + if err := txn.Table(dbMultipartMetadata{}.TableName()).Migrator().AutoMigrate(&struct { + ID uint `gorm:"primarykey"` + CreatedAt time.Time + + DBMultipartUploadID uint `gorm:"index:uniqueIndex:idx_multipart_meta_key"` + Key string 
`gorm:"index:uniqueIndex:idx_multipart_meta_key"` + Value string + }{}); err != nil { + return err + } + + logger.Info("migration 00038_multipartUserMetadata complete") + return nil +} diff --git a/stores/multipart.go b/stores/multipart.go index c6adc014f..9246cec4d 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -22,12 +22,13 @@ type ( Model Key secretKey - UploadID string `gorm:"uniqueIndex;NOT NULL;size:64"` - ObjectID string `gorm:"index:idx_multipart_uploads_object_id;NOT NULL"` - DBBucket dbBucket `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete uploads when bucket is deleted - DBBucketID uint `gorm:"index:idx_multipart_uploads_db_bucket_id;NOT NULL"` - Parts []dbMultipartPart `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete parts too - MimeType string `gorm:"index:idx_multipart_uploads_mime_type"` + UploadID string `gorm:"uniqueIndex;NOT NULL;size:64"` + ObjectID string `gorm:"index:idx_multipart_uploads_object_id;NOT NULL"` + DBBucket dbBucket `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete uploads when bucket is deleted + DBBucketID uint `gorm:"index:idx_multipart_uploads_db_bucket_id;NOT NULL"` + Parts []dbMultipartPart `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete parts too + Metadata []dbMultipartMetadata `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete metadata too + MimeType string `gorm:"index:idx_multipart_uploads_mime_type"` } dbMultipartPart struct { @@ -37,7 +38,14 @@ type ( Size uint64 DBMultipartUploadID uint `gorm:"index;NOT NULL"` Slabs []dbSlice `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete slices too + } + + dbMultipartMetadata struct { + Model + DBMultipartUploadID uint `gorm:"index:uniqueIndex:idx_multipart_meta_key"` + Key string `gorm:"index:uniqueIndex:idx_multipart_meta_key"` + Value string } ) @@ -49,7 +57,11 @@ func (dbMultipartPart) TableName() string { return "multipart_parts" } -func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path 
string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error) { +func (dbMultipartMetadata) TableName() string { + return "multipart_metadata" +} + +func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (api.MultipartCreateResponse, error) { // Marshal key key, err := ec.MarshalBinary() if err != nil { @@ -66,18 +78,26 @@ func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path strin } else if err != nil { return fmt.Errorf("failed to fetch bucket id: %w", err) } + // Create multipart upload uploadIDEntropy := frand.Entropy256() uploadID = hex.EncodeToString(uploadIDEntropy[:]) - if err := s.db.Create(&dbMultipartUpload{ + multipartUpload := dbMultipartUpload{ DBBucketID: bucketID, Key: key, UploadID: uploadID, ObjectID: path, MimeType: mimeType, - }).Error; err != nil { + } + if err := tx.Create(&multipartUpload).Error; err != nil { return fmt.Errorf("failed to create multipart upload: %w", err) } + + // Create multipart metadata + if err := s.createMultipartMeta(tx, multipartUpload.ID, metadata); err != nil { + return fmt.Errorf("failed to create multipart metadata: %w", err) + } + return nil }) return api.MultipartCreateResponse{ @@ -379,6 +399,14 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str } } + // Find multipart metadata. + var mpMetadata []dbMultipartMetadata + if err := tx.Model(&dbMultipartMetadata{}). + Where("db_multipart_upload_id = ?", mu.ID). + Find(&mpMetadata).Error; err != nil { + return fmt.Errorf("failed to fetch multipart metadata: %w", err) + } + // Compute ETag. 
sum := h.Sum() eTag = hex.EncodeToString(sum[:]) @@ -412,6 +440,17 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str } } + // Convert metadata to object metadata + if len(mpMetadata) > 0 { + metadata := make(api.ObjectUserMetadata) + for _, meta := range mpMetadata { + metadata[meta.Key] = meta.Value + } + if err := s.createObjectMeta(tx, obj.ID, metadata); err != nil { + return fmt.Errorf("failed to create object metadata: %w", err) + } + } + // Delete the multipart upload. if err := tx.Delete(&mu).Error; err != nil { return fmt.Errorf("failed to delete multipart upload: %w", err) diff --git a/stores/multipart_test.go b/stores/multipart_test.go index 775c674e5..67cc366cf 100644 --- a/stores/multipart_test.go +++ b/stores/multipart_test.go @@ -46,7 +46,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { totalSize := int64(nParts * partSize) // Upload parts until we have enough data for 2 buffers. - resp, err := ss.CreateMultipartUpload(ctx, api.DefaultBucketName, objName, object.NoOpKey, testMimeType) + resp, err := ss.CreateMultipartUpload(ctx, api.DefaultBucketName, objName, object.NoOpKey, testMimeType, testMetadata) if err != nil { t.Fatal(err) } diff --git a/stores/sql_test.go b/stores/sql_test.go index 56b96b12f..62d34a508 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -13,6 +13,8 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/object" "go.sia.tech/siad/modules" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -28,6 +30,13 @@ const ( testETag = "d34db33f" ) +var ( + testMetadata = api.ObjectUserMetadata{ + "foo": "bar", + "baz": "qux", + } +) + type testSQLStore struct { t *testing.T *SQLStore @@ -99,6 +108,26 @@ func newTestSQLStore(t *testing.T, cfg testSQLStoreConfig) *testSQLStore { } } +// newTestLogger creates a console logger used for testing. 
+func newTestLogger() logger.Interface { + config := zap.NewProductionEncoderConfig() + config.EncodeTime = zapcore.RFC3339TimeEncoder + config.EncodeLevel = zapcore.CapitalColorLevelEncoder + config.StacktraceKey = "" + consoleEncoder := zapcore.NewConsoleEncoder(config) + + l := zap.New( + zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), zapcore.DebugLevel), + zap.AddCaller(), + zap.AddStacktrace(zapcore.ErrorLevel), + ) + return NewSQLLogger(l, LoggerConfig{ + IgnoreRecordNotFoundError: false, + LogLevel: logger.Warn, + SlowThreshold: 100 * time.Millisecond, + }) +} + func (s *testSQLStore) Close() error { if err := s.SQLStore.Close(); err != nil { s.t.Error(err) @@ -117,24 +146,61 @@ func (s *testSQLStore) Reopen() *testSQLStore { return newTestSQLStore(s.t, cfg) } -// newTestLogger creates a console logger used for testing. -func newTestLogger() logger.Interface { - config := zap.NewProductionEncoderConfig() - config.EncodeTime = zapcore.RFC3339TimeEncoder - config.EncodeLevel = zapcore.CapitalColorLevelEncoder - config.StacktraceKey = "" - consoleEncoder := zapcore.NewConsoleEncoder(config) +// TODO PJ: update metadata_test to use this helper to avoid changing all tests if we pass more metadata +func (s *testSQLStore) addTestObject(path string, o object.Object) (api.Object, error) { + if err := s.UpdateObject(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil { + return api.Object{}, err + } else if obj, err := s.Object(context.Background(), api.DefaultBucketName, path); err != nil { + return api.Object{}, err + } else { + return obj, nil + } +} - l := zap.New( - zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), zapcore.DebugLevel), - zap.AddCaller(), - zap.AddStacktrace(zapcore.ErrorLevel), - ) - return NewSQLLogger(l, LoggerConfig{ - IgnoreRecordNotFoundError: false, - LogLevel: logger.Warn, - SlowThreshold: 100 * time.Millisecond, - }) +func (s *SQLStore) 
addTestContracts(keys []types.PublicKey) (fcids []types.FileContractID, contracts []api.ContractMetadata, err error) { + cnt, err := s.contractsCount() + if err != nil { + return nil, nil, err + } + for i, key := range keys { + fcids = append(fcids, types.FileContractID{byte(int(cnt) + i + 1)}) + contract, err := s.addTestContract(fcids[len(fcids)-1], key) + if err != nil { + return nil, nil, err + } + contracts = append(contracts, contract) + } + return +} + +func (s *SQLStore) addTestContract(fcid types.FileContractID, hk types.PublicKey) (api.ContractMetadata, error) { + rev := testContractRevision(fcid, hk) + return s.AddContract(context.Background(), rev, types.ZeroCurrency, types.ZeroCurrency, 0, api.ContractStatePending) +} + +func (s *SQLStore) addTestRenewedContract(fcid, renewedFrom types.FileContractID, hk types.PublicKey, startHeight uint64) (api.ContractMetadata, error) { + rev := testContractRevision(fcid, hk) + return s.AddRenewedContract(context.Background(), rev, types.ZeroCurrency, types.ZeroCurrency, startHeight, renewedFrom, api.ContractStatePending) +} + +func (s *SQLStore) contractsCount() (cnt int64, err error) { + err = s.db. + Model(&dbContract{}). + Count(&cnt). + Error + return +} + +func (s *SQLStore) overrideSlabHealth(objectID string, health float64) (err error) { + err = s.db.Exec(fmt.Sprintf(` + UPDATE slabs SET health = %v WHERE id IN ( + SELECT sla.id + FROM objects o + INNER JOIN slices sli ON o.id = sli.db_object_id + INNER JOIN slabs sla ON sli.db_slab_id = sla.id + WHERE o.object_id = "%s" + )`, health, objectID)).Error + return } // TestConsensusReset is a unit test for ResetConsensusSubscription. diff --git a/worker/client/client.go b/worker/client/client.go index af435c86e..46ebb4070 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -115,18 +115,22 @@ func (c *Client) GetObject(ctx context.Context, bucket, path string, opts api.Do // range. 
size = dr.Size } - // Parse Last-Modified - modTime, err := time.Parse(http.TimeFormat, header.Get("Last-Modified")) - if err != nil { - return nil, err + + // Parse headers. + headers := make(map[string]string) + for k, v := range header { + if len(v) > 0 { + headers[k] = v[0] + } } return &api.GetObjectResponse{ - Content: body, - ContentType: header.Get("Content-Type"), - ModTime: api.TimeRFC3339(modTime.UTC()), - Range: r, - Size: size, + Content: body, + ContentType: header.Get("Content-Type"), + LastModified: header.Get("Last-Modified"), + Range: r, + Size: size, + Metadata: api.ObjectUserMetadataFrom(headers), }, nil } @@ -219,7 +223,7 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str values := make(url.Values) values.Set("bucket", bucket) - opts.Apply(values) + opts.ApplyValues(values) u, err := url.Parse(fmt.Sprintf("%v/objects/%v", c.c.BaseURL, path)) if err != nil { panic(err) @@ -230,6 +234,7 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) + opts.ApplyHeaders(req.Header) if opts.ContentLength != 0 { req.ContentLength = opts.ContentLength } else if req.ContentLength, err = sizeFromSeeker(r); err != nil { diff --git a/worker/serve.go b/worker/serve.go index a7fec1e21..578c5bd05 100644 --- a/worker/serve.go +++ b/worker/serve.go @@ -92,6 +92,11 @@ func serveContent(rw http.ResponseWriter, req *http.Request, obj api.Object, dow rw.Header().Set("ETag", api.FormatETag(obj.ETag)) rw.Header().Set("Content-Type", contentType) + // set the user metadata headers + for k, v := range obj.Metadata { + rw.Header().Set(fmt.Sprintf("%s%s", api.ObjectMetaPrefix, k), v) + } + http.ServeContent(rw, req, obj.Name, obj.ModTime.Std(), rs) return http.StatusOK, nil } diff --git a/worker/upload.go b/worker/upload.go index 99ca7efc2..dbb93d5e5 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -609,7 +609,7 @@ func (mgr *uploadManager) 
Upload(ctx context.Context, r io.Reader, contracts []a } } else { // persist the object - err = mgr.os.AddObject(ctx, up.bucket, up.path, up.contractSet, o, api.AddObjectOptions{MimeType: up.mimeType, ETag: eTag}) + err = mgr.os.AddObject(ctx, up.bucket, up.path, up.contractSet, o, api.AddObjectOptions{MimeType: up.mimeType, ETag: eTag, Metadata: up.metadata}) if err != nil { return bufferSizeLimitReached, "", fmt.Errorf("couldn't add object: %w", err) } diff --git a/worker/upload_params.go b/worker/upload_params.go index c4fae96a0..030071563 100644 --- a/worker/upload_params.go +++ b/worker/upload_params.go @@ -22,6 +22,8 @@ type uploadParameters struct { contractSet string packing bool mimeType string + + metadata api.ObjectUserMetadata } func defaultParameters(bucket, path string) uploadParameters { @@ -95,3 +97,9 @@ func WithRedundancySettings(rs api.RedundancySettings) UploadOption { up.rs = rs } } + +func WithUserMetadata(metadata api.ObjectUserMetadata) UploadOption { + return func(up *uploadParameters) { + up.metadata = metadata + } +} diff --git a/worker/worker.go b/worker/worker.go index efc1e9e60..faedb34d7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1002,6 +1002,14 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { return } + // parse headers and extract object meta + metadata := make(api.ObjectUserMetadata) + for k, v := range jc.Request.Header { + if strings.HasPrefix(strings.ToLower(k), strings.ToLower(api.ObjectMetaPrefix)) && len(v) > 0 { + metadata[k[len(api.ObjectMetaPrefix):]] = v[0] + } + } + // build options opts := []UploadOption{ WithBlockHeight(up.CurrentHeight), @@ -1009,6 +1017,7 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { WithMimeType(mimeType), WithPacking(up.UploadPacking), WithRedundancySettings(up.RedundancySettings), + WithUserMetadata(metadata), } // attach gouging checker to the context