Skip to content

Commit

Permalink
handle the throtle error for pagination scenarion (#368)
Browse files Browse the repository at this point in the history
Co-authored-by: Rakhi Mundhada <rakhi.mundhada@L7130100084000.local>
  • Loading branch information
rakhimundhada15 and Rakhi Mundhada committed May 30, 2024
1 parent 92a0c8b commit bfcf5d4
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 21 deletions.
36 changes: 18 additions & 18 deletions collectors/googlestackdriver/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const packageJson = require('./package.json');
const protoFiles = require('google-proto-files');

const API_THROTTLING_ERROR = 8;
const API_THROTTLING_STATUS_CODE = 429;
const MAX_POLL_INTERVAL = 900;
const MAX_PAGE_SIZE = 1000;
const AUDIT_PAYLOAD_TYPE_URL = 'type.googleapis.com/google.cloud.audit.AuditLog';
Expand Down Expand Up @@ -132,7 +133,7 @@ timestamp < "${state.until}"`;
return callback(null, logs, newState, newState.poll_interval_sec);
})
.catch(err => {
AlLogger.error(`GSTA000003 err in collection ${err}`);
AlLogger.error(`GSTA000003 err in collection ${JSON.stringify(err.details)}`);

// Stackdriver Logging api has some rate limits that we might run into.
// If we run inot a rate limit error, instead of returning the error,
Expand All @@ -141,32 +142,31 @@ timestamp < "${state.until}"`;
// Error: 8 RESOURCE_EXHAUSTED: Received message larger than max (4518352 vs. 4194304),
// so half the given interval and if interval is less than 15 sec then reduce the page size to half.

if(err.code === API_THROTTLING_ERROR){
if (err.code === API_THROTTLING_ERROR || (err.response && err.response.status === API_THROTTLING_STATUS_CODE)) {
const currentInterval = moment(state.until).diff(state.since, 'seconds');
const interval = state.poll_interval_sec < 60 ? 60 : state.poll_interval_sec;
const nextPollInterval = state.poll_interval_sec < MAX_POLL_INTERVAL ?
interval + 60 : MAX_POLL_INTERVAL;
const currentInterval = moment(state.until).diff(state.since, 'seconds');
if (currentInterval <= 15 && err.details.includes('Received message larger than max')) {
// Reduce the page size to half to pull the data for throttling interval
if (state.nextPage && state.nextPage.pageSize) {
state.nextPage.pageSize = Math.ceil(state.nextPage.pageSize / 2);
}
else {
state.pageSize = Math.ceil(params.pageSize / 2)

if (state.nextPage && state.nextPage.pageSize) {
state.nextPage.pageSize = Math.ceil(state.nextPage.pageSize / 2);
AlLogger.debug(`Throttling error with nextPage: ${err.message}. Retrying with smaller pageSize.`);
} else {
if (currentInterval <= 15 && err.details.includes('Received message larger than max')) {
state.pageSize = state.pageSize ? Math.ceil(state.pageSize / 2) : Math.ceil(params.pageSize / 2);
AlLogger.debug(`Throttling error with no nextPage and large message: ${err.message}. Reducing pageSize.`);
} else {
state.until = moment(state.since).add(Math.ceil(currentInterval / 2), 'seconds').toISOString();
AlLogger.debug(`Throttling error with no nextPage: ${err.message}. Reducing time range.`);
}
AlLogger.warn(`RESOURCE_EXHAUSTED for ${currentInterval} sec time interval`);
}
else {
state.until = moment(state.since).add(Math.ceil(currentInterval / 2), 'seconds').toISOString();
}
const backOffState = Object.assign({}, state, {poll_interval_sec:nextPollInterval});
collector.reportApiThrottling(function () {
return callback(null, [], backOffState, nextPollInterval);
});
}
else {
// set errorCode if not available in error object to showcase client error on DDMetrics
if (err.code) {
} else {
// set errorCode if not available in error object to showcase client error on DDMetrics
if (err.code) {
err.errorCode = err.code;
}
return callback(err);
Expand Down
2 changes: 1 addition & 1 deletion collectors/googlestackdriver/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "googlestackdriver-collector",
"version": "1.2.9",
"version": "1.2.10",
"description": "Alert Logic AWS based Googlestackdriver Log Collector",
"repository": {},
"private": true,
Expand Down
36 changes: 35 additions & 1 deletion collectors/googlestackdriver/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ describe('Unit Tests', function() {
});
});

it(`Get Logs check API Throttling with 'Received message larger than max (4776477 vs. 4194304)' for time interval less than 15 sec then check with reduce page size able to fetch the data`, function(done) {
it(`Get Logs check API Throttling when going through pagination then check if it reduce page size and able to fetch the data`, function(done) {
logginClientStub = sinon.stub(logging.v2.LoggingServiceV2Client.prototype, 'listLogEntries');

logginClientStub.onCall(0).callsFake(() => {
Expand Down Expand Up @@ -232,6 +232,40 @@ describe('Unit Tests', function() {
});
});

it(`Get Logs check API Throttling with 'Received message larger than max (4776477 vs. 4194304)' for time interval less than 15 sec then check with reduce page size able to fetch the data`, function(done) {
logginClientStub = sinon.stub(logging.v2.LoggingServiceV2Client.prototype, 'listLogEntries');

logginClientStub.onCall(0).callsFake(() => {
return new Promise((res, rej) => {
rej({code: 8,
details: 'Received message larger than max (4776477 vs. 4194304)'});
});
});

GooglestackdriverCollector.load().then(function(creds) {
var collector = new GooglestackdriverCollector(ctx, creds);
const startDate = moment().subtract(3, 'days');
const curState = {
since: startDate.toISOString(),
until: startDate.add(15, 'seconds').toISOString(),
stream: "projects/project-test",
poll_interval_sec: 60
};

var reportSpy = sinon.spy(collector, 'reportApiThrottling');
let putMetricDataStub = sinon.stub(CloudWatch.prototype, 'putMetricData').callsFake((params, callback) => callback());
collector.pawsGetLogs(curState, (err, logs, newState, newPollInterval) =>{
assert.equal(newState.pageSize, 500);
assert.equal(true, reportSpy.calledOnce);
assert.equal(logs.length, 0);
assert.equal(newPollInterval, 120);
restoreLoggingClientStub();
putMetricDataStub.restore();
done();
});
});
});

it('Stops paginiating at the pagination limit', function(done) {
logginClientStub = sinon.stub(logging.v2.LoggingServiceV2Client.prototype, 'listLogEntries');

Expand Down
2 changes: 1 addition & 1 deletion ps_spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ stages:
- ./build_collector.sh googlestackdriver
env:
ALPS_SERVICE_NAME: "paws-googlestackdriver-collector"
ALPS_SERVICE_VERSION: "1.2.9" #set the value from collector package json
ALPS_SERVICE_VERSION: "1.2.10" #set the value from collector package json
outputs:
file: ./googlestackdriver-collector*
packagers:
Expand Down

0 comments on commit bfcf5d4

Please sign in to comment.