forked from openwpm/OpenWPM
/
MPLogger.py
155 lines (133 loc) · 5.11 KB
/
MPLogger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
""" Support for logging with the multiprocessing module """
from __future__ import absolute_import
from __future__ import print_function
from .SocketInterface import serversocket
from six.moves.queue import Empty as EmptyQueue
import logging.handlers
import logging
import struct
import json
import time
import sys
import os
class ClientSocketHandler(logging.handlers.SocketHandler):
"""
Make SocketHandler compatible with SocketInterface.py
"""
def makePickle(self, record):
"""
Serializes the record via json and prepends a length/serialization
flag. Returns it ready for transmission across the socket.
"""
ei = record.exc_info
if ei:
# just to get traceback text into record.exc_text ...
dummy = self.format(record) # noqa
record.exc_info = None # to avoid Unpickleable error
d = dict(record.__dict__)
d['msg'] = record.getMessage()
d['args'] = None
s = json.dumps(d).encode('utf-8')
if ei:
record.exc_info = ei # for next handler
return struct.pack('>Lc', len(s), b'j') + s
def loggingclient(logger_address, logger_port, level=logging.DEBUG):
""" Establishes a logger that sends log records to loggingserver """
logger = logging.getLogger(__name__)
logger.setLevel(level)
# Logger object shared, so we only want to connect handlers once
if not len(logger.handlers):
# Set up the SocketHandler - formatted server-side
socketHandler = ClientSocketHandler(logger_address, logger_port)
socketHandler.setLevel(level)
logger.addHandler(socketHandler)
# Set up logging to console
consoleHandler = logging.StreamHandler(sys.stdout)
consoleHandler.setLevel(logging.INFO)
formatter = logging.Formatter('%(module)-20s - %(levelname)-8s - %(message)s')
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
return logger
def loggingserver(log_file, status_queue):
"""
A logging server to serialize writes to the log file from multiple
processes.
<log_file> location of the log file on disk
<status_queue> is a queue connect to the TaskManager used for communication
"""
# Configure the log file
logging.basicConfig(filename=os.path.expanduser(log_file),
format= '%(asctime)s - %(processName)-11s[%(threadName)-10s]' +
' - %(module)-20s - %(levelname)-8s: %(message)s',
level=logging.INFO)
# Sets up the serversocket to start accepting connections
sock = serversocket()
status_queue.put(sock.sock.getsockname()) # let TM know location
sock.start_accepting()
while True:
# Check for KILL command from TaskManager
if not status_queue.empty():
status_queue.get()
sock.close()
_drain_queue(sock.queue)
break
# Process logs
try:
obj = sock.queue.get(True, 10)
_handleLogRecord(obj)
except EmptyQueue:
pass
def _handleLogRecord(obj):
""" Handle log, logs everything sent. Should filter client-side """
# Log message came from browser extension: requires special handling
if len(obj) == 2 and obj[0] == 'EXT':
obj = json.loads(obj[1])
record = logging.LogRecord(name=__name__,
level=obj['level'],
pathname=obj['pathname'],
lineno=obj['lineno'],
msg=obj['msg'],
args=obj['args'],
exc_info=obj['exc_info'],
func=obj['func'])
else:
record = logging.makeLogRecord(obj)
logger = logging.getLogger(record.name)
logger.handle(record)
def _drain_queue(sock_queue):
""" Ensures queue is empty before closing """
time.sleep(3) # TODO: the socket needs a better way of closing
while not sock_queue.empty():
obj = sock_queue.get()
_handleLogRecord(obj)
def main():
# Some tests
import logging, logging.handlers
import multiprocess as mp
# Set up loggingserver
log_file = '~/mplogger.log'
status_queue = mp.Queue()
loggingserver = mp.Process(target=loggingserver, args=(log_file, status_queue))
loggingserver.daemon = True
loggingserver.start()
server_address = status_queue.get()
# Connect main process to logging server
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = ClientSocketHandler(*server_address)
rootLogger.addHandler(socketHandler)
# Send some sample logs
logging.info('Test1')
logging.error('Test2')
logging.critical('Test3')
logging.debug('Test4')
logging.warning('Test5')
logger1 = logging.getLogger('test1')
logger2 = logging.getLogger('test2')
logger1.info('asdfasdfsa')
logger2.info('1234567890')
# Close the logging server
status_queue.put('DIE')
loggingserver.join()
print("Server closed, exiting...")
if __name__ == '__main__': main()