Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add messages' size to metrics in audit message for ubuntu-12.04 #27

Open
wants to merge 4 commits into
base: ubuntu-12.04
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions if/audit.thrift
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
#!/usr/local/bin/thrift --gen cpp:pure_enums --gen php
#!/usr/local/bin/thrift --gen cpp:pure_enums --gen java

namespace cpp audit.thrift
namespace java com.inmobi.audit.thrift
namespace cpp audit.thrift

// Audit metrics kept per timestamp key in AuditMessage.receivedMetrics and
// AuditMessage.sentMetrics: how many messages were counted and their accumulated size.
struct AuditMetrics {
// number of messages counted for the timestamp key
1: i64 count,
// sum of the header message-size values of those messages (presumably bytes - confirm)
2: i64 size
}

struct AuditMessage
{
1: i64 timestamp,
2: string topic,
3: string tier,
4: string hostname,
5: i32 windowSize,
6: map<i64, i64> received,
7: map<i64, i64> sent,
8: list<string> filenames,
9: map<string, string> tags
}
1: i64 timestamp,
2: string topic,
3: string tier,
4: string hostname,
5: i32 windowSize,
6: map<i64,i64> received,
7: map<i64,i64> sent,
8: list<string> filenames,
9: map<string, string> tags,
10: map<i64, AuditMetrics> receivedMetrics,
11: map<i64, AuditMetrics> sentMetrics
}
132 changes: 101 additions & 31 deletions src/scribe_audit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ AuditManager::AuditManager(const shared_ptr<StoreQueue> pAuditStore) {
tier = "scribe";
windowSize = 60;
}
//add audit version entry to tags map
tags.insert(std::pair<std::string, std::string>("auditVersion", currentAuditVersion));
}

AuditManager::~AuditManager() {
Expand All @@ -113,11 +115,17 @@ void AuditManager::auditMessage(const LogEntry& entry, bool received) {

// get the timestamp of message
unsigned long long tsKey = 0;
int msgSize = 0;
try {
tsKey = validateMessageAndGetTimestamp(entry);

// if tsKey is 0, then probably message doesn't have a valid header; hence skip it
if (tsKey == 0)
if (validateMessage(entry)) {
tsKey = getTimestamp(entry);
msgSize = getMsgSize(entry);
LOG_OPER("[_audit] validate audit message, tsKey [%llu] and msgSize [%d]", tsKey, msgSize);
} else {
LOG_OPER("[Audit] msg not valid");
}
// if tsKey is 0 or msgSize is 0, then message doesn't have a valid header; hence skip it
if (tsKey == 0 || msgSize == 0)
return;
} catch (const std::exception& e) {
LOG_OPER("[Audit] Failed to validate message. Error <%s>", e.what());
Expand All @@ -137,7 +145,7 @@ void AuditManager::auditMessage(const LogEntry& entry, bool received) {
shared_ptr<audit_msg_t> audit_msg = getAuditMsg(entry.category);

// update audit message counter for the given message
updateAuditMessageCounter(audit_msg, tsKey, received);
updateAuditMessageCounter(audit_msg, tsKey, received, msgSize);

// finally, release the audit RW mutex
auditRWMutex->release();
Expand Down Expand Up @@ -166,7 +174,6 @@ void AuditManager::auditMessages(shared_ptr<logentry_vector_t>& messages,
if (!auditStore->isAuditStore()) {
return;
}

// acquire read lock on auditRWMutex. This allows multiple threads to audit their
// messages concurrently when audit store queue thread is not performing periodic
// task to generate audit messages from maps.
Expand All @@ -184,20 +191,24 @@ void AuditManager::auditMessages(shared_ptr<logentry_vector_t>& messages,

for (unsigned long index = offset; index < offset + count; index++) {
// get the timestamp of message
unsigned long long tsKey = validateMessageAndGetTimestamp(*(messages->at(index)));

// if tsKey is 0, then probably message doesn't have a valid header; hence skip it
if (tsKey == 0)
unsigned long long tsKey = 0;
int msgSize = 0;
if (validateMessage(*(messages->at(index)))) {
tsKey = getTimestamp(*(messages->at(index)));
msgSize = getMsgSize(*(messages->at(index)));
}
// if tsKey is 0 or msgSize is 0, then probably message doesn't have a valid header; hence skip it
if (tsKey == 0 || msgSize == 0)
continue;

// update audit message counter for the given message
updateAuditMessageCounter(audit_msg, tsKey, received);
updateAuditMessageCounter(audit_msg, tsKey, received, msgSize);

// if file store audit is enabled and messages are sent to file store, update
// file audit counters for given message
if (auditFileStore && file_audit_msg != NULL && file_audit_msg.get() != NULL
&& received == false) {
updateFileAuditMessageCounter(file_audit_msg, tsKey);
updateFileAuditMessageCounter(file_audit_msg, tsKey, msgSize);
}
}

Expand Down Expand Up @@ -288,18 +299,28 @@ shared_ptr<file_audit_msg_t> AuditManager::getFileAuditMsg(const string& filenam
// This method updates the sent/received counter for the given message entry and its
// corresponding timestamp key in the sent/received map.
void AuditManager::updateAuditMessageCounter(shared_ptr<audit_msg_t>& audit_msg,
unsigned long long timestampKey, bool received) {
unsigned long long timestampKey, bool received, int msgSize) {
// acquire category level mutex to synchronize access to map and insert/increment
// the received/sent counters.
pthread_mutex_lock(&(audit_msg->mutex));
if (received) {
unsigned long long counter = audit_msg->received[timestampKey];
audit_msg->received[timestampKey] = ++counter;
++(audit_msg->receivedCount);
//for audit version 2
unsigned long long prevCount = audit_msg->receivedMetrics[timestampKey].count;
audit_msg->receivedMetrics[timestampKey].count = ++prevCount;
unsigned long long prevSize = audit_msg->receivedMetrics[timestampKey].size;
audit_msg->receivedMetrics[timestampKey].size = prevSize + msgSize;
} else {
unsigned long long counter = audit_msg->sent[timestampKey];
audit_msg->sent[timestampKey] = ++counter;
++(audit_msg->sentCount);
//for audit version 2
unsigned long long prevCount = audit_msg->sentMetrics[timestampKey].count;
audit_msg->sentMetrics[timestampKey].count = ++prevCount;
unsigned long long prevSize = audit_msg->sentMetrics[timestampKey].size;
audit_msg->sentMetrics[timestampKey].size = prevSize + msgSize;
}
pthread_mutex_unlock(&(audit_msg->mutex));
}
Expand All @@ -308,9 +329,14 @@ void AuditManager::updateAuditMessageCounter(shared_ptr<audit_msg_t>& audit_msg,
// corresponding timestamp key in the received map. This method will be called
// only when messages are sent to a file store and auditFileStore flag is enabled.
void AuditManager::updateFileAuditMessageCounter(shared_ptr<file_audit_msg_t>& file_audit_msg,
unsigned long long timestampKey) {
unsigned long long timestampKey, int msgSize) {
unsigned long long counter = file_audit_msg->received[timestampKey];
file_audit_msg->received[timestampKey] = ++counter;
//for audit version 2
unsigned long long prevCount = file_audit_msg->receivedMetrics[timestampKey].count;
file_audit_msg->receivedMetrics[timestampKey].count = ++prevCount;
unsigned long long prevSize = file_audit_msg->receivedMetrics[timestampKey].size;
file_audit_msg->receivedMetrics[timestampKey].size = prevSize + msgSize;
++(file_audit_msg->receivedCount);
}

Expand Down Expand Up @@ -338,22 +364,24 @@ void AuditManager::auditFileClosed(const std::string& filename) {
}

// This method checks whether the given message has a valid header. If a valid header is
// found, this method returns timestamp key else returns 0.
unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry& entry) {
// found, it returns true, else returns false
bool AuditManager::validateMessage(const LogEntry& entry) {
// assuming that logEntry message is of the format:
// <version><magic bytes><timestamp><message size><message>

// first check that total message length should be at least 16
if ((int)entry.message.length() < headerLength) {
return 0;
LOG_OPER("[Audit] msg length is less than header length");
return false;
}

const char* data = entry.message.data();

// first validate the version byte
int version = (int)(data[0]);
if (version != 1) {
return 0;
if (version != 1 && version != 2) {
LOG_OPER("[Audit] version does not match: [%d]", version);
return false;
}

// now validate magic bytes. Note that in C++ there is no byte datatype.
Expand All @@ -362,7 +390,8 @@ unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry&
if ((((unsigned char)data[1]) != magicBytes[0]) ||
(((unsigned char)data[2]) != magicBytes[1]) ||
(((unsigned char)data[3]) != magicBytes[2])) {
return 0;
LOG_OPER("Magic bytes don't match");
return false;
}

// now get long timestamp value. This involves left shift each char to its
Expand All @@ -382,7 +411,8 @@ unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry&

// validate that timestamp value must be greater than min cut-off
if (timestamp < minTimestamp) {
return 0;
LOG_OPER("[Audit] timestamp is less than mintimestamp, [%llu]", timestamp);
return false;
}

// now validate that message size must be same as the length of message payload
Expand All @@ -392,15 +422,47 @@ unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry&
((int)(data[14] & 0xff) << 8) |
((int)(data[15] & 0xff));
if ((int)entry.message.length() != size + headerLength) {
return 0;
LOG_OPER("[Audit] entry message length[%d] is not equal to size + headerLength[%d]",(int)entry.message.length(), size + headerLength);
return false;
}

// If a valid header is found, convert the timestamp to a key that can be used
// to update the counter in received/sent map. The key is calculated based on
// window size audit config. E.g. if window size is 60 seconds, then all messages
// whose generation timestamp lie within 12:00 and 12:59 would be counted in the
// bucket with key as 12:00
return timestamp - timestamp % (windowSize * 1000);
return true;
}

// This method returns the message size stored in the header of the given entry.
// The size is decoded as a big-endian 32-bit value from bytes 12..15 of the message.
// Returns 0 when the message is too short to contain those bytes; callers treat a
// size of 0 as "no valid header" and skip the message.
int AuditManager::getMsgSize(const LogEntry& entry) {
  // the size field occupies bytes 12..15, so at least 16 bytes must be present
  const int sizeFieldEnd = 16;
  if (static_cast<int>(entry.message.length()) < sizeFieldEnd) {
    return 0;
  }

  const char* data = entry.message.data();

  // assemble the value in unsigned arithmetic so that a leading byte >= 0x80 is not
  // shifted into the sign bit of a signed int
  const unsigned int rawSize =
    (static_cast<unsigned int>(data[12] & 0xff) << 24) |
    (static_cast<unsigned int>(data[13] & 0xff) << 16) |
    (static_cast<unsigned int>(data[14] & 0xff) << 8) |
    (static_cast<unsigned int>(data[15] & 0xff));

  int size = static_cast<int>(rawSize);
  LOG_OPER("[_audit] Returning msg size: [%d]", size);
  return size;
}

// This method returns the timestamp key for the given entry.
// The generation timestamp is decoded as a big-endian 64-bit value from bytes 4..11
// of the message and then rounded down to the start of its audit window.
// Returns 0 when the message is too short to contain the timestamp; callers treat a
// key of 0 as "no valid header" and skip the message.
unsigned long long AuditManager::getTimestamp(const LogEntry& entry) {
  // the timestamp field occupies bytes 4..11, so at least 12 bytes must be present
  const int timestampFieldEnd = 12;
  if (static_cast<int>(entry.message.length()) < timestampFieldEnd) {
    return 0;
  }

  const char* data = entry.message.data();

  // widen every byte to unsigned long long before shifting: long may be only 32 bits
  // wide, and shifting a 32-bit value by 32 or more bits is undefined behaviour
  unsigned long long timestamp =
    (static_cast<unsigned long long>(data[4] & 0xff) << 56) |
    (static_cast<unsigned long long>(data[5] & 0xff) << 48) |
    (static_cast<unsigned long long>(data[6] & 0xff) << 40) |
    (static_cast<unsigned long long>(data[7] & 0xff) << 32) |
    (static_cast<unsigned long long>(data[8] & 0xff) << 24) |
    (static_cast<unsigned long long>(data[9] & 0xff) << 16) |
    (static_cast<unsigned long long>(data[10] & 0xff) << 8) |
    (static_cast<unsigned long long>(data[11] & 0xff));

  LOG_OPER("[_audit] returning timestamp [%llu]", timestamp);
  // Convert the timestamp to a key that can be used
  // to update the counter in received/sent map. The key is calculated based on
  // window size audit config. E.g. if window size is 60 seconds, then all messages
  // whose generation timestamp lie within 12:00 and 12:59 would be counted in the
  // bucket with key as 12:00
  return timestamp - timestamp % (windowSize * 1000);
}

// This method is called by audit store thread periodically to generate audit messages
Expand All @@ -426,10 +488,12 @@ void AuditManager::performAuditTask() {
for (audit_iter = auditMap.begin(); audit_iter != auditMap.end(); audit_iter++) {
audit_msg = audit_iter->second;
// skip auditing if received & sent maps are both empty
if (audit_msg->received.size() == 0 && audit_msg->sent.size() == 0)
if (audit_msg->received.size() == 0 && audit_msg->sent.size() == 0) {
LOG_OPER("[Audit] Empty maps in audit_msg, continuing");
continue;
}

LOG_OPER("[Audit] category [%s], messages received [%llu], messages sent [%llu]",
LOG_OPER("[Audit] Category [%s], messages received [%llu], messages sent [%llu]",
audit_msg->topic.c_str(), audit_msg->receivedCount, audit_msg->sentCount);

// create a LogEntry instance from audit msg
Expand All @@ -444,6 +508,8 @@ void AuditManager::performAuditTask() {
// now clear the contents of received/sent maps within audit message instance
audit_msg->received.clear();
audit_msg->sent.clear();
audit_msg->receivedMetrics.clear();
audit_msg->sentMetrics.clear();

// finally clear the received/sent counters for this audit message instance
audit_msg->receivedCount = 0;
Expand Down Expand Up @@ -511,6 +577,9 @@ shared_ptr<LogEntry> AuditManager::serializeAuditMsg(shared_ptr<audit_msg_t>& au
Audit->windowSize = windowSize;
Audit->received = audit_msg->received;
Audit->sent = audit_msg->sent;
Audit->tags = tags;
Audit->receivedMetrics = audit_msg->receivedMetrics;
Audit->sentMetrics = audit_msg->sentMetrics;

// Perform in-memory Thrift serialization of Audit message content
shared_ptr<TMemoryBuffer> tmembuf(new TMemoryBuffer);
Expand All @@ -521,7 +590,6 @@ shared_ptr<LogEntry> AuditManager::serializeAuditMsg(shared_ptr<audit_msg_t>& au
shared_ptr<LogEntry> entry(new LogEntry);
entry->category = auditTopic;
entry->message = tmembuf->getBufferAsString();

return entry;
}

Expand All @@ -539,6 +607,8 @@ shared_ptr<LogEntry> AuditManager::serializeFileAuditMsg(shared_ptr<file_audit_m
Audit->windowSize = windowSize;
Audit->received = file_audit_msg->received;
Audit->filenames.push_back(file_audit_msg->filename);
Audit->tags = tags;
Audit->receivedMetrics = file_audit_msg->receivedMetrics;

// Perform in-memory Thrift serialization of Audit message content
shared_ptr<TMemoryBuffer> tmembuf(new TMemoryBuffer);
Expand Down
23 changes: 18 additions & 5 deletions src/scribe_audit.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ struct AuditMsg {
unsigned long long receivedCount;
// total msgs sent for this topic since last audit
unsigned long long sentCount;
// map of timestamp and audit metrics from audit version 2
std::map<long, audit::thrift::AuditMetrics> receivedMetrics;
std::map<long, audit::thrift::AuditMetrics> sentMetrics;
pthread_mutex_t mutex;
};

Expand All @@ -51,6 +54,8 @@ struct FileAuditMsg {
std::map<long, long> received;
// total msgs received by file store tier for this topic since last file audit
unsigned long long receivedCount;
// map of timestamp and audit metrics from audit version 2
std::map<long, audit::thrift::AuditMetrics> receivedMetrics;
// whether this file has been closed. File audit msg will be generated by
// audit store thread only after file is marked as closed.
bool fileClosed;
Expand All @@ -69,6 +74,7 @@ class StoreQueue;
// constant string for the name of special audit topic used internally by databus to
// generate audit messages for various tiers
static const std::string auditTopic = "_audit";
static const std::string currentAuditVersion = "2";

class AuditManager {
public:
Expand Down Expand Up @@ -104,20 +110,25 @@ class AuditManager {
boost::shared_ptr<file_audit_msg_t> getFileAuditMsg(const std::string& filename,
const std::string& category);

// This method checks whether the given message has a valid header. If a valid header is
// found, this method returns timestamp key else returns 0.
unsigned long long validateMessageAndGetTimestamp(const scribe::thrift::LogEntry& entry);
// This method checks if the message has a valid header
bool validateMessage(const scribe::thrift::LogEntry& entry);

// This method returns the timestamp key for the message: the generation timestamp read from the header, rounded down to the start of its audit window
unsigned long long getTimestamp(const scribe::thrift::LogEntry& entry);

// This method returns the message size from the header
int getMsgSize(const scribe::thrift::LogEntry& entry);

// This method updates the sent/received counter for the given message entry and its
// corresponding timestamp key in the sent/received map.
void updateAuditMessageCounter(boost::shared_ptr<audit_msg_t>& audit_msg,
unsigned long long timestampKey, bool received);
unsigned long long timestampKey, bool received, int msgSize);

// This method updates the received counter for the given message entry and its
// corresponding timestamp key in the received map. This method will be called
// only when messages are sent to a file store and auditFileStore flag is enabled.
void updateFileAuditMessageCounter(boost::shared_ptr<file_audit_msg_t>& file_audit_msg,
unsigned long long timestampKey);
unsigned long long timestampKey, int msgSize);

// This method serializes the audit message entry and sets it as the message payload
// of a logEntry instance. This instance will be later added in the audit thread queue.
Expand All @@ -141,6 +152,8 @@ class AuditManager {
// E.g. if window size is 60 seconds, then messages whose generation timestamp lies within
// 12:00 and 12:59 would be counted in the same bucket whose key is 12:00
long int windowSize;
// tags map, which contains the audit version entry
std::map<std::string, std::string> tags;
// Instance of audit map
audit_map_t auditMap;
// Instance of file audit map
Expand Down