-
Notifications
You must be signed in to change notification settings - Fork 56
/
serialization.go
444 lines (419 loc) · 13.2 KB
/
serialization.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package serialization
import (
"fmt"
"math/big"
"reflect"
"time"
"github.com/hazelcast/hazelcast-go-client/internal"
ihzerrors "github.com/hazelcast/hazelcast-go-client/internal/hzerrors"
"github.com/hazelcast/hazelcast-go-client/internal/proxy"
pubserialization "github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/types"
)
// Service serializes user objects to Data and back to Object.
// Data is the internal representation of binary Data in Hazelcast.
type Service struct {
SerializationConfig *pubserialization.Config
registry map[int32]pubserialization.Serializer
portableSerializer *PortableSerializer
identifiedSerializer *IdentifiedDataSerializableSerializer
customSerializers map[reflect.Type]pubserialization.Serializer
compactSerializer *CompactStreamSerializer
}
func NewService(config *pubserialization.Config, schemaCh chan SchemaMsg) (*Service, error) {
var err error
cs, err := NewCompactStreamSerializer(config.Compact, schemaCh)
if err != nil {
return nil, err
}
s := &Service{
SerializationConfig: config,
registry: make(map[int32]pubserialization.Serializer),
customSerializers: config.CustomSerializers(),
compactSerializer: cs,
}
s.portableSerializer, err = NewPortableSerializer(s, s.SerializationConfig.PortableFactories(), s.SerializationConfig.PortableVersion)
if err != nil {
return nil, err
}
s.registerClassDefinitions(s.portableSerializer, s.SerializationConfig.ClassDefinitions())
s.registerCustomSerializers(config.CustomSerializers())
s.registerGlobalSerializer(config.GlobalSerializer())
if err = s.registerIdentifiedFactories(); err != nil {
return nil, err
}
return s, nil
}
func (s *Service) SchemaService() *SchemaService {
return s.compactSerializer.ss
}
// SetSchemaService is used in tests
func (s *Service) SetSchemaService(ss *SchemaService) {
s.compactSerializer.ss = ss
}
// ToData serializes an object to a Data.
// It can safely be called with a Data. In that case, that instance is returned.
// If it is called with nil, nil is returned.
func (s *Service) ToData(object interface{}) (r Data, err error) {
defer func() {
if rec := recover(); rec != nil {
err = makeError(rec)
}
}()
if object == nil {
return nil, nil
}
if serData, ok := object.(Data); ok {
return serData, nil
}
// initial size is kept minimal (head_data_offset + long_size), since it'll grow on demand
dataOutput := NewPositionalObjectDataOutput(16, s, !s.SerializationConfig.LittleEndian)
serializer, err := s.FindSerializerFor(object)
if err != nil {
return Data{}, err
}
dataOutput.WriteInt32BigEndian(0) // partition
dataOutput.WriteInt32BigEndian(serializer.ID())
serializer.Write(dataOutput, object)
return dataOutput.buffer[:dataOutput.position], err
}
// ToObject deserializes the given Data to an object.
// nil is returned if called with nil.
func (s *Service) ToObject(data Data) (r interface{}, err error) {
defer func() {
if rec := recover(); rec != nil {
err = makeError(rec)
}
}()
var ok bool
if data == nil {
return nil, nil
}
typeID := data.Type()
serializer := s.lookupBuiltinDeserializer(typeID)
if serializer == nil {
serializer, ok = s.registry[typeID]
if !ok {
return nil, ihzerrors.NewSerializationError(fmt.Sprintf("there is no suitable de-serializer for type %d", typeID), nil)
}
}
dataInput := NewObjectDataInput(data, DataOffset, s, !s.SerializationConfig.LittleEndian)
return serializer.Read(dataInput), nil
}
func (s *Service) WriteObject(output pubserialization.DataOutput, object interface{}) {
serializer, err := s.FindSerializerFor(object)
if err != nil {
panic(fmt.Errorf("error finding serializer: %w", err))
}
output.WriteInt32(serializer.ID())
serializer.Write(output, object)
}
func (s *Service) ReadObject(input pubserialization.DataInput) interface{} {
typeID := input.ReadInt32()
if serializer := s.lookupBuiltinDeserializer(typeID); serializer != nil {
return serializer.Read(input)
}
if serializer := s.registry[typeID]; serializer != nil {
return serializer.Read(input)
}
panic(fmt.Sprintf("unknown type ID: %d", typeID))
}
func (s *Service) FindSerializerFor(obj interface{}) (pubserialization.Serializer, error) {
if serializer := s.LookUpDefaultSerializer(obj); serializer != (pubserialization.Serializer)(nil) {
return serializer, nil
}
if serializer := s.lookUpCustomSerializer(obj); serializer != (pubserialization.Serializer)(nil) {
return serializer, nil
}
if serializer := s.lookUpGlobalSerializer(); serializer != (pubserialization.Serializer)(nil) {
return serializer, nil
}
// keeping the error in the result for future behavior change
return gobSerializer, nil
}
func (s *Service) LookUpDefaultSerializer(obj interface{}) pubserialization.Serializer {
serializer := s.lookupBuiltinSerializer(obj)
if serializer != (pubserialization.Serializer)(nil) {
return serializer
}
if s.compactSerializer.IsRegisteredAsCompact(reflect.TypeOf(obj)) {
return s.compactSerializer
}
if _, ok := obj.(pubserialization.IdentifiedDataSerializable); ok {
return s.identifiedSerializer
}
if _, ok := obj.(pubserialization.Portable); ok {
return s.portableSerializer
}
return nil
}
func (s *Service) lookupBuiltinDeserializer(typeID int32) pubserialization.Serializer {
switch typeID {
case TypeNil:
return nilSerializer
case TypeCompact:
return s.compactSerializer
case TypePortable:
return s.portableSerializer
case TypeDataSerializable:
return s.identifiedSerializer
case TypeBool:
return boolSerializer
case TypeString:
return stringSerializer
case TypeByte:
return uint8Serializer
case TypeUInt16:
return uint16Serializer
case TypeInt16:
return int16Serializer
case TypeInt32:
return int32Serializer
case TypeInt64:
return int64Serializer
case TypeFloat32:
return float32Serializer
case TypeFloat64:
return float64Serializer
case TypeBoolArray:
return boolArraySerializer
case TypeStringArray:
return stringArraySerializer
case TypeByteArray:
return uint8ArraySerializer
case TypeUInt16Array:
return uint16ArraySerializer
case TypeInt16Array:
return int16ArraySerializer
case TypeInt32Array:
return int32ArraySerializer
case TypeInt64Array:
return int64ArraySerializer
case TypeFloat32Array:
return float32ArraySerializer
case TypeFloat64Array:
return float64ArraySerializer
case TypeUUID:
return uuidSerializer
case TypeJavaDate:
return javaDateSerializer
case TypeJavaBigInteger:
return javaBigIntegerSerializer
case TypeJavaDecimal:
return javaDecimalSerializer
case TypeJSONSerialization:
return jsonSerializer
case TypeJavaArray:
return javaArraySerializer
case TypeJavaArrayList:
return javaArrayListSerializer
case TypeJavaLinkedList:
return javaLinkedListSerializer
case TypeJavaLocalDate:
return javaLocalDateSerializer
case TypeJavaLocalTime:
return javaLocalTimeSerializer
case TypeJavaLocalDateTime:
return javaLocalDateTimeSerializer
case TypeJavaOffsetDateTime:
return javaOffsetDateTimeSerializer
case TypeJavaClass:
return javaClassSerializer
case TypeGobSerialization:
return gobSerializer
}
return nil
}
func (s *Service) registerCustomSerializers(customSerializers map[reflect.Type]pubserialization.Serializer) {
for _, customSerializer := range customSerializers {
if err := s.registerSerializer(customSerializer); err != nil {
panic(err)
}
}
}
func (s *Service) registerSerializer(serializer pubserialization.Serializer) error {
if s.registry[serializer.ID()] != nil {
return ihzerrors.NewSerializationError("this serializer is already in the registry", nil)
}
s.registry[serializer.ID()] = serializer
return nil
}
func (s *Service) registerClassDefinitions(serializer *PortableSerializer, classDefs []*pubserialization.ClassDefinition) {
for _, cd := range classDefs {
if err := serializer.portableContext.RegisterClassDefinition(cd); err != nil {
panic(err)
}
}
}
func (s *Service) registerGlobalSerializer(globalSerializer pubserialization.Serializer) {
if globalSerializer != nil {
if err := s.registerSerializer(globalSerializer); err != nil {
panic(err)
}
}
}
func (s *Service) lookUpCustomSerializer(obj interface{}) pubserialization.Serializer {
for key, val := range s.customSerializers {
if key.Kind() == reflect.Interface {
if reflect.TypeOf(obj).Implements(key) {
return val
}
} else {
if reflect.TypeOf(obj) == key {
return val
}
}
}
return nil
}
func (s *Service) lookUpGlobalSerializer() pubserialization.Serializer {
return s.SerializationConfig.GlobalSerializer()
}
func (s *Service) registerIdentifiedFactories() error {
fs := map[int32]pubserialization.IdentifiedDataSerializableFactory{
internal.AggregateFactoryID: &proxy.AggregateFactory{},
}
for _, f := range s.SerializationConfig.IdentifiedDataSerializableFactories() {
fid := f.FactoryID()
if _, ok := fs[fid]; ok {
return ihzerrors.NewSerializationError("this serializer is already in the registry", nil)
}
fs[fid] = f
}
s.identifiedSerializer = NewIdentifiedDataSerializableSerializer(fs)
if err := s.registerSerializer(s.identifiedSerializer); err != nil {
return err
}
return nil
}
func (s *Service) lookupBuiltinSerializer(obj interface{}) pubserialization.Serializer {
switch obj.(type) {
case nil:
return nilSerializer
case bool:
return boolSerializer
case string:
return stringSerializer
case uint8:
return uint8Serializer
case uint16:
return uint16Serializer
case int:
return intSerializer
case int8:
return int8Serializer
case int16:
return int16Serializer
case int32:
return int32Serializer
case int64:
return int64Serializer
case float32:
return float32Serializer
case float64:
return float64Serializer
case []bool:
return boolArraySerializer
case []string:
return stringArraySerializer
case []uint8:
return uint8ArraySerializer
case []uint16:
return uint16ArraySerializer
case []int:
return int64ArraySerializer
case []int16:
return int16ArraySerializer
case []int32:
return int32ArraySerializer
case []int64:
return int64ArraySerializer
case []float32:
return float32ArraySerializer
case []float64:
return float64ArraySerializer
case []interface{}:
return javaArrayListSerializer
case types.UUID:
return uuidSerializer
case types.LocalDate:
return javaLocalDateSerializer
case types.LocalTime:
return javaLocalTimeSerializer
case types.LocalDateTime:
return javaLocalDateTimeSerializer
case types.OffsetDateTime:
return javaOffsetDateTimeSerializer
case time.Time:
return javaDateSerializer
case *big.Int:
return javaBigIntegerSerializer
case types.Decimal:
return javaDecimalSerializer
case pubserialization.JSON:
return jsonSerializer
}
return nil
}
func makeError(rec interface{}) error {
switch v := rec.(type) {
case error:
return v
case string:
return ihzerrors.NewSerializationError(v, nil)
default:
return fmt.Errorf("%v", rec)
}
}
// make sure to update checkNoDefaultSerializer in serialization/compact_config.go
// if you update the list of default serializers.
var nilSerializer = &NilSerializer{}
var boolSerializer = &BoolSerializer{}
var stringSerializer = &StringSerializer{}
var uint8Serializer = &ByteSerializer{}
var int8Serializer = &Int8Serializer{}
var uint16Serializer = &UInt16Serializer{}
var intSerializer = &IntSerializer{}
var int16Serializer = &Int16Serializer{}
var int32Serializer = &Int32Serializer{}
var int64Serializer = &Int64Serializer{}
var float32Serializer = &Float32Serializer{}
var float64Serializer = &Float64Serializer{}
var boolArraySerializer = &BoolArraySerializer{}
var stringArraySerializer = &StringArraySerializer{}
var uint8ArraySerializer = &ByteArraySerializer{}
var uint16ArraySerializer = &UInt16ArraySerializer{}
var int16ArraySerializer = &Int16ArraySerializer{}
var int32ArraySerializer = &Int32ArraySerializer{}
var int64ArraySerializer = &Int64ArraySerializer{}
var float32ArraySerializer = &Float32ArraySerializer{}
var float64ArraySerializer = &Float64ArraySerializer{}
var uuidSerializer = &UUIDSerializer{}
var jsonSerializer = &JSONValueSerializer{}
var javaDateSerializer = &JavaDateSerializer{}
var javaBigIntegerSerializer = &JavaBigIntegerSerializer{}
var javaDecimalSerializer = &JavaDecimalSerializer{}
var javaClassSerializer = &JavaClassSerializer{}
var javaArraySerializer = &JavaArraySerializer{}
var javaArrayListSerializer = &JavaArrayListSerializer{}
var javaLinkedListSerializer = &JavaLinkedListSerializer{}
var javaLocalDateSerializer = &JavaLocalDateSerializer{}
var javaLocalTimeSerializer = &JavaLocalTimeSerializer{}
var javaLocalDateTimeSerializer = &JavaLocalDateTimeSerializer{}
var javaOffsetDateTimeSerializer = &JavaOffsetDateTimeSerializer{}
var gobSerializer = &GobSerializer{}