Merge branch 'master' of github.com:AutomatedTester/Garmr

commit 61416f391e0d55f328e878602e1b9624dff89b89 (2 parents: ea9678d + 082f274)
@AutomatedTester authored
6 .gitignore
@@ -3,3 +3,9 @@ dist/
*.egg-info/
*.pyc
garmr-results.xml
+
+.project
+
+.pydevproject
+
+targets.txt
39 README.md
@@ -4,29 +4,34 @@
# Garmr
-Garmr is a checking that a site meets the basic requirements from a security point of view.
-It checks what the correct HTTP calls are allowed and others are blocked. It is installable from PyPi.
+Garmr is a tool to inspect the responses from websites for basic security requirements.
+
+Garmr includes a set of core test cases implemented in corechecks that are derived from
+the Secure Coding Guidelines that can be found at https://wiki.mozilla.org/WebAppSec/Secure_Coding_Guidelines
## Installation
-To install it is a simple case of
- sudo pip install garmr
+This version of Garmr:
+* does not support pip; grab the source from git
+* requires Requests > 0.6.2-dev, which can be installed by following the instructions here:
+** http://docs.python-requests.org/en/latest/user/install/#get-the-code
## Usage
-garmr -u http://application.under.test/path
-
-This will create a file called garmr-results.xml which will have the results of the
-tests stored in it.
-
-### Options
-
-* "-u", "--url": Url to be tested
-* "-f", "--file": File name with URLS to test, Currently not available
-* "-x", "--xunit": Name of file that you wish to write to. Defaults to garmr-results.xml
+usage: garmr.py [-h] [-u TARGETS] [-m MODULES] [-f TARGET_FILES] [-p] [-d]
+optional arguments:
+ -h, --help show this help message and exit
+ -u TARGETS, --url TARGETS
+ add a target to test
+ -m MODULES, --module MODULES
+ load a test suite
+ -f TARGET_FILES, --file TARGET_FILES
+ File with urls to test
+ -p, --force-passive Force passives to be run for each active test
+ -d, --dns Skip DNS resolution when registering a target.
## Tasks
-
-If you want to see what is currently being worked on you can see it on the
-[Pivotal Tracker](https://www.pivotaltracker.com/projects/285905)
+* Implement sequences (i.e. a series of ActiveTests that once invoked, maintains a cookie jar until the list of URLs is exhausted)
+* Implement a proper detailed reporter; currently a range of data is accumulated, but never reported.
+* Implement more checks
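
For readers who would rather drive Garmr from Python than from the command line, the snippet below is a minimal programmatic equivalent of garmr.py -u <url>. It is a sketch that assumes a source checkout of Garmr on the Python path (Python 2, as the codebase targets) and uses a placeholder target URL.

    from scanner import Scanner
    from reporter import AntXmlReporter
    import corechecks

    scanner = Scanner()
    scanner.output = "garmr-results.xml"     # same default as -o
    scanner.reporter = AntXmlReporter()      # same default as -r
    corechecks.configure(scanner)            # register the core checks
    scanner.register_target("https://www.example.com/")   # placeholder target
    scanner.run_scan()                       # writes garmr-results.xml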
1  __init__.py
@@ -1 +0,0 @@
-
29 config.txt
@@ -0,0 +1,29 @@
+[Garmr]
+force-passives = False
+module = corechecks, djangochecks
+reporter = reporter.AntXmlReporter
+output = garmr-results.xml
+dns = True
+
+[corechecks.StsUpgradeCheck]
+enabled = True
+
+[djangochecks.AdminAvailable]
+enabled = True
+path = console
+
+[corechecks.RobotsTest]
+enabled = True
+
+[corechecks.StsHeaderPresent]
+enabled = True
+
+[corechecks.SecureAttributePresent]
+enabled = True
+
+[corechecks.HttpOnlyPresent]
+enabled = True
+
+[corechecks.XfoPresent]
+enabled = True
+
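
Nothing in this commit reads config.txt back in; only save_configuration, which writes files in this format, exists. The layout still maps directly onto the scanner API: the [Garmr] section mirrors the command-line defaults and each [module.CheckName] section carries per-check options. A sketch of how such a file could be loaded, under that assumption:

    import ConfigParser
    from scanner import Scanner

    config = ConfigParser.RawConfigParser()
    config.read("config.txt")

    scanner = Scanner()
    scanner.output = config.get("Garmr", "output")
    scanner.force_passives = config.getboolean("Garmr", "force-passives")
    scanner.resolve_target = config.getboolean("Garmr", "dns")

    # Load the modules named in [Garmr] so their checks are registered.
    for name in [m.strip() for m in config.get("Garmr", "module").split(",")]:
        __import__(name).configure(scanner)

    # Apply the per-check sections.
    for section in config.sections():
        if section in ("Garmr", "Targets"):
            continue
        if not config.getboolean(section, "enabled"):
            scanner.disable_check(section)
        for key, value in config.items(section):
            if key != "enabled":
                scanner.configure_check(section, key, value)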
125 corechecks.py
@@ -0,0 +1,125 @@
+from urlparse import urlparse
+import requests
+from scanner import ActiveTest, PassiveTest, Scanner, get_url
+
+
+class HttpOnlyAttributePresent(PassiveTest):
+ description = "Inspect the Set-Cookie: header and determine if the HttpOnly attribute is present."
+ def analyze(self, response):
+ cookieheader = "Set-Cookie"
+ has_cookie = cookieheader in response.headers
+ if has_cookie:
+ if "httponly" in response.headers[cookieheader].lower():
+ result = self.result("Pass", "HttpOnly is set", response.headers[cookieheader])
+ else:
+ result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
+ else:
+ result = self.result("Skip", "No cookie is set by this response.", None)
+ return result
+
+class SecureAttributePresent(PassiveTest):
+ description = "Inspect the Set-Cookie: header and determine if the Secure attribute is present."
+ def analyze(self, response):
+ url = urlparse(response.url)
+ cookieheader = "Set-Cookie"
+ has_cookie = cookieheader in response.headers
+ if has_cookie:
+ if "httponly" in response.headers[cookieheader].lower():
+ if url.scheme == "https":
+ result = self.result("Pass", "HttpOnly is set", response.headers[cookieheader])
+ else:
+ result = self.result("Fail", "HttpOnly should only be set for cookies sent over SSL.", response.headers[cookieheader])
+ else:
+ if url.scheme == "https":
+ result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
+ else:
+ result = self.result("Pass", "The secure attribute is not set (expected for HTTP)", response.headers[cookieheader])
+ else:
+ result = self.result("Skip", "No cookie is set by this response.", None)
+ return result
+
+
+class StrictTransportSecurityPresent(PassiveTest):
+ secure_only = True
+ description = "Check if the Strict-Transport-Security header is present in TLS requests."
+ def analyze(self, response):
+ stsheader = "Strict-Transport-Security"
+ sts = stsheader in response.headers
+ if sts == False:
+ result = self.result("Fail", "Strict-Transport-Security header not found.", None)
+ else:
+ result = self.result("Pass", "Strict-Transport-Security header present.", response.headers[stsheader])
+ return result
+
+class XFrameOptionsPresent(PassiveTest):
+ description = "Check if X-Frame-Options header is present."
+ def analyze(self, response):
+ xfoheader = "X-Frame-Options"
+ xfo = xfoheader in response.headers
+ if xfo == False:
+ result = self.result("Fail", "X-Frame-Options header not found.", None)
+ else:
+ result = self.result("Pass", "X-Frame-Options header present.", response.headers[xfoheader])
+ return result
+
+class RobotsTest(ActiveTest):
+ run_passives = True
+ description = "Check for the presence of a robots.txt file. If save_contents is true, the contents will be saved."
+ config = {"save_contents" : "False"}
+ def do_test(self, url):
+ u = urlparse(url)
+ roboturl="%s://%s/robots.txt" % (u.scheme, u.netloc)
+ response = requests.get(roboturl)
+ if response.status_code == 200:
+ result = self.result("Pass", "A robots.txt file is present on the server",
+ response.content if self.config["save_contents"].lower() == "true" else None)
+ else:
+ result = self.result("Fail", "No robots.txt file was found.", None)
+ return (result, response);
+
+class StsUpgradeCheck(ActiveTest):
+ insecure_only = True
+ run_passives = False
+ description = "Inspect the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
+
+ def do_test(self, url):
+ stsheader = "Strict-Transport-Security"
+ u = urlparse(url)
+ if u.scheme == "http":
+ correct_header = False
+ bad_redirect = False
+ response1 = get_url(url, False)
+ invalid_header = stsheader in response1.headers
+ is_redirect = response1.status_code == 301
+ if is_redirect == True:
+ redirect = response1.headers["location"]
+ r = urlparse(redirect)
+ if r.scheme == "https":
+ response2 = get_url(redirect, False)
+ correct_header = stsheader in response2.headers
+ else:
+ bad_redirect = True
+
+ success = invalid_header == False and is_redirect == True and correct_header == True
+ if success == True:
+ message = "The STS upgrade occurs properly (no STS header on HTTP, a 301 redirect, and an STS header in the subsequent request."
+ else:
+ message = "%s%s%s%s" % (
+ "The initial HTTP response included an STS header (RFC violation)." if invalid_header else "",
+ "" if is_redirect else "The initial HTTP response should be a 301 redirect (RFC violation see ).",
+ "" if correct_header else "The followup to the 301 redirect must include the STS header.",
+ "The 301 location must use the https scheme." if bad_redirect else ""
+ )
+ result = self.result("Pass" if success else "Fail", message, None)
+ return (result, response1)
+
+
+def configure(scanner):
+ if isinstance(scanner, Scanner) == False:
+ raise Exception("Cannot configure a non-scanner object!")
+ scanner.register_check(StrictTransportSecurityPresent())
+ scanner.register_check(XFrameOptionsPresent())
+ scanner.register_check(RobotsTest())
+ scanner.register_check(StsUpgradeCheck())
+ scanner.register_check(HttpOnlyAttributePresent())
+ scanner.register_check(SecureAttributePresent())
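
New checks follow the pattern above: subclass PassiveTest (or ActiveTest), give it a description and an analyze(response) (or do_test(url)) method that returns self.result(state, message, data), and register an instance from a module-level configure(scanner) so the module can be loaded with -m. The check below is a hypothetical example written against that pattern, not part of this commit:

    from scanner import PassiveTest, Scanner

    class XContentTypeOptionsPresent(PassiveTest):
        description = "Check if the X-Content-Type-Options header is set to nosniff."
        def analyze(self, response):
            header = "X-Content-Type-Options"
            if header not in response.headers:
                return self.result("Fail", "X-Content-Type-Options header not found.", None)
            if response.headers[header].strip().lower() == "nosniff":
                return self.result("Pass", "X-Content-Type-Options is set to nosniff.", response.headers[header])
            return self.result("Fail", "X-Content-Type-Options is present but not set to nosniff.", response.headers[header])

    def configure(scanner):
        # Called by garmr.py when the defining module is loaded with -m.
        if not isinstance(scanner, Scanner):
            raise Exception("Cannot configure a non-scanner object!")
        scanner.register_check(XContentTypeOptionsPresent())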
25 djangochecks.py
@@ -0,0 +1,25 @@
+from urlparse import urlparse
+import requests
+from scanner import ActiveTest, PassiveTest, Scanner, get_url
+
+
+class AdminAvailable(ActiveTest):
+ run_passives = True
+ config = {"path" : "admin"}
+
+ def do_test(self, url):
+ u = urlparse(url)
+ adminurl="%s://%s/%s" % (u.scheme, u.netloc, self.config["path"])
+ response = requests.get(adminurl)
+ if response.status_code == 200:
+ result = self.result("Pass", "Django admin page is present at %s." % adminurl, response.content)
+ else:
+ result = self.result("Fail", "Default Django admin page is not present at %s" % adminurl, None)
+ return (result, response);
+
+
+def configure(scanner):
+ if isinstance(scanner, Scanner) == False:
+ raise Exception("Cannot configure a non-scanner object!")
+ scanner.register_check(AdminAvailable())
+
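
AdminAvailable reads its probe path from the config dict, so it can be pointed at a relocated admin (path = console, as in config.txt above) without editing the module. A short sketch, assuming a source checkout:

    from scanner import Scanner
    import djangochecks

    scanner = Scanner()
    djangochecks.configure(scanner)
    # Probe /console instead of the default /admin; equivalent to passing
    # -c djangochecks.AdminAvailable:path=console on the command line.
    scanner.configure_check("djangochecks.AdminAvailable", "path", "console")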
306 garmr.py
@@ -1,224 +1,96 @@
-#!/usr/bin/python
+import argparse
+from scanner import ActiveTest, PassiveTest, Scanner
+import corechecks
+from reporter import Reporter
+import sys
+import traceback
-import httplib
-import urllib2
-from optparse import OptionParser
-import logging
-from datetime import datetime
-
-
-logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
-logger = logging.getLogger("Garmr")
-logger.setLevel(logging.DEBUG)
-
-class Reporter(object):
- """
- This class formats and writes a xUnit style report on the results from
- the basic tests
- """
+def main():
+ parser = argparse.ArgumentParser("Runs a set of tests against the set of provided URLs")
+ parser.add_argument("-u", "--url", action="append", dest="targets", help="Add a target to test")
+ parser.add_argument("-f", "--target-file", action="append", dest="target_files", help="File with URLs to test")
+
+ parser.add_argument("-m", "--module", action="append", default = ["corechecks"], dest="modules", help="Load an extension module")
+ parser.add_argument("-p", "--force-passive", action="store_true", default=False, dest="force_passives", help ="Force passives to be run for each active test")
+ parser.add_argument("-d", "--dns", action="store_false", default=True, dest="resolve_target", help ="Skip DNS resolution when registering a target")
+ parser.add_argument("-r", "--report", action="store", default="reporter.AntXmlReporter", dest="report",help="Load a reporter e.g. -r reporter.AntXmlReporter")
+ parser.add_argument("-o", "--output", action="store", default="garmr-results.xml", dest="output", help="Default output is garmr-results.xml")
+ parser.add_argument("-c", "--check", action="append", dest="opts", help="Set a parameter for a check (check:opt=value)" )
+ parser.add_argument("-e", "--exclude", action="append", dest="exclusions", help="Prevent a check from being run/processed")
+ parser.add_argument("--save", action="store", dest="dump_path", help="Write out a configuration file based on parameters (won't run scan)")
-
- suite_xml="""<?xml version="1.0" encoding="utf-8"?>
- <testsuite name="Garmr" errors="{error}" failures="{failure}"
- skips="{skips}" tests="{numtests}" time="{timetaken}">
- {testresults}
- </testsuite>"""
-
- def __init__(self, results=None):
- """
- Initializes the reporter class
- Args:
- results - optional parameter to take the results. If results
- are not passed in here they need to be passed in
- write_results method or an exception will be raised
- """
- logging.debug("Reporter class initialized")
- self.results = results
-
- def write_results(self, file_name='garmr-results.xml', results=None):
- """
- This writes the xml to disk.
- Args:
- file_name - optional parameter of the name of the file to create
- results - optional parameter of with the test results. If this is
- empty and nothing was passed in during object initialization
- an error will be raised.
- Note: if you pass in above and here the latest results will
- be used
-
- """
- if self.results is None and results is None:
- logging.exception("No test results have been passed Reporter")
- raise Exception("No results have been passed. Please pass in a result")
-
- if results is not None:
- self.results = results
-
- formatted = self._format_results()
- suite_results = self.suite_xml.format(error=formatted["errors"],
- failure=formatted["failed"],
- skips=formatted["skips"],
- numtests=formatted["tests"],
- timetaken=formatted["time_taken"],
- testresults=formatted["testcase"])
- file_results = open(file_name, "w")
- file_results.write(suite_results)
- file_results.close()
-
-
- def _format_results(self):
- testcase = """<testcase classname="" name="{testname}" time="{timetaken}" """
- errs = '><{errtype}>{message}</{errtype}></testcase>'
- formatted_results = ""
- results = {"time_taken":0,
- "errors" : 0,
- "failed" : 0,
- "skips" : 0,
- "tests" : 0}
-
- for res in self.results:
- results["tests"] += 1
- formatted_results += testcase.format(
- testname = res["name"],timetaken=res["time_taken"])
- if res.has_key("errors"):
- results["errors"] += 1
- formatted_results += errs.format(errtype="error", message=res["message"])
- elif res.has_key("failed"):
- results["failed"] += 1
- formatted_results += errs.format(errtype="failure", message=res["message"])
- elif res.has_key("skips"):
- results["skips"] += 1
- formatted_results += errs.format(errtype="skipped", message=res["message"])
- else:
- formatted_results += "/>"
- results["time_taken"] += res["time_taken"]
+ args = parser.parse_args()
+ scanner = Scanner()
+
+ scanner.force_passives = args.force_passives
+ scanner.resolve_target = args.resolve_target
+ scanner.output = args.output
+
+ # Start building target list.
+ if args.targets != None:
+ for target in args.targets:
+ scanner.register_target(target)
- results["testcase"] = formatted_results
- return results
-
-class Garmr(object):
-
-
- def __init__(self, urls):
- self.urls = urls
-
- def xframe_checks(self):
- result = {}
- result["name"] = self.xframe_checks.__name__
- start = datetime.now()
- try:
- response = urllib2.urlopen(self.urls)
- response_headers = response.headers.headers
- headers = self._clean_header(response_headers)
- logger.info("Checking x-frame-options")
+ # Add targets from files to the list.
+ if args.target_files != None:
+ for targets in args.target_files:
try:
- assert headers["x-frame-options"] == "DENY" or \
- headers["x-frame-options"] == "SAMEORIGIN", \
- "x-frame-options were: %s" % headers["x-frame-options"]
-
- logger.info("x-frame-options were correct")
- except KeyError:
- message = "x-frame-options were not found in headers"
- result["failed"] = True
- result["message"] = message
- logger.critical(message)
- except AssertionError as e:
- logger.error(str(e))
- result["errors"] = True
- result["message"] = str(e)
- finish = datetime.now()
- result["time_taken"] = self._total_seconds(start, finish)
- logger.debug("Time Taken: %s:" % result["time_taken"])
- return result
+ f = open(targets, "r")
+ for target in f:
+ t = target.strip()
+ if len(t) > 0:
+ scanner.register_target(t)
+ except:
+ Scanner.logger.error("Unable to process the target list in: %s", targets)
+
+ # Configure modules.
+ # TODO: change the module loading to scan the list of classes in a module and automagically
+ # detect any tests defined.
+ if args.modules != None:
+ for module in args.modules:
+ try:
+ __import__(module)
+ m = sys.modules[module]
+ m.configure(scanner)
+ except Exception, e:
+ Scanner.logger.fatal("Unable to load the requested module [%s]: %s", module, e)
+ quit()
+
+ # Set up the reporter (allow it to load from modules that are configured)
+ try:
+ reporter = args.report.split('.')
+ if len(reporter) == 1:
+ scanner.reporter = Reporter.reporters[reporter[0]]
+ else:
+ scanner.reporter = getattr(sys.modules[reporter[0]], reporter[1])()
+ Scanner.logger.info("Writing report to [%s] using [%s]" % (args.output, args.report))
+ if isinstance(scanner.reporter, Reporter) == False:
+ raise Exception("Cannot configure a non-scanner object!")
+ except Exception, e:
+ Scanner.logger.fatal("Unable to use the reporter class [%s]: %s", args.report, e)
+ quit()
- def trace_checks(self):
- result = {}
- result["name"] = self.trace_checks.__name__
- start = datetime.now()
- try:
- logger.info("Checking TRACE is not valid")
- http_urls = self._clean_url(self.urls)
- request = httplib.HTTPConnection(http_urls[0])
- if len(http_urls) > 1:
- request.request("TRACE", http_urls[1])
- else:
- request.request("TRACE", "/")
+ # Disable excluded checks.
+ if args.exclusions != None:
+ for exclude in args.exclusions:
+ scanner.disable_check(exclude)
+
+ # Configure checks
+ if args.opts != None:
+ for opt in args.opts:
+ try:
+ check = opt.split(":")[0]
+ key, value = opt[len(check)+1:].split("=")
+ scanner.configure_check(check, key, value)
+ except Exception, e:
+ Scanner.logger.fatal("Invalid check option: %s (%s)", opt, e)
- request.getresponse()
- raise Exception("TRACE is a valid HTTP call")
- except httplib.BadStatusLine, e:
- logger.info("TRACE is not valid")
- except Exception, e:
- logger.error(str(e))
- result["errors"] = True
- result["message"] = str(e)
- finish = datetime.now()
- result["time_taken"] = self._total_seconds(start, finish)
- logger.debug("Time Taken: %s:" % result["time_taken"])
- return result
-
-
- def redirect_checks(self):
- result = {}
- result["name"] = self.redirect_checks.__name__
- start = datetime.now()
- response = urllib2.urlopen(self.urls)
- try:
- logger.info("Checking for HTTPS")
- assert "https://" in response.geturl(), "Have not been redirected to HTTPS"
- logger.info("Redirected to HTTPS version of site")
- except AssertionError, e:
- logger.error(str(e))
- result["errors"] = True
- result["message"] = str(e)
- finish = datetime.now()
- result["time_taken"] = self._total_seconds(start, finish)
- logger.debug("Time Taken: %s:" % result["time_taken"])
- return result
-
- def _clean_header(self, response_headers):
- headers = {}
- for head in response_headers:
- lst = head.strip(" \r\n").split(":")
- headers[lst[0]] = lst[1].strip()
- return headers
-
- def _clean_url(self, urls):
- import re
- mtch = re.search("https?://([^/]*?)(/.*)?", urls)
- split = []
- for matches in mtch.groups():
- split.append(matches)
- return split
-
- def _total_seconds(self, start, finish):
- td = finish - start
- return float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
-
-def main():
- usage = "Usage: %prog [option] arg"
- parser = OptionParser(usage=usage, version="%prog 0.2")
- parser.add_option("-u", "--url", action="store", type="string",
- dest="aut", help="Url to be tested")
- parser.add_option("-f", "--file", action="store", type="string",
- dest="file_name",
- help="File name with URLS to test, Currently not available")
- parser.add_option("-x", "--xunit", action="store", type="string",
- dest="xunit", default='garmr-results.xml',
- help="Name of file that you wish to write to")
-
- (options, args) = parser.parse_args()
- if options.aut is None and options.file_name is None:
- parser.error("Please supply an argument")
-
- test_results = []
-
- garmr = Garmr(options.aut)
- test_results.append(garmr.trace_checks())
- test_results.append(garmr.xframe_checks())
- test_results.append(garmr.redirect_checks())
- reporter = Reporter(test_results)
- reporter.write_results(file_name=options.xunit)
-
+ if args.dump_path != None:
+ scanner.save_configuration(args.dump_path)
+ return
+
+ scanner.run_scan()
+
+
if __name__ == "__main__":
- main()
+ main()
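
The -c option packs its argument into a single check:key=value token; main() splits on the first colon to find the check name and then expects exactly one key=value pair in the remainder. The helper below simply mirrors that parsing (the function name is ours, for illustration only):

    def parse_check_option(opt):
        # Everything before the first ':' names the check; the rest must be key=value.
        check = opt.split(":")[0]
        key, value = opt[len(check) + 1:].split("=")
        return check, key, value

    print parse_check_option("corechecks.RobotsTest:save_contents=true")
    # ('corechecks.RobotsTest', 'save_contents', 'true')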
116 reporter.py
@@ -0,0 +1,116 @@
+
+
+class Reporter():
+ reporters = {}
+ def start_report(self):
+ return None
+
+ def start_targets(self):
+ return None
+
+ def write_target(self, target):
+ return None
+
+ def start_actives(self):
+ return None
+
+ def write_active(self, test):
+ return None
+
+ def start_passives(self):
+ return None
+
+ def write_passive(self, target):
+ return None
+
+ def end_passives(self):
+ return None
+
+ def end_actives(self):
+ return None
+
+ def end_targets(self):
+ return None
+
+ def end_report(self):
+ return "This reporter is unimplemented!"
+
+class DetailReporter(Reporter):
+ # TODO Implement detailed reporter
+ def end_report(self):
+ return "This reporter should emit an XML report that includes all of the the details for each test, including captured data"
+
+Reporter.reporters['detail'] = DetailReporter()
+
+class AntXmlReporter(Reporter):
+
+ def __init__(self):
+ self.report = ""
+ self.errtypes = { 'Error' : "error", 'Fail' : "failure", 'Skip' : "skipped"}
+
+ def start_report(self):
+ self.report = '<?xml version="1.0" encoding="utf-8"?>\n'
+
+ return None
+
+ def start_targets(self):
+ self.report += "<testsuites>\n"
+ return None
+
+ def write_target(self, target):
+ self.states = {}
+ self.states["Skip"] = 0
+ self.states["Error"] = 0
+ self.states["Pass"] = 0
+ self.states["Fail"] = 0
+ self.checks = 0
+ self.current_target = target
+ self.lines = ""
+ return None
+
+ def start_actives(self):
+ return None
+
+ def write_active(self, test, result):
+ self.states[result["state"]] += 1
+ self.checks += 1
+ module, check = ("%s" % test ).split('.')
+ self.lines += '\t\t<testcase classname="%s" name="%s" time="%s"' % (module, check, result["duration"])
+ if result["state"] == "Pass":
+ self.lines += " />\n"
+ else:
+ self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
+ return None
+
+ def start_passives(self):
+ return None
+
+ def write_passive(self, test, result):
+ self.states[result["state"]] += 1
+ self.checks += 1
+ module, check = ("%s" % test ).split('.')
+ self.lines += '\t\t<testcase classname="%s" name="%s" time="%s"' % (module, check, result["duration"])
+ if result["state"] == "Pass":
+ self.lines += " />\n"
+ else:
+ self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
+ return None
+
+ def end_passives(self):
+ return None
+
+ def end_actives(self):
+ self.report+= '\t<testsuite name="{target}" errors="{errors}" failures="{failures}" skips="{skips}" tests="{checks}" time="{duration}">\n{lines}\t</testsuite>\n'.format(
+ target = self.current_target, errors=self.states["Error"], failures = self.states["Fail"],
+ skips = self.states["Skip"], checks = self.checks, duration=100, lines=self.lines)
+ return None
+
+ def end_targets(self):
+ self.report += "</testsuites>\n"
+ return None
+
+ def end_report(self):
+ return self.report
+
+Reporter.reporters['xml'] = AntXmlReporter()
+
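
A custom reporter only needs to implement the hooks the Scanner calls and return its serialized output from end_report(); registering an instance in Reporter.reporters gives it a short name that the -r option can resolve once the defining module has been loaded. The plain-text reporter below is a sketch, not part of this commit:

    from reporter import Reporter

    class SummaryReporter(Reporter):
        def __init__(self):
            self.lines = []
        def write_target(self, target):
            self.lines.append("Target: %s" % target)
        def write_active(self, test, result):
            self.lines.append("  [%s] %s: %s" % (result["state"], test, result["message"]))
        def write_passive(self, test, result):
            self.lines.append("    [%s] %s: %s" % (result["state"], test, result["message"]))
        def end_report(self):
            return "\n".join(self.lines) + "\n"

    # Selectable with -r summary after its module has been loaded with -m.
    Reporter.reporters["summary"] = SummaryReporter()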
254 scanner.py
@@ -0,0 +1,254 @@
+from datetime import datetime
+from reporter import Reporter
+from urlparse import urlparse
+import ConfigParser
+import logging
+import requests
+import socket
+import traceback
+
+def clean_headers(response_headers):
+ headers = {}
+ for head in response_headers:
+ lst = head.strip(" \r\n").split(":")
+ headers[lst[0]] = lst[1].strip()
+ return headers
+
+def get_url(url, status = True):
+ r = requests.get(url, allow_redirects = False)
+ if status:
+ r.raise_for_status()
+ return r
+
+class PassiveTest():
+ secure_only = False
+ insecure_only = False
+
+ def analyze(self, response, results):
+ return None
+
+ def result(self, state, message, data):
+ return {'state' : state, 'message' : message, 'data' : data }
+
+
+class ActiveTest():
+
+ secure_only = False
+ insecure_only = False
+ run_passives = True
+ description = "The base class for an Active Test."
+
+ def __init__(self):
+ if hasattr(self, "setup"):
+ self.setup()
+
+ def execute(self, url):
+ try:
+ result = self.do_test(url)
+ except Exception, e:
+ tb = traceback.format_exc()
+ result = (ActiveTest().result("Error", e, tb), None)
+
+ return result
+
+ def result(self, state, message, data):
+ return { 'state' : state, 'message' : message, 'data' : data, 'passive' : {}}
+
+class Scanner():
+ logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
+ logger = logging.getLogger("Garmr-Scanner")
+ logger.setLevel(logging.DEBUG)
+
+ def __init__(self):
+ self.resolve_target = True
+ self.force_passives = False
+ self._passive_tests_ = {}
+ self._active_tests_ = {}
+ self._targets_ = []
+ self._protos_ = ["http", "https"]
+ Scanner.logger.debug("Scanner initialized.")
+ self.reporter = Reporter()
+ self.modules = []
+
+ def do_passive_scan(self, passive, is_ssl, response):
+ if passive.secure_only and not is_ssl:
+ Scanner.logger.debug("\t\t[%s] Skip Test invalid for http scheme" % passive.__class__)
+ passive_result = PassiveTest().result("Skip", "This check is only applicable to SSL requests.", None)
+ start = datetime.now()
+ passive_result['start'] = start
+ passive_result['end'] = start
+ passive_result["duration"] = 0
+ else:
+ start = datetime.now()
+ passive_result = passive.analyze(response)
+ end = datetime.now()
+ td = end - start
+ passive_result['start'] = start
+ passive_result['end'] = end
+ passive_result['duration'] = float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
+ Scanner.logger.info("\t\t[%s] %s %s" % (passive.__class__, passive_result['state'], passive_result['message']))
+ return passive_result
+
+ def do_active_scan(self, test, is_ssl, target):
+ if (test.secure_only and not is_ssl):
+ Scanner.logger.info("\t[Skip] [%s] (reason: secure_only)" % test.__class__)
+ result = ActiveTest().result("Skip", "This check is only applicable to SSL requests", None)
+ result['start'] = datetime.now()
+ result['end'] = result['start']
+ result['duration'] = 0
+ return result
+ elif (test.insecure_only and is_ssl):
+ Scanner.logger.info("\t[Skip] [%s] (reason: insecure_only)" % test.__class__)
+ result = ActiveTest().result("Skip", "This check is only applicable to SSL requests", None)
+ result['start'] = datetime.now()
+ result['end'] = result['start']
+ result['duration'] = 0
+ return result
+ start = datetime.now()
+ result, response = test.execute(target)
+ end = datetime.now()
+ td = end - start
+ result['start'] = start
+ result['end'] = end
+ result['duration'] = float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
+ Scanner.logger.info("\t[%s] %s %s" % (test.__class__, result['state'], result['message']))
+ self.reporter.write_active(test.__class__, result)
+ if (result['state'] == "Error"):
+ Scanner.logger.error(result['data'])
+ if response != None and test.run_passives:
+ result['passive'] = {}
+ self.reporter.start_passives()
+ for passive_key in self._passive_tests_.keys():
+ passive = self._passive_tests_[passive_key]["test"]
+ result["passive"][passive.__class__] = self.do_passive_scan(passive, is_ssl, response)
+ self.reporter.write_passive(passive.__class__, result["passive"][passive.__class__])
+ self.reporter.end_passives()
+ return result
+
+ def scan_target(self, target):
+ self.reporter.write_target(target)
+ Scanner.logger.info("[%s] scanning:" % target)
+ url = urlparse(target)
+ is_ssl = url.scheme == "https"
+ results = {}
+ self.reporter.start_actives()
+ for key in self._active_tests_.keys():
+ test = self._active_tests_[key]["test"]
+ results[test.__class__] = self.do_active_scan(test, is_ssl, target)
+ self.reporter.end_actives()
+ return results
+
+ def run_scan(self):
+ results = {}
+ self.reporter.start_report()
+ self.reporter.start_targets()
+ if len(self._targets_) == 0:
+ Scanner.logger.error('No targets configured.')
+ return
+ for target in self._targets_:
+ try:
+ results[target] = self.scan_target(target)
+ except:
+ Scanner.logger.error(traceback.format_exc())
+ self.reporter.end_targets()
+ file = open(self.output, "w")
+ file.write(self.reporter.end_report())
+ file.close()
+
+
+ def register_target(self, url):
+ u = urlparse(url)
+ valid = u.netloc != "" and u.scheme in self._protos_
+ reason = "%s%s" % ("[bad netloc]" if u.netloc == "" else "", "" if u.scheme in self._protos_ else "[bad scheme]")
+
+ # todo - support ipv6 urls
+ host = u.netloc.split(':')[0]
+ if (self.resolve_target):
+ try:
+ socket.getaddrinfo(host, None)
+ except socket.gaierror:
+ valid = False
+ reason = "%s[dns]" % reason
+ if valid:
+ self._targets_.append(url)
+ Scanner.logger.debug("[target]: %s" % url)
+ return
+ Scanner.logger.error("%s is not a valid target (reason: %s)" % (url, reason))
+
+ def configure_check(self, check_name, key, value):
+ if self._active_tests_.has_key(check_name):
+ check = self._active_tests_[check_name]["test"]
+ elif self._passive_tests_.has_key(check_name):
+ check = self._passive_tests_[check_name]["test"]
+ else:
+ raise Exception("The requested check is not available (%s)" % check_name)
+ if hasattr(check, "config") == False:
+ raise Exception("This check cannot be configured.")
+ if check.config.has_key(key) == False:
+ raise Exception("%s is not a valid configuration for %s", key, check_name)
+ check.config[key] = value
+ Scanner.logger.debug("\t%s.%s=%s" % (check_name, key, value))
+
+ def disable_check(self, check_name):
+ if self._active_tests_.has_key(check_name):
+ self._active_tests_[check_name]["enabled"] = False
+ elif self._passive_tests_.has_key(check_name):
+ self._passive_tests_[check_name]["enabled"] = False
+ else:
+ raise Exception("The requested check is not available (%s)" % check_name)
+ Scanner.logger.debug("\t%s disabled.", check_name)
+
+ def register_check(self, test):
+ module = test.__class__.__module__
+
+ if module not in self.modules:
+ self.modules.append(module)
+
+ key = "%s" % test.__class__
+ if isinstance(test, ActiveTest):
+ self._active_tests_[key]= { "test" : test , "enabled" : True}
+ Scanner.logger.debug("Added %s to active tests." % test.__class__)
+ return len(self._active_tests_)
+ if isinstance(test, PassiveTest):
+ self._passive_tests_[key]= { "test" : test, "enabled" : True}
+ Scanner.logger.debug("Added %s to passive tests." % test.__class__)
+ return len(self._passive_tests_)
+ raise Exception('test is not a valid test type')
+
+ def save_configuration(self, path):
+ # write out a configuration file.
+ config = ConfigParser.RawConfigParser()
+ config.add_section("Garmr")
+ config.set("Garmr", "force-passives", self.force_passives)
+ config.set("Garmr", "module", ", ".join(self.modules))
+ config.set("Garmr", "reporter", self.reporter.__class__)
+ config.set("Garmr", "output", self.output)
+ config.set("Garmr", "dns", self.resolve_target)
+
+ if len(self._targets_) > 0:
+ config.add_section("Targets")
+ for i, target in enumerate(self._targets_):
+ config.set("Targets", "%s" % i, target)
+
+ for check in self._active_tests_.keys():
+ config.add_section(check)
+ config.set(check, "enabled", self._active_tests_[check]["enabled"])
+ if hasattr(self._active_tests_[check]["test"], "config"):
+ for key in self._active_tests_[check]["test"].config.keys():
+ config.set(check, key, self._active_tests_[check]["test"].config[key])
+
+ for check in self._passive_tests_.keys():
+ config.add_section(check)
+ config.set(check, "enabled", self._passive_tests_[check]["enabled"])
+ if hasattr(self._passive_tests_[check]["test"], "config"):
+ for key in self._passive_tests_[check]["test"].config.keys():
+ config.set(check, key, self._passive_tests_[check]["test"].config[key])
+
+
+ with open(path, 'w') as configfile:
+ config.write(configfile)
+
+
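
The duration arithmetic in do_active_scan and do_passive_scan is a timedelta-to-seconds conversion written out by hand, presumably because timedelta.total_seconds() only arrived in Python 2.7. A quick check of the formula:

    from datetime import timedelta

    td = timedelta(days=0, seconds=1, microseconds=250000)
    duration = float(td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
    assert duration == 1.25   # equivalent to td.total_seconds() on Python 2.7+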
45 setup.py
@@ -1,45 +0,0 @@
-import os
-import sys
-from setuptools import setup, find_packages
-def main():
- setup(name='Garmr',
- version='0.2',
- description='A tool for testing a web application for basic security holes',
- author='David Burns',
- author_email='dburns at mozilladotcom',
- entry_points= make_entry_points(),
- url='https://github.com/AutomatedTester/Garmr',
- classifiers=['Development Status :: 3 - Alpha',
- 'Intended Audience :: Developers',
- 'License :: OSI Approved :: Mozilla Public License 1.1 (MPL 1.1)',
- 'Operating System :: POSIX',
- 'Operating System :: Microsoft :: Windows',
- 'Operating System :: MacOS :: MacOS X',
- 'Topic :: Software Development :: Testing',
- 'Topic :: Software Development :: Libraries',
- 'Programming Language :: Python'],
- packages=find_packages()
-)
-
-def cmdline_entrypoints(versioninfo, platform, basename):
- target = 'garmr:main'
- if platform.startswith('java'):
- points = {'garmr': target}
- else:
- if basename.startswith("pypy"):
- points = {'garmr-%s' % basename: target}
- else: # cpython
- points = {'garmr-%s.%s' % versioninfo[:2] : target,}
- points['garmr'] = target
- return points
-
-def make_entry_points():
- basename = os.path.basename(sys.executable)
- points = cmdline_entrypoints(sys.version_info, sys.platform, basename)
- keys = list(points.keys())
- keys.sort()
- l = ["%s = %s" % (x, points[x]) for x in keys]
- return {'console_scripts': l}
-
-if __name__ == '__main__':
- main()
149 test/test_reports.py
@@ -1,149 +0,0 @@
-import os
-from garmr import Reporter
-
-class TestReports:
-
- def setup_method(self, method):
- if os.path.exists("garmr-results.xml"):
- os.remove("garmr-results.xml")
-
- def test_report_throws_exception_with_no_data(self):
- reporter = Reporter()
- try:
- reporter.write_file()
- raise AssertionError("Exception should have been thrown")
- except AssertionError as e:
- raise
- except Exception:
- pass
-
- def test_reporter_formats_test_cases_with_no_errors_or_failures(self):
- tests_list = []
- tests_list.append({"name":"good test",
- "time_taken": 1,
- })
- reporter = Reporter(tests_list)
- result = reporter._format_results()
- assert result["testcase"] == """<testcase classname="" name="%s" time="%s" />""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"])
- assert result["time_taken"] == 1
- assert result["errors"] == 0
- assert result["failed"] == 0
- assert result["skips"] == 0
-
- def test_reporter_formats_2_test_cases_with_no_errors_or_failures(self):
- tests_list = []
- tests_list.append({"name":"good test",
- "time_taken": 1,
- })
- tests_list.append({"name":"good test 2",
- "time_taken": 1,
- })
- reporter = Reporter(tests_list)
- result = reporter._format_results()
- expected = """<testcase classname="" name="%s" time="%s" />""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"])
- expected += """<testcase classname="" name="%s" time="%s" />""" % \
- (tests_list[1]["name"], tests_list[1]["time_taken"])
- assert result["testcase"] == expected, result["testcase"]
- assert result["time_taken"] == 2
- assert result["errors"] == 0
- assert result["failed"] == 0
- assert result["tests"] == 2
- assert result["skips"] == 0
-
- def test_reporter_formats_2_test_cases_with_mix_of_errors_or_failures(self):
- tests_list = []
- tests_list.append({"name":"error test",
- "time_taken": 1,
- "errors" : True,
- "message" : "omg i errored",
- })
- tests_list.append({"name":"failure test",
- "time_taken": 1,
- "failed" : True,
- "message": "Omg I failed",
- })
- reporter = Reporter(tests_list)
- result = reporter._format_results()
- expected = """<testcase classname="" name="%s" time="%s" ><error>%s</error></testcase>""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"],tests_list[0]["message"])
- expected += """<testcase classname="" name="%s" time="%s" ><failure>%s</failure></testcase>""" % \
- (tests_list[1]["name"], tests_list[1]["time_taken"],tests_list[1]["message"])
- assert result["testcase"] == expected, result["testcase"]
- assert result["time_taken"] == 2
- assert result["errors"] == 1
- assert result["failed"] == 1
- assert result["skips"] == 0
-
- def test_reporter_formats_test_case_with_one_failure(self):
- tests_list = []
- tests_list.append({"name":"failedtest",
- "time_taken": 1,
- "failed": True,
- "message": "Omg I failed",
- })
- reporter = Reporter(tests_list)
- result = reporter._format_results()
- assert result["testcase"] == """<testcase classname="" name="%s" time="%s" ><failure>%s</failure></testcase>""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"],tests_list[0]["message"])
- assert result["time_taken"] == 1
- assert result["errors"] == 0
- assert result["failed"] == 1
- assert result["skips"] == 0
-
- def test_reporter_formats_test_case_with_error(self):
- tests_list = []
- tests_list.append({"name":"errorstest",
- "time_taken": 1,
- "errors": True,
- "message": "Omg I errored"
- })
- reporter = Reporter(tests_list)
- result = reporter._format_results()
- assert result["testcase"] == """<testcase classname="" name="%s" time="%s" ><error>%s</error></testcase>""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"],tests_list[0]["message"])
- assert result["time_taken"] == 1
- assert result["errors"] == 1
- assert result["failed"] == 0
- assert result["skips"] == 0
-
- def test_reporter_formats_test_case_with_skip(self):
- tests_list = []
- tests_list.append({"name":"skipstest",
- "time_taken": 1,
- "skips": True,
- "message": "Omg I skipped"
- })
- reporter = Reporter(tests_list)
- result = reporter._format_results()
- assert result["testcase"] == """<testcase classname="" name="%s" time="%s" ><skipped>%s</skipped></testcase>""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"],tests_list[0]["message"])
- assert result["time_taken"] == 1
- assert result["errors"] == 0
- assert result["failed"] == 0
- assert result["skips"] == 1
-
- def test_that_reporter_writes_to_disk(self):
- tests_list = []
- tests_list.append({"name":"skipstest",
- "time_taken": 1,
- })
- suite_xml="""<?xml version="1.0" encoding="utf-8"?>
- <testsuite name="Garmr" errors="{error}" failures="{failure}"
- skips="{skips}" tests="{numtests}" time="{timetaken}">
- {testresults}
- </testsuite>"""
- testcase = """<testcase classname="" name="%s" time="%s" />""" % \
- (tests_list[0]["name"], tests_list[0]["time_taken"])
-
- expected = suite_xml.format(error=0, failure=0, skips=0, numtests=len(tests_list),
- timetaken=1, testresults=testcase)
-
- reporter = Reporter(tests_list)
- reporter.write_results()
-
- f = open("garmr-results.xml", "r")
- contents = f.read()
- f.close()
- assert expected == contents