Skip to content

Commit

Permalink
Merge pull request #142 from ryannk/batchProcessing
Browse files Browse the repository at this point in the history
Batch processing
  • Loading branch information
Daniel Khashabi committed Apr 6, 2016
2 parents c824066 + b295d8b commit ec40b4e
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions core/app/actors/JobProcessingActor.java
Expand Up @@ -8,6 +8,8 @@
import akka.actor.Props;
import akka.util.Timeout;
import akka.actor.UntypedActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import edu.illinois.cs.cogcomp.core.datastructures.textannotation.TextAnnotation;
import edu.illinois.cs.cogcomp.core.experiments.ClassificationTester;
import edu.illinois.cs.cogcomp.core.experiments.evaluators.Evaluator;
Expand All @@ -33,10 +35,18 @@ public class JobProcessingActor extends UntypedActor {
private int completed;
private int total;
private int skipped;
private int learnerTimeout;
private String conf_id;
private String record_id;
private String url;

Config conf;

public JobProcessingActor() {
conf = ConfigFactory.load();
learnerTimeout = conf.getInt("learner.default.timeout");
}

/**
* When a StartJobMessage is received, the corresponding Job is extracted
* and the unprocessed instances are sent and received one at a time to the
Expand Down Expand Up @@ -94,9 +104,10 @@ public void invoke(LearnerInstancesResponse learnerInstancesResponse) throws Thr
System.out.println(String.format("Completed batch of size %s", batchSize));
}
});
response.get(5000);
response.get(learnerTimeout);
if (killCheckCounter == 5) {
if (killCommandHasBeenSent()) {
System.out.println("Exiting");
break;
}
killCheckCounter = 1;
Expand All @@ -105,7 +116,7 @@ public void invoke(LearnerInstancesResponse learnerInstancesResponse) throws Thr
}
}
} catch (Exception ex) {
System.out.println("Error sending and receiving text annotations");
System.out.println("Error sending and receiving text annotations" + ex.getMessage());
Core.storeResultsOfRunInDatabase(eval, record_id, false);
}
System.out.println("Done");
Expand All @@ -114,7 +125,7 @@ public void invoke(LearnerInstancesResponse learnerInstancesResponse) throws Thr
}

private boolean killCommandHasBeenSent() throws Exception {
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Timeout timeout = new Timeout(learnerTimeout);
Future<Object> masterResponse = ask(getSender(), new KillStatus(false, record_id), 5000);
Object result = (Object) Await.result(masterResponse, timeout.duration());
return (result instanceof KillStatus);
Expand Down

0 comments on commit ec40b4e

Please sign in to comment.