Skip to content

Commit

Permalink
Flow eviction handling changes
Browse files Browse the repository at this point in the history
With flow eviction, vrouter evicts flow without being in-sync with
Agent. When a flow is re-used, agent must identify the evicted flow and
clean up the old-flow. An index based tree is used to identify flows
evicted.

The earlier implementation of managing evicted flows invovled code
changes at multiple paths of flow-processing. It involved tweaking code
at many places.

Moved all flow eviction handling to one module - KSyncFlowIndexManager.
The KSyncFlowIndexManager maintains the flow index-tree and also per
flow-entry state to track owner of a flow-index. When a flow index is
reused, the old flow is evicted by delete agent structures. The new flow
waits till the old-flow is cleaned-up. Post clean-up, the old flow is
activated.

Closes-Bug: #1525772
Change-Id: Ib4eaa331591775f5216fbf98709696162578a31b
  • Loading branch information
praveenkv committed Dec 14, 2015
1 parent 4bce173 commit 8da0a18
Show file tree
Hide file tree
Showing 26 changed files with 1,195 additions and 239 deletions.
1 change: 1 addition & 0 deletions src/ksync/ksync_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ void KSyncObject::FreeInd(KSyncEntry *entry, uint32_t index) {
if (need_index_ == true && index != KSyncEntry::kInvalidIndex) {
index_table_.Free(index);
}
PreFree(entry);
delete entry;
}

Expand Down
1 change: 1 addition & 0 deletions src/ksync/ksync_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class KSyncObject {
bool IsEmpty(void) { return tree_.empty(); };

virtual bool DoEventTrace(void) { return true; }
virtual void PreFree(KSyncEntry *entry) { }
static void Shutdown();

std::size_t Size() { return tree_.size(); }
Expand Down
14 changes: 13 additions & 1 deletion src/ksync/ksync_sock_user.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ void KSyncSockTypeMap::ProcessSandesh(const uint8_t *parse_buf, size_t buf_len,
decode_buf_len -= decode_len;
}

PurgeTxBuffer();
}

void KSyncSockTypeMap::PurgeTxBuffer() {
// All responses are stored in tx_buff_list_
// Make io-vector of all responses and transmit them
// If there are more than one responses, they are sent as NETLINK MULTI
Expand Down Expand Up @@ -111,7 +115,8 @@ void KSyncSockTypeMap::ProcessSandesh(const uint8_t *parse_buf, size_t buf_len,
iovec.push_back(buffer((uint8_t *)&nlh, NLMSG_HDRLEN));
} else {
// Single buffer. Reset the MULTI flag
last_nlh->nlmsg_flags &= (~NLM_F_MULTI);
if (last_nlh)
last_nlh->nlmsg_flags &= (~NLM_F_MULTI);
}

// Send a message for each entry in io-vector
Expand Down Expand Up @@ -210,6 +215,12 @@ void KSyncSockTypeMap::SimulateResponse(uint32_t seq_num, int code, int flags) {
sock->AddNetlinkTxBuff(&cl);
}

void KSyncSockTypeMap::DisableReceiveQueue(bool disable) {
for(int i = 0; i < IoContext::MAX_WORK_QUEUES; i++) {
receive_work_queue[i]->set_disable(disable);
}
}

void KSyncSockTypeMap::SetDropStats(const vr_drop_stats_req &req) {
KSyncSockTypeMap *sock = KSyncSockTypeMap::GetKSyncSockTypeMap();
sock->drop_stats = req;
Expand Down Expand Up @@ -683,6 +694,7 @@ void KSyncSockTypeMap::PurgeBlockedMsg() {
delete ctx_queue_.front();
ctx_queue_.pop();
}
PurgeTxBuffer();
}

void KSyncSockTypeMap::SetBlockMsgProcessing(bool enable) {
Expand Down
2 changes: 2 additions & 0 deletions src/ksync/ksync_sock_user.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class KSyncSockTypeMap : public KSyncSock {
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
virtual void Receive(boost::asio::mutable_buffers_1);

void PurgeTxBuffer();
void ProcessSandesh(const uint8_t *, std::size_t, KSyncUserSockContext *);
static void set_error_code(int code) { error_code_ = code; }
static int error_code() { return error_code_; }
Expand Down Expand Up @@ -199,6 +200,7 @@ class KSyncSockTypeMap : public KSyncSock {
// Add a response in nl_client into tx_buff_list_
void AddNetlinkTxBuff(struct nl_client *cl);
void InitNetlinkDoneMsg(struct nlmsghdr *nlh, int seq_num);
void DisableReceiveQueue(bool disable);
private:
void PurgeBlockedMsg();
udp::socket sock_;
Expand Down
38 changes: 10 additions & 28 deletions src/vnsw/agent/pkt/flow_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
#include <pkt/flow_table.h>
#include <vrouter/flow_stats/flow_stats_collector.h>
#include <vrouter/ksync/ksync_init.h>
#include <ksync/ksync_entry.h>
#include <vrouter/ksync/flowtable_ksync.h>
#include <vrouter/ksync/ksync_flow_index_manager.h>

#include <route/route.h>
#include <cmn/agent_cmn.h>
Expand Down Expand Up @@ -105,7 +104,6 @@ FlowEntry::FlowEntry(const FlowKey &k, FlowTable *flow_table) :
data_(),
l3_flow_(true),
flow_handle_(kInvalidFlowHandle),
ksync_entry_(NULL),
deleted_(false),
flags_(0),
short_flow_reason_(SHORT_UNKNOWN),
Expand All @@ -114,7 +112,7 @@ FlowEntry::FlowEntry(const FlowKey &k, FlowTable *flow_table) :
peer_vrouter_(),
tunnel_type_(TunnelType::INVALID),
on_tree_(false), fip_(0),
fip_vmi_(AgentKey::ADD_DEL_CHANGE, nil_uuid(), "") {
fip_vmi_(AgentKey::ADD_DEL_CHANGE, nil_uuid(), ""), ksync_index_entry_() {
refcount_ = 0;
nw_ace_uuid_ = FlowPolicyStateStr.at(NOT_EVALUATED);
sg_rule_uuid_= FlowPolicyStateStr.at(NOT_EVALUATED);
Expand Down Expand Up @@ -145,7 +143,10 @@ void FlowEntry::Init() {
}

FlowEntry *FlowEntry::Allocate(const FlowKey &key, FlowTable *flow_table) {
return new FlowEntry(key, flow_table);
FlowEntry *flow = new FlowEntry(key, flow_table);
flow->ksync_index_entry_ = std::auto_ptr<KSyncFlowIndexEntry>
(new KSyncFlowIndexEntry());
return flow;
}

// selectively copy fields from RHS
Expand All @@ -159,6 +160,7 @@ void FlowEntry::Copy(const FlowEntry *rhs) {
tunnel_type_ = rhs->tunnel_type_;
fip_ = rhs->fip_;
fip_vmi_ = rhs->fip_vmi_;
flow_handle_ = rhs->flow_handle_;
}

/////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -224,23 +226,14 @@ bool FlowEntry::InitFlowCmn(const PktFlowInfo *info, const PktControlInfo *ctrl,
data_.in_vm_entry = ctrl->vm_ ? ctrl->vm_ : NULL;
data_.out_vm_entry = rev_ctrl->vm_ ? rev_ctrl->vm_ : NULL;
l3_flow_ = info->l3_flow;
data_.vrouter_evicted_flow = false;

return true;
}

void FlowEntry::InitFwdFlow(const PktFlowInfo *info, const PktInfo *pkt,
const PktControlInfo *ctrl,
const PktControlInfo *rev_ctrl,
FlowEntry *rflow, Agent *agent) {
if (flow_handle_ != pkt->GetAgentHdr().cmd_param) {
if (flow_handle_ != FlowEntry::kInvalidFlowHandle) {
LOG(DEBUG, "Flow index changed from " << flow_handle_
<< " to " << pkt->GetAgentHdr().cmd_param);
}
flow_handle_ = pkt->GetAgentHdr().cmd_param;
}

flow_handle_ = pkt->GetAgentHdr().cmd_param;
if (InitFlowCmn(info, ctrl, rev_ctrl, rflow) == false) {
return;
}
Expand Down Expand Up @@ -448,22 +441,11 @@ bool FlowEntry::set_pending_recompute(bool value) {
return false;
}

bool FlowEntry::set_flow_handle(uint32_t flow_handle, bool update) {
/* trigger update KSync on flow handle change */
void FlowEntry::set_flow_handle(uint32_t flow_handle) {
if (flow_handle_ != flow_handle) {
// TODO(prabhjot): enable when we handle ChangeKey failures
#if 0
// Skip ksync index manipulation, for deleted flow entry
// as ksync entry is not available for deleted flow
if (!deleted_ && flow_handle_ == kInvalidFlowHandle) {
flow_table_->UpdateFlowHandle(this, flow_handle);
}
#endif
assert(flow_handle_ == kInvalidFlowHandle);
flow_handle_ = flow_handle;
flow_table_->UpdateKSync(this, update);
return true;
}
return false;
}

const std::string& FlowEntry::acl_assigned_vrf() const {
Expand Down
15 changes: 6 additions & 9 deletions src/vnsw/agent/pkt/flow_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
class FlowTableKSyncEntry;
class FlowEntry;
struct FlowExportInfo;
class KSyncFlowIndexEntry;

typedef boost::intrusive_ptr<FlowEntry> FlowEntryPtr;

Expand Down Expand Up @@ -199,8 +200,7 @@ struct FlowData {
component_nh_idx((uint32_t)CompositeNH::kInvalidComponentNHIdx),
source_plen(0), dest_plen(0), drop_reason(0),
vrf_assign_evaluated(false), pending_recompute(false),
enable_rpf(true), l2_rpf_plen(Address::kMaxV4PrefixLen),
vrouter_evicted_flow(false){
enable_rpf(true), l2_rpf_plen(Address::kMaxV4PrefixLen) {
}

MacAddress smac;
Expand Down Expand Up @@ -238,7 +238,6 @@ struct FlowData {
FlowRouteRefMap flow_dest_plen_map;
bool enable_rpf;
uint8_t l2_rpf_plen;
bool vrouter_evicted_flow;
std::string vm_cfg_name;
// IMPORTANT: Keep this structure assignable. Assignment operator is used in
// FlowEntry::Copy() on this structure
Expand Down Expand Up @@ -346,7 +345,7 @@ class FlowEntry {
FlowTable *flow_table() const { return flow_table_; }
bool l3_flow() const { return l3_flow_; }
uint32_t flow_handle() const { return flow_handle_; }
bool set_flow_handle(uint32_t flow_handle, bool update);
void set_flow_handle(uint32_t flow_handle);
FlowEntry *reverse_flow_entry() { return reverse_flow_entry_.get(); }
uint32_t flags() const { return flags_; }
const FlowEntry *reverse_flow_entry() const {
Expand Down Expand Up @@ -388,10 +387,6 @@ class FlowEntry {
void set_on_tree() { on_tree_ = true; }
tbb::mutex &mutex() { return mutex_; }

FlowTableKSyncEntry *ksync_entry() const { return ksync_entry_; }
void set_ksync_entry(FlowTableKSyncEntry *ksync_entry) {
ksync_entry_ = ksync_entry;
}
const Interface *intf_entry() const { return data_.intf_entry.get(); }
const VnEntry *vn_entry() const { return data_.vn_entry.get(); }
const VmEntry *in_vm_entry() const { return data_.in_vm_entry.get(); }
Expand Down Expand Up @@ -441,6 +436,7 @@ class FlowEntry {
FlowSandeshData &fe_sandesh_data,
Agent *agent) const;
uint32_t InterfaceKeyToId(Agent *agent, const VmInterfaceKey &key);
KSyncFlowIndexEntry *ksync_index_entry() { return ksync_index_entry_.get();}
private:
friend class FlowTable;
friend class FlowStatsCollector;
Expand All @@ -461,7 +457,6 @@ class FlowEntry {
bool l3_flow_;
uint32_t flow_handle_;
FlowEntryPtr reverse_flow_entry_;
FlowTableKSyncEntry *ksync_entry_;
static tbb::atomic<int> alloc_count_;
bool deleted_;
uint32_t flags_;
Expand All @@ -482,6 +477,8 @@ class FlowEntry {
// Following fields are required for FIP stats accounting
uint32_t fip_;
VmInterfaceKey fip_vmi_;
// KSync state for the flow
std::auto_ptr<KSyncFlowIndexEntry> ksync_index_entry_;
// atomic refcount
tbb::atomic<int> refcount_;
tbb::mutex mutex_;
Expand Down
36 changes: 29 additions & 7 deletions src/vnsw/agent/pkt/flow_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ class FlowEvent {
DELETE_FLOW,
// Event by audit module to delete a flow
AUDIT_FLOW,
// In agent, flow is evicted if index is allocated for another flow
// We delete the flow on eviction. There is a corner case where evicted
// flow is added in parallel with different index. In that case
// we ignore the operation
EVICT_FLOW,
// Flow was waiting to index to be free. Event to specify that flow
// should retry to acquire index
RETRY_INDEX_ACQUIRE,
// Revaluate flow due to deletion of a DBEntry. Other than for INET
// routes, delete of a DBEntry will result in deletion of flows using
// the DBEntry
Expand All @@ -41,38 +49,50 @@ class FlowEvent {

FlowEvent() :
event_(INVALID), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), del_rev_flow_(false) {
gen_id_(0), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle) {
}

FlowEvent(Event event, FlowEntry *flow) :
event_(event), flow_(flow), pkt_info_(), db_entry_(NULL),
gen_id_(0), del_rev_flow_(false) {
gen_id_(0), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle) {
}

FlowEvent(Event event, FlowEntry *flow, uint32_t flow_handle) :
event_(event), flow_(flow), pkt_info_(), db_entry_(NULL),
gen_id_(0), del_rev_flow_(false), flow_handle_(flow_handle) {
}

FlowEvent(Event event, FlowEntry *flow, const DBEntry *db_entry) :
event_(event), flow_(flow), pkt_info_(), db_entry_(db_entry),
gen_id_(0), del_rev_flow_(false) {
gen_id_(0), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle) {
}

FlowEvent(Event event, const DBEntry *db_entry, uint32_t gen_id) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(db_entry),
gen_id_(gen_id), del_rev_flow_(false) {
gen_id_(gen_id), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle) {
}

FlowEvent(Event event, const FlowKey &key, bool del_rev_flow) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), flow_key_(key), del_rev_flow_(del_rev_flow) {
gen_id_(0), flow_key_(key), del_rev_flow_(del_rev_flow),
flow_handle_(FlowEntry::kInvalidFlowHandle) {
}

FlowEvent(Event event, PktInfoPtr pkt_info) :
event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL),
gen_id_(0), flow_key_(), del_rev_flow_() {
gen_id_(0), flow_key_(), del_rev_flow_(),
flow_handle_(FlowEntry::kInvalidFlowHandle) {
}

FlowEvent(const FlowEvent &rhs) :
event_(rhs.event_), flow_(rhs.flow()), pkt_info_(rhs.pkt_info_),
db_entry_(rhs.db_entry_), gen_id_(rhs.gen_id_),
flow_key_(rhs.flow_key_), del_rev_flow_(rhs.del_rev_flow_) {
flow_key_(rhs.flow_key_), del_rev_flow_(rhs.del_rev_flow_),
flow_handle_(rhs.flow_handle_) {
}

virtual ~FlowEvent() { }
Expand All @@ -86,6 +106,7 @@ class FlowEvent {
const FlowKey &get_flow_key() const { return flow_key_; }
bool get_del_rev_flow() const { return del_rev_flow_; }
PktInfoPtr pkt_info() const { return pkt_info_; }
uint32_t flow_handle() const { return flow_handle_; }

private:
Event event_;
Expand All @@ -95,6 +116,7 @@ class FlowEvent {
uint32_t gen_id_;
FlowKey flow_key_;
bool del_rev_flow_;
uint32_t flow_handle_;
};

#endif // __AGENT_FLOW_EVENT_H__
3 changes: 1 addition & 2 deletions src/vnsw/agent/pkt/flow_mgmt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ bool FlowMgmtManager::RequestHandler(boost::shared_ptr<FlowMgmtRequest> req) {
// being modified by two threads. Avoid the concurrency issue by
// enqueuing a dummy request to flow-table queue. The reference will
// be removed in flow processing context
FlowEvent flow_resp(FlowEvent::FREE_FLOW_REF,
req->flow().get(), NULL);
FlowEvent flow_resp(FlowEvent::FREE_FLOW_REF, req->flow().get());
EnqueueFlowEvent(flow_resp);
break;
}
Expand Down
33 changes: 33 additions & 0 deletions src/vnsw/agent/pkt/flow_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ void FlowProto::EnqueueFlowEvent(const FlowEvent &event) {

case FlowEvent::AUDIT_FLOW:
case FlowEvent::DELETE_DBENTRY:
case FlowEvent::EVICT_FLOW:
case FlowEvent::RETRY_INDEX_ACQUIRE:
case FlowEvent::REVALUATE_FLOW:
case FlowEvent::FREE_FLOW_REF: {
FlowEntry *flow = event.flow();
Expand Down Expand Up @@ -206,6 +208,26 @@ bool FlowProto::FlowEventHandler(const FlowEvent &req) {
break;
}

// Check if flow-handle changed. This can happen if vrouter tries to
// setup the flow which was evicted earlier
case FlowEvent::EVICT_FLOW: {
FlowEntry *flow = req.flow();
if (flow->flow_handle() != req.flow_handle())
break;
flow->flow_table()->DeleteMessage(flow);
break;
}

// Flow was waiting for an index. Index is available now. Retry acquiring
// the index
case FlowEvent::RETRY_INDEX_ACQUIRE: {
FlowEntry *flow = req.flow();
if (flow->flow_handle() != req.flow_handle())
break;
flow->flow_table()->UpdateKSync(flow, false);
break;
}

case FlowEvent::FREE_FLOW_REF:
break;

Expand Down Expand Up @@ -236,6 +258,17 @@ void FlowProto::DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow) {
return;
}

void FlowProto::EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle) {
EnqueueFlowEvent(FlowEvent(FlowEvent::EVICT_FLOW, flow, flow_handle));
return;
}

void FlowProto::RetryIndexAcquireRequest(FlowEntry *flow, uint32_t flow_handle){
EnqueueFlowEvent(FlowEvent(FlowEvent::RETRY_INDEX_ACQUIRE, flow,
flow_handle));
return;
}

void FlowProto::CreateAuditEntry(FlowEntry *flow) {
EnqueueFlowEvent(FlowEvent(FlowEvent::AUDIT_FLOW, flow));
return;
Expand Down

0 comments on commit 8da0a18

Please sign in to comment.