Skip to content

Commit

Permalink
Added support from learning from past BGP updates
Browse files Browse the repository at this point in the history
  • Loading branch information
vkotronis committed Oct 21, 2019
1 parent 6f559d3 commit a8ec7a5
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 26 deletions.
10 changes: 10 additions & 0 deletions backend/core/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,16 @@ def __check_monitors(self, _monitors):
raise ArtemisError(
"invalid-exabgp-autoconf-flag", entry["autoconf"]
)
elif key == "bgpstreamhist":
if "dir" in info and "autoconf" in info:
if info["autoconf"] == "true":
info["autoconf"] = True
elif info["autoconf"] == "false":
del info["autoconf"]
else:
raise ArtemisError(
"invalid-bgpostreamhist-autoconf-flag", info["autoconf"]
)

@staticmethod
def __check_asns(_asns):
Expand Down
24 changes: 13 additions & 11 deletions monitor/core/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,19 @@ def init_bgpstreamhist_instance(self):
)
)
bgpstreamhist_dir = self.monitors["bgpstreamhist"]
p = Popen(
[
"/usr/local/bin/python3",
"taps/bgpstreamhist.py",
"--prefixes",
self.prefix_file,
"--dir",
bgpstreamhist_dir,
],
shell=False,
)
if "dir" in self.monitors["bgpstreamhist"]:
bgpstreamhist_dir = self.monitors["bgpstreamhist"]["dir"]
bgpstreamhist_cmd = [
"/usr/local/bin/python3",
"taps/bgpstreamhist.py",
"--prefixes",
self.prefix_file,
"--dir",
bgpstreamhist_dir,
]
if "autoconf" in self.monitors["bgpstreamhist"]:
bgpstreamhist_cmd.append("-a")
p = Popen(bgpstreamhist_cmd, shell=False)
self.process_ids.append(
(
"[bgpstreamhist] {} {}".format(
Expand Down
35 changes: 30 additions & 5 deletions monitor/core/taps/bgpstreamhist.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
log = get_logger()


def parse_bgpstreamhist_csvs(prefixes_file=None, input_dir=None):
def parse_bgpstreamhist_csvs(prefixes_file=None, input_dir=None, autoconf=False):

prefixes = load_json(prefixes_file)
assert prefixes is not None

with Connection(RABBITMQ_URI) as connection:
exchange = Exchange(
update_exchange = Exchange(
"bgp-update", channel=connection, type="direct", durable=False
)
exchange.declare()
update_exchange.declare()
autoconf_exchange = Exchange(
"autoconf-local", channel=connection, type="direct", durable=False
)
update_exchange.declare()
autoconf_exchange.declare()
producer = Producer(connection)
validator = mformat_validator()
for csv_file in glob.glob("{}/*.csv".format(input_dir)):
Expand Down Expand Up @@ -77,9 +82,17 @@ def parse_bgpstreamhist_csvs(prefixes_file=None, input_dir=None):
for msg in msgs:
key_generator(msg)
log.debug(msg)
if autoconf:
producer.publish(
msg,
exchange=autoconf_exchange,
routing_key="update",
serializer="json",
priority=4,
)
producer.publish(
msg,
exchange=exchange,
exchange=update_exchange,
routing_key="update",
serializer="json",
)
Expand Down Expand Up @@ -114,12 +127,24 @@ def parse_bgpstreamhist_csvs(prefixes_file=None, input_dir=None):
default=None,
help="Directory with csvs to read",
)
parser.add_argument(
"-a",
"--autoconf",
dest="autoconf",
action="store_true",
help="Use the feed from this historical route collector to build the configuration",
)

args = parser.parse_args()
dir_ = args.dir.rstrip("/")
log.info(
"Starting BGPstreamhist on {} for {} (auto-conf: {})".format(
dir_, args.prefixes_file, args.autoconf
)
)

try:
parse_bgpstreamhist_csvs(args.prefixes_file, dir_)
parse_bgpstreamhist_csvs(args.prefixes_file, dir_, args.autoconf)
except Exception:
log.exception("exception")
except KeyboardInterrupt:
Expand Down
20 changes: 10 additions & 10 deletions monitor/core/taps/exabgp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,20 @@ def exabgp_msg(bgp_message):
for msg in msgs:
key_generator(msg)
log.debug(msg)
if self.autoconf:
producer.publish(
msg,
exchange=self.autoconf_exchange,
routing_key="update",
serializer="json",
priority=4,
)
producer.publish(
msg,
exchange=self.update_exchange,
routing_key="update",
serializer="json",
)
producer.publish(
msg,
exchange=self.autoconf_exchange,
routing_key="update",
serializer="json",
priority=4,
)
else:
log.warning("Invalid format message: {}".format(msg))

Expand All @@ -101,7 +102,7 @@ def exabgp_msg(bgp_message):
log.exception("exception")

def exit(self):
print("Exiting ExaBGP")
log.info("Exiting ExaBGP")
if self.sio is not None:
self.sio.disconnect()
self.sio.wait()
Expand All @@ -128,7 +129,6 @@ def exit(self):
parser.add_argument(
"-a",
"--autoconf",
type=bool,
dest="autoconf",
action="store_true",
help="Use the feed from this local route collector to build the configuration",
Expand All @@ -137,7 +137,7 @@ def exit(self):
args = parser.parse_args()
ping_redis(redis)

print(
log.info(
"Starting ExaBGP on {} for {} (auto-conf: {})".format(
args.host, args.prefixes_file, args.autoconf
)
Expand Down

0 comments on commit a8ec7a5

Please sign in to comment.