Commit
Merge branch 'PanDAWMS:master' into master
tmaeno committed Jun 28, 2024
2 parents 29ae43b + 5e9148d commit f729cbd
Showing 6 changed files with 196 additions and 74 deletions.
8 changes: 8 additions & 0 deletions Dockerfile
@@ -105,6 +105,14 @@ RUN chmod -R 777 /var/lib/logrotate
RUN chmod -R 777 /var/cric
RUN chmod -R 777 /var/cache/pandaserver

# to have trf files under /var/trf/user
RUN mkdir -p /var/trf/user

RUN mkdir /tmp/panda-wnscript && cd /tmp/panda-wnscript && \
git clone https://github.com/PanDAWMS/panda-wnscript.git && \
cp -R panda-wnscript/dist/* /var/trf/user/ && \
cd / && rm -rf /tmp/panda-wnscript

ENV PANDA_LOCK_DIR /var/run/panda
RUN mkdir -p ${PANDA_LOCK_DIR} && chmod 777 ${PANDA_LOCK_DIR}

2 changes: 1 addition & 1 deletion PandaPkgInfo.py
@@ -1 +1 @@
release_version = "0.3.8"
release_version = "0.3.10"
19 changes: 11 additions & 8 deletions pandaserver/daemons/scripts/pilotStreaming.py
@@ -4,10 +4,10 @@

from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandacommon.pandautils.thread_utils import GenericThread

from pandaserver.config import panda_config

# logger
_logger = PandaLogger().getLogger("PilotStreaming")
_logger = PandaLogger().getLogger("pilot_streaming")


class PilotStreaming(object):
@@ -31,20 +31,21 @@ def run(self):
ups_queues = self.tbuf.ups_get_queues()
self._logger.debug(f"UPS queues: {ups_queues}")

# get worker stats
# load the worker stats from the database
worker_stats = self.tbuf.ups_load_worker_stats()

# iterate over the UPS queues
for ups_queue in ups_queues:
# get the worker and job stats for the queue
# separate the worker and job stats for the queue
try:
tmp_worker_stats = worker_stats[ups_queue]
self._logger.debug(f"worker_stats for queue {ups_queue}: {tmp_worker_stats}")
# tmp_job_stats = job_stats[ups_queue]
except KeyError:
# skip queue if no data available
self._logger.debug(f"No worker stats for queue {ups_queue}")
continue

# calculate the new worker distribution and save the harvester commands in the database
try:
new_workers_per_harvester = self.tbuf.ups_new_worker_distribution(ups_queue, tmp_worker_stats)
self._logger.info(f"queue: {ups_queue}, results: {new_workers_per_harvester}")
@@ -70,7 +71,7 @@ def run(self):
except Exception:
self._logger.error(traceback.format_exc())

# timing
# log the timing
time_stop = time.time()
self._logger.debug(f"Done. Pilot streaming took: {time_stop - time_start} s")

@@ -94,9 +95,11 @@ def main(tbuf=None, **kwargs):
)
else:
taskBuffer = tbuf
# run

# run the pilot streaming logic
PilotStreaming(tbuf=taskBuffer).run()
# stop taskBuffer if created inside this script

# stop the taskBuffer if it was created inside this script
if tbuf is None:
taskBuffer.cleanup(requester=requester_id)

221 changes: 156 additions & 65 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -23430,12 +23430,89 @@ def ups_load_worker_stats(self):
tmpLog.debug("done")
return worker_stats_dict

def get_average_memory_workers(self, queue, harvester_id):
"""
Calculates the average memory for running and queued workers at a particular panda queue

:param queue: name of the PanDA queue
:param worker_stats_harvester: worker statistics for the particular harvester instance
:param harvester_id: string with the harvester ID serving the queue

:return: average memory
"""

comment = " /* DBProxy.get_average_memory_workers */"
method_name = comment.split(" ")[-2].split(".")[-1]
tmp_logger = LogWrapper(_logger, method_name)
tmp_logger.debug("start")
try:
# sql to calculate the average memory for the queue - harvester_id combination
sql_running_and_submitted = (
"SELECT sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
"FROM ( "
" SELECT hws.computingsite, "
" hws.harvester_id, "
" hws.n_workers, "
" hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount, 1)) * NVL(rt.maxrampercore, sj.data.maxrss / NVL(sj.data.corecount, 1)) as total_memory, "
" NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
" FROM ATLAS_PANDA.harvester_worker_stats hws "
" JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
" JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
" WHERE lastupdate > CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '1' HOUR AS DATE) "
" AND status IN ('running', 'submitted', 'to_submit') "
" AND computingsite=:queue AND harvester_id=:harvester_id"
")GROUP BY computingsite, harvester_id "
)

sql_running = (
"SELECT sum(total_memory) / NULLIF(sum(n_workers * corecount), 0) "
"FROM ( "
" SELECT hws.computingsite, "
" hws.harvester_id, "
" hws.n_workers, "
" hws.n_workers * NVL(rt.maxcore, NVL(sj.data.corecount, 1)) * NVL(rt.maxrampercore, sj.data.maxrss / NVL(sj.data.corecount, 1)) as total_memory, "
" NVL(rt.maxcore, NVL(sj.data.corecount, 1)) as corecount "
" FROM ATLAS_PANDA.harvester_worker_stats hws "
" JOIN ATLAS_PANDA.resource_types rt ON hws.resourcetype = rt.resource_name "
" JOIN ATLAS_PANDA.schedconfig_json sj ON hws.computingsite = sj.panda_queue "
" WHERE lastupdate > CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '1' HOUR AS DATE) "
" AND status = 'running' "
" AND computingsite=:queue AND harvester_id=:harvester_id"
")GROUP BY computingsite, harvester_id "
)

var_map = {":queue": queue, ":harvester_id": harvester_id}

self.cur.execute(sql_running_and_submitted + comment, var_map)
results = self.cur.fetchone()
try:
average_memory_running_submitted = results[0]
except TypeError:
average_memory_running_submitted = 0

self.cur.execute(sql_running + comment, var_map)
results = self.cur.fetchone()
try:
average_memory_running = results[0]
except TypeError:
average_memory_running = 0

tmp_logger.debug(
f"Queue {queue} and harvester_id {harvester_id}: average memory {average_memory_running_submitted} MB "
f"for running+submitted workers, {average_memory_running} MB for running workers"
)
return average_memory_running_submitted, average_memory_running

except Exception:
self.dumpErrorMessage(tmp_logger, method_name)
return 0, 0
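
For illustration, the weighted average computed by the two statements above can be reproduced offline. The numbers below are invented and the snippet is only a sketch of the same arithmetic (sum of n_workers * cores * RAM-per-core, divided by sum of n_workers * cores):

```python
# Hypothetical rows: (n_workers, cores per worker, RAM per core in MB).
# In the SQL, cores come from NVL(rt.maxcore, corecount) and RAM per core from
# NVL(rt.maxrampercore, maxrss / corecount); here they are plain numbers.
rows = [
    (10, 1, 2000),  # single-core workers
    (5, 8, 4000),   # multi-core, higher-memory workers
]

total_memory = sum(n * cores * ram_per_core for n, cores, ram_per_core in rows)
total_cores = sum(n * cores for n, cores, _ in rows)

# Same guard as NULLIF(..., 0): avoid dividing by zero when nothing matches.
average_memory = total_memory / total_cores if total_cores else 0
print(average_memory)  # (10*1*2000 + 5*8*4000) / (10 + 40) = 3600.0
```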

def ups_new_worker_distribution(self, queue, worker_stats):
"""
Assuming we want to keep n_cores_queued >= n_cores_running * 0.5, calculate how many new pilots need to be submitted
for the queue, broken down by job type and resource type

:param queue: name of the queue
:param queue: name of the PanDA queue
:param worker_stats: queue worker stats
:return: dictionary with the number of new workers to submit, keyed by harvester ID, job type and resource type
"""
@@ -23456,6 +23533,8 @@ def ups_new_worker_distribution(self, queue, worker_stats):
pq_data_des = self.get_config_for_pq(queue)
resource_type_limits = {}
queue_type = "production"
average_memory_target = None

if not pq_data_des:
tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied")
else:
@@ -23464,6 +23543,11 @@
except KeyError:
tmp_log.debug("No resource type limits")
pass
try:
average_memory_target = pq_data_des["params"]["average_memory"]
except KeyError:
tmp_log.debug("No average memory defined")
pass
try:
queue_type = pq_data_des["type"]
except KeyError:
@@ -23484,59 +23568,63 @@ def ups_new_worker_distribution(self, queue, worker_stats):
except KeyErrorException:
assigned_harvester_id = None

harvester_ids = []
# If the assigned instance is working, use it for the statistics
if assigned_harvester_id in harvester_ids_temp:
harvester_ids = [assigned_harvester_id]

# Filter central harvester instances that support UPS model
# If there is no harvester instance assigned to the queue or there are no statistics, we exit without any action
if assigned_harvester_id and assigned_harvester_id in harvester_ids_temp:
harvester_id = assigned_harvester_id
else:
for harvester_id in harvester_ids_temp:
if "ACT" not in harvester_id and "test_fbarreir" not in harvester_id and "cern_cloud" not in harvester_id:
harvester_ids.append(harvester_id)
tmp_log.error("No harvester instance assigned or not in statistics")
return {}

for harvester_id in harvester_ids:
for job_type in worker_stats[harvester_id]:
workers_queued.setdefault(job_type, {})
for resource_type in worker_stats[harvester_id][job_type]:
core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
try:
n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
# If the site defined a memory target, calculate the memory requested by running and queued workers
resource_types_under_target = []
if average_memory_target:
average_memory_workers_running_submitted, average_memory_workers_running = self.get_average_memory_workers(queue, harvester_id)
# if the queue is over its memory target, only lower-memory workers will be submitted in the next cycle
if average_memory_target < min(average_memory_workers_running_submitted, average_memory_workers_running):
resource_types_under_target = self.__resource_spec_mapper.filter_out_high_memory_resourcetypes()
tmp_log.debug(f"Accepting {resource_types_under_target} resource types to respect mean memory target")
else:
tmp_log.debug(f"Accepting all resource types as under memory target")

# This limit is in #JOBS or #WORKERS
if resource_type in resource_type_limits:
resource_type_limits[resource_type] = (
resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
)
tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")
for job_type in worker_stats[harvester_id]:
workers_queued.setdefault(job_type, {})
for resource_type in worker_stats[harvester_id][job_type]:
core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
try:
n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor

# This limit is in #CORES, since it mixes single and multi core jobs
if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
resource_type_limits[HIMEM] = (
resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
)
tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")
# This limit is in #JOBS or #WORKERS, not in #CORES
if resource_type in resource_type_limits:
resource_type_limits[resource_type] = (
resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
)
tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")

except KeyError:
pass
# This limit is in #CORES, since it mixes single and multi core jobs
if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
resource_type_limits[HIMEM] = resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")

try: # submitted
workers_queued[job_type].setdefault(resource_type, 0)
workers_queued[job_type][resource_type] = (
workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"]
)
n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor
except KeyError:
pass
except KeyError:
pass

try: # ready
workers_queued[job_type].setdefault(resource_type, 0)
workers_queued[job_type][resource_type] = (
workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"]
)
n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor
except KeyError:
pass
try: # submitted
workers_queued[job_type].setdefault(resource_type, 0)
workers_queued[job_type][resource_type] = (
workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"]
)
n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor
except KeyError:
pass

try: # ready
workers_queued[job_type].setdefault(resource_type, 0)
workers_queued[job_type][resource_type] = (
workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"]
)
n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor
except KeyError:
pass

tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}")

@@ -23561,19 +23649,27 @@ def ups_new_worker_distribution(self, queue, worker_stats):
# Get the sorted global shares
sorted_shares = self.get_sorted_leaves()

# Run over the activated jobs by gshare & priority, and substract them from the queued
# Run over the activated jobs by gshare & priority, and subtract them from the queued
# A negative value for queued will mean more pilots of that resource type are missing
for share in sorted_shares:
var_map = {":queue": queue, ":gshare": share.name}
sql = f"""
SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4
WHERE jobstatus = 'activated'
AND computingsite=:queue
AND gshare=:gshare
ORDER BY currentpriority DESC
"""
sql = (
f"SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 "
"WHERE jobstatus = 'activated' "
"AND computingsite=:queue "
"AND gshare=:gshare "
)

# if we need to filter on resource types
if resource_types_under_target:
resource_type_string = ", ".join([f":{item}" for item in resource_types_under_target])
sql += f" AND resource_type IN ({resource_type_string}) "
var_map.update({f":{item}": item for item in resource_types_under_target})

sql += "ORDER BY currentpriority DESC"
self.cur.execute(sql + comment, var_map)
activated_jobs = self.cur.fetchall()

tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs")
for gshare, prodsourcelabel, resource_type in activated_jobs:
core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
@@ -23620,6 +23716,8 @@ def ups_new_worker_distribution(self, queue, worker_stats):
# we don't have enough workers for this resource type
new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1

tmp_log.debug(f"preliminary new workers: {new_workers}")

# We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation
workers = False
for job_type in new_workers:
Expand All @@ -23630,16 +23728,9 @@ def ups_new_worker_distribution(self, queue, worker_stats):
if not workers:
new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1}

# In case multiple harvester instances are serving a panda queue, split workers evenly between them
new_workers_per_harvester = {}
for harvester_id in harvester_ids:
new_workers_per_harvester.setdefault(harvester_id, {})
for job_type in new_workers:
new_workers_per_harvester[harvester_id].setdefault(job_type, {})
for resource_type in new_workers[job_type]:
new_workers_per_harvester[harvester_id][job_type][resource_type] = int(
math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids))
)
tmp_log.debug(f"new workers: {new_workers}")

new_workers_per_harvester = {harvester_id: new_workers}

tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}")
tmp_log.debug("done")
9 changes: 9 additions & 0 deletions pandaserver/taskbuffer/ResourceSpec.py
@@ -42,6 +42,15 @@ def translate_resourcetype_to_cores(self, resource_name, cores_queue):

return 1

def filter_out_high_memory_resourcetypes(self):
resource_names = list(
map(
lambda resource_type: resource_type.resource_name,
filter(lambda resource_type: not self.is_high_memory(resource_type.resource_name), self.resource_types),
)
)
return resource_names
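
An equivalent, arguably more direct form of the same selection — a sketch rather than the committed code, assuming (as the caller's usage of resource_types_under_target suggests) that the method should return the non-high-memory resource types:

```python
def filter_out_high_memory_resourcetypes(self):
    # keep only the resource types that are not high-memory
    return [rt.resource_name for rt in self.resource_types if not self.is_high_memory(rt.resource_name)]
```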


class ResourceSpec(object):
# attributes
11 changes: 11 additions & 0 deletions pandaserver/taskbuffer/TaskBuffer.py
@@ -3808,6 +3808,17 @@ def ups_load_worker_stats(self):
# return
return ret

# get the average memory of running and queued workers at a queue
def get_average_memory_workers(self, queue, harvester_id):
# get DBproxy
proxy = self.proxyPool.getProxy()
# exec
ret = proxy.get_average_memory_workers(queue, harvester_id)
# release proxy
self.proxyPool.putProxy(proxy)
# return
return ret
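
A minimal usage sketch of the new TaskBuffer wrapper; it assumes an already initialized TaskBuffer instance named task_buffer, and the queue name and harvester ID are invented:

```python
# Hedged sketch: task_buffer is an initialized TaskBuffer instance.
running_submitted_avg, running_avg = task_buffer.get_average_memory_workers("SOME_PANDA_QUEUE", "harvester_instance_1")
print(f"average memory: {running_submitted_avg} MB (running+submitted), {running_avg} MB (running)")
```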

# get the distribution of new workers to submit
def ups_new_worker_distribution(self, queue, worker_stats):
# get DBproxy
