-
Notifications
You must be signed in to change notification settings - Fork 437
/
payload.go
153 lines (134 loc) · 4.43 KB
/
payload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package tracer
import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"
"github.com/tinylib/msgp/msgp"
)
// payload is a wrapper on top of the msgpack encoder which allows constructing an
// encoded array by pushing its entries sequentially, one at a time. It basically
// allows us to encode as we would with a stream, except that the contents of the stream
// can be read as a slice by the msgpack decoder at any time. It follows the guidelines
// from the msgpack array spec:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
//
// payload implements io.Reader and can be used with the decoder directly. To create
// a new payload use the newPayload method.
//
// payload is not safe for concurrent use.
//
// payload is meant to be used only once and eventually dismissed with the
// single exception of retrying failed flush attempts.
//
// ⚠️ Warning!
//
// The payload should not be reused for multiple sets of traces. Resetting the
// payload for re-use requires the transport to wait for the HTTP package to
// Close the request body before attempting to re-use it again! This requires
// additional logic to be in place. See:
//
// • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138
// • https://github.com/DataDog/dd-trace-go/pull/475
// • https://github.com/DataDog/dd-trace-go/pull/549
// • https://github.com/DataDog/dd-trace-go/pull/976
type payload struct {
// header specifies the first few bytes in the msgpack stream
// indicating the type of array (fixarray, array16 or array32)
// and the number of items contained in the stream.
header []byte
// off specifies the current read position on the header.
off int
// count specifies the number of items in the stream.
count uint32
// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer
// reader is used for reading the contents of buf.
reader *bytes.Reader
}
var _ io.Reader = (*payload)(nil)
// newPayload returns a ready to use payload.
func newPayload() *payload {
p := &payload{
header: make([]byte, 8),
off: 8,
}
return p
}
// push pushes a new item into the stream.
func (p *payload) push(t spanList) error {
if err := msgp.Encode(&p.buf, t); err != nil {
return err
}
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
}
// itemCount returns the number of items available in the srteam.
func (p *payload) itemCount() int {
return int(atomic.LoadUint32(&p.count))
}
// size returns the payload size in bytes. After the first read the value becomes
// inaccurate by up to 8 bytes.
func (p *payload) size() int {
return p.buf.Len() + len(p.header) - p.off
}
// reset sets up the payload to be read a second time. It maintains the
// underlying byte contents of the buffer. reset should not be used in order to
// reuse the payload for another set of traces.
func (p *payload) reset() {
p.updateHeader()
if p.reader != nil {
p.reader.Seek(0, 0)
}
}
// clear empties the payload buffers.
func (p *payload) clear() {
p.buf = bytes.Buffer{}
p.reader = nil
}
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
msgpackArrayFix byte = 144 // up to 15 items
msgpackArray16 = 0xdc // up to 2^16-1 items, followed by size in 2 bytes
msgpackArray32 = 0xdd // up to 2^32-1 items, followed by size in 4 bytes
)
// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *payload) updateHeader() {
n := uint64(atomic.LoadUint32(&p.count))
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
p.off = 7
case n <= 1<<16-1:
binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
p.header[5] = msgpackArray16
p.off = 5
default: // n <= 1<<32-1
binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
p.header[3] = msgpackArray32
p.off = 3
}
}
// Close implements io.Closer
func (p *payload) Close() error {
return nil
}
// Read implements io.Reader. It reads from the msgpack-encoded stream.
func (p *payload) Read(b []byte) (n int, err error) {
if p.off < len(p.header) {
// reading header
n = copy(b, p.header[p.off:])
p.off += n
return n, nil
}
if p.reader == nil {
p.reader = bytes.NewReader(p.buf.Bytes())
}
return p.reader.Read(b)
}