generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
impl.go
118 lines (102 loc) · 3.29 KB
/
impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package internal
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"reflect"
"connectrpc.com/connect"
"github.com/puzpuzpuz/xsync/v3"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/go-runtime/encoding"
"github.com/TBD54566975/ftl/go-runtime/ftl/reflection"
"github.com/TBD54566975/ftl/internal/modulecontext"
"github.com/TBD54566975/ftl/internal/rpc"
)
// mapCacheEntry is a single cached result of a Map() call.
type mapCacheEntry struct {
	// checksum is the SHA-256 digest of the JSON-encoded input that produced output.
	checksum [32]byte
	// output is the value returned by the mapping function for that input.
	output any
}
// RealFTL is the real implementation of the [internal.FTL] interface using the Controller.
type RealFTL struct {
	// dmctx supplies the current module context that config and secret
	// lookups are delegated to.
	dmctx *modulecontext.DynamicModuleContext

	// mapped caches Map() results. Entries are keyed by the pointer value of
	// the mapper they were computed for.
	mapped *xsync.MapOf[uintptr, mapCacheEntry]
}
// New builds a [RealFTL] around the given dynamic module context, starting
// with an empty Map() cache.
func New(dmctx *modulecontext.DynamicModuleContext) *RealFTL {
	impl := new(RealFTL)
	impl.dmctx = dmctx
	impl.mapped = xsync.NewMapOf[uintptr, mapCacheEntry]()
	return impl
}
// Compile-time check that *RealFTL satisfies the FTL interface.
var _ FTL = (*RealFTL)(nil)
// GetConfig delegates to GetConfig on the current module context, passing
// name and dest through unchanged. The context argument is not used.
func (r *RealFTL) GetConfig(_ context.Context, name string, dest any) error {
	moduleCtx := r.dmctx.CurrentContext()
	return moduleCtx.GetConfig(name, dest)
}
// GetSecret delegates to GetSecret on the current module context, passing
// name and dest through unchanged. The context argument is not used.
func (r *RealFTL) GetSecret(_ context.Context, name string, dest any) error {
	moduleCtx := r.dmctx.CurrentContext()
	return moduleCtx.GetSecret(name, dest)
}
// FSMSend marshals event and sends it to the given instance of the FSM named
// fsm in the current module, using the VerbService client found in ctx.
//
// The request carries both the encoded event body and the schema type derived
// from the event's Go type. Marshalling and RPC failures are returned wrapped.
func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any) error {
	verbClient := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)

	payload, err := encoding.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	// The FSM reference is always scoped to the module this code runs in.
	fsmRef := &schemapb.Ref{Module: reflection.Module(), Name: fsm}
	eventType := schema.TypeToProto(reflection.ReflectTypeToSchemaType(reflect.TypeOf(event)))
	req := connect.NewRequest(&ftlv1.SendFSMEventRequest{
		Fsm:      fsmRef,
		Instance: instance,
		Event:    eventType,
		Body:     payload,
	})

	if _, sendErr := verbClient.SendFSMEvent(ctx, req); sendErr != nil {
		return fmt.Errorf("failed to send event: %w", sendErr)
	}
	return nil
}
// PublishEvent marshals event and publishes it to topic using the VerbService
// client found in ctx.
//
// Only topics belonging to the current module may be published to; any other
// topic is rejected with an error before any RPC is made. Marshalling and RPC
// failures are returned wrapped.
func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error {
	if topic.Module != reflection.Module() {
		return fmt.Errorf("can not publish to another module's topic: %s", topic)
	}

	verbClient := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)

	payload, err := encoding.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	topicRef := topic.ToProto().(*schemapb.Ref) //nolint: forcetypeassert
	req := connect.NewRequest(&ftlv1.PublishEventRequest{
		Topic: topicRef,
		Body:  payload,
	})

	if _, pubErr := verbClient.PublishEvent(ctx, req); pubErr != nil {
		return fmt.Errorf("failed to publish event: %w", pubErr)
	}
	return nil
}
// CallMap returns the result of mapImpl for mapper, reusing the cached result
// when value is unchanged since the last call for that mapper.
//
// The cache is keyed by the pointer value of mapper, obtained with
// reflect.Value.Pointer, so mapper must be of a kind that method supports
// (it panics otherwise). A cached entry is reused only when the SHA-256 of
// value's JSON encoding equals the checksum stored with the entry; otherwise
// mapImpl is invoked and its result replaces the entry.
//
// CallMap panics if value cannot be JSON-encoded or if mapImpl returns an
// error.
func (r *RealFTL) CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any {
	// Compute checksum of the input.
	inputData, err := json.Marshal(value)
	if err != nil {
		// Returning the error here would hand it to the caller disguised as the
		// mapped output. Fail the same way a mapImpl error does instead.
		panic(fmt.Errorf("failed to marshal input data: %w", err))
	}
	checksum := sha256.Sum256(inputData)

	// Check cache.
	key := reflect.ValueOf(mapper).Pointer()
	if cached, ok := r.mapped.Load(key); ok && checksum == cached.checksum {
		return cached.output
	}

	// Map the value.
	out, err := mapImpl(ctx)
	if err != nil {
		panic(err)
	}

	// Write the cache back.
	r.mapped.Store(key, mapCacheEntry{
		checksum: checksum,
		output:   out,
	})
	return out
}