forked from araddon/qlbridge
/
into.go
211 lines (183 loc) · 4.69 KB
/
into.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package exec
import (
"database/sql/driver"
"fmt"
"net/url"
"time"
u "github.com/araddon/gou"
"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/rel"
)
// sinkFactories holds every SinkMaker added through Register, keyed by the
// name Open looks up: a destination URL scheme, or "table" for table targets.
var sinkFactories = map[string]SinkMaker{}
// Into - Write to output sink
//
// Into is the exec task for an INTO target: Run reads rows from the task's
// input channel and hands each one to the Sink created by Open.
type Into struct {
	*TaskBase
	p          *plan.Into     // plan node; p.Stmt.Table is the destination passed to Open
	complete   chan bool      // closed by Run after it emits its summary row; Close waits on it
	Closed     bool           // set by Close (while holding m.Lock) so later Close calls are no-ops
	isComplete bool           // set by Run after the summary row has been sent
	colIndexes map[string]int // projected column alias (col.As) -> projection position, passed to Sink.Next
	sink       Sink           // output sink produced by the SinkMaker chosen in Open
}
// NewInto builds the Into exec task for plan node p, running under ctx.
// No sink is created here; Run opens it when the task starts.
func NewInto(ctx *plan.Context, p *plan.Into) *Into {
	return &Into{
		TaskBase: NewTaskBase(ctx),
		p:        p,
		complete: make(chan bool),
	}
}
// Register makes factory available under name, which Open matches against a
// destination's URL scheme (or "table" for plain table targets).
//
// Register panics if factory is nil. If name is already registered the call
// is a no-op, so the first registration wins.
//
// The registry is a plain map written without locking; call Register during
// initialization (e.g. from an init function), before any query runs.
func Register(name string, factory SinkMaker) {
	if factory == nil {
		panic(fmt.Sprintf("exec: Register sink factory %q is nil", name))
	}
	if _, dup := sinkFactories[name]; dup {
		return
	}
	sinkFactories[name] = factory
}
// Open creates the output sink for destination and stores it in m.sink.
//
// destination is parsed as a URL and its scheme selects the SinkMaker added
// via Register (for example "s3://bucket/key" uses the "s3" factory). A
// destination that does not parse, or that parses with an empty scheme when
// no factory is registered for "", is treated as a table name and uses the
// "table" factory. The select statement's With map (presumably its WITH
// clause options) is forwarded to the factory as params.
//
// Open returns an error if no suitable factory is registered or if the
// factory fails; m.sink is only assigned on success.
func (m *Into) Open(ctx *plan.Context, destination string) error {
	params := make(map[string]interface{})
	if sel, ok := m.TaskBase.Ctx.Stmt.(*rel.SqlSelect); ok && sel.With != nil {
		params = sel.With
	}

	// Default to the table sink. Use the URL scheme when there is one, or when
	// a factory was explicitly registered for the empty scheme.
	factoryName := "table"
	if dest, parseErr := url.Parse(destination); parseErr == nil {
		if _, registered := sinkFactories[dest.Scheme]; registered || dest.Scheme != "" {
			factoryName = dest.Scheme
		}
	}

	newSink, ok := sinkFactories[factoryName]
	if !ok {
		if factoryName == "table" {
			return fmt.Errorf("INTO <TABLE> sink factory not found")
		}
		return fmt.Errorf("scheme [%s] not registered", factoryName)
	}

	sink, err := newSink(ctx, destination, params)
	if err != nil {
		return fmt.Errorf("opening %q sink for %q: %w", factoryName, destination, err)
	}
	m.sink = sink
	return nil
}
// Close shuts down the Into task. Only the first call does any work; later
// calls return nil.
//
// The first call closes the output sink (if Open created one). It then waits
// up to 30 seconds for Run to signal completion on m.complete, and finally
// closes the embedded TaskBase. If both closing the sink and closing the
// TaskBase fail, the TaskBase error is returned and the sink error is logged.
// Otherwise any sink close error is returned.
func (m *Into) Close() error {
	m.Lock()
	if m.Closed {
		m.Unlock()
		return nil
	}
	m.Closed = true
	var sinkErr error
	// m.sink is nil when Open failed or Run never reached Open.
	if m.sink != nil {
		sinkErr = m.sink.Close()
	}
	m.Unlock()

	// Bound the wait in case Run never signals completion.
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()
	select {
	case <-timeout.C:
		u.Warnf("into sink timeout???? ")
	case <-m.complete:
	}

	if err := m.TaskBase.Close(); err != nil {
		if sinkErr != nil {
			u.Warnf("into sink close failed: %v", sinkErr)
		}
		return err
	}
	return sinkErr
}
// Run is the task's main loop. It builds the column alias -> position index
// from the projection and opens the sink for m.p.Stmt.Table. It then passes
// every row received on the input channel to Sink.Next until that channel is
// closed.
//
// Accepted rows are *datasource.SqlDriverMessageMap values, or any message
// implementing expr.ContextReader, which is converted using the column index.
// Any other message type closes the task's signal channel and returns an
// error. If Sink.Next fails, Run calls Sink.Cleanup and returns that error.
//
// A quit signal makes Run return nil at once. An error signal first calls
// Sink.Cleanup and then returns nil. Once the input is drained, Run emits a
// single summary row [lastMsgId, rowCount] on the output channel.
//
// m.complete is closed on every exit path, including panics, so Close never
// waits for its timeout after Run has finished.
func (m *Into) Run() error {
	defer m.Ctx.Recover()
	defer close(m.msgOutCh)
	// Deferred last so it runs first: Close is released before the output
	// channel is closed, matching the order of the normal completion path.
	defer close(m.complete)

	inCh := m.MessageIn()

	projCols := m.TaskBase.Ctx.Projection.Proj.Columns
	m.colIndexes = make(map[string]int, len(projCols))
	for i, col := range projCols {
		m.colIndexes[col.As] = i
	}

	// Open the output file sink
	if err := m.Open(m.Ctx, m.p.Stmt.Table); err != nil {
		u.Errorf("Open output sink failed! - %v", err)
		return err
	}

	var rowCount, lastMsgId int64
msgReadLoop:
	for {
		select {
		case <-m.SigChan():
			return nil
		case <-m.ErrChan():
			m.sink.Cleanup()
			return nil
		case msg, ok := <-inCh:
			if !ok {
				break msgReadLoop
			}
			var vals []driver.Value
			switch mt := msg.(type) {
			case *datasource.SqlDriverMessageMap:
				vals = mt.Values()
			default:
				reader, isContextReader := msg.(expr.ContextReader)
				if !isContextReader {
					err := fmt.Errorf("To use Into must use SqlDriverMessageMap but got %T", msg)
					u.Errorf("unrecognized msg %T", msg)
					close(m.TaskBase.sigCh)
					return err
				}
				vals = datasource.NewSqlDriverMessageMapCtx(msg.Id(), reader, m.colIndexes).Values()
			}
			if err := m.sink.Next(vals, m.colIndexes); err != nil {
				u.Errorf("into sink write failed: %v", err)
				// Discard partial output, as on an error signal.
				m.sink.Cleanup()
				return err
			}
			rowCount++
			lastMsgId = int64(msg.Id())
		}
	}

	// The input is exhausted. Check for a pending error signal, cleaning up
	// the sink if one arrived, or for a quit signal. inCh is already closed,
	// so its case exits the loop as soon as it is selected.
errLoop:
	for {
		select {
		case <-m.ErrChan():
			m.sink.Cleanup()
			break errLoop
		default:
		}
		select {
		case <-m.ErrChan():
			m.sink.Cleanup()
			break errLoop
		case <-m.SigChan():
			break errLoop
		case _, ok := <-inCh:
			if !ok {
				break errLoop
			}
		}
	}

	// Emit a single summary row: [last message id, rows written].
	summary := []driver.Value{lastMsgId, rowCount}
	m.msgOutCh <- datasource.NewSqlDriverMessage(0, summary)
	m.isComplete = true
	return nil
}