diff --git a/Dockerfile b/Dockerfile
index 2e17f152..ce5c1c96 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -105,6 +105,14 @@ RUN chmod -R 777 /var/lib/logrotate
 RUN chmod -R 777 /var/cric
 RUN chmod -R 777 /var/cache/pandaserver
 
+# to have trf files under /var/trf/user
+RUN mkdir -p /var/trf/user
+
+RUN mkdir /tmp/panda-wnscript && cd /tmp/panda-wnscript && \
+    git clone https://github.com/PanDAWMS/panda-wnscript.git && \
+    cp -R panda-wnscript/dist/* /var/trf/user/ && \
+    cd / && rm -rf /tmp/panda-wnscript
+
 ENV PANDA_LOCK_DIR /var/run/panda
 RUN mkdir -p ${PANDA_LOCK_DIR} && chmod 777 ${PANDA_LOCK_DIR}
diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index c5fcfafa..629d0319 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.3.8"
+release_version = "0.3.10"
diff --git a/pandaserver/daemons/scripts/pilotStreaming.py b/pandaserver/daemons/scripts/pilotStreaming.py
index 6155dc6b..431c8ee9 100644
--- a/pandaserver/daemons/scripts/pilotStreaming.py
+++ b/pandaserver/daemons/scripts/pilotStreaming.py
@@ -4,10 +4,10 @@
 
 from pandacommon.pandalogger.PandaLogger import PandaLogger
 from pandacommon.pandautils.thread_utils import GenericThread
+
 from pandaserver.config import panda_config
 
-# logger
-_logger = PandaLogger().getLogger("PilotStreaming")
+_logger = PandaLogger().getLogger("pilot_streaming")
 
 
 class PilotStreaming(object):
@@ -31,20 +31,21 @@ def run(self):
         ups_queues = self.tbuf.ups_get_queues()
         self._logger.debug(f"UPS queues: {ups_queues}")
 
-        # get worker stats
+        # load the worker stats from the database
         worker_stats = self.tbuf.ups_load_worker_stats()
 
+        # iterate over the UPS queues
         for ups_queue in ups_queues:
-            # get the worker and job stats for the queue
+            # separate the worker and job stats for the queue
             try:
                 tmp_worker_stats = worker_stats[ups_queue]
                 self._logger.debug(f"worker_stats for queue {ups_queue}: {tmp_worker_stats}")
-                # tmp_job_stats = job_stats[ups_queue]
             except KeyError:
                 # skip queue if no data available
                 self._logger.debug(f"No worker stats for queue {ups_queue}")
                 continue
 
+            # calculate the new worker distribution and save the harvester commands in the database
             try:
                 new_workers_per_harvester = self.tbuf.ups_new_worker_distribution(ups_queue, tmp_worker_stats)
                 self._logger.info(f"queue: {ups_queue}, results: {new_workers_per_harvester}")
@@ -70,7 +71,7 @@ def run(self):
         except Exception:
             self._logger.error(traceback.format_exc())
 
-        # timing
+        # log the timing
         time_stop = time.time()
         self._logger.debug(f"Done. Pilot streaming took: {time_stop - time_start} s")
 
@@ -94,9 +95,11 @@ def main(tbuf=None, **kwargs):
         )
     else:
         taskBuffer = tbuf
-    # run
+
+    # run the pilot streaming logic
     PilotStreaming(tbuf=taskBuffer).run()
-    # stop taskBuffer if created inside this script
+
+    # stop the taskBuffer if it was created inside this script
     if tbuf is None:
         taskBuffer.cleanup(requester=requester_id)
 
diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index d9f68a2d..c3c5ebd4 100644
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -23430,12 +23430,89 @@ def ups_load_worker_stats(self):
         tmpLog.debug("done")
         return worker_stats_dict
 
+    def get_average_memory_workers(self, queue, harvester_id):
+        """
+        Calculates the average memory of running and queued workers at a particular PanDA queue
+
+        :param queue: name of the PanDA queue
+        :param harvester_id: string with the harvester ID serving the queue
+
+        :return: tuple with the average memory of the
+                 (running + submitted, running) workers
+        """
+
+        comment = " /* DBProxy.get_average_memory_workers */"
+        method_name = comment.split(" ")[-2].split(".")[-1]
+        tmp_logger = LogWrapper(_logger, method_name)
+        tmp_logger.debug("start")
+        try:
+            # sql to calculate the average memory for the queue - harvester_id combination
+            sql_running_and_submitted = (
+                "SELECT sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
+                "FROM ( "
+                "    SELECT hws.computingsite, "
+                "           hws.harvester_id, "
+                "           hws.n_workers, "
+                "           hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount, 1)) * NVL(rt.maxrampercore, sj.data.maxrss / NVL(sj.data.corecount, 1)) as total_memory, "
+                "           NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
+                "    FROM ATLAS_PANDA.harvester_worker_stats hws "
+                "    JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
+                "    JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
+                "    WHERE lastupdate > CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '1' HOUR AS DATE) "
+                "    AND status IN ('running', 'submitted', 'to_submit') "
+                "    AND computingsite=:queue AND harvester_id=:harvester_id"
+                ") GROUP BY computingsite, harvester_id "
+            )
+
+            sql_running = (
+                "SELECT sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
+                "FROM ( "
+                "    SELECT hws.computingsite, "
+                "           hws.harvester_id, "
+                "           hws.n_workers, "
+                "           hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount, 1)) * NVL(rt.maxrampercore, sj.data.maxrss / NVL(sj.data.corecount, 1)) as total_memory, "
+                "           NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
+                "    FROM ATLAS_PANDA.harvester_worker_stats hws "
+                "    JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
+                "    JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
+                "    WHERE lastupdate > CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '1' HOUR AS DATE) "
+                "    AND status = 'running' "
+                "    AND computingsite=:queue AND harvester_id=:harvester_id"
+                ") GROUP BY computingsite, harvester_id "
+            )
+
+            var_map = {":queue": queue, ":harvester_id": harvester_id}
+
+            self.cur.execute(sql_running_and_submitted + comment, var_map)
+            results = self.cur.fetchone()
+            try:
+                average_memory_running_submitted = results[0]
+            except TypeError:
+                average_memory_running_submitted = 0
+
+            self.cur.execute(sql_running + comment, var_map)
+            results = self.cur.fetchone()
+            try:
+                average_memory_running = results[0]
+            except TypeError:
+                average_memory_running = 0
+
+            tmp_logger.debug(
+                f"Queue {queue} and harvester_id {harvester_id} currently have an average worker memory of "
+                f"({average_memory_running_submitted}, {average_memory_running}) MB for (running + submitted, running) workers"
+            )
+            return average_memory_running_submitted, average_memory_running
+
+        except Exception:
+            self.dumpErrorMessage(tmp_logger, method_name)
+            return 0, 0
+
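
Note: both queries compute a core-weighted average of the memory requested by recent workers. Below is a minimal sketch of the same aggregation in Python; the schema, NVL fallbacks and statuses come from the queries above, while the figures are made up:

```python
# Each entry mirrors one harvester_worker_stats row joined to its resource type:
# (n_workers, cores_per_worker, ram_per_core_in_MB)
workers = [
    (10, 1, 2000),  # e.g. single-core workers
    (5, 8, 4000),   # e.g. eight-core, high-memory workers
]

# sum(total_memory), where total_memory = n_workers * corecount * ram_per_core
total_memory = sum(n * cores * ram for n, cores, ram in workers)  # 180000 MB

# NULLIF(sum(n_workers * corecount), 0) guards this division in the SQL
total_cores = sum(n * cores for n, cores, _ in workers)  # 50 cores

average_memory = total_memory / total_cores  # 3600 MB per core
```
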
     def ups_new_worker_distribution(self, queue, worker_stats):
         """
         Assuming we want to have n_cores_queued >= n_cores_running * .5, calculate how many pilots need to be submitted and choose the number
 
-        :param queue: name of the queue
+        :param queue: name of the PanDA queue
         :param worker_stats: queue worker stats
 
         :return:
         """
@@ -23456,6 +23533,8 @@ def ups_new_worker_distribution(self, queue, worker_stats):
         pq_data_des = self.get_config_for_pq(queue)
         resource_type_limits = {}
         queue_type = "production"
+        average_memory_target = None
+
         if not pq_data_des:
             tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied")
         else:
@@ -23464,6 +23543,11 @@ def ups_new_worker_distribution(self, queue, worker_stats):
             except KeyError:
                 tmp_log.debug("No resource type limits")
                 pass
+            try:
+                average_memory_target = pq_data_des["params"]["average_memory"]
+            except KeyError:
+                tmp_log.debug("No average memory defined")
+                pass
             try:
                 queue_type = pq_data_des["type"]
             except KeyError:
@@ -23484,59 +23568,63 @@ def ups_new_worker_distribution(self, queue, worker_stats):
         except KeyError:
             assigned_harvester_id = None
 
-        harvester_ids = []
-        # If the assigned instance is working, use it for the statistics
-        if assigned_harvester_id in harvester_ids_temp:
-            harvester_ids = [assigned_harvester_id]
-
-        # Filter central harvester instances that support UPS model
+        # If there is no harvester instance assigned to the queue or there are no statistics, we exit without any action
+        if assigned_harvester_id and assigned_harvester_id in harvester_ids_temp:
+            harvester_id = assigned_harvester_id
         else:
-            for harvester_id in harvester_ids_temp:
-                if "ACT" not in harvester_id and "test_fbarreir" not in harvester_id and "cern_cloud" not in harvester_id:
-                    harvester_ids.append(harvester_id)
+            tmp_log.error("No harvester instance assigned or not in statistics")
+            return {}
 
-        for harvester_id in harvester_ids:
-            for job_type in worker_stats[harvester_id]:
-                workers_queued.setdefault(job_type, {})
-                for resource_type in worker_stats[harvester_id][job_type]:
-                    core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
-                    try:
-                        n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
+        # If the site defined a memory target, calculate the memory requested by running and queued workers
+        resource_types_under_target = []
+        if average_memory_target:
+            average_memory_workers_running_submitted, average_memory_workers_running = self.get_average_memory_workers(queue, harvester_id)
+            # if the queue is over memory, we will only submit lower memory workers in the next cycle
+            if average_memory_target < min(average_memory_workers_running_submitted, average_memory_workers_running):
+                resource_types_under_target = self.__resource_spec_mapper.filter_out_high_memory_resourcetypes()
+                tmp_log.debug(f"Accepting {resource_types_under_target} resource types to respect the average memory target")
+            else:
+                tmp_log.debug("Accepting all resource types, since the queue is under the average memory target")
 
-                        # This limit is in #JOBS or #WORKERS
-                        if resource_type in resource_type_limits:
-                            resource_type_limits[resource_type] = (
-                                resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
-                            )
-                            tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")
+        for job_type in worker_stats[harvester_id]:
+            workers_queued.setdefault(job_type, {})
+            for resource_type in worker_stats[harvester_id][job_type]:
+                core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
+                try:
+                    n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
 
-                        # This limit is in #CORES, since it mixes single and multi core jobs
-                        if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
-                            resource_type_limits[HIMEM] = (
-                                resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
-                            )
-                            tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")
+                    # This limit is in #JOBS or #WORKERS, not in #CORES
+                    if resource_type in resource_type_limits:
+                        resource_type_limits[resource_type] = (
+                            resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
+                        )
+                        tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")
 
-                    except KeyError:
-                        pass
+                    # This limit is in #CORES, since it mixes single and multi core jobs
+                    if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
+                        resource_type_limits[HIMEM] = resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
+                        tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")
 
-                    try:  # submitted
-                        workers_queued[job_type].setdefault(resource_type, 0)
-                        workers_queued[job_type][resource_type] = (
-                            workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"]
-                        )
-                        n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor
-                    except KeyError:
-                        pass
+                except KeyError:
+                    pass
 
-                    try:  # ready
-                        workers_queued[job_type].setdefault(resource_type, 0)
-                        workers_queued[job_type][resource_type] = (
-                            workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"]
-                        )
-                        n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor
-                    except KeyError:
-                        pass
+                try:  # submitted
+                    workers_queued[job_type].setdefault(resource_type, 0)
+                    workers_queued[job_type][resource_type] = (
+                        workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"]
+                    )
+                    n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor
+                except KeyError:
+                    pass
+
+                try:  # ready
+                    workers_queued[job_type].setdefault(resource_type, 0)
+                    workers_queued[job_type][resource_type] = (
+                        workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"]
+                    )
+                    n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor
+                except KeyError:
+                    pass
 
         tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}")
 
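
Note: the gate above only throttles submission when both averages exceed the target, so a backlog of already-submitted low-memory workers is enough to re-open all resource types on the next cycle. A sketch with hypothetical values:

```python
# target comes from the queue configuration (params.average_memory),
# the two averages from get_average_memory_workers()
average_memory_target = 2000  # MB per core
avg_running_submitted, avg_running = 2600, 2400

if average_memory_target < min(avg_running_submitted, avg_running):
    # over target on both metrics: only low-memory resource types next cycle
    resource_types_under_target = ["SCORE", "MCORE"]  # hypothetical names
else:
    # at or under target on at least one metric: no restriction
    resource_types_under_target = []
```
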
@@ -23561,19 +23649,27 @@ def ups_new_worker_distribution(self, queue, worker_stats):
         # Get the sorted global shares
         sorted_shares = self.get_sorted_leaves()
 
-        # Run over the activated jobs by gshare & priority, and substract them from the queued
+        # Run over the activated jobs by gshare & priority, and subtract them from the queued
         # A negative value for queued will mean more pilots of that resource type are missing
         for share in sorted_shares:
             var_map = {":queue": queue, ":gshare": share.name}
-            sql = f"""
-                  SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4
-                  WHERE jobstatus = 'activated'
-                  AND computingsite=:queue
-                  AND gshare=:gshare
-                  ORDER BY currentpriority DESC
-                  """
+            sql = (
+                f"SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 "
+                "WHERE jobstatus = 'activated' "
+                "AND computingsite=:queue "
+                "AND gshare=:gshare "
+            )
+
+            # filter on resource types if the queue is over its average memory target
+            if resource_types_under_target:
+                resource_type_string = ", ".join([f":{item}" for item in resource_types_under_target])
+                sql += f" AND resource_type IN ({resource_type_string}) "
+                var_map.update({f":{item}": item for item in resource_types_under_target})
+
+            sql += "ORDER BY currentpriority DESC"
             self.cur.execute(sql + comment, var_map)
             activated_jobs = self.cur.fetchall()
+            tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs")
             for gshare, prodsourcelabel, resource_type in activated_jobs:
                 core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
@@ -23620,6 +23716,8 @@ def ups_new_worker_distribution(self, queue, worker_stats):
                     # we don't have enough workers for this resource type
                     new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1
 
+        tmp_log.debug(f"preliminary new workers: {new_workers}")
+
         # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation
         workers = False
         for job_type in new_workers:
@@ -23630,16 +23728,9 @@ def ups_new_worker_distribution(self, queue, worker_stats):
         if not workers:
             new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1}
 
-        # In case multiple harvester instances are serving a panda queue, split workers evenly between them
-        new_workers_per_harvester = {}
-        for harvester_id in harvester_ids:
-            new_workers_per_harvester.setdefault(harvester_id, {})
-            for job_type in new_workers:
-                new_workers_per_harvester[harvester_id].setdefault(job_type, {})
-                for resource_type in new_workers[job_type]:
-                    new_workers_per_harvester[harvester_id][job_type][resource_type] = int(
-                        math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids))
-                    )
+        tmp_log.debug(f"new workers: {new_workers}")
+
+        new_workers_per_harvester = {harvester_id: new_workers}
 
         tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}")
         tmp_log.debug("done")
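
Note: the dynamic IN clause keeps the resource type names as bind variables rather than string literals. How the expansion above plays out for a hypothetical filter list:

```python
resource_types_under_target = ["SCORE", "MCORE"]  # hypothetical

resource_type_string = ", ".join([f":{item}" for item in resource_types_under_target])
# resource_type_string == ":SCORE, :MCORE"
# the SQL gains:  AND resource_type IN (:SCORE, :MCORE)

var_map = {f":{item}": item for item in resource_types_under_target}
# var_map == {":SCORE": "SCORE", ":MCORE": "MCORE"}
```
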
diff --git a/pandaserver/taskbuffer/ResourceSpec.py b/pandaserver/taskbuffer/ResourceSpec.py
index 9c19d569..60a98d9d 100644
--- a/pandaserver/taskbuffer/ResourceSpec.py
+++ b/pandaserver/taskbuffer/ResourceSpec.py
@@ -42,6 +42,15 @@ def translate_resourcetype_to_cores(self, resource_name, cores_queue):
 
         return 1
 
+    def filter_out_high_memory_resourcetypes(self):
+        resource_names = list(
+            map(
+                lambda resource_type: resource_type.resource_name,
+                filter(lambda resource_type: not self.is_high_memory(resource_type.resource_name), self.resource_types),
+            )
+        )
+        return resource_names
+
 
 class ResourceSpec(object):
     # attributes
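
Note: despite its name, the helper returns the resource types that survive the filter, i.e. the low-memory ones. Assuming a hypothetical mapper instance whose resource types are SCORE, SCORE_HIMEM, MCORE and MCORE_HIMEM, with only the *_HIMEM names satisfying is_high_memory():

```python
mapper.filter_out_high_memory_resourcetypes()
# -> ["SCORE", "MCORE"]
```
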
diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py
index dbef2088..33ee5ff5 100755
--- a/pandaserver/taskbuffer/TaskBuffer.py
+++ b/pandaserver/taskbuffer/TaskBuffer.py
@@ -3808,6 +3808,17 @@ def ups_load_worker_stats(self):
         # return
         return ret
 
+    # get the average memory of the workers in a queue
+    def get_average_memory_workers(self, queue, harvester_id):
+        # get DBproxy
+        proxy = self.proxyPool.getProxy()
+        # exec
+        ret = proxy.get_average_memory_workers(queue, harvester_id)
+        # release proxy
+        self.proxyPool.putProxy(proxy)
+        # return
+        return ret
+
     # get the distribution of new workers to submit
     def ups_new_worker_distribution(self, queue, worker_stats):
         # get DBproxy
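
Note: a hypothetical call site for the new TaskBuffer passthrough (the queue and harvester ID values are made up):

```python
avg_running_submitted, avg_running = taskBuffer.get_average_memory_workers("SOME_PANDA_QUEUE", "some_harvester_instance")
```
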