Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Kurka committed Apr 26, 2022
1 parent f73418d commit 5092667
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 46 deletions.
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public final void update( final long newworked, final String msg) {

// --------------
/** A system key for global list of Job keys. */
public static final Key<Job> LIST = Key.make(" JobList", Key.BUILT_IN_KEY, false);
public static final Key<Job> LIST = Key.make(" JobList", Key.BUILT_IN_KEY);

public String[] warns() {
update_from_remote();
Expand Down
88 changes: 43 additions & 45 deletions h2o-core/src/main/java/water/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,15 @@ public final T get() {
return val == null ? null : (T)val.get();
}

// *Desired* distribution function on keys & replication factor. Replica #0
// is the master, replica #1, 2, 3, etc represent additional desired
// replication nodes. Note that this function is just the distribution
// function - it does not DO any replication, nor does it dictate any policy
// on how fast replication occurs. Returns -1 if the desired replica
// is nonsense, e.g. asking for replica #3 in a 2-Node system.
// *Desired* distribution function on keys
int D() {
int hsz = H2O.CLOUD.size();

if (0 == hsz) return -1; // Clients starting up find no cloud, be unable to home keys

// See if this is a specifically homed Key
if( !user_allowed() && 0 < _kb[1] ) { // Asking for a replica# from the homed list?
assert _kb[0] != Key.CHK;
if (!user_allowed() && _kb[1] == 1) {
assert _kb[0] != Key.CHK; // Chunks cannot be custom-homed
H2ONode h2o = H2ONode.intern(_kb,2);
// Reverse the home to the index
int idx = h2o.index();
Expand Down Expand Up @@ -160,9 +155,9 @@ int D() {
// changes atomically with the _cache word, so we can tell which Cloud this
// data is a cache of.
private static int cloud( long cache ) { return (int)(cache>>> 0)&0x00FF; }
// Shortcut node index for Home replica#0. This replica is responsible for
// breaking ties on writes. 'char' because I want an unsigned 16bit thing,
// limit of 65534 Cloud members. -1 is reserved for a bare-key
// Shortcut node index for Home.
// 'char' because I want an unsigned 16bit thing, limit of 65534 Cloud members.
// -1 is reserved for a bare-key
private static int home ( long cache ) { return (int)(cache>>> 8)&0xFFFF; }

private static long build_cache(int cidx, int home) {
Expand Down Expand Up @@ -238,11 +233,10 @@ public static <P extends Keyed> Key<P> make(byte[] kb) {
Key key2 = H2O.getk(key); // Get the interned version, if any
if( key2 != null ) // There is one! Return it instead
return key2;

// Set the cache with desired replication factor, and a fake cloud index

H2O cloud = H2O.CLOUD; // Read once
key._cache = build_cache(cloud._idx-1,0);
key.cloud_info(cloud); // Now compute & cache the real data
key._cache = build_cache(cloud._idx-1,0); // Build a dummy cache with a fake cloud index
key.cloud_info(cloud); // Now force compute & cache the real data
return key;
}

Expand All @@ -262,10 +256,10 @@ public static <P extends Keyed> Key<P> make(String s) {
}

public static <P extends Keyed> Key<P> makeSystem(String s) {
return make(s,BUILT_IN_KEY, false);
return make(s,BUILT_IN_KEY);
}
public static <P extends Keyed> Key<P> makeUserHidden(String s) {
return make(s,HIDDEN_USER_KEY, false);
return make(s,HIDDEN_USER_KEY);
}

/**
Expand All @@ -279,49 +273,53 @@ public static <P extends Keyed> Key<P> make(H2ONode node) {
public static <P extends Keyed> Key<P> make() { return make(rand()); }

/** Factory making a homed system Key. Requires the initial system byte but
* then allows a String for the remaining bytes. Requires a list of exactly
* one H2ONode to home at. The hint specifies if it is an error to name an
* H2ONode that is NOT in the Cloud, or if some other H2ONode can be
* substituted. The rf parameter and passing more than 1 H2ONode are both
* depreciated.
* then allows a String for the remaining bytes.
*
* Requires specifying the home node of the key. The required specifies
* if it is an error to name an H2ONode that is NOT in the Cloud, or if
* some other H2ONode can be substituted.
* @return the desired Key */
public static <P extends Keyed> Key<P> make(String s, byte systemType, boolean hint, H2ONode... replicas) {
return make(decodeKeyName(s),systemType,hint,replicas);
public static <P extends Keyed> Key<P> make(String s, byte systemType, boolean required, H2ONode home) {
return make(decodeKeyName(s),systemType,required,home);
}
/** Factory making a system Key. Requires the initial system byte but
* then allows a String for the remaining bytes.
* @return the desired Key */
public static <P extends Keyed> Key<P> make(String s, byte systemType) {
return make(decodeKeyName(s),systemType,false,null);
}
/** Factory making a homed system Key. Requires the initial system byte and
* uses {@link #rand} for the remaining bytes. Requires a list of exactly
* one H2ONode to home at. The hint specifies if it is an error to name an
* H2ONode that is NOT in the Cloud, or if some other H2ONode can be
* substituted. The rf parameter and passing more than 1 H2ONode are both
* depreciated.
* uses {@link #rand} for the remaining bytes.
*
* Requires specifying the home node of the key. The required specifies
* if it is an error to name an H2ONode that is NOT in the Cloud, or if
* some other H2ONode can be substituted.
* @return the desired Key */
public static <P extends Keyed> Key<P> make(byte systemType, boolean hint, H2ONode... replicas) {
return make(rand(),systemType,hint,replicas);
public static <P extends Keyed> Key<P> make(byte systemType, boolean required, H2ONode home) {
return make(rand(),systemType,required,home);
}


// Make a Key which is homed to specific nodes.
public static <P extends Keyed> Key<P> make(byte[] kb, byte systemType, boolean required, H2ONode... replicas) {
// no more than 3 replicas allowed to be stored in the key
assert 0 <=replicas.length && replicas.length<=3;
assert systemType<32; // only system keys allowed
boolean inCloud=true;
for( H2ONode h2o : replicas ) if( !H2O.CLOUD.contains(h2o) ) inCloud = false;
if( required ) assert inCloud; // If required placement, error to find a client as the home
else if( !inCloud ) replicas = new H2ONode[0]; // If placement is a hint & cannot be placed, then ignore
private static <P extends Keyed> Key<P> make(byte[] kb, byte systemType, boolean required, H2ONode home) {
assert systemType < 32; // only system keys allowed
home = home != null && H2O.CLOUD.contains(home) ? home : null;
assert !required || home != null; // If homing is not required and home is not in cloud (or null), then ignore

// Key byte layout is:
// 0 - systemType, from 0-31
// 1 - replica-count, plus up to 3 bits for ip4 vs ip6
// 2-n - zero, one, two or 3 IP4 (4+2 bytes) or IP6 (16+2 bytes) addresses
// 1 - is homed to a specific node (0 or 1)
// 2-n - if homed then IP4 (4+2 bytes) or IP6 (16+2 bytes) address
// 2-5- 4 bytes of chunk#, or -1 for masters
// n+ - repeat of the original kb
AutoBuffer ab = new AutoBuffer();
ab.put1(systemType).put1(replicas.length);
for( H2ONode h2o : replicas )
h2o.write(ab);
ab.put1(systemType);
ab.putZ(home != null);
if (home != null) {
home.write(ab);
}
ab.put4(-1);
ab.putA1(kb,kb.length);
ab.putA1(kb, kb.length);
return make(Arrays.copyOf(ab.buf(),ab.position()));
}

Expand Down

0 comments on commit 5092667

Please sign in to comment.