Skip to content

Commit

Permalink
fix: bootstrap (#942)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Bodin committed Apr 22, 2022
1 parent 3eef75a commit 33bb876
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 57 deletions.
9 changes: 7 additions & 2 deletions src/__tests__/bootstrap.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { State } from '../StateManager';
import { StateManager } from '../StateManager';
import type { AlgoliaStore } from '../algolia';
import { Bootstrap } from '../bootstrap';

function getAlgoliaMock(): any {
Expand Down Expand Up @@ -37,7 +38,9 @@ describe('isDone', () => {
}),
} as any;
const stateManager = new StateManager(mock);
const bootstrap = new Bootstrap(stateManager, {} as any, mock, {} as any);
const bootstrap = new Bootstrap(stateManager, {
mainIndex: mock,
} as AlgoliaStore);

expect(await bootstrap.isDone()).toBe(true);
});
Expand All @@ -59,7 +62,9 @@ describe('isDone', () => {
}),
} as any;
const stateManager = new StateManager(mock);
const bootstrap = new Bootstrap(stateManager, {} as any, mock, {} as any);
const bootstrap = new Bootstrap(stateManager, {
mainIndex: mock,
} as AlgoliaStore);

expect(await bootstrap.isDone()).toBe(false);
});
Expand Down
32 changes: 25 additions & 7 deletions src/algolia/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import algoliasearch from 'algoliasearch';
import type { Config } from '../config';
import { httpAgent, httpsAgent, USER_AGENT } from '../utils/request';

export interface AlgoliaStore {
mainIndex: SearchIndex;
mainLostIndex: SearchIndex;
bootstrapIndex: SearchIndex;
bootstrapLostIndex: SearchIndex;
client: SearchClient;
}

const requester = createNodeHttpRequester({
agent: httpsAgent,
httpAgent,
Expand Down Expand Up @@ -33,11 +41,7 @@ function createClient({
/**
* Prepare algolia for indexing.
*/
export async function prepare(config: Config): Promise<{
mainIndex: SearchIndex;
bootstrapIndex: SearchIndex;
client: SearchClient;
}> {
export async function prepare(config: Config): Promise<AlgoliaStore> {
if (!config.apiKey) {
throw new Error(
'npm-search: Please provide the `apiKey` env variable and restart'
Expand All @@ -46,20 +50,34 @@ export async function prepare(config: Config): Promise<{

// Get main index and boostrap algolia client
const { index: mainIndex, client } = createClient(config);
const { index: mainLostIndex } = createClient({
appId: config.appId,
apiKey: config.apiKey,
indexName: `${config.indexName}.lost`,
});
const { index: bootstrapIndex } = createClient({
appId: config.appId,
apiKey: config.apiKey,
indexName: config.bootstrapIndexName,
});
const { index: bootstrapLostIndex } = createClient({
appId: config.appId,
apiKey: config.apiKey,
indexName: `${config.bootstrapIndexName}.lost`,
});

// Ensure indices exists by calling an empty setSettings()
await mainIndex.setSettings({});
await bootstrapIndex.setSettings({});
await mainIndex.setSettings({}).wait();
await bootstrapIndex.setSettings({}).wait();
await mainLostIndex.setSettings({}).wait();
await bootstrapLostIndex.setSettings({}).wait();

return {
client,
mainIndex,
mainLostIndex,
bootstrapIndex,
bootstrapLostIndex,
};
}

Expand Down
58 changes: 33 additions & 25 deletions src/bootstrap.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { EventEmitter } from 'events';

import type { SearchClient, SearchIndex } from 'algoliasearch';
import type { QueueObject } from 'async';
import { queue } from 'async';
import chalk from 'chalk';

import type { StateManager } from './StateManager';
import type { AlgoliaStore } from './algolia';
import { putDefaultSettings } from './algolia';
import { config } from './config';
import { formatPkg } from './formatPkg';
Expand All @@ -26,24 +26,15 @@ type PkgJob = {

export class Bootstrap extends EventEmitter {
stateManager: StateManager;
algoliaClient: SearchClient;
mainIndex: SearchIndex;
bootstrapIndex: SearchIndex;
algoliaStore: AlgoliaStore;
prefetcher: Prefetcher | undefined;
consumer: QueueObject<PkgJob> | undefined;
interval: NodeJS.Timer | undefined;

constructor(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
) {
constructor(stateManager: StateManager, algoliaStore: AlgoliaStore) {
super();
this.stateManager = stateManager;
this.algoliaClient = algoliaClient;
this.mainIndex = mainIndex;
this.bootstrapIndex = bootstrapIndex;
this.algoliaStore = algoliaStore;
}

override on(param: 'finished', cb: () => any): this;
Expand All @@ -66,8 +57,8 @@ export class Bootstrap extends EventEmitter {
}

log.info('Stopped Bootstrap gracefully', {
queued: this.consumer?.length(),
processing: this.consumer?.running(),
queued: this.consumer?.length() || 0,
processing: this.consumer?.running() || 0,
});
}

Expand Down Expand Up @@ -98,7 +89,7 @@ export class Bootstrap extends EventEmitter {
log.info('⛷ Bootstrap: starting from the first doc');
// first time this launches, we need to remember the last seq our bootstrap can trust
await this.stateManager.save({ seq });
await putDefaultSettings(this.bootstrapIndex, config);
await putDefaultSettings(this.algoliaStore.bootstrapIndex, config);
} else {
log.info('⛷ Bootstrap: starting at doc %s', state.bootstrapLastId);
}
Expand All @@ -113,7 +104,7 @@ export class Bootstrap extends EventEmitter {
prefetcher.launch();

let done = 0;
const consumer = createPkgConsumer(this.stateManager, this.bootstrapIndex);
const consumer = createPkgConsumer(this.stateManager, this.algoliaStore);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push({ pkg: next, retry: 0 });
Expand Down Expand Up @@ -146,7 +137,7 @@ export class Bootstrap extends EventEmitter {
const state = await this.stateManager.check();

if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await putDefaultSettings(this.mainIndex, config);
await putDefaultSettings(this.algoliaStore.mainIndex, config);
log.info('⛷ Bootstrap: already done, skipping');

return true;
Expand Down Expand Up @@ -174,6 +165,8 @@ export class Bootstrap extends EventEmitter {
log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');

this.emit('finished');
}

/**
Expand All @@ -183,18 +176,22 @@ export class Bootstrap extends EventEmitter {
log.info('🚚 starting move to production');

const currentState = await this.stateManager.get();
await this.algoliaClient
// Backup current prod index
await this.algoliaStore.client
.copyIndex(
config.indexName,
`${config.indexName}.bak-${new Date()
.toLocaleDateString()
.replaceAll('/', '_')}`
`${config.indexName}.bak-${new Date().toISOString()}`
)
.wait();
await this.algoliaClient

// Replace prod with bootstrap
await this.algoliaStore.client
.copyIndex(config.bootstrapIndexName, config.indexName)
.wait();

// Remove bootstrap so we don't end up reusing a partial index
await this.algoliaStore.bootstrapIndex.delete();

await this.stateManager.save(currentState);
}

Expand Down Expand Up @@ -223,7 +220,7 @@ export class Bootstrap extends EventEmitter {
*/
function createPkgConsumer(
stateManager: StateManager,
index: SearchIndex
algoliaStore: AlgoliaStore
): QueueObject<PkgJob> {
const consumer = queue<PkgJob>(async ({ pkg, retry }) => {
if (!pkg) {
Expand All @@ -247,7 +244,7 @@ function createPkgConsumer(
if (!formatted) {
return;
}
await saveDoc({ formatted, index });
await saveDoc({ formatted, index: algoliaStore.bootstrapIndex });

const lastId = (await stateManager.get()).bootstrapLastId;

Expand All @@ -269,6 +266,17 @@ function createPkgConsumer(
}

sentry.report(new Error('Error during job'), { err });

// Store in lost index
try {
await algoliaStore.bootstrapLostIndex.saveObject({
objectID: pkg.id,
err: err instanceof Error ? err.toString() : err,
date: start,
});
} catch (err2) {
log.error(new Error('Error during lost'), err2);
}
} finally {
datadog.timing('loop', Date.now() - start);
}
Expand Down
17 changes: 4 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,17 @@ class Main {
config.bootstrapIndexName,
config.indexName,
]);
const {
client: algoliaClient,
mainIndex,
bootstrapIndex,
} = await algolia.prepare(config);
const algoliaStore = await algolia.prepare(config);
datadog.timing('main.init_algolia', Date.now() - start);

// Create State Manager that holds progression of indexing
const stateManager = new StateManager(mainIndex);
const stateManager = new StateManager(algoliaStore.mainIndex);

// Preload some useful data
await jsDelivr.loadHits();
await typescript.loadTypesIndex();
this.bootstrap = new Bootstrap(
stateManager,
algoliaClient,
mainIndex,
bootstrapIndex
);
this.watch = new Watch(stateManager, mainIndex);
this.bootstrap = new Bootstrap(stateManager, algoliaStore);
this.watch = new Watch(stateManager, algoliaStore);

if (!(await this.bootstrap.isDone())) {
this.bootstrap.on('finished', async () => {
Expand Down
32 changes: 22 additions & 10 deletions src/watch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { SearchIndex } from 'algoliasearch';
import type { QueueObject } from 'async';
import { queue } from 'async';
import type { EventEmitter } from 'bunyan';
Expand All @@ -7,6 +6,7 @@ import type { DatabaseChangesResultItem } from 'nano';

import type { FinalPkg } from './@types/pkg';
import type { StateManager } from './StateManager';
import type { AlgoliaStore } from './algolia';
import { config } from './config';
import { DeletedError } from './errors';
import { formatPkg } from './formatPkg';
Expand All @@ -26,7 +26,7 @@ type ChangeJob = {

export class Watch {
stateManager: StateManager;
mainIndex: SearchIndex;
algoliaStore: AlgoliaStore;
skipped = new Map<string, ChangeJob>();
// Cached npmInfo.seq
totalSequence: number = 0;
Expand All @@ -35,9 +35,9 @@ export class Watch {

changesReader: EventEmitter | undefined;

constructor(stateManager: StateManager, mainIndex: SearchIndex) {
constructor(stateManager: StateManager, algoliaStore: AlgoliaStore) {
this.stateManager = stateManager;
this.mainIndex = mainIndex;
this.algoliaStore = algoliaStore;
}

/**
Expand Down Expand Up @@ -101,8 +101,8 @@ export class Watch {
this.changesReader?.removeAllListeners();

log.info('Stopped Watch gracefully', {
queued: this.changesConsumer?.length(),
processing: this.changesConsumer?.running(),
queued: this.changesConsumer?.length() || 0,
processing: this.changesConsumer?.running() || 0,
});
}

Expand Down Expand Up @@ -176,7 +176,7 @@ export class Watch {
}, config.refreshPeriod);

// We list all values in facet and pick the oldest (hopefully the oldest is in the list)
const res = await this.mainIndex.search('', {
const res = await this.algoliaStore.mainIndex.search('', {
facets: ['_searchInternal.expiresAt'],
hitsPerPage: 0,
sortFacetValuesBy: 'alpha',
Expand All @@ -201,7 +201,7 @@ export class Watch {
log.info(' > Picked the oldest', expiresAt.toISOString());

// Retrieve some packages to update, not too much to avoid flooding the queue
const pkgs = await this.mainIndex.search<FinalPkg>('', {
const pkgs = await this.algoliaStore.mainIndex.search<FinalPkg>('', {
facetFilters: [`_searchInternal.expiresAt:${pick}`],
facets: ['_searchInternal.expiresAt'],
hitsPerPage: 20,
Expand Down Expand Up @@ -279,7 +279,7 @@ export class Watch {
return;
}

await saveDoc({ formatted, index: this.mainIndex });
await saveDoc({ formatted, index: this.algoliaStore.mainIndex });
}

/**
Expand Down Expand Up @@ -340,7 +340,7 @@ export class Watch {
// - we received a change that is not marked as "deleted"
// - and the package has since been deleted
if (err instanceof DeletedError) {
this.mainIndex.deleteObject(id);
this.algoliaStore.mainIndex.deleteObject(id);
log.info(`deleted`, id);
return;
}
Expand All @@ -355,6 +355,18 @@ export class Watch {
log.error('Job has been retried too many times, skipping');
datadog.increment('packages.failed');
this.skipped.set(id, job);

// Store in lost index
try {
await this.algoliaStore.mainLostIndex.saveObject({
objectID: job.change.id,
err: err instanceof Error ? err.toString() : err,
date: start,
job,
});
} catch (err2) {
log.error(new Error('Error during lost'), err2);
}
}
} finally {
if (!ignoreSeq) {
Expand Down

0 comments on commit 33bb876

Please sign in to comment.