Skip to content

Commit

Permalink
Validate the parent path samples prior to fetching a new sample
Browse files Browse the repository at this point in the history
  • Loading branch information
lphuberdeau committed May 28, 2018
1 parent 41e1ed4 commit b714244
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 7 deletions.
42 changes: 35 additions & 7 deletions hammertime/rules/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,34 @@ async def on_request_successful(self, entry):
# However this makes no sense when the server tells us it does not exist, so skip this case.
entry.result.soft404 = False
else:
soft_404_response = await self.get_soft_404_sample(entry.request.url)
if soft_404_response is not None and self._match(entry.response, soft_404_response):
entry.result.soft404 = True
else:
entry.result.soft404 = False

async def get_soft_404_sample(self, url):
entry.result.soft404 = await self.is_soft_404(entry.request.url, entry.response)

async def is_soft_404(self, url, response):
    """Tell whether *response* matches a soft-404 sample relevant to *url*.

    Two passes are made:

    1. Every candidate produced by ``enumerate_candidates(url)`` is looked
       up with ``fetch_missing=False``, so only samples that are already
       known are compared. A hit here avoids requesting a new sample for
       each sub-path and extension when a catch-all sample already exists.
    2. If nothing matched, the sample for *url* itself is obtained with
       the default ``fetch_missing=True`` and compared.

    Returns ``True`` on a first-pass hit, otherwise whatever ``_match``
    returns for the second-pass sample.
    """
    # Pass 1: already-known samples only.
    for candidate_url in self.enumerate_candidates(url):
        known_sample = await self.get_soft_404_sample(candidate_url, fetch_missing=False)
        if self._match(response, known_sample):
            return True

    # Pass 2: full lookup, fetching as required.
    sample_for_url = await self.get_soft_404_sample(url)
    return self._match(response, sample_for_url)

async def get_soft_404_sample(self, url, *, fetch_missing=True):
server_address = urljoin(url, "/")
if url == server_address: # skip home page.
return None

# If we have a match, leave right away
request_url_pattern = self._extract_pattern_from_url(url)
if request_url_pattern in self.soft_404_responses[server_address]:
return self.soft_404_responses[server_address][request_url_pattern]

if not fetch_missing:
return None

if request_url_pattern not in self.performed[server_address]:
try:
# Temporarily assign a future to make sure work is not done twice
Expand Down Expand Up @@ -113,7 +130,18 @@ async def _collect_sample(self, url, url_pattern):
"raw_content_hash": self._hash(result.response),
"content_sample": self._sample(result.response)}

def enumerate_candidates(self, url):
    """Yield candidate URLs for *url* and each of its parent paths.

    Starting from the full path of *url* and walking up one path segment
    at a time (the server root itself is excluded), two URLs are yielded
    per level: the path as-is and the same path with a trailing "/".
    The query string and fragment of *url* are not carried over.

    The walk stops as soon as the remaining path consists only of
    slashes. Such a path (e.g. "//", reached from "//admin") is returned
    unchanged when split, so continuing would never terminate.
    """
    # URL paths always use "/", regardless of the host operating system.
    import posixpath

    parts = urlparse(url)

    def to_url(path):
        if path.startswith("//"):
            # urljoin() reads a leading "//" as a network-path reference
            # and would swap the host; rebuild from the parsed parts.
            return parts._replace(path=path, params="", query="", fragment="").geturl()
        return urljoin(url, path)

    path = parts.path
    while len(path) > 1 and path.strip("/"):
        candidate = to_url(path)
        yield candidate
        yield candidate + "/"
        path = posixpath.split(path)[0]

def _match(self, response, soft_404_response):
if soft_404_response is None:
return False

if soft_404_response["code"] == response.code:
if "raw_content_hash" in soft_404_response:
if self._hash(response) == soft_404_response["raw_content_hash"]:
Expand Down
8 changes: 8 additions & 0 deletions tests/rules/reject_status_code_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ def test_create_random_url_matching_url_pattern_of_request(self):
self.assertTrue(result.startswith(base_url))
self.assertIsNotNone(re.match(regex, urlparse(result).path))

def test_obtain_potentially_valid_parent_paths(self):
    """Parent paths of a URL must appear among the enumerated candidates."""
    # (expected candidate, URL handed to enumerate_candidates)
    expectations = (
        ("http://example.com/admin/", "http://example.com/admin/file.txt"),
        ("http://example.com/admin/much/", "http://example.com/admin/much/longer/path"),
        ("http://example.com/admin/much", "http://example.com/admin/much/longer/path"),
    )
    for expected_candidate, source_url in expectations:
        candidates = list(self.rule.enumerate_candidates(source_url))
        self.assertIn(expected_candidate, candidates)

def create_entry(self, url, response_code=200, response_content="response content"):
    """Return an Entry for *url* whose response is a StaticResponse.

    The StaticResponse is built from *response_code*, an empty dict and
    *response_content*, in that order.
    """
    return Entry.create(url, response=StaticResponse(response_code, {}, response_content))
Expand Down

0 comments on commit b714244

Please sign in to comment.