Skip to content

Commit

Permalink
Excise waitForRunToFinish from Apify.call and export it (#675)
Browse files Browse the repository at this point in the history
* Excise waitForRunToFinish from Apify.call and export it

* Rethrow generic errors
  • Loading branch information
mnmkng committed May 11, 2020
1 parent aceabb1 commit 7558eb3
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 71 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
NEXT
====================
- Add `Apify.utils.waitForRunToFinish()` which simplifies waiting for an actor run to finish.
- Add standard prefixes to log messages to improve readability and orientation in logs.
- Add support for `async` handlers in `Apify.utils.puppeteer.addInterceptRequestHandler()`
- EXPERIMENTAL: Add `cheerioCrawler.use()` function to enable attaching `CrawlerExtension`
to the crawler to modify its behavior. A plugin that extends functionality.
- Fix bug with cookie expiry in `SessionPool`.
- Fix issues in documentation.
- Updated Puppeteer to 3.0.2


0.20.3 / 2020-04-14
====================
- **DEPRECATED:** `CheerioCrawlerOptions.requestOptions` is now deprecated. Please use
Expand Down
127 changes: 59 additions & 68 deletions src/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ import * as path from 'path';
import * as _ from 'underscore';
import { checkParamOrThrow } from 'apify-client/build/utils';
import { APIFY_PROXY_VALUE_REGEX } from 'apify-shared/regexs';
import { ENV_VARS, INTEGER_ENV_VARS, LOCAL_ENV_VARS, ACT_JOB_TERMINAL_STATUSES, ACT_JOB_STATUSES } from 'apify-shared/consts';
import { ENV_VARS, INTEGER_ENV_VARS, LOCAL_ENV_VARS, ACT_JOB_STATUSES } from 'apify-shared/consts';
import log from './utils_log';
import { EXIT_CODES, COUNTRY_CODE_REGEX } from './constants';
import { initializeEvents, stopEvents } from './events';
import { apifyClient, addCharsetToContentType, sleep, snakeCaseToCamelCase, isAtHome, logSystemInfo, printOutdatedSdkWarning } from './utils';
import {
apifyClient,
addCharsetToContentType,
sleep,
snakeCaseToCamelCase,
isAtHome,
logSystemInfo,
printOutdatedSdkWarning,
waitForRunToFinish,
} from './utils';
import { maybeStringify } from './key_value_store';
import { ApifyCallError } from './errors';

const METAMORPH_AFTER_SLEEP_MILLIS = 300000;

// eslint-disable-next-line import/named,no-unused-vars,import/first
import { ActorRun } from './typedefs';
import { ApifyCallError } from './errors';

const METAMORPH_AFTER_SLEEP_MILLIS = 300000;

/**
* Tries to parse a string with date.
Expand All @@ -26,56 +35,6 @@ const tryParseDate = (str) => {
return unix > 0 ? new Date(unix) : undefined;
};

/**
* Waits for given run to finish. If "waitSecs" is reached then returns unfinished run.
*
* @ignore
*/
const waitForRunToFinish = async ({ actId, runId, token, waitSecs, taskId }) => {
let updatedRun;

const { acts } = apifyClient;
const startedAt = Date.now();
const shouldRepeat = () => {
if (waitSecs && (Date.now() - startedAt) / 1000 >= waitSecs) return false;
if (updatedRun && ACT_JOB_TERMINAL_STATUSES.includes(updatedRun.status)) return false;

return true;
};

const getRunOpts = { actId, runId };
if (token) getRunOpts.token = token;

while (shouldRepeat()) {
getRunOpts.waitForFinish = waitSecs
? Math.round(waitSecs - (Date.now() - startedAt) / 1000)
: 999999;

updatedRun = await acts.getRun(getRunOpts);

// It might take some time for database replicas to get up-to-date,
// so getRun() might return null. Wait a little bit and try it again.
if (!updatedRun) await sleep(250);
}

if (!updatedRun) {
throw new ApifyCallError({ id: runId, actId }, 'Apify.call() failed, cannot fetch actor run details from the server');
}
const { status } = updatedRun;
if (
status !== ACT_JOB_STATUSES.SUCCEEDED
&& status !== ACT_JOB_STATUSES.RUNNING
&& status !== ACT_JOB_STATUSES.READY
) {
const message = taskId
? `The actor task ${taskId} invoked by Apify.call() did not succeed. For details, see https://my.apify.com/view/runs/${runId}`
: `The actor ${actId} invoked by Apify.call() did not succeed. For details, see https://my.apify.com/view/runs/${runId}`;
throw new ApifyCallError(updatedRun, message);
}

return updatedRun;
};

/**
* Parses input and contentType and appends it to a given options object.
* Throws if input is not valid.
Expand Down Expand Up @@ -402,12 +361,25 @@ export const call = async (actId, input, options = {}) => {
if (waitSecs <= 0) return run; // In this case there is nothing more to do.

// Wait for run to finish.
const updatedRun = await waitForRunToFinish({
actId,
runId: run.id,
token,
waitSecs,
});
let updatedRun;
try {
updatedRun = await waitForRunToFinish({
actorId: actId,
runId: run.id,
token,
waitSecs,
});
} catch (err) {
if (err.message.startsWith('Waiting for run to finish')) {
throw new ApifyCallError({ id: run.id, actId: run.actId }, 'Apify.call() failed, cannot fetch actor run details from the server');
}
throw err;
}

if (isRunUnsuccessful(updatedRun.status)) {
const message = `The actor ${actId} invoked by Apify.call() did not succeed. For details, see https://my.apify.com/view/runs/${run.id}`;
throw new ApifyCallError(updatedRun, message);
}

// Finish if output is not requested or run haven't finished.
const { fetchOutput = true } = options;
Expand Down Expand Up @@ -524,13 +496,26 @@ export const callTask = async (taskId, input, options = {}) => {
if (waitSecs <= 0) return run; // In this case there is nothing more to do.

// Wait for run to finish.
const updatedRun = await waitForRunToFinish({
actId: run.actId,
runId: run.id,
token,
waitSecs,
taskId,
});
let updatedRun;
try {
updatedRun = await waitForRunToFinish({
actorId: run.actId,
runId: run.id,
token,
waitSecs,
});
} catch (err) {
if (err.message.startsWith('Waiting for run to finish')) {
throw new ApifyCallError({ id: run.id, actId: run.actId }, 'Apify.call() failed, cannot fetch actor run details from the server');
}
throw err;
}

if (isRunUnsuccessful(updatedRun.status)) {
// TODO It should be callTask in the message, but I'm keeping it this way not to introduce a breaking change.
const message = `The actor task ${taskId} invoked by Apify.call() did not succeed. For details, see https://my.apify.com/view/runs/${run.id}`;
throw new ApifyCallError(updatedRun, message);
}

// Finish if output is not requested or run haven't finished.
const { fetchOutput = true } = options;
Expand All @@ -548,6 +533,12 @@ export const callTask = async (taskId, input, options = {}) => {
return Object.assign({}, updatedRun, { output });
};

function isRunUnsuccessful(status) {
return status !== ACT_JOB_STATUSES.SUCCEEDED
&& status !== ACT_JOB_STATUSES.RUNNING
&& status !== ACT_JOB_STATUSES.READY;
}


/**
* Transforms this actor run to an actor run of a given actor. The system stops the current container and starts the new container
Expand Down
3 changes: 2 additions & 1 deletion src/crawlers/cheerio_crawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,9 @@ class CheerioCrawler {
}

/**
* **EXPERIMENTAL**
* Function for attaching CrawlerExtensions such as the Unblockers.
* @param extension - Crawler extension that overrides the crawler configuration.
* @param {CrawlerExtension} extension - Crawler extension that overrides the crawler configuration.
*/
use(extension) {
const inheritsFromCrawlerExtension = extension instanceof CrawlerExtension;
Expand Down
69 changes: 67 additions & 2 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as psTree from '@apify/ps-tree';
import * as ApifyClient from 'apify-client';
import { checkParamOrThrow } from 'apify-client/build/utils';
import { version as apifyClientVersion } from 'apify-client/package.json';
import { ENV_VARS, LOCAL_ENV_VARS } from 'apify-shared/consts';
import { ACT_JOB_TERMINAL_STATUSES, ENV_VARS, LOCAL_ENV_VARS } from 'apify-shared/consts';
import { getRandomInt } from 'apify-shared/utilities';
import * as cheerio from 'cheerio';
import * as contentTypeParser from 'content-type';
Expand All @@ -25,6 +25,7 @@ import { version as apifyVersion } from '../package.json';
import { IncomingMessage } from 'http';
import { Response as PuppeteerResponse } from 'puppeteer';
import Request, { RequestOptions } from './request';
import { ActorRun } from './typedefs';
/* eslint-enable no-unused-vars,import/named,import/no-duplicates,import/order */

/**
Expand Down Expand Up @@ -685,6 +686,70 @@ export const parseContentTypeFromResponse = (response) => {
};
};

/**
* Returns a promise that resolves with the finished Run object when the provided actor run finishes
* or with the unfinished Run object when the `waitSecs` timeout lapses. The promise is NOT rejected
* based on run status. You can inspect the `status` property of the Run object to find out its status.
*
* This is useful when you need to chain actor executions. Similar effect can be achieved
* by using webhooks, so be sure to review which technique fits your use-case better.
*
* @param {object} options
* @param {string} options.actorId
* ID of the actor that started the run.
* @param {string} options.runId
* ID of the run itself.
* @param {string} [options.waitSecs]
* Maximum time to wait for the run to finish, in seconds.
* If the limit is reached, the returned promise is resolved to a run object that will have
* status `READY` or `RUNNING`. If `waitSecs` omitted, the function waits indefinitely.
* @param {string} [options.token]
* You can supply an Apify token to override the default one
* that's used by the default ApifyClient instance.
* E.g. you can track other users' runs.
* @returns {Promise<ActorRun>}
* @memberOf utils
* @function
*/
export const waitForRunToFinish = async (options) => {
const {
actorId,
runId,
token,
waitSecs,
} = options;
let run;

const startedAt = Date.now();
const shouldRepeat = () => {
if (waitSecs && (Date.now() - startedAt) / 1000 >= waitSecs) return false;
if (run && ACT_JOB_TERMINAL_STATUSES.includes(run.status)) return false;

return true;
};

const getRunOpts = { actId: actorId, runId };
if (token) getRunOpts.token = token;

while (shouldRepeat()) {
getRunOpts.waitForFinish = waitSecs
? Math.round(waitSecs - (Date.now() - startedAt) / 1000)
: 999999;

run = await apifyClient.acts.getRun(getRunOpts);

// It might take some time for database replicas to get up-to-date,
// so getRun() might return null. Wait a little bit and try it again.
if (!run) await sleep(250);
}

if (!run) {
throw new Error('Waiting for run to finish failed. Cannot fetch actor run details from the server.');
}

return run;
};

/**
* A namespace that contains various utilities.
*
Expand All @@ -710,5 +775,5 @@ export const publicUtils = {
URL_NO_COMMAS_REGEX,
URL_WITH_COMMAS_REGEX,
createRequestDebugInfo,
parseContentTypeFromResponse,
waitForRunToFinish,
};

0 comments on commit 7558eb3

Please sign in to comment.