Permalink
Browse files

initial commit, initial version

  • Loading branch information...
0 parents commit d0ef02212d5ca240732a5627f742fa80bff7672e @elemoine committed Apr 11, 2011
Showing with 472 additions and 0 deletions.
  1. +4 −0 CHANGES.rst
  2. +2 −0 MANIFEST.in
  3. +38 −0 README.rst
  4. +49 −0 development.ini
  5. +33 −0 papyrus_ogcproxy/__init__.py
  6. +150 −0 papyrus_ogcproxy/tests/tests.py
  7. +70 −0 papyrus_ogcproxy/views.py
  8. +63 −0 production.ini
  9. +27 −0 setup.cfg
  10. +36 −0 setup.py
@@ -0,0 +1,4 @@
+0.1
+---
+
+* First version
@@ -0,0 +1,2 @@
+include *.txt *.ini *.cfg *.rst
+recursive-include papyrus_ogcproxy *.ico *.png *.css *.gif *.jpg *.pt *.txt *.mak *.mako *.js *.html *.xml
@@ -0,0 +1,38 @@
+papyrus_ogcproxy
+=================
+
+papyrus_ogcproxy provides an easy and convenient method for embeding
+an OGC proxy in Pyramid applications.
+
+The source code of papyrus_ogcproxy is straightforward, so if
+papyrus_ogcproxy doesn't do what you want, open its source code, get
+inspiration, and write your own code.
+
+Install
+-------
+
+papyrus_ogcproxy can be installed with ``easy_install``::
+
+ $ easy_install papyrus_ogcproxy
+
+Often you'll want to make papyrus_ogcproxy a dependency of your Pyramid
+application, which is done by adding ``papyrus_ogcproxy`` to the
+``install_requires`` list defined in the Pyramid application's ``setup.py``
+file.
+
+Set up
+------
+
+Using papyrus_ogcproxy to set an OGC proxy in a Pyramid application is easy.
+
+Edit the application's main file, ``__init__.py``, and register
+papyrus_ogcproxy using the ``Configurator.include`` method::
+
+ def main(global_config, **settings):
+
+ config = Configurator(settings=settings)
+
+ import papyrus_ogcproxy
+ config.include(papyrus_ogcproxy)
+
+That's it! The OGC proxy is available at ``/ogcproxy``.
@@ -0,0 +1,49 @@
+[app:papyrus_ogcproxy]
+use = egg:papyrus_ogcproxy
+reload_templates = true
+debug_authorization = false
+debug_notfound = false
+debug_routematch = false
+debug_templates = true
+default_locale_name = en
+
+[pipeline:main]
+pipeline =
+ egg:WebError#evalerror
+ papyrus_ogcproxy
+
+[server:main]
+use = egg:Paste#http
+host = 0.0.0.0
+port = 6543
+
+# Begin logging configuration
+
+[loggers]
+keys = root, papyrus_ogcproxy
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = INFO
+handlers = console
+
+[logger_papyrus_ogcproxy]
+level = DEBUG
+handlers =
+qualname = papyrus_ogcproxy
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s
+
+# End logging configuration
@@ -0,0 +1,33 @@
+from pyramid.config import Configurator
+
+def add_route(config):
+ """ Add a route to the ogcproxy view callable. The proxy
+ service is made available at ``/ogcproxy``.
+
+ Arguments:
+
+ * ``config``: the ``pyramid.config.Configurator`` object.
+ """
+ return config.add_route('ogcproxy', '/ogcproxy',
+ view='papyrus_ogcproxy.views:ogcproxy'
+ )
+
+def includeme(config):
+ """ The callable making it possible to include papyrus_ogcproxy
+ in a Pyramid application.
+
+ Calling ``config.include(papyrus_ogcproxy)`` will result in this
+ callable being called.
+
+ Arguments:
+
+ * ``config``: the ``pyramid.config.Configurator`` object.
+ """
+ add_route(config)
+
+def main(global_config, **settings):
+ """ Return the Pyramid application.
+ """
+ config = Configurator(settings=settings)
+ config.include(includeme)
+ return config.make_wsgi_app()
@@ -0,0 +1,150 @@
+import os
+import unittest
+import mock
+
+from pyramid import testing
+
+class AddRouteTests(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ def tearDown(self):
+ testing.tearDown()
+
+ def test(self):
+ from papyrus_ogcproxy import add_route
+ route = add_route(self.config)
+ from pyramid.urldispatch import Route
+ self.assertTrue(isinstance(route, Route))
+ self.assertEqual(route.name, 'ogcproxy')
+ self.assertEqual(route.pattern, '/ogcproxy')
+
+class IncludeMeTests(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ def tearDown(self):
+ testing.tearDown()
+
+ def test(self):
+ from pyramid.interfaces import IRoutesMapper
+ from papyrus_ogcproxy import includeme
+ views = []
+ def dummy_add_view(**kw):
+ views.append(kw)
+ self.config.add_view = dummy_add_view
+ includeme(self.config)
+ self.assertEqual(len(views), 1)
+ mapper = self.config.registry.getUtility(IRoutesMapper)
+ routes = mapper.get_routes()
+ self.assertEqual(len(routes), 1)
+ self.assertEqual(routes[0].name, 'ogcproxy')
+ self.assertEqual(routes[0].path, '/ogcproxy')
+
+class MainTests(unittest.TestCase):
+ def test(self):
+ from papyrus_ogcproxy import main
+ app = main({}, a='a')
+ from pyramid.router import Router
+ self.assertTrue(isinstance(app, Router))
+
+#class TileCacheTests(unittest.TestCase):
+class OgcProxy(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ def tearDown(self):
+ testing.tearDown()
+ from papyrus_ogcproxy import views
+ views.allowed_hosts = ()
+
+ def test_forbidden_scheme(self):
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='ftp')
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPForbidden
+ self.assertTrue(isinstance(response, HTTPForbidden))
+
+ def test_badrequest_url(self):
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='http')
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPBadRequest
+ self.assertTrue(isinstance(response, HTTPBadRequest))
+
+ def test_badrequest_netloc(self):
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='http',
+ params={'url': 'http://'})
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPBadRequest
+ self.assertTrue(isinstance(response, HTTPBadRequest))
+
+ def test_badgateway_url(self):
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='http',
+ params={'url': 'http://__foo__.__toto__'})
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPBadGateway
+ self.assertTrue(isinstance(response, HTTPBadGateway))
+
+ def test_forbidden_content_type(self):
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='http',
+ params={'url': 'http://www.google.com'})
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPForbidden
+ self.assertTrue(isinstance(response, HTTPForbidden))
+
+ def test_forbidden_content_type_with_post(self):
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='http',
+ params={'url': 'http://www.google.com'},
+ post='foo')
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPForbidden
+ self.assertTrue(isinstance(response, HTTPForbidden))
+
+ @mock.patch('papyrus_ogcproxy.views.Http')
+ def test_notacceptable_no_content_type(self, MockClass):
+ instance = MockClass.return_value
+ instance.request.return_value = ({}, 'content')
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ request = DummyRequest(scheme='http',
+ params={'url': 'http://www.google.com'})
+ response = ogcproxy(request)
+ from pyramid.httpexceptions import HTTPNotAcceptable
+ self.assertTrue(isinstance(response, HTTPNotAcceptable))
+
+ def test_allowed_host(self):
+ from papyrus_ogcproxy import views
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ views.allowed_hosts = ('www.google.com')
+ request = DummyRequest(scheme='http',
+ params={'url': 'http://www.google.com'})
+ response = ogcproxy(request)
+ from pyramid.response import Response
+ self.assertTrue(isinstance(response, Response))
+ self.assertEqual(response.status_int, 200)
+ self.assertEqual(response.content_type, 'text/html')
+
+ def test_allowed_content_type(self):
+ from papyrus_ogcproxy import views
+ from papyrus_ogcproxy.views import ogcproxy
+ from pyramid.testing import DummyRequest
+ url = 'http://wms.jpl.nasa.gov/wms.cgi?' \
+ 'SERVICE=WMS&REQUEST=GetCapabilities'
+ request = DummyRequest(scheme='http', params={'url': url})
+ response = ogcproxy(request)
+ from pyramid.response import Response
+ self.assertTrue(isinstance(response, Response))
+ self.assertEqual(response.status_int, 200)
+ self.assertEqual(response.content_type, 'application/vnd.ogc.wms_xml')
@@ -0,0 +1,70 @@
+from urlparse import urlparse
+from httplib2 import Http
+
+from pyramid.httpexceptions import (HTTPForbidden, HTTPBadRequest,
+ HTTPBadGateway, HTTPNotAcceptable)
+from pyramid.response import Response
+
+
+allowed_content_types = (
+ "application/xml", "text/xml",
+ "application/vnd.ogc.se_xml", # OGC Service Exception
+ "application/vnd.ogc.se+xml", # OGC Service Exception
+ "application/vnd.ogc.success+xml", # OGC Success (SLD Put)
+ "application/vnd.ogc.wms_xml", # WMS Capabilities
+ "application/vnd.ogc.context+xml", # WMC
+ "application/vnd.ogc.gml", # GML
+ "application/vnd.ogc.sld+xml", # SLD
+ "application/vnd.google-earth.kml+xml", # KML
+ )
+
+allowed_hosts = (
+ # list allowed hosts here (no port limiting)
+ )
+
+def ogcproxy(request):
+ scheme = request.scheme
+ if scheme not in ("http", "https"):
+ return HTTPForbidden()
+
+ if "url" not in request.params:
+ return HTTPBadRequest()
+
+ url = request.params["url"]
+
+ # check for full url
+ parsed_url = urlparse(url)
+ if not parsed_url.netloc or parsed_url.scheme not in ("http", "https"):
+ return HTTPBadRequest()
+
+ # get method
+ method = request.method
+
+ # get body
+ body = None
+ if method in ("POST", "PUT"):
+ body = request.body
+
+ # forward request to target (without Host Header)
+ http = Http()
+ h = dict(request.headers)
+ h.pop("Host", h)
+ try:
+ resp, content = http.request(url, method=method, body=body, headers=h)
+ except:
+ return HTTPBadGateway()
+
+ # check for allowed content types
+ if resp.has_key("content-type"):
+ ct = resp["content-type"]
+ if not ct.split(";")[0] in allowed_content_types:
+ # allow any content type from allowed hosts (any port)
+ if not parsed_url.netloc in allowed_hosts:
+ return HTTPForbidden()
+ else:
+ return HTTPNotAcceptable()
+
+ response = Response(content, status=resp.status,
+ headers={"Content-Type": ct})
+
+ return response
Oops, something went wrong.

0 comments on commit d0ef022

Please sign in to comment.