Skip to content

Commit

Permalink
Merge a1200fb into 4e4cca9
Browse files Browse the repository at this point in the history
  • Loading branch information
facundobatista committed Dec 5, 2018
2 parents 4e4cca9 + a1200fb commit ba2e679
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 23 deletions.
93 changes: 81 additions & 12 deletions fades/helpers.py
Expand Up @@ -23,7 +23,7 @@
import subprocess
import tempfile

from urllib import request
from urllib import request, parse
from urllib.error import HTTPError

import pkg_resources
Expand All @@ -40,10 +40,6 @@
print(json.dumps(d))
"""

# a user-agent for hitting the network
USER_AGENT = "fades/{} (https://github.com/PyAr/fades/)".format(_version.__version__)


# the url to query PyPI for project versions
BASE_PYPI_URL = 'https://pypi.python.org/pypi/{name}/json'
BASE_PYPI_URL_WITH_VERSION = 'https://pypi.python.org/pypi/{name}/{version}/json'
Expand Down Expand Up @@ -262,15 +258,88 @@ def check_pypi_exists(dependencies):
return True


def download_remote_script(url):
"""Download the content of a remote script to a local temp file."""
temp_fh = tempfile.NamedTemporaryFile(suffix=".py", delete=False)
logger.info("Downloading remote script from %r to %r", url, temp_fh.name)
req = request.Request(url, headers={
class _ScriptDownloader:
"""Grouping of different backends downloaders."""

# a user-agent for hitting the network
USER_AGENT = "fades/{} (https://github.com/PyAr/fades/)".format(_version.__version__)
HEADERS_PLAIN = {
'Accept': 'text/plain',
'User-Agent': USER_AGENT,
})
content = request.urlopen(req).read()
}
HEADERS_JSON = {
'Accept': 'application/json',
'User-Agent': USER_AGENT,
}

# simple network locations to name map
NETLOCS = {
'linkode.org': 'linkode',
'pastebin.com': 'pastebin',
'gist.github.com': 'gist',
}

def __init__(self, url):
"""Init."""
self.url = url
self.name = self._decide()

def _decide(self):
"""Find out which method should be applied to download that URL."""
netloc = parse.urlparse(self.url).netloc
name = self.NETLOCS.get(netloc, 'raw')
return name

def get(self):
"""Get the script content from the URL using the decided downloader."""
method_name = "_download_" + self.name
method = getattr(self, method_name)
return method()

def _download_raw(self, url=None):
"""Download content from URL directly."""
if url is None:
url = self.url
req = request.Request(url, headers=self.HEADERS_PLAIN)
return request.urlopen(req).read().decode("utf8")

def _download_linkode(self):
"""Download content from Linkode pastebin."""
# build the API url
linkode_id = self.url.split("/")[-1]
if linkode_id.startswith("#"):
linkode_id = linkode_id[1:]
url = "https://linkode.org/api/1/linkodes/" + linkode_id

req = request.Request(url, headers=self.HEADERS_JSON)
resp = request.urlopen(req)
raw = resp.read()
data = json.loads(raw.decode("utf8"))
content = data['content']
return content

def _download_pastebin(self):
"""Download content from Pastebin itself."""
paste_id = self.url.split("/")[-1]
url = "https://pastebin.com/raw/" + paste_id
return self._download_raw(url)

def _download_gist(self):
"""Download content from github's pastebin."""
parts = parse.urlparse(self.url)
url = "https://gist.github.com" + parts.path + "/raw"
return self._download_raw(url)


def download_remote_script(url):
"""Download the content of a remote script to a local temp file."""
temp_fh = tempfile.NamedTemporaryFile('wt', encoding='utf8', suffix=".py", delete=False)
downloader = _ScriptDownloader(url)
logger.info(
"Downloading remote script from %r using (%r downloader) to %r",
url, downloader.name, temp_fh.name)

content = downloader.get()
temp_fh.write(content)
temp_fh.close()
return temp_fh.name
140 changes: 129 additions & 11 deletions tests/test_helpers.py
Expand Up @@ -17,6 +17,7 @@
"""Tests for functions in helpers."""

import io
import json
import os
import sys
import tempfile
Expand Down Expand Up @@ -379,29 +380,146 @@ def test_redirect_response(self):
class ScriptDownloaderTestCase(unittest.TestCase):
"""Check the script downloader."""

def test_simple_complete(self):
def setUp(self):
logassert.setup(self, 'fades.helpers')

def test_external_public_function(self):
test_url = "http://scripts.com/foobar.py"
test_content = "test content of the remote script ññ"
with patch('fades.helpers._ScriptDownloader') as mock_downloader_class:
mock_downloader = mock_downloader_class()
mock_downloader.get.return_value = test_content
mock_downloader.name = 'mock downloader'
filepath = helpers.download_remote_script(test_url)

# plan to remove the downloaded content (so test remains clean)
self.addCleanup(os.unlink, filepath)

# checks
mock_downloader_class.assert_called_with(test_url)
self.assertLoggedInfo("Downloading remote script", test_url, filepath, 'mock downloader')
with open(filepath, "rt", encoding='utf8') as fh:
self.assertEqual(fh.read(), test_content)

def test_decide_linkode(self):
url = "http://linkode.org/#02c5nESQBLEjgBRhUwJK74"
downloader = helpers._ScriptDownloader(url)
name = downloader._decide()
self.assertEqual(name, 'linkode')

def test_decide_pastebin(self):
url = "https://pastebin.com/sZGwz7SL"
downloader = helpers._ScriptDownloader(url)
name = downloader._decide()
self.assertEqual(name, 'pastebin')

def test_decide_gist(self):
url = "https://gist.github.com/facundobatista/6ff4f75760a9acc35e68bae8c1d7da1c"
downloader = helpers._ScriptDownloader(url)
name = downloader._decide()
self.assertEqual(name, 'gist')

def test_downloader_raw(self):
test_url = "http://scripts.com/foobar.py"
raw_service_response = b"test content of the remote script"
downloader = helpers._ScriptDownloader(test_url)
with patch('urllib.request.urlopen') as mock_urlopen:
with patch('http.client.HTTPResponse') as mock_http_response:
mock_http_response.read.return_value = b"test content of the remote script"
mock_http_response.read.return_value = raw_service_response
mock_urlopen.return_value = mock_http_response

filepath = helpers.download_remote_script("http://scripts.com/foobar.py")
content = downloader.get()

# plan to remove the downloaded content (so test remains clean)
self.addCleanup(os.unlink, filepath)
# check urlopen was called with the proper url, and passing correct headers
headers = {
'Accept': 'text/plain',
'User-agent': helpers._ScriptDownloader.USER_AGENT,
}
(call,) = mock_urlopen.mock_calls
(called_request,) = call[1]
self.assertIsInstance(called_request, Request)
self.assertEqual(called_request.full_url, test_url)
self.assertEqual(called_request.headers, headers)
self.assertEqual(content, raw_service_response.decode("utf8"))

def test_downloader_linkode(self):
test_url = "http://linkode.org/#02c5nESQBLEjgBRhUwJK74"
test_content = "test content of the remote script áéíóú"
raw_service_response = json.dumps({
'content': test_content,
'morestuff': 'whocares',
}).encode("utf8")

downloader = helpers._ScriptDownloader(test_url)
with patch('urllib.request.urlopen') as mock_urlopen:
with patch('http.client.HTTPResponse') as mock_http_response:
mock_http_response.read.return_value = raw_service_response
mock_urlopen.return_value = mock_http_response

content = downloader.get()

# check urlopen was called with the proper url, and passing correct headers
headers = {
'Accept': 'application/json',
'User-agent': helpers._ScriptDownloader.USER_AGENT,
}
(call,) = mock_urlopen.mock_calls
(called_request,) = call[1]
self.assertIsInstance(called_request, Request)
self.assertEqual(
called_request.full_url, "https://linkode.org/api/1/linkodes/02c5nESQBLEjgBRhUwJK74")
self.assertEqual(called_request.headers, headers)
self.assertEqual(content, test_content)

def test_downloader_pastebin(self):
test_url = "http://pastebin.com/sZGwz7SL"
test_content = "test content of the remote script áéíóú"
raw_service_response = test_content.encode("utf8")

downloader = helpers._ScriptDownloader(test_url)
with patch('urllib.request.urlopen') as mock_urlopen:
with patch('http.client.HTTPResponse') as mock_http_response:
mock_http_response.read.return_value = raw_service_response
mock_urlopen.return_value = mock_http_response

content = downloader.get()

# check urlopen was called with the proper url, and passing correct headers
headers = {
'Accept': 'text/plain',
'User-agent': helpers.USER_AGENT,
'User-agent': helpers._ScriptDownloader.USER_AGENT,
}
(call,) = mock_urlopen.mock_calls
(called_request,) = call[1]
self.assertIsInstance(called_request, Request)
self.assertEqual(called_request.full_url, "http://scripts.com/foobar.py")
self.assertEqual(
called_request.full_url, "https://pastebin.com/raw/sZGwz7SL")
self.assertEqual(called_request.headers, headers)
self.assertEqual(content, test_content)

def test_downloader_gist(self):
test_url = "http://gist.github.com/facundobatista/6ff4f75760a9acc35e68bae8c1d7da1c"
test_content = "test content of the remote script áéíóú"
raw_service_response = test_content.encode("utf8")

# check content
with open(filepath, "rb") as fh:
temp_content = fh.read()
self.assertEqual(temp_content, b"test content of the remote script")
downloader = helpers._ScriptDownloader(test_url)
with patch('urllib.request.urlopen') as mock_urlopen:
with patch('http.client.HTTPResponse') as mock_http_response:
mock_http_response.read.return_value = raw_service_response
mock_urlopen.return_value = mock_http_response

content = downloader.get()

# check urlopen was called with the proper url, and passing correct headers
headers = {
'Accept': 'text/plain',
'User-agent': helpers._ScriptDownloader.USER_AGENT,
}
(call,) = mock_urlopen.mock_calls
(called_request,) = call[1]
self.assertIsInstance(called_request, Request)
self.assertEqual(
called_request.full_url,
"https://gist.github.com/facundobatista/6ff4f75760a9acc35e68bae8c1d7da1c/raw")
self.assertEqual(called_request.headers, headers)
self.assertEqual(content, test_content)

0 comments on commit ba2e679

Please sign in to comment.