Skip to content

Commit

Permalink
resilience v2 (1/45): Add support for resilience attributes to pool s…
Browse files Browse the repository at this point in the history
…election unit and related classes

Motivation:

In order to enable resilience handling on the basis of storage units, additional properties
must be defined.

Modification:

To the pool group, a simple resilience marker is added.  This is necessary for the resilience
system to distinguish between pools which are under its control and pools which are not.

For storage units, two properties are defined:

    required       = number of non-removable copies needed
    onlyOneCopyPer = a comma-delimited list of pool tag names used to partition copies among pools in the pool group.
                     These are understood to constitute an "AND" clause for matching.

Changes in support of these additional properties are propagated to the selection unit and its commands.
In addition, several minor adjustments are necessary to accommodate the extra array slots in the info service handlers.
The storage unit has been also turned into a separate class for convenience.

Result:

The selection unit (pool manager configuration) now supports the definition of resilience on a storage unit basis.

NOTES:

1.  The storage unit default for 'required' is 1.
2.  A storage unit with required=1 can be linked to a pool group with resilience=true without conflicts.
3.  A storage unit with required>1 *cannot* be linked to a pool group with resilience=false.
    This logic currently resides in the pool selection unit decorator logic provided by the resilience system [an upstream patch];
    I was undecided whether to incorporate it into the actual V2 implementation layer or not.

Target: master
Acked-by: Dmitry
Acked-by: Gerd
  • Loading branch information
alrossi committed Feb 10, 2016
1 parent 72ddf8f commit d1aa11d
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 408 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ public void process(Object msgPayload, long metricLifetime)
LOGGER.trace("processing new poolgroup information");

if (!msgPayload.getClass().isArray()) {
LOGGER.error("received a message that isn't an array");
LOGGER.error("Pool group info, received a message that isn't an array");
return;
}

Object array[] = (Object []) msgPayload;

if (array.length != 3) {
LOGGER.error("Unexpected array size: {}", array.length);
return;
}
if (array.length != 4) {
LOGGER.error("Pool group info, unexpected array size: {}", array.length);
return;
}

// Map the array into slightly more meaningful components.
String poolgroupName = (String) array[0];
Object poolNames[] = (Object []) array[1];
Object linkNames[] = (Object []) array[2];
Object resilient[] = (Object []) array[3];

StateUpdate update = new StateUpdate();

Expand All @@ -55,6 +56,7 @@ public void process(Object msgPayload, long metricLifetime)
} else {
addItems(update, thisPoolGroupPath.newChild("pools"), poolNames, metricLifetime);
addItems(update, thisPoolGroupPath.newChild("links"), linkNames, metricLifetime);
addItems(update, thisPoolGroupPath.newChild("resilient"), resilient, metricLifetime);
}

applyUpdates(update);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,26 @@ public UnitInfoMsgHandler(StateUpdateManager sum,
public void process(Object msgPayload, long metricLifetime)
{
if (!msgPayload.getClass().isArray()) {
LOGGER.error("unexpected received non-array payload");
LOGGER.error("Unit info, unexpected received non-array payload");
return;
}

Object array[] = (Object []) msgPayload;

if (array.length != 3) {
LOGGER.error("Unexpected array size: {}", array.length);
if (array.length > 5 || array.length < 3) {
LOGGER.error("Unit info, unexpected array size: {}", array.length);
return;
}

/*
* array[0] = name
* array[1] = type
* array[2] = list of unitgroups.
*
* for storage,
* array[3] = required (number of copies)
* array[4] = list of tags for partitioning copies
*/

String unitName = array[0].toString();
String unitType = array[1].toString();

Expand All @@ -61,6 +64,19 @@ public void process(Object msgPayload, long metricLifetime)

addItems(update, thisUnitPath.newChild("unitgroups"), (Object []) array [2], metricLifetime);

if ("store".equals(unitType)) {
if (array.length == 5) {
if (array[3] != null) {
addItems(update, thisUnitPath.newChild("required"),
(Object[]) array[3], metricLifetime);
}
if (array[4] != null) {
addItems(update, thisUnitPath.newChild("oneCopyPer"),
(Object[]) array[4], metricLifetime);
}
}
}

applyUpdates(update);
}
}
10 changes: 7 additions & 3 deletions modules/dcache/src/main/java/diskCacheV111/poolManager/Link.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ public String getTag() {
@Override
public Collection<SelectionPoolGroup> getPoolGroupsPointingTo() {
Collection<SelectionPoolGroup> pGroups = new ArrayList<>();
for (PoolCore pGroup : _poolList.values()) {
PGroup newPGroup = new PGroup(pGroup.getName());
pGroups.add(newPGroup);
for (PoolCore pcore : _poolList.values()) {
if (pcore instanceof PGroup) {
PGroup original = (PGroup)pcore;
PGroup newPGroup = new PGroup(original.getName(),
original.isResilient());
pGroups.add(newPGroup);
}
}
return pGroups;
}
Expand Down
17 changes: 12 additions & 5 deletions modules/dcache/src/main/java/diskCacheV111/poolManager/PGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ class PGroup extends PoolCore implements SelectionPoolGroup {
private static final long serialVersionUID = 3883973457610397314L;
final Map<String, Pool> _poolList = new ConcurrentHashMap<>();

PGroup(String name) {
private final boolean resilient;

PGroup(String name, boolean resilient) {
super(name);
this.resilient = resilient;
}

private String[] getPools()
{
return _poolList.keySet().toArray(new String[_poolList.size()]);
@Override
public boolean isResilient() {
return resilient;
}

@Override
public String toString() {
return getName() + " (links=" + _linkList.size() + ";pools=" + _poolList.size() + ")";
return super.toString() + " " + getName() + "(links=" + _linkList.size()
+ "; pools=" + _poolList.size() + "; resilient=" + resilient + ")";
}

private String[] getPools() {
return _poolList.keySet().toArray(new String[_poolList.size()]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ abstract class PoolCore implements Serializable {
private static final long serialVersionUID = -8571296485927073985L;
private final String _name;
protected final Map<String, Link> _linkList = new ConcurrentHashMap<>();

protected PoolCore(String name) {
_name = name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ interface SelectionPool extends SelectionEntity
/**
* Returns the pool mode.
*
* @see setPoolMode
* @see #setPoolMode
*/
PoolV2Mode getPoolMode();

Expand Down Expand Up @@ -148,9 +148,12 @@ interface SelectionPool extends SelectionEntity

void setAddress(CellAddressCore address);
}

interface SelectionPoolGroup extends SelectionEntity {
boolean isResilient();
}
interface SelectionLinkGroup extends SelectionEntity{

interface SelectionLinkGroup extends SelectionEntity{
void add(SelectionLink link);
boolean remove(SelectionLink link);
Collection<SelectionLink> getLinks();
Expand All @@ -167,7 +170,9 @@ interface SelectionLinkGroup extends SelectionEntity{
boolean isOnlineAllowed();
boolean isNearlineAllowed();
}

interface SelectionUnit extends SelectionEntity{
int getType();
String getUnitType();
Collection<SelectionUnitGroup> getMemberOfUnitGroups();
}
Expand All @@ -176,17 +181,19 @@ interface SelectionUnitGroup extends SelectionEntity {
Collection<SelectionUnit> getMemeberUnits();
Collection<SelectionLink> getLinksPointingTo();
}
SelectionPool getPool(String poolName) ;
SelectionPool getPool(String poolName, boolean create) ;
SelectionLink getLinkByName(String linkName) throws NoSuchElementException ;

SelectionPool getPool(String poolName);
SelectionPool getPool(String poolName, boolean create);
SelectionLink getLinkByName(String linkName) throws NoSuchElementException;
PoolPreferenceLevel []
match(DirectionType type, String net, String protocol,
FileAttributes fileAttributes, String linkGroup) ;
String [] getActivePools() ;
String [] getDefinedPools(boolean enabledOnly) ;
String getVersion() ;
FileAttributes fileAttributes, String linkGroup);
String[] getActivePools() ;
String[] getDefinedPools(boolean enabledOnly) ;
String getVersion() ;
String getNetIdentifier(String address) throws UnknownHostException;
String getProtocolUnit(String protocolUnitName) ;
StorageUnit getStorageUnit(String storageClass) ;
SelectionLinkGroup getLinkGroupByName(String linkGroupName) throws NoSuchElementException ;
Collection<SelectionPool> getPoolsByPoolGroup(String poolGroup) throws NoSuchElementException;
Collection<SelectionPool> getAllDefinedPools(boolean enabledOnly) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface PoolSelectionUnitAccess extends CellSetupProvider {

void createPool(String name, boolean isNoPing, boolean isDisabled);

void createPoolGroup(String name);
void createPoolGroup(String name, boolean isResilient);

void createUnit(String name, boolean isNet, boolean isStore,
boolean isDcache, boolean isProtocol);
Expand Down Expand Up @@ -116,5 +116,9 @@ void setLinkGroup(String linkGroupName, String custodial,

String setRegex(String onOff);

void setStorageUnit(String storageUnitKey,
Integer required,
String[] onlyOneCopyPer);

void unlink(String linkName, String poolName);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package diskCacheV111.poolManager;

import com.google.common.base.Preconditions;

import java.net.UnknownHostException;
import java.util.concurrent.Callable;

import dmg.cells.nucleus.CellCommandListener;
import dmg.util.CommandSyntaxException;
import dmg.util.command.Argument;
import dmg.util.command.Command;
import dmg.util.command.Option;
import org.dcache.util.Args;

/**
Expand Down Expand Up @@ -83,10 +89,10 @@ public String ac_psu_clear_im_really_sure(Args args) {
return "";
}

public static final String hh_psu_create_pgroup = "<pool group>";
public static final String hh_psu_create_pgroup = "<pool group> [-resilient]";

public String ac_psu_create_pgroup_$_1(Args args) {
psuAccess.createPoolGroup(args.argv(0));
psuAccess.createPoolGroup(args.argv(0), args.hasOption("resilient"));
return "";
}

Expand Down Expand Up @@ -396,6 +402,43 @@ public String ac_psu_ls_netunits(Args args) {

public static final String hh_psu_unlink = "<link> <pool>|<pool group>";

@Command(name = "psu set storage unit",
hint = "define resilience requirements for a storage unit",
description = "Sets the required number of copies and/or "
+ "the partitioning of replicas by pool tags.")
class StorageUnitCommand implements Callable<String> {
@Option(name="required",
usage="Set the number of copies required. "
+ "Must be an integer >= 1. A storage "
+ "unit has required set to 1 by default. "
+ "Not specifying this attribute means "
+ "the current value is retained.")
Integer required;

@Option(name="onlyOneCopyPer",
separator = ",",
usage="A comma-delimited list of pool tag names used to "
+ "partition copies across pools "
+ "(interpreted as an 'and'-clause). "
+ "A storage unit has an empty list by default. "
+ "Not specifying this attribute means "
+ "the current value is retained; specifying "
+ "an empty string restores default behavior.")
String[] onlyOneCopyPer;

@Argument(usage="Name of the storage unit.")
String name;

@Override
public String call() throws Exception {
Preconditions.checkArgument(required == null || required >= 1,
"required must be >= 1, was set to %s.",
required);
psuAccess.setStorageUnit(name, required, onlyOneCopyPer);
return "";
}
}

public String ac_psu_unlink_$_2(Args args) {
psuAccess.unlink(args.argv(0), args.argv(1));
return "";
Expand Down
Loading

0 comments on commit d1aa11d

Please sign in to comment.