Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Merge remote-tracking branch 'apache/gsoc18' into gsoc-2018
  • Loading branch information
ahmedifhaam committed Aug 6, 2018
2 parents 393954a + dc59ba0 commit 74c5a1b1269bd3d110caa0c08179c8fce084c5c1
Showing 5 changed files with 90 additions and 53 deletions.
@@ -2,12 +2,15 @@ language: java
jdk:
- oraclejdk8
- oraclejdk9
- openjdk10
- openjdk11

#before_script:
# - pip install --user codecov
env:
- FILEMGR_URL=http://localhost:9000 WORKFLOW_URL=http://localhost:9001 RESMGR_URL=http://localhost:9002 SOLR_DRAT_URL=http://localhost:8080/solr/drat

#after_success:
# - codecov
before_install:
- export M2_HOME=/usr/local/maven
- export MAVEN_OPTS="-Dmaven.repo.local=$HOME/.m2/repository -Xms1024m -Xmx3072m -XX:PermSize=512m"

#addons:
# srcclr: true
script:
- mvn clean install
@@ -23,3 +23,6 @@ You can clone the wiki by running

Visit our new website [drat.apache.org](https://drat.apache.org/) at the [Apache Software Foundation](https://www.apache.org/).

---

Current build status: [![Build Status](https://travis-ci.org/apache/drat.svg?branch=master)](https://travis-ci.org/apache/drat)
@@ -12,7 +12,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<oodt.version>1.2.2</oodt.version>
<oodt.version>1.2.3-SNAPSHOT</oodt.version>
<junit.version>4.12</junit.version>
</properties>

@@ -70,9 +70,9 @@ public class ProcessDratWrapper extends GenericProcess
private static final String MAP_CMD = "map";
private static final String REDUCE_CMD = "reduce";
private static final String STATUS_IDLE = "idle";
private static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
private static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
protected static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
protected static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
protected static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
private static final String[] WIPE_TYPES = { "RatLog", "GenericFile",
"RatAggregateLog" };

@@ -180,18 +180,18 @@ public void map() {
@Override
public void reduce() throws IOException {
setStatus(REDUCE_CMD);
DratLog mapLog = new DratLog("REDUCING");
DratLog reduceLog = new DratLog("REDUCING");
WorkflowRestResource restResource = new WorkflowRestResource();
DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
requestBody.taskIds = new ArrayList<>();
requestBody.taskIds.add(REDUCE_TASK_ID);
LOG.info("STARTING REDUCING");
mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
String resp = (String)restResource.performDynamicWorkFlow(requestBody);
if(resp.equals("OK")) {
mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
}else {
mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
reduceLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
throw new IOException(resp);
}
}
@@ -260,15 +260,19 @@ public void go() throws Exception {
this.map();

// don't run reduce until all maps are done
while (mapsStillRunning()) {
while (stillRunning(PARTITION_AND_MAP_TASK_ID) || stillRunning(MAPPER_TASK_ID)) {
Thread.sleep(DRAT_PROCESS_WAIT_DURATION);
LOG.info("MAP STILL RUNNING");
}
// you're not done until the final log is generated.
while (!hasAggregateRatLog()) {
try {
reduce();
LOG.info("REDUCE STILL RUNNING");
if (!stillRunning(REDUCE_TASK_ID)) {
reduce();
}
else {
LOG.info("REDUCE STILL RUNNING.");
}
} catch (IOException e) {
LOG.warning("Fired reduce off before mappers were done. Sleeping: ["
+ String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000)
@@ -290,15 +294,16 @@ private synchronized boolean hasAggregateRatLog() {
+ "]: " + breakStatus);
return numLogs > 0;
}

private boolean stillRunning(String taskId) throws Exception {
WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
for(WorkflowInstance instance : workflowInstances){
LOG.info("Running Instances : id: "+instance.getId()
+" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
}
return taskStillRunning(workflowInstances, taskId);

private boolean mapsStillRunning() throws Exception {
WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
for(WorkflowInstance instance : workflowInstances){
LOG.info("Running Instances : id: "+instance.getId()
+" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
}
return stillRunning(workflowInstances);
}

@VisibleForTesting
@@ -343,36 +348,40 @@ protected List<WorkflowItem> parseWorkflows(String cmdOutput) {
}
return items;
}

@VisibleForTesting
protected boolean stillRunning(List<WorkflowInstance> instances) {
List<WorkflowInstance> mapperInstances = filterMappers(instances);
LOG.info("Checking mappers: inspecting ["
+ String.valueOf(mapperInstances.size()) + "] mappers.");
for (WorkflowInstance mapperInstance : mapperInstances) {
if (isRunning(mapperInstance.getState().getName())) {
LOG.info("Mapper: [" + mapperInstance.getId() + "] still running.");
return true;

protected boolean taskStillRunning(List<WorkflowInstance> instances, String ...taskIds) {
if (taskIds != null && taskIds.length > 0) {
for(String taskId: taskIds) {
List<WorkflowInstance> insts = filterInstances(instances, taskId);
LOG.info("Checking task: "+taskId+" : inspecting ["+String.valueOf(instances.size())+"] tasks.");
for(WorkflowInstance i: insts) {
if(isRunning(i.getState().getName())) {
LOG.info("Task: [" + i.getId() + "] still running.");
return true;
}
}
}
}

return false;
}

@VisibleForTesting
protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
List<WorkflowInstance> mappers = new ArrayList<>();
if(instances!=null && instances.size()>0){
for(WorkflowInstance instance:instances){
if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
mappers.add(instance);
}else{
LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
}
}
}
return mappers;
}

@VisibleForTesting
protected List<WorkflowInstance> filterInstances(List<WorkflowInstance> instances, String taskId){
List<WorkflowInstance> insts = new ArrayList<>();
if(instances!=null && instances.size()>0){
for(WorkflowInstance instance:instances){
if(instance.getCurrentTask().getTaskId().equals(taskId)){
LOG.info("Adding "+taskId+" instance: [" + instance.getCurrentTask().getTaskId() + "]");
insts.add(instance);
}else{
LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
}
}
}
return insts;
}


@VisibleForTesting
protected boolean isRunning(String status) {
@@ -21,6 +21,8 @@
import java.util.List;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import backend.ProcessDratWrapper;
import static backend.ProcessDratWrapper.MAPPER_TASK_ID;
import static backend.ProcessDratWrapper.PARTITION_AND_MAP_TASK_ID;
import junit.framework.TestCase;

public class TestProcessDratWrapper extends TestCase {
@@ -52,7 +54,27 @@ public void testStillRunning(){
for(WorkflowItem wi: items) {
insts.add(wi.toInstance());
}
assertTrue(wrapper.stillRunning(insts));
assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID));
}

public void testFilterPartitioners(){
ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
assertNotNull(wrapper);
String cmdLines = "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
"Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
"Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";

List<WorkflowItem> items = null;
items = wrapper.parseWorkflows(cmdLines);
assertNotNull(items);
List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size());
for(WorkflowItem wi: items) {
insts.add(wi.toInstance());
}
List<WorkflowInstance> partitioners = null;
partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
assertNotNull(partitioners);
assertEquals(2, partitioners.size());
}

public void testFilterMappers(){
@@ -70,7 +92,7 @@ public void testFilterMappers(){
insts.add(wi.toInstance());
}
List<WorkflowInstance> mappers = null;
mappers = wrapper.filterMappers(insts);
mappers = wrapper.filterInstances(insts, MAPPER_TASK_ID);
assertNotNull(mappers);
assertEquals(1, mappers.size());
}

0 comments on commit 74c5a1b

Please sign in to comment.