Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This patch adds a producer of StAR accounting records. It is strongly based on the initial work by Henrik Jensen from Nordunet, but updated to provide better integration with the dCache configuration. Target: trunk Patch: http://rb.dcache.org/r/4970 Acked-by: Karsten Schwank Acked-by: Gerd Behrmann Require-notes: yes Require-book: yes
- Loading branch information
1 parent
de45d40
commit 60f8aa7
Showing
10 changed files
with
395 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,271 @@ | ||
#!/usr/bin/env python | ||
# | ||
# This script creates a file containing zero or more StAR records. | ||
# These records provide storage-usage accounting information. | ||
# | ||
import os | ||
import time | ||
import psycopg2 | ||
import csv | ||
from subprocess import Popen, PIPE | ||
from string import maketrans | ||
|
||
# Load in dCache configuration | ||
d = Popen(["dcache", "loader", "-q", "compile", "-python"], stdout=PIPE) | ||
exec d.communicate()[0] | ||
|
||
|
||
PHYSICAL_USAGE_QUERY = """ | ||
SELECT | ||
igid AS gid, | ||
substring(ilocation from '^.*://([^/]*)') AS site, | ||
CASE WHEN t_locationinfo.itype = 0 THEN 'tape' | ||
WHEN t_locationinfo.itype = 1 THEN 'disk' | ||
ELSE 'unknown media' | ||
END AS media, | ||
SUM(isize)::int8 AS size | ||
FROM | ||
t_inodes | ||
JOIN t_locationinfo ON t_inodes.ipnfsid = t_locationinfo.ipnfsid | ||
GROUP BY | ||
igid, | ||
substring(ilocation FROM '^.*://([^/]*)'), | ||
t_locationinfo.itype; | ||
""" | ||
|
||
|
||
LOGICAL_USAGE_QUERY = "SELECT igid AS gid, SUM(isize) AS size, COUNT(1) AS count FROM t_inodes GROUP BY igid;" | ||
|
||
def split_csv(values): | ||
if len(values) == 0: | ||
return dict() | ||
values_no_nl = values.translate(maketrans("\n", " ")) | ||
items = csv.reader([values_no_nl], skipinitialspace=True).next() | ||
return dict([e.strip() for e in item.split("=", 1)] for item in items) | ||
|
||
|
||
STORAGE_SHARE_MAPPING = split_csv(properties.get('star.storage-share-mapping')) | ||
GID_MAPPING = split_csv(properties.get('star.gid-mapping')) | ||
|
||
|
||
ISOZ_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" | ||
|
||
def last_record_time(): | ||
path = properties.get('star.last-update.path') | ||
if os.path.exists(path): | ||
f = open(path) | ||
try: | ||
when = f.readline().strip() | ||
finally: | ||
f.close() | ||
else: | ||
secs = time.time() - int(properties.get('star.polling-period')) | ||
when = time.strftime(ISOZ_TIME_FORMAT, time.localtime(secs)) | ||
|
||
return when | ||
|
||
|
||
def createPhysicalRecordArgs(create_time, storage_share, storage_media, group, end_time, resource_capacity_used): | ||
result = { | ||
'record_id' : build_id(group, storage_media, create_time), | ||
'create_time' : create_time, | ||
'storage_share' : storage_share, | ||
'storage_media' : storage_media, | ||
'group' : group, | ||
'start_time' : last_record_time(), | ||
'end_time' : end_time, | ||
'resource_capacity_used' : resource_capacity_used, | ||
'storage_system' : properties.get('star.storage-system'), | ||
'site' : properties.get('star.site') | ||
} | ||
|
||
if storage_share: | ||
result ['storage_share'] = storage_share | ||
|
||
return result | ||
|
||
|
||
def createLogicalRecordArgs(create_time, group, end_time, logical_capacity_used, count, sums): | ||
# NB. StAR v1.2 forces us to publish RCU. Here we calculate the | ||
# appropriate value for this logical record. | ||
total_rcu = 0 | ||
for (this_group, this_share, this_media), rcu in sums.items(): | ||
if this_group == group: | ||
total_rcu += rcu | ||
|
||
return { | ||
'record_id' : build_id(group, '', create_time), | ||
'create_time' : create_time, | ||
'group' : group, | ||
'start_time' : last_record_time(), | ||
'end_time' : end_time, | ||
'resource_capacity_used' : total_rcu, | ||
'logical_capacity_used' : logical_capacity_used, | ||
'file_count' : count, | ||
'storage_system' : properties.get('star.storage-system'), | ||
'site' : properties.get('star.site') | ||
} | ||
|
||
|
||
def split_fqan(group_arg): | ||
if group_arg[0] == '/': | ||
group = group_arg.split('/')[1] | ||
roles = group_arg.split('/')[2:] | ||
else: | ||
group = group_arg | ||
roles = [] | ||
|
||
return group, roles | ||
|
||
|
||
def write_identity(f, rec_args): | ||
group, roles = split_fqan(rec_args ["group"]) | ||
|
||
f.write(" <sr:SubjectIdentity>\n") | ||
f.write(" <sr:Group>" + group + "</sr:Group>\n") | ||
|
||
for role in roles: | ||
if role.startswith("Role="): | ||
f.write(" <sr:GroupAttribute sr:attributeType='role'>" + role[5:] + "</sr:GroupAttribute>\n") | ||
else: | ||
f.write(" <sr:GroupAttribute sr:attributeType='subgroup'>" + role + "</sr:GroupAttribute>\n") | ||
f.write(" </sr:SubjectIdentity>\n") | ||
|
||
|
||
def write_record(f, rec_args): | ||
f.write(" <sr:StorageUsageRecord>\n") | ||
f.write(" <sr:RecordIdentity sr:createTime='%(create_time)s' sr:recordId='%(record_id)s'/>\n" % rec_args) | ||
f.write(" <sr:StorageSystem>%(storage_system)s</sr:StorageSystem>\n" % rec_args); | ||
f.write(" <sr:Site>%(site)s</sr:Site>\n" % rec_args) | ||
|
||
if "storage_share" in rec_args and rec_args['storage_share']: | ||
f.write(" <sr:StorageShare>%(storage_share)s</sr:StorageShare>\n" % rec_args) | ||
|
||
if "storage_media" in rec_args and rec_args['storage_media']: | ||
f.write(" <sr:StorageMedia>%(storage_media)s</sr:StorageMedia>\n" % rec_args) | ||
|
||
if "file_count" in rec_args and rec_args['file_count']: | ||
f.write(" <sr:FileCount>%(file_count)i</sr:FileCount>\n" % rec_args) | ||
|
||
write_identity(f, rec_args) | ||
|
||
f.write(" <sr:StartTime>%(start_time)s</sr:StartTime>\n" % rec_args) | ||
f.write(" <sr:EndTime>%(end_time)s</sr:EndTime>\n" % rec_args) | ||
f.write(" <sr:ResourceCapacityUsed>%(resource_capacity_used)i</sr:ResourceCapacityUsed>\n" % rec_args) | ||
|
||
if "logical_capacity_used" in rec_args and rec_args['logical_capacity_used']: | ||
f.write(" <sr:LogicalCapacityUsed>%(logical_capacity_used)i</sr:LogicalCapacityUsed>\n" % rec_args) | ||
|
||
f.write(" </sr:StorageUsageRecord>\n" % rec_args) | ||
|
||
|
||
def build_id(group, media, now): | ||
site = properties.get('star.site') | ||
if group [0] == '/': | ||
g = group [1:] | ||
else: | ||
g = group | ||
|
||
return site.replace('.', '_') + '_' + g.replace('/', '_').replace('=','_') + '_' + media + '_' + now.replace(':', '').replace('-','') | ||
|
||
|
||
def query_chimera(query): | ||
db_name = properties.get('chimera.db.name') | ||
db_host = properties.get('chimera.db.host') | ||
db_user = properties.get('chimera.db.user') | ||
db_pw = properties.get('chimera.db.password') | ||
conn = psycopg2.connect(database=db_name, user=db_user, host=db_host, password=db_pw) | ||
try: | ||
cur = conn.cursor() | ||
try: | ||
cur.execute(query) | ||
rows = cur.fetchall() | ||
finally: | ||
cur.close() | ||
finally: | ||
conn.close() | ||
return rows | ||
|
||
|
||
|
||
def fetch_physical_record_data(): | ||
sums = {} | ||
for record in query_chimera(PHYSICAL_USAGE_QUERY): | ||
gid, site_pool, media, rcu = record | ||
|
||
share = STORAGE_SHARE_MAPPING.get(site_pool) | ||
group = GID_MAPPING.get(str(gid)) | ||
|
||
if group: | ||
key = (group, share, media) | ||
sums [key] = sums.get(key, 0) + rcu | ||
|
||
return sums | ||
|
||
|
||
def fetch_logical_record_data(): | ||
sums = {} | ||
counts = {} | ||
for record in query_chimera(LOGICAL_USAGE_QUERY): | ||
gid, lcu, count = record | ||
|
||
group = GID_MAPPING.get(str(gid)) | ||
|
||
if group: | ||
sums [group] = sums.get(group, 0) + lcu | ||
counts [group] = counts.get(group, 0) + count | ||
|
||
return sums, counts | ||
|
||
|
||
def open_records_file(now): | ||
record_dir = properties.get('star.spool.dir') | ||
|
||
if not os.path.exists(record_dir): | ||
os.makedirs(record_dir) | ||
|
||
filename = now.replace(':', '').replace('-','') + ".star" | ||
file_path = os.path.join(record_dir, filename) | ||
return open(file_path, 'w') | ||
|
||
|
||
|
||
def write_records(now): | ||
f = open_records_file(now) | ||
try: | ||
f.write("<?xml version='1.0' encoding='UTF-8' ?>\n") | ||
f.write("<sr:StorageUsageRecords xmlns:sr='http://eu-emi.eu/namespaces/2011/02/storagerecord'>\n") | ||
|
||
physical_sums = fetch_physical_record_data() | ||
for (group, share, media), rcu in physical_sums.items(): | ||
rec_args = createPhysicalRecordArgs(now, share, media, group, now, rcu) | ||
write_record(f, rec_args) | ||
|
||
logical_sums, counts = fetch_logical_record_data() | ||
for group, lcu in logical_sums.items(): | ||
count = counts.get(group, 0) | ||
rec_args = createLogicalRecordArgs(now, group, now, lcu, count, physical_sums) | ||
write_record(f, rec_args) | ||
|
||
f.write("</sr:StorageUsageRecords>\n") | ||
finally: | ||
f.close() | ||
|
||
|
||
def update_last_update(now): | ||
f = open(properties.get('star.last-update.path'), 'w') | ||
try: | ||
f.write(now) | ||
finally: | ||
f.close() | ||
|
||
|
||
def main(): | ||
now = time.strftime(ISOZ_TIME_FORMAT) | ||
write_records(now) | ||
update_last_update(now) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() | ||
|
Oops, something went wrong.