-
Notifications
You must be signed in to change notification settings - Fork 546
/
context.go
137 lines (121 loc) · 2.67 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package clickhouse
import (
"context"
"github.com/ClickHouse/clickhouse-go/v2/external"
"go.opentelemetry.io/otel/trace"
)
var _contextOptionKey = &QueryOptions{
settings: Settings{
"_contextOption": struct{}{},
},
}
type Settings map[string]interface{}
type (
QueryOption func(*QueryOptions) error
QueryOptions struct {
span trace.SpanContext
queryID string
quotaKey string
events struct {
logs func(*Log)
progress func(*Progress)
profileInfo func(*ProfileInfo)
profileEvents func([]ProfileEvent)
}
settings Settings
external []*external.Table
}
)
func WithSpan(span trace.SpanContext) QueryOption {
return func(o *QueryOptions) error {
o.span = span
return nil
}
}
func WithQueryID(queryID string) QueryOption {
return func(o *QueryOptions) error {
o.queryID = queryID
return nil
}
}
func WithQuotaKey(quotaKey string) QueryOption {
return func(o *QueryOptions) error {
o.quotaKey = quotaKey
return nil
}
}
func WithSettings(settings Settings) QueryOption {
return func(o *QueryOptions) error {
o.settings = settings
return nil
}
}
func WithLogs(fn func(*Log)) QueryOption {
return func(o *QueryOptions) error {
o.events.logs = fn
return nil
}
}
func WithProgress(fn func(*Progress)) QueryOption {
return func(o *QueryOptions) error {
o.events.progress = fn
return nil
}
}
func WithProfileInfo(fn func(*ProfileInfo)) QueryOption {
return func(o *QueryOptions) error {
o.events.profileInfo = fn
return nil
}
}
func WithProfileEvents(fn func([]ProfileEvent)) QueryOption {
return func(o *QueryOptions) error {
o.events.profileEvents = fn
return nil
}
}
func WithExternalTable(t ...*external.Table) QueryOption {
return func(o *QueryOptions) error {
o.external = append(o.external, t...)
return nil
}
}
func Context(parent context.Context, options ...QueryOption) context.Context {
var opt QueryOptions
for _, f := range options {
f(&opt)
}
return context.WithValue(parent, _contextOptionKey, opt)
}
func queryOptions(ctx context.Context) QueryOptions {
if o, ok := ctx.Value(_contextOptionKey).(QueryOptions); ok {
return o
}
return QueryOptions{}
}
func (q *QueryOptions) onProcess() *onProcess {
return &onProcess{
logs: func(logs []Log) {
if q.events.logs != nil {
for _, l := range logs {
q.events.logs(&l)
}
}
},
progress: func(p *Progress) {
if q.events.progress != nil {
q.events.progress(p)
}
},
profileInfo: func(p *ProfileInfo) {
if q.events.profileInfo != nil {
q.events.profileInfo(p)
}
},
profileEvents: func(events []ProfileEvent) {
if q.events.profileEvents != nil {
q.events.profileEvents(events)
}
},
}
}