Skip to content

Commit

Permalink
dcache-qos: add support for policy handling in qos-engine (qos rule e…
Browse files Browse the repository at this point in the history
…ngine 4)

Motivation:

Implement the rule engine extension to QoS services.

Modification:

This patch represents the core of the rule engine
and contains most of the new code. The changes and
additions are as follows:

- The engine service now accesses the database. A
  new table, `qos_file_status`, holds information
  about individual files whose QoS state must be
  managed.  This includes the pnfsid, state index
  and expiration.

- A cache which is populated from the `chimera`
  table holds the JSON description for the known
  qos policies.  All external queries for
  policy information or for adding or removing
  policies pass through this cache.

- A new requirements provider based on file policy
  is added; this subclasses the standard AL/RP
  provider and falls back to it when a file's
  policy attributes are undefined.

- A manager thread is scheduled which scans the
  database table ordered by timestamp up to
  the current time of expiration, and handles
  those files with expired state by checking
  for the next state and requesting a modification
  from the verifier. Updates are made to the table
  when the modification completes. Files with no
  further state changes are deleted from the table.

- Separate thread queues have been added to
  handle different kinds of traffic:
  (a) the normal cache location messages
  processed for "resilience", and now also
  for policy state management; (b) user
  requests to modify a file's qos;
  (c) requests for policy information and
  the internally generated periodic
  scans.

- Message support for Frontend REST access
  to policy information, as well as admin
  shell commands.

Result:

The core of the rule engine has been
implemented.

Target: master
Patch: https://rb.dcache.org/r/14072
Depends-on: #14066,#14067
Acked-by: Tigran
  • Loading branch information
alrossi committed Sep 6, 2023
1 parent e2a1f37 commit 77bd7a1
Show file tree
Hide file tree
Showing 19 changed files with 1,738 additions and 71 deletions.
Expand Up @@ -64,13 +64,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import diskCacheV111.vehicles.PnfsAddCacheLocationMessage;
import diskCacheV111.vehicles.PnfsClearCacheLocationMessage;
import dmg.cells.nucleus.CellMessageReceiver;
import dmg.cells.nucleus.Reply;
import java.util.function.Consumer;
import org.dcache.cells.MessageReply;
import org.dcache.qos.services.engine.handler.FileQoSStatusHandler;
import org.dcache.qos.util.MessageGuard;
import org.dcache.qos.util.MessageGuard.Status;
import org.dcache.vehicles.CorruptFileMessage;
import org.dcache.vehicles.qos.FileQoSPolicyInfoMessage;
import org.dcache.vehicles.qos.QoSActionCompleteMessage;
import org.dcache.vehicles.qos.QoSCancelRequirementsModifiedMessage;
import org.dcache.vehicles.qos.QoSRequirementsModifiedMessage;
Expand Down Expand Up @@ -185,6 +185,13 @@ public void messageArrived(QoSActionCompleteMessage message) {
action.getError()));
}

/**
 * Receives a request for QoS policy information about a file.
 * <p>
 * A fresh reply object is created and handed, together with the request, to the file status
 * handler; the same reply object is then returned to the caller. How and when the handler
 * completes the reply is not visible here (presumably asynchronously -- confirm against
 * {@code FileQoSStatusHandler}).
 *
 * @param message the policy info request.
 * @return the reply object that was passed to the handler.
 */
public MessageReply<Message> messageArrived(FileQoSPolicyInfoMessage message) {
    // Parameterized return type instead of the raw MessageReply; the erased signature is
    // unchanged, so reflective dispatch and existing callers are unaffected.
    MessageReply<Message> reply = new MessageReply<>();
    fileStatusHandler.handleQoSPolicyInfoRequest(message, reply);
    return reply;
}

/**
 * Sets the message guard used by this receiver.
 *
 * @param messageGuard the guard instance to store.
 */
public void setMessageGuard(MessageGuard messageGuard) {
this.messageGuard = messageGuard;
}
Expand Down
Expand Up @@ -59,10 +59,17 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.qos.services.engine.admin;

import static org.dcache.qos.services.engine.handler.FileQoSStatusHandler.DEFAULT_PERIOD;
import static org.dcache.qos.services.engine.handler.FileQoSStatusHandler.DEFAULT_QUERY_LIMIT;
import static org.dcache.qos.services.engine.handler.FileQoSStatusHandler.DEFAULT_UNIT;

import diskCacheV111.util.PnfsId;
import dmg.cells.nucleus.CellCommandListener;
import dmg.util.command.Argument;
import dmg.util.command.Command;
import dmg.util.command.Option;
import java.util.concurrent.TimeUnit;
import org.dcache.qos.services.engine.handler.FileQoSStatusHandler;
import org.dcache.qos.services.engine.util.QoSEngineCounters;
import org.dcache.qos.util.InitializerAwareCommand;
import org.dcache.qos.util.MapInitializer;
Expand Down Expand Up @@ -152,6 +159,52 @@ protected String doCall() throws Exception {
}
}

/**
 * Admin command which changes the interval at which the handler checks for expired
 * policy state. The period and unit (given as options, or their defaults) are set on
 * the handler, after which the handler's {@code reset()} is invoked.
 */
@Command(name = "reset", hint = "reset the period",
description = "Resets the period and unit for the expiration checks of policy state.")
class ResetCommand extends InitializerAwareCommand {

// Numeric part of the check interval; defaults to the handler's DEFAULT_PERIOD.
@Option(name = "period",
usage = "Value of the interval for periodic checks of policy state expiration.")
int period = DEFAULT_PERIOD;

// Unit in which 'period' is expressed; defaults to the handler's DEFAULT_UNIT.
@Option(name = "unit",
valueSpec = "MILLIS|SECONDS|MINUTES|HOURS|DAYS",
usage = "Time unit of the interval for periodic checks of policy state expiration.")
TimeUnit unit = DEFAULT_UNIT;

// NOTE(review): this option is parsed but never passed to the handler in doCall();
// confirm whether it is meant to be applied there before the reset.
@Option(name = "limit",
usage = "Max number of entries to fetch at a time.")
int limit = DEFAULT_QUERY_LIMIT;

ResetCommand() {
super(initializer);
}

/**
 * Applies the period and unit to the handler, then resets it.
 *
 * @return a fixed confirmation message.
 */
@Override
protected String doCall() throws Exception {
handler.setPeriod(period);
handler.setPeriodUnit(unit);
handler.reset();
return "Currently scheduled sweep cancelled and reset.";
}
}

/**
 * Admin command which prints the QoS information the handler holds for a single file.
 * The output is whatever string the handler's {@code getQoSRecordIfExists} returns for
 * the given pnfsid.
 */
@Command(name = "qos", hint = "print qos info for a file if it is being tracked",
      description = "Prints the stored QoS state information for the given file, "
            + "if the file is currently being tracked.")
class QoSRecord extends InitializerAwareCommand {

    @Argument(index = 0,
          usage = "The unique identifier of the file within dCache.")
    PnfsId pnfsId;

    QoSRecord() {
        super(initializer);
    }

    /**
     * Looks up the file on the handler.
     *
     * @return the handler's textual answer for the file.
     */
    @Override
    protected String doCall() throws Exception {
        return handler.getQoSRecordIfExists(pnfsId);
    }
}

private final MapInitializer initializer = new MapInitializer() {
@Override
protected long getRefreshTimeout() {
Expand All @@ -175,6 +228,11 @@ public boolean isInitialized() {

// Collaborators held by this admin class; 'handler' is the one the reset and qos
// commands call into.
private MessageGuard messageGuard;
private QoSEngineCounters counters;
private FileQoSStatusHandler handler;

/**
 * Sets the file QoS status handler used by the admin commands.
 *
 * @param handler the handler instance to store.
 */
public void setHandler(FileQoSStatusHandler handler) {
this.handler = handler;
}

public void setCounters(QoSEngineCounters counters) {
this.counters = counters;
Expand Down
@@ -0,0 +1,116 @@
/*
COPYRIGHT STATUS:
Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and
software are sponsored by the U.S. Department of Energy under Contract No.
DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide
non-exclusive, royalty-free license to publish or reproduce these documents
and software for U.S. Government purposes. All documents and software
available from this server are protected under the U.S. and Foreign
Copyright Laws, and FNAL reserves all rights.
Distribution of the software available from this server is free of
charge subject to the user following the terms of the Fermitools
Software Legal Information.
Redistribution and/or modification of the software shall be accompanied
by the Fermitools Software Legal Information (including the copyright
notice).
The user is asked to feed back problems, benefits, and/or suggestions
about the software to the Fermilab Software Providers.
Neither the name of Fermilab, the URA, nor the names of the contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
DISCLAIMER OF LIABILITY (BSD):
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB,
OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Liabilities of the Government:
This software is provided by URA, independent from its Prime Contract
with the U.S. Department of Energy. URA is acting independently from
the Government and in its own private capacity and is not acting on
behalf of the U.S. Government, nor as its contractor nor its agent.
Correspondingly, it is understood and agreed that the U.S. Government
has no connection to this software and in no manner whatsoever shall
be liable for nor assume any responsibility or obligation for any claim,
cost, or damages arising out of or resulting from the use of the software
available from this server.
Export Control:
All documents and software available from this server are subject to U.S.
export control laws. Anyone downloading information from this server is
obligated to secure any necessary Government licenses before exporting
documents or software obtained from this server.
*/
package org.dcache.qos.services.engine.data;

import diskCacheV111.util.PnfsId;

/**
 * Immutable snapshot of the QoS state tracked for one file: the row id, the file's pnfsid,
 * the expiration value and the index of the current state. There should not be more than
 * one of these per file at a time.
 * <p>
 * Identity is defined by the pnfsid alone: {@link #equals(Object)} and {@link #hashCode()}
 * ignore the id, expiration and state index.
 */
public class QoSRecord {

    private final long id;
    private final PnfsId pnfsId;
    private final long expiration;
    private final int currentState;

    /**
     * @param id           identifier of the record.
     * @param pnfsId       identifier of the file the record belongs to.
     * @param expiration   expiration value of the current state.
     * @param currentState index of the current state.
     */
    public QoSRecord(long id, PnfsId pnfsId, long expiration, int currentState) {
        this.id = id;
        this.pnfsId = pnfsId;
        this.expiration = expiration;
        this.currentState = currentState;
    }

    /** @return the identifier of the record. */
    public long getId() {
        return id;
    }

    /** @return the identifier of the file. */
    public PnfsId getPnfsId() {
        return pnfsId;
    }

    /** @return the expiration value of the current state. */
    public long getExpiration() {
        return expiration;
    }

    /** @return the index of the current state. */
    public int getCurrentState() {
        return currentState;
    }

    /** @return the pnfsid, state index and expiration in a fixed textual layout. */
    @Override
    public String toString() {
        final String layout = "(pnfsid %s)(current state index %s)(expires %s)";
        return String.format(layout, pnfsId, currentState, expiration);
    }

    /** Two records are equal when they refer to the same pnfsid. */
    @Override
    public boolean equals(Object other) {
        return other instanceof QoSRecord && pnfsId.equals(((QoSRecord) other).pnfsId);
    }

    /** Hash of the pnfsid only, in line with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return pnfsId.hashCode();
    }
}
@@ -0,0 +1,159 @@
/*
COPYRIGHT STATUS:
Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and
software are sponsored by the U.S. Department of Energy under Contract No.
DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide
non-exclusive, royalty-free license to publish or reproduce these documents
and software for U.S. Government purposes. All documents and software
available from this server are protected under the U.S. and Foreign
Copyright Laws, and FNAL reserves all rights.
Distribution of the software available from this server is free of
charge subject to the user following the terms of the Fermitools
Software Legal Information.
Redistribution and/or modification of the software shall be accompanied
by the Fermitools Software Legal Information (including the copyright
notice).
The user is asked to feed back problems, benefits, and/or suggestions
about the software to the Fermilab Software Providers.
Neither the name of Fermilab, the URA, nor the names of the contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
DISCLAIMER OF LIABILITY (BSD):
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB,
OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Liabilities of the Government:
This software is provided by URA, independent from its Prime Contract
with the U.S. Department of Energy. URA is acting independently from
the Government and in its own private capacity and is not acting on
behalf of the U.S. Government, nor as its contractor nor its agent.
Correspondingly, it is understood and agreed that the U.S. Government
has no connection to this software and in no manner whatsoever shall
be liable for nor assume any responsibility or obligation for any claim,
cost, or damages arising out of or resulting from the use of the software
available from this server.
Export Control:
All documents and software available from this server are subject to U.S.
export control laws. Anyone downloading information from this server is
obligated to secure any necessary Government licenses before exporting
documents or software obtained from this server.
*/
package org.dcache.qos.services.engine.data.db;

import diskCacheV111.util.PnfsId;
import java.util.List;
import java.util.Optional;
import org.dcache.qos.services.engine.data.QoSRecord;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;

/**
 * Data access object for the {@code qos_file_status} table, written with simple generic SQL
 * on top of Spring's {@link JdbcTemplate}.
 * <p>
 * Each row carries an id, the file's pnfsid, an expiration value in milliseconds and the
 * index of the file's current state.
 */
public class JdbcQoSEngineDao extends JdbcDaoSupport {

    private static final String INSERT = "INSERT INTO qos_file_status (pnfsid, expires, state) "
          + "VALUES (?,?,?)";

    private static final String UPDATE = "UPDATE qos_file_status SET expires=expires+?, state=? "
          + "WHERE pnfsid=? AND state!=?";

    private static final String DELETE = "DELETE FROM qos_file_status WHERE pnfsid=?";

    private static final String SELECT_EXPIRED =
          "SELECT id, pnfsid, expires, state FROM qos_file_status "
                + "WHERE expires <=? and id >=? ORDER BY id limit ?";

    private static final String SELECT = "SELECT id, pnfsid, expires, state FROM qos_file_status WHERE pnfsid=?";

    /**
     * Maps the columns (id, pnfsid, expires, state), in that order, onto a record. Declared
     * final so the shared mapper cannot be reassigned.
     */
    private static final RowMapper<QoSRecord> QOS_RECORD_MAPPER = (rs, rowNum) ->
          new QoSRecord(rs.getLong(1), new PnfsId(rs.getString(2)), rs.getLong(3), rs.getInt(4));

    private int fetchSize;

    /**
     * @param fetchSize fetch size applied to the JDBC template before each expired-row query.
     */
    public void setFetchSize(int fetchSize) {
        this.fetchSize = fetchSize;
    }

    /**
     * Collects rows whose expiration is at or before the current system time.
     * <p>
     * Rows are read in ascending id order starting at {@code offset} (inclusive) and appended
     * to {@code expired}.
     * <p>
     * NOTE(review): because the lower bound is inclusive, a caller paging through the table
     * presumably has to pass the returned value plus one as the next offset -- verify against
     * the caller.
     *
     * @param expired list to which the matching records are added.
     * @param offset  smallest row id to consider.
     * @param limit   maximum number of rows to fetch.
     * @return the largest row id encountered, or 0 if no row matched.
     */
    public long findExpired(List<QoSRecord> expired, long offset, int limit) {
        // One-element array so that the row callback lambda can update the running maximum.
        long[] max = {0L};
        long now = System.currentTimeMillis();
        JdbcTemplate template = getJdbcTemplate();
        template.setFetchSize(fetchSize);
        template.query(SELECT_EXPIRED, ps -> {
            ps.setLong(1, now);
            ps.setLong(2, offset);
            ps.setInt(3, limit);
        }, rs -> {
            expired.add(QOS_RECORD_MAPPER.mapRow(rs, rs.getRow()));
            max[0] = Math.max(max[0], rs.getLong(1));
        });

        return max[0];
    }

    /**
     * Looks up the record stored for a file.
     *
     * @param pnfs identifier of the file.
     * @return the record, or empty if the query yields no row.
     */
    public Optional<QoSRecord> getRecord(PnfsId pnfs) {
        try {
            return Optional.ofNullable(
                  getJdbcTemplate().queryForObject(SELECT, QOS_RECORD_MAPPER,
                        pnfs.toString()));
        } catch (EmptyResultDataAccessException e) {
            // queryForObject signals "no row" with this exception; translate to empty.
            return Optional.empty();
        }
    }

    /**
     * Inserts a row for the file or, if one already exists, updates it.
     * <p>
     * Generic (does not take advantage of the Postgres-specific ON CONFLICT): the insert is
     * attempted first and a duplicate-key failure triggers the update. On insert the
     * expiration is the current system time plus {@code duration}; on update
     * {@code duration} is added to the stored expiration, and only rows whose stored state
     * differs from {@code index} are touched.
     *
     * @param pnfsId   identifier of the file.
     * @param index    state index to store.
     * @param duration milliseconds to add (to the current time on insert, to the stored
     *                 expiration on update).
     * @return true if a row was inserted or updated.
     */
    public boolean upsert(PnfsId pnfsId, int index, long duration) {
        try {
            return 0 < getJdbcTemplate().update(INSERT, ps -> {
                ps.setString(1, pnfsId.toString());
                ps.setLong(2, System.currentTimeMillis() + duration);
                ps.setInt(3, index);
            });
        } catch (DuplicateKeyException e) {
            return 0 < getJdbcTemplate().update(UPDATE, ps -> {
                ps.setLong(1, duration);
                ps.setInt(2, index);
                ps.setString(3, pnfsId.toString());
                ps.setInt(4, index);
            });
        }
    }

    /**
     * Removes the row stored for a file.
     *
     * @param pnfsId identifier of the file.
     * @return true if a row was deleted; false if none matched or the statement failed.
     */
    public boolean delete(PnfsId pnfsId) {
        try {
            return 0 < getJdbcTemplate().update(DELETE, ps -> {
                ps.setString(1, pnfsId.toString());
            });
        } catch (DataAccessException ignored) {
            // The failure is reported to the caller only as 'false'; the exception is
            // neither propagated nor logged here.
            return false;
        }
    }
}

0 comments on commit 77bd7a1

Please sign in to comment.