
Add lock to discard stale data container
Signed-off-by: Jesse Ikawa <jikawa@amazon.com>
jikawa-az committed Sep 8, 2020
1 parent 5bfb8ef commit 3d75033
Showing 8 changed files with 66 additions and 60 deletions.
@@ -67,16 +67,8 @@ class LogFileManager :
*/
FileObject<LogCollection> readBatch(size_t batch_size) override;

/*
If the user cfg options for discard_old_logs and a log is over 2 weeks old from
the latest time,it will be discarded. This is because the AWS API for PutLogEvents
rejects batches with log events older than 14 days.
*/
void discardFiles() override;

using Timestamp = long;
Timestamp latestTime = 0;
std::priority_queue<std::tuple<Timestamp, std::string, FileManagement::DataToken>> pq;
};

} // namespace Utils
51 changes: 13 additions & 38 deletions cloudwatch_logs_common/src/utils/log_file_manager.cpp
Expand Up @@ -18,6 +18,7 @@
#include <fstream>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <tuple>

@@ -36,9 +37,9 @@ FileObject<LogCollection> LogFileManager::readBatch(
size_t batch_size)
{
FileManagement::DataToken data_token;
pq = std::priority_queue<std::tuple<Timestamp, std::string, FileManagement::DataToken>>();
AWS_LOG_INFO(__func__, "Reading Logbatch");


std::priority_queue<std::tuple<Timestamp, std::string, FileManagement::DataToken>> pq;
for (size_t i = 0; i < batch_size; ++i) {
std::string line;
if (!file_manager_strategy_->isDataAvailable()) {
@@ -51,6 +52,7 @@ FileObject<LogCollection> LogFileManager::readBatch(
pq.push(std::make_tuple(input_event.GetTimestamp(), line, data_token));
}

std::lock_guard<std::mutex> lock(active_discard_stale_data_mutex_);
latestTime = std::get<0>(pq.top());
LogCollection log_data;
std::list<FileManagement::DataToken> data_tokens;
@@ -65,53 +67,26 @@ FileObject<LogCollection> LogFileManager::readBatch(
log_data.push_front(input_event);
data_tokens.push_back(new_data_token);
}
else{
AWS_LOG_INFO(__func__, "Some logs were not batched since the time"
" difference was > 24 hours. Will try again in a separate batch./n"
"Logs read: %d, Logs batched: %d", batch_size, log_data.size()
);
break;
else if(file_manager_strategy_->discardStaleData() && latestTime - curTime > TWO_WEEK_IN_SEC){
stale_data_.push_back(new_data_token);
}
pq.pop();
}

if(batch_size != log_data.size()){
AWS_LOG_INFO(__func__, "%d logs were not batched since the time"
" difference was > 24 hours. Will try again in a separate batch./n"
, batch_size - log_data.size()
);
}

FileObject<LogCollection> file_object;
file_object.batch_data = log_data;
file_object.batch_size = log_data.size();
file_object.data_tokens = data_tokens;
return file_object;
}

void LogFileManager::discardFiles()
{
if (!file_manager_strategy_->discardOldLogs() || pq.empty()) {
return;
}

AWS_LOG_INFO(__func__, "Discarding old logs from Logbatch");

std::list<FileManagement::DataToken> data_tokens;
int logsDiscarded = 0;

while(!pq.empty()){
Timestamp curTime = std::get<0>(pq.top());
std::string line = std::get<1>(pq.top());
FileManagement::DataToken data_token = std::get<2>(pq.top());
if(latestTime - curTime > TWO_WEEK_IN_SEC){
file_manager_strategy_->resolve(data_token, true);
logsDiscarded++;
}
pq.pop();
}

if(logsDiscarded > 0){
AWS_LOG_INFO(__func__, "Some logs were discarded since the time"
" difference was > 14 days./n"
"Logs discarded: %d", logsDiscarded
);
}
}

void LogFileManager::write(const LogCollection & data) {
for (const Model::InputLogEvent &log: data) {
auto aws_str = log.Jsonize().View().WriteCompact();
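For context, the grouping that readBatch() now performs can be pictured in isolation: events are popped from a timestamp-ordered max-heap, events within 24 hours of the newest one are batched, events more than 14 days older are set aside for discard, and everything in between is left for a later batch. The following is a minimal, self-contained sketch of that idea only; GroupedLogs, groupByTimestamp, and the constants are placeholder names, not part of this commit.

#include <cstdint>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Placeholder limits mirroring the constraints referenced in the diff:
// PutLogEvents rejects batches spanning more than 24 hours and rejects
// events older than 14 days.
constexpr int64_t ONE_DAY_IN_SEC = 60 * 60 * 24;
constexpr int64_t TWO_WEEK_IN_SEC = 14 * ONE_DAY_IN_SEC;

struct GroupedLogs {
  std::vector<std::string> batched;   // within 24 hours of the newest event
  std::vector<std::string> deferred;  // older than 24 hours, retried in a later batch
  std::vector<std::string> stale;     // older than 14 days, candidates for discard
};

GroupedLogs groupByTimestamp(std::priority_queue<std::pair<int64_t, std::string>> pq) {
  GroupedLogs result;
  if (pq.empty()) {
    return result;
  }
  const int64_t latestTime = pq.top().first;  // max-heap: the newest event is on top
  while (!pq.empty()) {
    const int64_t curTime = pq.top().first;
    const std::string & line = pq.top().second;
    if (latestTime - curTime <= ONE_DAY_IN_SEC) {
      result.batched.push_back(line);
    } else if (latestTime - curTime > TWO_WEEK_IN_SEC) {
      result.stale.push_back(line);
    } else {
      result.deferred.push_back(line);
    }
    pq.pop();
  }
  return result;
}

int main() {
  std::priority_queue<std::pair<int64_t, std::string>> pq;
  pq.push({2'000'000, "newest"});
  pq.push({2'000'000 - 2 * ONE_DAY_IN_SEC, "two days old"});
  pq.push({2'000'000 - 15 * ONE_DAY_IN_SEC, "fifteen days old"});
  const GroupedLogs grouped = groupByTimestamp(std::move(pq));
  std::cout << grouped.batched.size() << " batched, "
            << grouped.deferred.size() << " deferred, "
            << grouped.stale.size() << " stale" << std::endl;
  return 0;
}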
12 changes: 6 additions & 6 deletions cloudwatch_logs_common/test/log_batch_test.cpp
Expand Up @@ -30,7 +30,7 @@ class TestStrategy : public DataManagerStrategy {
return !logs.empty() && !data_tokens.empty();
}

bool discardOldLogs(){
bool discardStaleData(){
return options_.discard_old_logs;
}

@@ -95,7 +95,7 @@ class LogBatchTest : public ::testing::Test{
test_strategy->it = 0;
batch = file_manager->readBatch(test_strategy->logs.size());
resolveBatch();
file_manager->discardFiles();
file_manager->discardOldLogData();
ASSERT_TRUE(validateBatch());
}
}
Expand Down Expand Up @@ -230,23 +230,23 @@ TEST_F(LogBatchTest, test_2_week_no_discard) {
}
/**
* FileManagerStrategyOptions defined with discard_old_logs set to true.
* We expect discardOldLogs will return true.
* We expect discardStaleData will return true.
*/
TEST(DiscardOptionTest, file_manager_discard_true) {
FileManagerStrategyOptions options{"test", "log_tests/", ".log", 1024*1024, 1024*1024, true};
std::shared_ptr<FileManagerStrategy> file_manager_strategy = std::make_shared<FileManagerStrategy>(options);
LogFileManager file_manager(file_manager_strategy);
ASSERT_TRUE(file_manager_strategy->discardOldLogs());
ASSERT_TRUE(file_manager_strategy->discardStaleData());
}
/**
* FileManagerStrategyOptions defined with discard_old_logs set to false.
* We expect discardOldLogs will return false.
* We expect discardStaleData will return false.
*/
TEST(DiscardOptionTest, file_manager_discard_false) {
FileManagerStrategyOptions options{"test", "log_tests/", ".log", 1024*1024, 1024*1024, false};
std::shared_ptr<FileManagerStrategy> file_manager_strategy = std::make_shared<FileManagerStrategy>(options);
LogFileManager file_manager(file_manager_strategy);
ASSERT_FALSE(file_manager_strategy->discardOldLogs());
ASSERT_FALSE(file_manager_strategy->discardStaleData());
}

int main(int argc, char** argv)
39 changes: 36 additions & 3 deletions file_management/include/file_management/file_upload/file_manager.h
@@ -70,9 +70,7 @@ class DataReader : public Service {

virtual void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) = 0;

virtual void discardFiles(){

};
virtual void discardOldLogData() = 0;
};

/**
@@ -214,6 +212,34 @@ class FileManager :
return false;
}

/*
If the user has enabled the discard_old_logs option and a log event is more than
two weeks older than the latest timestamp in the batch, it will be discarded.
This is because the AWS PutLogEvents API rejects batches containing log events
older than 14 days.
*/
void discardOldLogData(){
if (stale_data_.empty()) {
return;
}

AWS_LOG_INFO(__func__, "Discarding old logs from Logbatch");

std::lock_guard<std::mutex> lock(active_discard_stale_data_mutex_);
std::list<FileManagement::DataToken> data_tokens;
int logsDiscarded = 0;
while(!stale_data_.empty()){
file_manager_strategy_->resolve(stale_data_.back(), true);
logsDiscarded++;
stale_data_.pop_back();
}

if(logsDiscarded > 0){
AWS_LOG_INFO(__func__, "%d logs were discarded since the time"
" difference was > 14 days./n", logsDiscarded
);
}
}

protected:
/**
* The object that keeps track of which files to delete, read, or write to.
@@ -231,6 +257,13 @@
*/
std::shared_ptr<StatusMonitor> file_status_monitor_;

/**
* A mutex guarding the stale_data_ vector so that one thread cannot read or
* modify it while another thread is reading from or modifying it.
*/
std::mutex active_discard_stale_data_mutex_;
std::vector<FileManagement::DataToken> stale_data_;

};


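The core of this commit, serializing access to the shared stale_data_ container between the read path and the discard path, is the standard std::lock_guard pattern. Below is a minimal sketch of that pattern under assumed names; StaleDataBuffer and Token are placeholders, not the project's types.

#include <cstdint>
#include <mutex>
#include <vector>

// Token stands in for FileManagement::DataToken for illustration only.
using Token = uint64_t;

class StaleDataBuffer {
public:
  // Called from the read path when an entry is found to be too old to publish.
  void markStale(Token token) {
    std::lock_guard<std::mutex> lock(mutex_);
    stale_.push_back(token);
  }

  // Called from the discard path. Draining under the same lock means the read
  // path cannot append while the tokens are being resolved, and vice versa.
  std::vector<Token> takeAll() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<Token> out;
    out.swap(stale_);
    return out;
  }

private:
  std::mutex mutex_;
  std::vector<Token> stale_;
};

In the commit itself the same idea is expressed with a single member mutex (active_discard_stale_data_mutex_) locked in both readBatch() and discardOldLogData().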
@@ -141,7 +141,7 @@ class DataManagerStrategy : public Service {

virtual bool isDataAvailable() = 0;

virtual bool discardOldLogs() = 0;
virtual bool discardStaleData() = 0;

virtual DataToken read(std::string &data) = 0;

@@ -304,9 +304,11 @@ class FileManagerStrategy : public DataManagerStrategy {
bool isDataAvailable() override;

/**
* Returns true if user set the option to remove older logs
* Returns true if the user enabled the option to discard stale data that would be
* rejected by the CloudWatch API. The actual discard happens in a separate call
* (discardOldLogData) so it doesn't block the current publishing task after
* reading the data.
*/
bool discardOldLogs() override;
bool discardStaleData() override;

/**
* Reads a line of data from file storage. The most recent data in storage is read first.
@@ -196,7 +196,7 @@ class FileUploadStreamer :
AWS_LOG_DEBUG(__func__,
"Enqueue failed");
}
data_reader_->discardFiles();
data_reader_->discardOldLogData();
}

private:
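The streamer change above only swaps the call from discardFiles() to discardOldLogData(), but the ordering is the interesting part: stale entries discovered while reading a batch are resolved after the publish attempt, so discarding never delays it. Here is a compilable sketch of that call order using stub types; Reader, Queue, and Batch are placeholders, not the project's interfaces.

#include <cstddef>
#include <iostream>
#include <vector>

// Minimal stand-ins for the reader and queue used by the streamer.
struct Batch { std::vector<int> entries; };

struct Reader {
  Batch readBatch(std::size_t n) { return Batch{std::vector<int>(n, 0)}; }
  void discardOldLogData() { std::cout << "resolving stale tokens" << std::endl; }
};

struct Queue {
  bool tryEnqueue(const Batch &) { return true; }
};

// Mirrors the order in the streamer: read a batch, attempt to enqueue it,
// then discard stale data so the publish attempt is never blocked by it.
void uploadOnce(Reader & reader, Queue & queue) {
  Batch batch = reader.readBatch(8);
  if (!queue.tryEnqueue(batch)) {
    std::cout << "enqueue failed" << std::endl;
  }
  reader.discardOldLogData();
}

int main() {
  Reader reader;
  Queue queue;
  uploadOnce(reader, queue);
  return 0;
}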
2 changes: 1 addition & 1 deletion file_management/src/file_upload/file_manager_strategy.cpp
@@ -273,7 +273,7 @@ bool FileManagerStrategy::isDataAvailable() {
return !active_read_file_.empty() || !stored_files_.empty() || active_write_file_size_ > 0;
}

bool FileManagerStrategy::discardOldLogs() {
bool FileManagerStrategy::discardStaleData() {
return options_.discard_old_logs;
}

@@ -39,6 +39,10 @@ class MockDataReader :
void(std::shared_ptr<StatusMonitor> monitor));
MOCK_METHOD0(isDataAvailableToRead, bool());

void discardOldLogData() override {
return;
}

/**
* Set the observer for the queue.
*
