-
Notifications
You must be signed in to change notification settings - Fork 0
/
inbox.go
88 lines (74 loc) · 1.85 KB
/
inbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//
// Copyright (C) 2023 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/fogfish/medium
//
package inbox
import (
"context"
"log/slog"
"os"
"time"
_ "github.com/fogfish/logger/v3"
"github.com/fogfish/medium"
"github.com/fogfish/medium/internal/codec"
"github.com/fogfish/stream/service/s3"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/events3"
)
// Runner wires together the inbox event pipeline and starts it.
//
// Configuration is taken from the environment:
//   - CONFIG_STORE_INBOX: passed to events3.New and used as the bucket of
//     the inbox S3 client.
//   - CONFIG_STORE_MEDIA: bucket of the media S3 client.
//   - CONFIG_CODEC_PROFILE: passed to medium.NewProfile.
//
// Any initialization failure is logged together with its cause and then
// raised via panic. Once everything is constructed, the event handler is
// started in its own goroutine and Runner calls q.Await() (presumably
// blocking for the lifetime of the broker — confirm against swarm docs).
func Runner() {
	// Read each setting once so the value that is used and the value that
	// is logged can never diverge.
	inboxBucket := os.Getenv("CONFIG_STORE_INBOX")
	mediaBucket := os.Getenv("CONFIG_STORE_MEDIA")
	profileName := os.Getenv("CONFIG_CODEC_PROFILE")

	q, err := events3.New(
		inboxBucket,
		swarm.WithLogStdErr(),
		swarm.WithTimeToFlight(60*time.Second),
		swarm.WithConfigFromEnv(),
	)
	if err != nil {
		slog.Error("Failed to init events3 broker", "error", err)
		panic(err)
	}

	inbox, err := s3.New[*medium.Media](
		s3.WithBucket(inboxBucket),
	)
	if err != nil {
		slog.Error("Failed to init inbox s3 client", "error", err)
		panic(err)
	}

	media, err := s3.New[*medium.Media](
		s3.WithBucket(mediaBucket),
	)
	if err != nil {
		slog.Error("Failed to init media s3 client", "error", err)
		panic(err)
	}

	profile, err := medium.NewProfile(profileName)
	if err != nil {
		slog.Error("Invalid profile", "profile", profileName, "error", err)
		panic(err)
	}

	// Named "handler" rather than "bus" so the local does not shadow the
	// bus type for the remainder of the function.
	handler := bus{
		codec: codec.NewCodec(profile, inbox, media),
	}

	// events3.Dequeue yields the receive/acknowledge channel pair that
	// onEventS3 consumes.
	go handler.onEventS3(events3.Dequeue(q))

	q.Await()
}
// bus binds incoming S3 events to the component that processes them.
type bus struct {
	// codec is the processing backend; anything exposing a matching
	// Process method can be plugged in here.
	codec interface {
		// Process handles a single S3 event and reports the outcome as an error.
		Process(ctx context.Context, evt *events3.Event) error
	}
}
// onEventS3 drains rcv until it is closed. Each event is handed to the
// codec, the outcome is stored in the event's Digest.Error, failures are
// logged with the bucket and key of the affected object, and the event is
// then forwarded on ack regardless of the outcome.
func (b *bus) onEventS3(rcv <-chan *events3.Event, ack chan<- *events3.Event) {
	ctx := context.Background()

	for {
		evt, open := <-rcv
		if !open {
			return
		}

		evt.Digest.Error = b.codec.Process(ctx, evt)

		if failure := evt.Digest.Error; failure != nil {
			object := evt.Object.S3
			slog.Error(
				"failed to process s3 event",
				slog.String("bucket", object.Bucket.Name),
				slog.String("key", object.Object.Key),
				slog.Any("error", failure),
			)
		}

		// The event is always acknowledged; the result travels in Digest.Error.
		ack <- evt
	}
}