forked from SkygearIO/skygear-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schema.go
360 lines (316 loc) · 9.98 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
// Copyright 2015-present Oursky Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pq
import (
"bytes"
"database/sql"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/lib/pq"
"github.com/skygeario/skygear-server/skydb"
)
// Extend makes sure the table backing recordType exists and has a column for
// every field in recordSchema.
//
// The table is created when no remote columns are found for recordType.
// Fields missing from the remote schema are then added with one ALTER TABLE
// statement. A field that already exists remotely with an incompatible type
// (see isConflict) makes the call fail before any column is added.
//
// The cached schema of recordType is dropped on every exit path, so the next
// lookup re-reads the column types from the database.
func (db *database) Extend(recordType string, recordSchema skydb.RecordSchema) error {
	// Invalidate on failure as well as on success: a table that was created
	// (even partially) before a later step failed must not stay cached as
	// "has no columns", otherwise every retry would try to create it again.
	defer func() {
		delete(db.c.RecordSchema, recordType)
	}()

	remoteRecordSchema, err := db.remoteColumnTypes(recordType)
	if err != nil {
		return err
	}

	if len(remoteRecordSchema) == 0 {
		if err := db.createTable(recordType); err != nil {
			return fmt.Errorf("failed to create table: %s", err)
		}
	}

	// Collect the fields that are not present remotely; bail out on the
	// first field whose remote type cannot hold the requested type.
	updatingSchema := skydb.RecordSchema{}
	for key, schema := range recordSchema {
		remoteSchema, ok := remoteRecordSchema[key]
		if !ok {
			updatingSchema[key] = schema
		} else if isConflict(remoteSchema, schema) {
			return fmt.Errorf("conflicting schema %v => %v", remoteSchema, schema)
		}
		// same data type, do nothing
	}

	if len(updatingSchema) > 0 {
		stmt := db.addColumnStmt(recordType, updatingSchema)
		log.WithField("stmt", stmt).Debugln("Adding columns to table")
		if _, err := db.c.Exec(stmt); err != nil {
			return fmt.Errorf("failed to alter table: %s", err)
		}
	}

	return nil
}
// RenameSchema renames the column oldName of the table backing recordType to
// newName. Both column names are quoted as identifiers before being placed
// in the statement.
//
// On success the cached schema of recordType is dropped so that later
// lookups do not keep reporting the old column name.
func (db *database) RenameSchema(recordType, oldName, newName string) error {
	tableName := db.tableName(recordType)
	oldName = pq.QuoteIdentifier(oldName)
	newName = pq.QuoteIdentifier(newName)

	stmt := fmt.Sprintf("ALTER TABLE %s RENAME %s TO %s", tableName, oldName, newName)
	if _, err := db.c.Exec(stmt); err != nil {
		return fmt.Errorf("failed to alter table: %s", err)
	}

	// The column set changed; the cached entry is now stale.
	delete(db.c.RecordSchema, recordType)
	return nil
}
// DeleteSchema drops the column columnName from the table backing
// recordType. The column name is quoted as an identifier before being placed
// in the statement.
//
// On success the cached schema of recordType is dropped so that later
// lookups do not keep reporting the removed column.
func (db *database) DeleteSchema(recordType, columnName string) error {
	tableName := db.tableName(recordType)
	columnName = pq.QuoteIdentifier(columnName)

	stmt := fmt.Sprintf("ALTER TABLE %s DROP %s", tableName, columnName)
	if _, err := db.c.Exec(stmt); err != nil {
		return fmt.Errorf("failed to alter table: %s", err)
	}

	// The column set changed; the cached entry is now stale.
	delete(db.c.RecordSchema, recordType)
	return nil
}
// GetSchema returns the schema of recordType as reported by
// remoteColumnTypes. When that lookup fails, the error is returned together
// with a nil schema.
func (db *database) GetSchema(recordType string) (skydb.RecordSchema, error) {
	schema, err := db.remoteColumnTypes(recordType)
	if err == nil {
		return schema, nil
	}
	return nil, err
}
// GetRecordSchemas returns the schema of every record type found in the
// database's schema, keyed by table name.
//
// Table names are read from information_schema.tables, restricted to the
// schema returned by db.schemaName() and to names that do not match the
// pattern '\_%' (presumably meant to skip tables whose name starts with an
// underscore — confirm against the server's LIKE escape settings). Each
// table's schema is then resolved through GetSchema.
//
// The first error encountered, including one raised while iterating the
// result set, aborts the call and is returned with a nil map.
func (db *database) GetRecordSchemas() (map[string]skydb.RecordSchema, error) {
	schemaName := db.schemaName()

	rows, err := db.c.Queryx(`
SELECT table_name
FROM information_schema.tables
WHERE (table_name NOT LIKE '\_%') AND (table_schema=$1)
`, schemaName)
	if err != nil {
		return nil, err
	}
	// Release the result set on the early-return paths below as well.
	defer rows.Close()

	result := map[string]skydb.RecordSchema{}
	for rows.Next() {
		var recordType string
		if err := rows.Scan(&recordType); err != nil {
			return nil, err
		}

		log.Debugf("%s\n", recordType)
		schema, err := db.GetSchema(recordType)
		if err != nil {
			return nil, err
		}
		result[recordType] = schema
	}
	// Next returning false can also mean the iteration broke off with an
	// error; do not report a truncated listing as success.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	log.Debugf("GetRecordSchemas Success")
	return result, nil
}
// createTable creates the table backing recordType and then attaches the
// trigger_notify_record_change trigger to it, which executes
// public.notify_record_change() for each inserted, updated or deleted row.
//
// The two statements are executed one after the other; the error of the
// first failing statement is returned.
func (db *database) createTable(recordType string) (err error) {
	const triggerStmtFmt = `CREATE TRIGGER trigger_notify_record_change
AFTER INSERT OR UPDATE OR DELETE ON %s FOR EACH ROW
EXECUTE PROCEDURE public.notify_record_change();
`
	table := db.tableName(recordType)

	createStmt := createTableStmt(table)
	log.WithField("stmt", createStmt).Debugln("Creating table")
	if _, err = db.c.Exec(createStmt); err != nil {
		return err
	}

	triggerStmt := fmt.Sprintf(triggerStmtFmt, table)
	log.WithField("stmt", triggerStmt).Debugln("Creating trigger")
	_, err = db.c.Exec(triggerStmt)
	return err
}
// createTableStmt returns the CREATE TABLE statement for a record table
// named tableName. The table starts with the reserved columns only (_id,
// _database_id, _owner_id, _access and the created/updated bookkeeping
// columns), a composite primary key and a unique constraint on _id.
//
// tableName is inserted verbatim; the caller is responsible for quoting it.
func createTableStmt(tableName string) string {
	const ddl = "\nCREATE TABLE %s (\n" +
		"_id text,\n" +
		"_database_id text,\n" +
		"_owner_id text,\n" +
		"_access jsonb,\n" +
		"_created_at timestamp without time zone NOT NULL,\n" +
		"_created_by text,\n" +
		"_updated_at timestamp without time zone NOT NULL,\n" +
		"_updated_by text,\n" +
		"PRIMARY KEY(_id, _database_id, _owner_id),\n" +
		"UNIQUE (_id)\n" +
		");\n"
	return fmt.Sprintf(ddl, tableName)
}
// remoteColumnTypes returns the schema of the table backing recordType as
// stored in the database, using db.c.RecordSchema as a cache.
//
// A cached entry is returned as is. Otherwise the column types are read
// from the PostgreSQL catalog, columns carrying a foreign key are
// re-labelled as asset or reference fields, and the result is cached.
// When the table does not exist, (nil, nil) is returned and an empty
// schema is cached. Nothing is cached when a query fails or a column has an
// unknown data type, so a failed lookup can never be served from the cache
// later on.
//
// STEP 1 & 2 are obtained by reverse engineering psql \d with -E option
//
// STEP 3: example of getting foreign keys
// SELECT
//     tc.table_name, kcu.column_name,
//     ccu.table_name AS foreign_table_name,
//     ccu.column_name AS foreign_column_name
// FROM
//     information_schema.table_constraints AS tc
//     JOIN information_schema.key_column_usage
//         AS kcu ON tc.constraint_name = kcu.constraint_name
//     JOIN information_schema.constraint_column_usage
//         AS ccu ON ccu.constraint_name = tc.constraint_name
// WHERE constraint_type = 'FOREIGN KEY'
//     AND tc.table_schema = 'app__'
//     AND tc.table_name = 'note';
func (db *database) remoteColumnTypes(recordType string) (skydb.RecordSchema, error) {
	// STEP 0: Return the cached ColumnType
	if schema, ok := db.c.RecordSchema[recordType]; ok {
		return schema, nil
	}

	typemap := skydb.RecordSchema{}
	// storeInCache is called explicitly on the paths that produced a
	// trustworthy result; error paths must return without calling it.
	storeInCache := func() {
		db.c.RecordSchema[recordType] = typemap
		log.Debugf("Cache remoteColumnTypes %s", recordType)
	}

	log.Debugf("Querying remoteColumnTypes %s", recordType)

	// STEP 1: Get the oid of the current table
	var oid int
	err := db.c.QueryRowx(`
SELECT c.oid
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = $1
AND n.nspname = $2`,
		recordType, db.schemaName()).Scan(&oid)

	if err == sql.ErrNoRows {
		// The table does not exist: remember it as having no columns.
		storeInCache()
		return nil, nil
	}
	if err != nil {
		log.WithFields(log.Fields{
			"schemaName": db.schemaName(),
			"recordType": recordType,
			"err":        err,
		}).Errorln("Failed to query oid of table")
		return nil, err
	}

	// STEP 2: Get column name and data type
	rows, err := db.c.Queryx(`
SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod)
FROM pg_catalog.pg_attribute a
WHERE a.attrelid = $1 AND a.attnum > 0 AND NOT a.attisdropped`,
		oid)

	if err != nil {
		log.WithFields(log.Fields{
			"schemaName": db.schemaName(),
			"recordType": recordType,
			"oid":        oid,
			"err":        err,
		}).Errorln("Failed to query column and data type")
		return nil, err
	}
	// Release the result set on the early-return paths below as well.
	defer rows.Close()

	var columnName, pqType string
	for rows.Next() {
		if err := rows.Scan(&columnName, &pqType); err != nil {
			return nil, err
		}

		schema := skydb.FieldType{}
		switch pqType {
		case TypeString:
			schema.Type = skydb.TypeString
		case TypeNumber:
			schema.Type = skydb.TypeNumber
		case TypeTimestamp:
			schema.Type = skydb.TypeDateTime
		case TypeBoolean:
			schema.Type = skydb.TypeBoolean
		case TypeJSON:
			// The reserved _access column shares the JSON storage type
			// but is exposed as an ACL field.
			if columnName == "_access" {
				schema.Type = skydb.TypeACL
			} else {
				schema.Type = skydb.TypeJSON
			}
		case TypeLocation:
			schema.Type = skydb.TypeLocation
		case TypeInteger:
			schema.Type = skydb.TypeInteger
		default:
			return nil, fmt.Errorf("received unknown data type = %s for column = %s", pqType, columnName)
		}

		typemap[columnName] = schema
	}
	// Next returning false can also mean the iteration broke off with an
	// error; a truncated column list must not be treated as the schema.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// STEP 3: FOREIGN KEY, assuming we can only reference _id i.e. "ccu.column_name" = _id
	builder := psql.Select("kcu.column_name", "ccu.table_name").
		From("information_schema.table_constraints AS tc").
		Join("information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name").
		Join("information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name").
		Where("constraint_type = 'FOREIGN KEY' AND tc.table_schema = ? AND tc.table_name = ?", db.schemaName(), recordType)

	refs, err := db.c.QueryWith(builder)
	if err != nil {
		log.WithFields(log.Fields{
			"schemaName": db.schemaName(),
			"recordType": recordType,
			"err":        err,
		}).Errorln("Failed to query foreign key information schema")
		return nil, err
	}
	defer refs.Close()

	for refs.Next() {
		s := skydb.FieldType{}
		var primaryColumn, referencedTable string
		if err := refs.Scan(&primaryColumn, &referencedTable); err != nil {
			log.Debugf("err %v", err)
			return nil, err
		}

		// A foreign key into _asset marks an asset field; any other
		// target table marks a reference to that record type. Either way
		// the entry written in STEP 2 for this column is replaced.
		switch referencedTable {
		case "_asset":
			s.Type = skydb.TypeAsset
		default:
			s.Type = skydb.TypeReference
			s.ReferenceType = referencedTable
		}
		typemap[primaryColumn] = s
	}
	if err := refs.Err(); err != nil {
		return nil, err
	}

	storeInCache()
	return typemap, nil
}
// isConflict reports whether an existing field of type from is incompatible
// with a requested field of type to.
//
// Identical types never conflict. An existing integer field additionally
// accepts a sequence (integers are currently only created by sequences) and
// a number (manual assignment of a sequence value). Every other combination
// of differing types is a conflict.
func isConflict(from, to skydb.FieldType) bool {
	switch {
	case from.Type == to.Type:
		return false
	case from.Type != skydb.TypeInteger:
		return true
	}
	// from is an integer column here: only sequence and number may map onto it.
	return to.Type != skydb.TypeSequence && to.Type != skydb.TypeNumber
}
// addColumnStmt builds a single ALTER TABLE statement that adds every field
// of recordSchema as a column of the table backing recordType. Asset fields
// get a foreign key to "_asset"(id), reference fields a foreign key to the
// _id column of the referenced record type's table.
//
// The equivalent hand-written SQL for one reference column looks like:
//
// ALTER TABLE app__.note add collection text;
// ALTER TABLE app__.note
// ADD CONSTRAINT fk_note_collection_collection
// FOREIGN KEY (collection)
// REFERENCES app__.collection(_id);
func (db *database) addColumnStmt(recordType string, recordSchema skydb.RecordSchema) string {
	var stmt bytes.Buffer
	fmt.Fprintf(&stmt, "ALTER TABLE %s ", db.tableName(recordType))

	for name, field := range recordSchema {
		fmt.Fprintf(&stmt, "ADD %s %s,", pq.QuoteIdentifier(name), pqDataType(field.Type))

		if field.Type == skydb.TypeAsset {
			db.writeForeignKeyConstraint(&stmt, name, "_asset", "id")
		} else if field.Type == skydb.TypeReference {
			db.writeForeignKeyConstraint(&stmt, name, field.ReferenceType, "_id")
		}
	}

	// Every clause ends with ','; remove the last one.
	stmt.Truncate(stmt.Len() - 1)
	return stmt.String()
}
// writeForeignKeyConstraint appends an "ADD CONSTRAINT ... FOREIGN KEY ...
// REFERENCES ...," clause to buf. The clause makes localCol reference
// remoteCol of the table backing referent, and the constraint is named
// fk_<localCol>_<referent>_<remoteCol>. The clause ends with a comma so that
// further ALTER TABLE clauses can follow it.
func (db *database) writeForeignKeyConstraint(buf *bytes.Buffer, localCol, referent, remoteCol string) {
	constraint := "fk_" + localCol + "_" + referent + "_" + remoteCol
	fmt.Fprintf(buf, "ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s),",
		pq.QuoteIdentifier(constraint),
		pq.QuoteIdentifier(localCol),
		db.tableName(referent),
		pq.QuoteIdentifier(remoteCol))
}