forked from spotify/luigi
/
process.py
117 lines (95 loc) · 3.11 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Contains some helper functions to run luigid in daemon mode
"""
from __future__ import print_function
import datetime
import logging
import logging.handlers
import os
import random
import signal
# Root logger; daemonize() attaches the rotating file handler to it.
rootlogger = logging.getLogger()
# Logger used by write_pid() and daemonize() for pid-file status messages.
server_logger = logging.getLogger("luigi.server")
def check_pid(pidfile):
    """
    Return the pid recorded in ``pidfile`` if that process can be signalled.

    :param pidfile: path to the pid file; may be ``None`` or empty.
    :return: the recorded process id, or ``0`` when there is no pid file, it
             cannot be read or parsed, the recorded value is not a positive
             pid, or sending signal 0 to it fails.
    """
    if not pidfile or not os.path.exists(pidfile):
        return 0
    try:
        # Context manager so the descriptor is closed even if parsing fails.
        with open(pidfile) as fobj:
            pid = int(fobj.read().strip())
        if pid <= 0:
            # Zero and negative values address process groups (or every
            # process) in kill(2), so they never identify a single server.
            return 0
        # Signal 0 only performs the existence/permission check; nothing is
        # actually delivered to the target process.
        os.kill(pid, 0)
    except (IOError, OSError, ValueError, OverflowError):
        # Unreadable file, non-numeric contents, pid out of range for the
        # platform, or the process could not be signalled.  Interpreter-level
        # exceptions such as KeyboardInterrupt are deliberately not caught.
        return 0
    return pid
def write_pid(pidfile):
    """
    Record the current process id in ``pidfile``.

    The directory part of ``pidfile`` is created first when it is non-empty
    and does not exist yet.

    :param pidfile: path of the pid file to (over)write.
    """
    server_logger.info("Writing pid file")

    parent_dir = os.path.dirname(pidfile)
    needs_parent = not (parent_dir == '' or os.path.exists(parent_dir))
    if needs_parent:
        os.makedirs(parent_dir)

    current_pid = str(os.getpid())
    with open(pidfile, 'w') as handle:
        handle.write(current_pid)
def get_log_format():
    """
    Return the ``logging`` format string used for the server log.

    Records are laid out as: timestamp, logger name followed by the process
    id in square brackets, level name, a colon, then the message.
    """
    log_format = "%(asctime)s %(name)s[%(process)s] %(levelname)s: %(message)s"
    return log_format
def get_spool_handler(filename):
    """
    Build the rotating file handler used for the server log.

    :param filename: path of the log file the handler writes to.
    :return: a ``TimedRotatingFileHandler`` configured with ``when='d'``,
             UTF-8 encoding and seven backups, formatted with the string
             from :func:`get_log_format`.
    """
    rotating = logging.handlers.TimedRotatingFileHandler(
        filename=filename,
        when='d',
        encoding='utf8',
        backupCount=7,  # keep one week of historical logs
    )
    rotating.setFormatter(logging.Formatter(get_log_format()))
    return rotating
def _server_already_running(pidfile):
    """
    Tell whether ``pidfile`` names a process that :func:`check_pid` accepts.

    :param pidfile: path to the pid file; may be ``None`` or empty.
    :return: ``True`` when a pid file was given and ``check_pid`` returned a
             non-zero pid for it, ``False`` otherwise.
    """
    live_pid = check_pid(pidfile)
    return bool(pidfile and live_pid)
def daemonize(cmd, pidfile=None, logdir=None, api_port=8082, address=None):
    """
    Run ``cmd`` inside a ``daemon.DaemonContext``.

    stdout and stderr are redirected to date-stamped ``.out`` / ``.err``
    files in ``logdir``, and a rotating ``luigi-server.log`` handler is added
    to the root logger.  When ``pidfile`` is given and :func:`check_pid`
    reports a live pid for it, the function logs that and returns without
    calling ``cmd``; otherwise the current pid is written to ``pidfile``.

    :param cmd: callable invoked as ``cmd(api_port=..., address=...)``.
    :param pidfile: optional path of the pid file to check and write.
    :param logdir: directory for log files; falls back to ``/var/log/luigi``
                   and is created if missing.
    :param api_port: forwarded to ``cmd`` unchanged.
    :param address: forwarded to ``cmd`` unchanged.
    """
    # Imported lazily so the rest of the module works without python-daemon.
    import daemon

    if not logdir:
        logdir = "/var/log/luigi"
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    log_path = os.path.join(logdir, "luigi-server.log")

    # redirect stdout/stderr to per-day files, opened in append mode
    today = datetime.date.today()
    out_name = "luigi-server-{0:%Y-%m-%d}.out".format(today)
    err_name = "luigi-server-{0:%Y-%m-%d}.err".format(today)
    stdout_proxy = open(os.path.join(logdir, out_name), 'a+')
    stderr_proxy = open(os.path.join(logdir, err_name), 'a+')

    context = daemon.DaemonContext(
        stdout=stdout_proxy,
        stderr=stderr_proxy,
        working_directory='.'
    )

    with context:
        rootlogger.addHandler(get_spool_handler(log_path))

        if pidfile:
            server_logger.info("Checking pid file")
            running_pid = check_pid(pidfile)
            if running_pid:
                server_logger.info("Server already running (pid=%s)", running_pid)
                return
            write_pid(pidfile)

        cmd(api_port=api_port, address=address)