forked from araddon/qlbridge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ddl.go
160 lines (134 loc) · 3.19 KB
/
ddl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package exec
import (
"encoding/json"
"fmt"
u "github.com/araddon/gou"
"github.com/araddon/qlbridge/lex"
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/schema"
)
var (
// Ensure that we implement the Task Runner interface
_ TaskRunner = (*Create)(nil)
_ TaskRunner = (*Drop)(nil)
_ TaskRunner = (*Alter)(nil)
)
type (
// Create is executeable task for SQL Create, Alter, Schema, Source etc.
Create struct {
*TaskBase
p *plan.Create
}
// Drop is executeable task for SQL DROP.
Drop struct {
*TaskBase
p *plan.Drop
}
// Alter is executeable task for SQL ALTER.
Alter struct {
*TaskBase
p *plan.Alter
}
)
// NewCreate creates new create exec task
func NewCreate(ctx *plan.Context, p *plan.Create) *Create {
m := &Create{
TaskBase: NewTaskBase(ctx),
p: p,
}
return m
}
// Close Create
func (m *Create) Close() error {
return m.TaskBase.Close()
}
// Run Create
func (m *Create) Run() error {
defer close(m.msgOutCh)
cs := m.p.Stmt
switch cs.Tok.T {
case lex.TokenSource, lex.TokenSchema:
/*
// "sub_schema_name" will create a new child schema called "sub_schema_name"
// that is added to "existing_schema_name"
// of source type elasticsearch
CREATE source sub_schema_name WITH {
"type":"elasticsearch",
"schema":"existing_schema_name",
"settings" : {
"apikey":"GET_YOUR_API_KEY"
}
};
*/
// If we specify a parent schema to add this child schema to
schemaName := cs.Identity
by, err := json.MarshalIndent(cs.With, "", " ")
if err != nil {
u.Errorf("could not convert conf = %v ", cs.With)
return fmt.Errorf("could not convert conf %v", cs.With)
}
sourceConf := &schema.ConfigSource{}
err = json.Unmarshal(by, sourceConf)
if err != nil {
u.Errorf("could not convert conf = %v ", string(by))
return fmt.Errorf("could not convert conf %v", cs.With)
}
sourceConf.Name = schemaName
reg := schema.DefaultRegistry()
return reg.SchemaAddFromConfig(sourceConf)
default:
u.Warnf("unrecognized create/alter: kw=%v stmt:%s", cs.Tok, m.p.Stmt)
}
return ErrNotImplemented
}
// NewDrop creates new drop exec task.
func NewDrop(ctx *plan.Context, p *plan.Drop) *Drop {
m := &Drop{
TaskBase: NewTaskBase(ctx),
p: p,
}
return m
}
// Close Drop
func (m *Drop) Close() error {
return m.TaskBase.Close()
}
// Run Drop
func (m *Drop) Run() error {
defer close(m.msgOutCh)
cs := m.p.Stmt
s := m.Ctx.Schema
if s == nil {
return fmt.Errorf("must have schema")
}
switch cs.Tok.T {
case lex.TokenSource, lex.TokenSchema, lex.TokenTable:
reg := schema.DefaultRegistry()
return reg.SchemaDrop(s.Name, cs.Identity, cs.Tok.T)
default:
u.Warnf("unrecognized DROP: kw=%v stmt:%s", cs.Tok, m.p.Stmt)
}
return ErrNotImplemented
}
// NewAlter creates new ALTER exec task.
func NewAlter(ctx *plan.Context, p *plan.Alter) *Alter {
m := &Alter{
TaskBase: NewTaskBase(ctx),
p: p,
}
return m
}
// Close Alter
func (m *Alter) Close() error {
return m.TaskBase.Close()
}
// Run Alter
func (m *Alter) Run() error {
defer close(m.msgOutCh)
cs := m.p.Stmt
switch cs.Tok.T {
default:
u.Warnf("unrecognized ALTER: kw=%v stmt:%s", cs.Tok, m.p.Stmt)
}
return ErrNotImplemented
}