Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: put processes in class for state #781

Merged
merged 2 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 129 additions & 112 deletions src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,137 +16,154 @@ import { log } from './utils/log';
import * as sentry from './utils/sentry';
import { wait } from './utils/wait';

let prefetcher: Prefetcher;
let consumer: QueueObject<PrefetchedPkg>;
export class Bootstrap {
stateManager;
algoliaClient;
mainIndex;
bootstrapIndex;
prefetcher: Prefetcher | undefined;
consumer: QueueObject<PrefetchedPkg> | undefined;

constructor(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
) {
this.stateManager = stateManager;
this.algoliaClient = algoliaClient;
this.mainIndex = mainIndex;
this.bootstrapIndex = bootstrapIndex;
}

/**
* Bootstrap is the mode that goes from 0 to all the packages in NPM
* In other word it is reindexing everything from scratch.
*
* It is useful if:
* - you are starting this project for the first time
* - you messed up with your Algolia index
* - you lagged too much behind.
*
* Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that.
* BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats.
*/
export async function run(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
): Promise<void> {
log.info('-----');
log.info('⛷ Bootstrap: starting');
const state = await stateManager.check();

if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await algolia.putDefaultSettings(mainIndex, config);
log.info('⛷ Bootstrap: done');
/**
* Bootstrap is the mode that goes from 0 to all the packages in NPM
* In other word it is reindexing everything from scratch.
*
* It is useful if:
* - you are starting this project for the first time
* - you messed up with your Algolia index
* - you lagged too much behind.
*
* Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that.
* BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats.
*/
async run(): Promise<void> {
log.info('-----');
return;
}
log.info('⛷ Bootstrap: starting');
const state = await this.stateManager.check();

await stateManager.save({
stage: 'bootstrap',
});

const { seq, nbDocs: totalDocs } = await npm.getInfo();
if (!state.bootstrapLastId) {
// Start from 0
log.info('⛷ Bootstrap: starting from the first doc');
// first time this launches, we need to remember the last seq our bootstrap can trust
await stateManager.save({ seq });
await algolia.putDefaultSettings(bootstrapIndex, config);
} else {
log.info('⛷ Bootstrap: starting at doc %s', state.bootstrapLastId);
}
if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await algolia.putDefaultSettings(this.mainIndex, config);
log.info('⛷ Bootstrap: done');
log.info('-----');
return;
}

log.info('-----');
log.info(chalk.yellowBright`Total packages: ${totalDocs}`);
log.info('-----');
await this.stateManager.save({
stage: 'bootstrap',
});

const { seq, nbDocs: totalDocs } = await npm.getInfo();
if (!state.bootstrapLastId) {
// Start from 0
log.info('⛷ Bootstrap: starting from the first doc');
// first time this launches, we need to remember the last seq our bootstrap can trust
await this.stateManager.save({ seq });
await algolia.putDefaultSettings(this.bootstrapIndex, config);
} else {
log.info('⛷ Bootstrap: starting at doc %s', state.bootstrapLastId);
}

prefetcher = new Prefetcher({
nextKey: state.bootstrapLastId,
});
prefetcher.launch();
log.info('-----');
log.info(chalk.yellowBright`Total packages: ${totalDocs}`);
log.info('-----');

let done = 0;
consumer = createPkgConsumer(stateManager, bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
done += 1;
});
consumer.buffer = 0;
const prefetcher = new Prefetcher({
nextKey: state.bootstrapLastId,
});
prefetcher.launch();

let processing = true;
while (processing) {
logProgress(done);
let done = 0;
const consumer = createPkgConsumer(this.stateManager, this.bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
done += 1;
});
consumer.buffer = 0;

await wait(config.prefetchWaitBetweenPage);
this.prefetcher = prefetcher;
this.consumer = consumer;

processing = !prefetcher.isFinished;
done = 0;
let processing = true;
while (processing) {
this.logProgress(done);

// Push nothing to trigger event
consumer.push(null as any);
}
await wait(config.prefetchWaitBetweenPage);

if (consumer.length() > 0) {
// While we no longer are in "processing" mode
// it can be possible that there's a last iteration in the queue
await consumer.drain();
}
processing = !prefetcher.isFinished;
done = 0;

consumer.kill();
// Push nothing to trigger event
this.consumer.push(null as any);
}

await stateManager.save({
bootstrapDone: true,
bootstrapLastDone: Date.now(),
});
if (this.consumer.length() > 0) {
// While we no longer are in "processing" mode
// it can be possible that there's a last iteration in the queue
await this.consumer.drain();
}

await moveToProduction(stateManager, algoliaClient);
this.consumer.kill();

log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');
}
this.onDone();
}

/**
* Move algolia index to prod.
*/
async function moveToProduction(
stateManager: StateManager,
algoliaClient: SearchClient
): Promise<void> {
log.info('🚚 starting move to production');
async onDone(): Promise<void> {
await this.stateManager.save({
bootstrapDone: true,
bootstrapLastDone: Date.now(),
});

const currentState = await stateManager.get();
await algoliaClient
.copyIndex(config.bootstrapIndexName, config.indexName)
.wait();
await this.moveToProduction();

await stateManager.save(currentState);
}
log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');
}

/**
* Log approximate progress.
*/
async function logProgress(nbDocs: number): Promise<void> {
const { nbDocs: totalDocs } = await npm.getInfo();
const offset = prefetcher.offset;

log.info(
chalk.dim.italic
.white`[progress] %d/%d docs (%s%) (%s prefetched) (%s processing)`,
offset + nbDocs,
totalDocs,
((Math.max(offset + nbDocs, 1) / totalDocs) * 100).toFixed(2),
prefetcher.idleCount,
consumer.running()
);
/**
* Move algolia index to prod.
*/
async moveToProduction(): Promise<void> {
log.info('🚚 starting move to production');

const currentState = await this.stateManager.get();
await this.algoliaClient
.copyIndex(config.bootstrapIndexName, config.indexName)
.wait();

await this.stateManager.save(currentState);
}

/**
* Log approximate progress.
*/
async logProgress(nbDocs: number): Promise<void> {
const { nbDocs: totalDocs } = await npm.getInfo();
const offset = this.prefetcher!.offset;

log.info(
chalk.dim.italic
.white`[progress] %d/%d docs (%s%) (%s prefetched) (%s processing)`,
offset + nbDocs,
totalDocs,
((Math.max(offset + nbDocs, 1) / totalDocs) * 100).toFixed(2),
this.prefetcher!.idleCount,
this.consumer!.running()
);
}
}

/**
Expand Down
18 changes: 13 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import { nextTick } from 'async';
import { StateManager } from './StateManager';
import * as algolia from './algolia/index';
import { createAPI } from './api';
import * as bootstrap from './bootstrap';
import { Bootstrap } from './bootstrap';
import { config } from './config';
import * as jsDelivr from './jsDelivr/index';
import * as typescript from './typescript/index';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';
import * as watch from './watch';
import { Watch } from './watch';

log.info('🗿 npm ↔️ Algolia replication starts ⛷ 🐌 🛰');

Expand All @@ -36,7 +36,7 @@ async function main(): Promise<void> {
createAPI();

// first we make sure the bootstrap index has the correct settings
log.info('💪 Setting up Algolia', [
log.info('💪 Setting up Algolia', config.appId, [
config.bootstrapIndexName,
config.indexName,
]);
Expand All @@ -54,14 +54,22 @@ async function main(): Promise<void> {
await jsDelivr.loadHits();
await typescript.loadTypesIndex();

const bootstrap = new Bootstrap(
stateManager,
algoliaClient,
mainIndex,
bootstrapIndex
);
const watch = new Watch(stateManager, mainIndex);

// then we run the bootstrap
// after a bootstrap is done, it's moved to main (with settings)
// if it was already finished, we will set the settings on the main index
await bootstrap.run(stateManager, algoliaClient, mainIndex, bootstrapIndex);
await bootstrap.run();

// then we figure out which updates we missed since
// the last time main index was updated
await watch.run(stateManager, mainIndex);
await watch.run();
}

main().catch(async (err) => {
Expand Down
Loading