Skip to content

Commit

Permalink
fixed merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
dshine2 committed Apr 6, 2016
2 parents 2d5a024 + ec40b4e commit eb140d8
Show file tree
Hide file tree
Showing 24 changed files with 621 additions and 326 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Expand Up @@ -10,5 +10,4 @@ RUNNING_PID
project/project/target/*
target/.history
target/
*.lc
*.lex
/models
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,4 +1,4 @@
val cogcompNLPVersion = "3.0.21"
val cogcompNLPVersion = "3.0.23"
lazy val root = (project in file(".")).
aggregate(core, learner)

Expand Down
90 changes: 62 additions & 28 deletions core/app/actors/JobProcessingActor.java
@@ -1,18 +1,23 @@
package actors;

import java.util.ArrayList;
import java.util.List;

import actors.Messages.*;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.util.Timeout;
import akka.actor.UntypedActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import edu.illinois.cs.cogcomp.core.datastructures.textannotation.TextAnnotation;
import edu.illinois.cs.cogcomp.core.experiments.ClassificationTester;
import edu.illinois.cs.cogcomp.core.experiments.evaluators.Evaluator;
import edu.illinois.cs.cogcomp.core.utilities.SerializationHelper;
import models.Job;
import controllers.Core;
import models.LearnerInstancesResponse;
import models.LearnerSettings;
import play.libs.F;
import play.libs.F.Promise;
import play.libs.ws.WSResponse;
Expand All @@ -22,17 +27,26 @@

import static akka.pattern.Patterns.ask;


public class JobProcessingActor extends UntypedActor {

public static Props props = Props.create(JobProcessingActor.class);

private int completed;
private int total;
private int skipped;
private int learnerTimeout;
private String conf_id;
private String record_id;
private String url;

Config conf;

public JobProcessingActor() {
conf = ConfigFactory.load();
learnerTimeout = conf.getInt("learner.default.timeout");
}

/**
* When a StartJobMessage is received, the corresponding Job is extracted
* and the unprocessed instances are sent and received one at a time to the
Expand All @@ -47,6 +61,7 @@ public void onReceive(Object message) throws Exception {
this.conf_id = jobInfo.getConf_id();
this.record_id = jobInfo.getRecord_id();
this.url = jobInfo.getUrl();
LearnerSettings learnerSettings = jobInfo.getLearnerSettings();
Job job = Core.setUpJob(conf_id, url, record_id);
Evaluator evaluator = Core.getEvaluator(conf_id);
ClassificationTester eval = new ClassificationTester();
Expand All @@ -59,50 +74,69 @@ public void onReceive(Object message) throws Exception {
System.out.println("Created Job Processor Worker");
System.out.println("Sending and recieving annotations:");
try {
for (int i = 0; i < unprocessedInstances.size(); i++) {
Promise<WSResponse> response = job.sendAndReceiveRequestFromSolver(unprocessedInstances.get(i));
TextAnnotation goldInstance = goldInstances.get(i);
response.onRedeem(new F.Callback<WSResponse>() {
int maxBatchSize = learnerSettings.maxNumInstancesAccepted;
int killCheckCounter = 1;
for (int startIndex = 0; startIndex < unprocessedInstances.size(); startIndex+=maxBatchSize) {
int batchSize = Math.min(maxBatchSize, unprocessedInstances.size() - startIndex);
List<TextAnnotation> batch = makeBatch(unprocessedInstances, startIndex, batchSize);
Promise<LearnerInstancesResponse> response = job.sendAndReceiveRequestsFromSolver(batch);

int batchStartIndex = startIndex;
response.onRedeem(new F.Callback<LearnerInstancesResponse>() {
@Override
public void invoke(WSResponse wsResponse) throws Throwable {
TextAnnotation predictedInstance;
try {
String resultJson = wsResponse.getBody();
predictedInstance = SerializationHelper.deserializeFromJson(resultJson);
} catch (Exception e) {
System.out.println(e);
skipped++;
getSender().tell(new StatusUpdate(completed, skipped, total, record_id), getSelf());
return;
public void invoke(LearnerInstancesResponse learnerInstancesResponse) throws Throwable {
for(int batchIndex = 0;batchIndex<batchSize;batchIndex++){
if (learnerInstancesResponse.textAnnotations[batchIndex] != null){
TextAnnotation goldInstance = goldInstances.get(batchStartIndex + batchIndex);
Core.evaluate(evaluator, eval, goldInstance, learnerInstancesResponse.textAnnotations[batchIndex]);
completed++;
} else {
skipped++;
}
}
Core.evaluate(evaluator, eval, goldInstance, predictedInstance);
completed++;
master.tell(new StatusUpdate(completed, skipped, total, record_id), getSelf());

//System.out.println("Completed(worker):" + completed);
if(completed+skipped < total)
Core.storeResultsOfRunInDatabase(eval, record_id, true);
else
Core.storeResultsOfRunInDatabase(eval, record_id, false);

master.tell(new StatusUpdate(completed, skipped, total, record_id), getSelf());
System.out.println(String.format("Completed batch of size %s", batchSize));
}
});
response.get(5000);
if (i % 5 == 0) {
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> masterResponse = ask(getSender(), new KillStatus(false, record_id), 5000);
Object result = (Object) Await.result(masterResponse, timeout.duration());
if (result instanceof KillStatus) {
break;
}
response.get(learnerTimeout);
if (killCheckCounter == 5) {
if (killCommandHasBeenSent()) {
System.out.println("Exiting");
break;
}
killCheckCounter = 1;
} else {
killCheckCounter++;
}
}
} catch (Exception ex) {
System.out.println("Error sending and receiving text annotations");
System.out.println("Error sending and receiving text annotations" + ex.getMessage());
Core.storeResultsOfRunInDatabase(eval, record_id, false);
}
System.out.println("Done");
} else
unhandled(message);
}

private boolean killCommandHasBeenSent() throws Exception {
Timeout timeout = new Timeout(learnerTimeout);
Future<Object> masterResponse = ask(getSender(), new KillStatus(false, record_id), 5000);
Object result = (Object) Await.result(masterResponse, timeout.duration());
return (result instanceof KillStatus);
}

private List<TextAnnotation> makeBatch(List<TextAnnotation> unprocessedInstances, int startIndex, int batchSize){
List<TextAnnotation> batch = new ArrayList<>();
for (int i=0;i<batchSize;i++){
batch.add(unprocessedInstances.get(startIndex + i));
}
return batch;
}

}
161 changes: 83 additions & 78 deletions core/app/actors/Messages.java
@@ -1,89 +1,94 @@
package actors;

import models.Job;
import models.LearnerSettings;

public class Messages {

public static class SetUpJobMessage {
private String conf_id;
private String url;
private String record_id;

public SetUpJobMessage(String conf_id, String url, String record_id) {
super();
this.conf_id = conf_id;
this.url = url;
this.record_id = record_id;
}

public String getConf_id() {
return conf_id;
}

public String getUrl() {
return url;
}

public String getRecord_id() {
return record_id;
}
}

public static class StartJobMessage {
private Job job;

public StartJobMessage(Job job) {
super();
this.job = job;
}

public Job getJob() {
return job;
}
}

public static class StatusUpdate {
private int completed;
private int skipped;
private int total;
private String record_id;
public static class SetUpJobMessage {
private String conf_id;
private String url;
private String record_id;
private LearnerSettings learnerSettings;

public SetUpJobMessage(String conf_id, String url, String record_id, LearnerSettings learnerSettings) {
super();
this.conf_id = conf_id;
this.url = url;
this.record_id = record_id;
this.learnerSettings = learnerSettings;
}

public String getConf_id() {
return conf_id;
}

public String getUrl() {
return url;
}

public String getRecord_id() {
return record_id;
}

public LearnerSettings getLearnerSettings() { return learnerSettings; }
}

public static class StartJobMessage {
private Job job;

public StartJobMessage(Job job) {
super();
this.job = job;
}

public Job getJob() {
return job;
}
}

public static class StatusUpdate {
private int completed;
private int skipped;
private int total;
private String record_id;

public StatusUpdate(int completed, int skipped, int total, String record_id) {
this.completed = completed;
this.skipped = skipped;
this.total = total;
this.record_id = record_id;
}

public int getTotal() {
return total;
}

public int getCompleted() {
return completed;
}

public int getSkipped() {
return skipped;
}

public String getRecord_id() {
return record_id;
}
}

public static class StatusRequest {
private String record_id;

public StatusRequest(String record_id) {
this.record_id = record_id;
}

public StatusUpdate(int completed, int skipped, int total, String record_id) {
this.completed = completed;
this.skipped = skipped;
this.total = total;
this.record_id = record_id;
}

public int getTotal() {
return total;
}

public int getCompleted() {
return completed;
}

public int getSkipped() {
return skipped;
}

public String getRecord_id() {
return record_id;
}
}

public static class StatusRequest {
private String record_id;

public StatusRequest(String record_id) {
this.record_id = record_id;
}

public String getRecord_id() {
return record_id;
}
}

public static class StopRunMessage {
private String record_id;

Expand All @@ -93,18 +98,18 @@ public StopRunMessage(String record_id) {

public String getRecord_id() {
return record_id;
}
}
}

public static class KillStatus {
private Boolean kill;
private String record_id;

public KillStatus(Boolean kill, String record_id) {
this.kill = kill;
this.record_id = record_id;
}

public String getRecord_id() {
return record_id;
}
Expand Down

0 comments on commit eb140d8

Please sign in to comment.