Skip to content

Commit

Permalink
Introspect changes to read the flows for each task instance.
Browse files Browse the repository at this point in the history
Task exculsions are added to get
Concurrency between FlowTask Event PacketFlowResponder & main thread

closes-bug: #1548395

Conflicts:
	src/vnsw/agent/pkt/pkt_sandesh_flow.cc

Submitting on behalf of Jayaram

Change-Id: I018f74d5a87956e79a70fc11cf8acc86fd8322e5
  • Loading branch information
jayaramsatya authored and haripk committed Feb 24, 2016
1 parent 5831f2e commit 05245e8
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 45 deletions.
4 changes: 3 additions & 1 deletion src/vnsw/agent/cmn/agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ void Agent::SetAgentTaskPolicy() {
sizeof(db_exclude_list) / sizeof(char *));

const char *flow_table_exclude_list[] = {
AGENT_SHUTDOWN_TASKNAME,
"Agent::PktFlowResponder",
"sandesh::RecvQueue",
AGENT_SHUTDOWN_TASKNAME,
AGENT_INIT_TASKNAME
};
SetTaskPolicyOne(kTaskFlowEvent, flow_table_exclude_list,
Expand Down
123 changes: 82 additions & 41 deletions src/vnsw/agent/pkt/pkt_sandesh_flow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ using boost::system::error_code;
data.set_aging_port(fe->fsc()->flow_aging_key().port);\
}\

const std::string PktSandeshFlow::start_key = "0-0-0-0-0.0.0.0-0.0.0.0";
const std::string PktSandeshFlow::start_key = "0-0-0-0-0-0.0.0.0-0.0.0.0";

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -223,10 +223,11 @@ static void SetAclInfo(SandeshFlowData &data, FlowEntry *fe) {
////////////////////////////////////////////////////////////////////////////////

PktSandeshFlow::PktSandeshFlow(Agent *agent, FlowRecordsResp *obj,
std::string resp_ctx, std::string key) :
std::string resp_ctx, std::string key):
Task((TaskScheduler::GetInstance()->GetTaskId("Agent::PktFlowResponder")),
0), resp_obj_(obj), resp_data_(resp_ctx),
flow_iteration_key_(), key_valid_(false), delete_op_(false), agent_(agent) {
0), resp_obj_(obj), resp_data_(resp_ctx),
flow_iteration_key_(), key_valid_(false), delete_op_(false), agent_(agent),
partition_id_(0) {
if (key != agent_->NullString()) {
if (SetFlowKey(key)) {
key_valid_ = true;
Expand All @@ -250,8 +251,9 @@ void PktSandeshFlow::SendResponse(SandeshResponse *resp) {
resp->Response();
}

string PktSandeshFlow::GetFlowKey(const FlowKey &key) {
string PktSandeshFlow::GetFlowKey(const FlowKey &key, uint16_t partition_id) {
stringstream ss;
ss << partition_id << kDelimiter;
ss << key.nh << kDelimiter;
ss << key.src_port << kDelimiter;
ss << key.dst_port << kDelimiter;
Expand All @@ -264,12 +266,16 @@ string PktSandeshFlow::GetFlowKey(const FlowKey &key) {
bool PktSandeshFlow::SetFlowKey(string key) {
const char ch = kDelimiter;
size_t n = std::count(key.begin(), key.end(), ch);
if (n != 5) {
if (n != 6) {
return false;
}
stringstream ss(key);
string item, sip, dip;
uint32_t proto;

if (getline(ss, item, ch)) {
istringstream(item) >> partition_id_;
}
if (getline(ss, item, ch)) {
istringstream(item) >> flow_iteration_key_.nh;
}
Expand All @@ -288,7 +294,6 @@ bool PktSandeshFlow::SetFlowKey(string key) {
if (getline(ss, item, ch)) {
dip = item;
}

error_code ec;
flow_iteration_key_.src_addr = IpAddress::from_string(sip.c_str(), ec);
flow_iteration_key_.dst_addr = IpAddress::from_string(dip.c_str(), ec);
Expand All @@ -301,17 +306,26 @@ bool PktSandeshFlow::SetFlowKey(string key) {
return true;
}

// FIXME : Should handle multiple flow tables
bool PktSandeshFlow::Run() {
FlowTable::FlowEntryMap::iterator it;
std::vector<SandeshFlowData>& list =
const_cast<std::vector<SandeshFlowData>&>(resp_obj_->get_flow_list());
int count = 0;
bool flow_key_set = false;
FlowTable *flow_obj = agent_->pkt()->flow_table(0);

if (partition_id_ >= agent_->flow_thread_count()) {
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
}

FlowTable *flow_obj = agent_->pkt()->flow_table(partition_id_);

if (delete_op_) {
flow_obj->DeleteAll();
for (int i =0; i < agent_->flow_thread_count(); i++){
flow_obj = agent_->pkt()->flow_table(i);
flow_obj->DeleteAll();
}
SendResponse(resp_obj_);
return true;
}
Expand All @@ -323,25 +337,34 @@ bool PktSandeshFlow::Run() {
SendResponse(resp);
return true;
}
FlowStatsCollector *fec =
agent_->flow_stats_manager()->default_flow_stats_collector();

while (it != flow_obj->flow_entry_map_.end()) {
FlowEntry *fe = it->second;
FlowStatsCollector *fec =
agent_->flow_stats_manager()->GetFlowStatsCollector(fe->key());
const FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid());
SetSandeshFlowData(list, fe, info);
++it;
count++;
if (count == kMaxFlowResponse) {
if (it != flow_obj->flow_entry_map_.end()) {
resp_obj_->set_flow_key(GetFlowKey(fe->key()));
resp_obj_->set_flow_key(GetFlowKey(fe->key(), partition_id_));
flow_key_set = true;

}
break;
}
}

if (!flow_key_set) {
resp_obj_->set_flow_key(PktSandeshFlow::start_key);
if (++partition_id_ < agent_->flow_thread_count()) {
FlowKey key;
resp_obj_->set_flow_key(GetFlowKey(key, partition_id_));
} else {
resp_obj_->set_flow_key(PktSandeshFlow::start_key);
}
}

SendResponse(resp_obj_);
return true;
}
Expand Down Expand Up @@ -380,6 +403,8 @@ void DeleteAllFlowRecords::HandleRequest() const {
void FetchFlowRecord::HandleRequest() const {
FlowKey key;
Agent *agent = Agent::GetInstance();
FlowTable *flow_obj;

key.nh = get_nh();
error_code ec;
key.src_addr = IpAddress::from_string(get_sip(), ec);
Expand All @@ -394,22 +419,28 @@ void FetchFlowRecord::HandleRequest() const {
key.protocol = get_protocol();

FlowTable::FlowEntryMap::iterator it;
FlowTable *flow_obj = agent->pkt()->flow_table(0);
FlowStatsCollector *fec =
agent->flow_stats_manager()->default_flow_stats_collector();
it = flow_obj->flow_entry_map_.find(key);
for (int i = 0; i < agent->flow_thread_count(); i++) {
flow_obj = agent->pkt()->flow_table(i);
it = flow_obj->flow_entry_map_.find(key);
if (it != flow_obj->flow_entry_map_.end())
break;
}

SandeshResponse *resp;
if (it != flow_obj->flow_entry_map_.end()) {
FlowRecordResp *flow_resp = new FlowRecordResp();
FlowEntry *fe = it->second;
FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid());
SandeshFlowData data;
SET_SANDESH_FLOW_DATA(agent, data, fe, info);
flow_resp->set_record(data);
resp = flow_resp;
FlowRecordResp *flow_resp = new FlowRecordResp();
FlowEntry *fe = it->second;
FlowStatsCollector *fec =
agent->flow_stats_manager()->GetFlowStatsCollector(fe->key());
FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid());
SandeshFlowData data;
SET_SANDESH_FLOW_DATA(agent, data, fe, info);
flow_resp->set_record(data);
resp = flow_resp;
} else {
resp = new FlowErrorResp();
}

resp->set_context(context());
resp->set_more(false);
resp->Response();
Expand Down Expand Up @@ -476,7 +507,13 @@ bool PktSandeshFlowStats::Run() {
int count = 0;
bool flow_key_set = false;

FlowTable *flow_obj = agent_->pkt()->flow_table(0);
if (partition_id_ > agent_->flow_thread_count()) {
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
}

FlowTable *flow_obj = agent_->pkt()->flow_table(partition_id_);
FlowStatsManager *fm = agent_->flow_stats_manager();
const FlowStatsCollector *fsc = fm->Find(proto_, port_);
if (!fsc) {
Expand All @@ -486,14 +523,14 @@ bool PktSandeshFlowStats::Run() {
}

FlowTable::FlowEntryMap::iterator it;
if (key_valid_) {
if (key_valid_) {
it = flow_obj->flow_entry_map_.upper_bound(flow_iteration_key_);
} else {
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
}

while (it != flow_obj->flow_entry_map_.end()) {
FlowEntry *fe = it->second;
const FlowExportInfo *info = fsc->FindFlowExportInfo(fe->uuid());
Expand All @@ -504,7 +541,7 @@ bool PktSandeshFlowStats::Run() {
if (it != flow_obj->flow_entry_map_.end()) {
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":"
<< GetFlowKey(fe->key());
<< GetFlowKey(fe->key(), partition_id_);
resp_->set_flow_key(ostr.str());
flow_key_set = true;
}
Expand All @@ -513,9 +550,17 @@ bool PktSandeshFlowStats::Run() {
}

if (!flow_key_set) {
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":" << "0x0";
resp_->set_flow_key(ostr.str());
if ( ++partition_id_ < agent_->flow_thread_count()) {
FlowKey key;
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":"
<< GetFlowKey(key, partition_id_);
resp_->set_flow_key(ostr.str());
} else {
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":" << "0x0";
resp_->set_flow_key(ostr.str());
}
}
SendResponse(resp_);
return true;
Expand All @@ -534,19 +579,15 @@ bool PktSandeshFlowStats::SetProto(string &key) {
if (getline(ss, item, ':')) {
istringstream(item) >> port_;
}

long flow_ptr;
if (getline(ss, item)) {
istringstream(item) >> flow_ptr;
SetFlowKey(item);
}

flow_ptr_ = (FlowEntry *)(flow_ptr);
return true;
}

PktSandeshFlowStats::PktSandeshFlowStats(Agent *agent, FlowStatsCollectorRecordsResp *obj,
std::string resp_ctx, std::string key):
PktSandeshFlow(agent, NULL, resp_ctx, key), resp_(obj), flow_ptr_(NULL) {
PktSandeshFlow(agent, NULL, resp_ctx, key), resp_(obj) {
if (key != agent_->NullString()) {
if (SetProto(key)) {
key_valid_ = true;
Expand Down
5 changes: 2 additions & 3 deletions src/vnsw/agent/pkt/pkt_sandesh_flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ class PktSandeshFlow : public Task {
static const int kMaxFlowResponse = 100;
static const char kDelimiter = '-';
static const std::string start_key;

PktSandeshFlow(Agent *agent, FlowRecordsResp *obj, std::string resp_ctx,
std::string key);
virtual ~PktSandeshFlow();

void SendResponse(SandeshResponse *resp);
bool SetFlowKey(std::string key);
static std::string GetFlowKey(const FlowKey &key);
static std::string GetFlowKey(const FlowKey &key, uint16_t partition_id);

virtual bool Run();
std::string Description() const { return "PktSandeshFlow"; }
Expand All @@ -37,6 +36,7 @@ class PktSandeshFlow : public Task {
bool key_valid_;
bool delete_op_;
Agent *agent_;
uint16_t partition_id_;

private:
DISALLOW_COPY_AND_ASSIGN(PktSandeshFlow);
Expand All @@ -54,7 +54,6 @@ class PktSandeshFlowStats : public PktSandeshFlow {
uint32_t proto_;
uint32_t port_;
FlowStatsCollectorRecordsResp *resp_;
FlowEntry *flow_ptr_;
DISALLOW_COPY_AND_ASSIGN(PktSandeshFlowStats);
};
#endif

0 comments on commit 05245e8

Please sign in to comment.