Skip to content

Commit

Permalink
merging
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 6, 2018
2 parents 29f29b0 + 9343a2d commit 5feba15
Show file tree
Hide file tree
Showing 20 changed files with 849 additions and 73 deletions.
Expand Up @@ -42,6 +42,15 @@ public enum ExtractStatus {
private Date lastUpdateTime;
private Date createTime;
private String queue;
private long loadId;
private String tableName;
private long rows;
private long transferredRows;
private long loadedRows;
private long lastTransferredBatchId;
private long lastLoadedBatchId;
private long transferredMillis;
private long loadedMillis;

public long getRequestId() {
return requestId;
Expand Down Expand Up @@ -131,4 +140,77 @@ public void setRouterId(String routerId) {
this.routerId = routerId;
}

public long getLoadId() {
return loadId;
}

public void setLoadId(long loadId) {
this.loadId = loadId;
}

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public long getRows() {
return rows;
}

public void setRows(long rows) {
this.rows = rows;
}

public long getTransferredRows() {
return transferredRows;
}

public void setTransferredRows(long transferredRows) {
this.transferredRows = transferredRows;
}

public long getLoadedRows() {
return loadedRows;
}

public void setLoadedRows(long loadedRows) {
this.loadedRows = loadedRows;
}

public long getLastTransferredBatchId() {
return lastTransferredBatchId;
}

public void setLastTransferredBatchId(long lastTransferredBatchId) {
this.lastTransferredBatchId = lastTransferredBatchId;
}

public long getLastLoadedBatchId() {
return lastLoadedBatchId;
}

public void setLastLoadedBatchId(long lastLoadedBatchId) {
this.lastLoadedBatchId = lastLoadedBatchId;
}

public long getTransferredMillis() {
return transferredMillis;
}

public void setTransferredMillis(long transferredMillis) {
this.transferredMillis = transferredMillis;
}

public long getLoadedMillis() {
return loadedMillis;
}

public void setLoadedMillis(long loadedMillis) {
this.loadedMillis = loadedMillis;
}


}
Expand Up @@ -39,12 +39,26 @@ public class TableReloadRequest {
protected Date createTime = new Date();
protected Date lastUpdateTime = new Date();
protected String lastUpdateBy;
protected int batchCount;
protected int loadedBatchCount;
protected Long rowCount;
protected Long loadedRowCount;
protected int tableCount;
protected boolean errorFlag;
protected String sqlState;
protected int sqlCode;
protected String sqlMessage;
protected int loadId;
protected boolean processed;
protected boolean completed;
protected boolean cancelled;

public TableReloadRequest(TableReloadRequestKey key) {
this.targetNodeId = key.getTargetNodeId();
this.sourceNodeId = key.getSourceNodeId();
this.triggerId = key.getTriggerId();
this.routerId = key.getRouterId();
this.createTime = key.getCreateTime();
}

public TableReloadRequest() {
Expand Down Expand Up @@ -161,4 +175,113 @@ public boolean isFullLoadRequest() {
public String getIdentifier() {
return getTriggerId() + getRouterId();
}

public int getBatchCount() {
return batchCount;
}

public void setBatchCount(int batchCount) {
this.batchCount = batchCount;
}

public Long getRowCount() {
return rowCount == null ? 0 : rowCount;
}

public void setRowCount(Long rowCount) {
this.rowCount = rowCount;
}

public int getTableCount() {
return tableCount;
}

public void setTableCount(int tableCount) {
this.tableCount = tableCount;
}

public boolean isErrorFlag() {
return errorFlag;
}

public void setErrorFlag(boolean errorFlag) {
this.errorFlag = errorFlag;
}

public String getSqlState() {
return sqlState;
}

public void setSqlState(String sqlState) {
this.sqlState = sqlState;
}

public int getSqlCode() {
return sqlCode;
}

public void setSqlCode(int sqlCode) {
this.sqlCode = sqlCode;
}

public String getSqlMessage() {
return sqlMessage;
}

public void setSqlMessage(String sqlMessage) {
this.sqlMessage = sqlMessage;
}

public int getLoadId() {
return loadId;
}

public void setLoadId(int loadId) {
this.loadId = loadId;
}

public boolean isProcessed() {
return processed;
}

public void setProcessed(boolean processed) {
this.processed = processed;
}

public boolean isCompleted() {
return completed;
}

public void setCompleted(boolean completed) {
this.completed = completed;
}

public int getLoadedBatchCount() {
return loadedBatchCount;
}

public void setLoadedBatchCount(int loadedBatchCount) {
this.loadedBatchCount = loadedBatchCount;
}

public Long getLoadedRowCount() {
return loadedRowCount == null ? 0 : loadedRowCount ;
}

public void setLoadedRowCount(Long loadedRowCount) {
this.loadedRowCount = loadedRowCount;
}

public boolean isCancelled() {
return cancelled;
}

public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}

public TableReloadRequestKey getTableReloadRequestKey() {
return new TableReloadRequestKey(this.targetNodeId, this.sourceNodeId, this.triggerId, this.routerId, this.createTime);
}

}
Expand Up @@ -20,14 +20,17 @@
*/
package org.jumpmind.symmetric.model;

import java.util.Date;

public class TableReloadRequestKey {

protected String targetNodeId;
protected String sourceNodeId;
protected String triggerId;
protected String routerId;
protected String receivedFromNodeId;

protected Date createTime;

public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String triggerId,
String routerId, String receivedFromNodeId) {
this.targetNodeId = targetNodeId;
Expand All @@ -37,6 +40,15 @@ public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String tr
this.receivedFromNodeId = receivedFromNodeId;
}

public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String triggerId,
String routerId, Date createTime) {
this.targetNodeId = targetNodeId;
this.sourceNodeId = sourceNodeId;
this.triggerId = triggerId;
this.routerId = routerId;
this.createTime = createTime;
}

public String getRouterId() {
return routerId;
}
Expand All @@ -60,5 +72,9 @@ public void setReceivedFromNodeId(String receivedFromNodeId) {
public String getReceivedFromNodeId() {
return receivedFromNodeId;
}

public Date getCreateTime() {
return createTime;
}

}
Expand Up @@ -27,6 +27,7 @@

import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchWithPayload;
Expand Down Expand Up @@ -64,10 +65,18 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi

public RemoteNodeStatuses queueWork(boolean force);

public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId);
public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String tableName, long rows);

public void resetExtractRequest(OutgoingBatch batch);

public void removeBatchFromStaging(OutgoingBatch batch);

public List<ExtractRequest> getPendingTablesForExtractByLoadId(long loadId);

public List<ExtractRequest> getCompletedTablesForExtractByLoadId(long loadId);

public void updateExtractRequestLoadTime(Date loadTime, OutgoingBatch batch);

public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis);

}
Expand Up @@ -51,8 +51,18 @@ public interface IDataService {

public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key);

public TableReloadRequest getTableReloadRequest(int loadId);

public List<TableReloadRequest> getTableReloadRequestToProcess(final String sourceNodeId);

public List<TableReloadRequest> getTableReloadRequestsByLoadId();

public void updateTableReloadRequestsCounts(long loadId, int batchCount, long rowsCount);

public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transcation, long loadId, int batchCount, long rowsCount);

public void updateTableReloadRequestsCancelled(long loadId);

public String reloadNode(String nodeId, boolean reverseLoad, String createBy);

public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName);
Expand Down
Expand Up @@ -25,16 +25,16 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.LoadSummary;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadCounts;
import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadStatusSummary;

/**
Expand Down Expand Up @@ -121,7 +121,7 @@ public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String

public List<OutgoingLoadSummary> getLoadSummaries(boolean activeOnly);

public Set<Long> getActiveLoads(String sourceNodeId);
public Map<String, LoadCounts> getActiveLoadCounts();

public List<LoadSummary> getQueuedLoads(String sourceNodeId);

Expand Down

0 comments on commit 5feba15

Please sign in to comment.