
Commit

Fix DBS2 bug; add support for GridFS; add new generator parse2gridfs, which passes input docs through or writes them to GridFS and replaces their meta-data with a gridfs pointer. Fixes #611

Signed-off-by: Valentin Kuznetsov <vkuznet@gmail.com>


git-svn-id: svn+ssh://svn.cern.ch/reps/CMSDMWM/DAS/trunk@10775 4525493e-7705-40b1-a816-d608a930855b
valya committed Oct 26, 2010
1 parent 1e3da4b commit 01601f8
Showing 5 changed files with 62 additions and 6 deletions.
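Records that exceed the MongoDB document size limit are replaced in the DAS workflow by a small pointer document of the form {'gridfs_id': fid}. A minimal consumer-side sketch, not part of this commit: resolving such a pointer back into the stored payload with the db_gridfs helper added in src/python/DAS/utils/das_db.py. The helper name resolve_gridfs and the host/port values are assumptions made for illustration.

from DAS.utils.das_db import db_gridfs

def resolve_gridfs(row, dbhost='localhost', dbport=27017):
    """Return the stored payload if row is a gridfs pointer (hypothetical helper)."""
    if isinstance(row, dict) and 'gridfs_id' in row:
        fsinst = db_gridfs(dbhost, dbport)          # cached GridFS handle
        return fsinst.get(row['gridfs_id']).read()  # same read pattern as test/das_db_t.py
    return row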
3 changes: 3 additions & 0 deletions doc/sphinx/changelog.rst
@@ -29,6 +29,9 @@ performed stress tests and code audit DAS servers.
- add to DAS PLY special keys, date and system, to allow queries like
run date last 24h, jobsummary date last 24h. Prevent queries like
run last 24h since it leads to ambiguous conditions.
- add support for GridFS; the parse2gridfs generator passes docs whose size is
  below the MongoDB limit (4MB) and stores larger docs in GridFS. In the latter
  case the doc in the DAS workflow is replaced with a gridfs pointer (issue #611)

- 0.5.3 - 0.5.4 series

9 changes: 8 additions & 1 deletion src/python/DAS/services/abstract_service.py
@@ -25,9 +25,9 @@
from DAS.utils.utils import row2das, extract_http_error, make_headers
from DAS.utils.utils import xml_parser, json_parser, plist_parser
from DAS.utils.utils import yield_rows, expire_timestamp
#from DAS.core.das_aggregators import das_func
from DAS.core.das_mongocache import compare_specs, encode_mongo_query
from DAS.utils.das_timer import das_timer
from DAS.utils.das_db import db_gridfs, parse2gridfs

def dasheader(system, query, api, url, args, ctime, expire):
"""
@@ -55,6 +55,9 @@ def __init__(self, name, config):
self.dasmapping = config['dasmapping']
self.analytics = config['dasanalytics']
self.write2cache = config.get('write_cache', True)
host = config['mongodb']['dbhost']
port = config['mongodb']['dbport']
self.gfs = db_gridfs(host, port)
except:
traceback.print_exc()
print config
@@ -440,6 +443,10 @@ def set_misses(self, query, api, genrows):
Check and adjust DAS records wrt input query. If some of the DAS
keys are missing, add them with their values to the DAS record.
"""
# Scan all docs and store those whose size is above the MongoDB limit
# into GridFS
genrows = parse2gridfs(self.gfs, genrows, self.logger)
# look-up primary key
prim_key = self.dasmapping.primary_key(self.name, api)
spec = query['spec']
skeys = spec.keys()
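A minimal usage sketch of the pieces wired together above, assuming a MongoDB instance on localhost:27017: AbstractService builds self.gfs via db_gridfs in __init__ and threads records through parse2gridfs in set_misses; the standalone snippet below mimics that pipeline with a toy record.

from DAS.utils.das_db import db_gridfs, parse2gridfs

gfs     = db_gridfs('localhost', 27017)  # GridFS handle, as in __init__ above
genrows = iter([{'_id': 1, 'block': {'name': '/a/b/c#123'}}])  # toy DAS record
for row in parse2gridfs(gfs, genrows):
    print row  # records below the size limit pass through unchanged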
3 changes: 3 additions & 0 deletions src/python/DAS/services/dbs/dbs_service.py
@@ -27,6 +27,9 @@ def parser(self, query, dformat, source, api):
"""
if api == 'listBlocks':
prim_key = 'block'
if api == 'listBlocks4path':
api = 'listBlocks'
prim_key = 'block'
elif api == 'listBlockProvenance':
prim_key = 'block'
elif api == 'listFiles':
42 changes: 38 additions & 4 deletions src/python/DAS/utils/das_db.py
@@ -10,12 +10,14 @@
__version__ = "$Revision: 1.9 $"
__author__ = "Valentin Kuznetsov"

import sys
import time
import types
import traceback

# mongo db modules
from pymongo.connection import Connection
import gridfs

def connection_monitor(dbhost, dbport, func, sleep=5):
"""
@@ -64,7 +66,10 @@ def connection(self, dbhost, dbport):
uri = make_uri([pair])
if not self.conndict.has_key(uri):
try:
self.conndict[uri] = Connection(uri)
dbinst = Connection(uri)
gfs = dbinst.gridfs
fsinst = gridfs.GridFS(gfs)
self.conndict[uri] = (dbinst, fsinst)
except:
traceback.print_exc()
return None
@@ -75,7 +80,10 @@ def connections(self, pairs):
uri = make_uri(pairs)
if not self.conndict.has_key(uri):
try:
self.conndict[uri] = Connection(uri)
dbinst = Connection(uri)
gfs = dbinst.gridfs
fsinst = gridfs.GridFS(gfs)
self.conndict[uri] = (dbinst, fsinst)
except:
traceback.print_exc()
return None
@@ -85,12 +93,38 @@ def connections(self, pairs):

def db_connection(dbhost, dbport):
"""Return DB connection instance"""
return DB_CONN_SINGLETON.connection(dbhost, dbport)
dbinst, _ = DB_CONN_SINGLETON.connection(dbhost, dbport)
return dbinst

def db_connections(pairs):
"""
Return DB connection instance for provided set of (dbhost, dbport)
pairs
"""
return DB_CONN_SINGLETON.connections(pairs)
dbinst, _ = DB_CONN_SINGLETON.connections(pairs)
return dbinst

def db_gridfs(dbhost, dbport):
"""
Return pointer to MongoDB GridFS
"""
_, fsinst = DB_CONN_SINGLETON.connection(dbhost, dbport)
return fsinst

def parse2gridfs(gfs, genrows, logger=None):
"""
Yield docs from the provided generator if their size is below 4MB;
otherwise store them in GridFS and yield a gridfs pointer record.
"""
for row in genrows:
row_size = sys.getsizeof(str(row))
if row_size < 4*1024*1024:
yield row
else:
fid = gfs.put(row)
gfs_rec = dict(gridfs_id=fid)
if logger:
msg = 'parse2gridfs record %s/size %s replaced with fid=%s'\
% (row['_id'], row_size, fid)
logger.info(msg)
yield gfs_rec
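A small sanity check of the design choice above, assuming MongoDB is reachable on localhost:27017 and that the singleton's (collapsed) return statement hands back its cached entry: the connection cache now stores a (Connection, GridFS) pair per URI, so repeated calls to db_connection and db_gridfs reuse the same instances instead of opening new connections.

from DAS.utils.das_db import db_connection, db_gridfs

conn1 = db_connection('localhost', 27017)
conn2 = db_connection('localhost', 27017)
assert conn1 is conn2  # same cached Connection instance
fs1 = db_gridfs('localhost', 27017)
fs2 = db_gridfs('localhost', 27017)
assert fs1 is fs2      # same cached GridFS instance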
11 changes: 10 additions & 1 deletion test/das_db_t.py
@@ -9,7 +9,7 @@
import ply.yacc
import unittest
import traceback
from DAS.utils.das_db import db_connection, make_uri
from DAS.utils.das_db import db_connection, make_uri, db_gridfs
from DAS.utils.das_config import das_readconfig

class testDAS_DB(unittest.TestCase):
@@ -51,6 +51,15 @@ def test_db_connection(self):
result = db_connection(self.dbhost, self.dbport)
self.assertEqual(expect, result.instance)

def test_db_gridfs(self):
"""Test db_gridfs"""
fsinst = db_gridfs(self.dbhost, self.dbport)
doc = 'hello world!'
fid = fsinst.put(doc)
content = fsinst.get(fid).read()
self.assertEqual(doc, content)
fsinst.delete(fid)

#
# main
#
