Skip to content

Commit

Permalink
A new version of the fff deletion daemon.
Browse files Browse the repository at this point in the history
Now includes a startup script.
  • Loading branch information
Dmitrijus Bugelskis committed Aug 13, 2014
1 parent 05ed6ce commit 9e26670
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 14 deletions.
69 changes: 69 additions & 0 deletions DQM/Integration/scripts/fff_deleter
@@ -0,0 +1,69 @@
#!/bin/bash
#
# /etc/rc.d/init.d/fff_deleter
#
# Init file for fff_deleter.py
# Copy it to /etc/rc.d/init.d/fff_deleter
#
# chkconfig: 345 80 20
# description: FFF Deletion daemon for DQM

# Source function library.
. /etc/rc.d/init.d/functions

DAEMON_BIN=/usr/local/bin/fff_deleter.py
PIDFILE=/var/run/fff_deleter.pid
RETVAL=0

start() {
echo -n $"Starting fff_deleter daemon: "
daemon $DAEMON_BIN
RETVAL=$?
echo

return $RETVAL
}

stop() {
echo -n $"Stopping fff_deleter daemon: "
killproc -p $PIDFILE
RETVAL=$?
echo

return $RETVAL
}


restart() {
stop
start
}

case "$1" in
start)
start
;;
stop)
stop
;;
reload)
echo "$0: Unimplemented feature."
RETVAL=3
;;
force-reload)
echo "$0: Unimplemented feature."
RETVAL=3
;;
restart)
restart
;;
status)
status acpid
RETVAL=$?
;;
*)
echo $"Usage: $0 {start|stop|status|restart}"
RETVAL=2
esac

exit $RETVAL
149 changes: 135 additions & 14 deletions DQM/Integration/scripts/fff_deletion.py → DQM/Integration/scripts/fff_deleter.py 100644 → 100755
@@ -1,13 +1,16 @@
#!/usr/bin/env python

import os
import logging
import re
import datetime
import subprocess
import socket
import time
import json
from StringIO import StringIO

logging.basicConfig(level=logging.INFO)
log = logging
log = logging.getLogger(__name__)

re_files = re.compile(r"^run(?P<run>\d+)/run(?P<runf>\d+)_ls(?P<ls>\d+)_.+\.(dat|raw)+(\.deleted)*")
def parse_file_name(rl):
Expand Down Expand Up @@ -72,12 +75,22 @@ def diskusage(top):
used = total - (st.f_bavail * st.f_frsize)
return float(used) * 100 / total

def diskusage_bytes(top):
st = os.statvfs(top)
total = st.f_blocks * st.f_frsize
free = st.f_bavail * st.f_frsize
used = total - free

return used, free, total

class FileDeleter(object):
def __init__(self, top, thresholds, email_to, fake=True, ):
def __init__(self, top, thresholds, email_to, report_directory, fake=True, ):
self.top = top
self.fake = fake
self.email_to = email_to
self.thresholds = thresholds
self.report_directory = report_directory
self.sequence = 0

self.last_email = None
self.min_interval = datetime.timedelta(seconds=60*10)
Expand Down Expand Up @@ -130,7 +143,58 @@ def send_smg(self, used_pc):
p = subprocess.Popen(["/bin/mail", "-s", subject, email], stdin=subprocess.PIPE, shell=False)
p.communicate(input=text)

def run(self):
def make_report(self, logout):
if not os.path.isdir(self.report_directory):
log.warning("Directory %s does not exists. Reports disabled.", self.report_directory)
return

meminfo = list(open("/proc/meminfo", "r").readlines())
def entry_to_dict(line):
key, value = line.split()[:2]
value = int(value)
return (key.strip(":"), value, )
meminfo = dict(map(entry_to_dict, meminfo))

if os.path.isdir(self.top):
used, free, total = diskusage_bytes(self.top)
else:
used, free, total = -1, -1, -1

doc = {
"sequence": self.sequence,
"memory_used": (meminfo["MemTotal"] - meminfo["MemFree"]) * 1024,
"memory_free": meminfo["MemFree"] * 1024,
"memory_total": meminfo["MemTotal"] * 1024,
"disk_used": used,
"disk_free": free,
"disk_total": total,
"hostname": self.hostname,
"extra": {
"meminfo": meminfo,
"last_log": logout.split("\n"),
},
"pid": os.getpid(),
"_id": "dqm-diskspace-%s" % self.hostname,
"type": "dqm-diskspace"
}

fn = "dqm-diskspace-seq%06d" % (doc["sequence"], )
tmp_fp = os.path.join(self.report_directory, "." + fn + ".tmp")
final_fp = os.path.join(self.report_directory, fn + ".jsn")
fd = open(tmp_fp, "w")

json.dump(doc, fd, indent=True)
fd.write("\n")
fd.close()

os.rename(tmp_fp, final_fp)

def run(self):
self.sequence += 1
if not os.path.isdir(self.top):
log.warning("Directory %s does not exists.", self.top)
return

cleanup_threshold(self.top, self.thresholds['rename'], self.rename, "rename")
cleanup_threshold(self.top, self.thresholds['delete'], self.delete, "delete")

Expand All @@ -149,11 +213,41 @@ def lock(pname):
except socket.error:
return None

def daemon(deleter, delay_seconds=30):
def daemon(deleter, log_capture, delay_seconds=30):
while True:
deleter.run()

if log_capture:
log_out = log_capture.getvalue()
log_capture.truncate(0)
deleter.make_report(log_out)

time.sleep(delay_seconds)


def daemonize(logfile, pidfile):
# do the double fork
pid = os.fork()
if pid != 0:
sys.exit(0)

os.setsid()
sys.stdin.close()
sys.stdout.close()
sys.stderr.close()

fl = open(logfile, "a")
sys.stdout = fl
sys.stderr = fl

pid = os.fork()
if pid != 0:
sys.exit(0)

if pidfile:
f = open(pidfile, "w")
f.write("%d\n" % os.getpid())
f.close()

import sys
if __name__ == "__main__":
#import argparse
Expand All @@ -166,28 +260,55 @@ def daemon(deleter, delay_seconds=30):
# try to take the lock or quit
sock = lock("fff_deleter")
if sock is None:
log.info("Already running, exitting.")
sys.exit(0)
sys.stderr.write("Already running, exitting.\n")
sys.stderr.flush()
sys.exit(1)

# threshold rename and delete must be in order
# in other words, always: delete > rename
# this is because delete only deletes renamed files

# email threshold has no restrictions
top = "/fff.changeme/ramdisk"
top = "/fff/ramdisk"
thresholds = {
'delete': 80,
'rename': 60,
'delete': 80,
'email': 90,
}
fake = not (len(sys.argv) > 1 and sys.argv[1] == "doit")

deleter = FileDeleter(
top = top,
thresholds = thresholds,
# put "41XXXXXXXXX@mail2sms.cern.ch" to send the sms
email_to = ["dmitrijus.bugelskis@cern.ch", ],
fake = fake,
email_to = [
"dmitrijus.bugelskis@cern.ch",
"atanas.batinkov@cern.ch",
"daniel.joseph.duggan@cern.ch",
],
report_directory = "/tmp/dqm_monitoring/",
fake = False,
)

daemon(deleter=deleter)
# setup logging
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

log.setLevel(logging.INFO)
log_capture = None
log_capture = StringIO()
log_capture_ch = logging.StreamHandler(log_capture)
log_capture_ch.setLevel(logging.INFO)
log_capture_ch.setFormatter(formatter)
log.addHandler(log_capture_ch)

if True: # run background
daemonize("/var/log/fff_deleter.log", "/var/run/fff_deleter.pid")

# log to stderr (it might be redirected)
flog_ch = logging.StreamHandler()
flog_ch.setLevel(logging.INFO)
flog_ch.setFormatter(formatter)
log.addHandler(flog_ch)

# write the pid file
log.info("Pid is %d", os.getpid())
daemon(deleter=deleter, log_capture=log_capture)

0 comments on commit 9e26670

Please sign in to comment.