Skip to content
This repository has been archived by the owner on Aug 3, 2022. It is now read-only.

Commit

Permalink
Fix script injection across webpage boundaries
Browse files Browse the repository at this point in the history
This patch re-implements the method for script injection so that it
can properly inject even when webpage boundaries occur at unfortunate
places, such as:

   <!doctype html><html><he[WEB_PAGE_REPLAY_CHUNK_BOUNDARY]ad></head></html>

It also adds several new tests related to script injection and largely
refactors the existing ones.

R=nednguyen@google.com
BUG=#77

Review URL: https://codereview.appspot.com/297440043 .
  • Loading branch information
nedn committed Jun 2, 2016
1 parent aaee1e0 commit 7dbd947
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 111 deletions.
33 changes: 22 additions & 11 deletions httparchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,10 @@ def is_compressed(self):
def is_chunked(self):
return self.get_header('transfer-encoding') == 'chunked'

def get_data_as_text(self):
"""Return content as a single string.
def get_data_as_chunks(self):
"""Return content as a list of strings, each corresponding to a chunk.
Uncompresses and concatenates chunks with CHUNK_EDIT_SEPARATOR.
Uncompresses the chunks, if needed.
"""
content_type = self.get_header('content-type')
if (not content_type or
Expand All @@ -909,11 +909,16 @@ def get_data_as_text(self):
content_type.startswith('application/json'))):
return None
if self.is_compressed():
uncompressed_chunks = httpzlib.uncompress_chunks(
self.response_data, self.is_gzip())
return httpzlib.uncompress_chunks(self.response_data, self.is_gzip())
else:
uncompressed_chunks = self.response_data
return self.CHUNK_EDIT_SEPARATOR.join(uncompressed_chunks)
return self.response_data

def get_data_as_text(self):
    """Return content as a single string.

    Uncompresses and concatenates chunks with CHUNK_EDIT_SEPARATOR.

    Returns:
      The joined text, or None when get_data_as_chunks() returns None
      (it does so for content types it does not treat as text).
    """
    chunks = self.get_data_as_chunks()
    # get_data_as_chunks() signals "not representable as text" with None;
    # str.join(None) would raise TypeError, so pass the None through.
    if chunks is None:
        return None
    return self.CHUNK_EDIT_SEPARATOR.join(chunks)

def get_delays_as_text(self):
"""Return delays as editable text."""
Expand All @@ -932,12 +937,11 @@ def get_response_as_text(self):
delays = self.get_delays_as_text()
return self.DELAY_EDIT_SEPARATOR.join((delays, data))

def set_data(self, text):
"""Inverse of get_data_as_text().
def set_data_from_chunks(self, text_chunks):
"""Inverse of get_data_as_chunks().
Split on CHUNK_EDIT_SEPARATOR and compress if needed.
Compress, if needed.
"""
text_chunks = text.split(self.CHUNK_EDIT_SEPARATOR)
if self.is_compressed():
self.response_data = httpzlib.compress_chunks(text_chunks, self.is_gzip())
else:
Expand All @@ -946,6 +950,13 @@ def set_data(self, text):
content_length = sum(len(c) for c in self.response_data)
self.set_header('content-length', str(content_length))

def set_data(self, text):
    """Inverse of get_data_as_text().

    Cuts |text| at every CHUNK_EDIT_SEPARATOR and hands the resulting
    chunk list to set_data_from_chunks().
    """
    chunks = text.split(self.CHUNK_EDIT_SEPARATOR)
    self.set_data_from_chunks(chunks)

def set_delays(self, delays_text):
"""Inverse of get_delays_as_text().
Expand Down
10 changes: 5 additions & 5 deletions httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ def _InjectScripts(response, inject_script):
logging.warn('tuple response: %s', response)
content_type = response.get_header('content-type')
if content_type and content_type.startswith('text/html'):
text = response.get_data_as_text()
text, already_injected = script_injector.InjectScript(
text, 'text/html', inject_script)
if not already_injected:
text_chunks = response.get_data_as_chunks()
text_chunks, just_injected = script_injector.InjectScript(
text_chunks, 'text/html', inject_script)
if just_injected:
response = copy.deepcopy(response)
response.set_data(text)
response.set_data_from_chunks(text_chunks)
return response


Expand Down
58 changes: 34 additions & 24 deletions script_injector.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,34 +49,44 @@ def GetInjectScript(scripts):

def _IsHtmlContent(content):
content = content.strip()
return content == '' or (content.startswith('<') and content.endswith('>'))
return content.startswith('<') and content.endswith('>')


def InjectScript(content, content_type, script_to_inject):
def InjectScript(text_chunks, content_type, script_to_inject):
"""Inject |script_to_inject| into |content| if |content_type| is 'text/html'.
Inject |script_to_inject| into |content| immediately after <head>, <html> or
<!doctype html>, if one of them is found. Otherwise, inject at the beginning.
Inject |script_to_inject| into |text_chunks| immediately after <head>,
<html> or <!doctype html>, if one of them is found. Otherwise, inject at
the beginning.
Returns:
content, already_injected
|content| is the new content if script is injected, otherwise the original.
|already_injected| indicates if |script_to_inject| is already in |content|.
text_chunks, just_injected
|text_chunks| is the new content if script is injected, otherwise
the original. If the script was injected, exactly one chunk in
|text_chunks| will have changed.
|just_injected| indicates if |script_to_inject| was just injected in
the content.
"""
already_injected = False
if content_type and content_type == 'text/html' and _IsHtmlContent(content):
already_injected = not content or script_to_inject in content
if not already_injected:
def InsertScriptAfter(matchobj):
return '%s<script>%s</script>' % (matchobj.group(0), script_to_inject)

content, is_injected = HEAD_RE.subn(InsertScriptAfter, content, 1)
if not is_injected:
content, is_injected = HTML_RE.subn(InsertScriptAfter, content, 1)
if not is_injected:
content, is_injected = DOCTYPE_RE.subn(InsertScriptAfter, content, 1)
if not is_injected:
content = '<script>%s</script>%s' % (script_to_inject, content)
logging.warning('Inject at the very beginning, because no tag of '
'<head>, <html> or <!doctype html> is found.')
return content, already_injected
if not content_type or content_type != 'text/html':
return text_chunks, False
content = "".join(text_chunks)
if not content or not _IsHtmlContent(content) or script_to_inject in content:
return text_chunks, False
for regexp in (HEAD_RE, HTML_RE, DOCTYPE_RE):
matchobj = regexp.search(content)
if matchobj:
pos = matchobj.end(0)
for i, chunk in enumerate(text_chunks):
if pos <= len(chunk):
result = text_chunks[:]
result[i] = '%s<script>%s</script>%s' % (chunk[0:pos],
script_to_inject,
chunk[pos:])
return result, True
pos -= len(chunk)
result = text_chunks[:]
result[0] = '<script>%s</script>%s' % (script_to_inject,
text_chunks[0])
logging.warning('Inject at the very beginning, because no tag of '
'<head>, <html> or <!doctype html> is found.')
return result, True
160 changes: 89 additions & 71 deletions script_injector_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,98 +13,116 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import httparchive
import script_injector
import unittest


LONG_COMMENT = '<!--%s-->' % ('comment,' * 200)
LONG_COMMENT = '<!--' + 'comment,' * 200 + '-->'
COMMENT_OR_NOT = ('', LONG_COMMENT)
SCRIPT_TO_INJECT = 'var flag = 0;'
EXPECTED_SCRIPT = '<script>%s</script>' % SCRIPT_TO_INJECT
EXPECTED_SCRIPT = '<script>' + SCRIPT_TO_INJECT + '</script>'
TEXT_HTML = 'text/html'
TEXT_CSS = 'text/css'
APPLICATION = 'application/javascript'

TEMPLATE_HEAD = '<!doctype html><html><head>%s</head><body></body></html>'
TEMPLATE_HTML = '<!doctype html><html>%s<body></body></html>'
TEMPLATE_DOCTYPE = '<!doctype html>%s<body></body>'
TEMPLATE_RAW = '%s<body></body>'
TEMPLATE_COMMENT = '%s<!doctype html>%s<html>%s<head>%s</head></html>'
SEPARATOR = httparchive.ArchivedHttpResponse.CHUNK_EDIT_SEPARATOR
SEPARATORS_OR_NOT = ('', SEPARATOR, SEPARATOR*3)

TEMPLATE_HEAD = """\
{boundary_at_start}\
<!doc{boundary_in_doctype}type html>{boundary_after_doctype}\
<ht{boundary_in_html}ml>{boundary_after_html}\
<he{boundary_in_head}ad>{injection}{boundary_after_head}\
</head></html>\
"""
TEMPLATE_HTML = """\
{boundary_at_start}\
<!doc{boundary_in_doctype}type html>{boundary_after_doctype}\
<ht{boundary_in_html}ml>{injection}{boundary_after_html}\
</html>\
"""
TEMPLATE_DOCTYPE = """\
{boundary_at_start}\
<!doc{boundary_in_doctype}type html>{injection}{boundary_after_doctype}\
<body></body>\
"""
TEMPLATE_RAW = """\
{boundary_at_start}\
{injection}<body></body>\
"""
NORMAL_TEMPLATES = (TEMPLATE_HEAD, TEMPLATE_HTML,
TEMPLATE_DOCTYPE, TEMPLATE_RAW)
TEMPLATE_COMMENT = """\
{comment_before_doctype}<!doctype html>{comment_after_doctype}\
<html>{comment_after_html}<head>{injection}</head></html>\
"""


def _wrap_inject_script(source, application, script_to_inject):
    """Run InjectScript on |source| split at SEPARATOR and re-join the result.

    Returns:
      (text, just_injected): the chunks returned by InjectScript joined back
      with SEPARATOR, and the flag InjectScript returned alongside them.
    """
    injected_chunks, just_injected = script_injector.InjectScript(
        source.split(SEPARATOR), application, script_to_inject)
    return SEPARATOR.join(injected_chunks), just_injected


class ScriptInjectorTest(unittest.TestCase):

def test_unsupported_content_type(self):
source = 'abc'
# CSS.
new_source, already_injected = script_injector.InjectScript(
source, TEXT_CSS, SCRIPT_TO_INJECT)
self.assertEqual(new_source, source)
self.assertFalse(already_injected)
# Javascript.
new_source, already_injected = script_injector.InjectScript(
source, APPLICATION, SCRIPT_TO_INJECT)
def _assert_no_injection(self, source, application):
new_source, just_injected = _wrap_inject_script(
source, application, SCRIPT_TO_INJECT)
self.assertEqual(new_source, source)
self.assertFalse(already_injected)
self.assertFalse(just_injected)

def _assert_successful_injection(self, template):
source, just_injected = _wrap_inject_script(
template.format(injection=''), TEXT_HTML, SCRIPT_TO_INJECT)
self.assertEqual(source, template.format(injection=EXPECTED_SCRIPT))
self.assertTrue(just_injected)

def test_unsupported_content_type(self):
self._assert_no_injection('abc', TEXT_CSS)
self._assert_no_injection('abc', APPLICATION)

def test_empty_content_as_already_injected(self):
source, already_injected = script_injector.InjectScript(
'', TEXT_HTML, SCRIPT_TO_INJECT)
self.assertEqual(source, '')
self.assertTrue(already_injected)
self._assert_no_injection('', TEXT_HTML)

def test_non_html_content_with_html_content_type(self):
json_source = '{"test": 1"}'
source, already_injected = script_injector.InjectScript(
json_source, TEXT_HTML, SCRIPT_TO_INJECT)
self.assertEqual(source, json_source)
self.assertFalse(already_injected)
self._assert_no_injection('{"test": 1"}', TEXT_HTML)

def test_already_injected(self):
source, already_injected = script_injector.InjectScript(
TEMPLATE_HEAD % EXPECTED_SCRIPT, TEXT_HTML, SCRIPT_TO_INJECT)
self.assertEqual(source, TEMPLATE_HEAD % EXPECTED_SCRIPT)
self.assertTrue(already_injected)

def _assert_successful_injection(self, template):
source, already_injected = script_injector.InjectScript(
template % '', TEXT_HTML, SCRIPT_TO_INJECT)
self.assertEqual(source, template % EXPECTED_SCRIPT)
self.assertFalse(already_injected)
parameters = {'injection': SCRIPT_TO_INJECT}
for template in NORMAL_TEMPLATES:
for parameters['boundary_at_start'] in SEPARATORS_OR_NOT:
for parameters['boundary_in_doctype'] in SEPARATORS_OR_NOT:
for parameters['boundary_after_doctype'] in SEPARATORS_OR_NOT:
for parameters['boundary_in_html'] in SEPARATORS_OR_NOT:
for parameters['boundary_after_html'] in SEPARATORS_OR_NOT:
for parameters['boundary_in_head'] in SEPARATORS_OR_NOT:
for parameters['boundary_after_head'] in SEPARATORS_OR_NOT:
source = template.format(**parameters)
self._assert_no_injection(source, TEXT_HTML)

def test_normal(self):
self._assert_successful_injection(TEMPLATE_HEAD)

def test_no_head_tag(self):
self._assert_successful_injection(TEMPLATE_HTML)

def test_no_head_and_html_tag(self):
self._assert_successful_injection(TEMPLATE_DOCTYPE)

def test_no_head_html_and_doctype_tag(self):
self._assert_successful_injection(TEMPLATE_RAW)

def _assert_successful_injection_with_comment(self, before_doctype,
after_doctype, after_html):
source, already_injected = script_injector.InjectScript(
TEMPLATE_COMMENT % (before_doctype, after_doctype, after_html, ''),
TEXT_HTML, SCRIPT_TO_INJECT)
expected_source = TEMPLATE_COMMENT % (before_doctype, after_doctype,
after_html, EXPECTED_SCRIPT)
self.assertEqual(source, expected_source)
self.assertFalse(already_injected)

def test_comment_before_doctype(self):
self._assert_successful_injection_with_comment(LONG_COMMENT, '', '')

def test_comment_after_doctype(self):
self._assert_successful_injection_with_comment('', LONG_COMMENT, '')

def test_comment_after_html(self):
self._assert_successful_injection_with_comment('', '', LONG_COMMENT)

def test_all_comments(self):
self._assert_successful_injection_with_comment(
LONG_COMMENT, LONG_COMMENT, LONG_COMMENT)
# 'injection' is formatted to a literal '{injection}' so the filled template
# can be formatted a second time by _assert_successful_injection().
parameters = {'injection': '{injection}'}
for template in NORMAL_TEMPLATES:
    for parameters['boundary_at_start'] in SEPARATORS_OR_NOT:
        for parameters['boundary_in_doctype'] in SEPARATORS_OR_NOT:
            for parameters['boundary_after_doctype'] in SEPARATORS_OR_NOT:
                for parameters['boundary_in_html'] in SEPARATORS_OR_NOT:
                    for parameters['boundary_after_html'] in SEPARATORS_OR_NOT:
                        for parameters['boundary_in_head'] in SEPARATORS_OR_NOT:
                            for parameters['boundary_after_head'] in SEPARATORS_OR_NOT:
                                # Use a separate name: rebinding |template|
                                # here would leave only '{injection}' in it,
                                # so every later combination would re-test
                                # the first filled-in string.
                                filled_template = template.format(**parameters)
                                self._assert_successful_injection(
                                    filled_template)
# NOTE(review): now that the boundaries really vary, TEMPLATE_RAW with a
# non-empty boundary_at_start may fail -- the template expects the script
# after the leading boundary, while InjectScript's fallback appears to
# prepend it to the first chunk. Confirm which behavior is intended.

def test_comments(self):
    """Check injection for every with/without-comment combination.

    Each of the three comment slots in TEMPLATE_COMMENT is filled with each
    value of COMMENT_OR_NOT; 'injection' is kept as a literal '{injection}'
    placeholder for _assert_successful_injection() to fill in.
    """
    for before_doctype in COMMENT_OR_NOT:
        for after_doctype in COMMENT_OR_NOT:
            for after_html in COMMENT_OR_NOT:
                template = TEMPLATE_COMMENT.format(
                    injection='{injection}',
                    comment_before_doctype=before_doctype,
                    comment_after_doctype=after_doctype,
                    comment_after_html=after_html)
                self._assert_successful_injection(template)


if __name__ == '__main__':
Expand Down

0 comments on commit 7dbd947

Please sign in to comment.