/
sync-database-with-disk.ts
executable file
·153 lines (129 loc) · 5.3 KB
/
sync-database-with-disk.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env -S npx ts-node -T --esm
import {
assertOkResult,
convertJsonSyncEvent,
getAllNodesForConnection,
type JsonSyncEvent,
} from '@dossierhq/core';
import type { Server } from '@dossierhq/server';
import { config } from 'dotenv';
import assert from 'node:assert';
import { readFile, writeFile } from 'node:fs/promises';
import path from 'node:path';
import { parseArgs } from 'node:util';
import { SYSTEM_USERS } from '../config/SystemUsers.js';
import { getCurrentSyncEventFiles, updateSyncEventsOnDisk } from '../utils/FileSystemSerializer.js';
import type { AppAdminClient } from '../utils/SchemaTypes';
import { initializeServer } from '../utils/SharedServerUtils.js';
const DATA_DIR = new URL('../data', import.meta.url).pathname;
const PRINCIPALS_HEADER: [string, string, string] = ['provider', 'identifier', 'subjectId'];
// prefer .env.local file if exists, over .env file
config({ path: '.env.local' });
config({ path: '.env' });
async function getUnappliedEvents(adminClient: AppAdminClient) {
// Get file events
const files = await getCurrentSyncEventFiles(DATA_DIR);
const diskEventIds = files.map((it) => it.id);
const databaseEventIds: string[] = [];
for await (const node of getAllNodesForConnection({ first: 100 }, (paging) =>
adminClient.getChangelogEvents({}, paging),
)) {
databaseEventIds.push(node.valueOrThrow().id);
}
const minSize = Math.min(diskEventIds.length, databaseEventIds.length);
// check that the events match pairwise
for (let i = 0; i < minSize; i++) {
if (diskEventIds[i] !== databaseEventIds[i]) {
throw new Error(
`Mismatch between disk and database events at index ${i}: ${diskEventIds[i]} !== ${databaseEventIds[i]}`,
);
}
}
const headId = minSize > 0 ? diskEventIds[minSize - 1] : null;
const unappliedDiskFiles = files.slice(minSize);
const unappliedDatabaseEvents = databaseEventIds.slice(minSize);
return { headId, unappliedDiskFiles, unappliedDatabaseEvents };
}
async function applyDiskEvents(server: Server, unappliedDiskFiles: { path: string }[]) {
for (const { path } of unappliedDiskFiles) {
const event: JsonSyncEvent = JSON.parse(await readFile(path, { encoding: 'utf8' }));
const result = await server.applySyncEvent(convertJsonSyncEvent(event));
result.throwIfError();
}
let processNextDirtyEntity = true;
while (processNextDirtyEntity) {
const processed = (await server.processNextDirtyEntity()).valueOrThrow();
if (!processed) {
processNextDirtyEntity = false;
}
}
(await server.optimizeDatabase({ all: true })).throwIfError();
}
export async function updatePrincipalsOnDisk(server: Server, filename: string): Promise<void> {
const rows: (typeof PRINCIPALS_HEADER)[] = [];
rows.push(PRINCIPALS_HEADER);
const contentRows: typeof rows = [];
for await (const principal of getAllNodesForConnection({ first: 100 }, (paging) =>
server.getPrincipals(paging),
)) {
const { provider, identifier, subjectId } = principal.valueOrThrow();
contentRows.push([provider, identifier, subjectId]);
}
contentRows.sort((a, b) => {
const providerCompare = a[0].localeCompare(b[0]);
if (providerCompare !== 0) {
return providerCompare;
}
return a[1].localeCompare(b[1]);
});
rows.push(...contentRows);
const content = rows.map((row) => row.join('\t')).join('\n') + '\n';
await writeFile(filename, content, { encoding: 'utf8' });
}
export async function createPrincipalsFromDisk(server: Server, filename: string): Promise<void> {
const content = await readFile(filename, { encoding: 'utf8' });
const rows = content
.split('\n')
.filter((it) => it.length > 0)
.map((it) => it.split('\t')) as (typeof PRINCIPALS_HEADER)[];
const header = rows[0];
assert.deepEqual(header, PRINCIPALS_HEADER);
const contentRows = rows.slice(1);
for (const [provider, identifier, subjectId] of contentRows) {
assertOkResult(await server.createPrincipal({ provider, identifier, subjectId }));
}
}
async function main(filename: string, args: typeof parsedArgs) {
const { server } = (await initializeServer(filename)).valueOrThrow();
try {
const principalsFilename = path.join(DATA_DIR, 'principals.tsv');
if (args.values['update-db-only']) {
await createPrincipalsFromDisk(server, principalsFilename);
} else {
await updatePrincipalsOnDisk(server, principalsFilename);
}
const authResult = await server.createSession({
...SYSTEM_USERS.serverRenderer,
logger: null,
databasePerformance: null,
});
const adminClient = server.createAdminClient<AppAdminClient>(async () => authResult);
const { unappliedDiskFiles, unappliedDatabaseEvents } = await getUnappliedEvents(adminClient);
if (!args.values['update-db-only']) {
if (unappliedDatabaseEvents.length > 0) {
console.log(`Write ${unappliedDatabaseEvents.length} missing events to disk`);
await updateSyncEventsOnDisk(server, DATA_DIR);
}
}
if (unappliedDiskFiles.length > 0) {
console.log(`Applying ${unappliedDiskFiles.length} disk events`);
await applyDiskEvents(server, unappliedDiskFiles);
}
} finally {
(await server.shutdown()).throwIfError();
}
}
const parsedArgs = parseArgs({
options: { ['update-db-only']: { type: 'boolean' } },
});
await main(process.env.DATABASE_SQLITE_FILE!, parsedArgs);