-
Notifications
You must be signed in to change notification settings - Fork 78
/
flight_stub.go
140 lines (112 loc) · 3.39 KB
/
flight_stub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package client
import (
"context"
"errors"
"io"
"net"
"strconv"
"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/flight"
"github.com/apache/arrow/go/v8/arrow/ipc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
ticketpb2 "github.com/deephaven/deephaven-core/go/internal/proto/ticket"
)
// flightStub bundles an Arrow Flight gRPC client together with the
// Deephaven client that owns it, so Flight calls can reach the owning
// client's token manager and ticket factory.
type flightStub struct {
	// client is the Deephaven client this stub belongs to.
	client *Client

	// stub issues the Arrow Flight gRPC requests. Close sets it to nil
	// once the underlying client has been closed successfully.
	stub flight.Client
}
// newFlightStub dials the Arrow Flight service at host:port and returns a
// flightStub bound to the given client.
//
// The connection is created with insecure transport credentials, and the
// auth-handler and middleware arguments of the Flight client constructor are
// both left nil. Any error from creating the Flight client is returned as-is.
func newFlightStub(client *Client, host string, port string) (*flightStub, error) {
	addr := net.JoinHostPort(host, port)
	dialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())

	flightClient, err := flight.NewClientWithMiddleware(addr, nil, nil, dialOpt)
	if err != nil {
		return nil, err
	}

	fs := &flightStub{
		client: client,
		stub:   flightClient,
	}
	return fs, nil
}
// handshake opens a Handshake stream on the Flight service and returns it.
// The returned stream client can be used to obtain and refresh
// authentication tokens. ctx and opts are forwarded to the underlying
// Flight client unchanged.
func (fs *flightStub) handshake(ctx context.Context, opts ...grpc.CallOption) (flight.FlightService_HandshakeClient, error) {
	stream, err := fs.stub.Handshake(ctx, opts...)
	return stream, err
}
// snapshotRecord downloads the data currently in the table identified by
// ticket and returns it as a single Arrow Record.
//
// The request is authenticated by attaching the client's current token to
// ctx. The server is expected to answer with exactly one record:
//   - if the stream yields no record, the error from the first read
//     (io.EOF for an empty stream) is returned;
//   - if reading past the first record fails with anything other than
//     io.EOF, that error is returned;
//   - if a second record arrives, an error is returned because a snapshot
//     must consist of a single record.
//
// The returned record has been retained, so the caller is responsible for
// calling Release on it when done.
func (fs *flightStub) snapshotRecord(ctx context.Context, ticket *ticketpb2.Ticket) (arrow.Record, error) {
	ctx, err := fs.client.tokenMgr.withToken(ctx)
	if err != nil {
		return nil, err
	}

	fticket := &flight.Ticket{Ticket: ticket.GetTicket()}

	req, err := fs.stub.DoGet(ctx, fticket)
	if err != nil {
		return nil, err
	}
	defer req.CloseSend()

	reader, err := flight.NewRecordReader(req)
	if err != nil {
		return nil, err
	}
	defer reader.Release()

	rec1, err := reader.Read()
	if err != nil {
		return nil, err
	}
	// Records handed out by the reader are only borrowed; retain the first
	// one so it outlives the next Read and the deferred reader.Release.
	rec1.Retain()

	// Probe for a second record. Its value is deliberately discarded: it was
	// never retained here, so it still belongs to the reader and must not be
	// released by this function (and it is nil whenever err is non-nil).
	_, err = reader.Read()
	switch {
	case errors.Is(err, io.EOF):
		// Exactly one record was received: the expected outcome.
		return rec1, nil
	case err != nil:
		// A genuine stream failure; surface it rather than masking it.
		rec1.Release()
		return nil, err
	default:
		rec1.Release()
		return nil, errors.New("multiple records retrieved during snapshot")
	}
}
// ImportTable uploads rec to the Deephaven server over a Flight DoPut stream.
// The uploaded table can then be manipulated and referenced through the
// returned TableHandle.
//
// The request is authenticated by attaching the client's current token to
// ctx. A fresh ticket number is taken from the client's ticket factory and
// the record is written under the Flight path ["export", <ticket number>].
// Any error from obtaining the token, opening the stream, writing or closing
// the record writer, or receiving the server's reply is returned unchanged.
func (fs *flightStub) ImportTable(ctx context.Context, rec arrow.Record) (*TableHandle, error) {
	authCtx, err := fs.client.tokenMgr.withToken(ctx)
	if err != nil {
		return nil, err
	}

	putStream, err := fs.stub.DoPut(authCtx)
	if err != nil {
		return nil, err
	}
	defer putStream.CloseSend()

	exportID := fs.client.ticketFact.nextId()
	schema := rec.Schema()

	// TODO: the ticket number should probably be formatted via a fixed-size
	// integer type rather than a platform-dependent int.
	exportPath := []string{"export", strconv.Itoa(int(exportID))}

	recWriter := flight.NewRecordWriter(putStream, ipc.WithSchema(schema))
	recWriter.SetFlightDescriptor(&flight.FlightDescriptor{
		Type: flight.DescriptorPATH,
		Path: exportPath,
	})

	if err := recWriter.Write(rec); err != nil {
		return nil, err
	}
	if err := recWriter.Close(); err != nil {
		return nil, err
	}

	// Wait for the server's reply to the upload before handing out a handle.
	if _, err := putStream.Recv(); err != nil {
		return nil, err
	}

	exportTicket := fs.client.ticketFact.makeTicket(exportID)
	return newTableHandle(fs.client, &exportTicket, schema, rec.NumRows(), true), nil
}
// Close shuts down the underlying Flight client and frees any associated
// resources. The flight stub should not be used after calling this function.
// The client lock should be held when calling this function.
//
// Calling Close on a stub that has already been closed is a no-op. If
// closing the Flight client fails, the error is returned and the stub keeps
// its reference to the client.
func (fs *flightStub) Close() error {
	if fs.stub == nil {
		return nil
	}

	if err := fs.stub.Close(); err != nil {
		return err
	}

	fs.stub = nil
	return nil
}