Skip to content

Commit

Permalink
Join op (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
edoshor committed Apr 24, 2019
1 parent fd55263 commit 18feff1
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 3 deletions.
1 change: 1 addition & 0 deletions api/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
OP_SIRTUTIM = "sirtutim"
OP_INSERT = "insert"
OP_TRANSCODE = "transcode"
OP_JOIN = "join"

// Source Types
SRC_COLLECTION = "COLLECTION"
Expand Down
43 changes: 43 additions & 0 deletions api/docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,49 @@ func (suite *DocsSuite) Test911TranscodeHandlerError() {
suite.assertJsonOK(resp)
}

func (suite *DocsSuite) Test92JoinHandler() {
input := JoinRequest{
Operation: Operation{
Station: "Join station",
User: "operator@dev.com",
WorkflowID: "d12356789",
},
OriginalShas: []string{"0987654321fedcba0987654321fedcba09876543"},
ProxyShas: []string{"0987654321fedcba0987654321fedcba87654321"},
Original: AVFile{
File: File{
FileName: "heb_o_rav_rb-1990-02-kishalon_2016-09-14_lesson_o_trim.mp4",
Sha1: "0987654321fedcba0987654321fedcba11111113",
Size: 19800,
CreatedAt: &Timestamp{Time: time.Now()},
Type: "type",
SubType: "subtype",
MimeType: "mime_type",
Language: LANG_MULTI,
},
Duration: 871,
},
Proxy: AVFile{
File: File{
FileName: "heb_o_rav_rb-1990-02-kishalon_2016-09-14_lesson_p_trim.mp4",
Sha1: "0987654321fedcba0987654321fedcba22222223",
Size: 694,
CreatedAt: &Timestamp{Time: time.Now()},
Type: "type",
SubType: "subtype",
MimeType: "mime_type",
Language: LANG_HEBREW,
},
Duration: 871,
},
}

resp, err := suite.testOperation(OP_JOIN, input)
suite.Require().Nil(err)
suite.assertJsonOK(resp)
}


func (suite *DocsSuite) testOperation(name string, input interface{}) (*http.Response, error) {
b := new(bytes.Buffer)
err := json.NewEncoder(b).Encode(input)
Expand Down
71 changes: 70 additions & 1 deletion api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ func TranscodeHandler(c *gin.Context) {
}
}

// Join multiple files sequentially
func JoinHandler(c *gin.Context) {
log.Info(OP_JOIN)
var i JoinRequest
if c.BindJSON(&i) == nil {
handleOperation(c, i, handleJoin)
}
}

// This endpoint is used in the trim admin workflow
// We need this for the "fix" / "update" content unit flow.
// When a trim from capture is made to fix some unit on capture files
Expand Down Expand Up @@ -303,7 +312,11 @@ func handleCaptureStop(exec boil.Executor, input interface{}) (*models.Operation
}

log.Info("Creating file")
file, err := CreateFile(exec, parent, r.File, nil)
fProps := make(map[string]interface{})
if r.LabelID != "" {
fProps["label_id"] = r.LabelID
}
file, err := CreateFile(exec, parent, r.File, fProps)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1051,6 +1064,62 @@ func handleTranscode(exec boil.Executor, input interface{}) (*models.Operation,
return operation, nil, operation.AddFiles(exec, false, opFiles...)
}

func handleJoin(exec boil.Executor, input interface{}) (*models.Operation, []events.Event, error) {
r := input.(JoinRequest)

// Fetch input files
inOriginals := make([]*models.File, 0)
for i := range r.OriginalShas {
f, _, err := FindFileBySHA1(exec, r.OriginalShas[i])
if err != nil {
return nil, nil, errors.Wrapf(err, "Original number %d, sha1 %s", i+1, r.OriginalShas[i])
}
inOriginals = append(inOriginals, f)
}

inProxies := make([]*models.File, 0)
for i := range r.ProxyShas {
f, _, err := FindFileBySHA1(exec, r.ProxyShas[i])
if err != nil {
return nil, nil, errors.Wrapf(err, "Proxy number %d, sha1 %s", i+1, r.ProxyShas[i])
}
inProxies = append(inProxies, f)
}

log.Info("Creating operation")
props := map[string]interface{}{
"original_shas": r.OriginalShas,
"proxy_shas": r.ProxyShas,
}
operation, err := CreateOperation(exec, OP_JOIN, r.Operation, props)
if err != nil {
return nil, nil, err
}

log.Info("Creating joined original")
props = map[string]interface{}{
"duration": r.Original.Duration,
}
original, err := CreateFile(exec, nil, r.Original.File, props)
if err != nil {
return nil, nil, err
}

log.Info("Creating joined proxy")
props = map[string]interface{}{
"duration": r.Proxy.Duration,
}
proxy, err := CreateFile(exec, nil, r.Proxy.File, props)
if err != nil {
return nil, nil, err
}

log.Info("Associating files to operation")
allFiles := append(inOriginals, inProxies...)
allFiles = append(allFiles, original, proxy)
return operation, nil, operation.AddFiles(exec, false, allFiles...)
}

// Helpers

type OperationHandler func(boil.Executor, interface{}) (*models.Operation, []events.Event, error)
Expand Down
114 changes: 113 additions & 1 deletion api/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (suite *HandlersSuite) TestHandleCaptureStop() {
CaptureSource: "mltcap",
CollectionUID: "abcdefgh",
Part: "part",
LabelID: "label",
}

op, evnts, err := handleCaptureStop(suite.tx, input)
Expand All @@ -139,6 +140,7 @@ func (suite *HandlersSuite) TestHandleCaptureStop() {
suite.Equal(input.CollectionUID, props["collection_uid"], "properties: collection_uid")
suite.Equal(input.Part, props["part"], "properties: part")


// Check user
suite.Require().Nil(op.L.LoadUser(suite.tx, true, op))
suite.Equal(input.Operation.User, op.R.User.Email, "Operation User")
Expand All @@ -152,7 +154,10 @@ func (suite *HandlersSuite) TestHandleCaptureStop() {
suite.Equal(input.Size, f.Size, "File: Size")
suite.Equal(input.CreatedAt.Time.Unix(), f.FileCreatedAt.Time.Unix(), "File: FileCreatedAt")
suite.Equal(parent.ID, f.ParentID.Int64, "File Parent.ID")
suite.False(f.Properties.Valid, "properties")

err = f.Properties.Unmarshal(&props)
suite.Require().Nil(err)
suite.Equal(input.LabelID, props["label_id"], "file properties: label_id")
}

func (suite *HandlersSuite) TestHandleDemux() {
Expand Down Expand Up @@ -1489,3 +1494,110 @@ func (suite *HandlersSuite) TestHandleTranscodeError() {
originalParent := op.R.Files[0]
suite.Equal(original.ID, originalParent.ID, "original <-> operation")
}

func (suite *HandlersSuite) TestHandleJoin() {
// create dummy input files

inOriginals := make([]string, 3)
for i:=0; i<len(inOriginals);i++ {
fi := File{
FileName: fmt.Sprintf("dummy original file %d",i+1),
CreatedAt: &Timestamp{time.Now()},
Sha1: utils.RandomSHA1(),
Size: math.MaxInt64,
}
_, err := CreateFile(suite.tx, nil, fi, nil)
suite.Require().Nil(err)
inOriginals[i] = fi.Sha1
}

inProxies := make([]string, 3)
for i:=0; i<len(inProxies);i++ {
fi := File{
FileName: fmt.Sprintf("dummy proxy file %d",i+1),
CreatedAt: &Timestamp{time.Now()},
Sha1: utils.RandomSHA1(),
Size: math.MaxInt64,
}
_, err := CreateFile(suite.tx, nil, fi, nil)
suite.Require().Nil(err)
inProxies[i] = fi.Sha1
}

// Do join operation
input := JoinRequest{
Operation: Operation{
Station: "Joiner station",
User: "operator@dev.com",
},
OriginalShas: inOriginals,
ProxyShas: inProxies,
Original: AVFile{
File: File{
FileName: "original_join.mp4",
Sha1: "012356789abcdef012356789abcdef1111111111",
Size: 98737,
CreatedAt: &Timestamp{Time: time.Now()},
},
Duration: 892.1900,
},
Proxy: AVFile{
File: File{
FileName: "proxy_join.mp4",
Sha1: "987653210abcdef012356789abcdef2222222222",
Size: 987,
CreatedAt: &Timestamp{Time: time.Now()},
},
Duration: 891.8800,
},
}

op, evnts, err := handleJoin(suite.tx, input)
suite.Require().Nil(err)
suite.Require().Nil(evnts)

// Check op
suite.Equal(OPERATION_TYPE_REGISTRY.ByName[OP_JOIN].ID, op.TypeID, "Operation TypeID")
suite.Equal(input.Operation.Station, op.Station.String, "Operation Station")
var props map[string]interface{}
err = op.Properties.Unmarshal(&props)
suite.Require().Nil(err)
for i, v := range input.OriginalShas {
suite.Equal(v, props["original_shas"].([]interface{})[i], "properties: original_shas[%d]", i)
}
for i, v := range input.ProxyShas {
suite.Equal(v, props["proxy_shas"].([]interface{})[i], "properties: proxy_shas[%d]", i)
}

// Check user
suite.Require().Nil(op.L.LoadUser(suite.tx, true, op))
suite.Equal(input.Operation.User, op.R.User.Email, "Operation User")

// Check associated files
suite.Require().Nil(op.L.LoadFiles(suite.tx, true, op))
suite.Len(op.R.Files, 8, "Number of files")
fm := make(map[string]*models.File)
for _, x := range op.R.Files {
fm[x.Name] = x
}

// Check original
original := fm[input.Original.FileName]
suite.Equal(input.Original.FileName, original.Name, "Original: Name")
suite.Equal(input.Original.Sha1, hex.EncodeToString(original.Sha1.Bytes), "Original: SHA1")
suite.Equal(input.Original.Size, original.Size, "Original: Size")
suite.Equal(input.Original.CreatedAt.Time.Unix(), original.FileCreatedAt.Time.Unix(), "Original: FileCreatedAt")
err = original.Properties.Unmarshal(&props)
suite.Require().Nil(err)
suite.Equal(input.Original.Duration, props["duration"], "Original props: duration")

// Check proxy
proxy := fm[input.Proxy.FileName]
suite.Equal(input.Proxy.FileName, proxy.Name, "Proxy: Name")
suite.Equal(input.Proxy.Sha1, hex.EncodeToString(proxy.Sha1.Bytes), "Proxy: SHA1")
suite.Equal(input.Proxy.Size, proxy.Size, "Proxy: Size")
suite.Equal(input.Proxy.CreatedAt.Time.Unix(), proxy.FileCreatedAt.Time.Unix(), "Proxy: FileCreatedAt")
err = proxy.Properties.Unmarshal(&props)
suite.Require().Nil(err)
suite.Equal(input.Proxy.Duration, props["duration"], "Proxy props: duration")
}
9 changes: 9 additions & 0 deletions api/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type (
CaptureSource string `json:"capture_source"`
CollectionUID string `json:"collection_uid"`
Part string `json:"part"`
LabelID string `json:"label_id"`
}

DemuxRequest struct {
Expand Down Expand Up @@ -165,6 +166,14 @@ type (
Message string `json:"message" binding:"omitempty"`
}

JoinRequest struct {
Operation
OriginalShas []string `json:"original_shas" binding:"required,dive,len=40,hexadecimal"`
ProxyShas []string `json:"proxy_shas" binding:"required,dive,len=40,hexadecimal"`
Original AVFile `json:"original"`
Proxy AVFile `json:"proxy"`
}

// REST

ListRequest struct {
Expand Down
2 changes: 1 addition & 1 deletion api/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var (

ALL_OPERATION_TYPES = []string{
OP_CAPTURE_START, OP_CAPTURE_STOP, OP_DEMUX, OP_TRIM, OP_SEND, OP_CONVERT, OP_UPLOAD, OP_IMPORT_KMEDIA,
OP_SIRTUTIM, OP_INSERT, OP_TRANSCODE,
OP_SIRTUTIM, OP_INSERT, OP_TRANSCODE, OP_JOIN,
}

// Types of various, secondary, content slots in big events like congress, unity day, etc...
Expand Down
1 change: 1 addition & 0 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func SetupRoutes(router *gin.Engine) {
operations.POST("/sirtutim", SirtutimHandler)
operations.POST("/insert", InsertHandler)
operations.POST("/transcode", TranscodeHandler)
operations.POST("/join", JoinHandler)
operations.GET("/descendant_units/:sha1", DescendantUnitsHandler)

rest := router.Group("rest")
Expand Down
18 changes: 18 additions & 0 deletions cmd/dgima.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cmd

import (
"github.com/spf13/cobra"

"github.com/Bnei-Baruch/mdb/importer/dgima"
)

func init() {
command := &cobra.Command{
Use: "dgima-import",
Short: "Import capture labels data",
Run: func(cmd *cobra.Command, args []string) {
dgima.Import()
},
}
RootCmd.AddCommand(command)
}
51 changes: 51 additions & 0 deletions importer/dgima/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dgima

import (
"context"
"database/sql"
"time"

log "github.com/Sirupsen/logrus"
"github.com/spf13/viper"
"github.com/volatiletech/sqlboiler/boil"

"github.com/Bnei-Baruch/mdb/api"
"github.com/Bnei-Baruch/mdb/events"
"github.com/Bnei-Baruch/mdb/utils"
)

var (
mdb *sql.DB
)

func Init() (time.Time, *events.BufferedEmitter) {
var err error
clock := time.Now()

log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
//log.SetLevel(log.WarnLevel)

log.Info("Starting capture labels import")

log.Info("Setting up connection to MDB")
mdb, err = sql.Open("postgres", viper.GetString("mdb.url"))
utils.Must(err)
utils.Must(mdb.Ping())
boil.SetDB(mdb)
//boil.DebugMode = true

log.Info("Initializing static data from MDB")
utils.Must(api.InitTypeRegistries(mdb))

log.Info("Setting events handler")
emitter, err := events.InitEmitter()
utils.Must(err)

return clock, emitter
}

func Shutdown() {
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
events.CloseEmitter(ctx)
utils.Must(mdb.Close())
}
Loading

0 comments on commit 18feff1

Please sign in to comment.