/
rmqrpc.go
150 lines (127 loc) · 3.77 KB
/
rmqrpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
"context"
"sync"
"github.com/ahamtat/micropic/internal/adapters/broker"
"github.com/ahamtat/micropic/internal/domain/entities"
"github.com/pkg/errors"
"github.com/ahamtat/micropic/internal/domain/interfaces"
"github.com/ahamtat/micropic/internal/adapters/logger"
"github.com/streadway/amqp"
)
// RMQRPC holds objects for making Remote Procedure Calls via RabbitMQ broker
// See https://medium.com/@eugenfedchenko/rpc-over-rabbitmq-golang-ff3d2b312a69
// && https://www.rabbitmq.com/tutorials/tutorial-six-go.html
type RMQRPC struct {
ctx context.Context
cancel context.CancelFunc
in *broker.AmqpReader
out *broker.AmqpWriter
rpcMx sync.Mutex
rpcCalls rpcPendingCallMap
}
type rpcPendingCall struct {
done chan bool
data interfaces.Message
}
type rpcPendingCallMap map[string]*rpcPendingCall
// NewRPC constructor
func NewRPC(conn *amqp.Connection) *RMQRPC {
// Create cancel context
ctx, cancel := context.WithCancel(context.Background())
in := broker.NewAmqpReader(ctx, conn, broker.ResponseQueueName, broker.ResponseRoutingKey)
out := broker.NewAmqpWriter(conn, broker.RequestQueueName, broker.RequestRoutingKey)
return &RMQRPC{
ctx: ctx,
cancel: cancel,
in: in,
out: out,
rpcMx: sync.Mutex{},
rpcCalls: make(rpcPendingCallMap),
}
}
// Close reading and writing channels
func (rpc *RMQRPC) Close() {
rpc.Stop()
// Close pending calls to quit blocked goroutines
rpc.rpcMx.Lock()
for _, call := range rpc.rpcCalls {
close(call.done)
}
rpc.rpcMx.Unlock()
// Close i/o channels
if err := rpc.in.Close(); err != nil {
logger.Error("error closing RabbitMQ reader", "error", err)
}
if err := rpc.out.Close(); err != nil {
logger.Error("error closing RabbitMQ writer", "error", err)
}
}
// Start functions make separate goroutine for message receiving and processing
func (rpc *RMQRPC) Start() {
// Read and process messages from previewer
for {
select {
case <-rpc.ctx.Done():
return
default:
// Read input message
inputEnvelope, toBeClosed, err := rpc.in.ReadEnvelope()
if err != nil {
logger.Error("error reading channel", "error", err)
break
}
if toBeClosed {
// Reading channel possibly is to be closed
break
}
// Check for RMQRPC responses
if len(inputEnvelope.Metadata.CorrelationID) > 0 {
// Make pending call
rpc.rpcMx.Lock()
rpcCall, ok := rpc.rpcCalls[inputEnvelope.Metadata.CorrelationID]
rpc.rpcMx.Unlock()
if ok {
rpcCall.data = inputEnvelope.Message
rpcCall.done <- true
}
break
}
}
}
}
// Stop message processing and writing off status to database
func (rpc *RMQRPC) Stop() {
// Stop goroutines - fire context cancelling
rpc.cancel()
}
// SendRequest sends tasks for previewer via RabbitMQ broker and
// blocks execution until response or timeout
func (rpc *RMQRPC) SendRequest(ctx context.Context, request *entities.Request) (response *entities.Response, err error) {
// Create message envelope and write it to broker
correlationID := broker.CreateCorrelationID()
env := broker.CreateEnvelope(request, correlationID,
entities.MessageTypeToString(entities.ProxyingRequest))
err = rpc.out.WriteEnvelope(env)
if err != nil {
return nil, errors.Wrap(err, "error writing envelope to RabbitMQ")
}
// Create and keep pending object
rpcCall := &rpcPendingCall{done: make(chan bool)}
rpc.rpcMx.Lock()
rpc.rpcCalls[correlationID] = rpcCall
rpc.rpcMx.Unlock()
// Wait until response comes or timeout
select {
case <-rpcCall.done:
response, _ = rpcCall.data.(*entities.Response)
case <-ctx.Done():
err = errors.New("timeout elapsed on RMQRPC request sending")
}
// Release pending object
rpc.rpcMx.Lock()
delete(rpc.rpcCalls, correlationID)
rpc.rpcMx.Unlock()
// Return response to caller
return
}