Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the ability to extend errors from aws #496

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export class Consumer extends TypedEventEmitter {
private isPolling = false;
private stopRequestedAtTimestamp: number;
public abortController: AbortController;
private extendedAWSErrors: boolean;

constructor(options: ConsumerOptions) {
super();
Expand All @@ -82,6 +83,7 @@ export class Consumer extends TypedEventEmitter {
this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.extendedAWSErrors = options.extendedAWSErrors ?? false;
this.sqs =
options.sqs ||
new SQSClient({
Expand Down Expand Up @@ -297,7 +299,11 @@ export class Consumer extends TypedEventEmitter {

return result;
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
throw toSQSError(
err,
`SQS receive message failed: ${err.message}`,
this.extendedAWSErrors,
);
}
}

Expand Down Expand Up @@ -439,7 +445,11 @@ export class Consumer extends TypedEventEmitter {
} catch (err) {
this.emit(
"error",
toSQSError(err, `Error changing visibility timeout: ${err.message}`),
toSQSError(
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
),
message,
);
}
Expand Down Expand Up @@ -470,7 +480,11 @@ export class Consumer extends TypedEventEmitter {
} catch (err) {
this.emit(
"error",
toSQSError(err, `Error changing visibility timeout: ${err.message}`),
toSQSError(
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
),
messages,
);
}
Expand Down Expand Up @@ -567,7 +581,11 @@ export class Consumer extends TypedEventEmitter {
this.sqsSendOptions,
);
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
throw toSQSError(
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
);
}
}

Expand Down Expand Up @@ -601,7 +619,11 @@ export class Consumer extends TypedEventEmitter {
this.sqsSendOptions,
);
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
throw toSQSError(
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
);
}
}
}
15 changes: 13 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ class SQSError extends Error {
service: string;
time: Date;
retryable: boolean;
fault: "client" | "server";
fault: AWSError["$fault"];
response?: AWSError["$response"];
metadata?: AWSError["$metadata"];

constructor(message: string) {
super(message);
Expand Down Expand Up @@ -67,7 +69,11 @@ function isConnectionError(err: Error): boolean {
* @param err The error object that was received.
* @param message The message to send with the error.
*/
function toSQSError(err: AWSError, message: string): SQSError {
function toSQSError(
err: AWSError,
message: string,
extendedAWSErrors: boolean,
): SQSError {
const sqsError = new SQSError(message);
sqsError.code = err.name;
sqsError.statusCode = err.$metadata?.httpStatusCode;
Expand All @@ -76,6 +82,11 @@ function toSQSError(err: AWSError, message: string): SQSError {
sqsError.fault = err.$fault;
sqsError.time = new Date();

if (extendedAWSErrors) {
sqsError.response = err.$response;
sqsError.metadata = err.$metadata;
}

return sqsError;
}

Expand Down
42 changes: 33 additions & 9 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ export interface ConsumerOptions {
* example to add middlewares.
*/
postReceiveMessageCallback?(): Promise<void>;
/**
* Set this to `true` if you want to receive additional information about the error
* that occurred from AWS, such as the response and metadata.
*/
extendedAWSErrors?: boolean;
}

/**
Expand Down Expand Up @@ -231,7 +236,7 @@ export type AWSError = {
/**
* Name, eg. ConditionalCheckFailedException
*/
name: string;
readonly name: string;

/**
* Human-readable error response message
Expand All @@ -246,7 +251,26 @@ export type AWSError = {
/**
* Whether the client or server are at fault.
*/
readonly $fault?: "client" | "server";
readonly $fault: "client" | "server";

/**
* Represents an HTTP message as received in reply to a request
*/
readonly $response?: {
/**
* The status code of the HTTP response.
*/
statusCode?: number;
/**
* The headers of the HTTP message.
*/
headers: Record<string, string>;
/**
* The body of the HTTP message.
* Can be: ArrayBuffer | ArrayBufferView | string | Uint8Array | Readable | ReadableStream
*/
body?: any;
};

/**
* The service that encountered the exception.
Expand All @@ -263,37 +287,37 @@ export type AWSError = {
readonly throttling?: boolean;
};

$metadata?: {
readonly $metadata: {
/**
* The status code of the last HTTP response received for this operation.
*/
httpStatusCode?: number;
readonly httpStatusCode?: number;

/**
* A unique identifier for the last request sent for this operation. Often
* requested by AWS service teams to aid in debugging.
*/
requestId?: string;
readonly requestId?: string;

/**
* A secondary identifier for the last request sent. Used for debugging.
*/
extendedRequestId?: string;
readonly extendedRequestId?: string;

/**
* A tertiary identifier for the last request sent. Used for debugging.
*/
cfId?: string;
readonly cfId?: string;

/**
* The number of times this operation was attempted.
*/
attempts?: number;
readonly attempts?: number;

/**
* The total amount of time (in milliseconds) that was spent waiting between
* retry attempts.
*/
totalRetryDelay?: number;
readonly totalRetryDelay?: number;
};
};
93 changes: 86 additions & 7 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ const mockChangeMessageVisibilityBatch = sinon.match.instanceOf(

class MockSQSError extends Error implements AWSError {
name: string;
$metadata: {
httpStatusCode: number;
};
$metadata: AWSError["$metadata"];
$service: string;
$retryable: {
throttling: boolean;
};
$fault: "client" | "server";
$retryable: AWSError["$retryable"];
$fault: AWSError["$fault"];
$response?:
| {
statusCode?: number | undefined;
headers: Record<string, string>;
body?: any;
}
| undefined;
time: Date;

constructor(message: string) {
Expand Down Expand Up @@ -245,6 +248,82 @@ describe("Consumer", () => {
assert.equal(err.time.toString(), receiveErr.time.toString());
assert.equal(err.service, receiveErr.$service);
assert.equal(err.fault, receiveErr.$fault);
assert.isUndefined(err.response);
assert.isUndefined(err.metadata);
});

it('includes the response and metadata in the error when "extendedAWSErrors" is true', async () => {
const receiveErr = new MockSQSError("Receive error");
receiveErr.name = "short code";
receiveErr.$retryable = {
throttling: false,
};
receiveErr.$metadata = {
httpStatusCode: 403,
};
receiveErr.time = new Date();
receiveErr.$service = "service";
receiveErr.$response = {
statusCode: 200,
headers: {},
body: "body",
};

sqs.send.withArgs(mockReceiveMessage).rejects(receiveErr);

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
extendedAWSErrors: true,
});

consumer.start();
const err: any = await pEvent(consumer, "error");
consumer.stop();

assert.ok(err);
assert.equal(err.response, receiveErr.$response);
assert.equal(err.metadata, receiveErr.$metadata);
});

it("does not include the response and metadata in the error when extendedAWSErrors is false", async () => {
const receiveErr = new MockSQSError("Receive error");
receiveErr.name = "short code";
receiveErr.$retryable = {
throttling: false,
};
receiveErr.$metadata = {
httpStatusCode: 403,
};
receiveErr.time = new Date();
receiveErr.$service = "service";
receiveErr.$response = {
statusCode: 200,
headers: {},
body: "body",
};

sqs.send.withArgs(mockReceiveMessage).rejects(receiveErr);

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
extendedAWSErrors: false,
});

consumer.start();
const err: any = await pEvent(consumer, "error");
consumer.stop();

assert.ok(err);
assert.isUndefined(err.response);
assert.isUndefined(err.metadata);
});

it("fires a timeout event if handler function takes too long", async () => {
Expand Down
Loading