Skip to content

Commit

Permalink
Fix for PUBDEV-1218. Add shared progress key so SVD progress propagat…
Browse files Browse the repository at this point in the history
…ed to PCA frontend.
  • Loading branch information
anqi committed Jun 2, 2015
1 parent 59f6676 commit 519f232
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
22 changes: 21 additions & 1 deletion h2o-algos/src/main/java/hex/pca/PCA.java
Expand Up @@ -218,7 +218,7 @@ protected void computeStatsFillModel(PCAModel pca, SVDModel svd) {
SVDModel svd = null;
SVD job = null;
try {
job = new SVD(parms);
job = new EmbeddedSVD(parms, _progressKey);
svd = job.trainModel().get();
} finally {
if (job != null) job.remove();
Expand Down Expand Up @@ -255,3 +255,23 @@ Key self() {
}
}
}

class EmbeddedSVD extends SVD {

final private Key sharedProgressKey;

public EmbeddedSVD(SVDModel.SVDParameters parms, Key sharedProgressKey) {
super(parms);
this.sharedProgressKey = sharedProgressKey;
}

@Override
protected Key createProgressKey() {
return sharedProgressKey != null ? sharedProgressKey : super.createProgressKey();
}

@Override
protected boolean deleteProgressKey() {
return false;
}
}
12 changes: 9 additions & 3 deletions h2o-core/src/main/java/water/Job.java
Expand Up @@ -133,7 +133,8 @@ public Job(Key<T> dest, String desc) {
* @see H2OCountedCompleter
*/
public Job<T> start(final H2OCountedCompleter fjtask, long work) {
DKV.put(_progressKey = Key.make(), new Progress(work));
// FIXME: Do not override shared progress key
DKV.put(_progressKey = createProgressKey(), new Progress(work));
assert _state == JobState.CREATED : "Trying to run job which was already run?";
assert fjtask != null : "Starting a job with null working task is not permitted!";
assert fjtask.getCompleter() == null : "Cannot have a completer; this must be a top-level task";
Expand Down Expand Up @@ -175,6 +176,10 @@ public Job<T> start(final H2OCountedCompleter fjtask, long work) {
return this;
}

protected Key createProgressKey() { return Key.make(); }

protected boolean deleteProgressKey() { return true; }

/** Blocks and get result of this job.
* <p>
* This call blocks on working task which was passed via {@link #start}
Expand Down Expand Up @@ -246,7 +251,8 @@ private void changeJobState(final String msg, final JobState resultingState) {
_end_time = done;
}
// Remove on cancel/fail/done, only used whilst Job is Running
DKV.remove(_progressKey);
if (deleteProgressKey())
DKV.remove(_progressKey);
}

/**
Expand Down Expand Up @@ -335,7 +341,7 @@ public static class ProgressUpdate extends TAtomic<Progress> {
public static class JobCancelledException extends RuntimeException{}

@Override protected Futures remove_impl(Futures fs) {
if (null != _progressKey) DKV.remove(_progressKey, fs);
if (null != _progressKey && deleteProgressKey()) DKV.remove(_progressKey, fs);
return fs;
}

Expand Down

0 comments on commit 519f232

Please sign in to comment.