Skip to content

Commit

Permalink
rename fields on processinfo. added load id.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 27, 2014
1 parent 2e66a84 commit 2fd71ca
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
Expand Up @@ -66,7 +66,7 @@ public String toString() {

private Status status = Status.NEW;

private long currentDataCount;
private long currentBatchDataCount;

private long dataCount = -1;

Expand All @@ -79,6 +79,10 @@ public String toString() {
private String currentTableName;

private transient Thread thread;

private Date currentBatchStartTime;

private long currentLoadId;

private Date startTime = new Date();

Expand Down Expand Up @@ -127,12 +131,12 @@ public void setStatus(Status status) {
}
}

public long getCurrentDataCount() {
return currentDataCount;
public long getCurrentBatchDataCount() {
return currentBatchDataCount;
}

public void setCurrentDataCount(long dataCount) {
this.currentDataCount = dataCount;
public void setCurrentBatchDataCount(long dataCount) {
this.currentBatchDataCount = dataCount;
}

public long getBatchCount() {
Expand All @@ -144,7 +148,7 @@ public void setBatchCount(long batchCount) {
}

public void incrementCurrentDataCount() {
this.currentDataCount++;
this.currentBatchDataCount++;
}

public void incrementBatchCount() {
Expand All @@ -157,6 +161,17 @@ public long getCurrentBatchId() {

public void setCurrentBatchId(long currentBatchId) {
this.currentBatchId = currentBatchId;
this.currentBatchStartTime = new Date();
this.currentBatchDataCount = 0;
this.currentLoadId = 0;
}

public void setCurrentLoadId(long loadId) {
this.currentLoadId = loadId;
}

public long getCurrentLoadId() {
return currentLoadId;
}

public String getCurrentChannelId() {
Expand Down Expand Up @@ -206,6 +221,18 @@ public void setDataCount(long dataCount) {
public long getDataCount() {
return dataCount;
}

public Date getCurrentBatchStartTime() {
if (currentBatchStartTime == null) {
return startTime;
} else {
return currentBatchStartTime;
}
}

public void setCurrentBatchStartTime(Date currentBatchStartTime) {
this.currentBatchStartTime = currentBatchStartTime;
}

@Override
public String toString() {
Expand Down
Expand Up @@ -38,7 +38,7 @@ public ProcessInfoDataWriter(IDataWriter targetWriter, ProcessInfo processInfo)

public void open(DataContext context) {
super.open(context);
processInfo.setCurrentDataCount(0);
processInfo.setCurrentBatchDataCount(0);
processInfo.setBatchCount(0);
}

Expand Down
Expand Up @@ -483,6 +483,7 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
processInfo.setDataCount(currentBatch.getDataEventCount());
processInfo.incrementBatchCount();
processInfo.setCurrentBatchId(currentBatch.getBatchId());
processInfo.setCurrentLoadId(currentBatch.getLoadId());

currentBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, currentBatch);

Expand Down
Expand Up @@ -160,7 +160,7 @@ public ProcessInfo newProcessInfo(ProcessInfoKey key) {
old.toString());
}

if (old.getCurrentDataCount() > 0) {
if (old.getCurrentBatchDataCount() > 0) {
processInfosThatHaveDoneWork.put(key, old);
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ public List<ProcessInfo> getProcessInfosThatHaveDoneWork() {
while (i.hasNext()) {
boolean addedOneThatDidWork = false;
ProcessInfo info = i.next().copy();
if (info.getStatus() == ProcessInfo.Status.DONE && info.getCurrentDataCount() == 0) {
if (info.getStatus() == ProcessInfo.Status.DONE && info.getCurrentBatchDataCount() == 0) {
ProcessInfo lastThatDidWork = processInfosThatHaveDoneWork.get(info.getKey());
if (lastThatDidWork != null) {
toReturn.add(lastThatDidWork);
Expand Down

0 comments on commit 2fd71ca

Please sign in to comment.