-
Notifications
You must be signed in to change notification settings - Fork 318
/
instance.go
95 lines (83 loc) · 2.01 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"fmt"
"io"
"os"
openedge "github.com/baidu/openedge/sdk/openedge-go"
)
// Instance function instance interface
type Instance interface {
ID() uint32
Name() string
Call(msg *openedge.FunctionMessage) (*openedge.FunctionMessage, error)
io.Closer
}
// Producer function instance producer interface
type Producer interface {
StartInstance(id uint32) (Instance, error)
StopInstance(i Instance) error
}
type producer struct {
ctx openedge.Context
cfg FunctionInfo
}
func newProducer(ctx openedge.Context, cfg FunctionInfo) Producer {
return &producer{ctx: ctx, cfg: cfg}
}
// StartInstance starts instance
func (p *producer) StartInstance(id uint32) (Instance, error) {
name := fmt.Sprintf("%s.%s.%d", p.cfg.Service, p.cfg.Name, id)
port := "50051"
serverHost := "0.0.0.0"
clientHost := name
if os.Getenv(openedge.EnvRunningModeKey) == "native" {
var err error
port, err = p.ctx.GetAvailablePort()
if err != nil {
return nil, err
}
serverHost = "127.0.0.1"
clientHost = serverHost
}
address := fmt.Sprintf("%s:%s", serverHost, port)
dc := map[string]string{
openedge.EnvServiceAddressKey: address, // deprecated, for v0.1.2
openedge.EnvServiceInstanceAddressKey: address,
}
err := p.ctx.StartInstance(p.cfg.Service, name, dc)
if err != nil {
return nil, err
}
fcc := openedge.FunctionClientConfig{}
fcc.Address = fmt.Sprintf("%s:%s", clientHost, port)
fcc.Message = p.cfg.Message
fcc.Timeout = p.cfg.Timeout
fcc.Backoff = p.cfg.Backoff
cli, err := openedge.NewFClient(fcc)
if err != nil {
p.ctx.StopInstance(p.cfg.Service, name)
return nil, err
}
return &instance{
name: name,
FClient: cli,
}, nil
}
// StopInstance stops instance
func (p *producer) StopInstance(i Instance) error {
i.Close()
return p.ctx.StopInstance(p.cfg.Service, i.Name())
}
type instance struct {
id uint32
name string
*openedge.FClient
}
// ID returns id
func (i *instance) ID() uint32 {
return i.id
}
// Name returns name
func (i *instance) Name() string {
return i.name
}