/
multiplexed_protocol.go
160 lines (131 loc) · 4.88 KB
/
multiplexed_protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package thrift
import (
"fmt"
"strings"
)
/*
MultiplexedProtocol is a protocol-independent concrete decorator
that allows a Thrift client to communicate with a multiplexing Thrift server,
by prepending the service name to the function name during function calls.

NOTE: THIS IS NOT USED BY SERVERS. On the server, use MultiplexedProcessor to
handle requests from a multiplexing client.

This example uses a single socket transport to invoke two services:

	socket := thrift.NewSocket(thrift.SocketAddr(addr), thrift.SocketTimeout(TIMEOUT))
	transport := thrift.NewFramedTransport(socket)
	protocol := thrift.NewBinaryProtocolTransport(transport)

	mp := thrift.NewMultiplexedProtocol(protocol, "Calculator")
	service := Calculator.NewCalculatorClient(mp)

	mp2 := thrift.NewMultiplexedProtocol(protocol, "WeatherReport")
	service2 := WeatherReport.NewWeatherReportClient(mp2)

	err := transport.Open()
	if err != nil {
		t.Fatal("Unable to open client socket", err)
	}

	fmt.Println(service.Add(2,2))
	fmt.Println(service2.GetTemperature())
*/
type MultiplexedProtocol struct {
// Protocol is the wrapped protocol. It is embedded, so every method other
// than WriteMessageBegin is served by it directly.
Protocol
// serviceName is joined with MULTIPLEXED_SEPARATOR in front of the message
// name when a CALL or ONEWAY message is written.
serviceName string
}
// MULTIPLEXED_SEPARATOR is the delimiter placed between the service name and
// the function name in a multiplexed message name. MultiplexedProtocol writes
// it and MultiplexedProcessor splits on its first occurrence.
const MULTIPLEXED_SEPARATOR = ":"
// NewMultiplexedProtocol returns a MultiplexedProtocol that wraps protocol and
// remembers serviceName for use when writing message headers.
func NewMultiplexedProtocol(protocol Protocol, serviceName string) *MultiplexedProtocol {
	mux := new(MultiplexedProtocol)
	mux.Protocol = protocol
	mux.serviceName = serviceName
	return mux
}
// WriteMessageBegin forwards the message header to the wrapped protocol.
// For CALL and ONEWAY messages the name is rewritten to
// serviceName + MULTIPLEXED_SEPARATOR + name; every other message type is
// passed through with its name unchanged.
func (t *MultiplexedProtocol) WriteMessageBegin(name string, typeID MessageType, seqid int32) error {
	wireName := name
	if typeID == CALL || typeID == ONEWAY {
		// Only outgoing requests carry the service prefix.
		wireName = t.serviceName + MULTIPLEXED_SEPARATOR + name
	}
	return t.Protocol.WriteMessageBegin(wireName, typeID, seqid)
}
/*
MultiplexedProcessor is a Processor allowing
a single Server to provide multiple services.

To do so, you instantiate the processor and then register additional
processors with it, as shown in the following example:

	var processor = thrift.NewMultiplexedProcessor()

	firstProcessor := ... // any Processor implementation
	processor.RegisterProcessor("FirstService", firstProcessor)

	processor.RegisterProcessor(
		"Calculator",
		Calculator.NewCalculatorProcessor(&CalculatorHandler{}),
	)

	processor.RegisterProcessor(
		"WeatherReport",
		WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}),
	)

	serverTransport, err := thrift.NewServerSocketTimeout(addr, TIMEOUT)
	if err != nil {
		t.Fatal("Unable to create server socket", err)
	}
	server := thrift.NewSimpleServer(processor, serverTransport)
	server.Serve()
*/
type MultiplexedProcessor struct {
// serviceProcessorMap maps a registered service name to the Processor that
// handles its functions.
serviceProcessorMap map[string]Processor
// Defaulprocessor, when non-nil, handles message names that contain no
// service-name separator.
// NOTE(review): the name looks like a misspelling of "DefaultProcessor"; it
// is exported, so confirm there are no external users before renaming.
Defaulprocessor Processor
}
// NewMultiplexedProcessor returns a MultiplexedProcessor with an empty service
// registry and no default processor.
func NewMultiplexedProcessor() *MultiplexedProcessor {
	mux := new(MultiplexedProcessor)
	mux.serviceProcessorMap = map[string]Processor{}
	return mux
}
// RegisterDefault sets the processor that GetProcessorFunction falls back to
// when a message name carries no service-name prefix. Any previously set
// default is replaced.
func (t *MultiplexedProcessor) RegisterDefault(processor Processor) {
t.Defaulprocessor = processor
}
// RegisterProcessor associates processor with the service called name,
// replacing any processor previously registered under the same name. The
// registry is created on demand, so a zero-value MultiplexedProcessor works.
func (t *MultiplexedProcessor) RegisterProcessor(name string, processor Processor) {
	registry := t.serviceProcessorMap
	if registry == nil {
		// Writing to a nil map panics; allocate it and store it back first.
		registry = make(map[string]Processor)
		t.serviceProcessorMap = registry
	}
	registry[name] = processor
}
// GetProcessorFunction resolves a multiplexed message name of the form
// "<service><MULTIPLEXED_SEPARATOR><function>" to the matching function of the
// processor registered for <service>.
//
// If name contains no separator, the lookup is delegated to the default
// processor when one is set; otherwise an error is returned. An error is also
// returned when no processor is registered under the extracted service name.
func (t *MultiplexedProcessor) GetProcessorFunction(name string) (ProcessorFunction, error) {
	// Split on the first separator only: everything after it is the function name.
	sepAt := strings.Index(name, MULTIPLEXED_SEPARATOR)
	if sepAt < 0 {
		if fallback := t.Defaulprocessor; fallback != nil {
			return fallback.GetProcessorFunction(name)
		}
		return nil, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a MultiplexProtocol in your client?", name)
	}

	serviceName := name[:sepAt]
	functionName := name[sepAt+len(MULTIPLEXED_SEPARATOR):]

	if target, found := t.serviceProcessorMap[serviceName]; found {
		return target.GetProcessorFunction(functionName)
	}
	return nil, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", serviceName)
}
// storedMessageProtocol wraps a Protocol and answers ReadMessageBegin from a
// stored message header instead of reading one from the wrapped protocol.
// All other methods come from the embedded Protocol.
type storedMessageProtocol struct {
// Protocol is the wrapped protocol that serves every other method.
Protocol
// name, typeID and seqid are the header values returned by ReadMessageBegin.
name string
typeID MessageType
seqid int32
}
// NewStoredMessageProtocol wraps protocol so that ReadMessageBegin reports the
// supplied name, typeID and seqid.
func NewStoredMessageProtocol(protocol Protocol, name string, typeID MessageType, seqid int32) *storedMessageProtocol {
	return &storedMessageProtocol{
		Protocol: protocol,
		name:     name,
		typeID:   typeID,
		seqid:    seqid,
	}
}
// ReadMessageBegin returns the stored message header. It does not read from
// the wrapped protocol and always reports a nil error.
func (s *storedMessageProtocol) ReadMessageBegin() (name string, typeID MessageType, seqid int32, err error) {
	name, typeID, seqid = s.name, s.typeID, s.seqid
	return name, typeID, seqid, nil
}