/
module.py
139 lines (106 loc) · 2.89 KB
/
module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import inspect
import types
import logging
import os
import os.path as osp
import sys
import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
from gevent_zeromq import zmq
# pyscale
from .log import config_logger
from ..zmq import Socket, MultiSocket, RpcServer
# project
from config.app import Configuration
def job(method):
method.job = True
return method
class BaseModule(object):
""" Basic Module Class (daemon) """
def __init__(self, context=None):
# app config
self.conf = Configuration()
# module config
self.name = osp.basename(osp.dirname(osp.abspath(sys.argv[0])))
self.pidfile = "tmp/pids/%s.pid" % self.name
config_logger("logs/%s.log" % self.name, self.conf.log_level)
# zmq context
self.context = context or zmq.Context.instance()
# pool of greenlets
self.jobs = gevent.pool.Group()
# zmq REQ/REP API
self.rpc = RpcServer(self, "ipc://tmp/sockets/rpc/%s.sock" % self.name)
self.rpc.run()
# spawn jobs
bases = self.__class__.__mro__
for base in bases:
for func in base.__dict__.values():
if getattr(func, 'job', None):
method = types.MethodType(func, self, self.__class__)
self.jobs.spawn(method)
def run(self):
""" Run the current module (start greenlets) """
# check for previous crash
if os.access(self.pidfile, os.F_OK):
pid = open(self.pidfile, 'r').readline()
if osp.exists('/proc/' + pid):
logging.warn("%s already running with pid %s" % (self.name, pid))
return
else:
logging.warn("%s seems to have crashed.. deleting pidfile" % self.name)
os.remove(self.pidfile)
# run all jobs
with self:
try:
self.jobs.join()
except KeyboardInterrupt:
self.jobs.kill()
# zmq.Context.instance().term()
def sock(self, name, _type=None):
""" Socket convenience function """
if _type:
return Socket(name, _type)
else:
return Socket(name)
def multisock(self, name, _type=None):
""" MultiSocket convenince function """
if _type:
return MultiSocket(name, _type)
else:
return MultiSocket(name)
def __enter__(self):
logging.status("%s started" % self.name)
# create pidfile
open(self.pidfile, 'w').write(str(os.getpid()))
return self
def __exit__(self, type, value, traceback):
logging.status("%s stopped" % self.name)
# remove pidfile
os.remove(self.pidfile)
return False
def help(self):
methods = []
for name in dir(self):
obj = getattr(self, name)
if inspect.ismethod(obj):
if getattr(obj, 'api', False):
# extract specs
spec = inspect.getargspec(obj)
# extract docstring
doc = inspect.getdoc(obj)
if doc: doc = doc.strip()
methods.append((name, inspect.formatargspec(*spec), doc))
else:
if not name.startswith('_'):
methods.append(name)
return methods
# notifications
def notice(self, msg):
pass
def alert(self, msg):
pass
def error(self, msg):
pass