Skip to content

Commit

Permalink
Merge branch 'threaded-xslt'
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel trukhanov committed Oct 25, 2010
2 parents 0137ba0 + 8a04581 commit 5493ca2
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 26 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
*.pyc
*~
frontik.iml
src/frontik_test.log
debian/files
Expand All @@ -8,4 +9,4 @@ debian/*.substvars
debian/frontik
build
debian/pycompat
debian/python-module-stampdir/
debian/python-module-stampdir/
37 changes: 24 additions & 13 deletions src/frontik/handler.py 100755 → 100644
Expand Up @@ -18,6 +18,7 @@
import tornado.httpclient
import tornado.options
import tornado.web
import tornado.ioloop

from frontik import etree
import frontik.async
Expand All @@ -28,6 +29,7 @@
import frontik.handler_xml
import frontik.handler_whc_limit
import frontik.handler_debug
import frontik.jobs

import frontik.log as logging
log = logging.getLogger('frontik.handler')
Expand Down Expand Up @@ -153,6 +155,11 @@ def __init__(self, app_package):
self.http_client = frontik.http.TimeoutingHttpFetcher(
tornado.httpclient.AsyncHTTPClient(max_clients=200, max_simultaneous_connections=200))

if tornado.options.options.executor_pool:
self.executor = frontik.jobs.PoolExecutor(pool_size=tornado.options.options.executor_pool_size)
else:
self.executor = frontik.jobs.SimpleSpawnExecutor()


class PageHandler(tornado.web.RequestHandler):

Expand All @@ -168,14 +175,15 @@ def __init__(self, ph_globals, application, request):
self.ph_globals = ph_globals
self.config = self.ph_globals.config
self.http_client = self.ph_globals.http_client
self.executor = self.ph_globals.executor

self.debug = frontik.handler_debug.PageHandlerDebug(self)

self.whc_limit = frontik.handler_whc_limit.PageHandlerWHCLimit(self)

self.xml = frontik.handler_xml.PageHandlerXML(self)
self.doc = self.xml.doc # backwards compatibility for self.doc.put
self.doc = self.xml.doc # backwards compatibility for self.doc.put

self.text = None

self.finish_group = frontik.async.AsyncGroup(self.async_callback(self._finish_page),
Expand Down Expand Up @@ -415,21 +423,24 @@ def _finish_page(self):

if self.text is not None:
res = self._prepare_finish_plaintext()
self._apply_postprocessor(res)
else:
res = self.xml._finish_xml()

if hasattr(self.config, 'postprocessor'):
if self.apply_postprocessor:
self.log.debug('applying postprocessor')
self.async_callback(self.config.postprocessor)(self, res, self.async_callback(partial(self._wait_postprocessor, time.time())))
else:
self.log.debug('skipping postprocessor')
self.finish(res)
self.xml._finish_xml(self.async_callback(self._apply_postprocessor))

else:
self.log.warn('trying to finish already finished page, probably bug in a workflow, ignoring')

def _apply_postprocessor(self, res):
if hasattr(self.config, 'postprocessor'):
if self.apply_postprocessor:
self.log.debug('applying postprocessor')
self.async_callback(self.config.postprocessor)(self, res, self.async_callback(partial(self._wait_postprocessor, time.time())))
else:
self.log.debug('skipping postprocessor')
self.finish(res)

else:
self.log.warn('trying to finish already finished page, probably bug in a workflow, ignoring')
self.finish(res)


def _wait_postprocessor(self, start_time, data):
self.log.stage_tag("postprocess")
Expand Down
32 changes: 20 additions & 12 deletions src/frontik/handler_xml.py 100755 → 100644
@@ -1,12 +1,15 @@
# -*- coding: utf-8 -*-

import functools
import os.path
import weakref
import threading
import time
import urllib
import weakref

import tornado.autoreload
import tornado.options
import tornado.ioloop

from frontik import etree
import frontik.util
Expand Down Expand Up @@ -136,6 +139,7 @@ def __init__(self, handler):

self.doc = frontik.doc.Doc(root_node=etree.Element('doc', frontik='true'))
self.transform = None
self.transform_result = None
if not self.handler.config.apply_xsl:
self.log.debug('ignoring set_xsl() because config.apply_xsl=%s', self.handler.config.apply_xsl)
self.apply_xsl = False
Expand Down Expand Up @@ -168,33 +172,37 @@ def set_xsl(self, filename):
except:
self._set_xsl_log_and_raise('XSL transformation error with file {0}')

def _finish_xml(self):
def _finish_xml(self, cb):
if self.apply_xsl and self.transform:
return self._prepare_finish_with_xsl()
self._prepare_finish_with_xsl(cb)
else:
return self._prepare_finish_wo_xsl()
self._prepare_finish_wo_xsl(cb)

def _prepare_finish_with_xsl(self):
def _prepare_finish_with_xsl(self, cb):
self.log.debug('finishing with xsl')

if not self.handler._headers.get("Content-Type", None):
self.handler.set_header('Content-Type', 'text/html')

try:
@self.handler.async_callback
def reraise_in_ioloop(e):
self.log.exception('failed transformation with XSL %s' % self.transform_filename)
raise e

def apply_xsl():
t = time.time()
result = str(self.transform(self.doc.to_etree_element()))
self.log.stage_tag("xsl")
self.log.debug('applied XSL %s in %.2fms', self.transform_filename, (time.time() - t)*1000)
return result
except:
self.log.exception('failed transformation with XSL %s' % self.transform_filename)
raise
return result

self.handler.executor.add_job(apply_xsl, cb, reraise_in_ioloop)

def _prepare_finish_wo_xsl(self):
def _prepare_finish_wo_xsl(self, cb):
self.log.debug('finishing wo xsl')

# В режиме noxsl мы всегда отдаем xml.
self.handler.set_header('Content-Type', 'application/xml')

return self.doc.to_string()
cb(self.doc.to_string())

46 changes: 46 additions & 0 deletions src/frontik/jobs.py
@@ -0,0 +1,46 @@
import time
import functools
import threading
import Queue
import tornado.ioloop

import logging
log = logging.getLogger('frontik.jobs')

def work(func, cb, exception_cb):
try:
result = func()
tornado.ioloop.IOLoop.instance().add_callback(functools.partial(cb, result))
except Exception, e:
tornado.ioloop.IOLoop.instance().add_callback(functools.partial(exception_cb, e))

def queue_worker(queue):
while True:
(func, cb, exception_cb) = queue.get()
work(func, cb, exception_cb)


class PoolExecutor(object):
def __init__(self, pool_size=5):
self.log = log
self.events = Queue.Queue()

self.log.debug('pool size: '+str(pool_size))
self.workers = [threading.Thread(target=functools.partial(queue_worker, self.events))
for i in range(pool_size)]
[i.setDaemon(True) for i in self.workers]
[i.start() for i in self.workers]
self.log.debug('active threads count = ' + str(threading.active_count()))


def add_job(self, func, cb, exception_cb):
self.events.put((func, cb, exception_cb))

class SimpleSpawnExecutor(object):
def __init__(self):
self.log = log

def add_job(self, func, cb, error_cb):
threading.Thread(target=functools.partial(work, func, cb, error_cb)).start()
self.log.debug('active threads count (+1) = ' + str(threading.active_count()))

3 changes: 3 additions & 0 deletions src/frontik_srv.py
Expand Up @@ -28,6 +28,9 @@
tornado.options.define('debug_login', None, str)
tornado.options.define('debug_password', None, str)

tornado.options.define('executor_pool', False, bool)
tornado.options.define('executor_pool_size', 7, int)

tornado_util.server.bootstrap(config)

for log_channel_name in options.suppressed_loggers:
Expand Down
8 changes: 8 additions & 0 deletions src/test/frontik_www/pages/xsl_fail.py
@@ -0,0 +1,8 @@
import frontik.handler
from frontik import etree

class Page(frontik.handler.PageHandler):
def get_page(self):
self.set_xsl('fail.xsl')

self.doc.put(etree.Element('ok'))
11 changes: 11 additions & 0 deletions src/test/xsl/fail.xsl
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">

<xsl:template match="doc">
<html><body>
<xsl:value-of select="$non-existing-var" />
</body></html>
</xsl:template>

</xsl:stylesheet>

9 changes: 9 additions & 0 deletions src/test_integration.py
Expand Up @@ -134,6 +134,15 @@ def test_content_type_with_xsl():
assert(get_page(srv_port, 'page/simple', xsl=True).headers['content-type'].startswith('text/html'))


def test_xsl_fail():
with FrontikTestInstance() as srv_port:
try:
get_page(srv_port, 'page/xsl_fail', xsl=True)
raise Exception('XSL should have failed')
except urllib2.HTTPError, e:
pass


def test_content_type_wo_xsl():
with FrontikTestInstance() as srv_port:
assert(get_page(srv_port, 'page/simple', xsl=False).headers['content-type'].startswith('application/xml'))
Expand Down

0 comments on commit 5493ca2

Please sign in to comment.