forked from ClickHouse/clickhouse-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rows.go
159 lines (148 loc) · 3.72 KB
/
rows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package clickhouse
import (
"database/sql/driver"
"fmt"
"io"
"reflect"
"time"
"github.com/kshvakov/clickhouse/lib/column"
"github.com/kshvakov/clickhouse/lib/protocol"
)
// rows is the cursor state for one query result read from a clickhouse
// connection. Block data is stored column-major: values[c][r] is the value of
// column c in row r of the block currently being iterated.
type rows struct {
	// ch is the connection packets are decoded from and logged through.
	ch *clickhouse
	// index is the row offset inside values that Next hands out next.
	index int
	// finish, when non-nil, is invoked by Close.
	finish func()
	// values holds the columns of the block currently being iterated.
	values [][]interface{}
	// totals holds the columns of a received ServerTotals block until
	// NextResultSet selects it.
	totals [][]interface{}
	// extremes holds the columns of a received ServerExtremes block until
	// NextResultSet selects it.
	extremes [][]interface{}
	// columns are the column names taken from the first block that carried
	// column definitions.
	columns []string
	// blockColumns are the column descriptors taken from that same block.
	blockColumns []column.Column
	// allDataIsReceived is set once the end-of-stream packet has been read.
	allDataIsReceived bool
}
// Columns returns the column names recorded for this result. The slice is
// nil until a block carrying column definitions has been received, and again
// after Close.
func (r *rows) Columns() []string {
	return r.columns
}
// ColumnTypeScanType reports the Go type of the column at position idx, as
// given by that column descriptor's ScanType method. idx is not range-checked
// here.
func (r *rows) ColumnTypeScanType(idx int) reflect.Type {
	col := r.blockColumns[idx]
	return col.ScanType()
}
// ColumnTypeDatabaseTypeName reports the type name of the column at position
// idx, as given by that column descriptor's CHType method. idx is not
// range-checked here.
func (r *rows) ColumnTypeDatabaseTypeName(idx int) string {
	col := r.blockColumns[idx]
	return col.CHType()
}
// Next copies the next row of the current block into dest, one value per
// column, and advances the cursor.
//
// It returns io.EOF when the current block has no unread row and the end of
// the stream has already been seen, and otherwise any error returned by
// receiveData. dest is indexed against the block's columns without a length
// check.
func (r *rows) Next(dest []driver.Value) error {
	// Keep reading packets until the current block has an unread row.
	for {
		if len(r.values) != 0 && r.index < len(r.values[0]) {
			break
		}
		if r.allDataIsReceived {
			return io.EOF
		}
		if err := r.receiveData(); err != nil {
			return err
		}
	}

	// values is column-major, so pick element `row` out of every column.
	row := r.index
	for col := range dest {
		dest[col] = r.values[col][row]
	}
	r.index++

	if len(r.values) != 0 && r.index < len(r.values[0]) {
		return nil
	}

	// That was the last row of the block: drop it and read ahead until either
	// another non-empty set of columns arrives or the stream ends.
	r.values = nil
	for !r.allDataIsReceived && len(r.values) == 0 {
		if err := r.receiveData(); err != nil {
			return err
		}
	}
	return nil
}
// HasNextResultSet reports whether a totals or extremes block has been
// received and not yet selected through NextResultSet.
func (r *rows) HasNextResultSet() bool {
	if len(r.totals) > 0 {
		return true
	}
	return len(r.extremes) > 0
}
// NextResultSet makes the next pending auxiliary result set the current one:
// the totals block if one is pending, otherwise the extremes block. The
// selected set becomes the current values, the row cursor is rewound to the
// first row, and the pending slot is cleared so the set is served only once.
//
// It returns io.EOF, leaving the cursor untouched, when neither set is
// pending.
func (r *rows) NextResultSet() error {
	switch {
	case len(r.totals) != 0:
		// Replace rather than append: any columns still held in values belong
		// to the previous result set. Keeping them would put the new columns
		// after the old ones, and Next would read the old data again from
		// row 0.
		r.values, r.totals = r.totals, nil
	case len(r.extremes) != 0:
		r.values, r.extremes = r.extremes, nil
	default:
		return io.EOF
	}
	r.index = 0
	return nil
}
// receiveData reads server packets from the connection and updates the cursor
// state until there is something for the caller to act on. It returns nil
// when:
//   - the first block carrying column definitions arrives with zero rows
//     (column names and descriptors are recorded),
//   - a data, totals or extremes block has been stored and values is
//     non-empty afterwards, or
//   - the end-of-stream packet is read (allDataIsReceived is set).
//
// Progress and profile-info packets are only logged. An exception packet
// returns whatever the connection's exception method returns. An unknown
// packet type closes the underlying connection and returns an error. Any
// decode or read error is returned unchanged.
func (r *rows) receiveData() error {
	ch := r.ch
	for {
		kind, err := ch.decoder.Uvarint()
		if err != nil {
			return err
		}

		switch kind {
		case protocol.ServerException:
			ch.logf("[rows] <- exception")
			return ch.exception()

		case protocol.ServerProgress:
			p, err := ch.progress()
			if err != nil {
				return err
			}
			ch.logf("[rows] <- progress: rows=%d, bytes=%d, total rows=%d", p.rows, p.bytes, p.totalRows)

		case protocol.ServerProfileInfo:
			info, err := ch.profileInfo()
			if err != nil {
				return err
			}
			ch.logf("[rows] <- profiling: rows=%d, bytes=%d, blocks=%d", info.rows, info.bytes, info.blocks)

		case protocol.ServerData, protocol.ServerTotals, protocol.ServerExtremes:
			started := time.Now()
			block, err := ch.readBlock()
			if err != nil {
				return err
			}
			ch.logf("[rows] <- data: packet=%d, columns=%d, rows=%d, elapsed=%s", kind, block.NumColumns, block.NumRows, time.Since(started))

			// The first block that carries columns defines the result's
			// column names and descriptors; with no rows there is nothing
			// more to store from it.
			if len(r.columns) == 0 && len(block.Columns) != 0 {
				r.columns, r.blockColumns = block.ColumnNames(), block.Columns
				if block.NumRows == 0 {
					return nil
				}
			}

			// Reset is called before block.Values is read, as in the
			// original switch initializer.
			block.Reset()
			switch kind {
			case protocol.ServerData:
				r.index, r.values = 0, block.Values
			case protocol.ServerTotals:
				r.totals = block.Values
			case protocol.ServerExtremes:
				r.extremes = block.Values
			}

			// With no current columns to iterate, keep reading packets.
			if len(r.values) == 0 {
				continue
			}
			return nil

		case protocol.ServerEndOfStream:
			r.allDataIsReceived = true
			ch.logf("[rows] <- end of stream")
			return nil

		default:
			ch.conn.Close()
			ch.logf("[rows] unexpected packet [%d]", kind)
			return fmt.Errorf("[rows] unexpected packet [%d] from server", kind)
		}
	}
}
// Close logs the close, drops the recorded column names and, if a finish
// callback was supplied, invokes it. It always returns nil.
func (r *rows) Close() error {
	r.ch.logf("[rows] close")
	r.columns = nil
	if done := r.finish; done != nil {
		done()
	}
	return nil
}