Skip to content

Commit

Permalink
Fix cleaning > 10000 channels
Browse files Browse the repository at this point in the history
Elastic/ChannelFinder has a setting ES_QUERY_SIZE. So by default queries only return up to ES_QUERY_SIZE channels, which by default is 10000.

When the clean runs it fetches using this api all channels where the pvStatus is Active. But if there are more than 10000 this won't work due to the ES_QUERY_SIZE.

Solution here is to keep setting channels to Inactive and querying for more channels until there are no channels set to Active.
  • Loading branch information
jacomago committed May 7, 2024
1 parent 8e5d01d commit 52f0e6b
Showing 1 changed file with 23 additions and 14 deletions.
37 changes: 23 additions & 14 deletions server/recceiver/cfstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,14 @@ def clean_service(self):
while 1:
try:
_log.info("CF Clean Started")
channels = self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)]))
channels = self.get_active_channels(recceiverid)
if channels is not None:
new_channels = []
for ch in channels or []:
new_channels.append(ch[u'name'])
_log.info("Total channels to update: {nChannels}", nChannels=len(new_channels))
while len(new_channels) > 0:
_log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000))
self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"},
channelNames=new_channels[:10000])
new_channels = new_channels[10000:]
while channels is not None and len(channels) > 0:
self.clean_channels(owner, channels)
channels = self.get_active_channels(recceiverid)
_log.info("CF Clean Completed")
return
else:
_log.info("CF Clean Completed")
return
except RequestException as e:
Expand All @@ -305,6 +302,18 @@ def clean_service(self):
_log.info("Abandoning clean after {retry_limit} seconds", retry_limit=retry_limit)
return

def get_active_channels(self, recceiverid):
return self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)], 10000))

def clean_channels(self, owner, channels):
new_channels = []
for ch in channels or []:
new_channels.append(ch[u'name'])
_log.info("Total channels to update: {nChannels}", nChannels=len(new_channels))
_log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000))
self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"},
channelNames=new_channels)


def dict_to_file(dict, iocs, conf):
filename = conf.get('debug_file_loc', None)
Expand Down Expand Up @@ -544,10 +553,10 @@ def getCurrentTime():
return str(datetime.datetime.now())


def prepareFindArgs(conf, args):
size = conf.get('findSizeLimit', 0)
if size > 0:
args.append(('~size', size))
def prepareFindArgs(conf, args, size=0):
size_limit = conf.get('findSizeLimit', size)
if size_limit > 0:
args.append(('~size', size_limit))
return args


Expand Down

0 comments on commit 52f0e6b

Please sign in to comment.