-
Notifications
You must be signed in to change notification settings - Fork 545
/
conn_process.go
131 lines (124 loc) · 2.62 KB
/
conn_process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package clickhouse
import (
"context"
"fmt"
"io"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
)
// onProcess bundles the callbacks that handle invokes while decoding the
// server packets of a running query. Each field corresponds to one family of
// server packets.
type onProcess struct {
// data is invoked, when set, with every block decoded from a ServerData,
// ServerTotals or ServerExtremes packet.
data func(*proto.Block)
// logs is invoked with the entries decoded from a ServerLog packet.
logs func([]Log)
// progress is invoked with the value decoded from a ServerProgress packet.
progress func(*Progress)
// profileInfo is invoked with the value decoded from a ServerProfileInfo packet.
profileInfo func(*ProfileInfo)
// profileEvents is invoked with the events decoded from a ServerProfileEvents packet.
profileEvents func([]ProfileEvent)
}
// firstBlock consumes server packets until the first data block arrives and
// returns that block.
//
// Packets other than ServerData and ServerEndOfStream are forwarded to handle
// (and therefore to the callbacks in on). If the stream ends before any data
// block is seen, io.EOF is returned. If ctx is done at the top of an
// iteration, a cancel is sent to the server and ctx.Err() is returned.
func (c *connect) firstBlock(ctx context.Context, on *onProcess) (*proto.Block, error) {
	for {
		// Non-blocking cancellation check before every packet read.
		select {
		case <-ctx.Done():
			c.cancel()
			return nil, ctx.Err()
		default:
		}

		kind, readErr := c.decoder.ReadByte()
		if readErr != nil {
			return nil, readErr
		}

		if kind == proto.ServerData {
			return c.readData(kind, true)
		}
		if kind == proto.ServerEndOfStream {
			c.debugf("[end of stream]")
			return nil, io.EOF
		}

		// Anything else (progress, logs, exceptions, ...) goes through the
		// common dispatcher; keep looping until a data block shows up.
		if handleErr := c.handle(kind, on); handleErr != nil {
			return nil, handleErr
		}
	}
}
// process drains server packets for the current query until the server
// signals end of stream, dispatching every other packet to handle.
//
// It records the current time in c.lastUsedIn before reading. It returns nil
// on ServerEndOfStream, the first read or handle error otherwise, or
// ctx.Err() (after sending a cancel) if ctx is done at the top of an
// iteration.
func (c *connect) process(ctx context.Context, on *onProcess) error {
	c.lastUsedIn = time.Now()
	for {
		// Non-blocking cancellation check before every packet read.
		select {
		case <-ctx.Done():
			c.cancel()
			return ctx.Err()
		default:
		}

		kind, readErr := c.decoder.ReadByte()
		if readErr != nil {
			return readErr
		}

		if kind == proto.ServerEndOfStream {
			c.debugf("[end of stream]")
			return nil
		}

		if handleErr := c.handle(kind, on); handleErr != nil {
			return handleErr
		}
	}
}
// handle decodes the payload of a single server packet whose type byte has
// already been read, and passes the decoded value to the matching callback
// in on.
//
// Every callback is optional: when a callback is nil the payload is still
// decoded (so the stream stays positioned at the next packet) but the value
// is dropped. A ServerException packet is returned as the error produced by
// c.exception(). An unrecognised packet type yields an *OpError with
// Op "process".
func (c *connect) handle(packet byte, on *onProcess) error {
	switch packet {
	case proto.ServerData, proto.ServerTotals, proto.ServerExtremes:
		block, err := c.readData(packet, true)
		if err != nil {
			return err
		}
		if on.data != nil {
			on.data(block)
		}
	case proto.ServerException:
		return c.exception()
	case proto.ServerProfileInfo:
		var info proto.ProfileInfo
		if err := info.Decode(c.decoder, c.revision); err != nil {
			return err
		}
		c.debugf("[profile info] %s", &info)
		if on.profileInfo != nil {
			on.profileInfo(&info)
		}
	case proto.ServerTableColumns:
		// Decoded only to consume the payload; the result is not used.
		var info proto.TableColumns
		if err := info.Decode(c.decoder, c.revision); err != nil {
			return err
		}
		c.debugf("[table columns]")
	case proto.ServerProfileEvents:
		events, err := c.profileEvents()
		if err != nil {
			return err
		}
		if on.profileEvents != nil {
			on.profileEvents(events)
		}
	case proto.ServerLog:
		logs, err := c.logs()
		if err != nil {
			return err
		}
		if on.logs != nil {
			on.logs(logs)
		}
	case proto.ServerProgress:
		progress, err := c.progress()
		if err != nil {
			return err
		}
		c.debugf("[progress] %s", progress)
		if on.progress != nil {
			on.progress(progress)
		}
	default:
		return &OpError{
			Op:  "process",
			Err: fmt.Errorf("unexpected packet %d", packet),
		}
	}
	return nil
}
// cancel writes a ClientCancel packet to the server and flushes the encoder.
//
// Before writing, it sets a 2-second deadline on the underlying connection
// and marks the connection as closed (c.closed = true). It returns the error
// from encoding the packet, or otherwise the error from the flush.
func (c *connect) cancel() error {
	// Bound how long the cancel exchange may take. The SetDeadline result is
	// intentionally not checked: cancellation is best-effort.
	c.conn.SetDeadline(time.Now().Add(2 * time.Second))
	c.debugf("[cancel]")
	// NOTE(review): presumably this keeps a cancelled connection from being
	// reused, since pending server packets are not drained here — confirm.
	c.closed = true
	// Bail out only when encoding failed; on success the packet must still be
	// flushed, otherwise it is not sent by this function.
	if err := c.encoder.Uvarint(proto.ClientCancel); err != nil {
		return err
	}
	return c.encoder.Flush()
}