Skip to content

Commit

Permalink
fix: put processes in class for state sake (#781)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Bodin committed Nov 4, 2021
1 parent 30ffa4c commit 275231c
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 285 deletions.
241 changes: 129 additions & 112 deletions src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,137 +16,154 @@ import { log } from './utils/log';
import * as sentry from './utils/sentry';
import { wait } from './utils/wait';

let prefetcher: Prefetcher;
let consumer: QueueObject<PrefetchedPkg>;
export class Bootstrap {
stateManager;
algoliaClient;
mainIndex;
bootstrapIndex;
prefetcher: Prefetcher | undefined;
consumer: QueueObject<PrefetchedPkg> | undefined;

constructor(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
) {
this.stateManager = stateManager;
this.algoliaClient = algoliaClient;
this.mainIndex = mainIndex;
this.bootstrapIndex = bootstrapIndex;
}

/**
* Bootstrap is the mode that goes from 0 to all the packages in NPM
* In other word it is reindexing everything from scratch.
*
* It is useful if:
* - you are starting this project for the first time
* - you messed up with your Algolia index
* - you lagged too much behind.
*
* Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that.
* BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats.
*/
export async function run(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
): Promise<void> {
log.info('-----');
log.info('⛷ Bootstrap: starting');
const state = await stateManager.check();

if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await algolia.putDefaultSettings(mainIndex, config);
log.info('⛷ Bootstrap: done');
/**
* Bootstrap is the mode that goes from 0 to all the packages in NPM
* In other word it is reindexing everything from scratch.
*
* It is useful if:
* - you are starting this project for the first time
* - you messed up with your Algolia index
* - you lagged too much behind.
*
* Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that.
* BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats.
*/
async run(): Promise<void> {
log.info('-----');
return;
}
log.info('⛷ Bootstrap: starting');
const state = await this.stateManager.check();

await stateManager.save({
stage: 'bootstrap',
});

const { seq, nbDocs: totalDocs } = await npm.getInfo();
if (!state.bootstrapLastId) {
// Start from 0
log.info('⛷ Bootstrap: starting from the first doc');
// first time this launches, we need to remember the last seq our bootstrap can trust
await stateManager.save({ seq });
await algolia.putDefaultSettings(bootstrapIndex, config);
} else {
log.info('⛷ Bootstrap: starting at doc %s', state.bootstrapLastId);
}
if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await algolia.putDefaultSettings(this.mainIndex, config);
log.info('⛷ Bootstrap: done');
log.info('-----');
return;
}

log.info('-----');
log.info(chalk.yellowBright`Total packages: ${totalDocs}`);
log.info('-----');
await this.stateManager.save({
stage: 'bootstrap',
});

const { seq, nbDocs: totalDocs } = await npm.getInfo();
if (!state.bootstrapLastId) {
// Start from 0
log.info('⛷ Bootstrap: starting from the first doc');
// first time this launches, we need to remember the last seq our bootstrap can trust
await this.stateManager.save({ seq });
await algolia.putDefaultSettings(this.bootstrapIndex, config);
} else {
log.info('⛷ Bootstrap: starting at doc %s', state.bootstrapLastId);
}

prefetcher = new Prefetcher({
nextKey: state.bootstrapLastId,
});
prefetcher.launch();
log.info('-----');
log.info(chalk.yellowBright`Total packages: ${totalDocs}`);
log.info('-----');

let done = 0;
consumer = createPkgConsumer(stateManager, bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
done += 1;
});
consumer.buffer = 0;
const prefetcher = new Prefetcher({
nextKey: state.bootstrapLastId,
});
prefetcher.launch();

let processing = true;
while (processing) {
logProgress(done);
let done = 0;
const consumer = createPkgConsumer(this.stateManager, this.bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
done += 1;
});
consumer.buffer = 0;

await wait(config.prefetchWaitBetweenPage);
this.prefetcher = prefetcher;
this.consumer = consumer;

processing = !prefetcher.isFinished;
done = 0;
let processing = true;
while (processing) {
this.logProgress(done);

// Push nothing to trigger event
consumer.push(null as any);
}
await wait(config.prefetchWaitBetweenPage);

if (consumer.length() > 0) {
// While we no longer are in "processing" mode
// it can be possible that there's a last iteration in the queue
await consumer.drain();
}
processing = !prefetcher.isFinished;
done = 0;

consumer.kill();
// Push nothing to trigger event
this.consumer.push(null as any);
}

await stateManager.save({
bootstrapDone: true,
bootstrapLastDone: Date.now(),
});
if (this.consumer.length() > 0) {
// While we no longer are in "processing" mode
// it can be possible that there's a last iteration in the queue
await this.consumer.drain();
}

await moveToProduction(stateManager, algoliaClient);
this.consumer.kill();

log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');
}
this.onDone();
}

/**
* Move algolia index to prod.
*/
async function moveToProduction(
stateManager: StateManager,
algoliaClient: SearchClient
): Promise<void> {
log.info('🚚 starting move to production');
async onDone(): Promise<void> {
await this.stateManager.save({
bootstrapDone: true,
bootstrapLastDone: Date.now(),
});

const currentState = await stateManager.get();
await algoliaClient
.copyIndex(config.bootstrapIndexName, config.indexName)
.wait();
await this.moveToProduction();

await stateManager.save(currentState);
}
log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');
}

/**
* Log approximate progress.
*/
async function logProgress(nbDocs: number): Promise<void> {
const { nbDocs: totalDocs } = await npm.getInfo();
const offset = prefetcher.offset;

log.info(
chalk.dim.italic
.white`[progress] %d/%d docs (%s%) (%s prefetched) (%s processing)`,
offset + nbDocs,
totalDocs,
((Math.max(offset + nbDocs, 1) / totalDocs) * 100).toFixed(2),
prefetcher.idleCount,
consumer.running()
);
/**
* Move algolia index to prod.
*/
async moveToProduction(): Promise<void> {
log.info('🚚 starting move to production');

const currentState = await this.stateManager.get();
await this.algoliaClient
.copyIndex(config.bootstrapIndexName, config.indexName)
.wait();

await this.stateManager.save(currentState);
}

/**
* Log approximate progress.
*/
async logProgress(nbDocs: number): Promise<void> {
const { nbDocs: totalDocs } = await npm.getInfo();
const offset = this.prefetcher!.offset;

log.info(
chalk.dim.italic
.white`[progress] %d/%d docs (%s%) (%s prefetched) (%s processing)`,
offset + nbDocs,
totalDocs,
((Math.max(offset + nbDocs, 1) / totalDocs) * 100).toFixed(2),
this.prefetcher!.idleCount,
this.consumer!.running()
);
}
}

/**
Expand Down
18 changes: 13 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import { nextTick } from 'async';
import { StateManager } from './StateManager';
import * as algolia from './algolia/index';
import { createAPI } from './api';
import * as bootstrap from './bootstrap';
import { Bootstrap } from './bootstrap';
import { config } from './config';
import * as jsDelivr from './jsDelivr/index';
import * as typescript from './typescript/index';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';
import * as watch from './watch';
import { Watch } from './watch';

log.info('🗿 npm ↔️ Algolia replication starts ⛷ 🐌 🛰');

Expand All @@ -36,7 +36,7 @@ async function main(): Promise<void> {
createAPI();

// first we make sure the bootstrap index has the correct settings
log.info('💪 Setting up Algolia', [
log.info('💪 Setting up Algolia', config.appId, [
config.bootstrapIndexName,
config.indexName,
]);
Expand All @@ -54,14 +54,22 @@ async function main(): Promise<void> {
await jsDelivr.loadHits();
await typescript.loadTypesIndex();

const bootstrap = new Bootstrap(
stateManager,
algoliaClient,
mainIndex,
bootstrapIndex
);
const watch = new Watch(stateManager, mainIndex);

// then we run the bootstrap
// after a bootstrap is done, it's moved to main (with settings)
// if it was already finished, we will set the settings on the main index
await bootstrap.run(stateManager, algoliaClient, mainIndex, bootstrapIndex);
await bootstrap.run();

// then we figure out which updates we missed since
// the last time main index was updated
await watch.run(stateManager, mainIndex);
await watch.run();
}

main().catch(async (err) => {
Expand Down

0 comments on commit 275231c

Please sign in to comment.