Skip to content

Commit

Permalink
refactor(worker): media callback consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
sundowndev committed Jan 17, 2021
1 parent 9746981 commit dbdaa50
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
6 changes: 5 additions & 1 deletion api/media_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func (s *Server) uploadVideoFile(ctx *gin.Context) {
format := strings.Split(data.Format.FormatName, ",")[0]
fps := int(transcoding.ParseFrameRates(data.Streams[0].RFrameRate))

RenditionsCount := 0

for _, r := range s.config.Settings.Encoding.Renditions {
// Ignore resolutions higher than original
if r.Width > data.FirstVideoStream().Width && r.Height > data.FirstVideoStream().Height {
Expand Down Expand Up @@ -195,11 +197,13 @@ func (s *Server) uploadVideoFile(ctx *gin.Context) {
util.NewError(ctx, http.StatusInternalServerError, err)
return
}

RenditionsCount++
}

err = worker.MediaProcessingCallbackProducer(ch, worker.MediaProcessingCallbackParams{
MediaUUID: m.ID,
MediaFilesCount: len(s.config.Settings.Encoding.Renditions),
MediaFilesCount: RenditionsCount,
})
if err != nil {
util.NewError(ctx, http.StatusInternalServerError, err)
Expand Down
13 changes: 6 additions & 7 deletions worker/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path"
"path/filepath"
"strings"
"time"
)

func setMediaStatusNack(w *Worker, d amqp.Delivery, uuid uuid.UUID, status media.Status) error {
Expand Down Expand Up @@ -42,7 +43,7 @@ func videoTranscodingConsumer(w *Worker, msgs <-chan amqp.Delivery) {
return
}

w.logger.Info("Received a message", zap.String("MediaUUID", body.MediaUUID.String()))
w.logger.Info("Received video transcoding message", zap.String("MediaUUID", body.MediaUUID.String()))

m, err := w.dbClient.Media.
Query().
Expand Down Expand Up @@ -189,20 +190,18 @@ func mediaProcessingCallbackConsumer(w *Worker, msgs <-chan amqp.Delivery) {
return
}

w.logger.Info("Received callback message", zap.String("MediaUUID", body.MediaUUID.String()), zap.Int("MediaFilesCount", body.MediaFilesCount))

m, err := w.dbClient.Media.Query().Where(media.ID(body.MediaUUID)).WithMediaFiles().Only(ctx)
if err != nil {
w.logger.Error("Database error", zap.Error(err))
_ = d.Nack(false, false)
return
}

if m.Status != media.StatusProcessing {
_ = d.Nack(false, m.Status != media.StatusErrored)
return
}

if len(m.Edges.MediaFiles) != body.MediaFilesCount {
_ = setMediaStatusNack(w, d, m.ID, media.StatusErrored)
time.Sleep(2 * time.Second)
_ = d.Nack(false, m.Status == media.StatusProcessing)
return
}

Expand Down

0 comments on commit dbdaa50

Please sign in to comment.