Skip to content

Commit

Permalink
Merge pull request #1768 from carlfeberhard/api.batch
Browse files Browse the repository at this point in the history
Api batch endpoint
  • Loading branch information
dannon committed Aug 23, 2016
2 parents b16a940 + e781aaa commit 9de06e1
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 2 deletions.
6 changes: 5 additions & 1 deletion lib/galaxy/web/framework/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __call__( self, environ, start_response ):
if self.trace_logger:
self.trace_logger.context_remove( "request_id" )

def handle_request( self, environ, start_response ):
def handle_request( self, environ, start_response, body_renderer=None ):
# Grab the request_id (should have been set by middleware)
request_id = environ.get( 'request_id', 'unknown' )
# Map url using routes
Expand Down Expand Up @@ -196,6 +196,10 @@ def handle_request( self, environ, start_response ):
body = self.handle_controller_exception( e, trans, **kwargs )
if not body:
raise
body_renderer = body_renderer or self._render_body
return body_renderer( trans, body, environ, start_response )

def _render_body( self, trans, body, environ, start_response ):
# Now figure out what we got back and try to get it to the browser in
# a smart way
if callable( body ):
Expand Down
184 changes: 184 additions & 0 deletions lib/galaxy/web/framework/middleware/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""
Batch API middleware
Adds a single route to the installation that:
1. accepts a POST call containing a JSON array of 'http-like' JSON
dictionaries.
2. Each dictionary describes a single API call within the batch and is routed
back by the middleware to the application's `handle_request` as if it was
a seperate request.
3. Each response generated is combined into a final JSON list that is
returned from the POST call.
In this way, API calls can be kept properly atomic and the endpoint can compose
them into complex tasks using only one request.
..note: This batch system is primarily designed for use by the UI as these
types of batch operations *reduce the number of requests* for a given group of
API tasks. IOW, this ain't about batching jobs.
..warning: this endpoint is experimental is likely to change.
"""
import io
from urlparse import urlparse
import json
import re

from paste import httpexceptions
import routes

import logging
log = logging.getLogger( __name__ )


class BatchMiddleware( object ):
"""
Adds a URL endpoint for processing batch API calls formatted as a JSON
array of JSON dictionaries. These dictionaries are in the form:
[
{
"url": "/api/histories",
"type": "POST",
"body": "{ \"name\": \"New History Name\" }"
},
...
]
where:
* `url` is the url for the API call to be made including any query string
* `type` is the HTTP method used (e.g. 'POST', 'PUT') - defaults to 'GET'
* `body` is the text body of the request (optional)
* `contentType` content-type request header (defaults to application/json)
"""
DEFAULT_CONFIG = {
'route' : '/api/batch',
'allowed_routes' : [
'^api\/users.*',
'^api\/histories.*',
'^api\/jobs.*',
]
}

def __init__( self, galaxy, application, config=None ):
#: the original galaxy webapp
self.galaxy = galaxy
#: the wrapped webapp
self.application = application
self.config = self.DEFAULT_CONFIG.copy()
self.config.update( config )
self.base_url = routes.url_for( '/' )
self.handle_request = self.galaxy.handle_request

def __call__( self, environ, start_response ):
if environ[ 'PATH_INFO' ] == self.config[ 'route' ]:
return self.process_batch_requests( environ, start_response )
return self.application( environ, start_response )

def process_batch_requests( self, batch_environ, start_response ):
"""
Loops through any provided JSON formatted 'requests', aggregates their
JSON responses, and wraps them in the batch call response.
"""
payload = self._read_post_payload( batch_environ )
requests = payload.get( 'batch', [] )

responses = []
for request in requests:
if not self._is_allowed_route( request[ 'url' ] ):
responses.append( self._disallowed_route_response( request[ 'url' ] ) )
continue

request_environ = self._build_request_environ( batch_environ, request )
response = self._proccess_batch_request( request, request_environ, start_response )
responses.append( response )

batch_response_body = json.dumps( responses )
start_response( '200 OK', [
( 'Content-Length', len( batch_response_body ) ),
( 'Content-Type', 'application/json' ),
])
return batch_response_body

def _read_post_payload( self, environ ):
request_body_size = int( environ.get( 'CONTENT_LENGTH', 0 ) )
request_body = environ[ 'wsgi.input' ].read( request_body_size ) or '{}'
# TODO: json decode error handling
# log.debug( 'request_body: (%s)\n%s', type( request_body ), request_body )
payload = json.loads( request_body )
return payload

def _is_allowed_route( self, route ):
if self.config.get( 'allowed_routes', None ):
shortened_route = route.replace( self.base_url, '', 1 )
matches = [ re.match( allowed, shortened_route ) for allowed in self.config[ 'allowed_routes' ] ]
return any( matches )
return True

def _disallowed_route_response( self, route ):
return dict( status=403, headers=self._default_headers(), body={
'err_msg' : 'Disallowed route used for batch operation',
'route' : route,
'allowed' : self.config[ 'allowed_routes' ]
})

def _build_request_environ( self, original_environ, request ):
"""
Given a request and the original environ used to call the batch, return
a new environ parsable/suitable for the individual api call.
"""
# TODO: use a dict of defaults/config
# copy the original environ and reconstruct a fake version for each batched request
request_environ = original_environ.copy()
# TODO: for now, do not overwrite the other headers used in the main api/batch request
request_environ[ 'CONTENT_TYPE' ] = request.get( 'contentType', 'application/json' )
request_environ[ 'REQUEST_METHOD' ] = request.get( 'method', request.get( 'type', 'GET' ) )
url = '{0}://{1}{2}'.format( request_environ.get( 'wsgi.url_scheme' ),
request_environ.get( 'HTTP_HOST' ), request[ 'url' ] )
parsed = urlparse( url )
request_environ[ 'PATH_INFO' ] = parsed.path
request_environ[ 'QUERY_STRING' ] = parsed.query

request_body = request.get( 'body', u'' )
# set this to None so webob/request will copy the body using the raw bytes
# if we set it, webob will try to use the buffer interface on a unicode string
request_environ[ 'CONTENT_LENGTH' ] = None
# this may well need to change in py3
request_body = io.BytesIO( bytearray( request_body, encoding='utf8' ) )
request_environ[ 'wsgi.input' ] = request_body
# log.debug( 'request_environ:\n%s', pprint.pformat( request_environ ) )

return request_environ

def _proccess_batch_request( self, request, environ, start_response ):
# We may need to include middleware to record various reponses, but this way of doing that won't work:
# status, headers, body = self.application( environ, start_response, body_renderer=self.body_renderer )

# We have to re-create the handle request method here in order to bypass reusing the 'api/batch' request
# because reuse will cause the paste error:
# File "./eggs/Paste-1.7.5.1-py2.7.egg/paste/httpserver.py", line 166, in wsgi_start_response
# assert 0, "Attempt to set headers a second time w/o an exc_info"
try:
response = self.galaxy.handle_request( environ, start_response, body_renderer=self.body_renderer )
# handle errors from galaxy.handle_request (only 404s)
except httpexceptions.HTTPNotFound:
response = dict( status=404, headers=self._default_headers(), body={} )
return response

def body_renderer( self, trans, body, environ, start_response ):
# this is a dummy renderer that does not call start_response
# See 'We have to re-create the handle request method...' in _process_batch_request above
return dict(
status=trans.response.status,
headers=trans.response.headers,
body=json.loads( self.galaxy.make_body_iterable( trans, body )[0] )
)

def _default_headers( self ):
return {
'x-frame-options': 'SAMEORIGIN',
'content-type' : 'application/json',
'cache-control' : 'max-age=0,no-cache,no-store'
}

def handle_exception( self, environ ):
return False
2 changes: 1 addition & 1 deletion lib/galaxy/webapps/galaxy/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def __get_job( self, trans, id ):
@expose_api
def create( self, trans, payload, **kwd ):
""" See the create method in tools.py in order to submit a job. """
raise NotImplementedError( 'Please POST to /api/tools instead.' )
raise exceptions.NotImplemented( 'Please POST to /api/tools instead.' )

@expose_api
def search( self, trans, payload, **kwd ):
Expand Down
8 changes: 8 additions & 0 deletions lib/galaxy/webapps/galaxy/buildapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,8 @@ def wrap_in_middleware( app, global_conf, **local_conf ):
Based on the configuration wrap `app` in a set of common and useful
middleware.
"""
webapp = app

# Merge the global and local configurations
conf = global_conf.copy()
conf.update(local_conf)
Expand Down Expand Up @@ -794,6 +796,12 @@ def wrap_in_middleware( app, global_conf, **local_conf ):
from galaxy.web.framework.middleware.request_id import RequestIDMiddleware
app = RequestIDMiddleware( app )
log.debug( "Enabling 'Request ID' middleware" )

# api batch call processing middleware
from galaxy.web.framework.middleware.batch import BatchMiddleware
app = BatchMiddleware( webapp, app, {})
log.debug( "Enabling 'Batch' middleware" )

return app


Expand Down
87 changes: 87 additions & 0 deletions test/api/test_api_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import json
import pprint
import logging
from requests import post
from base import api

log = logging.getLogger( "functional_tests.py" )


class ApiBatchTestCase( api.ApiTestCase ):

def _get_api_key( self, admin=False ):
return self.galaxy_interactor.api_key if not admin else self.galaxy_interactor.master_api_key

def _with_key( self, url, admin=False ):
sep = '&' if '?' in url else '?'
return url + sep + 'key=' + self._get_api_key( admin=admin )

def _post_batch( self, batch ):
data = json.dumps({ "batch" : batch })
return post( "%s/batch" % ( self.galaxy_interactor.api_url ), data=data )

def _log_reponse( self, response ):
log.debug( 'RESPONSE %s\n%s', ( '-' * 40 ), pprint.pformat( response ) )

# ---- tests
def test_simple_array( self ):
batch = [
dict( url=self._with_key( '/api/histories' ) ),
dict( url=self._with_key( '/api/histories' ),
method='POST', body=json.dumps( dict( name='Wat' ) ) ),
dict( url=self._with_key( '/api/histories' ) ),
]
response = self._post_batch( batch )
response = response.json()
# self._log_reponse( response )
self.assertIsInstance( response, list )
self.assertEquals( len( response ), 3 )

def test_unallowed_route( self ):
batch = [
dict( url=self._with_key( '/api/workflow' ) )
]
response = self._post_batch( batch )
response = response.json()
self.assertIsInstance( response, list )
self.assertEquals( response[0][ 'status' ], 403 )

def test_404_route( self ):
# needs to be within the allowed routes
batch = [
dict( url=self._with_key( '/api/histories_bler' ) )
]
response = self._post_batch( batch )
response = response.json()
self.assertIsInstance( response, list )
self.assertEquals( response[0][ 'status' ], 404 )

def test_errors( self ):
batch = [
dict( url=self._with_key( '/api/histories/abc123' ) ),
dict( url=self._with_key( '/api/jobs' ), method='POST', body=json.dumps( dict( name='Wat' ) ) ),
]
response = self._post_batch( batch )
response = response.json()
# self._log_reponse( response )
self.assertIsInstance( response, list )
self.assertEquals( response[0][ 'status' ], 400 )
self.assertEquals( response[1][ 'status' ], 501 )

def test_querystring_params( self ):
post_data = dict( name='test' )
create_response = self._post( 'histories', data=post_data ).json()

history_url = '/api/histories/' + create_response[ 'id' ]
history_url_with_keys = history_url + '?v=dev&keys=size,non_ready_jobs'
contents_url_with_filters = history_url + '/contents?v=dev&q=deleted&qv=True'
batch = [
dict( url=self._with_key( history_url_with_keys ) ),
dict( url=self._with_key( contents_url_with_filters ) ),
]
response = self._post_batch( batch )
response = response.json()
self._log_reponse( response )
self.assertEquals( len( response ), 2 )
self.assertEquals( len( response[0][ 'body' ].keys() ), 2 )
self.assertEquals( response[1][ 'body' ], [] )

0 comments on commit 9de06e1

Please sign in to comment.