-
Notifications
You must be signed in to change notification settings - Fork 158
/
ingestor.go
144 lines (112 loc) · 3.55 KB
/
ingestor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package ingestor
import (
"context"
"fmt"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/repository"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
"github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
"github.com/hatchet-dev/hatchet/internal/taskqueue"
"github.com/steebchen/prisma-client-go/runtime/types"
)
// Ingestor is the event ingestion service. It embeds
// contracts.EventsServiceServer and adds methods for ingesting new events and
// replays of existing events.
type Ingestor interface {
contracts.EventsServiceServer
// IngestEvent ingests a new event named eventName carrying data for the
// given tenant and returns the resulting event model.
IngestEvent(tenantId, eventName string, data any) (*db.EventModel, error)
// IngestReplayedEvent ingests a new event for the given tenant that is a
// replay of replayedEvent and returns the resulting event model.
IngestReplayedEvent(tenantId string, replayedEvent *db.EventModel) (*db.EventModel, error)
}
// IngestorOptFunc is a functional option that mutates an IngestorOpts value.
type IngestorOptFunc func(*IngestorOpts)
// IngestorOpts holds the dependencies that IngestorOptFunc values configure.
// NewIngestor rejects options in which either field is left nil.
type IngestorOpts struct {
// eventRepository is the repository used to create events.
eventRepository repository.EventRepository
// taskQueue is the queue that receives a task for each created event.
taskQueue taskqueue.TaskQueue
}
// WithEventRepository returns an option that stores r as the event repository
// in the ingestor options.
func WithEventRepository(r repository.EventRepository) IngestorOptFunc {
	setRepository := func(o *IngestorOpts) {
		o.eventRepository = r
	}

	return setRepository
}
// WithTaskQueue returns an option that stores tq as the task queue in the
// ingestor options.
func WithTaskQueue(tq taskqueue.TaskQueue) IngestorOptFunc {
	setQueue := func(o *IngestorOpts) {
		o.taskQueue = tq
	}

	return setQueue
}
// defaultIngestorOpts returns a zero-valued IngestorOpts; every dependency
// starts out nil and must be supplied through an option.
func defaultIngestorOpts() *IngestorOpts {
	return new(IngestorOpts)
}
// IngestorImpl is the Ingestor implementation returned by NewIngestor. It
// embeds contracts.UnimplementedEventsServiceServer.
type IngestorImpl struct {
contracts.UnimplementedEventsServiceServer
// eventRepository is used to create events.
eventRepository repository.EventRepository
// tq is the task queue that receives a task for each created event.
tq taskqueue.TaskQueue
}
// NewIngestor builds an Ingestor from the supplied options.
//
// The options are applied in order on top of defaultIngestorOpts. An error is
// returned when no event repository has been configured, or, failing that,
// when no task queue has been configured.
func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
	cfg := defaultIngestorOpts()

	for idx := range fs {
		fs[idx](cfg)
	}

	// Validate required dependencies; the repository is checked first.
	switch {
	case cfg.eventRepository == nil:
		return nil, fmt.Errorf("event repository is required. use WithEventRepository")
	case cfg.taskQueue == nil:
		return nil, fmt.Errorf("task queue is required. use WithTaskQueue")
	}

	ingestor := &IngestorImpl{
		eventRepository: cfg.eventRepository,
		tq:              cfg.taskQueue,
	}

	return ingestor, nil
}
// IngestEvent creates an event for the given tenant and key and enqueues it
// for processing.
//
// data is converted with datautils.ToJSONType, the event is created through
// the event repository, and a task built by eventToTask is added to the event
// processing queue. The created event is returned only if all three steps
// succeed; otherwise the failing step's error is wrapped and returned.
func (i *IngestorImpl) IngestEvent(tenantId, key string, data any) (*db.EventModel, error) {
	payload, err := datautils.ToJSONType(data)
	if err != nil {
		return nil, fmt.Errorf("could not convert event data to JSON: %w", err)
	}

	createOpts := &repository.CreateEventOpts{
		TenantId: tenantId,
		Key:      key,
		Data:     payload,
	}

	created, err := i.eventRepository.CreateEvent(createOpts)
	if err != nil {
		return nil, fmt.Errorf("could not create event: %w", err)
	}

	// The interface carries no context, so the enqueue uses a background one.
	if err := i.tq.AddTask(context.Background(), taskqueue.EVENT_PROCESSING_QUEUE, eventToTask(created)); err != nil {
		return nil, fmt.Errorf("could not add event to task queue: %w", err)
	}

	return created, nil
}
// IngestReplayedEvent creates a new event for the given tenant that replays
// replayedEvent, and enqueues it for processing.
//
// The new event reuses the key of replayedEvent, carries its data when
// replayedEvent.Data() reports a value, and records replayedEvent.ID as the
// event being replayed. A task built by eventToTask is then added to the event
// processing queue.
//
// An error is returned when replayedEvent is nil, when the event cannot be
// created, or when the task cannot be added to the queue.
func (i *IngestorImpl) IngestReplayedEvent(tenantId string, replayedEvent *db.EventModel) (*db.EventModel, error) {
	// Guard against a nil event: every access below dereferences it and would
	// otherwise panic.
	if replayedEvent == nil {
		return nil, fmt.Errorf("could not replay event: replayed event is nil")
	}

	// Carry over the original event's data only when it is present; data stays
	// nil otherwise.
	var data *types.JSON

	if jsonType, ok := replayedEvent.Data(); ok {
		data = &jsonType
	}

	event, err := i.eventRepository.CreateEvent(&repository.CreateEventOpts{
		TenantId:      tenantId,
		Key:           replayedEvent.Key,
		Data:          data,
		ReplayedEvent: &replayedEvent.ID,
	})

	if err != nil {
		return nil, fmt.Errorf("could not create event: %w", err)
	}

	// The interface carries no context, so the enqueue uses a background one.
	err = i.tq.AddTask(context.Background(), taskqueue.EVENT_PROCESSING_QUEUE, eventToTask(event))

	if err != nil {
		return nil, fmt.Errorf("could not add event to task queue: %w", err)
	}

	return event, nil
}
// eventToTask builds the queue task for event e. The payload carries the event
// ID, the metadata carries the event key and tenant ID, and the task targets
// the event processing queue with the fixed ID "event".
func eventToTask(e *db.EventModel) *taskqueue.Task {
	payloadSrc := tasktypes.EventTaskPayload{
		EventId: e.ID,
	}

	metadataSrc := tasktypes.EventTaskMetadata{
		EventKey: e.Key,
		TenantId: e.TenantID,
	}

	// NOTE(review): the errors returned by ToJSONMap are discarded, and this
	// function has no way to report them to its callers — confirm that the
	// conversion of these structs cannot fail.
	payload, _ := datautils.ToJSONMap(payloadSrc)
	metadata, _ := datautils.ToJSONMap(metadataSrc)

	task := &taskqueue.Task{
		ID:       "event",
		Queue:    taskqueue.EVENT_PROCESSING_QUEUE,
		Payload:  payload,
		Metadata: metadata,
	}

	return task
}