Skip to content

Commit

Permalink
feat: allow configuring crawler statistics (#2213)
Browse files Browse the repository at this point in the history
Allow disabling automatic persistence in Statistics and SessionPool.
Add additional methods for manual disabling in case it's needed, but
just not the automatic one
Closes #1789

---------

Co-authored-by: Martin Adámek <banan23@gmail.com>
  • Loading branch information
foxt451 and B4nan committed Dec 20, 2023
1 parent 81672c3 commit 9fd60e4
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 10 deletions.
13 changes: 12 additions & 1 deletion packages/basic-crawler/src/internals/basic-crawler.ts
Expand Up @@ -25,6 +25,7 @@ import type {
SessionPoolOptions,
Source,
StatisticState,
StatisticsOptions,
} from '@crawlee/core';
import {
AutoscaledPool,
Expand Down Expand Up @@ -338,6 +339,12 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
* WARNING: these options are not guaranteed to be stable and may change or be removed at any time.
*/
experiments?: CrawlerExperiments;

/**
* Customize the way statistics collecting works, such as logging interval or
* whether to output them to the Key-Value store.
*/
statisticsOptions?: StatisticsOptions;
}

/**
Expand Down Expand Up @@ -524,6 +531,8 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
// internal
log: ow.optional.object,
experiments: ow.optional.object,

statisticsOptions: ow.optional.object,
};

/**
Expand Down Expand Up @@ -569,6 +578,8 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

statusMessageLoggingInterval = 10,
statusMessageCallback,

statisticsOptions,
} = options;

this.requestList = requestList;
Expand Down Expand Up @@ -650,7 +661,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.sameDomainDelayMillis = sameDomainDelaySecs * 1000;
this.maxSessionRotations = maxSessionRotations;
this.handledRequestsCount = 0;
this.stats = new Statistics({ logMessage: `${log.getOptions().prefix} request statistics:`, config });
this.stats = new Statistics({ logMessage: `${log.getOptions().prefix} request statistics:`, config, ...statisticsOptions });
this.sessionPoolOptions = {
...sessionPoolOptions,
log,
Expand Down
69 changes: 63 additions & 6 deletions packages/core/src/crawlers/statistics.ts
Expand Up @@ -39,6 +39,17 @@ const errorTrackerConfig = {
showFullMessage: false,
};

/**
* Persistence-related options to control how and when crawler's data gets persisted.
*/
export interface PersistenceOptions {
/**
* Use this flag to disable or enable periodic persistence to key value store.
* @default true
*/
enable?: boolean;
}

/**
* The statistics class provides an interface to collecting and logging run
* statistics for requests.
Expand Down Expand Up @@ -92,6 +103,7 @@ export class Statistics {
private instanceStart!: number;
private logInterval: unknown;
private events: EventManager;
private persistenceOptions: PersistenceOptions;

/**
* @internal
Expand All @@ -102,13 +114,17 @@ export class Statistics {
logMessage: ow.optional.string,
keyValueStore: ow.optional.object,
config: ow.optional.object,
persistenceOptions: ow.optional.object,
}));

const {
logIntervalSecs = 60,
logMessage = 'Statistics',
keyValueStore,
config = Configuration.getGlobalConfig(),
persistenceOptions = {
enable: true,
},
} = options;

this.logIntervalMillis = logIntervalSecs * 1000;
Expand All @@ -117,6 +133,7 @@ export class Statistics {
this.listener = this.persistState.bind(this);
this.events = config.getEventManager();
this.config = config;
this.persistenceOptions = persistenceOptions;

// initialize by "resetting"
this.reset();
Expand Down Expand Up @@ -155,7 +172,14 @@ export class Statistics {
this._teardown();
}

async resetStore() {
/**
* @param options - Override the persistence options provided in the constructor
*/
async resetStore(options?: PersistenceOptions) {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

if (!this.keyValueStore) {
return;
}
Expand Down Expand Up @@ -247,13 +271,14 @@ export class Statistics {
async startCapturing() {
this.keyValueStore ??= await KeyValueStore.open(null, { config: this.config });

await this._maybeLoadStatistics();

if (this.state.crawlerStartedAt === null) {
this.state.crawlerStartedAt = new Date();
}

this.events.on(EventType.PERSIST_STATE, this.listener);
if (this.persistenceOptions.enable) {
await this._maybeLoadStatistics();
this.events.on(EventType.PERSIST_STATE, this.listener);
}

this.logInterval = setInterval(() => {
this.log.info(this.logMessage, {
Expand Down Expand Up @@ -284,8 +309,13 @@ export class Statistics {

/**
* Persist internal state to the key value store
* @param options - Override the persistence options provided in the constructor
*/
async persistState() {
async persistState(options?: PersistenceOptions) {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

// this might be called before startCapturing was called without using await, should not crash
if (!this.keyValueStore) {
return;
Expand Down Expand Up @@ -382,11 +412,38 @@ export class Statistics {
}
}

interface StatisticsOptions {
/**
* Configuration for the {@apilink Statistics} instance used by the crawler
*/
export interface StatisticsOptions {
/**
* Interval in seconds to log the current statistics
* @default 60
*/
logIntervalSecs?: number;

/**
* Message to log with the current statistics
* @default 'Statistics'
*/
logMessage?: string;

/**
* Key value store instance to persist the statistics.
* If not provided, the default one will be used when capturing starts
*/
keyValueStore?: KeyValueStore;

/**
* Configuration instance to use
* @default Configuration.getGlobalConfig()
*/
config?: Configuration;

/**
* Control how and when to persist the statistics.
*/
persistenceOptions?: PersistenceOptions;
}

/**
Expand Down
40 changes: 37 additions & 3 deletions packages/core/src/session_pool/session_pool.ts
Expand Up @@ -9,6 +9,7 @@ import { BLOCKED_STATUS_CODES, MAX_POOL_SIZE, PERSIST_STATE_KEY } from './consts
import type { SessionOptions } from './session';
import { Session } from './session';
import { Configuration } from '../configuration';
import type { PersistenceOptions } from '../crawlers/statistics';
import type { EventManager } from '../events/event_manager';
import { EventType } from '../events/event_manager';
import { log as defaultLog } from '../log';
Expand Down Expand Up @@ -60,6 +61,11 @@ export interface SessionPoolOptions {

/** @internal */
log?: Log;

/**
* Control how and when to persist the state of the session pool.
*/
persistenceOptions?: PersistenceOptions;
}

/**
Expand Down Expand Up @@ -140,6 +146,8 @@ export class SessionPool extends EventEmitter {
protected _listener!: () => Promise<void>;
protected events: EventManager;
protected readonly blockedStatusCodes: number[];
protected persistenceOptions: PersistenceOptions;
protected isInitialized = false;

private queue = new AsyncQueue();

Expand All @@ -157,6 +165,7 @@ export class SessionPool extends EventEmitter {
sessionOptions: ow.optional.object,
blockedStatusCodes: ow.optional.array.ofType(ow.number),
log: ow.optional.object,
persistenceOptions: ow.optional.object,
}));

const {
Expand All @@ -167,12 +176,16 @@ export class SessionPool extends EventEmitter {
sessionOptions = {},
blockedStatusCodes = BLOCKED_STATUS_CODES,
log = defaultLog,
persistenceOptions = {
enable: true,
},
} = options;

this.config = config;
this.blockedStatusCodes = blockedStatusCodes;
this.events = config.getEventManager();
this.log = log.child({ prefix: 'SessionPool' });
this.persistenceOptions = persistenceOptions;

// Pool Configuration
this.maxPoolSize = maxPoolSize;
Expand Down Expand Up @@ -210,7 +223,15 @@ export class SessionPool extends EventEmitter {
* It is called automatically by the {@apilink SessionPool.open} function.
*/
async initialize(): Promise<void> {
if (this.isInitialized) {
return;
}

this.keyValueStore = await KeyValueStore.open(this.persistStateKeyValueStoreId, { config: this.config });
if (!this.persistenceOptions.enable) {
this.isInitialized = true;
return;
}

if (!this.persistStateKeyValueStoreId) {
// eslint-disable-next-line max-len
Expand All @@ -223,6 +244,7 @@ export class SessionPool extends EventEmitter {
this._listener = this.persistState.bind(this);

this.events.on(EventType.PERSIST_STATE, this._listener);
this.isInitialized = true;
}

/**
Expand Down Expand Up @@ -301,7 +323,14 @@ export class SessionPool extends EventEmitter {
}
}

async resetStore() {
/**
* @param options - Override the persistence options provided in the constructor
*/
async resetStore(options?: PersistenceOptions) {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

await this.keyValueStore?.setValue(this.persistStateKey, null);
}

Expand All @@ -320,8 +349,13 @@ export class SessionPool extends EventEmitter {
/**
* Persists the current state of the `SessionPool` into the default {@apilink KeyValueStore}.
* The state is persisted automatically in regular intervals.
* @param options - Override the persistence options provided in the constructor
*/
async persistState(): Promise<void> {
async persistState(options?: PersistenceOptions): Promise<void> {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

this.log.debug('Persisting state', {
persistStateKeyValueStoreId: this.persistStateKeyValueStoreId,
persistStateKey: this.persistStateKey,
Expand All @@ -342,7 +376,7 @@ export class SessionPool extends EventEmitter {
* SessionPool should not work before initialization.
*/
protected _throwIfNotInitialized() {
if (!this._listener) throw new Error('SessionPool is not initialized.');
if (!this.isInitialized) throw new Error('SessionPool is not initialized.');
}

/**
Expand Down

0 comments on commit 9fd60e4

Please sign in to comment.