Skip to content

Commit

Permalink
Make rftests work again. There's no test showing the multicontroller …
Browse files Browse the repository at this point in the history
…setup yet.
  • Loading branch information
alnvdl committed Jan 2, 2013
1 parent 51c36e6 commit 70b29ba
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 149 deletions.
16 changes: 8 additions & 8 deletions nox/src/nox/netapps/rfproxy/rfproxy.cc
Expand Up @@ -26,7 +26,7 @@
#define SUCCESS 1 #define SUCCESS 1


// TODO: proper support for ID // TODO: proper support for ID
#define ID 289 #define ID 0


namespace vigil { namespace vigil {


Expand Down Expand Up @@ -68,7 +68,7 @@ void rfproxy::flow_config(uint64_t dp_id, uint32_t operation_id) {
void rfproxy::flow_add(uint64_t dp_id, void rfproxy::flow_add(uint64_t dp_id,
IPAddress address, IPAddress netmask, IPAddress address, IPAddress netmask,
MACAddress src_hwaddress, MACAddress dst_hwaddress, MACAddress src_hwaddress, MACAddress dst_hwaddress,
uint32_t dst_port) { uint32_t dst_port) {
uint32_t address_ = address.toUint32(); uint32_t address_ = address.toUint32();
uint32_t netmask_ = netmask.toCIDRMask(); uint32_t netmask_ = netmask.toCIDRMask();
uint8_t src_hwaddress_[IFHWADDRLEN]; uint8_t src_hwaddress_[IFHWADDRLEN];
Expand Down Expand Up @@ -218,7 +218,7 @@ Disposition rfproxy::on_packet_in(const Event& e) {
} }


// IPC message processing // IPC message processing
bool rfproxy::process(const string &from, const string &to, bool rfproxy::process(const string &from, const string &to,
const string &channel, IPCMessage& msg) { const string &channel, IPCMessage& msg) {
int type = msg.get_type(); int type = msg.get_type();
if (type == DATAPATH_CONFIG) { if (type == DATAPATH_CONFIG) {
Expand All @@ -229,18 +229,18 @@ bool rfproxy::process(const string &from, const string &to,
else if (type == FLOW_MOD) { else if (type == FLOW_MOD) {
FlowMod* fmsg = dynamic_cast<FlowMod*>(&msg); FlowMod* fmsg = dynamic_cast<FlowMod*>(&msg);
if (fmsg->get_is_removal()) if (fmsg->get_is_removal())
flow_delete(fmsg->get_dp_id(), flow_delete(fmsg->get_dp_id(),
fmsg->get_address(), fmsg->get_netmask(), fmsg->get_address(), fmsg->get_netmask(),
fmsg->get_src_hwaddress()); fmsg->get_src_hwaddress());
else else
flow_add(fmsg->get_dp_id(), flow_add(fmsg->get_dp_id(),
fmsg->get_address(), fmsg->get_netmask(), fmsg->get_address(), fmsg->get_netmask(),
fmsg->get_src_hwaddress(), fmsg->get_dst_hwaddress(), fmsg->get_src_hwaddress(), fmsg->get_dst_hwaddress(),
fmsg->get_dst_port()); fmsg->get_dst_port());
} }
else if (type == DATA_PLANE_MAP) { else if (type == DATA_PLANE_MAP) {
DataPlaneMap* dpmmsg = dynamic_cast<DataPlaneMap*>(&msg); DataPlaneMap* dpmmsg = dynamic_cast<DataPlaneMap*>(&msg);
table.update_dp_port(dpmmsg->get_dp_id(), dpmmsg->get_dp_port(), table.update_dp_port(dpmmsg->get_dp_id(), dpmmsg->get_dp_port(),
dpmmsg->get_vs_id(), dpmmsg->get_vs_port()); dpmmsg->get_vs_id(), dpmmsg->get_vs_port());
} }
return true; return true;
Expand Down
50 changes: 25 additions & 25 deletions pox/ext/rfproxy.py
Expand Up @@ -50,7 +50,7 @@ def delete_dp(self, dp_id):
id_, port = self.vs_to_dp[key] id_, port = self.vs_to_dp[key]
if id_ == dp_id: if id_ == dp_id:
del self.vs_to_dp[key] del self.vs_to_dp[key]

# We're not considering the case of this table becoming invalid when a # We're not considering the case of this table becoming invalid when a
# datapath goes down. When the datapath comes back, the server recreates # datapath goes down. When the datapath comes back, the server recreates
# the association, forcing new map messages to be generated, overriding the # the association, forcing new map messages to be generated, overriding the
Expand All @@ -60,14 +60,14 @@ def delete_dp(self, dp_id):


netmask_prefix = lambda a: sum([bin(int(x)).count("1") for x in a.split(".", 4)]) netmask_prefix = lambda a: sum([bin(int(x)).count("1") for x in a.split(".", 4)])


# TODO: proper support for ID # TODO: add proper support for ID
ID = 140 ID = 0
ipc = MongoIPC.MongoIPCMessageService(MONGO_ADDRESS, MONGO_DB_NAME, str(ID)) ipc = MongoIPC.MongoIPCMessageService(MONGO_ADDRESS, MONGO_DB_NAME, str(ID))
table = Table() table = Table()


# Logging # Logging
log = core.getLogger("rfproxy") log = core.getLogger("rfproxy")

# Base methods # Base methods
def send_of_msg(dp_id, ofmsg): def send_of_msg(dp_id, ofmsg):
topology = core.components['topology'] topology = core.components['topology']
Expand Down Expand Up @@ -110,9 +110,9 @@ def flow_config(dp_id, operation_id):
def flow_add(dp_id, address, netmask, src_hwaddress, dst_hwaddress, dst_port): def flow_add(dp_id, address, netmask, src_hwaddress, dst_hwaddress, dst_port):
netmask = netmask_prefix(netmask) netmask = netmask_prefix(netmask)
address = address + "/" + str(netmask) address = address + "/" + str(netmask)

ofmsg = create_flow_install_msg(address, netmask, ofmsg = create_flow_install_msg(address, netmask,
src_hwaddress, dst_hwaddress, src_hwaddress, dst_hwaddress,
dst_port) dst_port)
if send_of_msg(dp_id, ofmsg) == SUCCESS: if send_of_msg(dp_id, ofmsg) == SUCCESS:
log.info("ofp_flow_mod(add) was sent to datapath (dp_id=%s)", log.info("ofp_flow_mod(add) was sent to datapath (dp_id=%s)",
Expand All @@ -124,7 +124,7 @@ def flow_add(dp_id, address, netmask, src_hwaddress, dst_hwaddress, dst_port):
def flow_delete(dp_id, address, netmask, src_hwaddress): def flow_delete(dp_id, address, netmask, src_hwaddress):
netmask = netmask_prefix(netmask) netmask = netmask_prefix(netmask)
address = address + "/" + str(netmask) address = address + "/" + str(netmask)

ofmsg1 = create_flow_remove_msg(address, netmask, src_hwaddress) ofmsg1 = create_flow_remove_msg(address, netmask, src_hwaddress)
if send_of_msg(dp_id, ofmsg1) == SUCCESS: if send_of_msg(dp_id, ofmsg1) == SUCCESS:
log.info("ofp_flow_mod(delete) was sent to datapath (dp_id=%s)", log.info("ofp_flow_mod(delete) was sent to datapath (dp_id=%s)",
Expand All @@ -140,47 +140,47 @@ def flow_delete(dp_id, address, netmask, src_hwaddress):
else: else:
log.info("Error sending ofp_flow_mod(delete) to datapath (dp_id=%s)", log.info("Error sending ofp_flow_mod(delete) to datapath (dp_id=%s)",
format_id(dp_id)) format_id(dp_id))

# Event handlers # Event handlers
def on_datapath_up(event): def on_datapath_up(event):
topology = core.components['topology'] topology = core.components['topology']
dp_id = event.dpid dp_id = event.dpid

ports = topology.getEntityByID(dp_id).ports ports = topology.getEntityByID(dp_id).ports
for port in ports: for port in ports:
if port <= OFPP_MAX: if port <= OFPP_MAX:
msg = DatapathPortRegister(ct_id=ID, dp_id=dp_id, dp_port=port) msg = DatapathPortRegister(ct_id=ID, dp_id=dp_id, dp_port=port)
ipc.send(RFSERVER_RFPROXY_CHANNEL, RFSERVER_ID, msg) ipc.send(RFSERVER_RFPROXY_CHANNEL, RFSERVER_ID, msg)

log.info("Registering datapath port (dp_id=%s, dp_port=%d)", log.info("Registering datapath port (dp_id=%s, dp_port=%d)",
format_id(dp_id), port) format_id(dp_id), port)

def on_datapath_down(event): def on_datapath_down(event):
dp_id = event.dpid dp_id = event.dpid

log.info("Datapath is down (dp_id=%s)", format_id(dp_id)) log.info("Datapath is down (dp_id=%s)", format_id(dp_id))

table.delete_dp(dp_id) table.delete_dp(dp_id)

msg = DatapathDown(ct_id=ID, dp_id=dp_id) msg = DatapathDown(ct_id=ID, dp_id=dp_id)
ipc.send(RFSERVER_RFPROXY_CHANNEL, RFSERVER_ID, msg) ipc.send(RFSERVER_RFPROXY_CHANNEL, RFSERVER_ID, msg)


def on_packet_in(event): def on_packet_in(event):
packet = event.parsed packet = event.parsed
dp_id = event.dpid dp_id = event.dpid
in_port = event.port in_port = event.port

# Drop all LLDP packets # Drop all LLDP packets
if packet.type == ethernet.LLDP_TYPE: if packet.type == ethernet.LLDP_TYPE:
return return

# If we have a mapping packet, inform RFServer through a Map message # If we have a mapping packet, inform RFServer through a Map message
if packet.type == RF_ETH_PROTO: if packet.type == RF_ETH_PROTO:
vm_id, vm_port = struct.unpack("QB", packet.raw[14:]) vm_id, vm_port = struct.unpack("QB", packet.raw[14:])


log.info("Received mapping packet (vm_id=%s, vm_port=%d, vs_id=%s, vs_port=%d)", log.info("Received mapping packet (vm_id=%s, vm_port=%d, vs_id=%s, vs_port=%d)",
format_id(vm_id), vm_port, event.dpid, event.port) format_id(vm_id), vm_port, event.dpid, event.port)

msg = VirtualPlaneMap(vm_id=vm_id, vm_port=vm_port, msg = VirtualPlaneMap(vm_id=vm_id, vm_port=vm_port,
vs_id=event.dpid, vs_port=event.port) vs_id=event.dpid, vs_port=event.port)
ipc.send(RFSERVER_RFPROXY_CHANNEL, RFSERVER_ID, msg) ipc.send(RFSERVER_RFPROXY_CHANNEL, RFSERVER_ID, msg)
Expand Down Expand Up @@ -214,17 +214,17 @@ def process(self, from_, to, channel, msg):
flow_config(msg.get_dp_id(), msg.get_operation_id()) flow_config(msg.get_dp_id(), msg.get_operation_id())
elif type_ == FLOW_MOD: elif type_ == FLOW_MOD:
if (msg.get_is_removal()): if (msg.get_is_removal()):
flow_delete(msg.get_dp_id(), flow_delete(msg.get_dp_id(),
msg.get_address(), msg.get_netmask(), msg.get_address(), msg.get_netmask(),
msg.get_src_hwaddress()) msg.get_src_hwaddress())
else: else:
flow_add(msg.get_dp_id(), flow_add(msg.get_dp_id(),
msg.get_address(), msg.get_netmask(), msg.get_address(), msg.get_netmask(),
msg.get_src_hwaddress(), msg.get_dst_hwaddress(), msg.get_src_hwaddress(), msg.get_dst_hwaddress(),
msg.get_dst_port()) msg.get_dst_port())

if type_ == DATA_PLANE_MAP: if type_ == DATA_PLANE_MAP:
table.update_dp_port(msg.get_dp_id(), msg.get_dp_port(), table.update_dp_port(msg.get_dp_id(), msg.get_dp_port(),
msg.get_vs_id(), msg.get_vs_port()) msg.get_vs_id(), msg.get_vs_port())


return True return True
Expand Down
34 changes: 17 additions & 17 deletions rfserver/rfserver.py
Expand Up @@ -28,7 +28,7 @@ def __init__(self, configfile):
ch.setLevel(logging.INFO) ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) ch.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
self.log.addHandler(ch) self.log.addHandler(ch)

self.ipc = MongoIPC.MongoIPCMessageService(MONGO_ADDRESS, MONGO_DB_NAME, RFSERVER_ID) self.ipc = MongoIPC.MongoIPCMessageService(MONGO_ADDRESS, MONGO_DB_NAME, RFSERVER_ID)
self.ipc.listen(RFCLIENT_RFSERVER_CHANNEL, self, self, False) self.ipc.listen(RFCLIENT_RFSERVER_CHANNEL, self, self, False)
self.ipc.listen(RFSERVER_RFPROXY_CHANNEL, self, self, True) self.ipc.listen(RFSERVER_RFPROXY_CHANNEL, self, self, True)
Expand All @@ -42,8 +42,8 @@ def process(self, from_, to, channel, msg):
ri.from_message(msg) ri.from_message(msg)
self.register_route_information(ri) self.register_route_information(ri)
elif type_ == DATAPATH_PORT_REGISTER: elif type_ == DATAPATH_PORT_REGISTER:
self.register_dp_port(msg.get_ct_id(), self.register_dp_port(msg.get_ct_id(),
msg.get_dp_id(), msg.get_dp_id(),
msg.get_dp_port()) msg.get_dp_port())
elif type_ == DATAPATH_DOWN: elif type_ == DATAPATH_DOWN:
self.set_dp_down(msg.get_ct_id(), msg.get_dp_id()) self.set_dp_down(msg.get_ct_id(), msg.get_dp_id())
Expand All @@ -63,7 +63,7 @@ def register_vm_port(self, vm_id, vm_port):
action = REGISTER_IDLE action = REGISTER_IDLE
else: else:
entry = self.rftable.get_entry_by_dp_port(config_entry.ct_id, entry = self.rftable.get_entry_by_dp_port(config_entry.ct_id,
config_entry.dp_id, config_entry.dp_id,
config_entry.dp_port) config_entry.dp_port)
# If there's no entry, we have no DP, register VM as idle # If there's no entry, we have no DP, register VM as idle
if entry is None: if entry is None:
Expand Down Expand Up @@ -169,12 +169,12 @@ def register_dp_port(self, ct_id, dp_id, dp_port):
self.rftable.set_entry(entry) self.rftable.set_entry(entry)
self.config_vm_port(entry.vm_id, entry.vm_port) self.config_vm_port(entry.vm_id, entry.vm_port)
self.log.info("Registering datapath port and associating to client port (dp_id=%s, dp_port=%i, vm_id=%s, vm_port=%s)" % self.log.info("Registering datapath port and associating to client port (dp_id=%s, dp_port=%i, vm_id=%s, vm_port=%s)" %
(format_id(dp_id), dp_port, format(entry.vm_id), entry.vm_port)) (format_id(dp_id), dp_port, format(entry.vm_id), entry.vm_port))


def send_datapath_config_message(self, ct_id, dp_id, operation_id): def send_datapath_config_message(self, ct_id, dp_id, operation_id):
self.ipc.send(RFSERVER_RFPROXY_CHANNEL, str(ct_id), self.ipc.send(RFSERVER_RFPROXY_CHANNEL, str(ct_id),
DatapathConfig(ct_id=ct_id, DatapathConfig(ct_id=ct_id,
dp_id=dp_id, dp_id=dp_id,
operation_id=operation_id)) operation_id=operation_id))


def config_dp(self, ct_id, dp_id): def config_dp(self, ct_id, dp_id):
Expand All @@ -188,15 +188,15 @@ def config_dp(self, ct_id, dp_id):
# Configure a normal switch. Clear the tables and install default flows. # Configure a normal switch. Clear the tables and install default flows.
self.send_datapath_config_message(ct_id, dp_id, DC_CLEAR_FLOW_TABLE); self.send_datapath_config_message(ct_id, dp_id, DC_CLEAR_FLOW_TABLE);
# TODO: enforce order: clear should always be executed first # TODO: enforce order: clear should always be executed first
self.send_datapath_config_message(dp_id, DC_DROP_ALL); self.send_datapath_config_message(ct_id, dp_id, DC_DROP_ALL);
self.send_datapath_config_message(dp_id, DC_OSPF); self.send_datapath_config_message(ct_id, dp_id, DC_OSPF);
self.send_datapath_config_message(dp_id, DC_BGP_PASSIVE); self.send_datapath_config_message(ct_id, dp_id, DC_BGP_PASSIVE);
self.send_datapath_config_message(dp_id, DC_BGP_ACTIVE); self.send_datapath_config_message(ct_id, dp_id, DC_BGP_ACTIVE);
self.send_datapath_config_message(dp_id, DC_RIPV2); self.send_datapath_config_message(ct_id, dp_id, DC_RIPV2);
self.send_datapath_config_message(dp_id, DC_ARP); self.send_datapath_config_message(ct_id, dp_id, DC_ARP);
self.send_datapath_config_message(dp_id, DC_ICMP); self.send_datapath_config_message(ct_id, dp_id, DC_ICMP);
self.send_datapath_config_message(dp_id, DC_LDP_PASSIVE); self.send_datapath_config_message(ct_id, dp_id, DC_LDP_PASSIVE);
self.send_datapath_config_message(dp_id, DC_LDP_ACTIVE); self.send_datapath_config_message(ct_id, dp_id, DC_LDP_ACTIVE);
self.log.info("Configuring datapath (dp_id=%s)" % format_id(dp_id)) self.log.info("Configuring datapath (dp_id=%s)" % format_id(dp_id))
return is_rfvs(dp_id) return is_rfvs(dp_id)


Expand Down Expand Up @@ -232,7 +232,7 @@ def map_port(self, vm_id, vm_port, vs_id, vs_port):
# If the association is valid, activate it # If the association is valid, activate it
entry.activate(vs_id, vs_port) entry.activate(vs_id, vs_port)
self.rftable.set_entry(entry) self.rftable.set_entry(entry)
msg = DataPlaneMap(ct_id=entry.ct_id, msg = DataPlaneMap(ct_id=entry.ct_id,
dp_id=entry.dp_id, dp_port=entry.dp_port, dp_id=entry.dp_id, dp_port=entry.dp_port,
vs_id=vs_id, vs_port=vs_port) vs_id=vs_id, vs_port=vs_port)
self.ipc.send(RFSERVER_RFPROXY_CHANNEL, str(entry.ct_id), msg) self.ipc.send(RFSERVER_RFPROXY_CHANNEL, str(entry.ct_id), msg)
Expand Down
6 changes: 3 additions & 3 deletions rftest/rftest1config.csv
@@ -1,3 +1,3 @@
vm_id,vm_port,dp_id,dp_port vm_id,vm_port,ct_id,dp_id,dp_port
12A0A0A0A0A0,1,99,1 12A0A0A0A0A0,1,0,99,1
12A0A0A0A0A0,2,99,2 12A0A0A0A0A0,2,0,99,2

0 comments on commit 70b29ba

Please sign in to comment.