Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
modify struct and add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Ikawa <jikawa@amazon.com>
  • Loading branch information
jikawa-az committed Sep 3, 2020
1 parent 74c1486 commit 9bcaf27
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class LogFileManager :
return as many logs as can fit within the 24 hour span and the actual number of
logs batched may end up being less than the original batch_size.
If a log is over 2 weeks old from the latest time,it will be discarded.
If the user enables the discard_old_logs option and a log is over 2 weeks old from
the latest time, it will be discarded. This is because the AWS API for PutLogEvents
rejects batches with log events older than 14 days.
We must sort the log data chronologically because it is not guaranteed
to be ordered chronologically in the file, but CloudWatch requires all
Expand Down
33 changes: 16 additions & 17 deletions cloudwatch_logs_common/src/utils/log_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ FileObject<LogCollection> LogFileManager::readBatch(
Timestamp latestTime = std::get<0>(pq.top());
LogCollection log_data;
std::list<FileManagement::DataToken> data_tokens;
LogCollection discard_logs;
std::list<FileManagement::DataToken> discard_tokens;
int logsDiscarded = 0, logsIgnored = 0;
while(!pq.empty()){
Timestamp curTime = std::get<0>(pq.top());
std::string line = std::get<1>(pq.top());
Expand All @@ -71,30 +70,30 @@ FileObject<LogCollection> LogFileManager::readBatch(
log_data.push_front(input_event);
data_tokens.push_back(new_data_token);
}
else if(latestTime - curTime > TWO_WEEK_IN_SEC){
Aws::String aws_line(line.c_str());
Aws::Utils::Json::JsonValue value(aws_line);
Aws::CloudWatchLogs::Model::InputLogEvent input_event(value);
discard_logs.push_front(input_event);
discard_tokens.push_back(new_data_token);
else if(file_manager_strategy_->discardOldLogs() && latestTime - curTime > TWO_WEEK_IN_SEC){
file_manager_strategy_->resolve(new_data_token, true);
logsDiscarded++;
}
else{
logsIgnored++;
}
pq.pop();
}

if(log_data.size() != batch_size){
if(logsDiscarded > 0){
AWS_LOG_INFO(__func__, "Some logs were discarded since the time"
" difference was > 14 days.\n"
"Logs read: %d, Logs discarded: %d", batch_size, logsDiscarded
);
}

if(logsIgnored > 0){
AWS_LOG_INFO(__func__, "Some logs were not batched since the time"
" difference was > 24 hours. Will try again in a separate batch.\n"
"Logs read: %d, Logs batched: %d", batch_size, log_data.size()
"Logs read: %d, Logs batched: %d", batch_size, logsIgnored
);
}

FileObject<LogCollection> discard_files;
file_object.batch_data = discard_logs;
file_object.batch_size = discard_logs.size();
file_object.data_tokens = discard_tokens;

fileUploadCompleteStatus(Aws::DataFlow::UploadStatus::SUCCESS, discard_files);

FileObject<LogCollection> file_object;
file_object.batch_data = log_data;
file_object.batch_size = log_data.size();
Expand Down
124 changes: 86 additions & 38 deletions cloudwatch_logs_common/test/log_batch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,62 @@ using namespace Aws::CloudWatchLogs;
using namespace Aws::FileManagement;

const long ONE_DAY_IN_SEC = 86400000;
const long TWO_WEEK_IN_SEC = 1209600000;

class TestStrategy : public DataManagerStrategy {
public:
TestStrategy(const FileManagerStrategyOptions &options) {
options_ = options;
}

bool isDataAvailable(){
return true;
return !logs.empty() && !data_tokens.empty();
}

bool discardOldLogs(){
return options_.discard_2_week_logs;
}

DataToken read(std::string &data) override{
if(!logs.empty()){
data = logs.back();
logs.pop_back();
if(isDataAvailable()){
it++;
data = logs[it-1];
return data_tokens[it-1];
}

data_token++;
return data_token;
return 0;
}

void write(const std::string &data){
logs.push_back(data);
data_tokens.push_back(data_token);
data_token++;
}

void resolve(const DataToken &token, bool is_success){
if(is_success && token)
return;
else
return;
if(is_success){
for(int i = 0; i < (int)data_tokens.size(); i++){
if(data_tokens[i] == token){
data_tokens.erase(data_tokens.begin()+i);
logs.erase(logs.begin()+i);
return;
}
}
}
return;
}

std::vector<std::string> logs;

protected:

uint64_t data_token = 0;

/**
* Options for how and where to store files, and maximum file sizes.
*/
std::vector<uint64_t> data_tokens;
uint64_t data_token = 1;
int it = 0;
FileManagerStrategyOptions options_;
};

class LogBatchTest : public ::testing::Test{
public:
void SetUp() override {
test_strategy = std::make_shared<TestStrategy>();
test_strategy = std::make_shared<TestStrategy>(options);
file_manager = std::make_unique<LogFileManager>(test_strategy);
}

Expand All @@ -82,52 +93,89 @@ class LogBatchTest : public ::testing::Test{
}
}

void validateBatch(const std::vector<long> & timestamps){
auto it = batch.batch_data.begin();
for (auto ts : timestamps){
ASSERT_EQ(ts, (*it).GetTimestamp());
it++;
void readLogs(){
while(test_strategy->isDataAvailable()){
test_strategy->it = 0;
batch = file_manager->readBatch(test_strategy->logs.size());
resolveBatch();
ASSERT_TRUE(validateBatch());
}
}

//use test_strategy to mock read/write functions from data_manager_strategy
void resolveBatch(){
for (auto dt : batch.data_tokens){
test_strategy->resolve(dt, true);
}
}

//validate that the batch produced matches with the user's expectation
bool validateBatch(){
for(auto bd : batch.batch_data){
if(expectedTimeStamps.empty())
return false;

long expectedTimestamp = expectedTimeStamps.front().back();
expectedTimeStamps.front().pop_back();

if(expectedTimeStamps.front().empty())
expectedTimeStamps.erase(expectedTimeStamps.begin());

if(bd.GetTimestamp() != expectedTimestamp){
return false;
}
}

return true;
}

std::shared_ptr<TestStrategy> test_strategy;
std::unique_ptr<LogFileManager> file_manager;
LogCollection log_data;
Aws::CloudWatchLogs::Model::InputLogEvent input_event;
std::vector<long> timestamps;
FileObject<LogCollection> batch;
FileManagerStrategyOptions options{"test", "log_tests/", ".log", 1024*1024, 1024*1024, true};
std::vector<std::vector<long>> expectedTimeStamps;
};

/**
* Test that the upload complete with CW Failure goes to a file.
*/
TEST_F(LogBatchTest, test_readBatch_3_of_6_pass) {
timestamps = {ONE_DAY_IN_SEC+1, 2, ONE_DAY_IN_SEC+2, 1, 0, ONE_DAY_IN_SEC};
expectedTimeStamps = {{ONE_DAY_IN_SEC+2, ONE_DAY_IN_SEC+1, ONE_DAY_IN_SEC}, {2, 1, 0}};
createLogs(timestamps);
file_manager->write(log_data);
batch = file_manager->readBatch(test_strategy->logs.size());
ASSERT_EQ(3u, batch.batch_size);
timestamps = {ONE_DAY_IN_SEC, ONE_DAY_IN_SEC+1, ONE_DAY_IN_SEC+2};
validateBatch(timestamps);
readLogs();
}
TEST_F(LogBatchTest, test_readBatch_6_of_6_pass) {
TEST_F(LogBatchTest, test_readBatch_1_batch) {
timestamps = {1, 3, 0, ONE_DAY_IN_SEC-1, 4, 2};
expectedTimeStamps = {{ONE_DAY_IN_SEC-1, 4, 3, 2, 1, 0}};
createLogs(timestamps);
file_manager->write(log_data);
batch = file_manager->readBatch(test_strategy->logs.size());
ASSERT_EQ(6u, batch.batch_size);
timestamps = {0, 1, 2, 3, 4, ONE_DAY_IN_SEC-1};
validateBatch(timestamps);
readLogs();
}
TEST_F(LogBatchTest, test_readBatch_1_of_6_pass) {
timestamps = {1, ONE_DAY_IN_SEC+5, 4, 2, 0, 3};
expectedTimeStamps = {{ONE_DAY_IN_SEC+5},{4, 3, 2, 1, 0}};
createLogs(timestamps);
file_manager->write(log_data);
readLogs();
}
TEST_F(LogBatchTest, test_2_week_discard) {
timestamps = {1, ONE_DAY_IN_SEC, 4, TWO_WEEK_IN_SEC+5, 0, 3};
expectedTimeStamps = {{TWO_WEEK_IN_SEC+5}, {ONE_DAY_IN_SEC}};
createLogs(timestamps);
file_manager->write(log_data);
readLogs();
}
TEST_F(LogBatchTest, test_2_week_no_discard) {
test_strategy->options_.discard_2_week_logs = false;
timestamps = {1, ONE_DAY_IN_SEC, 4, TWO_WEEK_IN_SEC+5, 0, 3};
expectedTimeStamps = {{TWO_WEEK_IN_SEC+5},{ONE_DAY_IN_SEC, 4, 3, 1}, {0}};
createLogs(timestamps);
file_manager->write(log_data);
batch = file_manager->readBatch(test_strategy->logs.size());
ASSERT_EQ(1u, batch.batch_size);
timestamps = {ONE_DAY_IN_SEC+5};
validateBatch(timestamps);
readLogs();
}

int main(int argc, char** argv)
Expand Down
10 changes: 8 additions & 2 deletions file_management/include/file_management/file_manager_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ struct FileManagerStrategyOptions {
std::string _file_prefix,
std::string _file_extension,
size_t _maximum_file_size,
size_t _storage_limit)
size_t _storage_limit,
bool _discard_2_week_logs = false)
: storage_directory(std::move(_storage_directory)),
file_prefix(std::move(_file_prefix)),
file_extension(std::move(_file_extension)),
maximum_file_size_in_kb(_maximum_file_size),
storage_limit_in_kb(_storage_limit) {}
storage_limit_in_kb(_storage_limit),
discard_2_week_logs(_discard_2_week_logs) {}

/**
* The path to the folder where all files are stored. Can be absolute or relative
Expand All @@ -64,6 +66,10 @@ struct FileManagerStrategyOptions {
* After this limit is reached files will start to be deleted, oldest first.
*/
size_t storage_limit_in_kb{};
/**
* Option for the user to discard logs older than 2 weeks
*/
bool discard_2_week_logs;
};

static const FileManagerStrategyOptions kDefaultFileManagerStrategyOptions{"~/.ros/cwlogs", "cwlog", ".log", 1024, 1024*1024};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class DataManagerStrategy : public Service {

virtual bool isDataAvailable() = 0;

virtual bool discardOldLogs() = 0;

virtual DataToken read(std::string &data) = 0;

virtual void write(const std::string &data) = 0;
Expand Down Expand Up @@ -301,6 +303,12 @@ class FileManagerStrategy : public DataManagerStrategy {
*/
bool isDataAvailable() override;

/**
* Returns true if the user has enabled the option to discard logs older than
* two weeks (discard_2_week_logs), false otherwise.
* @return bool whether old logs should be discarded
*/
bool discardOldLogs() override;

/**
* Reads a line of data from file storage. The most recent data in storage is read first.
* This returns a DataToken which should be passed to resolve() when the caller is done
Expand Down
4 changes: 4 additions & 0 deletions file_management/src/file_upload/file_manager_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ bool FileManagerStrategy::isDataAvailable() {
return !active_read_file_.empty() || !stored_files_.empty() || active_write_file_size_ > 0;
}

// Reports whether the user has opted in (via FileManagerStrategyOptions)
// to discarding logs that are more than two weeks old.
bool FileManagerStrategy::discardOldLogs() {
  const bool discard_enabled = options_.discard_2_week_logs;
  return discard_enabled;
}

void FileManagerStrategy::write(const std::string &data) {
try {
checkIfWriteFileShouldRotate(data.size());
Expand Down

0 comments on commit 9bcaf27

Please sign in to comment.