This repository has been archived by the owner on Nov 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
jaeger_thrift_http_sender.go
126 lines (111 loc) · 3.63 KB
/
jaeger_thrift_http_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sender
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/apache/thrift/lib/go/thrift"
"go.uber.org/zap"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/translator/trace"
)
// Default timeout for http request in seconds
const defaultHTTPTimeout = time.Second * 5
// JaegerThriftHTTPSender forwards spans encoded in the jaeger thrift
// format to a http server
type JaegerThriftHTTPSender struct {
url string
headers map[string]string
client *http.Client
logger *zap.Logger
}
// HTTPOption sets a parameter for the HttpCollector
type HTTPOption func(s *JaegerThriftHTTPSender)
// HTTPTimeout sets maximum timeout for http request.
func HTTPTimeout(duration time.Duration) HTTPOption {
return func(s *JaegerThriftHTTPSender) { s.client.Timeout = duration }
}
// HTTPRoundTripper configures the underlying Transport on the *http.Client
// that is used
func HTTPRoundTripper(transport http.RoundTripper) HTTPOption {
return func(s *JaegerThriftHTTPSender) {
s.client.Transport = transport
}
}
// NewJaegerThriftHTTPSender returns a new HTTP-backend span sender. url should be an http
// url of the collector to handle POST request, typically something like:
// http://hostname:14268/api/traces?format=jaeger.thrift
func NewJaegerThriftHTTPSender(
url string,
headers map[string]string,
zlogger *zap.Logger,
options ...HTTPOption,
) *JaegerThriftHTTPSender {
s := &JaegerThriftHTTPSender{
url: url,
headers: headers,
client: &http.Client{Timeout: defaultHTTPTimeout},
logger: zlogger,
}
for _, option := range options {
option(s)
}
return s
}
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
func (s *JaegerThriftHTTPSender) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
if batch == nil {
return 0, fmt.Errorf("Jaeger sender received nil batch")
}
tBatch, err := tracetranslator.OCProtoToJaegerThrift(batch)
if err != nil {
return uint64(len(batch.Spans)), err
}
mSpans := tBatch.Spans
body, err := serializeThrift(tBatch)
if err != nil {
return uint64(len(mSpans)), err
}
req, err := http.NewRequest("POST", s.url, body)
if err != nil {
return uint64(len(mSpans)), err
}
req.Header.Set("Content-Type", "application/x-thrift")
for k, v := range s.headers {
req.Header.Set(k, v)
}
resp, err := s.client.Do(req)
if err != nil {
return uint64(len(mSpans)), err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode >= http.StatusBadRequest {
return uint64(len(mSpans)), fmt.Errorf("Jaeger Thirft HTTP sender error: %d", resp.StatusCode)
}
return 0, nil
}
func serializeThrift(obj thrift.TStruct) (*bytes.Buffer, error) {
t := thrift.NewTMemoryBuffer()
p := thrift.NewTBinaryProtocolTransport(t)
if err := obj.Write(p); err != nil {
return nil, err
}
return t.Buffer, nil
}