Skip to content

Commit

Permalink
Adding option "DeleteAssetsFromObjStore" to the encoding request, all…
Browse files Browse the repository at this point in the history
…owing to delete used assets after a successful encoding
  • Loading branch information
SoTrx committed Aug 25, 2022
1 parent 5cf8f1f commit 8a99004
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 10 deletions.
7 changes: 5 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ A valid job format for the **/encode** endpoint is the following:
// Storage backend retrieval keys for the image track
"imageKey":string,
// All available options for encoding
"options":string,

"options":{
// Wether to delete used assets (videoKey, audioKeys and ImageKey)
// from the remote object storage. Default is false
"deleteAssetsFromObjStore": boolean
},
}
```

Expand Down
43 changes: 42 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
)

var (
Expand Down Expand Up @@ -112,14 +113,24 @@ func encodeSync[T object_storage.BindingProxy](w http.ResponseWriter, req *http.
return
}

// And clean up temp files. Downloaded assets are already cleaned up by the encode-box itself
// And clean up temp files on the container filesystem
// Downloaded assets are already cleaned up by the encode-box itself
log.Infof(`Removing working directory "%s" from the local filesystem`, workDir)
err = os.RemoveAll(workDir)
if err != nil {
log.Warnf(`Could not remove directiory "%s" : %s`, workDir, err.Error())
}
log.Infof(`Processing of request with id "%s" complete !`, encodeRequest.RecordId)

// Optionally, we can also clean up the used assets from the remote object storage

if encodeRequest.Options.DeleteAssetsFromObjStore {
log.Infof("Removing used assets from remote object storage")
err = cleanUpFromObjectStore(encodeRequest, comp.objStore)
if err != nil {
log.Warnf(err.Error())
}
}
// Finally, ACK the message
_, _ = w.Write([]byte("OK"))
}
Expand Down Expand Up @@ -162,6 +173,36 @@ func parseBody(from io.ReadCloser) (*encode_box.EncodingRequest, error) {
return &eReq, nil
}

func cleanUpFromObjectStore[T object_storage.BindingProxy](eReq *encode_box.EncodingRequest, objStore *object_storage.ObjectStorage[T]) error {
var failures []string
// Video
if eReq.VideoKey != "" {
err := objStore.Delete(eReq.VideoKey)
if err != nil {
failures = append(failures, eReq.VideoKey)
}
}
// Audio(s)
for _, key := range eReq.AudiosKeys {
err := objStore.Delete(key)
if err != nil {
failures = append(failures, key)
}
}
// Image
if eReq.ImageKey != "" {
err := objStore.Delete(eReq.ImageKey)
if err != nil {
failures = append(failures, eReq.ImageKey)
}
}

if len(failures) > 0 {
return fmt.Errorf(`failed to delete "%s" from remote object storage`, strings.Join(failures, ", "))
}
return nil
}

// Format a proper encoding request from a stream
func makeEncodingRequest(from io.ReadCloser) (*encode_box.EncodingRequest, error) {
if from == nil {
Expand Down
9 changes: 4 additions & 5 deletions cmd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
encode_box "encode-box/pkg/encode-box"
object_storage "encode-box/pkg/object-storage"
"encoding/base64"
"fmt"
"github.com/dapr/go-sdk/client"
Expand All @@ -16,7 +17,7 @@ import (
)

const (
ResDir = "../resources/test"
ObjStoreComponent = "object-store"
)

// These are integration test, using all real components
Expand Down Expand Up @@ -49,10 +50,8 @@ func SetupInt(t *testing.T) (string, *encode_box.EncodeBox[client.Client]) {
if err != nil {
t.Fatal(err)
}
eBox, err := makeEncodeBox(&ctx)
if err != nil {
t.Fatal(err)
}
objStore, err := object_storage.NewDaprObjectStorage(&ctx, &daprClient, ObjStoreComponent)
eBox := encode_box.NewEncodeBox(&ctx, objStore)
return dir, eBox
}

Expand Down
124 changes: 122 additions & 2 deletions cmd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,51 @@ func TestMain_Encode(t *testing.T) {
assert.Contains(t, err.Error(), "Invalid data")
}

func TestMain_cleanUpFromObjectStoreOk(t *testing.T) {
ctx := context.Background()
dir, err := os.MkdirTemp("", "assets")
if err != nil {
t.Fatal(err)
}
ctrl := gomock.NewController(t)
proxy := mock_object_storage.NewMockBindingProxy(ctrl)
proxy.EXPECT().InvokeBinding(gomock.Any(), gomock.Any()).Return(&client.BindingEvent{Data: []byte("a")}, nil)
proxy.EXPECT().InvokeBinding(gomock.Any(), gomock.Any()).Return(&client.BindingEvent{Data: []byte("a")}, nil)
proxy.EXPECT().InvokeBinding(gomock.Any(), gomock.Any()).Return(&client.BindingEvent{Data: []byte("a")}, nil)
objectStore := object_storage.NewObjectStorage[*mock_object_storage.MockBindingProxy](&ctx, dir, proxy)
eReq := encode_box.EncodingRequest{
VideoKey: "",
AudiosKeys: []string{"a", "b"},
ImageKey: "a",
Options: encode_box.EncodingOptions{},
}
err = cleanUpFromObjectStore[*mock_object_storage.MockBindingProxy](&eReq, objectStore)
assert.Nil(t, err)
}

func TestMain_cleanUpFromObjectStoreError(t *testing.T) {
ctx := context.Background()
dir, err := os.MkdirTemp("", "assets")
if err != nil {
t.Fatal(err)
}
ctrl := gomock.NewController(t)
proxy := mock_object_storage.NewMockBindingProxy(ctrl)
proxy.EXPECT().InvokeBinding(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("test"))
proxy.EXPECT().InvokeBinding(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("test"))
proxy.EXPECT().InvokeBinding(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("test"))
objectStore := object_storage.NewObjectStorage[*mock_object_storage.MockBindingProxy](&ctx, dir, proxy)
eReq := encode_box.EncodingRequest{
VideoKey: "",
AudiosKeys: []string{"a", "b"},
ImageKey: "a",
Options: encode_box.EncodingOptions{},
}
err = cleanUpFromObjectStore[*mock_object_storage.MockBindingProxy](&eReq, objectStore)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "a, b, a")
}

func TestMain_Healthz(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
w := httptest.NewRecorder()
Expand Down Expand Up @@ -335,6 +380,81 @@ func TestMain_NewEncodeRequest_Ok(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
}

func TestMain_NewEncodeRequest_Ok_WithCleanup(t *testing.T) {
const (
vidKey = "v.mp4"
a1Key = "a.m4a"
a2Key = "b.m4a"
)
eReq := encode_box.EncodingRequest{
RecordId: "1",
VideoKey: vidKey,
AudiosKeys: []string{a1Key, a2Key},
ImageKey: "",
Options: encode_box.EncodingOptions{DeleteAssetsFromObjStore: true},
}
req, w, err := getMockedEncodingRequest(eReq)
if err != nil {
t.Fatal(err)
}
// Redirect calls to the backend storage to valid assets for each required assets in the request
ctx := context.Background()
ctrl := gomock.NewController(t)
proxy := mock_object_storage.NewMockBindingProxy(ctrl)

// VidKey -> Sample video
videoContent, err := ioutil.ReadFile(filepath.Join(ResDir, "video.mp4"))
if err != nil {
t.Fatal(err)
}
proxy.
EXPECT().
InvokeBinding(gomock.Any(), NewBidingMatcher(vidKey, "get")).
Return(&client.BindingEvent{Data: []byte(base64.StdEncoding.EncodeToString(videoContent))}, nil)

// a1Key -> Sample audio
audio1Content, err := ioutil.ReadFile(filepath.Join(ResDir, "audio.m4a"))
if err != nil {
t.Fatal(err)
}
proxy.
EXPECT().
InvokeBinding(gomock.Any(), NewBidingMatcher("a.m4a", "get")).
Return(&client.BindingEvent{Data: []byte(base64.StdEncoding.EncodeToString(audio1Content))}, nil)

// a2Key -> Sample audio
audio2Content, err := ioutil.ReadFile(filepath.Join(ResDir, "audio.m4a"))
if err != nil {
t.Fatal(err)
}
proxy.EXPECT().
InvokeBinding(gomock.Any(), NewBidingMatcher("b.m4a", "get")).
Return(&client.BindingEvent{Data: []byte(base64.StdEncoding.EncodeToString(audio2Content))}, nil)

// Finally, mock a Ok reponse when the server will try to upload on the remote storage
proxy.EXPECT().
InvokeBinding(gomock.Any(), NewBidingMatcher(fmt.Sprintf("%s.mp4", eReq.RecordId), "create")).
Return(&client.BindingEvent{}, nil)

// And except any number of deletion request
proxy.EXPECT().
InvokeBinding(gomock.Any(), NewBidingMatcher("*", "delete")).
Return(&client.BindingEvent{}, nil).
AnyTimes()

dir, err := os.MkdirTemp("", "test")
if err != nil {
t.Fatal(err)
}
objStore := object_storage.NewObjectStorage[*mock_object_storage.MockBindingProxy](&ctx, dir, proxy)
eBox := encode_box.NewEncodeBox(&ctx, objStore)
encodeSync(w, req, components[*mock_object_storage.MockBindingProxy]{
eBox: eBox,
objStore: objStore,
})
assert.Equal(t, http.StatusOK, w.Code)
}

func TestMain_NewEncodeRequest_EncodingError(t *testing.T) {
const (
vidKey = "v.mp4"
Expand Down Expand Up @@ -441,7 +561,6 @@ func TestMain_NewEncodeRequest_UploadError(t *testing.T) {
proxy.EXPECT().
InvokeBinding(gomock.Any(), NewBidingMatcher(fmt.Sprintf("%s.mp4", eReq.RecordId), "create")).
Return(&client.BindingEvent{}, fmt.Errorf("test"))

dir, err := os.MkdirTemp("", "test")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -494,7 +613,8 @@ func (m *bindingMatcher) Matches(x interface{}) bool {
if req.Operation != m.operation {
return false
}
if req.Metadata["key"] != m.name {
// If the name is a wildcard, accept anything
if m.name != "*" && req.Metadata["key"] != m.name {
return false
}
return true
Expand Down
3 changes: 3 additions & 0 deletions pkg/encode-box/encode-box.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,5 +171,8 @@ type EncodingRequest struct {
Options EncodingOptions `json:"options"`
}

// EncodingOptions All valid encoding options
type EncodingOptions struct {
// Clean up used video/audio/images assets if the encoding succeeded
DeleteAssetsFromObjStore bool `json:"deleteAssetsFromObjStore"`
}

0 comments on commit 8a99004

Please sign in to comment.