Skip to content

Commit

Permalink
Add counters to FilteredList classes
Browse files Browse the repository at this point in the history
- Add new filter rule match tracking support to the ksconf.filter.FilteredList
  and derived classes for future features using a counter.
- Fixed some bad hidden behaviors in FilteredList and derived classes where
  feeding new rules after the first call to match() would cause breakage in
  regex/wildcard use cases.
- Added new unit tests to test FilteredList classes directly, rather than
  relying on CLI tests for filter/promote. (For now there's some overlap.)
- Fixed some minor issues & added the missing teardown for metadata unit testing.
  • Loading branch information
lowell80 committed Jun 16, 2020
1 parent a963171 commit dbf1b82
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 13 deletions.
62 changes: 49 additions & 13 deletions ksconf/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import re
import sys

from collections import Counter

from ksconf.conf.parser import GLOBAL_STANZA


Expand All @@ -15,6 +17,8 @@ class FilteredList(object):

def __init__(self, flags=0):
self.data = []
self.rules = None
self.counter = Counter()
self.flags = flags
self._prep = True

Expand Down Expand Up @@ -54,13 +58,20 @@ def match(self, item):
# Kick off any first-time preparatory activities
if self._prep is False:
self._pre_match()
self.reset_counters()
self._prep = True

# Q: Is this the best way to handle global entries?
if item is GLOBAL_STANZA:
item = "default"

result = self._match(item)
ret = self._match(item)
if ret:
self.counter[ret] += 1
result = True
else:
result = False

else:
# No patterns defined. No filter rule(s) => allow all through
return True
Expand All @@ -69,11 +80,17 @@ def match(self, item):
else:
return result

def reset_counters(self):
    """Reset the match counters so that every known rule starts at zero.

    Every entry of ``self.data`` becomes a key with a count of 0, so the
    caller can know which filters had 0 hits.
    """
    # Counter.update() *counts the elements* of a plain iterable, so feeding
    # it (name, 0) pairs would record each tuple as a key with a count of 1.
    # Seed from a mapping instead, making the rule names themselves the keys.
    self.counter = Counter(dict.fromkeys(self.data, 0))

@property
def has_rules(self):
return len(self.data) > 0
return bool(self.data)

def _match(self, item): # pragma: no cover
    """ Return name of rule, indicating a match or not.

    Abstract hook to be provided by subclasses.  A truthy return value is
    treated by ``match()`` as the name of the rule that matched ``item`` and
    is used as the key incremented in ``self.counter``; a false value means
    that no rule matched.
    """
    raise NotImplementedError


Expand All @@ -83,44 +100,63 @@ class FilteredListString(FilteredList):
def _pre_match(self):
if self.flags & self.IGNORECASE:
# Lower-case all strings in self.data. (Only need to do this once)
self.data = {i.lower() for i in self.data}
self.rules = {i.lower() for i in self.data}
else:
self.rules = set(self.data)
return self.rules

def _match(self, item):
if self.flags & self.IGNORECASE:
item = item.lower()
return item in self.data
if item in self.rules:
return item
else:
return False

def reset_counters(self):
    """Start a fresh counter holding a zero entry for every prepared rule."""
    # Explicit zeros keep rules that never match visible to the caller.
    self.counter = Counter(dict.fromkeys(self.rules, 0))


class FilteredListRegex(FilteredList):
""" Regular Expression support """
def _pre_match(self):

def calc_regex_flags(self):
    """Translate this filter's flag bits into flags for the ``re`` module.

    Only IGNORECASE is mapped; 0 is returned when that bit is not set.
    """
    ignore_case = self.flags & self.IGNORECASE
    return re.IGNORECASE if ignore_case else 0

def _pre_match(self):
# Compile all regular expressions
re_flags = self.calc_regex_flags()
# XXX: Add better error handling here for friendlier user feedback
self.data = [re.compile(pattern, re_flags) for pattern in self.data]
self.rules = [(pattern, re.compile(pattern, re_flags)) for pattern in self.data]

def _match(self, item):
for pattern_re in self.data:
for name, pattern_re in self.rules:
if pattern_re.match(item):
return True
#self.counter[name] += 1
return name
return False

def reset_counters(self):
    """Start a fresh counter holding a zero entry for every prepared rule."""
    # Each rule is a sequence whose first element is the rule's name
    # (the pattern text); counters are keyed by that name.
    rule_names = (rule[0] for rule in self.rules)
    self.counter = Counter(dict.fromkeys(rule_names, 0))


class FilterListWildcard(FilteredListRegex):
""" Wildcard support (handling '*' and ?')
Technically fnmatch also supports [] and [!] character ranges, but we don't advertise that
"""

def _pre_match(self):
# Use fnmatch to translate wildcard expression to a regex
self.data = [fnmatch.translate(pat) for pat in self.data]
# Now call regex (parent version)
super(FilterListWildcard, self)._pre_match()
# Use fnmatch to translate wildcard expression to a regex, and compile regex
re_flags = self.calc_regex_flags()
self.rules = [(wc, re.compile(fnmatch.translate(wc), re_flags)) for wc in self.data]


def create_filtered_list(match_mode, flags):
def create_filtered_list(match_mode, flags=0):
if match_mode == "string":
return FilteredListString(flags)
elif match_mode == "wildcard":
Expand Down
120 changes: 120 additions & 0 deletions tests/test_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python

from __future__ import absolute_import, unicode_literals

import os
import sys
import unittest

from io import open

# Allow interactive execution from CLI, cd tests; ./test_filter.py
if __package__ is None:
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


from ksconf.filter import (FilteredList, create_filtered_list,
FilteredListString, FilteredListRegex, FilterListWildcard)


class FilterTestCase(unittest.TestCase):
    """Exercise the FilteredList classes directly (string, regex, wildcard).

    Covers plain matching, BLACKLIST inversion, IGNORECASE handling and the
    per-rule hit counters exposed as ``fl.counter``.
    """

    sample01 = [
        "ftp:exchange",
        "ftp:transfer",
        "ftp:auth",
        "ftp:debug",
        "http:exchange",
        "http:auth",
    ]

    def test_helper_function(self):
        # The factory maps each match mode onto its implementing class.
        self.assertIsInstance(create_filtered_list("string"), FilteredListString)
        self.assertIsInstance(create_filtered_list("regex"), FilteredListRegex)
        self.assertIsInstance(create_filtered_list("wildcard"), FilterListWildcard)

    def filter(self, filter_type, filters, items, flags=0):
        """Build a filtered list, feed it ``filters`` and apply it to ``items``.

        Returns ``(fl, matched)``: the filter object itself (so that its
        counters can be inspected) and the list of items for which
        ``fl.match()`` returned a true value.
        """
        fl = create_filtered_list(filter_type, flags)
        fl.feedall(filters)
        return (fl, [item for item in items if fl.match(item)])

    def test_string(self):
        fl, res = self.filter("string", ("ftp:auth", "http:auth"), self.sample01)
        self.assertEqual(res, ["ftp:auth", "http:auth"])

    def test_string_blackslist(self):
        fl, res = self.filter("string", ("ftp:auth", "http:auth"), self.sample01,
                              flags=FilteredList.BLACKLIST)
        self.assertEqual(res, ["ftp:exchange", "ftp:transfer", "ftp:debug", "http:exchange"])

    def test_regex(self):
        # Raw strings: "\w" in a normal string literal is an invalid escape sequence.
        fl, res = self.filter("regex", (r"ftp:\w+",), self.sample01)
        self.assertEqual(res, ["ftp:exchange", "ftp:transfer", "ftp:auth", "ftp:debug"])

        fl, res = self.filter("regex", (r"\w+:auth",), self.sample01)
        self.assertEqual(res, ["ftp:auth", "http:auth"])

    def test_wildcard(self):
        fl, res = self.filter("wildcard", ("http:*",), self.sample01)
        self.assertEqual(res, ["http:exchange", "http:auth"])

    def test_reload_and_counter_reset(self):
        sample = self.sample01
        fl, res = self.filter("wildcard", ("http:*",), sample)
        self.assertEqual(res, ["http:exchange", "http:auth"])
        self.assertEqual(fl.counter["http:*"], 2)
        # After running the filter once, add another filter rule and add more items.
        fl.feed("ftp:*")
        res2 = [item for item in sample if fl.match(item)]
        self.assertEqual(len(fl.rules), 2)
        self.assertEqual(res2, sample)
        # Note that 'http:*' is 2 again, and NOT 4. Counters were reset after earlier match() call.
        self.assertEqual(fl.counter["http:*"], 2)
        self.assertEqual(fl.counter["ftp:*"], 4)

    def test_string_counter(self):
        fl, res = self.filter("string", ("http:auth", "ftp:auth", "ftp:bogus"), self.sample01)
        self.assertEqual(res, ["ftp:auth", "http:auth"])
        self.assertEqual(fl.counter["ftp:auth"], 1)
        self.assertEqual(fl.counter["http:auth"], 1)
        # Ensure that 0 matches still gets reported
        self.assertEqual(fl.counter["ftp:bogus"], 0)
        self.assertEqual(len(fl.counter), 3)

    def test_wildcard_counter(self):
        fl, res = self.filter("wildcard", ("http:*",), self.sample01)
        self.assertEqual(res, ["http:exchange", "http:auth"])
        self.assertEqual(fl.counter["http:*"], 2)
        self.assertEqual(len(fl.counter), 1)

        fl, res = self.filter("wildcard", ("*:auth", "*nomatch*"), self.sample01)
        self.assertEqual(res, ["ftp:auth", "http:auth"])
        self.assertEqual(fl.counter["*:auth"], 2)
        self.assertEqual(fl.counter["*nomatch*"], 0)
        self.assertEqual(len(fl.counter), 2)

    def test_string_blacklist_counter(self):
        # Note that blacklist (match inversion) doesn't change the counter numbers calculation.
        fl, res = self.filter("string", ("http:auth", "ftp:auth", "ftp:bogus"), self.sample01,
                              flags=FilteredList.BLACKLIST)
        self.assertEqual(res, ["ftp:exchange", "ftp:transfer", "ftp:debug", "http:exchange"])
        self.assertEqual(fl.counter["ftp:auth"], 1)
        self.assertEqual(fl.counter["http:auth"], 1)
        # Ensure that 0 matches still gets reported
        self.assertEqual(fl.counter["ftp:bogus"], 0)
        self.assertEqual(len(fl.counter), 3)

    def test_string_ignorecase_counter(self):
        # Rules are fed in mixed case; with IGNORECASE they must still match.
        sample = list(self.sample01)
        # Upper-case one (non-matching) input item so the input is mixed case
        # as well, and actually feed that modified copy through the filter.
        sample[4] = sample[4].upper()
        fl, res = self.filter("string", ("http:AUTH", "fTp:AuTh"), sample,
                              flags=FilteredList.IGNORECASE)
        self.assertEqual(res, ["ftp:auth", "http:auth"])
        # Note that the counter values are now lower case too
        self.assertEqual(fl.counter["ftp:auth"], 1)
        self.assertEqual(fl.counter["http:auth"], 1)
        self.assertEqual(len(fl.counter), 2)


if __name__ == '__main__': # pragma: no cover
unittest.main()
3 changes: 3 additions & 0 deletions tests/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class MetaDataTestCase(unittest.TestCase):
def setUp(self):
    # Give the test its own TestWorkDir; tearDown() cleans it up again.
    self.twd = TestWorkDir()

def tearDown(self):
    """Clean up the working directory object created by setUp()."""
    # Cleanup test working directory
    self.twd.clean()

@property
def sample01(self):
Expand Down

0 comments on commit dbf1b82

Please sign in to comment.