
First upload

0 parents commit 901fe94679623ca4f23b1c29936f56122525c5c6 @cippino committed Nov 15, 2011
@@ -0,0 +1,6 @@
+*.py[co]
+*~
+#
+.#*
+*.egg-info
+branches
@@ -0,0 +1,27 @@
+Introduction
+============
+
+This product was born as a replacement for the
+collective.solr.maintenance sync function.
+
+Instead of matching the portal catalog against solr, it uses the
+content of the Data.fs, walking from the root down to the leaves.
+
+The match is made by comparing the modification date and the rights
+(you have to enable this, see below).
+
+
+
+Required
+========
+
+allowedRolesAndUsers must be a stored attribute in the solr schema.xml.
+If you apply this product to an already populated solr database, use the
+<reindex_allowedRolesAndUsers> method to update it (see the usage sketch below).
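+
+A quick, hypothetical way to verify the requirement from a debug prompt
+(field attributes depend on how your schema.xml defines them, so the
+lookup below is only a sketch)::
+
+    # hypothetical sanity check: is allowedRolesAndUsers stored in solr?
+    from zope.component import getUtility
+    from collective.solr.interfaces import ISolrConnectionManager
+    schema = getUtility(ISolrConnectionManager).getSchema()
+    print getattr(schema['allowedRolesAndUsers'], 'stored', 'not set')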
+
+
+
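+Usage
+=====
+
+The maintenance view defined in this package exposes three methods:
+check, fix and reindex_allowedRolesAndUsers.  The ZCML registration is
+not part of this commit, so the import path and the way the view is
+instantiated in the sketch below are assumptions, not the definitive
+way to invoke it::
+
+    # minimal sketch, e.g. from a debug prompt or a bin/instance run
+    # script; the module path is an assumption (not shown in this commit)
+    from collective.solr_checks.browser import MaintenanceView
+
+    view = MaintenanceView(portal, portal.REQUEST)
+    view.check()   # only reports the needed index/reindex/unindex operations
+    # view.fix()   # WARNING: this one actually modifies solr
+
+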
+This software is under the GPL2 licence.
+
+Read it at http://www.gnu.org/licenses/gpl-2.0.html
+
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)
@@ -0,0 +1,3 @@
+
+def initialize(context):
+ """Initializer called when used as a Zope 2 product."""
@@ -0,0 +1,324 @@
+import sys
+import transaction
+from logging import getLogger
+
+from zope.component import queryUtility, getUtility
+from zope.component.interfaces import ComponentLookupError
+
+from Acquisition import aq_base
+
+from Products.Five.browser import BrowserView
+from Products.CMFCore.utils import getToolByName
+from Products.CMFPlone.CatalogTool import allowedRolesAndUsers as aRaU
+
+from collective.solr.interfaces import ISolrConnectionManager, ISolrConnectionConfig, ISearch
+from collective.solr.indexer import SolrIndexProcessor, indexable, handlers
+from collective.solr.manager import SolrConnectionManager
+from collective.solr.browser.maintenance import SolrMaintenanceView, notimeout
+from collective.solr.local import getLocal, setLocal
+from collective.solr.solr import SolrConnection
+from collective.solr.utils import prepareData
+
+from socket import error  # used when fetching the solr schema
+from httplib import CannotSendRequest, ResponseNotReady
+
+logger = getLogger('collective.solr_checks')
+
+class MaintenanceView(SolrMaintenanceView):
+    def _walk(self,root,portal_types):
+        #recursively collect every object of the given portal types below root
+        res=[]
+        if root.portal_type in portal_types and hasattr(root,'UID'):
+            res=[root]
+        try:
+            children=root.objectValues(portal_types)
+        except Exception:
+            children=[]
+        for obj in children:
+            res+=self._walk(obj,portal_types)
+        return res
+
+    def _get_property(self,prop_name,default=None):
+        pp=getToolByName(self.context,'portal_properties')
+        properties=aq_base(pp)
+        csc_props=getattr(properties,'collective.solr_checkes',{})
+        return csc_props.get(prop_name,default)
+
+    def _getConnection(self):
+        """ returns an existing connection or opens one """
+        config = getUtility(ISolrConnectionConfig,context=self.context)
+        if not config.active:
+            return None
+        conn = getLocal('connection')
+        if conn is None and config.host is not None:
+            host = '%s:%d' % (config.host, config.port)
+            logger.debug('opening connection to %s', host)
+            conn = SolrConnection(host=host, solrBase=config.base,
+                                  persistent=True)
+            setLocal('connection',conn)
+        return conn
+
+    def _get_man_proc_conn(self):
+        #return the solr connection manager, an index processor and a connection
+        manager = queryUtility(ISolrConnectionManager,context=self.context) or SolrConnectionManager()
+        proc = SolrIndexProcessor(manager)
+        try:
+            conn=manager.getConnection()
+        except ComponentLookupError:
+            conn=self._getConnection()
+        return (manager,proc,conn)
+
+    def _get_all_objects(self,portal_types):
+        #map every object of the subtree both by UID and by physical path
+        sub_tree=self._walk(self.context,portal_types)
+        uid_to_obj={}
+        path_to_obj={}
+        for obj in sub_tree:
+            uid=obj.UID()
+            path='/'.join(obj.getPhysicalPath())
+            uid_to_obj[uid]=path_to_obj[path]=obj
+        return (uid_to_obj,path_to_obj)
+
+    def _getSchema(self):
+        """ returns the currently used schema or fetches it """
+        manager, proc, conn = self._get_man_proc_conn()
+        schema = getLocal('schema')
+        if schema is None:
+            if conn is not None:
+                logger.debug('getting schema from solr')
+                try:
+                    schema = conn.getSchema()
+                    setLocal('schema', schema)
+                except (error, CannotSendRequest, ResponseNotReady):
+                    logger.exception('exception while getting schema')
+        return schema
+
+    def _reindex(self,obj):
+        #build the full solr document for obj and add it to the connection,
+        #mirroring what collective.solr's index processor does
+        manager, proc, conn = self._get_man_proc_conn()
+        pwo=proc.wrapObject(obj)
+        schema=self._getSchema()
+        data={}
+        for name,field in schema.items():
+            try:
+                value = getattr(pwo, name)
+                if callable(value):
+                    value = value()
+            except AttributeError:
+                continue
+            handler = handlers.get(field.class_, None)
+            if handler is not None:
+                try:
+                    value = handler(value)
+                except AttributeError:
+                    continue
+            elif isinstance(value, (list, tuple)) and not field.multiValued:
+                separator = getattr(field, 'separator', ' ')
+                value = separator.join(value)
+            data[name] = value
+
+        prepareData(data)
+        conn.add(**data)
+
+    def _diff(self,log,needed_attributes,valid_flare,valid_object,delete_flare=None,reindex_obj=None,commit=None):
+        """
+        This is the core: every content object is stored in two ways, by uid
+        and by path, so every flare has to match two objects (the one found by
+        uid and the one found by path).
+        It is a little bit complex, but a flare may store the uid of one
+        content object while carrying the path of another: this can happen if
+        a user moves a content object from one folder to another while solr is
+        down.
+        Security is a problem too, because reindexObjectSecurity is not
+        patched for solr.
+        """
+        pt=getToolByName(self.context,'portal_types')
+
+        log("\nStarted walker...")
+        uid_to_obj,path_to_obj=self._get_all_objects(pt.listContentTypes())
+        log("..finished\n")
+        index=[]
+        reindex=[]
+        unindex=[]
+        viewed=set()
+        path = '/'.join(self.context.getPhysicalPath())
+
+        key = queryUtility(ISolrConnectionManager).getSchema().uniqueKey
+        search = queryUtility(ISearch)
+        rows = len(uid_to_obj) * 10 # sys.maxint makes solr choke :(
+        query = '+%s:[* TO *] +parentPaths:%s' % (key, path)
+        log("Querying solr\n")
+        for flare in search(query, rows=rows, fl=' '.join([key]+needed_attributes)):
+            uid = getattr(flare, key)
+
+            if valid_flare(flare):
+                obj_by_uid=uid_to_obj.get(uid,None)
+                obj_by_path=path_to_obj.get(getattr(flare,'physicalPath'),None)
+                flare_to_delete=False
+                obj_to_reindex=[]
+
+                if obj_by_uid is None and obj_by_path is None:
+                    """
+                    the flare isn't related to any object, so it will be deleted
+                    """
+                    flare_to_delete=True
+
+                elif obj_by_uid is None or obj_by_path is None:
+                    """
+                    the flare is related to only one object instead of two (one
+                    found by uid and one by path), so the flare will be removed
+                    and the object will be reindexed
+                    """
+                    flare_to_delete=True
+                    #take the object that is not None
+                    temp=obj_by_uid or obj_by_path
+                    viewed.add(temp)
+                    obj_to_reindex=[temp]
+
+                elif obj_by_uid.UID() == obj_by_path.UID() and \
+                     '/'.join(obj_by_uid.getPhysicalPath()) == '/'.join(obj_by_path.getPhysicalPath()):
+                    """
+                    the flare matches two objects (eureka) and they are the same
+                    one (double eureka!!), so the modification date and the
+                    security are checked
+                    """
+                    flare_allowed=set(getattr(flare,'allowedRolesAndUsers',[]))
+                    obj_by_uid_allowed=set(aRaU(obj_by_uid)())
+                    obj_by_path_allowed=set(aRaU(obj_by_path)())
+
+                    diff_by_uid=len(obj_by_uid_allowed.symmetric_difference(flare_allowed))
+                    diff_by_path=len(obj_by_path_allowed.symmetric_difference(flare_allowed))
+
+                    viewed.add(obj_by_uid)
+
+                    #if the object was modified after indexing or the security
+                    #differs, reindex the content
+                    if flare.modified.millis() < obj_by_uid.modified().millis() or \
+                       diff_by_uid>0 or diff_by_path>0:
+                        obj_to_reindex=[obj_by_uid]
+                else:
+                    """
+                    the flare is related to two contents but they are different,
+                    so the flare will be deleted and both contents will be
+                    reindexed
+                    """
+                    flare_to_delete=True
+                    obj_to_reindex=[obj_by_uid,obj_by_path]
+                    viewed.add(obj_by_uid)
+                    viewed.add(obj_by_path)
+
+                #if the flare has to be deleted
+                if flare_to_delete:
+                    if delete_flare:
+                        delete_flare(uid)
+                    unindex.append(uid)
+                    log("REMOVE FLARE: %s [%s] %s\n"%(uid,flare.portal_type,flare.physicalPath))
+
+                #for all the objects that need to be reindexed...
+                for obj in obj_to_reindex:
+                    if valid_object('/'.join(obj.getPhysicalPath())):
+                        if reindex_obj:
+                            reindex_obj(obj)
+                        reindex.append(obj)
+                        log("REINDEX: %s\n" % '/'.join(obj.getPhysicalPath()))
+
+        #after the reindex it is time to find objects that are not indexed at all
+        for obj in viewed:
+            o_uid=obj.UID()
+            o_path='/'.join(obj.getPhysicalPath())
+            try:
+                del uid_to_obj[o_uid]
+            except KeyError:
+                pass
+            try:
+                del path_to_obj[o_path]
+            except KeyError:
+                pass
+
+        #all the remaining elements are not related to any flare and must be indexed;
+        #a set is used so that an object present in both mappings is handled only once
+        for obj in set(uid_to_obj.values()+path_to_obj.values()):
+            obj_path='/'.join(obj.getPhysicalPath())
+            if valid_object(obj_path):
+                if reindex_obj:
+                    reindex_obj(obj)
+                index.append(obj_path)
+                log("INDEX: %s\n" % obj_path)
+        if commit:
+            commit()
+        return index, reindex, unindex
+
+    def check(self, batch=100, cache=10000):
+        """
+        This is a call to _diff WITHOUT the modification functions, so it does
+        not modify the solr archive.
+        """
+        log = self.mklog()
+        log('Check solr difference...')
+
+        needed_attributes=self._get_property('needed_attributes',['allowedRolesAndUsers', 'modified', 'physicalPath', 'portal_type'])
+        valid_flare_exp=self._get_property('valid_flare_exp','True')
+        valid_flare_fun=lambda flare: eval(valid_flare_exp)
+        valid_object_exp=self._get_property('valid_object_exp','True')
+        valid_object_fun=lambda path: eval(valid_object_exp)
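+        #the expressions are evaluated via eval() with flare (resp. path) in
+        #scope; a hypothetical example: valid_object_exp = "'/archive/' not in path"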
+
+        index, reindex, unindex = self._diff(log, needed_attributes, valid_flare_fun, valid_object_fun)
+
+        msg='check of solr terminated: \n'
+        msg+='operations needed: %d "index", %d "reindex", %d "unindex"\n'
+        msg=msg % (len(index), len(reindex), len(unindex))
+
+        log(msg)
+        logger.info(msg)
+
+    def fix(self, batch=100, cache=10000):
+        """
+        This is a call to _diff WITH the modification functions, so it WILL
+        modify solr!!!
+        """
+        log = self.mklog()
+
+        log('Fix solr difference...')
+
+        man,proc,conn=self._get_man_proc_conn()
+        delete_flare = notimeout(lambda uid: conn.delete(id=uid))
+        reindex_obj = notimeout(lambda obj: proc.reindex(obj))
+        commit = notimeout(lambda: proc.commit(wait=True))
+
+        needed_attributes=self._get_property('needed_attributes',['allowedRolesAndUsers', 'modified', 'physicalPath', 'portal_type'])
+        valid_flare_exp=self._get_property('valid_flare_exp','True')
+        valid_flare_fun=lambda flare: eval(valid_flare_exp)
+        valid_object_exp=self._get_property('valid_object_exp','True')
+        valid_object_fun=lambda path: eval(valid_object_exp)
+
+        index, reindex, unindex = self._diff(log, needed_attributes, valid_flare_fun, valid_object_fun,delete_flare,reindex_obj,commit)
+
+        msg='fix of solr terminated: \n'
+        msg+='operations made: %d "index", %d "reindex", %d "unindex"\n'
+        msg=msg % (len(index), len(reindex), len(unindex))
+
+        log(msg)
+        logger.info(msg)
+
+    def reindex_allowedRolesAndUsers(self):
+        """
+        This function only reindexes allowedRolesAndUsers, without any check.
+        It is useful when you install this utility with an already populated
+        solr, where reindexing all fields of all objects may be expensive.
+        """
+        log = self.mklog()
+        log("\nStarted reindex allowedRolesAndUsers...")
+
+        man,proc,conn=self._get_man_proc_conn()
+        path = '/'.join(self.context.getPhysicalPath())
+
+        #get all contents from the catalog; we want the reindex to be as fast
+        #as possible, so we can ignore differences between the Data.fs and the catalog
+        pc=getToolByName(self.context,'portal_catalog')
+
+        #try to avoid None elements
+        size=len(pc)
+        contents=pc(path=path,b_size=size)
+        log("\nquery")
+
+        step=self._get_property('step',1000)
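+        #process the brains in batches of `step` objects, committing both the
+        #ZODB transaction and the solr connection after every batch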
+        for start in range(0,size,step):
+            for i in contents[start:start+step]:
+                if i is not None:
+                    o=i.getObject()
+                    if o is not None:
+                        self._reindex(o)
+                        #proc.reindex(o,['allowedRolesAndUsers'])
+                        log(o.UID()+'\n')
+                    else:
+                        log('is None\n')
+                        logger.info('is None\n')
+            transaction.commit()
+            conn.commit(True, True)
+            msg="\ncommit [%s:%s]\n\n\n"%(start,start+step)
+            log(msg)
+            logger.info(msg)