Generics (#47)
* migration v1: transforming tasks and workers

* Generic Consumer

* Generic workers

* fix s3 uploader test

* generic json task

* Generic compression task

* sqs consumer and deleter generic

* http communicator with new generics arch

* Delete executor

* Add new add input/output ch methods

* Improve example

* Finish example

* Rename T/K to I/O on generics interface

* Fix docs
OtavioHenrique committed Apr 13, 2024
1 parent d817fe3 commit 620453f
Showing 32 changed files with 546 additions and 948 deletions.
211 changes: 116 additions & 95 deletions README.md
Every worker and task will share the same metadata about the information being processed.

### Tasks Shipped with Vecna (More coming!)

Currently, four worker types are provided:

* [Producer](pkg/workers/producer.go): Worker pool that only produces messages to a channel, based on the `Task` execution response
* [Consumer](pkg/workers/consumer.go): Worker pool that only consumes from a channel and executes tasks.
Some basic tasks are already provided (and more are welcome):
* [Json marshal/unmarshal](pkg/task/json/json.go)
* [HTTP Communicator for making HTTP requests](pkg/task/http_communicator/http_communicator.go)

But you're heavily encouraged to code your own business logic too, by implementing the [task interface](./pkg/task/task.go):

```go
type Task[T any, K any] interface {
	Run(context.Context, T, map[string]interface{}, string) (K, error)
}
```
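
For illustration, here is a minimal sketch of a custom task satisfying this interface. The `WordCounter` type and its behavior are hypothetical (not part of Vecna), and imports are omitted as in the other snippets:

```go
// WordCounter implements Task[[]byte, int]: it receives a byte payload and
// returns the number of whitespace-separated words in it.
type WordCounter struct{}

func (w *WordCounter) Run(ctx context.Context, input []byte, meta map[string]interface{}, workerName string) (int, error) {
	return len(strings.Fields(string(input))), nil
}
```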

## Monitoring

To use it, just create your workers and tasks as you want. Check the examples in the repository.

```go
metric := &metrics.TODO{}
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

sess, err := session.NewSession(&aws.Config{
	Region: aws.String("us-east-1"),
})

if err != nil {
	panic(err)
}

sqsClient := awsSqs.New(sess)

sqsConsumer := workers.NewProducerWorker(
	"Event Created",
	sqs.NewSQSConsumer(sqsClient, logger, &sqs.SQSConsumerOpts{
		QueueName: "any-queue",
	}),
	10,
	logger,
	metric,
	500*time.Millisecond,
)

inputCh := make(chan *workers.WorkerData[[]*sqs.SQSConsumerOutput], 10)
sqsConsumer.AddOutputCh(inputCh)

breaker := workers.NewEventBreakerWorker[[]*sqs.SQSConsumerOutput, *sqs.SQSConsumerOutput](
	"break sqs messages",
	1,
	logger,
	metric,
)
breaker.AddInputCh(inputCh)

msgsCh := make(chan *workers.WorkerData[*sqs.SQSConsumerOutput], 10)
breaker.AddOutputCh(msgsCh)

pathExtractor := workers.NewBiDirectionalWorker[*sqs.SQSConsumerOutput, string](
	"Path Extractor",
	&PathExtractor[*sqs.SQSConsumerOutput, string]{},
	1,
	logger,
	metric,
)
pathCh := make(chan *workers.WorkerData[string], 10)
pathExtractor.AddInputCh(msgsCh)
pathExtractor.AddOutputCh(pathCh)

s3Client := awsS3.New(sess)

s3Downloader := workers.NewBiDirectionalWorker(
	"Download Data",
	s3.NewS3Downloader(
		s3Client,
		"bucket",
		logger,
	),
	5,
	logger,
	metric,
)
s3Downloader.AddInputCh(pathCh)
s3DownOutputCh := make(chan *workers.WorkerData[*s3.S3DownloaderOutput], 5)
s3Downloader.AddOutputCh(s3DownOutputCh)

// Another way to create intermediate workers, without bureaucracy
rawContentCh := make(chan *workers.WorkerData[[]byte], 10)
go func() {
	for {
		out := <-s3DownOutputCh

		rawContentCh <- &workers.WorkerData[[]byte]{Data: out.Data.Data, Metadata: out.Metadata}
	}
}()

decompressor := workers.NewBiDirectionalWorker(
	"Decompress Data",
	compression.NewDecompressor(
		"gzip",
		logger,
	),
	5,
	logger,
	metric,
)
decompressor.AddInputCh(rawContentCh)
decompressedCh := make(chan *workers.WorkerData[[]byte], 5)
decompressor.AddOutputCh(decompressedCh)

businessLogic := workers.NewConsumerWorker(
	"Process Data",
	&Printer[[]byte, []byte]{},
	5,
	logger,
	metric,
)
businessLogic.AddInputCh(decompressedCh)

ctx := context.TODO()

sqsConsumer.Start(ctx)
breaker.Start(ctx)
pathExtractor.Start(ctx)
s3Downloader.Start(ctx)
decompressor.Start(ctx)
businessLogic.Start(ctx)
```

In this example, a simple pipeline is built: messages are consumed from SQS, the referenced objects are downloaded from S3 (based on the SQS consumer output), decompressed, and processed by some business logic (a custom `Task`).
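
The example wires in two custom tasks, `PathExtractor` and `Printer`, whose definitions are not shown above. Here is a minimal sketch of what they might look like, assuming the `Task[T, K]` interface from earlier; the implementations are hypothetical, and the `Content` field on `SQSConsumerOutput` is an assumption carried over from the pre-generics examples:

```go
// PathExtractor is a hypothetical task that pulls the S3 object path out of a
// consumed SQS message. It is instantiated above as
// PathExtractor[*sqs.SQSConsumerOutput, string].
type PathExtractor[I any, O any] struct{}

func (p *PathExtractor[I, O]) Run(ctx context.Context, input I, meta map[string]interface{}, workerName string) (O, error) {
	msg, ok := any(input).(*sqs.SQSConsumerOutput)
	if !ok {
		return *new(O), fmt.Errorf("unexpected input type %T", input)
	}

	path, _ := any(*msg.Content).(O) // Content is assumed to hold the object path
	return path, nil
}

// Printer is a hypothetical pass-through task that logs its input unchanged.
type Printer[I any, O any] struct{}

func (p *Printer[I, O]) Run(ctx context.Context, input I, meta map[string]interface{}, workerName string) (O, error) {
	fmt.Printf("%s received: %v\n", workerName, input)

	out, _ := any(input).(O)
	return out, nil
}
```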

## Creating your own tasks

Creating your own task is simple: just follow the [Task interface](pkg/task/task.go); only a single `Run()` method is needed.

```go
Run(context.Context, T, map[string]interface{}, string) (K, error)
```
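
As a sketch of how a custom task plugs into a worker pool, here is the hypothetical `WordCounter` from earlier wired into a bi-directional worker, under the same assumptions as above:

```go
wordCounter := workers.NewBiDirectionalWorker[[]byte, int](
	"Count Words",
	&WordCounter{},
	5,
	logger,
	metric,
)

textCh := make(chan *workers.WorkerData[[]byte], 10)
countCh := make(chan *workers.WorkerData[int], 10)
wordCounter.AddInputCh(textCh)
wordCounter.AddOutputCh(countCh)

wordCounter.Start(context.TODO())
```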

## Extend Existing Code

All workers consume from their input channel (except the producer worker, which produces to it). You can put any message on a worker's input channel yourself, which lets you migrate gradually or embed a Vecna pipeline inside an existing application.

```go
type MyInput struct {
	Path string
}

s3Downloader := workers.NewBiDirectionalWorker(
	"Download Data",
	s3.NewS3Downloader(
		s3Client,
		"bucket",
		logger,
	),
	5,
	logger,
	metric,
)

// Initialize other workers and start them
inputCh := make(chan *workers.WorkerData[string], 10)
s3Downloader.AddInputCh(inputCh)

msg := MyInput{Path: "path/to/s3"}
inputCh <- &workers.WorkerData[string]{Data: msg.Path, Metadata: map[string]interface{}{}}
```

### Development
