feat: add FileDownload "crawler" (#2435)
Adds a new package `@crawlee/file-download`, which overrides the
`HttpCrawler`'s MIME type limitations and allows users to download
arbitrary files.
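
A minimal usage sketch (hedged: it assumes the new package re-exports the `FileDownload` class added in this commit; the tests below import the same class from `@crawlee/http`):

```ts
import { writeFileSync } from 'node:fs';

// Assumption: `@crawlee/file-download` re-exports `FileDownload`.
import { FileDownload } from '@crawlee/file-download';

const crawler = new FileDownload({
    requestHandler({ body, contentType, request, log }) {
        // `body` holds the raw downloaded bytes; the response is no longer
        // restricted to the text-based MIME types accepted by `HttpCrawler`.
        log.info(`Downloaded ${request.url} (${contentType.type}), ${body.length} bytes`);
        writeFileSync('document.pdf', body);
    },
});

await crawler.run(['http://www.example.com/document.pdf']);
```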

Aside from the regular `requestHandler`, this crawler introduces
`streamHandler`, which passes a `ReadableStream` with the downloaded
data to the user handler.
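
For streaming downloads, a sketch along the lines of the new tests (the `stream` is typed as a web `ReadableStream`, but it is backed by a got-scraping Node.js stream, hence the cast when piping; the file name derivation is for illustration only):

```ts
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

import { FileDownload } from '@crawlee/http';

const crawler = new FileDownload({
    async streamHandler({ stream, request, log }) {
        // Hypothetical file name derived from the URL, for illustration only.
        const fileName = request.url.replace(/[^a-z0-9.]/gi, '_');
        // Pipe the download straight to disk without buffering it in memory.
        await pipeline(stream as any, createWriteStream(fileName));
        log.info(`Saved ${request.url} to ${fileName}`);
    },
});

await crawler.run(['http://www.example.com/video.mkv']);
```

The crawler waits for the underlying stream to finish before it marks the request as handled, so the handler can simply await the pipeline.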

---------

Co-authored-by: Martin Adámek <banan23@gmail.com>
Co-authored-by: Jan Buchar <jan.buchar@apify.com>
3 people committed May 16, 2024
1 parent 99e1a94 commit d73756b
Showing 3 changed files with 384 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/http-crawler/src/index.ts
@@ -1,2 +1,3 @@
export * from '@crawlee/basic';
export * from './internals/http-crawler';
export * from './internals/file-download';
225 changes: 225 additions & 0 deletions packages/http-crawler/src/internals/file-download.ts
@@ -0,0 +1,225 @@
import { finished } from 'stream/promises';
import { isPromise } from 'util/types';

import type { Dictionary } from '@crawlee/types';

import type {
ErrorHandler,
GetUserDataFromRequest,
HttpCrawlerOptions,
InternalHttpCrawlingContext,
InternalHttpHook,
RequestHandler,
RouterRoutes,
} from '../index';
import {
HttpCrawler,
Router,
} from '../index';

export type FileDownloadErrorHandler<
UserData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
JSONData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
> = ErrorHandler<FileDownloadCrawlingContext<UserData, JSONData>>;

export type StreamHandlerContext = Omit<FileDownloadCrawlingContext, 'body' | 'response' | 'parseWithCheerio' | 'json' | 'addRequests' | 'contentType'> & {
stream: ReadableStream;
};

type StreamHandler = (context: StreamHandlerContext) => void | Promise<void>;

export type FileDownloadOptions<
UserData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
JSONData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
> =
// eslint-disable-next-line max-len
(Omit<HttpCrawlerOptions<FileDownloadCrawlingContext<UserData, JSONData>>, 'requestHandler' > & { requestHandler?: never; streamHandler?: StreamHandler }) |
// eslint-disable-next-line max-len
(Omit<HttpCrawlerOptions<FileDownloadCrawlingContext<UserData, JSONData>>, 'requestHandler' > & { requestHandler: FileDownloadRequestHandler; streamHandler?: never });

export type FileDownloadHook<
UserData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
JSONData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
> = InternalHttpHook<FileDownloadCrawlingContext<UserData, JSONData>>;

export interface FileDownloadCrawlingContext<
UserData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
JSONData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
> extends InternalHttpCrawlingContext<UserData, JSONData, FileDownload> {}

export type FileDownloadRequestHandler<
UserData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
JSONData extends Dictionary = any, // with a default of Dictionary, we can't use a typed router in an untyped crawler
> = RequestHandler<FileDownloadCrawlingContext<UserData, JSONData>>;

/**
* Provides a framework for downloading files in parallel using plain HTTP requests. The URLs to download can either be provided up front as a static list or added on the fly from another crawler.
*
* Since `FileDownload` uses raw HTTP requests to download the files, it is very fast and bandwidth-efficient.
* However, it doesn't parse the content - if you need to e.g. extract data from the downloaded files,
* you might need to use {@apilink CheerioCrawler}, {@apilink PuppeteerCrawler} or {@apilink PlaywrightCrawler} instead.
*
* `FileDownload` downloads each URL using a plain HTTP request and then invokes the user-provided {@apilink FileDownloadOptions.requestHandler} where the user can specify what to do with the downloaded data.
*
* The source URLs are represented using {@apilink Request} objects that are fed from {@apilink RequestList} or {@apilink RequestQueue} instances provided by the {@apilink FileDownloadOptions.requestList} or {@apilink FileDownloadOptions.requestQueue} constructor options, respectively.
*
* If both {@apilink FileDownloadOptions.requestList} and {@apilink FileDownloadOptions.requestQueue} are used, the instance first processes URLs from the {@apilink RequestList} and automatically enqueues all of them to {@apilink RequestQueue} before it starts their processing. This ensures that a single URL is not crawled multiple times.
*
* The crawler finishes when there are no more {@apilink Request} objects to crawl.
*
* We can use the `preNavigationHooks` to adjust `gotOptions`:
*
* ```
* preNavigationHooks: [
* (crawlingContext, gotOptions) => {
* // ...
* },
* ]
* ```
*
* New requests are only dispatched when there is enough free CPU and memory available, using the functionality provided by the {@apilink AutoscaledPool} class. All {@apilink AutoscaledPool} configuration options can be passed to the `autoscaledPoolOptions` parameter of the `FileDownload` constructor. For user convenience, the `minConcurrency` and `maxConcurrency` {@apilink AutoscaledPool} options are available directly in the `FileDownload` constructor.
*
* ## Example usage
*
* ```ts
* import { writeFileSync } from 'node:fs';
*
* const crawler = new FileDownload({
* requestHandler({ body, request }) {
* writeFileSync(request.url.replace(/[^a-z0-9\.]/gi, '_'), body);
* },
* });
*
* await crawler.run([
* 'http://www.example.com/document.pdf',
* 'http://www.example.com/sound.mp3',
* 'http://www.example.com/video.mkv',
* ]);
* ```
*/
export class FileDownload extends HttpCrawler<FileDownloadCrawlingContext> {
private streamHandler?: StreamHandler;

constructor(options: FileDownloadOptions = {}) {
const { streamHandler } = options;
delete options.streamHandler;

if (streamHandler) {
// For streams, the navigation is done in the request handler.
(options as any).requestHandlerTimeoutSecs = options.navigationTimeoutSecs ?? 120;
}

super(options);

this.streamHandler = streamHandler;
if (this.streamHandler) {
this.requestHandler = this.streamRequestHandler;
}

// The base HttpCrawler class only supports a handful of text-based MIME types.
// With the FileDownload crawler, we want to download any file type.
(this as any).supportedMimeTypes = new Set(['*/*']);
}

protected override async _runRequestHandler(context: FileDownloadCrawlingContext) {
if (this.streamHandler) {
context.request.skipNavigation = true;
}

await super._runRequestHandler(context);
}

private async streamRequestHandler(context: FileDownloadCrawlingContext) {
const { log, request: { url } } = context;

const { gotScraping } = await import('got-scraping');

const stream = gotScraping.stream({
url,
timeout: { request: undefined },
proxyUrl: context.proxyInfo?.url,
isStream: true,
});

let pollingInterval: NodeJS.Timeout | undefined;

const cleanUp = () => {
clearInterval(pollingInterval!);
stream.destroy();
};

const downloadPromise = new Promise<void>((resolve, reject) => {
pollingInterval = setInterval(() => {
const { total, transferred } = stream.downloadProgress;

if (transferred > 0) {
log.debug(
`Downloaded ${transferred} bytes of ${total ?? 0} bytes from ${url}.`,
);
}
}, 5000);

stream.on('error', async (error: Error) => {
cleanUp();
reject(error);
});

let streamHandlerResult;

try {
context.stream = stream;
streamHandlerResult = this.streamHandler!(context as any);
} catch (e) {
cleanUp();
reject(e);
}

if (isPromise(streamHandlerResult)) {
streamHandlerResult.then(() => {
resolve();
}).catch((e: Error) => {
cleanUp();
reject(e);
});
} else {
resolve();
}
});

await Promise.all([
downloadPromise,
finished(stream),
]);

cleanUp();
}
}

/**
* Creates a new {@apilink Router} instance that works based on request labels.
* This instance can then serve as a `requestHandler` of your {@apilink FileDownload}.
* The handler context defaults to {@apilink FileDownloadCrawlingContext}.
*
* > Serves as a shortcut for using `Router.create<FileDownloadCrawlingContext>()`.
*
* ```ts
* import { FileDownload, createFileRouter } from 'crawlee';
*
* const router = createFileRouter();
* router.addHandler('label-a', async (ctx) => {
* ctx.log.info('...');
* });
* router.addDefaultHandler(async (ctx) => {
* ctx.log.info('...');
* });
*
* const crawler = new FileDownload({
* requestHandler: router,
* });
* await crawler.run();
* ```
*/
export function createFileRouter<
Context extends FileDownloadCrawlingContext = FileDownloadCrawlingContext,
UserData extends Dictionary = GetUserDataFromRequest<Context['request']>,
>(routes?: RouterRoutes<Context, UserData>) {
return Router.create<Context>(routes);
}
158 changes: 158 additions & 0 deletions test/core/crawlers/file_download.test.ts
@@ -0,0 +1,158 @@
import type { Server } from 'http';
import type { AddressInfo } from 'node:net';
import { Duplex } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { ReadableStream } from 'node:stream/web';
import { setTimeout } from 'node:timers/promises';

import { FileDownload } from '@crawlee/http';
import express from 'express';
import { startExpressAppPromise } from 'test/shared/_helper';

class ReadableStreamGenerator {
private static async generateRandomData(size: number, seed: number) {
const chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
const buffer = Buffer.alloc(size);
for (let i = 0; i < size; i++) {
// eslint-disable-next-line no-bitwise
seed = (Math.imul(48271, seed) | 0) % 2147483647; // Lehmer-style step for deterministic pseudo-random bytes
buffer[i] = chars.charCodeAt(seed % chars.length);
}
return buffer;
}

static getReadableStream(size: number, seed: number, throttle: number = 0): ReadableStream {
let bytesRead = 0;
const stream = new ReadableStream({
start: async (controller) => {
while (bytesRead < size) {
const chunkSize = Math.min(size - bytesRead, 1024);
const chunk = await this.generateRandomData(chunkSize, seed);
bytesRead += chunk.length;
controller.enqueue(chunk);

if (throttle > 0) {
await setTimeout(throttle);
}
}
controller.close();
},
});

return stream;
}

static async getBuffer(size: number, seed: number) {
const stream = this.getReadableStream(size, seed);
const chunks: Buffer[] = [];
for await (const chunk of stream) {
    chunks.push(chunk);
}
return Buffer.concat(chunks);
}
}

let url = 'http://localhost';
let server: Server;
beforeAll(async () => {
const app = express();

app.get('/file', async (req, res) => {
const reqUrl = new URL(req.url, 'http://localhost');

const size = Number(reqUrl.searchParams.get('size') ?? 1024);
const seed = Number(reqUrl.searchParams.get('seed') ?? 123);
const throttle = Number(reqUrl.searchParams.get('throttle') ?? 0);

const stream = ReadableStreamGenerator.getReadableStream(size, seed, throttle);

res.setHeader('content-type', 'application/octet-stream');
await pipeline(stream, res);

res.end();
});

server = await startExpressAppPromise(app, 0);
url = `http://localhost:${(server.address() as AddressInfo).port}`;
});

afterAll(async () => {
server.close();
});

test('requestHandler works', async () => {
const results: Buffer[] = [];

const crawler = new FileDownload({
maxRequestRetries: 0,
requestHandler: ({ body }) => {
results.push(body as Buffer);
},
});

const fileUrl = new URL('/file?size=1024&seed=123', url).toString();

await crawler.run([fileUrl]);

expect(results).toHaveLength(1);
expect(results[0].length).toBe(1024);
expect(results[0]).toEqual(await ReadableStreamGenerator.getBuffer(1024, 123));
});

test('streamHandler works', async () => {
let result: Buffer = Buffer.alloc(0);

const crawler = new FileDownload({
maxRequestRetries: 0,
streamHandler: async ({ stream }) => {
for await (const chunk of stream as ReadableStream<any>) {
result = Buffer.concat([result, chunk]);
}
},
});

const fileUrl = new URL('/file?size=1024&seed=456', url).toString();

await crawler.run([fileUrl]);

expect(result.length).toBe(1024);
expect(result).toEqual(await ReadableStreamGenerator.getBuffer(1024, 456));
});

test('crawler with streamHandler waits for the stream to finish', async () => {
const bufferingStream = new Duplex({
read() {},
write(chunk, encoding, callback) {
this.push(chunk);
callback();
},
});

const crawler = new FileDownload({
maxRequestRetries: 0,
streamHandler: ({ stream }) => {
pipeline(stream as any, bufferingStream).then(() => {
bufferingStream.push(null);
bufferingStream.end();
}).catch((e) => {
bufferingStream.destroy(e);
});
},
});

// waits for a second after every kilobyte sent.
const fileUrl = new URL(`/file?size=${5 * 1024}&seed=789&throttle=1000`, url).toString();
await crawler.run([fileUrl]);

// the stream should be finished once the crawler finishes.
expect(bufferingStream.writableFinished).toBe(true);

const bufferedData: Buffer[] = [];
for await (const chunk of bufferingStream) {
bufferedData.push(chunk);
}
const result = Buffer.concat(bufferedData);

expect(result.length).toBe(5 * 1024);
expect(result).toEqual(await ReadableStreamGenerator.getBuffer(5 * 1024, 789));
});
