Skip to content

Commit

Permalink
HH-15804 naive threading for xslt impl
Browse files Browse the repository at this point in the history
spawning for new thread on each xslt = there is no thread pool
  • Loading branch information
pavel trukhanov committed Oct 19, 2010
1 parent 3ebec2d commit 008afd2
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 23 deletions.
29 changes: 18 additions & 11 deletions src/frontik/handler.py
Expand Up @@ -28,6 +28,7 @@
import frontik.handler_xml
import frontik.handler_whc_limit
import frontik.handler_debug
import frontik.jobs

import logging
log = logging.getLogger('frontik.handler')
Expand Down Expand Up @@ -176,6 +177,8 @@ def __init__(self, ph_globals, application, request):
self.xml = frontik.handler_xml.PageHandlerXML(self)
self.doc = self.xml.doc # backwards compatibility for self.doc.put

self.executor = frontik.jobs.executor

self.text = None

self.finish_group = frontik.async.AsyncGroup(self.async_callback(self._finish_page),
Expand Down Expand Up @@ -414,21 +417,25 @@ def _finish_page(self):

if self.text is not None:
res = self._prepare_finish_plaintext()
self._apply_postprocessor(res)
else:
res = self.xml._finish_xml()

if hasattr(self.config, 'postprocessor'):
if self.apply_postprocessor:
self.log.debug('applying postprocessor')
self.async_callback(self.config.postprocessor)(self, res, self.async_callback(partial(self._wait_postprocessor, time.time())))
else:
self.log.debug('skipping postprocessor')
self.finish(res)
#self.xml._finish_xml(self._apply_postprocessor)
self.async_callback(self.xml._finish_xml)( self.async_callback(self._apply_postprocessor) )

else:
self.log.warn('trying to finish already finished page, probably bug in a workflow, ignoring')

def _apply_postprocessor(self, res):
if hasattr(self.config, 'postprocessor'):
if self.apply_postprocessor:
self.log.debug('applying postprocessor')
self.async_callback(self.config.postprocessor)(self, res, self.async_callback(partial(self._wait_postprocessor, time.time())))
else:
self.log.debug('skipping postprocessor')
self.finish(res)

else:
self.log.warn('trying to finish already finished page, probably bug in a workflow, ignoring')
self.finish(res)


def _wait_postprocessor(self, start_time, data):
self.log.stage_tag("postprocess")
Expand Down
28 changes: 16 additions & 12 deletions src/frontik/handler_xml.py
Expand Up @@ -136,6 +136,7 @@ def __init__(self, handler):

self.doc = frontik.doc.Doc(root_node=etree.Element('doc', frontik='true'))
self.transform = None
self.transform_result = None
if not self.handler.config.apply_xsl:
self.log.debug('ignoring set_xsl() because config.apply_xsl=%s', self.handler.config.apply_xsl)
self.apply_xsl = False
Expand Down Expand Up @@ -168,27 +169,30 @@ def set_xsl(self, filename):
except:
self._set_xsl_log_and_raise('XSL transformation error with file {0}')

def _finish_xml(self):
def _finish_xml(self, cb):
if self.apply_xsl and self.transform:
return self._prepare_finish_with_xsl()
return self._prepare_finish_with_xsl(cb)
else:
return self._prepare_finish_wo_xsl()

def _prepare_finish_with_xsl(self):
def _prepare_finish_with_xsl(self, cb):
self.log.debug('finishing with xsl')

if not self.handler._headers.get("Content-Type", None):
self.handler.set_header('Content-Type', 'text/html')

try:
t = time.time()
result = str(self.transform(self.doc.to_etree_element()))
self.log.stage_tag("xsl")
self.log.debug('applied XSL %s in %.2fms', self.transform_filename, (time.time() - t)*1000)
return result
except:
self.log.exception('failed transformation with XSL %s' % self.transform_filename)
raise
def run():
try:
t = time.time()
result = str(self.transform(self.doc.to_etree_element()))
self.log.stage_tag("xsl")
self.log.debug('applied XSL %s in %.2fms', self.transform_filename, (time.time() - t)*1000)
return result
except:
self.log.exception('failed transformation with XSL %s' % self.transform_filename)
raise

self.handler.executor.start_job(run, cb)

def _prepare_finish_wo_xsl(self):
self.log.debug('finishing wo xsl')
Expand Down
50 changes: 50 additions & 0 deletions src/frontik/jobs.py
@@ -0,0 +1,50 @@
import time
import functools
import threading
import tornado.ioloop
io_loop = tornado.ioloop.IOLoop.instance()

import logging
log = logging.getLogger('frontik.jobs')

class _Job(threading.Thread):
def __init__(self, func, done):
threading.Thread.__init__(self)
self.func = func
self.result = None
self.done = done

def run(self):
self.result = self.func()
self.done.set()

class _Executor():
def __init__(self):
self.events = []

def start_job(self, func, cb):
done = threading.Event()
job = _Job(func, done)
def _cb():
cb(job.result)

This comment has been minimized.

Copy link
@elephantum

elephantum Oct 21, 2010

Contributor

Вот это не правильно. Весь код работающий в ioloop не thread-safe, поэтому нужно не на прямую запускать cb, а передавать управление в ioloop.

ioloop.add_callback(cb, job.result)
job.start()
self.events.append((done, _cb))
self.listen_events()

def listen_events(self):
# log.debug('active threads count = ' + str(threading.active_count()))
ev_c = len(self.events)
# log.debug('waiting events count = ' + str(ev_c))
if ev_c != 0:
# io_loop.add_callback(self._event_listener)
io_loop.add_timeout(time.time()+0.001, self._event_listener)

def _event_listener(self):
undone_events = filter(lambda (e, cb): not e.is_set(), self.events)
if undone_events.count != self.events.count:
done_events = filter(lambda (e, cb): e.is_set(), self.events)
self.events = undone_events
map(lambda (e, cb): cb(), done_events)
self.listen_events()

This comment has been minimized.

Copy link
@elephantum

elephantum Oct 21, 2010

Contributor

Вот эта конструкция загружает процессор на 100%, потому что если есть callback'и то ioloop вызывает epoll с нулевым таймаутом. И твой callback тут же добавляет себя снова в ioloop.

Эта ветка исполнения тебе вообще не нужна.

тебе нужен объект класса Queue.Queue и тело треда вида:

while True:
  (func, done_cb) = input_queue.get()
  func()
  ioloop.add_callback(done_cb)

executor = _Executor()

1 comment on commit 008afd2

@elephantum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не мудри, выкинь машинерию с jobs сейчас.

Сделай просто:

threading.Thread(target=apply_xslt_and_return_control_to_ioloop).start()

тебе больше ничего не нужно.

Please sign in to comment.