Skip to content

Commit

Permalink
feat: extend authentication error to handle more codes (#489)
Browse files Browse the repository at this point in the history
* feat: extend authentication error to handle more codes

* chore simplifying the conditional
  • Loading branch information
nicholasgriffintn committed Apr 27, 2024
1 parent 494ed94 commit 45f0916
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ export class Consumer extends TypedEventEmitter {
this.emitError(err);
if (isConnectionError(err)) {
logger.debug("authentication_error", {
code: err.code || "Unknown",
detail:
"There was an authentication error. Pausing before retrying.",
});
Expand Down
23 changes: 16 additions & 7 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,28 @@ class StandardError extends Error {
}
}

/**
* List of SQS error codes that are considered connection errors.
*/
const CONNECTION_ERRORS = [
"CredentialsError",
"UnknownEndpoint",
"AWS.SimpleQueueService.NonExistentQueue",
"CredentialsProviderError",
"InvalidAddress",
"InvalidSecurity",
"QueueDoesNotExist",
"RequestThrottled",
"OverLimit",
];

/**
* Checks if the error provided should be treated as a connection error.
* @param err The error that was received.
*/
function isConnectionError(err: Error): boolean {
if (err instanceof SQSError) {
return (
err.statusCode === 403 ||
err.code === "CredentialsError" ||
err.code === "UnknownEndpoint" ||
err.code === "AWS.SimpleQueueService.NonExistentQueue" ||
err.code === "CredentialsProviderError"
);
return err.statusCode === 403 || CONNECTION_ERRORS.includes(err.code);
}
return false;
}
Expand Down
165 changes: 165 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ describe("Consumer", () => {
});

it("waits before repolling when a credentials error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsErr = {
name: "CredentialsError",
message: "Missing credentials in config",
Expand All @@ -424,9 +426,16 @@ describe("Consumer", () => {
sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "CredentialsError",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a 403 error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const invalidSignatureErr = {
$metadata: {
httpStatusCode: 403,
Expand All @@ -444,9 +453,16 @@ describe("Consumer", () => {
sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "Unknown",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a UnknownEndpoint error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const unknownEndpointErr = {
name: "UnknownEndpoint",
message:
Expand All @@ -464,9 +480,16 @@ describe("Consumer", () => {
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "UnknownEndpoint",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a NonExistentQueue error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const nonExistentQueueErr = {
name: "AWS.SimpleQueueService.NonExistentQueue",
message: "The specified queue does not exist for this wsdl version.",
Expand All @@ -483,9 +506,16 @@ describe("Consumer", () => {
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "AWS.SimpleQueueService.NonExistentQueue",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a CredentialsProviderError error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsProviderErr = {
name: "CredentialsProviderError",
message: "Could not load credentials from any providers.",
Expand All @@ -502,6 +532,141 @@ describe("Consumer", () => {
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "CredentialsProviderError",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a InvalidAddress error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsProviderErr = {
name: "InvalidAddress",
message: "The address some-queue-url is not valid for this endpoint.",
};
sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr);
const errorListener = sandbox.stub();
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "InvalidAddress",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a InvalidSecurity error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsProviderErr = {
name: "InvalidSecurity",
message: "The queue is not is not HTTPS and SigV4.",
};
sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr);
const errorListener = sandbox.stub();
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "InvalidSecurity",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a QueueDoesNotExist error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsProviderErr = {
name: "QueueDoesNotExist",
message: "The queue does not exist.",
};
sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr);
const errorListener = sandbox.stub();
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "QueueDoesNotExist",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a RequestThrottled error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsProviderErr = {
name: "RequestThrottled",
message: "Requests have been throttled.",
};
sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr);
const errorListener = sandbox.stub();
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "RequestThrottled",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a RequestThrottled error occurs", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

const credentialsProviderErr = {
name: "OverLimit",
message: "An over limit error.",
};
sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr);
const errorListener = sandbox.stub();
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
sandbox.assert.calledTwice(sqs.send);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage);

sandbox.assert.calledWith(loggerDebug, "authentication_error", {
code: "OverLimit",
detail: "There was an authentication error. Pausing before retrying.",
});
});

it("waits before repolling when a polling timeout is set", async () => {
Expand Down

0 comments on commit 45f0916

Please sign in to comment.