/
app.go
160 lines (129 loc) · 4.81 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package main
import (
"encoding/json"
"fmt"
"log"
"context"
"net"
"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/empty"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// server is our user app
type server struct {
}
type appResponse struct {
Message string `json:"message,omitempty"`
}
func main() {
log.Printf("Initializing grpc")
/* #nosec */
lis, err := net.Listen("tcp", ":3000")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
/* #nosec */
s := grpc.NewServer()
pb.RegisterAppCallbackServer(s, &server{})
fmt.Println("Client starting...")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// This is the server side in a grpc -> grpc test. It responds with the same string it was sent.
func (s *server) grpcTestHandler(data []byte) ([]byte, error) {
var t string
err := json.Unmarshal(data, &t)
if err != nil {
return nil, err
}
fmt.Printf("received %s\n", t)
resp, err := json.Marshal(appResponse{Message: t})
if err != nil {
fmt.Println("not marshal")
}
return resp, err
}
func (s *server) retrieveRequestObject(ctx context.Context) ([]byte, error) {
md, _ := metadata.FromIncomingContext(ctx)
var requestMD = map[string][]string{}
for k, vals := range md {
requestMD[k] = vals
fmt.Printf("incoming md: %s %q", k, vals)
}
header := metadata.Pairs(
"DaprTest-Response-1", "DaprTest-Response-Value-1",
"DaprTest-Response-2", "DaprTest-Response-Value-2")
// following traceid byte is of expectedTraceID "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
sc := trace.SpanContext{
TraceID: trace.TraceID{75, 249, 47, 53, 119, 179, 77, 166, 163, 206, 146, 157, 14, 14, 71, 54},
SpanID: trace.SpanID{0, 240, 103, 170, 11, 169, 2, 183},
TraceOptions: trace.TraceOptions(1),
}
header.Set("grpc-trace-bin", string(propagation.Binary(sc)))
grpc.SendHeader(ctx, header)
trailer := metadata.Pairs(
"DaprTest-Trailer-1", "DaprTest-Trailer-Value-1",
"DaprTest-Trailer-2", "DaprTest-Trailer-Value-2")
grpc.SetTrailer(ctx, trailer)
return json.Marshal(requestMD)
}
// This method gets invoked when a remote service has called the app through Dapr
// The payload carries a Method to identify the method, a set of metadata properties and an optional payload
func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
fmt.Printf("Got invoked method %s and data: %s\n", in.Method, string(in.GetData().Value))
var err error
var response []byte
switch in.Method {
case "httpToGrpcTest":
// not a typo, the handling is the same as the case below
fallthrough
case "grpcToGrpcTest":
response, err = s.grpcTestHandler(in.GetData().Value)
case "retrieve_request_object":
response, err = s.retrieveRequestObject(ctx)
}
if err != nil {
msg := "Error: " + err.Error()
response, _ = json.Marshal(msg)
}
respBody := &any.Any{Value: response}
return &commonv1pb.InvokeResponse{Data: respBody, ContentType: "application/json"}, nil
}
// Dapr will call this method to get the list of topics the app wants to subscribe to. In this example, we are telling Dapr
// To subscribe to a topic named TopicA
func (s *server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
return &pb.ListTopicSubscriptionsResponse{
Subscriptions: []*pb.TopicSubscription{
{
Topic: "TopicA",
},
},
}, nil
}
// Dapr will call this method to get the list of bindings the app will get invoked by. In this example, we are telling Dapr
// To invoke our app with a binding named storage
func (s *server) ListInputBindings(ctx context.Context, in *empty.Empty) (*pb.ListInputBindingsResponse, error) {
return &pb.ListInputBindingsResponse{
Bindings: []string{"storage"},
}, nil
}
// This method gets invoked every time a new event is fired from a registered binding. The message carries the binding name, a payload and optional metadata
func (s *server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) (*pb.BindingEventResponse, error) {
fmt.Println("Invoked from binding")
return &pb.BindingEventResponse{}, nil
}
// This method is fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 0.3 envelope.
func (s *server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*empty.Empty, error) {
fmt.Println("Topic message arrived")
return &empty.Empty{}, nil
}