feat: process package in queue instead of batch (#656)
let's go
Samuel Bodin committed Jul 12, 2021
1 parent 5c2cb7f commit c4f2aa2
Showing 19 changed files with 309 additions and 127 deletions.
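
The core of the change is in src/bootstrap.ts below: the page-by-page `saveDocs` batch loop is replaced by an `async` queue that is fed one package at a time by a prefetcher. As a rough editorial sketch of that pattern (simplified, with hypothetical helper names that are not part of this commit):

```ts
import { queue } from 'async';
import type { QueueObject } from 'async';

// Stand-ins for the real prefetcher / Algolia logic (hypothetical names).
async function getNextPackageId(): Promise<string | null> {
  return null; // e.g. the next row id from CouchDB's _all_docs
}
async function processPackage(id: string): Promise<void> {
  // e.g. fetch the package doc, format it, save it to the Algolia index
}

const concurrency = 25; // stand-in for config.bootstrapConcurrency

// One worker invocation per package instead of one save per page of rows.
const consumer: QueueObject<string> = queue<string>(async (id) => {
  await processPackage(id);
}, concurrency);

// Whenever the queue has spare capacity, pull the next package from the
// prefetcher and push it, so work trickles in continuously.
consumer.unsaturated(async () => {
  const next = await getNextPackageId();
  if (next) {
    consumer.push(next);
  }
});
```

In the commit itself the worker is created by `createPkgConsumer()`, which fetches each doc with `npm.getDoc`, saves it via `saveDoc`, and advances `bootstrapLastId`.
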
1 change: 1 addition & 0 deletions .eslintrc
@@ -5,6 +5,7 @@
"algolia/typescript"
],
"rules": {
"no-continue": "off",
"valid-jsdoc": "off",
"import/extensions": [
"error",
2 changes: 2 additions & 0 deletions Dockerfile
@@ -34,6 +34,8 @@ RUN true \
# This image must have the minimum amount of layers
FROM node:14.16.1-alpine as final

ENV NODE_ENV production

# Do not use root to run the app
USER node

3 changes: 2 additions & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "npm-search",
"version": "1.0.1",
"version": "1.0.2",
"private": true,
"author": {
"name": "Algolia, Inc.",
@@ -35,6 +35,7 @@
"async": "3.2.0",
"bunyan": "1.8.15",
"bunyan-debug-stream": "2.0.0",
"chalk": "4.1.1",
"dtrace-provider": "0.8.8",
"escape-html": "1.0.3",
"got": "11.8.2",
1 change: 1 addition & 0 deletions scripts/build.sh
@@ -7,6 +7,7 @@ echo "Releasing: $current"
echo ""

docker build \
--platform linux/amd64 \
-t algolia/npm-search \
-t "algolia/npm-search:${current}" \
.
2 changes: 1 addition & 1 deletion src/__tests__/saveDocs.test.ts
@@ -213,7 +213,7 @@ it('should be similar batch vs one', async () => {

const row = { id: '', key: 'preact', value: { rev: 'a' }, doc: preact };
await saveDocs({ docs: [row], index });
await saveDoc({ row, index });
await saveDoc({ row: preact, index });

expect(index.saveObjects).toHaveBeenCalledWith([clean]);
expect(index.saveObject).toHaveBeenCalledWith(clean);
159 changes: 99 additions & 60 deletions src/bootstrap.ts
@@ -1,16 +1,23 @@
import type { SearchClient, SearchIndex } from 'algoliasearch';
import ms from 'ms';
import type { DocumentListParams } from 'nano';
import type { QueueObject } from 'async';
import { queue } from 'async';
import chalk from 'chalk';

import type { StateManager } from './StateManager';
import * as algolia from './algolia';
import { config } from './config';
import * as npm from './npm';
import { saveDocs } from './saveDocs';
import type { PrefetchedPkg } from './npm/Prefetcher';
import { Prefetcher } from './npm/Prefetcher';
import { isFailure } from './npm/types';
import { saveDoc } from './saveDocs';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';
import { wait } from './utils/wait';

let loopStart: number = Date.now();
let prefetcher: Prefetcher;
let consumer: QueueObject<PrefetchedPkg>;

/**
* Bootstrap is the mode that goes from 0 to all the packages in NPM
@@ -24,17 +24,31 @@ let loopStart: number = Date.now();
* Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that.
* BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats.
*/
async function run(
export async function run(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
): Promise<void> {
log.info('-----');
log.info('⛷ Bootstrap: starting');
const state = await stateManager.check();

if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await algolia.putDefaultSettings(mainIndex, config);
log.info('⛷ Bootstrap: done');
log.info('-----');
return;
}

@@ -54,70 +54,53 @@ async function run(
}

log.info('-----');
log.info(`Total packages ${totalDocs}`);
log.info(chalk.yellowBright`Total packages: ${totalDocs}`);
log.info('-----');

let lastProcessedId = state.bootstrapLastId;
do {
loopStart = Date.now();

lastProcessedId = await loop(lastProcessedId, stateManager, bootstrapIndex);
} while (lastProcessedId !== null);

log.info('-----');
log.info('⛷ Bootstrap: done');
await stateManager.save({
bootstrapDone: true,
bootstrapLastDone: Date.now(),
prefetcher = new Prefetcher({
nextKey: state.bootstrapLastId,
});
prefetcher.launch();

let done = 0;
consumer = createPkgConsumer(stateManager, bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
done += 1;
});
consumer.buffer = 0;

await moveToProduction(stateManager, algoliaClient);
}
let processing = true;
while (processing) {
logProgress(done);

/**
* Execute one loop for bootstrap,
* Fetch N packages from `lastId`, process and save them to Algolia.
* */
async function loop(
lastId: string | null,
stateManager: StateManager,
bootstrapIndex: SearchIndex
): Promise<string | null> {
const start = Date.now();
log.info('loop()', '::', lastId);

const options: DocumentListParams = {
limit: config.bootstrapConcurrency,
};
if (lastId) {
options.startkey = lastId;
options.skip = 1;
}
await wait(config.prefetchWaitBetweenPage);

const res = await npm.findAll(options);
processing = !prefetcher.isFinished;
done = 0;

if (res.rows.length <= 0) {
// Nothing left to process
// We return null to stop the bootstraping
return null;
// Push nothing to trigger event
consumer.push(null as any);
processing = false;
}

datadog.increment('packages', res.rows.length);
log.info(' - fetched', res.rows.length, 'packages');

const newLastId = res.rows[res.rows.length - 1].id;
consumer.pause();

await saveDocs({ docs: res.rows, index: bootstrapIndex });
log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');
await stateManager.save({
bootstrapLastId: newLastId,
bootstrapDone: true,
bootstrapLastDone: Date.now(),
});
await logProgress(res.offset, res.rows.length);

datadog.timing('loop', Date.now() - start);

return newLastId;
await moveToProduction(stateManager, algoliaClient);
}

/**
* Move algolia index to prod.
*/
async function moveToProduction(
stateManager: StateManager,
algoliaClient: SearchClient
@@ -130,18 +123,64 @@ async function moveToProduction(
await stateManager.save(currentState);
}

async function logProgress(offset: number, nbDocs: number): Promise<void> {
/**
* Log approximate progress.
*/
async function logProgress(nbDocs: number): Promise<void> {
const { nbDocs: totalDocs } = await npm.getInfo();
const offset = prefetcher.offset;

const ratePerSecond = nbDocs / ((Date.now() - loopStart) / 1000);
log.info(
`[progress] %d/%d docs (%d%), current rate: %d docs/s (%s remaining)`,
chalk.dim.italic
.white`[progress] %d/%d docs (%d%) (%s prefetched) (%s processing)`,
offset + nbDocs,
totalDocs,
Math.floor((Math.max(offset + nbDocs, 1) / totalDocs) * 100),
Math.round(ratePerSecond),
ms(((totalDocs - offset - nbDocs) / ratePerSecond) * 1000)
prefetcher.idleCount,
consumer.running()
);
}

export { run };
/**
* Consume packages.
*/
function createPkgConsumer(
stateManager: StateManager,
index: SearchIndex
): QueueObject<PrefetchedPkg> {
return queue<PrefetchedPkg>(async (pkg) => {
if (!pkg) {
return;
}

log.info(`Start:`, pkg.id);
const start = Date.now();

try {
datadog.increment('packages');

const res = await npm.getDoc(pkg.id);

if (isFailure(res)) {
log.error('Got an error', res.error);
return;
}

await saveDoc({ row: res, index });

const lastId = (await stateManager.get()).bootstrapLastId;

// Because of concurrency we can have processed a package after in the list but sooner in the process.
if (!lastId || lastId < pkg.id) {
await stateManager.save({
bootstrapLastId: pkg.id,
});
}
} catch (err) {
sentry.report(err);
} finally {
log.info(`Done:`, pkg.id);
datadog.timing('loop', Date.now() - start);
}
}, config.bootstrapConcurrency);
}
1 change: 0 additions & 1 deletion src/changelog.ts
@@ -105,7 +105,6 @@ export async function getChangelog(
for (const file of filelist) {
const name = path.basename(file.name);
if (!fileRegex.test(name)) {
// eslint-disable-next-line no-continue
continue;
}

2 changes: 2 additions & 0 deletions src/config.ts
@@ -172,6 +172,8 @@ export const config = {
expiresAt: ms('30 days'),
popularExpiresAt: ms('7 days'),
cacheTotalDownloads: ms('1 minute'),
prefetchWaitBetweenPage: 5000,
prefetchMaxIdle: 100,
};

export type Config = typeof config;
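
The two new settings above feed the new Prefetcher (src/npm/Prefetcher.ts, one of the 19 changed files but not shown in this excerpt). Below is a rough sketch of how a prefetcher could use them; the loop shape is an assumption, while the config keys, `npm.findAll`, and `wait` come from the surrounding code:

```ts
import { config } from './config';
import * as npm from './npm';
import { wait } from './utils/wait';

type Row = { id: string };

// Hypothetical sketch, not the real Prefetcher: keep a buffer of rows from
// CouchDB's _all_docs ahead of the consumer, throttled by the new settings.
export async function prefetchLoop(
  buffer: Row[],
  startKey: string | null
): Promise<void> {
  let nextKey = startKey;

  for (;;) {
    if (buffer.length >= config.prefetchMaxIdle) {
      // The consumer is lagging; don't stack up more pages than it can absorb.
      await wait(config.prefetchWaitBetweenPage);
      continue;
    }

    const res = await npm.findAll(
      nextKey
        ? { startkey: nextKey, skip: 1, limit: config.bootstrapConcurrency }
        : { limit: config.bootstrapConcurrency }
    );

    if (res.rows.length === 0) {
      return; // nothing left to prefetch
    }

    buffer.push(...res.rows);
    nextKey = res.rows[res.rows.length - 1].id;

    await wait(config.prefetchWaitBetweenPage);
  }
}
```
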
6 changes: 4 additions & 2 deletions src/index.ts
@@ -34,7 +34,10 @@ async function main(): Promise<void> {
createAPI();

// first we make sure the bootstrap index has the correct settings
log.info('💪 Setting up Algolia');
log.info('💪 Setting up Algolia', [
config.bootstrapIndexName,
config.indexName,
]);
const {
client: algoliaClient,
mainIndex,
@@ -56,7 +59,6 @@

// then we figure out which updates we missed since
// the last time main index was updated
log.info('🚀 Launching Watch');
await watch.run(stateManager, mainIndex);
}

2 changes: 1 addition & 1 deletion src/jsDelivr/index.ts
@@ -95,7 +95,7 @@ export async function getFilesList(
});
files = response.body.files;
} catch (e) {
log.error(`Failed to fetch ${url}`, e);
log.error(`Failed to fetch ${url}`, e.message);
}

datadog.timing('jsdelivr.getFilesList', Date.now() - start);
