Skip to content

Commit

Permalink
Merge pull request #6178 from h2oai/michalk_no-replicas
Browse files Browse the repository at this point in the history
PUBDEV-8692: DKV - Remove key-replicas from API
  • Loading branch information
Michal Kurka committed Apr 28, 2022
2 parents 7c91104 + 0bc46ce commit 85135c8
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -769,11 +769,11 @@ public static DeepLearningModelInfo timeAverage(DeepLearningModelInfo nodeAverag
}

public Key localModelInfoKey(H2ONode node) {
return Key.make(_model_id + ".node" + node.index(), (byte) 1 /*replica factor*/, (byte) 31 /*hidden user-key*/, true, node);
return Key.make(_model_id + ".node" + node.index(), Key.HIDDEN_USER_KEY, true, node);
}

public Key elasticAverageModelInfoKey() {
return Key.make(_model_id + ".elasticaverage", (byte) 1 /*replica factor*/, (byte) 31 /*hidden user-key*/, true, H2O.CLOUD._memary[0]);
return Key.make(_model_id + ".elasticaverage", Key.HIDDEN_USER_KEY, true, H2O.CLOUD._memary[0]);
}

static public class GradientCheck {
Expand Down
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public Job(Key<T> key, String clz_of_T, String desc, String warningStr) {

// Job Keys are pinned to this node (i.e., the node that invoked the
// computation), because it should be almost always updated locally
private static Key<Job> defaultJobKey() { return Key.make((byte) 0, Key.JOB, false, H2O.SELF); }
private static Key<Job> defaultJobKey() { return Key.make(Key.JOB, false, H2O.SELF); }


/** Job start_time and end_time using Sys.CTM */
Expand Down Expand Up @@ -226,7 +226,7 @@ public final void update( final long newworked, final String msg) {

// --------------
/** A system key for global list of Job keys. */
public static final Key<Job> LIST = Key.make(" JobList", (byte) 0, Key.BUILT_IN_KEY, false);
public static final Key<Job> LIST = Key.make(" JobList", Key.BUILT_IN_KEY);

public String[] warns() {
update_from_remote();
Expand Down
171 changes: 71 additions & 100 deletions h2o-core/src/main/java/water/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ final public class Key<T extends Keyed> extends Iced<Key<T>> implements Comparab
public static final byte HIDDEN_USER_KEY = 31;
public static final byte USER_KEY = 32;

// Indices into key header structure (key bytes)
private static final int KEY_HEADER_TYPE = 0;
private static final int KEY_HEADER_CUSTOM_HOMED = 1;

// For Fluid Vectors, we have a special Key layout.
// 0 - key type byte, one of VEC, CHK or GRP
// 1 - homing byte, always -1/0xFF as these keys use the hash to figure their home out
Expand All @@ -66,11 +70,11 @@ final public class Key<T extends Keyed> extends Iced<Key<T>> implements Comparab

/** True if this is a {@link Vec} Key.
* @return True if this is a {@link Vec} Key */
public final boolean isVec() { return _kb != null && _kb.length > 0 && _kb[0] == VEC; }
public final boolean isVec() { return _kb.length > 0 && _kb[KEY_HEADER_TYPE] == VEC; }

/** True if this is a {@link Chunk} Key.
* @return True if this is a {@link Chunk} Key */
public final boolean isChunkKey() { return _kb != null && _kb.length > 0 && _kb[0] == CHK; }
public final boolean isChunkKey() { return _kb.length > 0 && _kb[KEY_HEADER_TYPE] == CHK; }

/** Returns the {@link Vec} Key from a {@link Chunk} Key.
* @return Returns the {@link Vec} Key from a {@link Chunk} Key. */
Expand All @@ -83,22 +87,16 @@ public final T get() {
return val == null ? null : (T)val.get();
}

// *Desired* distribution function on keys & replication factor. Replica #0
// is the master, replica #1, 2, 3, etc represent additional desired
// replication nodes. Note that this function is just the distribution
// function - it does not DO any replication, nor does it dictate any policy
// on how fast replication occurs. Returns -1 if the desired replica
// is nonsense, e.g. asking for replica #3 in a 2-Node system.
int D( int repl ) {
// *Desired* distribution function on keys
int D() {
int hsz = H2O.CLOUD.size();

if (0 == hsz) return -1; // Clients starting up find no cloud, be unable to home keys

// See if this is a specifically homed Key
if( !user_allowed() && repl < _kb[1] ) { // Asking for a replica# from the homed list?
assert repl == 0 : "No replication is support now";
assert _kb[0] != Key.CHK;
H2ONode h2o = H2ONode.intern(_kb,2+repl*(H2ONode.H2Okey.SIZE /* serialized bytesize of H2OKey - depends on IP protocol */));
if (!user_allowed() && custom_homed()) {
assert _kb[KEY_HEADER_TYPE] != Key.CHK; // Chunks cannot be custom-homed
H2ONode h2o = H2ONode.intern(_kb,2);
// Reverse the home to the index
int idx = h2o.index();
if( idx >= 0 ) return idx;
Expand All @@ -116,9 +114,9 @@ int D( int repl ) {
// hash. Apart from that, we keep the previous mode of operation, so that
// ByteVec would have first 64MB distributed around cloud randomly and then
// go round-robin in 64MB chunks.
if( _kb[0] == CHK ) {
if( _kb[KEY_HEADER_TYPE] == CHK ) {
// Homed Chunk?
if( _kb[1] != -1 ) throw H2O.fail();
if( _kb[KEY_HEADER_CUSTOM_HOMED] != -1 ) throw H2O.fail();
// For round-robin on Chunks in the following pattern:
// 1 Chunk-per-node, until all nodes have 1 chunk (max parallelism).
// Then 2 chunks-per-node, once around, then 4, then 8, then 16.
Expand All @@ -137,11 +135,11 @@ int D( int repl ) {
// 15+ -> remaining rounds in groups of 16: nidx= (cidx-15*hsz)>>4
int z = x==0 ? 0 : (x<=2 ? 1 : (x<=6 ? 2 : (x<=14 ? 3 : 4)));
int nidx = (cidx-((1<<z)-1)*hsz)>>z;
return ((nidx+repl)&0x7FFFFFFF) % hsz;
return (nidx&0x7FFFFFFF) % hsz;
}

// Easy Cheesy Stupid:
return ((_hash+repl)&0x7FFFFFFF) % hsz;
return (_hash&0x7FFFFFFF) % hsz;
}


Expand All @@ -160,31 +158,19 @@ int D( int repl ) {
// The Cloud index, a byte uniquely identifying the last 256 Clouds. It
// changes atomically with the _cache word, so we can tell which Cloud this
// data is a cache of.
private static int cloud( long cache ) { return (int)(cache>>> 0)&0x00FF; }
// Shortcut node index for Home replica#0. This replica is responsible for
// breaking ties on writes. 'char' because I want an unsigned 16bit thing,
// limit of 65534 Cloud members. -1 is reserved for a bare-key
private static int home ( long cache ) { return (int)(cache>>> 8)&0xFFFF; }
// Our replica #, or -1 if we're not one of the first 127 replicas. This
// value is found using the Cloud distribution function and changes for a
// changed Cloud.
private static int replica(long cache) { return (byte)(cache>>>24)&0x00FF; }
// Desired replication factor. Can be zero for temp keys. Not allowed to
// later, because it messes with e.g. meta-data on disk.
private static int desired(long cache) { return (int)(cache>>>32)&0x00FF; }

private static long build_cache( int cidx, int home, int replica, int desired ) {
static int cloud( long cache ) { return (int)(cache>>> 0)&0x00FF; }
// Shortcut node index for Home.
// 'char' because I want an unsigned 16bit thing, limit of 65534 Cloud members.
// -1 is reserved for a bare-key
static int home ( long cache ) { return (int)(cache>>> 8)&0xFFFF; }

static long build_cache(int cidx, int home) {
return // Build the new cache word
((long)(cidx &0xFF)<< 0) |
((long)(home &0xFFFF)<< 8) |
((long)(replica&0xFF)<<24) |
((long)(desired&0xFF)<<32) |
((long)(0 )<<40);
((long) (cidx & 0xFF)) |
((long) (home & 0xFFFF) << 8);
}

int home ( H2O cloud ) { return home (cloud_info(cloud)); }
int replica( H2O cloud ) { return replica(cloud_info(cloud)); }
int desired( ) { return desired(_cache); }

/** True if the {@link #home_node} is the current node.
* @return True if the {@link #home_node} is the current node */
Expand Down Expand Up @@ -220,26 +206,12 @@ long cloud_info( H2O cloud ) {
// Cache missed! Probably it just needs (atomic) updating.
// But we might be holding the stale cloud...
// Figure out home Node in this Cloud
char home = (char)D(0);
// Figure out what replica # I am, if any
int desired = desired(x);
int replica = -1;
for( int i=0; i<desired; i++ ) {
int idx = D(i);
if( idx >= 0 && cloud._memary[idx] == H2O.SELF ) {
replica = i;
break;
}
}
long cache = build_cache(cloud._idx,home,replica,desired);
char home = (char)D();
long cache = build_cache(cloud._idx,home);
set_cache(cache); // Attempt to upgrade cache, but ignore failure
return cache; // Return the magic word for this Cloud
}

// Default desired replication factor. Unless specified otherwise, all new
// k-v pairs start with this replication factor.
static final byte DEFAULT_DESIRED_REPLICA_FACTOR = 1;

// Construct a new Key.
private Key(byte[] kb) {
_kb = kb;
Expand All @@ -257,17 +229,15 @@ private Key(byte[] kb) {
}

// Make new Keys. Optimistically attempt interning, but no guarantee.
static <P extends Keyed> Key<P> make(byte[] kb, byte rf) {
if( rf == -1 ) throw new IllegalArgumentException();
public static <P extends Keyed> Key<P> make(byte[] kb) {
Key key = new Key(kb);
Key key2 = H2O.getk(key); // Get the interned version, if any
if( key2 != null ) // There is one! Return it instead
return key2;

// Set the cache with desired replication factor, and a fake cloud index

H2O cloud = H2O.CLOUD; // Read once
key._cache = build_cache(cloud._idx-1,0,0,rf);
key.cloud_info(cloud); // Now compute & cache the real data
key._cache = build_cache(cloud._idx-1,0); // Build a dummy cache with a fake cloud index
key.cloud_info(cloud); // Now force compute & cache the real data
return key;
}

Expand All @@ -280,21 +250,17 @@ public static String rand() {
return "_"+Long.toHexString(l1)+Long.toHexString(l2);
}

/** Factory making a Key from a byte[]
* @return Desired Key */
public static <P extends Keyed> Key<P> make(byte[] kb) { return make(kb, DEFAULT_DESIRED_REPLICA_FACTOR); }

/** Factory making a Key from a String name.
 *  A {@code null} name is replaced by a freshly generated one from {@link #rand}.
 *  @param s the key name, or {@code null} to request a random name
 *  @return Desired Key */
public static <P extends Keyed> Key<P> make(String s) {
  // Fall back to a random name when the caller did not supply one
  final String name = (s == null) ? rand() : s;
  return make(decodeKeyName(name));
}

public static <P extends Keyed> Key<P> makeSystem(String s) {
return make(s,DEFAULT_DESIRED_REPLICA_FACTOR,BUILT_IN_KEY, false);
return make(s,BUILT_IN_KEY);
}
public static <P extends Keyed> Key<P> makeUserHidden(String s) {
return make(s,DEFAULT_DESIRED_REPLICA_FACTOR,HIDDEN_USER_KEY, false);
return make(s,HIDDEN_USER_KEY);
}

/**
Expand All @@ -303,58 +269,59 @@ public static <P extends Keyed> Key<P> makeUserHidden(String s) {
* @return the new key
*/
public static <P extends Keyed> Key<P> make(H2ONode node) {
return make(decodeKeyName(rand()),DEFAULT_DESIRED_REPLICA_FACTOR,BUILT_IN_KEY,false,node);
return make(decodeKeyName(rand()),BUILT_IN_KEY,false,node);
}
static <P extends Keyed> Key<P> make(String s, byte rf) { return make(decodeKeyName(s), rf);}
/** Factory making a random Key
* @return Desired Key */
public static <P extends Keyed> Key<P> make() { return make(rand()); }

/** Factory making a homed system Key. Requires the initial system byte but
* then allows a String for the remaining bytes. Requires a list of exactly
* one H2ONode to home at. The hint specifies if it is an error to name an
* H2ONode that is NOT in the Cloud, or if some other H2ONode can be
* substituted. The rf parameter and passing more than 1 H2ONode are both
* depreciated.
* then allows a String for the remaining bytes.
*
* Requires specifying the home node of the key. The {@code required} flag
* specifies whether it is an error to name an H2ONode that is NOT in the
* Cloud, or whether some other H2ONode can be substituted.
* @return the desired Key */
public static <P extends Keyed> Key<P> make(String s, byte systemType, boolean required, H2ONode home) {
  // Decode the textual name into raw key bytes, then delegate to the byte[]-based factory
  final byte[] nameBytes = decodeKeyName(s);
  return make(nameBytes, systemType, required, home);
}
/** Factory making a system Key. Requires the initial system byte but
* then allows a String for the remaining bytes.
* @return the desired Key */
public static <P extends Keyed> Key<P> make(String s, byte rf, byte systemType, boolean hint, H2ONode... replicas) {
return make(decodeKeyName(s),rf,systemType,hint,replicas);
public static <P extends Keyed> Key<P> make(String s, byte systemType) {
return make(decodeKeyName(s),systemType,false,null);
}
/** Factory making a homed system Key. Requires the initial system byte and
* uses {@link #rand} for the remaining bytes. Requires a list of exactly
* one H2ONode to home at. The hint specifies if it is an error to name an
* H2ONode that is NOT in the Cloud, or if some other H2ONode can be
* substituted. The rf parameter and passing more than 1 H2ONode are both
* depreciated.
* uses {@link #rand} for the remaining bytes.
*
* Requires specifying the home node of the key. The {@code required} flag
* specifies whether it is an error to name an H2ONode that is NOT in the
* Cloud, or whether some other H2ONode can be substituted.
* @return the desired Key */
public static <P extends Keyed> Key<P> make(byte rf, byte systemType, boolean hint, H2ONode... replicas) {
return make(rand(),rf,systemType,hint,replicas);
public static <P extends Keyed> Key<P> make(byte systemType, boolean required, H2ONode home) {
return make(rand(),systemType,required,home);
}


// Make a system Key, optionally homed to a specific node.
public static <P extends Keyed> Key<P> make(byte[] kb, byte rf, byte systemType, boolean required, H2ONode... replicas) {
// no more than 3 replicas allowed to be stored in the key
assert 0 <=replicas.length && replicas.length<=3;
assert systemType<32; // only system keys allowed
boolean inCloud=true;
for( H2ONode h2o : replicas ) if( !H2O.CLOUD.contains(h2o) ) inCloud = false;
if( required ) assert inCloud; // If required placement, error to find a client as the home
else if( !inCloud ) replicas = new H2ONode[0]; // If placement is a hint & cannot be placed, then ignore
private static <P extends Keyed> Key<P> make(byte[] kb, byte systemType, boolean required, H2ONode home) {
assert systemType < 32; // only system keys allowed
home = home != null && H2O.CLOUD.contains(home) ? home : null;
assert !required || home != null; // If homing is not required and home is not in cloud (or null), then ignore

// Key byte layout is:
// 0 - systemType, from 0-31
// 1 - replica-count, plus up to 3 bits for ip4 vs ip6
// 2-n - zero, one, two or 3 IP4 (4+2 bytes) or IP6 (16+2 bytes) addresses
// 1 - is the key homed to a specific node? (0 or 1)
// 2-n - if homed then IP4 (4+2 bytes) or IP6 (16+2 bytes) address
// 2-5- 4 bytes of chunk#, or -1 for masters
// n+ - repeat of the original kb
AutoBuffer ab = new AutoBuffer();
ab.put1(systemType).put1(replicas.length);
for( H2ONode h2o : replicas )
h2o.write(ab);
ab.put1(systemType);
ab.putZ(home != null);
if (home != null) {
home.write(ab);
}
ab.put4(-1);
ab.putA1(kb,kb.length);
return make(Arrays.copyOf(ab.buf(),ab.position()),rf);
ab.putA1(kb, kb.length);
return make(Arrays.copyOf(ab.buf(),ab.position()));
}

/**
Expand All @@ -374,10 +341,14 @@ public Futures remove(Futures fs) {
* @return True if a {@link #USER_KEY} and not a system key */
public boolean user_allowed() { return type()==USER_KEY; }

/** Tells whether the key header marks this Key as homed to a specific node.
 *  Reads the homing byte of the key header (index {@code KEY_HEADER_CUSTOM_HOMED})
 *  and compares it to 1; any other value (e.g. 0, or -1 as used by Vec/Chunk keys)
 *  yields {@code false}.
 *  @return true if the homing byte equals 1 */
boolean custom_homed() {
  final byte homingFlag = _kb[KEY_HEADER_CUSTOM_HOMED];
  return homingFlag == 1;
}

/** System type/byte of a Key, or the constant {@link #USER_KEY}
* @return Key type */
// Returns the type of the key.
public int type() { return ((_kb[0]&0xff)>=32) ? USER_KEY : (_kb[0]&0xff); }
public int type() { return ((_kb[KEY_HEADER_TYPE]&0xff)>=32) ? USER_KEY : (_kb[KEY_HEADER_TYPE]&0xff); }

/** Return the classname for the Value that this Key points to, if any (e.g., "water.fvec.Frame"). */
public String valueClass() {
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/rapids/BinaryMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ static Key getKeyForMSBComboPerCol(/*Frame leftFrame, Frame rightFrame,*/ int le
return Key.make("__binary_merge__Chunk_for_col" + col + "_batch" + batch
// + rightFrame._key.toString() + "_joined_with" + leftFrame._key.toString()
+ "_leftSB._msb" + leftMSB + "_riteSB._msb" + rightMSB,
(byte) 1, Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(rightMSB==-1 ? leftMSB : rightMSB)
Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(rightMSB==-1 ? leftMSB : rightMSB)
); //TODO home locally
}

Expand Down
6 changes: 3 additions & 3 deletions h2o-core/src/main/java/water/rapids/SplitByMSBLocal.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ static H2ONode ownerOfMSB(int MSBvalue) {

static Key getNodeOXbatchKey(boolean isLeft, int MSBvalue, int node, int batch) {
return Key.make("__radix_order__NodeOXbatch_MSB" + MSBvalue + "_node" + node + "_batch" + batch + (isLeft ? "_LEFT" : "_RIGHT"),
(byte) 1, Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(MSBvalue));
Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(MSBvalue));
}

static Key getSortedOXbatchKey(boolean isLeft, int MSBvalue, int batch) {
return Key.make("__radix_order__SortedOXbatch_MSB" + MSBvalue + "_batch" + batch + (isLeft ? "_LEFT" : "_RIGHT"),
(byte) 1, Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(MSBvalue));
Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(MSBvalue));
}


Expand All @@ -216,7 +216,7 @@ static class OXbatch extends Iced {

static Key getMSBNodeHeaderKey(boolean isLeft, int MSBvalue, int node) {
return Key.make("__radix_order__OXNodeHeader_MSB" + MSBvalue + "_node" + node + (isLeft ? "_LEFT" : "_RIGHT"),
(byte) 1, Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(MSBvalue));
Key.HIDDEN_USER_KEY, false, SplitByMSBLocal.ownerOfMSB(MSBvalue));
}

static class MSBNodeHeader extends Iced {
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/test/java/water/AtomicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public Key makeKey(String n, boolean remote) {
H2O cloud = H2O.CLOUD;
H2ONode target = cloud._memary[0];
if( target == H2O.SELF ) target = cloud._memary[1];
return Key.make(n,(byte)1,Key.BUILT_IN_KEY,true,target);
return Key.make(n,Key.BUILT_IN_KEY,true,target);
}

// Simple wrapper class defining an array-of-keys that is serializable.
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/test/java/water/DKVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testTatomic(){
System.out.println("iteration " + r);
try {
for (int i = 0; i < keys.length; ++i) // byte rf, byte systemType, boolean hint, H2ONode... replicas
DKV.put(keys[i] = Key.make((byte) 1, Key.HIDDEN_USER_KEY, true, H2O.CLOUD._memary[i]), new IcedInt(0));
DKV.put(keys[i] = Key.make(Key.HIDDEN_USER_KEY, true, H2O.CLOUD._memary[i]), new IcedInt(0));
new TestMM(keys).doAllNodes();
} finally {
for (Key k : keys)
Expand Down
6 changes: 6 additions & 0 deletions h2o-core/src/test/java/water/JobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public void tryGetDoneJob() {
assertEquals(t4 - t3, sleepMs - (t2 - t1), 20);
}

@Test
public void testDefaultJobKey() {
  // Build a job, then check that the job's own key reports the local node as its home
  final Job<Frame> job = new Job<>(Key.make(), Frame.class.getName(), "test");
  final Key<Job> jobKey = job._key;
  assertEquals(H2O.SELF, jobKey.home_node());
}

@Test
public void testBlockingWaitForDone() {
final Job<Frame> j = new Job<>(Key.make(), Frame.class.getName(), "Test Job");
Expand Down
Loading

0 comments on commit 85135c8

Please sign in to comment.