Skip to content

Commit

Permalink
Merge pull request #1361 from anarkiwi/master
Browse files Browse the repository at this point in the history
Experimental event notification.
  • Loading branch information
anarkiwi committed Dec 5, 2017
2 parents 5dc67a2 + 7862012 commit 0f76717
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 5 deletions.
11 changes: 10 additions & 1 deletion faucet/faucet.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from faucet.valve_util import dpid_log, get_logger, kill_on_exception, get_bool_setting, get_setting
from faucet.valve import valve_factory, SUPPORTED_HARDWARE
from faucet import faucet_experimental_api
from faucet import faucet_experimental_event
from faucet import faucet_bgp
from faucet import faucet_metrics
from faucet import valve_util
Expand Down Expand Up @@ -106,6 +107,8 @@ def __init__(self, *args, **kwargs):

self.dpset = kwargs['dpset']
self.api = kwargs['faucet_experimental_api']
self.notifier = faucet_experimental_event.FaucetExperimentalEventNotifier(
get_setting('FAUCET_EVENT_SOCK'))

# Setup logging
self.logger = get_logger(
Expand All @@ -120,9 +123,15 @@ def __init__(self, *args, **kwargs):
self.metrics = faucet_metrics.FaucetMetrics()
self._bgp = faucet_bgp.FaucetBgp(self.logger, self._send_flow_msgs)

@kill_on_exception(exc_logname)
def start(self):
super(Faucet, self).start()

# Start event notifier
notifier_thread = self.notifier.start()
if notifier_thread is not None:
self.threads.append(notifier_thread)

# Start Prometheus
prom_port = int(get_setting('FAUCET_PROMETHEUS_PORT'))
prom_addr = get_setting('FAUCET_PROMETHEUS_ADDR')
Expand Down Expand Up @@ -170,7 +179,7 @@ def _apply_configs_new(self, dp_id, new_dp):
self.logger.info('Add new datapath %s', dpid_log(dp_id))
valve_cl = valve_factory(new_dp)
if valve_cl is not None:
return valve_cl(new_dp, self.logname)
return valve_cl(new_dp, self.logname, self.notifier)
self.logger.error(
'%s hardware %s must be one of %s',
new_dp.name,
Expand Down
78 changes: 78 additions & 0 deletions faucet/faucet_experimental_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Experimental FAUCET event notification."""

#### THIS API IS EXPERIMENTAL.
#### Discuss with faucet-dev list before relying on this API,
#### review http://www.hyrumslaw.com/.
#### It is subject to change without notice.

# TODO: events are currently schema-less. This is to facilitate rapid prototyping, and will change.
# TODO: not all cases where a notified client fails or could block, have been tested.

# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2017 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import queue
import socket
import time

from ryu.lib import hub
from ryu.lib.hub import StreamServer


class FaucetExperimentalEventNotifier(object):
"""Event notification, via Unix domain socket."""

def __init__(self, socket_path):
self.socket_path = socket_path
self.event_q = queue.Queue(16)

def start(self):
"""Start socket server."""
if self.socket_path:
return hub.spawn(
StreamServer((self.socket_path, None), self._loop).serve_forever)
return None

def _loop(self, _sock, _addr):
"""Serve events."""
while True:
event = self.event_q.get_nowait()
if event:
event_bytes = bytes('\n'.join((json.dumps(event), '')).encode('UTF-8'))
try:
_sock.sendall(event_bytes)
except (socket.error, IOError):
return
continue
hub.sleep(1)

def notify(self, dp_id, dp_name, event_dict):
"""Notify of an event."""
assert isinstance(event_dict, dict)
event = {
'version': 1,
'time': time.time(),
'dp_id': dp_id,
'dp_name': dp_name,
}
for header_key in list(event):
assert header_key not in event_dict
event.update(event_dict)
if self.socket_path:
if not self.event_q.full():
self.event_q.put(event)
29 changes: 26 additions & 3 deletions faucet/valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ class Valve(object):
base_prom_labels = None
recent_ofmsgs = queue.Queue(maxsize=32)

def __init__(self, dp, logname):
def __init__(self, dp, logname, notifier):
self.dp = dp
self.logger = ValveLogger(
logging.getLogger(logname + '.valve'), self.dp.dp_id)
self.notifier = notifier
self.base_prom_labels = {
'dp_id': hex(self.dp.dp_id),
'dp_name': self.dp.name,
Expand Down Expand Up @@ -118,6 +119,10 @@ def __init__(self, dp, logname):
self.dp.timeout, self.dp.learn_jitter, self.dp.learn_ban_timeout,
self.dp.low_priority, self.dp.highest_priority)

def _notify(self, event_dict):
"""Send an event notification."""
self.notifier.notify(self.dp.dp_id, self.dp.name, event_dict)

def switch_features(self, _msg):
"""Send configuration flows necessary for the switch implementation.
Expand Down Expand Up @@ -295,6 +300,11 @@ def _add_ports_and_vlans(self, discovered_port_nums):
return ofmsgs

def port_status_handler(self, port_no, reason, port_status):
self._notify(
{'PORT_CHANGE': {
'port_no': port_no,
'reason': reason,
'status': port_status}})
if reason == valve_of.ofp.OFPPR_ADD:
return self.port_add(port_no)
elif reason == valve_of.ofp.OFPPR_DELETE:
Expand Down Expand Up @@ -330,6 +340,9 @@ def datapath_connect(self, discovered_up_port_nums):
list: OpenFlow messages to send to datapath.
"""
self.logger.info('Cold start configuring DP')
self._notify(
{'DP_CHANGE': {
'reason': 'cold_start'}})
ofmsgs = []
ofmsgs.append(valve_of.faucet_config())
ofmsgs.append(valve_of.faucet_async())
Expand All @@ -340,9 +353,12 @@ def datapath_connect(self, discovered_up_port_nums):
return ofmsgs

def datapath_disconnect(self):
"""Handle Ryu datapath disconnection event. """
self.dp.running = False
"""Handle Ryu datapath disconnection event."""
self.logger.warning('datapath down')
self._notify(
{'DP_CHANGE': {
'reason': 'disconnect'}})
self.dp.running = False

def _port_add_acl(self, port, cold_start=False):
ofmsgs = []
Expand Down Expand Up @@ -656,6 +672,13 @@ def _learn_host(self, other_valves, pkt_meta):
pkt_meta.eth_src, pkt_meta.eth_type,
pkt_meta.l3_src, pkt_meta.port,
pkt_meta.vlan.vid, pkt_meta.vlan.hosts_count()))
self._notify(
{'L2_LEARN': {
'port_no': pkt_meta.port.number,
'vid': pkt_meta.vlan.vid,
'eth_src': pkt_meta.eth_src,
'eth_type': pkt_meta.eth_type,
'l3_src_ip': pkt_meta.l3_src}})
return ofmsgs

def parse_rcv_packet(self, in_port, vlan_vid, eth_type, data, orig_len, pkt, eth_pkt):
Expand Down
1 change: 1 addition & 0 deletions faucet/valve_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def get_sys_prefix():
'FAUCET_CONFIG_STAT_RELOAD': '0',
'FAUCET_LOG_LEVEL': 'INFO',
'FAUCET_LOG': _PREFIX + '/var/log/ryu/faucet/faucet.log',
'FAUCET_EVENT_SOCK': '',
'FAUCET_EXCEPTION_LOG': _PREFIX + '/var/log/ryu/faucet/faucet_exception.log',
'FAUCET_PROMETHEUS_PORT': '9302',
'FAUCET_PROMETHEUS_ADDR': '',
Expand Down
8 changes: 8 additions & 0 deletions tests/faucet_mininet_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import re
import shutil
import subprocess
import tempfile
import time
import unittest
import yaml
Expand Down Expand Up @@ -95,6 +96,7 @@ class FaucetTestBase(unittest.TestCase):
config_ports = {}
env = collections.defaultdict(dict)
rand_dpids = set()
event_sock = None


def __init__(self, name, config, root_tmpdir, ports_sock, max_test_load):
Expand All @@ -103,6 +105,7 @@ def __init__(self, name, config, root_tmpdir, ports_sock, max_test_load):
self.root_tmpdir = root_tmpdir
self.ports_sock = ports_sock
self.max_test_load = max_test_load
self.event_sock = os.path.join(tempfile.mkdtemp(), 'event.sock')

def rand_dpid(self):
reserved_range = 100
Expand All @@ -125,6 +128,7 @@ def _set_prom_port(self, name='faucet'):
def _set_static_vars(self):
self._set_var_path('faucet', 'FAUCET_CONFIG', 'faucet.yaml')
self._set_var_path('faucet', 'FAUCET_LOG', 'faucet.log')
self._set_var('faucet', 'FAUCET_EVENT_SOCK', self.event_sock)
self._set_var_path('faucet', 'FAUCET_EXCEPTION_LOG', 'faucet-exception.log')
self._set_var_path('gauge', 'GAUGE_CONFIG', 'gauge.yaml')
self._set_var_path('gauge', 'GAUGE_LOG', 'gauge.log')
Expand Down Expand Up @@ -263,6 +267,8 @@ def tearDown(self):
if self.net is not None:
self.net.stop()
self.net = None
if os.path.exists(self.event_sock):
shutil.rmtree(os.path.dirname(self.event_sock))
faucet_mininet_test_util.return_free_ports(
self.ports_sock, self._test_name())
if 'OVS_LOGDIR' in os.environ:
Expand Down Expand Up @@ -468,6 +474,8 @@ def _controllers_healthy(self):
for controller in self.net.controllers:
if not controller.healthy():
return False
if not os.path.exists(self.env['faucet']['FAUCET_EVENT_SOCK']):
return False
return True

def _controllers_connected(self):
Expand Down
5 changes: 5 additions & 0 deletions tests/faucet_mininet_test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,15 @@ def setUp(self):

def test_untagged(self):
"""All hosts on the same untagged VLAN should have connectivity."""
self.event_log = os.path.join(self.tmpdir, 'event.log')
controller = self._get_controller()
controller.cmd(faucet_mininet_test_util.timeout_cmd(
'nc -U %s > %s' % (self.env['faucet']['FAUCET_EVENT_SOCK'], self.event_log), 60))
self.ping_all_when_learned()
self.flap_all_switch_ports()
self.gauge_smoke_test()
self.prometheus_smoke_test()
self.assertGreater(os.path.getsize(self.event_log), 0)


class FaucetExperimentalAPITest(FaucetUntaggedTest):
Expand Down
6 changes: 5 additions & 1 deletion tests/test_valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from faucet.valve import valve_factory
from faucet.config_parser import dp_parser
from faucet import valve_packet
from faucet import faucet_experimental_event


def build_pkt(pkt):
Expand Down Expand Up @@ -131,10 +132,13 @@ class ValveTestBase(unittest.TestCase):

def setup_valve(self, config):
self.tmpdir = tempfile.mkdtemp()
self.faucet_event_sock = os.path.join(self.tmpdir, 'event.sock')
self.config_file = os.path.join(self.tmpdir, 'valve_unit.yaml')
self.table = FakeOFTable(self.NUM_TABLES)
self.notifier = faucet_experimental_event.FaucetExperimentalEventNotifier(
self.faucet_event_sock)
dp = self.update_config(config)
self.valve = valve_factory(dp)(dp, 'test_valve')
self.valve = valve_factory(dp)(dp, 'test_valve', self.notifier)

def update_config(self, config):
with open(self.config_file, 'w') as config_file:
Expand Down

0 comments on commit 0f76717

Please sign in to comment.