-
Notifications
You must be signed in to change notification settings - Fork 0
/
supplier.go
151 lines (129 loc) · 3.61 KB
/
supplier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package supplier
import (
"context"
"encoding/json"
. "github.com/YRXING/data-primitive/pkg/constants"
"github.com/YRXING/data-primitive/pkg/util"
"github.com/YRXING/data-primitive/proto/agent"
"github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
)
// supplier is the supplier-side digital object. It carries its own identity,
// the functions that can be invoked remotely by name, and the tracing span
// and packet of the request most recently handed to Interact.
type supplier struct {
	// address is the endpoint handed to agent.RunServer in Run; name is the
	// label passed to the Products and Form constructors.
	address, name string
	// totalStock and totalFunds are not read or written anywhere in this file.
	totalStock, totalFunds int
	// funcs maps a function name to the callable that Interact dispatches to.
	funcs map[string]interface{}
	// parentSpan is the span taken from the context of the latest Interact call.
	parentSpan opentracing.Span
	// receivedPacket is the packet passed to the latest Interact call.
	receivedPacket *agent.Packet
}

// Compile-time assertion that *supplier implements DigitalObject.
var _ DigitalObject = (*supplier)(nil)
// NewSupplier returns a supplier named "supplierA" bound to 127.0.0.1:8080
// whose function table exposes GetProducts under the key "GetProducts".
func NewSupplier() *supplier {
	sup := new(supplier)
	sup.address = "127.0.0.1:8080"
	sup.name = "supplierA"

	// The table is filled after allocation because the method value must be
	// bound to this particular instance.
	sup.funcs = make(map[string]interface{}, 1)
	sup.funcs["GetProducts"] = sup.GetProducts

	return sup
}
// Run launches agent.RunServer in a new goroutine, passing SUPPLIER_SERVICE,
// the supplier's address and the supplier itself, and returns immediately.
// The returned error is always nil; nothing RunServer reports (if it reports
// anything) is observed here.
func (s *supplier) Run() error {
// Fire-and-forget: this goroutine is never waited on or stopped from here.
go agent.RunServer(SUPPLIER_SERVICE, s.address, s)
return nil
}
// interactError is a minimal string-backed error used by Interact for
// malformed requests and unusable invocation results.
type interactError string

// Error implements the error interface.
func (e interactError) Error() string { return string(e) }

// Interact handles one incoming packet.
//
// For PacketType_INVOKE it looks up the requested function in the supplier's
// function table via util.Call, JSON-encodes the first returned value and
// wraps it in a data packet built by util.GenerateDataPacket. An error is
// returned when the packet is nil, the invoke payload is missing, the call
// fails, the call yields no value, or the value cannot be marshalled.
//
// PacketType_TRANSPORT, PacketType_DEPLOY and any other type are not handled:
// the method returns (nil, nil) for them.
func (s *supplier) Interact(ctx context.Context, p *agent.Packet) (*agent.Packet, error) {
	if p == nil {
		err := interactError("supplier: received nil packet")
		log.Println(err)
		return nil, err
	}

	// Remember the server-side span and the packet for later use by the
	// invoked function.
	// NOTE(review): these are written on the shared receiver for every
	// request; if Interact can run concurrently this is unsynchronised
	// shared state — confirm against how agent.RunServer dispatches calls.
	s.parentSpan = opentracing.SpanFromContext(ctx)
	s.receivedPacket = p

	switch p.Type {
	case agent.PacketType_INVOKE:
		invoke := p.GetInvoke()
		if invoke == nil {
			// The type says INVOKE but no invoke payload is attached.
			err := interactError("supplier: invoke packet carries no invoke payload")
			log.Println(err)
			return nil, err
		}

		res, err := util.Call(s.GetFuncs(), invoke.FuncName, invoke.Args)
		if err != nil {
			log.Println(err)
			return nil, err
		}
		if len(res) == 0 {
			// Indexing the first result below would panic otherwise.
			err := interactError("supplier: invoked function returned no value")
			log.Println(err)
			return nil, err
		}

		// Only the first returned value is sent back to the caller.
		payload, err := json.Marshal(res[0].Interface())
		if err != nil {
			log.Println(err)
			return nil, err
		}
		return util.GenerateDataPacket(s.GetAddress(), payload), nil

	case agent.PacketType_TRANSPORT:
		// Not handled; falls out to the (nil, nil) return below.
	case agent.PacketType_DEPLOY:
		// Not handled; falls out to the (nil, nil) return below.
	}
	return nil, nil
}
// GetProducts decodes a JSON-encoded Order from bytes and produces the
// matching Products result.
//
//   - Undecodable input yields ErrorProducts(s.name, "wrong data format").
//   - NORMAL orders succeed immediately.
//   - ACCOUNT_RECEIVABLE orders first request a loan from the bank (see
//     produceWithLoan) and succeed only when a non-zero capital is granted.
//   - FINACING_WAREHOUSE and ADVANCE orders are not implemented and yield a
//     nil *Products.
//   - Any other order type yields an "unknown order type!" error result.
//
// The work is traced in a "GetProducts" span that follows from the span
// stored by Interact, when there is one.
func (s *supplier) GetProducts(bytes []byte) *Products {
	var o Order
	if err := json.Unmarshal(bytes, &o); err != nil {
		return ErrorProducts(s.name, "wrong data format")
	}
	log.Info("get an order ", o, " start producing....")

	// opentracing.SpanFromContext returns nil when the context has no span,
	// and GetProducts may run before any Interact call; only reference the
	// parent when it exists instead of dereferencing a nil span.
	var opts []opentracing.StartSpanOption
	if s.parentSpan != nil {
		opts = append(opts, opentracing.FollowsFrom(s.parentSpan.Context()))
	}
	span := opentracing.StartSpan("GetProducts", opts...)
	defer span.Finish()

	switch o.OrderType {
	case NORMAL:
		log.Info("products ready,start transportation...")
		return SuccessProducts(s.name)
	case ACCOUNT_RECEIVABLE:
		return s.produceWithLoan(span, o)
	case FINACING_WAREHOUSE, ADVANCE:
		// No handling exists for these order types; callers receive nil.
		return nil
	default:
		return ErrorProducts("unknown", "unknown order type!")
	}
}

// produceWithLoan asks the bank at 127.0.0.1:8082 for an accounts-receivable
// loan on behalf of order o and returns a success result only when the bank
// answers with a non-zero capital. The RPC is issued under span. Transport
// errors, a response without a transport payload, an undecodable response and
// a zero capital are all reported as "Insufficient funds!".
func (s *supplier) produceWithLoan(span opentracing.Span, o Order) *Products {
	log.Info("insufficient funds,looking for a bank to make a loan...")
	log.Info("finding bank...")
	conn := util.NewConn(opentracing.GlobalTracer(),
		"127.0.0.1:8082",
		context.Background())
	defer conn.Close()
	c := agent.NewAgentClient(conn)
	log.Infof("bank find: bankA, establish connection successfully")

	// Build the loan application form.
	f := &Form{
		Type:            ACCOUNT_RECEIVABLE,
		SupplierName:    s.name,
		DistributorName: o.DistributorName,
		Num:             10000,
	}
	log.Infof("generate form: %+v", f)
	payload, err := json.Marshal(f)
	if err != nil {
		log.Errorf("serialization failed, %v", err)
		return ErrorProducts(s.name, err.Error())
	}
	log.Printf("start sending data: %s", payload)

	req := util.GenerateInvokePacket(s.address, "GetLoan", payload)
	resP, err := c.Interact(opentracing.ContextWithSpan(context.Background(), span), req)

	// Check the RPC error and the payload before touching the response: on
	// failure resP is unusable and its transport part may be nil.
	granted := false
	if err != nil {
		log.Errorf("loan request failed: %v", err)
	} else if t := resP.GetTransport(); t == nil {
		log.Errorf("loan response carries no transport payload")
	} else {
		var capital Capital
		if err := json.Unmarshal(t.Data, &capital); err != nil {
			log.Errorf("decoding loan response failed: %v", err)
		} else {
			granted = capital.Num != 0
		}
	}
	if !granted {
		log.Errorf("get loan failed!")
		return ErrorProducts(s.name, "Insufficient funds!")
	}

	log.Info("products ready,start transportation...")
	return SuccessProducts(s.name)
}
// GetAddress returns the supplier's address field, the same value Run hands
// to agent.RunServer and Interact stamps on outgoing data packets.
func (s *supplier) GetAddress() string {
return s.address
}
// GetFuncs returns the supplier's function table, keyed by function name.
// The map itself is returned, not a copy, so changes made by the caller are
// visible to the supplier.
func (s *supplier) GetFuncs() map[string]interface{} {
return s.funcs
}