Skip to content

Commit

Permalink
feat: implement worker pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
sundowndev committed Dec 9, 2020
1 parent 689b2e0 commit 0b8fed7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .support/docker/docker-compose.dev.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.7'

services:
db:
postgres:
image: postgres:latest
restart: on-failure
ports:
Expand Down
32 changes: 30 additions & 2 deletions cmd/gilfoyle/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func init() {

var workerCmd = &cobra.Command{
Use: "worker",
Short: "Launch background task worker",
Short: "Launch a background task worker node",
Long: "Multiple worker nodes compose a worker pool. We usually recommend to launch a minimum of 2 worker nodes to ensure fail over.",
Example: "gilfoyle worker",
Run: func(cmd *cobra.Command, args []string) {
logger := gilfoyle.Logger
Expand All @@ -34,6 +35,33 @@ var workerCmd = &cobra.Command{
}
defer w.Close()

w.Init()
forever := make(chan bool)

err = w.Init()
if err != nil {
logger.Fatal("Failed to initialize worker queues", zap.Error(err))
}

w.Consume()

ch, err := w.Client.Channel()
if err != nil {
logger.Fatal("Failed to create message queue channel", zap.Error(err))
}
defer ch.Close()

//time.Sleep(2 * time.Second)
//err = ch.Publish("", worker.VideoTranscodingQueue, false, false, amqp.Publishing{
// DeliveryMode: amqp.Persistent,
// ContentType: "text/plain",
// Body: []byte("hello!!!"),
//})
//if err != nil {
// logger.Error("Failed to publish a message", zap.Error(err))
//}

logger.Info("Worker is now ready to handle incoming jobs")

<-forever
},
}
20 changes: 20 additions & 0 deletions worker/consumers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package worker

import (
"github.com/google/uuid"
"github.com/streadway/amqp"
"go.uber.org/zap"
"time"
)

type VideoTranscodingParams struct {
MediaUUID uuid.UUID
}

func videoTranscodingQueueConsumer(w *Worker, msgs <-chan amqp.Delivery) {
for d := range msgs {
w.Logger.Info("Received a message", zap.ByteString("body", d.Body))
time.Sleep(3 * time.Second)
w.Logger.Info("Done")
}
}
114 changes: 104 additions & 10 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,51 @@ import (
)

const (
//videoTranscodingQueue string = "videoTranscoding"
//thumbnailGenerationQueue string = "thumbnailGeneration"
//previewGenerationQueue string = "previewGeneration"
VideoTranscodingQueue string = "VideoTranscoding"
ThumbnailGenerationQueue string = "ThumbnailGeneration"
PreviewGenerationQueue string = "PreviewGeneration"
)

type Queue struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Args amqp.Table
Handler func(*Worker, <-chan amqp.Delivery)
}

var queues = []Queue{
{
Name: VideoTranscodingQueue,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
Handler: videoTranscodingQueueConsumer,
},
{
Name: ThumbnailGenerationQueue,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
Handler: func(*Worker, <-chan amqp.Delivery) {},
},
{
Name: PreviewGenerationQueue,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
Handler: func(*Worker, <-chan amqp.Delivery) {},
},
}

type Options struct {
Host string
Port int16
Expand All @@ -21,8 +61,9 @@ type Options struct {
}

type Worker struct {
client *amqp.Connection
logger *zap.Logger
Queues map[string]amqp.Queue
Logger *zap.Logger
Client *amqp.Connection
}

func New(opts Options) (*Worker, error) {
Expand All @@ -38,15 +79,68 @@ func New(opts Options) (*Worker, error) {
}

return &Worker{
client: conn,
logger: opts.Logger,
Queues: map[string]amqp.Queue{},
Client: conn,
Logger: opts.Logger,
}, nil
}

func (w *Worker) Init() {
w.logger.Info("test")
func (w *Worker) Init() error {
ch, err := w.Client.Channel()
if err != nil {
return err
}

for _, q := range queues {
queue, err := ch.QueueDeclare(
q.Name, // name
q.Durable, // durable
q.AutoDelete, // delete when unused
q.Exclusive, // exclusive
q.NoWait, // no-wait
q.Args, // arguments
)
if err != nil {
return err
}

w.Queues[q.Name] = queue
}

return nil
}

func (w *Worker) Consume() {
ch, err := w.Client.Channel()
if err != nil {
w.Logger.Fatal("Error creating channel", zap.Error(err))
return
}

for _, q := range queues {
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
map[string]interface{}{},
)
if err != nil {
w.Logger.Sugar().Fatalf("Error consuming %s queue: %e", q.Name, err)
return
}

go q.Handler(w, msgs)
}
}

func (w *Worker) Close() error {
return w.client.Close()
err := w.Client.Close()
if err != nil {
return err
}

return nil
}

0 comments on commit 0b8fed7

Please sign in to comment.