Skip to content

Commit

Permalink
fix: improve logging + remove catchup (#647)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Bodin committed Jul 7, 2021
1 parent 5a48ad3 commit cbc545d
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 89 deletions.
3 changes: 1 addition & 2 deletions README.md
Expand Up @@ -233,8 +233,7 @@ See [config.js](./config.js):
- `apiKey`: [Algolia](https://www.algolia.com/) apiKey - **required**
- `appId`: [Algolia](https://www.algolia.com/) appId - _default `OFCNCOG2CU`_
- `indexName`: [Algolia](https://www.algolia.com/) indexName - _default `npm-search`_
- `bootstrapConcurrency`: How many docs to grab from npm registry at once in the bootstrap phase - _default `100`_
- `replicateConcurrency`: How many changes to grab from npm registry at once in the replicate phase - _default `10`_
- `bootstrapConcurrency`: How many docs to grab from npm registry at once in the bootstrap phase - _default `25`_
- `seq`: npm registry first [change sequence](http://docs.couchdb.org/en/2.0.0/json-structure.html#changes-information-for-a-database)
to start replication. In normal operations you should never have to use this. - _default `0`_
- `npmRegistryEndpoint`: npm registry endpoint to replicate from - _default `https://replicate.npmjs.com/registry`_
Expand Down
4 changes: 1 addition & 3 deletions src/bootstrap.ts
Expand Up @@ -107,12 +107,10 @@ async function loop(

const newLastId = res.rows[res.rows.length - 1].id;

const saved = await saveDocs({ docs: res.rows, index: bootstrapIndex });
await saveDocs({ docs: res.rows, index: bootstrapIndex });
await stateManager.save({
bootstrapLastId: newLastId,
});
log.info(` - saved ${saved} packages`);

await logProgress(res.offset, res.rows.length);

datadog.timing('loop', Date.now() - start);
Expand Down
1 change: 0 additions & 1 deletion src/config.ts
Expand Up @@ -163,7 +163,6 @@ export const config = {
apiKey: '',
indexName: 'npm-search',
bootstrapIndexName: 'npm-search-bootstrap',
replicateConcurrency: 1,
bootstrapConcurrency: 25,
timeToRedoBootstrap: ms('2 weeks'),
seq: undefined,
Expand Down
9 changes: 5 additions & 4 deletions src/jsDelivr/__test__/index.test.ts
Expand Up @@ -6,6 +6,7 @@ jest.mock('../../utils/log', () => {
log: {
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
},
};
});
Expand Down Expand Up @@ -85,8 +86,8 @@ describe('files', () => {
version: '3.33.0',
});
expect(files).toEqual([]);
expect(log.warn.mock.calls[0][0].message).toMatchInlineSnapshot(
`"Response code 404 (Not Found)"`
expect(log.error.mock.calls[0][0]).toEqual(
'Failed to fetch https://data.jsdelivr.com/v1/package/npm/thispackagedoesnotexist@3.33.0/flat'
);
});
});
Expand All @@ -111,8 +112,8 @@ describe('files', () => {
},
]);
expect(files).toMatchSnapshot();
expect(log.warn.mock.calls[0][0].message).toMatchInlineSnapshot(
`"Response code 404 (Not Found)"`
expect(log.error.mock.calls[0][0]).toEqual(
'Failed to fetch https://data.jsdelivr.com/v1/package/npm/thispackagedoesnotexist@3.33.0/flat'
);
});
});
Expand Down
14 changes: 6 additions & 8 deletions src/jsDelivr/index.ts
Expand Up @@ -26,7 +26,7 @@ export async function loadHits(): Promise<void> {
hits.set(pkg.name, pkg.hits);
});
} catch (e) {
log.error(e);
log.error('Failed to fetch', e);
}

datadog.timing('jsdelivr.loadHits', Date.now() - start);
Expand Down Expand Up @@ -85,16 +85,14 @@ export async function getFilesList(
}

let files: File[] = [];
const url = `${config.jsDelivrPackageEndpoint}/${pkg.name}@${pkg.version}/flat`;
try {
const response = await request<{ default: string; files: File[] }>(
`${config.jsDelivrPackageEndpoint}/${pkg.name}@${pkg.version}/flat`,
{
responseType: 'json',
}
);
const response = await request<{ default: string; files: File[] }>(url, {
responseType: 'json',
});
files = response.body.files;
} catch (e) {
log.warn(e);
log.error(`Failed to fetch ${url}`, e);
}

datadog.timing('jsdelivr.getFilesList', Date.now() - start);
Expand Down
8 changes: 6 additions & 2 deletions src/saveDocs.ts
Expand Up @@ -41,17 +41,21 @@ export default async function saveDocs({
log.info('🔍 No pkgs found in response.');
return Promise.resolve(0);
}

log.info(`👯 Saving... ${names.length} packages`, names);
log.info(' => ', names);
log.info(' Adding metadata...');

let start2 = Date.now();
const pkgs = await addMetaData(rawPkgs);
datadog.timing('saveDocs.addMetaData', Date.now() - start2);

log.info(` Saving...`);

start2 = Date.now();
await index.saveObjects(pkgs);
datadog.timing('saveDocs.saveObjects', Date.now() - start2);

log.info(` Saved`);

datadog.timing('saveDocs', Date.now() - start);
return pkgs.length;
}
Expand Down
76 changes: 7 additions & 69 deletions src/watch.ts
@@ -1,28 +1,20 @@
import type { SearchIndex } from 'algoliasearch';
import type { QueueObject } from 'async';
import { queue } from 'async';
import ms from 'ms';
import type { DatabaseChangesResultItem, DocumentLookupFailure } from 'nano';

import type { StateManager } from './StateManager';
import { config } from './config';
import * as npm from './npm';
import saveDocs from './saveDocs';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';

let loopStart = Date.now();
let totalSequence: number; // Cached npmInfo.seq
let changesConsumer: QueueObject<DatabaseChangesResultItem>;

/**
* Run watch and catchup.
*
* --- Catchup ?
* If the bootstrap is long or the process has been stopped long enough,
 * we are lagging behind by a few changes.
* Catchup() will paginate through changes that we have missed.
* Run watch.
*
* --- Watch ?
* Watch is "Long Polled. This mode is not paginated and the event system in CouchDB send
Expand All @@ -43,12 +35,6 @@ let changesConsumer: QueueObject<DatabaseChangesResultItem>;
 * until another package is updated.
 * It will never be up to date because it receives events at the same pace
 * as they arrive in listener A, even if it's not the same package.
*
*
 * --- Could we use catchup with a timeout between polls, then?
 * Yes!
 * Once we are caught up, we could wait between polls and we would receive N changes.
 * But long-polling is more efficient in terms of bandwidth and more reactive.
*/
async function run(
stateManager: StateManager,
Expand All @@ -60,54 +46,11 @@ async function run(

changesConsumer = createChangeConsumer(stateManager, mainIndex);

await catchup(stateManager);

log.info('🚀 Index is up to date, watch mode activated');

await watch(stateManager);

log.info('🚀 watch is done');
}

/**
* Loop through all changes that may have been missed.
*/
async function catchup(stateManager: StateManager): Promise<void> {
let hasCaughtUp: boolean = false;

while (!hasCaughtUp) {
loopStart = Date.now();

try {
const npmInfo = await npm.getInfo();
totalSequence = npmInfo.seq;

const { seq } = await stateManager.get();

log.info('🚀 Catchup: continue since sequence [%d]', seq);

// Get one chunk of changes from registry
const changes = await npm.getChanges({
since: seq,
limit: config.replicateConcurrency,
include_docs: true,
});

for (const change of changes.results) {
changesConsumer.push(change);
}
await changesConsumer.drain();

const newState = await stateManager.get();
if (newState.seq! >= totalSequence) {
hasCaughtUp = true;
}
} catch (err) {
sentry.report(err);
}
}
}

/**
* Active synchronous mode with Registry.
* Changes are polled with a keep-alived connection.
Expand Down Expand Up @@ -144,7 +87,7 @@ async function watch(stateManager: StateManager): Promise<true> {
}

/**
* Process changes.
* Process changes in order.
*/
async function loop(
mainIndex: SearchIndex,
Expand Down Expand Up @@ -180,20 +123,15 @@ async function loop(
}

/**
* Log our process through catchup/watch.
* Log our process through watch.
*
*/
function logProgress(seq: number, nbChanges: number): void {
const ratePerSecond = nbChanges / ((Date.now() - loopStart) / 1000);
const remaining = ((totalSequence - seq) / ratePerSecond) * 1000 || 0;

function logProgress(seq: number): void {
log.info(
`🚀 Synced %d/%d changes (%d%), current rate: %d changes/s (%s remaining)`,
`🚀 Synced %d/%d changes (%d%)`,
seq,
totalSequence,
Math.floor((Math.max(seq, 1) / totalSequence) * 100),
Math.round(ratePerSecond),
ms(remaining)
Math.floor((Math.max(seq, 1) / totalSequence) * 100)
);
}

Expand All @@ -220,7 +158,7 @@ function createChangeConsumer(
await stateManager.save({
seq,
});
logProgress(seq, 1);
logProgress(seq);
} catch (err) {
sentry.report(err);
}
Expand Down

0 comments on commit cbc545d

Please sign in to comment.