This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Allow user to automatically delete CloudWatch logs in batch that are > 14 days old #56

Merged: 8 commits, Sep 10, 2020
@@ -14,6 +14,8 @@
*/

#pragma once
#include <queue>
#include <tuple>
#include <aws/logs/model/InputLogEvent.h>

#include <cloudwatch_logs_common/definitions/ros_cloudwatch_logs_errors.h>
@@ -53,9 +55,25 @@ class LogFileManager :

void write(const LogCollection & data) override;

/*
AWSClient returns an 'InvalidParameterException' error when the log events in a
single batch span more than 24 hours. Therefore readBatch will only return as
many logs as fit within a 24 hour span, so the actual number of logs batched
may end up being less than the requested batch_size.

We must also sort the log data by timestamp: ordering is not guaranteed in the
file, but CloudWatch requires every event in a single put to be in
chronological order.
*/
FileObject<LogCollection> readBatch(size_t batch_size) override;

using Timestamp = long;
Timestamp latestTime = 0;
};

} // namespace Utils

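// NOTE: despite the _SEC suffix, these values are in milliseconds
// (86,400,000 ms = 24 hours; 1,209,600,000 ms = 14 days), matching the
// millisecond epoch timestamps used by CloudWatch InputLogEvent.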
const long ONE_DAY_IN_SEC = 86400000;
const long TWO_WEEK_IN_SEC = 1209600000;
} // namespace CloudWatchLogs
} // namespace Aws
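To illustrate the contract described in the readBatch comment above, here is a standalone sketch (not the library code) of the grouping rule: events within 24 hours of the newest go into the current batch, events more than 14 days older than the newest are flagged as stale when stale-data deletion is enabled, and everything else is left for a later batch. The names BatchResult and splitByAge are illustrative only, and timestamps are assumed to be millisecond epoch values.

#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

constexpr int64_t kOneDayMs   = 86400000;    // 24 hours in milliseconds
constexpr int64_t kTwoWeeksMs = 1209600000;  // 14 days in milliseconds

struct BatchResult {
  std::vector<int64_t> batch;  // within 24 hours of the newest event
  std::vector<int64_t> stale;  // > 14 days older than the newest event
  std::vector<int64_t> rest;   // left on disk for a later batch
};

BatchResult splitByAge(std::vector<int64_t> timestamps, bool delete_stale) {
  BatchResult result;
  if (timestamps.empty()) {
    return result;
  }
  // Sort newest-first, mirroring the priority queue used by readBatch().
  std::sort(timestamps.begin(), timestamps.end(), std::greater<int64_t>());
  const int64_t latest = timestamps.front();
  for (int64_t ts : timestamps) {
    if (latest - ts < kOneDayMs) {
      result.batch.push_back(ts);
    } else if (delete_stale && latest - ts > kTwoWeeksMs) {
      result.stale.push_back(ts);
    } else {
      result.rest.push_back(ts);
    }
  }
  return result;
}

With the inputs from test_2_week_delete_1_of_6 below (15, ONE_DAY_IN_SEC, 0, TWO_WEEK_IN_SEC+5, TWO_WEEK_IN_SEC+1, 10) and deletion enabled, this yields a first batch of {TWO_WEEK_IN_SEC+5, TWO_WEEK_IN_SEC+1}, flags 0 as stale, and leaves the remaining three events for the next batch.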
37 changes: 16 additions & 21 deletions cloudwatch_logs_common/src/utils/log_file_manager.cpp
@@ -18,6 +18,7 @@
#include <fstream>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <tuple>

@@ -27,29 +28,17 @@
#include <aws/core/utils/logging/LogMacros.h>
#include <cloudwatch_logs_common/definitions/definitions.h>

const long ONE_DAY_IN_SEC = 86400000;

namespace Aws {
namespace CloudWatchLogs {
namespace Utils {

/*
AWSClient will return 'InvalidParameterException' error when the log events in a
single batch span more than 24 hours. Therefore the readBatch function will only
return as many logs as can fit within the 24 hour span and the actual number of
logs batched may end up being less than the original batch_size.

We must sort the log data chronologically because it is not guaranteed
to be ordered chronologically in the file, but CloudWatch requires all
puts in a single batch to be sorted chronologically
*/
FileObject<LogCollection> LogFileManager::readBatch(
size_t batch_size)
{
FileManagement::DataToken data_token;
AWS_LOG_INFO(__func__, "Reading Logbatch");

using Timestamp = long;

std::priority_queue<std::tuple<Timestamp, std::string, FileManagement::DataToken>> pq;
for (size_t i = 0; i < batch_size; ++i) {
std::string line;
@@ -62,8 +51,8 @@ FileObject<LogCollection> LogFileManager::readBatch(
Aws::CloudWatchLogs::Model::InputLogEvent input_event(value);
pq.push(std::make_tuple(input_event.GetTimestamp(), line, data_token));
}

Timestamp latestTime = std::get<0>(pq.top());
latestTime = std::get<0>(pq.top());
LogCollection log_data;
std::list<FileManagement::DataToken> data_tokens;
while(!pq.empty()){
@@ -77,16 +66,22 @@ FileObject<LogCollection> LogFileManager::readBatch(
log_data.push_front(input_event);
data_tokens.push_back(new_data_token);
}
else{
AWS_LOG_INFO(__func__, "Some logs were not batched since the time"
" difference was > 24 hours. Will try again in a separate batch./n"
"Logs read: %d, Logs batched: %d", batch_size, log_data.size()
);
break;
else if(file_manager_strategy_->isDeleteStaleData() && latestTime - curTime > TWO_WEEK_IN_SEC){
{
std::lock_guard<std::mutex> lock(active_delete_stale_data_mutex_);
stale_data_.push_back(new_data_token);
}
}
pq.pop();
}

if(batch_size != log_data.size()){
AWS_LOG_INFO(__func__, "%d logs were not batched since the time"
" difference was > 24 hours. Will try again in a separate batch.\n",
static_cast<int>(batch_size - log_data.size())
);
}

FileObject<LogCollection> file_object;
file_object.batch_data = log_data;
file_object.batch_size = log_data.size();
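The new else-if branch above only collects stale tokens into stale_data_ under active_delete_stale_data_mutex_; the companion deleteStaleData() call that the tests below exercise falls outside the lines shown in this diff. As a rough, hedged sketch of what that helper plausibly does (an assumption, not the merged implementation), the stale tokens would be resolved through the file manager strategy so the on-disk entries are dropped:

void LogFileManager::deleteStaleData()
{
  std::lock_guard<std::mutex> lock(active_delete_stale_data_mutex_);
  for (const auto & token : stale_data_) {
    // Resolving a token marks that entry as handled so the strategy can
    // discard the > 14 day old log from on-disk storage.
    file_manager_strategy_->resolve(token, true);
  }
  stale_data_.clear();
}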
209 changes: 164 additions & 45 deletions cloudwatch_logs_common/test/log_batch_test.cpp
@@ -20,52 +20,60 @@ using Aws::CloudWatchLogs::Utils::LogFileManager;
using namespace Aws::CloudWatchLogs;
using namespace Aws::FileManagement;

const long ONE_DAY_IN_SEC = 86400000;

class TestStrategy : public DataManagerStrategy {
public:
TestStrategy(const FileManagerStrategyOptions &options) {
options_ = options;
}

bool isDataAvailable(){
return true;
return !logs.empty() && !data_tokens.empty();
}

bool isDeleteStaleData(){
return options_.delete_stale_data;
}

DataToken read(std::string &data) override{
if(!logs.empty()){
data = logs.back();
logs.pop_back();
if(isDataAvailable()){
it++;
data = logs[it-1];
return data_tokens[it-1];
}

data_token++;
return data_token;
return 0;
}

void write(const std::string &data){
logs.push_back(data);
data_tokens.push_back(data_token);
data_token++;
}

void resolve(const DataToken &token, bool is_success){
if(is_success && token)
return;
else
return;
if(is_success){
for(int i = 0; i < (int)data_tokens.size(); i++){
if(data_tokens[i] == token){
data_tokens.erase(data_tokens.begin()+i);
logs.erase(logs.begin()+i);
return;
}
}
}
return;
}

std::vector<std::string> logs;

protected:

uint64_t data_token = 0;

/**
* Options for how and where to store files, and maximum file sizes.
*/
std::vector<uint64_t> data_tokens;
uint64_t data_token = 1;
int it = 0;
FileManagerStrategyOptions options_;
};

class LogBatchTest : public ::testing::Test{
public:
void SetUp() override {
test_strategy = std::make_shared<TestStrategy>();
test_strategy = std::make_shared<TestStrategy>(options);
file_manager = std::make_unique<LogFileManager>(test_strategy);
}

@@ -82,52 +90,163 @@ class LogBatchTest : public ::testing::Test{
}
}

void validateBatch(const std::vector<long> & timestamps){
auto it = batch.batch_data.begin();
for (auto ts : timestamps){
ASSERT_EQ(ts, (*it).GetTimestamp());
it++;
void readLogs(){
while(test_strategy->isDataAvailable()){
test_strategy->it = 0;
batch = file_manager->readBatch(test_strategy->logs.size());
resolveBatch();
file_manager->deleteStaleData();
ASSERT_TRUE(validateBatch());
}
}

void resolveBatch(){
for (auto dt : batch.data_tokens){
test_strategy->resolve(dt, true);
}
}

//use test_strategy to mock read/write functions from data_manager_strategy
//validate that the batch produced matches with the user's expectation
bool validateBatch(){
for(auto bd : batch.batch_data){
if(expectedTimeStamps.empty())
return false;

long expectedTimestamp = expectedTimeStamps.front().back();
expectedTimeStamps.front().pop_back();

if(expectedTimeStamps.front().empty())
expectedTimeStamps.erase(expectedTimeStamps.begin());

if(bd.GetTimestamp() != expectedTimestamp){
return false;
}
}

return true;
}

std::shared_ptr<TestStrategy> test_strategy;
std::unique_ptr<LogFileManager> file_manager;
LogCollection log_data;
Aws::CloudWatchLogs::Model::InputLogEvent input_event;
std::vector<long> timestamps;
FileObject<LogCollection> batch;

//FileManagerStrategyOptions includes a new option "delete_stale_data" which we will
//set to true for testing
FileManagerStrategyOptions options{"test", "log_tests/", ".log", 1024*1024, 1024*1024, true};

//expectedTimeStamps holds the batches we expect readBatch to produce, in order;
//each inner vector is one batch, listed newest-first (validateBatch consumes it
//from the back while batch_data is ordered oldest-first)
std::vector<std::vector<long>> expectedTimeStamps;
};

/**
* Test that the upload complete with CW Failure goes to a file.
* This test will group all logs into one batch since they are all
* within 24 hours of each other
*/
TEST_F(LogBatchTest, test_readBatch_1_batch) {
timestamps = {1, 3, 0, ONE_DAY_IN_SEC-1, 4, 2};
expectedTimeStamps = {{ONE_DAY_IN_SEC-1, 4, 3, 2, 1, 0}};
createLogs(timestamps);
file_manager->write(log_data);
readLogs();
}
/**
* This test will group half the logs into one batch and half into another
* since 2, 1, and 0 fall a full 24 hours or more behind ONE_DAY_IN_SEC+2
*/
TEST_F(LogBatchTest, test_readBatch_3_of_6_pass) {
TEST_F(LogBatchTest, test_readBatch_2_batches) {
timestamps = {ONE_DAY_IN_SEC+1, 2, ONE_DAY_IN_SEC+2, 1, 0, ONE_DAY_IN_SEC};
expectedTimeStamps = {{ONE_DAY_IN_SEC+2, ONE_DAY_IN_SEC+1, ONE_DAY_IN_SEC}, {2, 1, 0}};
createLogs(timestamps);
file_manager->write(log_data);
batch = file_manager->readBatch(test_strategy->logs.size());
ASSERT_EQ(3u, batch.batch_size);
timestamps = {ONE_DAY_IN_SEC, ONE_DAY_IN_SEC+1, ONE_DAY_IN_SEC+2};
validateBatch(timestamps);
readLogs();
}
TEST_F(LogBatchTest, test_readBatch_6_of_6_pass) {
timestamps = {1, 3, 0, ONE_DAY_IN_SEC-1, 4, 2};
/**
* This test will produce three separate batches since ONE_DAY_IN_SEC*2+10,
* ONE_DAY_IN_SEC+5, and 4 are each more than 24 hours apart
*/
TEST_F(LogBatchTest, test_readBatch_3_batches) {
timestamps = {1, ONE_DAY_IN_SEC+5, 4, 2, 0, ONE_DAY_IN_SEC*2+10};
expectedTimeStamps = {{ONE_DAY_IN_SEC*2+10}, {ONE_DAY_IN_SEC+5},{4, 2, 1, 0}};
createLogs(timestamps);
file_manager->write(log_data);
readLogs();
}
/**
* We defined delete_stale_data as true in our FileManagerStrategyOptions
* In this test we expect that there will be two separate batches
* separated by > 24 hours and we expect that timestamp 0 will be deleted
* since it is > 14 days old.
*/
TEST_F(LogBatchTest, test_2_week_delete_1_of_6) {
timestamps = {15, ONE_DAY_IN_SEC, 0, TWO_WEEK_IN_SEC+5, TWO_WEEK_IN_SEC+1, 10};
expectedTimeStamps = {{TWO_WEEK_IN_SEC+5, TWO_WEEK_IN_SEC+1}, {ONE_DAY_IN_SEC, 15, 10}};
createLogs(timestamps);
file_manager->write(log_data);
readLogs();
}
/**
* We defined delete_stale_data as true in our FileManagerStrategyOptions
* In this test we expect that there will be two separate batches
* separated by > 24 hours, and we expect that timestamps 0, 1, 3, and 4 will be
* deleted since they are > 14 days old.
*/
TEST_F(LogBatchTest, test_2_week_delete_4_of_6) {
timestamps = {1, ONE_DAY_IN_SEC, 4, TWO_WEEK_IN_SEC+5, 0, 3};
expectedTimeStamps = {{TWO_WEEK_IN_SEC+5}, {ONE_DAY_IN_SEC}};
createLogs(timestamps);
file_manager->write(log_data);
batch = file_manager->readBatch(test_strategy->logs.size());
ASSERT_EQ(6u, batch.batch_size);
timestamps = {0, 1, 2, 3, 4, ONE_DAY_IN_SEC-1};
validateBatch(timestamps);
readLogs();
}
TEST_F(LogBatchTest, test_readBatch_1_of_6_pass) {
timestamps = {1, ONE_DAY_IN_SEC+5, 4, 2, 0, 3};
/**
* We defined delete_stale_data as true in our FileManagerStrategyOptions.
* In this test we expect that timestamps 0, 1, 3, 4, and ONE_DAY_IN_SEC will be
* deleted since they are > 14 days old, leaving a single batch that contains
* only TWO_WEEK_IN_SEC+ONE_DAY_IN_SEC+5.
*/
TEST_F(LogBatchTest, test_2_week_delete_5_of_6) {
timestamps = {1, ONE_DAY_IN_SEC, 4, TWO_WEEK_IN_SEC+ONE_DAY_IN_SEC+5, 0, 3};
expectedTimeStamps = {{TWO_WEEK_IN_SEC+ONE_DAY_IN_SEC+5}};
createLogs(timestamps);
file_manager->write(log_data);
readLogs();
}
/**
* In this test, we set the option delete_stale_data to false and expect
* that none of the logs will be deleted. We expect three batches, each
* separated by > 24 hours.
*/
TEST_F(LogBatchTest, test_2_week_no_delete) {
test_strategy->options_.delete_stale_data = false;
timestamps = {1, ONE_DAY_IN_SEC, 4, TWO_WEEK_IN_SEC+5, 0, 3};
expectedTimeStamps = {{TWO_WEEK_IN_SEC+5},{ONE_DAY_IN_SEC, 4, 3, 1}, {0}};
createLogs(timestamps);
file_manager->write(log_data);
batch = file_manager->readBatch(test_strategy->logs.size());
ASSERT_EQ(1u, batch.batch_size);
timestamps = {ONE_DAY_IN_SEC+5};
validateBatch(timestamps);
readLogs();
}
/**
* FileManagerStrategyOptions defined with delete_stale_data set to true.
* We expect isDeleteStaleData to return true.
*/
TEST(DeleteOptionTest, file_manager_delete_true) {
FileManagerStrategyOptions options{"test", "log_tests/", ".log", 1024*1024, 1024*1024, true};
std::shared_ptr<FileManagerStrategy> file_manager_strategy = std::make_shared<FileManagerStrategy>(options);
LogFileManager file_manager(file_manager_strategy);
ASSERT_TRUE(file_manager_strategy->isDeleteStaleData());
}
/**
* FileManagerStrategyOptions defined with delete_stale_data set to false.
* We expect isDeleteStaleData to return false.
*/
TEST(DeleteOptionTest, file_manager_delete_false) {
FileManagerStrategyOptions options{"test", "log_tests/", ".log", 1024*1024, 1024*1024, false};
std::shared_ptr<FileManagerStrategy> file_manager_strategy = std::make_shared<FileManagerStrategy>(options);
LogFileManager file_manager(file_manager_strategy);
ASSERT_FALSE(file_manager_strategy->isDeleteStaleData());
}

int main(int argc, char** argv)
@@ -40,7 +40,8 @@ static const Aws::FileManagement::FileManagerStrategyOptions kDefaultMetricFileM
"cwmetric",
Aws::FileManagement::kDefaultFileManagerStrategyOptions.file_extension,
Aws::FileManagement::kDefaultFileManagerStrategyOptions.maximum_file_size_in_kb,
Aws::FileManagement::kDefaultFileManagerStrategyOptions.storage_limit_in_kb
Aws::FileManagement::kDefaultFileManagerStrategyOptions.storage_limit_in_kb,
Aws::FileManagement::kDefaultFileManagerStrategyOptions.delete_stale_data
};

static const CloudWatchOptions kDefaultCloudWatchOptions{