-
Notifications
You must be signed in to change notification settings - Fork 120
/
triggers.ts
306 lines (285 loc) · 12.4 KB
/
triggers.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import { Statement } from '../util'
import { dedent } from 'ts-dedent'
/** Description of a single foreign-key constraint on a table. */
type ForeignKey = {
  table: string // name of the parent table the FK points at
  childKey: string // column in the child table that holds the FK value
  parentKey: string // referenced column in the parent table
}
type ColumnName = string
// SQLite column type, e.g. 'TEXT' or 'REAL' (see `joinColsForJSON` below).
type SQLiteType = string
// Corresponding Postgres column type, e.g. 'INT8' or 'BIGINT'.
type PgType = string
/** The SQLite and Postgres types of one column. */
type ColumnType = {
  sqliteType: SQLiteType
  pgType: PgType
}
/** Maps column names to their SQLite/Postgres types. */
type ColumnTypes = Record<ColumnName, ColumnType>
/** Schema information needed to generate the triggers for one table. */
export type Table = {
  tableName: string
  namespace: string
  columns: ColumnName[] // all column names of the table
  primary: ColumnName[] // names of the primary-key columns
  foreignKeys: ForeignKey[]
  columnTypes: ColumnTypes
}
// Namespace-qualified table name, i.e. `<namespace>.<tableName>`.
type TableFullName = string
type Tables = Map<TableFullName, Table>
/** Wraps a raw SQL string into a driver-ready `Statement` object. */
function mkStatement(sql: string): Statement {
  const stmt: Statement = { sql }
  return stmt
}
/**
 * Generates the triggers Satellite needs for the given table.
 * Assumes that the necessary meta tables (`_electric_trigger_settings`,
 * `_electric_oplog`) already exist.
 * @param tableFullName - Full name of the table for which to generate triggers.
 * @param table - A new or existing table for which to create/update the triggers.
 * @returns An array of SQLite statements that add the necessary oplog triggers.
 *
 * @remarks
 * We return an array of SQL statements because the DB drivers
 * do not accept queries containing more than one SQL statement.
 */
export function generateOplogTriggers(
  tableFullName: TableFullName,
  table: Omit<Table, 'foreignKeys'>
): Statement[] {
  const { tableName, namespace, columns, primary, columnTypes } = table
  // Pre-rendered argument lists for SQLite's `json_object(...)`, referencing
  // the `new`/`old` row values that SQLite exposes inside trigger bodies.
  const newPKs = joinColsForJSON(primary, columnTypes, 'new')
  const oldPKs = joinColsForJSON(primary, columnTypes, 'old')
  const newRows = joinColsForJSON(columns, columnTypes, 'new')
  const oldRows = joinColsForJSON(columns, columnTypes, 'old')
  return [
    // Flag row letting the runtime toggle this table's triggers on/off.
    dedent`
-- Toggles for turning the triggers on and off
INSERT OR IGNORE INTO _electric_trigger_settings(tablename,flag) VALUES ('${tableFullName}', 1);
`,
    dedent`
/* Triggers for table ${tableName} */
-- ensures primary key is immutable
DROP TRIGGER IF EXISTS update_ensure_${namespace}_${tableName}_primarykey;
`,
    // BEFORE UPDATE trigger that aborts any update touching a PK column.
    dedent`
CREATE TRIGGER update_ensure_${namespace}_${tableName}_primarykey
BEFORE UPDATE ON "${namespace}"."${tableName}"
BEGIN
SELECT
CASE
${primary
  .map(
    (col) =>
      `WHEN old."${col}" != new."${col}" THEN\n\t\tRAISE (ABORT, 'cannot change the value of column ${col} as it belongs to the primary key')`
  )
  .join('\n')}
END;
END;
`,
    dedent`
-- Triggers that add INSERT, UPDATE, DELETE operation to the _opslog table
DROP TRIGGER IF EXISTS insert_${namespace}_${tableName}_into_oplog;
`,
    // Each oplog trigger fires only while the flag in
    // `_electric_trigger_settings` is 1 for this table.
    dedent`
CREATE TRIGGER insert_${namespace}_${tableName}_into_oplog
AFTER INSERT ON "${namespace}"."${tableName}"
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${tableFullName}')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('${namespace}', '${tableName}', 'INSERT', json_object(${newPKs}), json_object(${newRows}), NULL, NULL);
END;
`,
    dedent`
DROP TRIGGER IF EXISTS update_${namespace}_${tableName}_into_oplog;
`,
    // UPDATE records both the new and the old row in the oplog entry.
    dedent`
CREATE TRIGGER update_${namespace}_${tableName}_into_oplog
AFTER UPDATE ON "${namespace}"."${tableName}"
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${tableFullName}')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('${namespace}', '${tableName}', 'UPDATE', json_object(${newPKs}), json_object(${newRows}), json_object(${oldRows}), NULL);
END;
`,
    dedent`
DROP TRIGGER IF EXISTS delete_${namespace}_${tableName}_into_oplog;
`,
    // DELETE keys the oplog entry by the old row's primary key.
    dedent`
CREATE TRIGGER delete_${namespace}_${tableName}_into_oplog
AFTER DELETE ON "${namespace}"."${tableName}"
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${tableFullName}')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('${namespace}', '${tableName}', 'DELETE', json_object(${oldPKs}), NULL, json_object(${oldRows}), NULL);
END;
`,
  ].map(mkStatement)
}
/**
 * Generates triggers for compensations for all foreign keys in the provided table.
 *
 * Compensation is recorded as a SatOpCompensation message. The entire reason
 * for it existing is to maybe revive the row if it has been deleted, so we need
 * correct tags.
 *
 * The compensation update contains _just_ the primary keys, no other columns are present.
 *
 * @param table - The table for whose foreign keys to generate compensation triggers.
 * @returns An array of SQLite statements that add the necessary compensation triggers.
 */
function generateCompensationTriggers(table: Table): Statement[] {
  const { tableName, namespace, foreignKeys, columnTypes } = table
  // Builds the DROP/CREATE statements for the INSERT and UPDATE compensation
  // triggers of a single foreign key.
  const makeTriggers = (foreignKey: ForeignKey) => {
    const { childKey } = foreignKey
    const fkTableNamespace = 'main' // currently, Electric always uses the 'main' namespace
    const fkTableName = foreignKey.table
    const fkTablePK = foreignKey.parentKey // primary key of the table pointed at by the FK.
    // This table's `childKey` points to the parent's table `parentKey`.
    // `joinColsForJSON` looks up the type of the `parentKey` column in the provided `colTypes` object.
    // However, `columnTypes` contains the types of the columns of this table
    // so we need to pass an object containing the column type of the parent key.
    // We can construct that object because the type of the parent key must be the same
    // as the type of the child key that is pointing to it.
    const joinedFkPKs = joinColsForJSON([fkTablePK], {
      [fkTablePK]: columnTypes[foreignKey.childKey],
    })
    return [
      dedent`-- Triggers for foreign key compensations
DROP TRIGGER IF EXISTS compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog;`,
      // The compensation trigger inserts a row in `_electric_oplog` if the row pointed at by the FK exists
      // The way how this works is that the values for the row are passed to the nested SELECT
      // which will return those values for every record that matches the query
      // which can be at most once since we filter on the foreign key which is also the primary key and thus is unique.
      // Note: it fires only while compensations are enabled in `_electric_meta`
      // and the parent table's trigger flag is set.
      dedent`
CREATE TRIGGER compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog
AFTER INSERT ON "${namespace}"."${tableName}"
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTableNamespace}.${fkTableName}') AND
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}";
END;
`,
      dedent`DROP TRIGGER IF EXISTS compensation_update_${namespace}_${tableName}_${foreignKey.childKey}_into_oplog;`,
      // Same compensation logic for updates of the FK column.
      dedent`
CREATE TRIGGER compensation_update_${namespace}_${tableName}_${foreignKey.childKey}_into_oplog
AFTER UPDATE ON "${namespace}"."${tableName}"
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTableNamespace}.${fkTableName}') AND
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}";
END;
`,
    ].map(mkStatement)
  }
  const fkTriggers = foreignKeys.map((fk) => makeTriggers(fk))
  return fkTriggers.flat()
}
/**
 * Generates the oplog triggers and compensation triggers for the provided table.
 * @param tableFullName - Full name of the table for which to create the triggers.
 * @param table - The table for which to create the triggers.
 * @returns An array of SQLite statements that add the necessary oplog and compensation triggers.
 */
export function generateTableTriggers(
  tableFullName: TableFullName,
  table: Table
): Statement[] {
  // Per-row oplog triggers first, foreign-key compensation triggers after.
  return [
    ...generateOplogTriggers(tableFullName, table),
    ...generateCompensationTriggers(table),
  ]
}
/**
 * Generates triggers for all the provided tables.
 * @param tables - Dictionary mapping full table names to the corresponding tables.
 * @returns An array of SQLite statements that add the necessary oplog and compensation triggers for all tables.
 */
export function generateTriggers(tables: Tables): Statement[] {
  // Start by (re)creating the settings table that holds the per-table
  // on/off flag consulted by every generated trigger.
  const stmts: Statement[] = [
    { sql: 'DROP TABLE IF EXISTS _electric_trigger_settings;' },
    {
      sql: 'CREATE TABLE _electric_trigger_settings(tablename TEXT PRIMARY KEY, flag INTEGER);',
    },
  ]
  // Then append the trigger statements for each table, in map order.
  for (const [tableFullName, table] of tables) {
    stmts.push(...generateTableTriggers(tableFullName, table))
  }
  return stmts
}
/**
 * Joins the column names and values into a string of pairs of the form `'col1', val1, 'col2', val2, ...`
 * that can be used to build a JSON object in a SQLite `json_object` function call.
 * Values of type REAL are cast to text to avoid a bug in SQLite's `json_object` function (see below).
 * Similarly, values of type INT8 (i.e. BigInts) are cast to text because JSON does not support BigInts.
 *
 * NOTE: There is a bug with SQLite's `json_object` function up to version 3.41.2
 * that causes it to return an invalid JSON object if some value is +Infinity or -Infinity.
 * @example
 * sqlite> SELECT json_object('a',2e370,'b',-3e380);
 * {"a":Inf,"b":-Inf}
 *
 * The returned JSON is not valid because JSON does not support `Inf` nor `-Inf`.
 * @example
 * sqlite> SELECT json_valid((SELECT json_object('a',2e370,'b',-3e380)));
 * 0
 *
 * This is fixed in version 3.42.0 and on:
 * @example
 * sqlite> SELECT json_object('a',2e370,'b',-3e380);
 * {"a":9e999,"b":-9e999}
 *
 * The returned JSON now is valid, the numbers 9e999 and -9e999
 * are out of range of floating points and thus will be converted
 * to `Infinity` and `-Infinity` when parsed with `JSON.parse`.
 *
 * Nevertheless, SQLite version 3.42.0 is very recent (May 2023)
 * and users may be running older versions so we want to support them.
 * Therefore we introduce the following workaround:
 * @example
 * sqlite> SELECT json_object('a', cast(2e370 as TEXT),'b', cast(-3e380 as TEXT));
 * {"a":"Inf","b":"-Inf"}
 *
 * By casting the values to TEXT, infinity values are turned into their string representation.
 * As such, the resulting JSON is valid.
 * This means that the values will be stored as strings in the oplog,
 * thus, we must be careful when parsing the oplog to convert those values back to their numeric type.
 *
 * For reference:
 * - https://discord.com/channels/933657521581858818/1163829658236760185
 * - https://www.sqlite.org/src/info/b52081d0acd07dc5bdb4951a3e8419866131965260c1e3a4c9b6e673bfe3dfea
 *
 * @param cols The column names. Not mutated; the output pairs are sorted by column name.
 * @param colTypes The SQLite/Postgres types of the columns, used to decide which values need a TEXT cast.
 * @param target The target to use for the column values (new or old value provided by the trigger).
 *               When omitted, columns are referenced directly by name.
 */
function joinColsForJSON(
  cols: string[],
  colTypes: ColumnTypes,
  target?: 'new' | 'old'
) {
  // Casts the value to TEXT if it is a REAL or a 64-bit integer
  // to work around the `json_object` bug described above.
  const castIfNeeded = (col: string, targettedCol: string) => {
    const { sqliteType, pgType } = colTypes[col]
    if (sqliteType === 'REAL' || pgType === 'INT8' || pgType === 'BIGINT') {
      return `cast(${targettedCol} as TEXT)`
    }
    return targettedCol
  }
  // Reference the column either directly ("col") or through the trigger's
  // `new`/`old` row (e.g. new."col"), depending on whether a target is given.
  const colRef =
    target === undefined
      ? (col: string) => `"${col}"`
      : (col: string) => `${target}."${col}"`
  // Sort a copy: `Array.prototype.sort` mutates in place and `cols` is owned
  // by the caller (e.g. `table.primary` / `table.columns` in the generators
  // above), which must not be reordered as a side effect of this call.
  return [...cols]
    .sort()
    .map((col) => `'${col}', ${castIfNeeded(col, colRef(col))}`)
    .join(', ')
}