-
Notifications
You must be signed in to change notification settings - Fork 327
/
error.go
255 lines (243 loc) · 8.18 KB
/
error.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"fmt"
proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/hashicorp/go-multierror"
)
// Result is the code used to represent the outcome of pulsar processing.
// It is a distinct named type whose underlying type is int.
type Result int
// Result codes. The values are assigned by iota in declaration order,
// starting with Ok = 0, so inserting or reordering entries changes the
// numeric value of every code declared after that point.
const (
// Ok means no error occurred.
Ok Result = iota
// UnknownError means an unknown error happened on the broker.
UnknownError
// InvalidConfiguration means the configuration is invalid.
InvalidConfiguration
// TimeoutError means the operation timed out.
TimeoutError
// LookupError means the broker lookup failed.
LookupError
// ConnectError means connecting to the broker failed.
ConnectError
// ReadError means reading from the socket failed.
ReadError
// AuthenticationError means authentication failed on the broker.
AuthenticationError
// AuthorizationError means the client is not authorized to create a producer/consumer.
AuthorizationError
// ErrorGettingAuthenticationData means the client cannot find authorization data.
ErrorGettingAuthenticationData
// BrokerMetadataError means the broker failed in updating metadata.
BrokerMetadataError
// BrokerPersistenceError means the broker failed to persist an entry.
BrokerPersistenceError
// ChecksumError means a corrupt message checksum failure.
ChecksumError
// ConsumerBusy means an Exclusive consumer is already connected.
ConsumerBusy
// NotConnectedError means the producer/consumer is not currently connected to the broker.
NotConnectedError
// AlreadyClosedError means the producer/consumer is already closed and not accepting any operation.
AlreadyClosedError
// InvalidMessage means an error in publishing an already used message.
InvalidMessage
// ConsumerNotInitialized means the consumer is not initialized.
ConsumerNotInitialized
// ProducerNotInitialized means the producer is not initialized.
ProducerNotInitialized
// TooManyLookupRequestException means there are too many concurrent LookupRequests.
TooManyLookupRequestException
// InvalidTopicName means the topic name is invalid.
InvalidTopicName
// InvalidURL means the client was initialized with an invalid broker URL
// (VIP URL passed to the client constructor).
InvalidURL
// ServiceUnitNotReady means the service unit was unloaded between the
// client's lookup and the creation of the producer/consumer.
ServiceUnitNotReady
// OperationNotSupported means the operation is not supported.
OperationNotSupported
// ProducerBlockedQuotaExceededError means the producer is blocked.
ProducerBlockedQuotaExceededError
// ProducerBlockedQuotaExceededException means the producer is getting an exception.
ProducerBlockedQuotaExceededException
// ProducerQueueIsFull means the producer queue is full.
ProducerQueueIsFull
// MessageTooBig means an attempt to send a message exceeding the max size.
MessageTooBig
// TopicNotFound means the topic was not found.
TopicNotFound
// SubscriptionNotFound means the subscription was not found.
SubscriptionNotFound
// ConsumerNotFound means the consumer was not found.
ConsumerNotFound
// UnsupportedVersionError means an older client/version doesn't support a required feature.
UnsupportedVersionError
// TopicTerminated means the topic was already terminated.
TopicTerminated
// CryptoError means a crypto operation failed.
CryptoError
// ConsumerClosed means the consumer has already been closed.
ConsumerClosed
// InvalidBatchBuilderType means the batch builder type is invalid.
InvalidBatchBuilderType
// AddToBatchFailed means adding a sendRequest to the batchBuilder failed.
AddToBatchFailed
// SeekFailed means the seek failed.
SeekFailed
// ProducerClosed means the producer has already been closed.
ProducerClosed
// SchemaFailure means the payload could not be encoded using the Schema.
SchemaFailure
// InvalidStatus means the component status is not as expected.
InvalidStatus
// TransactionNoFoundError means the transaction does not exist in the
// transaction coordinator; it may be an erroneous transaction or one that
// has already ended.
TransactionNoFoundError
// ClientMemoryBufferIsFull means the client's limited buffer is full.
ClientMemoryBufferIsFull
// ProducerFenced is used by the broker when a producer asks for and fails
// to get exclusive producer access, or loses the exclusive status after a
// reconnection, to indicate that this producer is now permanently fenced.
// Applications are then supposed to close it and create a new producer.
ProducerFenced
)
// Error implements the error interface and is composed of two parts:
// a message and the Result code associated with it.
type Error struct {
// msg is the text returned by the Error method.
msg string
// result is the code returned by the Result method.
result Result
}
// Result returns the Result code stored in the error.
func (e *Error) Result() Result {
return e.result
}
// Error returns the stored message, which makes *Error satisfy the built-in
// error interface.
func (e *Error) Error() string {
return e.msg
}
// newError builds an *Error carrying the given result code. The stored
// message is msg, followed by ": " and the textual form of result as produced
// by getResultStr.
func newError(result Result, msg string) error {
	text := fmt.Sprintf("%s: %s", msg, getResultStr(result))

	wrapped := new(Error)
	wrapped.result = result
	wrapped.msg = text
	return wrapped
}
// getResultStr returns the textual name of r, which newError appends to the
// error message. Every declared Result constant maps to its identifier name
// (Ok maps to "OK"); the cases are listed in declaration order so a missing
// entry is easy to spot. A value that matches no declared constant is
// rendered as "Result(<n>)".
func getResultStr(r Result) string {
	switch r {
	case Ok:
		return "OK"
	case UnknownError:
		return "UnknownError"
	case InvalidConfiguration:
		return "InvalidConfiguration"
	case TimeoutError:
		return "TimeoutError"
	case LookupError:
		return "LookupError"
	case ConnectError:
		return "ConnectError"
	case ReadError:
		return "ReadError"
	case AuthenticationError:
		return "AuthenticationError"
	case AuthorizationError:
		return "AuthorizationError"
	case ErrorGettingAuthenticationData:
		return "ErrorGettingAuthenticationData"
	case BrokerMetadataError:
		return "BrokerMetadataError"
	case BrokerPersistenceError:
		return "BrokerPersistenceError"
	case ChecksumError:
		return "ChecksumError"
	case ConsumerBusy:
		return "ConsumerBusy"
	case NotConnectedError:
		return "NotConnectedError"
	case AlreadyClosedError:
		return "AlreadyClosedError"
	case InvalidMessage:
		return "InvalidMessage"
	case ConsumerNotInitialized:
		return "ConsumerNotInitialized"
	case ProducerNotInitialized:
		return "ProducerNotInitialized"
	case TooManyLookupRequestException:
		return "TooManyLookupRequestException"
	case InvalidTopicName:
		return "InvalidTopicName"
	case InvalidURL:
		return "InvalidURL"
	case ServiceUnitNotReady:
		return "ServiceUnitNotReady"
	case OperationNotSupported:
		return "OperationNotSupported"
	case ProducerBlockedQuotaExceededError:
		return "ProducerBlockedQuotaExceededError"
	case ProducerBlockedQuotaExceededException:
		return "ProducerBlockedQuotaExceededException"
	case ProducerQueueIsFull:
		return "ProducerQueueIsFull"
	case MessageTooBig:
		return "MessageTooBig"
	case TopicNotFound:
		return "TopicNotFound"
	case SubscriptionNotFound:
		return "SubscriptionNotFound"
	case ConsumerNotFound:
		return "ConsumerNotFound"
	case UnsupportedVersionError:
		return "UnsupportedVersionError"
	case TopicTerminated:
		return "TopicTerminated"
	case CryptoError:
		return "CryptoError"
	case ConsumerClosed:
		return "ConsumerClosed"
	case InvalidBatchBuilderType:
		return "InvalidBatchBuilderType"
	case AddToBatchFailed:
		return "AddToBatchFailed"
	case SeekFailed:
		return "SeekFailed"
	case ProducerClosed:
		return "ProducerClosed"
	case SchemaFailure:
		return "SchemaFailure"
	case InvalidStatus:
		// Previously missing: fell through to the numeric fallback.
		return "InvalidStatus"
	case TransactionNoFoundError:
		return "TransactionNoFoundError"
	case ClientMemoryBufferIsFull:
		return "ClientMemoryBufferIsFull"
	case ProducerFenced:
		// Previously missing: fell through to the numeric fallback.
		return "ProducerFenced"
	default:
		return fmt.Sprintf("Result(%d)", r)
	}
}
// getErrorFromServerError converts a broker-reported ServerError into an
// error created by newError. TransactionNotFound maps to
// TransactionNoFoundError and InvalidTxnStatus maps to InvalidStatus; every
// other value maps to UnknownError. The message passed to newError is always
// serverError.String().
//
// serverError is dereferenced unconditionally, so it must not be nil.
func getErrorFromServerError(serverError *proto.ServerError) error {
	code := UnknownError
	switch *serverError {
	case proto.ServerError_TransactionNotFound:
		code = TransactionNoFoundError
	case proto.ServerError_InvalidTxnStatus:
		code = InvalidStatus
	}
	return newError(code, serverError.String())
}
// joinErrors joins multiple errors into one error; the returned error can be
// tested with errors.Is(). It returns nil when no error was collected, instead
// of a non-nil but empty *multierror.Error, so that callers checking
// `err != nil` are not misled.
//
// github.com/hashicorp/go-multierror is used instead of errors.Join() from
// Go 1.20 so that the pulsar go client still compiles with Go versions older
// than 1.20 (anything newer than Go 1.13).
func joinErrors(errs ...error) error {
	// Append always returns a non-nil *multierror.Error; ErrorOrNil converts
	// the empty case to a nil error.
	return multierror.Append(nil, errs...).ErrorOrNil()
}