Skip to content

Commit

Permalink
ExtendedCymruInfo code review and availability check in test cases;
Browse files Browse the repository at this point in the history
max sleep time check of too long sleep increased to 1 second
  • Loading branch information
sebres committed Sep 30, 2016
1 parent ba9a889 commit 276759b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
24 changes: 10 additions & 14 deletions fail2ban/server/banmanager.py
Expand Up @@ -143,10 +143,12 @@ def getBanListExtendedCymruInfo(self):
return_dict["country"].append("error")
return_dict["rir"].append("error")
return return_dict
self.__lock.acquire()
# get ips in lock:
with self.__lock:
banIPs = [banData.getIP() for banData in self.__banList.values()]
# get cymru info:
try:
for banData in self.__banList.values():
ip = banData.getIP()
for ip in banIPs:
# Reference: http://www.team-cymru.org/Services/ip-to-asn.html#dns
question = ip.getPTR(
"origin.asn.cymru.com" if ip.isIPv4
Expand All @@ -170,14 +172,17 @@ def getBanListExtendedCymruInfo(self):
except dns.exception.DNSException as dnse:
logSys.error("Unhandled DNSException querying Cymru for %s TXT" % question)
logSys.exception(dnse)
return_dict["error"] = dnse
break
except Exception as e:
logSys.error("Unhandled Exception querying Cymru for %s TXT" % question)
logSys.exception(e)
return_dict["error"] = e
break
except Exception as e:
logSys.error("Failure looking up extended Cymru info")
logSys.exception(e)
finally:
self.__lock.release()
return_dict["error"] = e
return return_dict

##
Expand All @@ -188,15 +193,12 @@ def getBanListExtendedCymruInfo(self):
# @return list of Banned ASNs

def geBanListExtendedASN(self, cymru_info):
self.__lock.acquire()
try:
return [asn for asn in cymru_info["asn"]]
except Exception as e:
logSys.error("Failed to lookup ASN")
logSys.exception(e)
return []
finally:
self.__lock.release()

##
# Returns list of Banned Countries from Cymru info
Expand All @@ -206,15 +208,12 @@ def geBanListExtendedASN(self, cymru_info):
# @return list of Banned Countries

def geBanListExtendedCountry(self, cymru_info):
self.__lock.acquire()
try:
return [country for country in cymru_info["country"]]
except Exception as e:
logSys.error("Failed to lookup Country")
logSys.exception(e)
return []
finally:
self.__lock.release()

##
# Returns list of Banned RIRs from Cymru info
Expand All @@ -224,15 +223,12 @@ def geBanListExtendedCountry(self, cymru_info):
# @return list of Banned RIRs

def geBanListExtendedRIR(self, cymru_info):
self.__lock.acquire()
try:
return [rir for rir in cymru_info["rir"]]
except Exception as e:
logSys.error("Failed to lookup RIR")
logSys.exception(e)
return []
finally:
self.__lock.release()

##
# Create a ban ticket.
Expand Down
19 changes: 13 additions & 6 deletions fail2ban/tests/banmanagertestcase.py
Expand Up @@ -147,26 +147,33 @@ def tearDown(self):
"""Call after every test case."""
pass

def testCymruInfo(self):
def _getBanListExtendedCymruInfo(self):
cymru_info = self.__banManager.getBanListExtendedCymruInfo()
if cymru_info.get("error"): # pragma: no cover - availability
raise unittest.SkipTest('Skip test because service is not available: %s' % cymru_info["error"])
return cymru_info


def testCymruInfo(self):
cymru_info = self._getBanListExtendedCymruInfo()
self.assertDictEqual(cymru_info,
{"asn": [self.__asn],
"country": [self.__country],
"rir": [self.__rir]})

def testCymruInfoASN(self):
self.assertEqual(
self.__banManager.geBanListExtendedASN(self.__banManager.getBanListExtendedCymruInfo()),
self.__banManager.geBanListExtendedASN(self._getBanListExtendedCymruInfo()),
[self.__asn])

def testCymruInfoCountry(self):
self.assertEqual(
self.__banManager.geBanListExtendedCountry(self.__banManager.getBanListExtendedCymruInfo()),
self.__banManager.geBanListExtendedCountry(self._getBanListExtendedCymruInfo()),
[self.__country])

def testCymruInfoRIR(self):
self.assertEqual(
self.__banManager.geBanListExtendedRIR(self.__banManager.getBanListExtendedCymruInfo()),
self.__banManager.geBanListExtendedRIR(self._getBanListExtendedCymruInfo()),
[self.__rir])

def testCymruInfoNxdomain(self):
Expand All @@ -175,7 +182,7 @@ def testCymruInfoNxdomain(self):
# non-existing IP
ticket = BanTicket("0.0.0.0", 1167605999.0)
self.assertTrue(self.__banManager.addBanTicket(ticket))
cymru_info = self.__banManager.getBanListExtendedCymruInfo()
cymru_info = self._getBanListExtendedCymruInfo()
self.assertDictEqual(cymru_info,
{"asn": ["nxdomain"],
"country": ["nxdomain"],
Expand All @@ -186,7 +193,7 @@ def testCymruInfoNxdomain(self):
# and new ones
ticket = BanTicket("10.0.0.0", 1167606000.0)
self.assertTrue(self.__banManager.addBanTicket(ticket))
cymru_info = self.__banManager.getBanListExtendedCymruInfo()
cymru_info = self._getBanListExtendedCymruInfo()
self.assertDictEqual(dict((k, sorted(v)) for k, v in cymru_info.iteritems()),
{"asn": sorted(["nxdomain", "4565",]),
"country": sorted(["nxdomain", "unknown"]),
Expand Down
2 changes: 1 addition & 1 deletion fail2ban/tests/utils.py
Expand Up @@ -248,7 +248,7 @@ def F2B_SkipIfFast():
# sleep intervals are large - use replacement for sleep to check time to sleep:
_org_sleep = time.sleep
def _new_sleep(v):
if (v > Utils.DEFAULT_SLEEP_TIME): # pragma: no cover
if v > min(1, Utils.DEFAULT_SLEEP_TIME): # pragma: no cover
raise ValueError('[BAD-CODE] To long sleep interval: %s, try to use conditional Utils.wait_for instead' % v)
_org_sleep(min(v, Utils.DEFAULT_SLEEP_TIME))
time.sleep = _new_sleep
Expand Down

0 comments on commit 276759b

Please sign in to comment.