forked from piskvorky/gensim
/
lsi_worker.py
executable file
·126 lines (93 loc) · 3.7 KB
/
lsi_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010 Radim Rehurek <radimrehurek@seznam.cz>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
"""
USAGE: %(program)s
Worker ("slave") process used in computing distributed LSI. Run this script \
on every node in your cluster. If you wish, you may even run it multiple times \
on a single machine, to make better use of multiple cores (just beware that \
memory footprint increases accordingly).
Example: python -m gensim.models.lsi_worker
"""
from __future__ import with_statement
import os, sys, logging
import threading
import tempfile
try:
import Queue
except ImportError:
import queue as Queue
import Pyro4
from gensim.models import lsimodel
from gensim import utils
logger = logging.getLogger('gensim.models.lsi_worker')
SAVE_DEBUG = 0 # save intermediate models after every SAVE_DEBUG updates (0 for never)
class Worker(object):
    """Pyro-exposed worker node for distributed LSI.

    The dispatcher remotely calls `initialize()` once, then the worker pulls
    training jobs (chunks of documents) from the dispatcher in a loop, folds
    them into a local `lsimodel.LsiModel`, and finally hands its accumulated
    projection back via `getstate()`.
    """

    def __init__(self):
        # `initialize()` must be called (remotely, by the dispatcher) before
        # this worker can accept any jobs; `model is None` marks "not ready".
        self.model = None

    def initialize(self, myid, dispatcher, **model_params):
        """Set up this worker with a fresh LSI model.

        `myid` is the id assigned to this worker by the dispatcher;
        `dispatcher` is a (Pyro proxy of the) dispatcher object;
        `model_params` are forwarded verbatim to `lsimodel.LsiModel`.
        """
        # serializes model updates (processjob) against state reads (getstate/reset)
        self.lock_update = threading.Lock()
        self.jobsdone = 0  # how many jobs has this worker completed?
        self.myid = myid  # id of this worker in the dispatcher; just a convenience var for easy access/logging TODO remove?
        self.dispatcher = dispatcher
        self.finished = False
        logger.info("initializing worker #%s", myid)
        self.model = lsimodel.LsiModel(**model_params)

    @Pyro4.oneway
    def requestjob(self):
        """
        Request jobs from the dispatcher, in a perpetual loop until `getstate()` is called.
        """
        if self.model is None:
            raise RuntimeError("worker must be initialized before receiving jobs")
        job = None
        # Poll until either a job arrives or getstate() flips `finished`.
        while job is None and not self.finished:
            try:
                job = self.dispatcher.getjob(self.myid)
            except Queue.Empty:
                # no new job: try again, unless we're finished with all work
                continue
        if job is not None:
            # lazy %-style args: the message is only formatted if INFO is enabled
            logger.info("worker #%s received job #%i", self.myid, self.jobsdone)
            self.processjob(job)
            self.dispatcher.jobdone(self.myid)
        else:
            logger.info("worker #%i stopping asking for jobs", self.myid)

    @utils.synchronous('lock_update')
    def processjob(self, job):
        """Fold one job (a chunk of documents) into the local LSI model."""
        self.model.add_documents(job)
        self.jobsdone += 1
        if SAVE_DEBUG and self.jobsdone % SAVE_DEBUG == 0:
            # optional debugging aid: periodically snapshot the model to disk
            fname = os.path.join(tempfile.gettempdir(), 'lsi_worker.pkl')
            self.model.save(fname)

    @utils.synchronous('lock_update')
    def getstate(self):
        """Return this worker's accumulated projection and stop accepting jobs."""
        logger.info("worker #%i returning its state after %s jobs",
                    self.myid, self.jobsdone)
        assert isinstance(self.model.projection, lsimodel.Projection)
        self.finished = True  # makes any requestjob() loop exit
        return self.model.projection

    @utils.synchronous('lock_update')
    def reset(self):
        """Throw away the accumulated projection so the worker can start a new run."""
        logger.info("resetting worker #%i", self.myid)
        self.model.projection = self.model.projection.empty_like()
        self.finished = False

    @Pyro4.oneway
    def exit(self):
        """Terminate the whole worker process immediately (called remotely)."""
        logger.info("terminating worker #%i", self.myid)
        # hard exit: skip cleanup/atexit so the remote shutdown is immediate
        os._exit(0)
#endclass Worker
def main():
    """Script entry point: start one LSI worker and register it with Pyro.

    The worker takes no command-line arguments; `sys.argv` is only logged for
    diagnostics. Blocks inside `utils.pyro_daemon` serving remote requests.
    """
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
    logger.info("running %s", " ".join(sys.argv))
    program = os.path.basename(sys.argv[0])
    # NOTE: the original `if len(sys.argv) < 1` usage check was dead code --
    # sys.argv always holds at least the program name, so it could never fire.
    # register this worker under a randomized Pyro name so several workers
    # can coexist on one machine
    utils.pyro_daemon('gensim.lsi_worker', Worker(), random_suffix=True)
    logger.info("finished running %s", program)
# allow running directly (`python lsi_worker.py`) or via `python -m gensim.models.lsi_worker`
if __name__ == '__main__':
    main()