forked from koblas/impalathing
/
connection.go
99 lines (78 loc) · 2.27 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package impalathing
import (
"context"
"fmt"
"time"
"github.com/KosyanMedia/impalathing/services/beeswax"
impala "github.com/KosyanMedia/impalathing/services/impalaservice"
"github.com/apache/thrift/lib/go/thrift"
)
// Options carries the per-connection settings that a Connection stores and
// hands to newRowSet for every query it runs.
type Options struct {
// PollIntervalSeconds is presumably the delay, in seconds, between polls of
// a running query's state; its consumer is outside this file — TODO confirm.
PollIntervalSeconds float64
// BatchSize is presumably the maximum number of rows requested per fetch;
// its consumer is outside this file — TODO confirm.
BatchSize int64
}
// DefaultOptions is a ready-made Options value with a poll interval of 0.1
// and a batch size of 10000.
var DefaultOptions = Options{
	PollIntervalSeconds: 0.1,
	BatchSize:           10000,
}
// Connection bundles an Impala service client with the Thrift transport it
// talks over and the Options applied to the queries it runs.
type Connection struct {
// client is the Impala service client built in Connect. Close sets it to
// nil, which is what isOpen checks to detect a closed connection.
client *impala.ImpalaServiceClient
// handle, when non-nil, is cancelled by Close. Connect initialises it to
// nil; no other assignment is visible in this part of the file.
handle *beeswax.QueryHandle
// transport is the opened Thrift transport (SASL or buffered) behind client.
transport thrift.TTransport
// options is forwarded to newRowSet for each query.
options Options
}
// Connect opens a Thrift connection to the Impala service at host:port and
// returns a Connection ready to run queries.
//
// The socket is created with a 10 second timeout. When saslConfiguration
// contains a "mechanismName" entry, the socket is wrapped in a SASL transport
// for that mechanism with the "service" setting forced to "impala"; otherwise
// a buffered transport with a 24 MiB buffer is used. The caller's
// saslConfiguration map is never modified (a nil map is fine and selects the
// buffered transport). options is stored on the Connection and passed on to
// the row sets it creates.
//
// Any error from creating the socket, building the transport or opening it is
// returned unchanged together with a nil Connection.
func Connect(host string, port int, options Options, saslConfiguration map[string]string) (*Connection, error) {
	socket, err := thrift.NewTSocketTimeout(fmt.Sprintf("%s:%d", host, port), 10000*time.Millisecond)
	if err != nil {
		return nil, err
	}

	var transport thrift.TTransport
	if mechanism, ok := saslConfiguration["mechanismName"]; ok {
		// Work on a copy so the "service" override does not leak into the
		// map owned by the caller.
		saslConfig := make(map[string]string, len(saslConfiguration)+1)
		for key, value := range saslConfiguration {
			saslConfig[key] = value
		}
		saslConfig["service"] = "impala"
		transport, err = NewTSaslTransport(socket, host, mechanism, saslConfig)
	} else {
		transport, err = thrift.NewTBufferedTransportFactory(24 * 1024 * 1024).GetTransport(socket)
	}
	if err != nil {
		return nil, err
	}

	if err := transport.Open(); err != nil {
		// Open may fail after the TCP connection was established (e.g. during
		// a handshake); release the socket on a best-effort basis and report
		// the original error.
		_ = socket.Close()
		return nil, err
	}

	protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
	return &Connection{
		client:    impala.NewImpalaServiceClientFactory(transport, protocolFactory),
		handle:    nil,
		transport: transport,
		options:   options,
	}, nil
}
// isOpen reports whether the connection still holds a client, i.e. whether
// Close has not yet cleared it.
func (c *Connection) isOpen() bool {
	open := c.client != nil
	return open
}
// Close cancels the connection's query handle, if one is set, closes the
// underlying transport and marks the connection as closed. Calling Close on
// an already-closed connection does nothing and returns nil.
//
// The transport is closed even when the cancel request fails, so a failed
// cancel cannot leak the socket. If cancelling fails, that error is returned;
// otherwise any error from closing the transport is returned.
func (c *Connection) Close() error {
	if !c.isOpen() {
		return nil
	}

	var cancelErr error
	if c.handle != nil {
		_, cancelErr = c.client.Cancel(context.Background(), c.handle)
		c.handle = nil
	}

	closeErr := c.transport.Close()
	// Clearing client is what makes isOpen report false from now on.
	c.client = nil

	if cancelErr != nil {
		return cancelErr
	}
	return closeErr
}
// query submits the statement to the Impala service using ctx and wraps the
// returned query handle in a RowSet configured with the connection's options.
//
// It returns an error, rather than panicking on a nil client, when the
// connection has already been closed. Errors from the service call are
// returned unchanged.
func (c *Connection) query(ctx context.Context, query string) (RowSet, error) {
	// Close sets client to nil; calling through it would dereference nil.
	if c.client == nil {
		return nil, fmt.Errorf("impalathing: query on closed connection")
	}

	request := &beeswax.Query{
		Query: query,
		// Kept as an empty, non-nil list to match what was sent before.
		Configuration: []string{},
	}

	handle, err := c.client.Query(ctx, request)
	if err != nil {
		return nil, err
	}
	return newRowSet(c.client, handle, c.options), nil
}
// Query executes the statement with context.Background(), so the request
// carries no caller-supplied deadline or cancellation. It returns whatever
// the internal query method returns.
func (c *Connection) Query(query string) (RowSet, error) {
	ctx := context.Background()
	return c.query(ctx, query)
}
// QueryWithContext executes the statement, passing the caller's ctx through
// to the underlying service call. It returns whatever the internal query
// method returns.
func (c *Connection) QueryWithContext(ctx context.Context, query string) (RowSet, error) {
	rows, err := c.query(ctx, query)
	return rows, err
}