/*
Indexer deals with indexing documents. Its public API is
`update`: responsible for getting any new upstream content into
the search index. update takes these optional arguments:
- forceRefresh: when true, we will force elasticsearch to index
the new content immediately. This is expensive if you do it
too often. When false, we will wait for the next scheduled
refresh to happen (the default Elasticsearch refresh_interval
is once per second). Defaults to false.
- hints: can contain a list of `{ id, type }`
references. This is intended as an optimization hint when we
know that certain resources are the ones that likely need to be
indexed right away. Indexers are responsible for discovering and
indexing arbitrary upstream changes regardless of this hint, but
the hint can make it easier to keep the search index nearly
real-time fresh.
`schema()`: retrieves the Schema.
A Schema instance is computed from all the schema models
that are discovered. Schema models are things like
`content-types`, `fields`, `data-sources`, `plugin-configs`,
etc. They are pieces of content, but special pieces of content
that can alter how other content gets indexed.
This method does its own caching, since schemas get computed as
part of indexing anyway. You can also directly invalidate the
cache, see next method.
`invalidateSchemaCache()`: does what it says on the
tin. This is a lighter-weight operation than `update`. It allows
us to decouple the question of when and how to index content
from the issue of maintaining schema correctness during
sequences of writes.
*/
const EventEmitter = require('events');
const log = require('@cardstack/logger')('cardstack/indexers');
const { declareInjections, getOwner } = require('@cardstack/di');
const bootstrapSchema = require('./bootstrap-schema');
const RunningIndexers = require('./indexing/running-indexers');
module.exports = declareInjections(
{
schemaLoader: 'hub:schema-loader',
dataSources: 'config:data-sources',
client: `plugin-client:${require.resolve('@cardstack/pgsearch/client')}`,
jobQueue: 'hub:queues',
},
class Indexers extends EventEmitter {
constructor() {
super();
this._queue = [];
this._forceRefreshQueue = [];
this._dataSourcesMemo = null;
this._schemaCache = null;
}
async schema() {
if (!this._schemaCache) {
this._schemaCache = (async () => {
let running = new RunningIndexers(
await this._seedSchema(),
this.client,
this.emit.bind(this),
this.schemaLoader.ownTypes(),
getOwner(this)
);
try {
return await running.schemas();
} finally {
await running.destroy();
}
})();
}
return await this._schemaCache;
}
async invalidateSchemaCache() {
if (this._schemaCache) {
let schema = await this._schemaCache;
await schema.teardown();
}
this._schemaCache = null;
}
static async teardown(instance) {
await instance.invalidateSchemaCache();
if (instance._dataSourcesMemo) {
await instance._dataSourcesMemo.teardown();
}
}
async update({ forceRefresh, hints, dontWaitForJob } = {}) {
await this._setupWorkers();
// Note that we dont want singletonKey, its inefficient due to the sophisticated invalidation we are using,
// also we dont want to use singletoneNextSlot, since all the indexing calls are important (as they can have different hints, and we dont want to collapse jobs)
if (dontWaitForJob) {
await this.jobQueue.publish(
'hub/indexers/update',
{ forceRefresh, hints },
{ singletonKey: 'hub/indexers/update', singletonNextSlot: true, expireIn: '2 hours' }
);
} else {
await this.jobQueue.publishAndWait(
'hub/indexers/update',
{ forceRefresh, hints },
{ singletonKey: 'hub/indexers/update', singletonNextSlot: true, expireIn: '2 hours' }
);
}
}
async _setupWorkers() {
if (!this._workersSetup) {
await this.jobQueue.subscribe('hub/indexers/update', async ({ data: { forceRefresh, hints } }) => {
await this._doUpdate(forceRefresh, hints);
});
this._workersSetup = true;
}
}
async _seedSchema() {
if (!this._dataSourcesMemo) {
let types = this.schemaLoader.ownTypes();
log.debug(`Indexers._seedSchema starting seed schema load`);
this._dataSourcesMemo = await this.schemaLoader.loadFrom(
bootstrapSchema.concat(this.dataSources.filter(model => types.includes(model.type)))
);
log.debug(`Indexers._seedSchema completed loading seed schema`);
}
return this._dataSourcesMemo;
}
async _doUpdate(forceRefresh, hints) {
log.debug('begin update, forceRefresh=%s', forceRefresh);
let priorCache = this._schemaCache;
let running = new RunningIndexers(
await this._seedSchema(),
this.client,
this.emit.bind(this),
this.schemaLoader.ownTypes(),
getOwner(this)
);
try {
let newSchema = await running.update(forceRefresh, hints);
if (this._schemaCache === priorCache) {
// nobody else has done a more recent update of the schema
// cache than us, so we can try to update it.
if (priorCache) {
let oldSchema = await priorCache;
if (!newSchema.equalTo(oldSchema)) {
log.info('schema was changed');
this._schemaCache = newSchema;
if (oldSchema) {
await oldSchema.teardown();
}
}
} else {
this._schemaCache = newSchema;
}
} else {
// somebody else has updated the cache in the time since we
// started running, so just drop the schemas we computed
// during indexing
await newSchema.teardown();
}
} finally {
await running.destroy();
}
this.emit('update_complete', hints);
log.debug('end update, forceRefresh=%s', forceRefresh);
}
}
);