forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
protobuf.go
446 lines (387 loc) · 15.1 KB
/
protobuf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protobuf
import (
"bytes"
"fmt"
"io"
"reflect"
"github.com/gogo/protobuf/proto"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/framer"
)
var (
// protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All
// proto messages serialized by this schema will be preceded by the bytes 0x6b 0x38 0x73 (ASCII "k8s"),
// with the fourth byte being reserved for the encoding style. The only encoding style defined is 0x00,
// which means that the rest of the byte stream is a message of type
// k8s.io.kubernetes.pkg.runtime.Unknown (proto2).
//
// A Serializer built by NewSerializer writes this prefix in Encode and requires it in Decode.
//
// See k8s.io/kubernetes/pkg/runtime/generated.proto for details of the runtime.Unknown message.
//
// This encoding scheme is experimental, and is subject to change at any time.
protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
)
// errNotMarshalable is the error returned when an object cannot be encoded to or decoded from
// protobuf because it does not implement the required protobuf interfaces.
type errNotMarshalable struct {
	// t is the dynamic type of the offending object; it is only used to build the error message.
	t reflect.Type
}

// Error implements the error interface and names the type that could not be handled.
func (e errNotMarshalable) Error() string {
	const format = "object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message"
	return fmt.Sprintf(format, e.t)
}

// IsNotMarshalable reports whether err is the error this package returns for objects that do not
// implement the protobuf marshalling interface. A nil err yields false.
func IsNotMarshalable(err error) bool {
	// A type assertion on a nil interface fails, so no separate nil check is needed.
	_, matches := err.(errNotMarshalable)
	return matches
}
// NewSerializer returns a Protobuf Serializer whose wire format is protoEncodingPrefix followed by a
// runtime.Unknown message wrapping the encoded object. The creater and typer are used by Decode to
// resolve the kind of the target and to instantiate it; defaultContentType is the content type Decode
// records on a *runtime.Unknown target whose raw payload is itself recognized by this serializer.
//
// This encoding scheme is experimental, and is subject to change at any time.
func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *Serializer {
	s := &Serializer{prefix: protoEncodingPrefix}
	s.creater = creater
	s.typer = typer
	s.contentType = defaultContentType
	return s
}
// Serializer encodes and decodes objects as protobuf messages wrapped in a runtime.Unknown envelope
// and preceded by a fixed byte prefix.
type Serializer struct {
// prefix is written before every encoded message and is required at the start of decoded input.
prefix []byte
// creater and typer are handed to the decode path to instantiate objects and resolve their kinds.
creater runtime.ObjectCreater
typer runtime.ObjectTyper
// contentType is set on a decoded *runtime.Unknown when its raw payload starts with prefix.
contentType string
}
// Compile-time check that *Serializer satisfies runtime.Serializer.
var _ runtime.Serializer = &Serializer{}
// Decode parses originalData as a prefixed runtime.Unknown envelope and loads its payload into an object.
//
// The group/version/kind stored in the envelope is completed first from the supplied default gvk and
// then, when into is registered with the typer, from the first kind the typer reports for into. That
// computed value is returned on success and on most errors; it is nil when the data is empty, lacks
// the expected prefix, or the envelope itself cannot be unmarshalled.
//
// Special targets:
//   - *runtime.VersionedObjects: decoding is performed against its last object, and its Objects list
//     is replaced with the decoded object, followed by the original target when the two differ.
//   - non-nil *runtime.Unknown: the envelope is copied into it and the payload is left undecoded.
//   - an object the typer does not have registered: the payload is decoded straight into it with
//     proto.Unmarshal, which requires into to be a proto.Message.
func (s *Serializer) Decode(originalData []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
	if versioned, isVersioned := into.(*runtime.VersionedObjects); isVersioned {
		target := versioned.Last()
		decoded, actual, err := s.Decode(originalData, gvk, target)
		if err != nil {
			return nil, actual, err
		}
		// The decoded object always comes first. The previous last object is kept as the second
		// entry only when it exists and decoding produced a different object.
		versioned.Objects = []runtime.Object{decoded}
		if target != nil && target != decoded {
			versioned.Objects = append(versioned.Objects, target)
		}
		return versioned, actual, nil
	}

	prefixLen := len(s.prefix)
	if len(originalData) == 0 {
		// TODO: treat like decoding {} from JSON with defaulting
		return nil, nil, fmt.Errorf("empty data")
	}
	if !bytes.HasPrefix(originalData, s.prefix) {
		return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
	}
	if len(originalData) == prefixLen {
		// TODO: treat like decoding {} from JSON with defaulting
		return nil, nil, fmt.Errorf("empty body")
	}

	var unk runtime.Unknown
	if err := unk.Unmarshal(originalData[prefixLen:]); err != nil {
		return nil, nil, err
	}

	actual := unk.GroupVersionKind()
	copyKindDefaults(&actual, gvk)

	// Callers asking for the envelope get it verbatim; the payload is not decoded.
	if target, isUnknown := into.(*runtime.Unknown); isUnknown && target != nil {
		*target = unk
		if recognized, _ := s.RecognizesData(bytes.NewBuffer(unk.Raw)); recognized {
			target.ContentType = s.contentType
		}
		return target, &actual, nil
	}

	if into != nil {
		kinds, _, err := s.typer.ObjectKinds(into)
		if runtime.IsNotRegisteredError(err) {
			// Unregistered targets bypass kind resolution and are filled directly from the payload.
			msg, isProto := into.(proto.Message)
			if !isProto {
				return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
			}
			if err := proto.Unmarshal(unk.Raw, msg); err != nil {
				return nil, &actual, err
			}
			return into, &actual, nil
		}
		if err != nil {
			return nil, &actual, err
		}

		copyKindDefaults(&actual, &kinds[0])
		// copyKindDefaults only assigns Group together with Version. When neither ended up set,
		// fall back to the group of into so that the result carries at least that much.
		if len(actual.Version) == 0 && len(actual.Group) == 0 {
			actual.Group = kinds[0].Group
		}
	}

	if len(actual.Kind) == 0 {
		return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
	}
	if len(actual.Version) == 0 {
		return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
	}

	return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
}
// Encode writes s.prefix followed by a marshalled runtime.Unknown envelope to w in a single Write call.
//
// A *runtime.Unknown is marshalled as-is. Any other object is wrapped in a new envelope whose Kind and
// APIVersion come from the object's GroupVersionKind; the object must implement bufferedMarshaller
// (preferred, one buffer allocation) or proto.Marshaler. For anything else an error for which
// IsNotMarshalable reports true is returned. Marshalling and write errors are returned unchanged.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
	prefixSize := uint64(len(s.prefix))

	// An envelope supplied by the caller is serialized directly rather than being wrapped again.
	if envelope, isUnknown := obj.(*runtime.Unknown); isUnknown {
		buf := make([]byte, prefixSize+uint64(envelope.Size()))
		n, err := envelope.MarshalTo(buf[prefixSize:])
		if err != nil {
			return err
		}
		copy(buf, s.prefix)
		_, err = w.Write(buf[:prefixSize+uint64(n)])
		return err
	}

	gvk := obj.GetObjectKind().GroupVersionKind()
	unk := runtime.Unknown{
		TypeMeta: runtime.TypeMeta{
			Kind:       gvk.Kind,
			APIVersion: gvk.GroupVersion().String(),
		},
	}

	if sized, ok := obj.(bufferedMarshaller); ok {
		// This path performs a single allocation during write but requires the object to implement
		// the more efficient Size and MarshalTo methods.
		bodySize := uint64(sized.Size())
		buf := make([]byte, prefixSize+estimateUnknownSize(&unk, bodySize))
		n, err := unk.NestedMarshalTo(buf[prefixSize:], sized, bodySize)
		if err != nil {
			return err
		}
		copy(buf, s.prefix)
		_, err = w.Write(buf[:prefixSize+uint64(n)])
		return err
	}

	if plain, ok := obj.(proto.Marshaler); ok {
		// This path performs extra allocations: one for the object body and one for the envelope.
		body, err := plain.Marshal()
		if err != nil {
			return err
		}
		unk.Raw = body
		buf := make([]byte, prefixSize+uint64(unk.Size()))
		n, err := unk.MarshalTo(buf[prefixSize:])
		if err != nil {
			return err
		}
		copy(buf, s.prefix)
		_, err = w.Write(buf[:prefixSize+uint64(n)])
		return err
	}

	// TODO: marshal with a different content type and serializer (JSON for third party objects)
	return errNotMarshalable{reflect.TypeOf(obj)}
}
// RecognizesData implements the RecognizingDecoder interface. It reports whether the stream begins
// with this serializer's prefix, consuming at most len(s.prefix) bytes from peek.
//
// A stream that ends before a full prefix could be read is reported as not recognized (false, nil);
// any other read failure is returned to the caller.
func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) {
	header := make([]byte, len(s.prefix))
	// io.ReadFull keeps reading until the buffer is full. A single Read may legally return fewer
	// bytes than requested, or a full buffer together with io.EOF, even though the stream carries
	// the whole prefix; relying on one Read would misreport such streams as unrecognized.
	if _, err := io.ReadFull(peek, header); err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return false, nil
		}
		return false, err
	}
	return bytes.Equal(s.prefix, header), nil
}
// copyKindDefaults fills the unset parts of dst from src. Kind is defaulted on its own; Group and
// Version are copied together, and only when dst has no Version and src provides one. A nil src
// leaves dst untouched.
func copyKindDefaults(dst, src *unversioned.GroupVersionKind) {
	if src == nil {
		return
	}

	if dst.Kind == "" {
		dst.Kind = src.Kind
	}

	// Group is never defaulted independently of Version: a version is only meaningful within its group.
	if dst.Version == "" && src.Version != "" {
		dst.Group, dst.Version = src.Group, src.Version
	}
}
// bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
// byte buffers by pre-calculating the size of the final buffer needed. Both Encode implementations in
// this file check for it before falling back to proto.Marshaler.
type bufferedMarshaller interface {
// Together the two embedded interfaces provide the Size and MarshalTo methods the Encode paths call.
proto.Sizer
runtime.ProtobufMarshaller
}
// estimateUnknownSize returns the number of bytes to reserve for marshalling unk once a raw payload of
// byteSize bytes is nested inside it. unk is expected to have no Raw payload of its own; if it does,
// the returned size will not be correct.
func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
	const (
		tagBytes    = 1 // protobuf field tag preceding the raw payload
		lengthBytes = 8 // space reserved for the varint-encoded payload length
	)
	return uint64(unk.Size()) + tagBytes + lengthBytes + byteSize
}
// NewRawSerializer returns a Protobuf serializer that reads and writes bare protobuf messages, without
// the prefix and runtime.Unknown envelope used by Serializer. The encoded form carries no type
// information and thus is not self describing: callers must supply an object to decode into. The
// creater and typer are used by Decode to resolve the kind of that object, and defaultContentType is
// recorded on *runtime.Unknown targets.
//
// This encoding scheme is experimental, and is subject to change at any time.
func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *RawSerializer {
	s := new(RawSerializer)
	s.creater = creater
	s.typer = typer
	s.contentType = defaultContentType
	return s
}
// RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
// type).
type RawSerializer struct {
// creater and typer are handed to the decode path to instantiate objects and resolve their kinds.
creater runtime.ObjectCreater
typer runtime.ObjectTyper
// contentType is set on a *runtime.Unknown that Decode fills with the raw input.
contentType string
}
// Compile-time check that *RawSerializer satisfies runtime.Serializer.
var _ runtime.Serializer = &RawSerializer{}
// Decode loads originalData, a bare protobuf message, into an object. Because the data carries no type
// information, into is required; a nil into is an error.
//
// The returned group/version/kind starts from the supplied default gvk and, when into is registered
// with the typer, is completed from the first kind the typer reports for into. It is returned on
// success and on most errors; it is nil when into is nil or the data is empty.
//
// Special targets:
//   - *runtime.VersionedObjects: decoding is performed against its last object, and its Objects list
//     is replaced with the decoded object, followed by the original target when the two differ.
//   - non-nil *runtime.Unknown: the input bytes are stored in Raw without decoding, ContentEncoding is
//     cleared, and ContentType and the group/version/kind are set.
//   - an object the typer does not have registered: the data is decoded straight into it with
//     proto.Unmarshal, which requires into to be a proto.Message.
func (s *RawSerializer) Decode(originalData []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
	if into == nil {
		return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
	}

	if versioned, isVersioned := into.(*runtime.VersionedObjects); isVersioned {
		target := versioned.Last()
		decoded, actual, err := s.Decode(originalData, gvk, target)
		if err != nil {
			return nil, actual, err
		}
		// The decoded object always comes first. The previous last object is kept as the second
		// entry only when it exists and decoding produced a different object.
		versioned.Objects = []runtime.Object{decoded}
		if target != nil && target != decoded {
			versioned.Objects = append(versioned.Objects, target)
		}
		return versioned, actual, nil
	}

	if len(originalData) == 0 {
		// TODO: treat like decoding {} from JSON with defaulting
		return nil, nil, fmt.Errorf("empty data")
	}

	actual := new(unversioned.GroupVersionKind)
	copyKindDefaults(actual, gvk)

	// Callers asking for a runtime.Unknown get the untouched bytes plus whatever type info is known.
	if target, isUnknown := into.(*runtime.Unknown); isUnknown && target != nil {
		target.Raw = originalData
		target.ContentEncoding = ""
		target.ContentType = s.contentType
		target.SetGroupVersionKind(*actual)
		return target, actual, nil
	}

	kinds, _, err := s.typer.ObjectKinds(into)
	if runtime.IsNotRegisteredError(err) {
		// Unregistered targets bypass kind resolution and are filled directly from the data.
		msg, isProto := into.(proto.Message)
		if !isProto {
			return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
		}
		if err := proto.Unmarshal(originalData, msg); err != nil {
			return nil, actual, err
		}
		return into, actual, nil
	}
	if err != nil {
		return nil, actual, err
	}

	copyKindDefaults(actual, &kinds[0])
	// copyKindDefaults only assigns Group together with Version. When neither ended up set,
	// fall back to the group of into so that the result carries at least that much.
	if len(actual.Version) == 0 && len(actual.Group) == 0 {
		actual.Group = kinds[0].Group
	}

	if len(actual.Kind) == 0 {
		return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
	}
	if len(actual.Version) == 0 {
		return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
	}

	return unmarshalToObject(s.typer, s.creater, actual, into, originalData)
}
// unmarshalToObject holds the final decode step shared by Serializer.Decode and RawSerializer.Decode.
// It obtains the destination object from runtime.UseOrCreateObject for the resolved kind (passing
// into as the candidate target), and unmarshals data into it. The destination must be a
// proto.Message. actual is returned unchanged alongside both results and errors.
func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *unversioned.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *unversioned.GroupVersionKind, error) {
	target, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
	if err != nil {
		return nil, actual, err
	}

	msg, isProto := target.(proto.Message)
	if !isProto {
		return nil, actual, errNotMarshalable{reflect.TypeOf(target)}
	}

	if err = proto.Unmarshal(data, msg); err != nil {
		return nil, actual, err
	}
	return target, actual, nil
}
// Encode writes the bare protobuf encoding of obj to w in a single Write call, with no prefix or
// envelope. The object must implement bufferedMarshaller (preferred) or proto.Marshaler; for anything
// else an error for which IsNotMarshalable reports true is returned. Marshalling and write errors are
// returned unchanged.
func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
	if sized, ok := obj.(bufferedMarshaller); ok {
		// This path performs a single allocation during write but requires the object to implement
		// the more efficient Size and MarshalTo methods.
		buf := make([]byte, uint64(sized.Size()))
		n, err := sized.MarshalTo(buf)
		if err != nil {
			return err
		}
		_, err = w.Write(buf[:n])
		return err
	}

	if plain, ok := obj.(proto.Marshaler); ok {
		// This path performs extra allocations.
		body, err := plain.Marshal()
		if err != nil {
			return err
		}
		_, err = w.Write(body)
		return err
	}

	return errNotMarshalable{reflect.TypeOf(obj)}
}
// LengthDelimitedFramer is the shared, stateless framer value for this package; its methods wrap
// readers and writers with the length-delimited framing from the framer package.
var LengthDelimitedFramer = lengthDelimitedFramer{}
// lengthDelimitedFramer carries no state; it exists only to hang the framing methods on.
type lengthDelimitedFramer struct{}
// NewFrameWriter implements stream framing for this serializer by wrapping w with
// framer.NewLengthDelimitedFrameWriter.
func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
return framer.NewLengthDelimitedFrameWriter(w)
}
// NewFrameReader implements stream framing for this serializer by wrapping r with
// framer.NewLengthDelimitedFrameReader.
func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
return framer.NewLengthDelimitedFrameReader(r)
}