[sweep:integration] Fix race condition in FTS3DB.getNonFinishedOperations #5984

Merged
sweep: #5967 Fix race condition in FTS3DB.getNonFinishedOperations
chaen authored and web-flow committed Mar 24, 2022
commit 86b389067611ea4a5e4ede30833e7394658bb099
63 changes: 41 additions & 22 deletions src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -47,6 +47,11 @@
# Lifetime in seconds of the proxy we download for submission
PROXY_LIFETIME = 43200 # 12 hours

# Instead of querying many jobs at once,
# which maximizes the chance of race conditions
# when running multiple agents, we do it in batches
JOB_MONITORING_BATCH_SIZE = 20


class FTS3Agent(AgentModule):
"""
@@ -279,35 +284,49 @@ def monitorJobsLoop(self):
log = gLogger.getSubLogger("monitorJobs")
log.debug("Size of the context cache %s" % len(self._globalContextCache))

# Find the number of loops
nbOfLoops, mod = divmod(self.jobBulkSize, JOB_MONITORING_BATCH_SIZE)
if mod:
nbOfLoops += 1

log.debug("Getting active jobs")
# get jobs from DB
res = self.fts3db.getActiveJobs(limit=self.jobBulkSize, jobAssignmentTag=self.assignmentTag)

if not res["OK"]:
log.error("Could not retrieve ftsJobs from the DB", res)
return res
for loopId in range(nbOfLoops):

activeJobs = res["Value"]
log.info("%s jobs to queue for monitoring" % len(activeJobs))
log.info("Getting next batch of jobs to monitor", "%s/%s" % (loopId, nbOfLoops))
# get jobs from DB
res = self.fts3db.getActiveJobs(limit=JOB_MONITORING_BATCH_SIZE, jobAssignmentTag=self.assignmentTag)

# We store here the AsyncResult object on which we are going to wait
applyAsyncResults = []
if not res["OK"]:
log.error("Could not retrieve ftsJobs from the DB", res)
return res

# Starting the monitoring threads
for ftsJob in activeJobs:
log.debug("Queuing executing of ftsJob %s" % ftsJob.jobID)
# queue the execution of self._monitorJob( ftsJob ) in the thread pool
# The returned value is passed to _monitorJobCallback
applyAsyncResults.append(
self.jobsThreadPool.apply_async(self._monitorJob, (ftsJob,), callback=self._monitorJobCallback)
)
activeJobs = res["Value"]
log.info("Jobs queued for monitoring", len(activeJobs))

log.debug("All execution queued")
# We store here the AsyncResult object on which we are going to wait
applyAsyncResults = []

# Waiting for all the monitoring to finish
while not all([r.ready() for r in applyAsyncResults]):
log.debug("Not all the tasks are finished")
time.sleep(0.5)
# Starting the monitoring threads
for ftsJob in activeJobs:
log.debug("Queuing executing of ftsJob %s" % ftsJob.jobID)
# queue the execution of self._monitorJob( ftsJob ) in the thread pool
# The returned value is passed to _monitorJobCallback
applyAsyncResults.append(
self.jobsThreadPool.apply_async(self._monitorJob, (ftsJob,), callback=self._monitorJobCallback)
)

log.debug("All execution queued")

# Waiting for all the monitoring to finish
while not all([r.ready() for r in applyAsyncResults]):
log.debug("Not all the tasks are finished")
time.sleep(0.5)

# If we got fewer jobs to monitor than we asked for,
# stop looping
if len(activeJobs) < JOB_MONITORING_BATCH_SIZE:
break

log.debug("All the tasks have completed")
return S_OK()
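
Rather than claiming self.jobBulkSize jobs in a single getActiveJobs call, monitorJobsLoop now claims them in batches of JOB_MONITORING_BATCH_SIZE and stops as soon as a batch comes back short. A minimal sketch of the same pattern, with hypothetical fetch_batch and process callables standing in for fts3db.getActiveJobs and the thread-pool monitoring:

JOB_MONITORING_BATCH_SIZE = 20


def monitor_in_batches(fetch_batch, process, bulk_size, batch_size=JOB_MONITORING_BATCH_SIZE):
    """Claim and monitor jobs in small batches.

    fetch_batch(limit) -> list of jobs   # hypothetical stand-in for fts3db.getActiveJobs
    process(jobs)                        # hypothetical stand-in for the thread-pool monitoring
    """
    # Number of batches needed to cover bulk_size, rounding up
    nb_of_loops, remainder = divmod(bulk_size, batch_size)
    if remainder:
        nb_of_loops += 1

    for _ in range(nb_of_loops):
        jobs = fetch_batch(limit=batch_size)
        process(jobs)

        # A short batch means the DB has nothing left to hand out: stop early
        # instead of issuing further queries.
        if len(jobs) < batch_size:
            break

Claiming in small slices shortens the window in which several concurrently running agents can see the same unassigned rows.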
48 changes: 25 additions & 23 deletions src/DIRAC/DataManagementSystem/DB/FTS3DB.py
@@ -39,14 +39,18 @@

metadata = MetaData()

# Define the default utc_timestamp function.
# We overwrite it for sqlite in the tests
# because sqlite does not know UTC_TIMESTAMP
utc_timestamp = func.utc_timestamp

fts3FileTable = Table(
"Files",
metadata,
Column("fileID", Integer, primary_key=True),
Column("operationID", Integer, ForeignKey("Operations.operationID", ondelete="CASCADE"), nullable=False),
Column("attempt", Integer, server_default="0"),
Column("lastUpdate", DateTime, onupdate=func.utc_timestamp()),
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
Column("rmsFileID", Integer, server_default="0"),
Column("lfn", String(1024)),
Column("checksum", String(255)),
@@ -67,7 +71,7 @@
Column("jobID", Integer, primary_key=True),
Column("operationID", Integer, ForeignKey("Operations.operationID", ondelete="CASCADE"), nullable=False),
Column("submitTime", DateTime),
Column("lastUpdate", DateTime, onupdate=func.utc_timestamp()),
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
Column("lastMonitor", DateTime),
Column("completeness", Float),
# Could be fetched from Operation, but bad for perf
@@ -99,7 +103,7 @@
Column("activity", String(255)),
Column("priority", SmallInteger),
Column("creationTime", DateTime),
Column("lastUpdate", DateTime, onupdate=func.utc_timestamp()),
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
Column("status", Enum(*FTS3Operation.ALL_STATES), server_default=FTS3Operation.INIT_STATE, index=True),
Column("error", String(1024)),
Column("type", String(255)),
@@ -176,7 +180,7 @@ def __getDBConnectionInfo(self, fullname):
self.dbPass = dbParameters["Password"]
self.dbName = dbParameters["DBName"]

def __init__(self, pool_size=15):
def __init__(self, pool_size=15, url=None):
"""c'tor

:param self: self reference
@@ -185,12 +189,16 @@ def __init__(self, pool_size=15):
"""

self.log = gLogger.getSubLogger("FTS3DB")
# Initialize the connection info
self.__getDBConnectionInfo("DataManagement/FTS3DB")

if not url:
# Initialize the connection info
self.__getDBConnectionInfo("DataManagement/FTS3DB")

url = "mysql://%s:%s@%s:%s/%s" % (self.dbUser, self.dbPass, self.dbHost, self.dbPort, self.dbName)

runDebug = gLogger.getLevel() == "DEBUG"
self.engine = create_engine(
"mysql://%s:%s@%s:%s/%s" % (self.dbUser, self.dbPass, self.dbHost, self.dbPort, self.dbName),
url,
echo=runDebug,
pool_size=pool_size,
pool_recycle=3600,
@@ -221,7 +229,7 @@ def persistOperation(self, operation):
# so that another agent can work on the request
operation.assignment = None
# because of the merge we have to explicitly set lastUpdate
operation.lastUpdate = func.utc_timestamp()
operation.lastUpdate = utc_timestamp()
try:

# Merge it in case it already is in the DB
@@ -286,7 +294,6 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
:returns: list of FTS3Jobs

"""

session = self.dbSession(expire_on_commit=False)

try:
@@ -435,7 +442,7 @@ def updateJobStatus(self, jobStatusDict):
updateDict[FTS3Job.completeness] = valueDict["completeness"]

if valueDict.get("lastMonitor"):
updateDict[FTS3Job.lastMonitor] = func.utc_timestamp()
updateDict[FTS3Job.lastMonitor] = utc_timestamp()

updateDict[FTS3Job.assignment] = None

@@ -530,12 +537,13 @@ def getNonFinishedOperations(self, limit=20, operationAssignmentTag="Assigned"):
ftsOperations = []

# We need to do the select in two steps because the join clause makes the limit difficult
# We get the list of operation IDs that have an associated job assigned
opIDsWithJobAssigned = session.query(FTS3Job.operationID).filter(~FTS3Job.assignment.is_(None)).subquery()
operationIDsQuery = (
session.query(FTS3Operation.operationID)
.outerjoin(FTS3Job)
.filter(FTS3Operation.status.in_(["Active", "Processed"]))
.filter(FTS3Operation.assignment.is_(None))
.filter(FTS3Job.assignment.is_(None))
.filter(~FTS3Operation.operationID.in_(opIDsWithJobAssigned))
.order_by(FTS3Operation.lastUpdate.asc())
.limit(limit)
.distinct()
@@ -591,8 +599,7 @@ def kickStuckOperations(self, limit=20, kickDelay=2):
ftsOps = (
session.query(FTS3Operation.operationID)
.filter(
FTS3Operation.lastUpdate
< (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
FTS3Operation.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
)
.filter(~FTS3Operation.assignment.is_(None))
.limit(limit)
@@ -607,7 +614,7 @@
.where(FTS3Operation.operationID.in_(opIDs))
.where(
FTS3Operation.lastUpdate
< (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
< (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
)
.values({"assignment": None})
.execution_options(synchronize_session=False) # see comment about synchronize_session
@@ -641,9 +648,7 @@ def kickStuckJobs(self, limit=20, kickDelay=2):

ftsJobs = (
session.query(FTS3Job.jobID)
.filter(
FTS3Job.lastUpdate < (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
)
.filter(FTS3Job.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay))))
.filter(~FTS3Job.assignment.is_(None))
.limit(limit)
)
@@ -655,9 +660,7 @@
result = session.execute(
update(FTS3Job)
.where(FTS3Job.jobID.in_(jobIDs))
.where(
FTS3Job.lastUpdate < (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
)
.where(FTS3Job.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay))))
.values({"assignment": None})
.execution_options(synchronize_session=False) # see comment about synchronize_session
)
@@ -689,8 +692,7 @@ def deleteFinalOperations(self, limit=20, deleteDelay=180):
ftsOps = (
session.query(FTS3Operation.operationID)
.filter(
FTS3Operation.lastUpdate
< (func.date_sub(func.utc_timestamp(), text("INTERVAL %d DAY" % deleteDelay)))
FTS3Operation.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d DAY" % deleteDelay)))
)
.filter(FTS3Operation.status.in_(FTS3Operation.FINAL_STATES))
.limit(limit)
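
The fix itself is the reworked query in getNonFinishedOperations. The old form outer-joined FTS3Job and filtered on FTS3Job.assignment being NULL, which still matched an operation as soon as one of its jobs was unassigned, even if another agent already held a sibling job; the new form drops an operation if any of its jobs is assigned. Pulled together outside the diff, and assuming the FTS3Operation/FTS3Job mapped classes and an open session as inside FTS3DB, the new query reads:

# Subquery: IDs of operations that still have at least one job claimed
# (assignment not NULL) by some agent.
opIDsWithJobAssigned = (
    session.query(FTS3Job.operationID)
    .filter(~FTS3Job.assignment.is_(None))
    .subquery()
)

# Keep only unassigned, non-final operations none of whose jobs are assigned.
# The previous outer-join form kept an operation as long as one of its jobs
# was unassigned, even when another job of the same operation was assigned.
operationIDsQuery = (
    session.query(FTS3Operation.operationID)
    .filter(FTS3Operation.status.in_(["Active", "Processed"]))
    .filter(FTS3Operation.assignment.is_(None))
    .filter(~FTS3Operation.operationID.in_(opIDsWithJobAssigned))
    .order_by(FTS3Operation.lastUpdate.asc())
    .limit(20)
    .distinct()
)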
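
Two smaller changes exist for testability: utc_timestamp becomes a module-level alias that the tests can overwrite (sqlite has no UTC_TIMESTAMP), and __init__ gains an optional url so the DataManagement/FTS3DB configuration lookup and the hard-coded mysql:// URL can be bypassed. A usage sketch; the sqlite URL and the rebinding shown here are assumptions about how a test might use these hooks, not code from this PR:

from sqlalchemy import func

from DIRAC.DataManagementSystem.DB import FTS3DB

# No url: __init__ reads DataManagement/FTS3DB from the configuration system
# and builds the mysql:// URL itself, exactly as before this change.
production_db = FTS3DB.FTS3DB(pool_size=15)

# Hypothetical test setup: skip the configuration lookup and run against an
# in-memory sqlite database. sqlite has no UTC_TIMESTAMP(), so rebind the
# module-level alias to something it can evaluate; note this only affects the
# explicit utc_timestamp() calls made after the rebind, not the column
# onupdate defaults that were already built when the module was imported.
FTS3DB.utc_timestamp = lambda: func.datetime("now")  # compiles to datetime('now')
test_db = FTS3DB.FTS3DB(url="sqlite:///:memory:")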