Skip to content

Commit

Permalink
0002691: Partial initial loads
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Jul 22, 2016
1 parent c98a5cb commit a4786a3
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 290 deletions.
Expand Up @@ -229,7 +229,13 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH

@Override
public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List<TransformTableNodeGroupLink> transforms) {
String sql = null;
return createPurgeSqlFor(node, triggerRouter, triggerHistory, transforms, null);
}

@Override
public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory,
List<TransformTableNodeGroupLink> transforms, String deleteSql) {
String sql = null;
if (StringUtils.isEmpty(triggerRouter.getInitialLoadDeleteStmt())) {
List<String> tableNames = new ArrayList<String>();
if (transforms != null) {
Expand All @@ -243,12 +249,13 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH
StringBuilder statements = new StringBuilder(128);

for (String tableName : tableNames) {
String deleteSql = null;
if (tableName.startsWith(parameterService.getTablePrefix())) {
deleteSql = "delete from %s";
} else {
deleteSql = parameterService.getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL);
}
if (deleteSql == null) {
if (tableName.startsWith(parameterService.getTablePrefix())) {
deleteSql = "delete from %s";
} else {
deleteSql = parameterService.getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL);
}
}
statements.append(String.format(deleteSql, tableName)).append(";");
}

Expand Down
Expand Up @@ -95,6 +95,8 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc

public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List<TransformTableNodeGroupLink> transforms);

public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List<TransformTableNodeGroupLink> transforms, String deleteSql);

public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause);

public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause);
Expand Down
Expand Up @@ -22,16 +22,21 @@

import java.util.Date;

import org.jumpmind.symmetric.common.ParameterConstants;

public class TableReloadRequest {

protected String targetNodeId;
protected String sourceNodeId;
protected String triggerId;
protected String routerId;
protected boolean createTable;
protected boolean deleteFirst;
protected String reloadSelect;
protected String reloadDeleteStmt;
protected boolean reloadEnabled = true;
protected Date reloadTime;
protected String channelId;
protected Date createTime = new Date();
protected Date lastUpdateTime = new Date();
protected String lastUpdateBy;
Expand Down Expand Up @@ -134,4 +139,35 @@ public void setLastUpdateBy(String lastUpdateBy) {
this.lastUpdateBy = lastUpdateBy;
}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public boolean isCreateTable() {
return createTable;
}

public void setCreateTable(boolean createTable) {
this.createTable = createTable;
}

public boolean isDeleteFirst() {
return deleteFirst;
}

public void setDeleteFirst(boolean deleteFirst) {
this.deleteFirst = deleteFirst;
}

public boolean isFullLoadRequest() {
return ParameterConstants.ALL.equals(getTriggerId()) && ParameterConstants.ALL.equals(getRouterId());
}

public String getIdentifier() {
return getTriggerId() + getRouterId();
}
}
Expand Up @@ -542,8 +542,6 @@ public void contextCommitted(SimpleRouterContext routingContext) {
engine.getLoadFilterService().clearCache();
}

insertReloadEvents(routingContext);

if (routingContext.get(CTX_KEY_RESTART_JOBMANAGER_NEEDED) != null) {
IJobManager jobManager = engine.getJobManager();
if (jobManager != null) {
Expand All @@ -569,25 +567,7 @@ public void contextCommitted(SimpleRouterContext routingContext) {
}
}
}

protected void insertReloadEvents(SimpleRouterContext routingContext) {
@SuppressWarnings("unchecked")
List<TableReloadRequestKey> reloadRequestKeys = (List<TableReloadRequestKey>) routingContext
.get(CTX_KEY_TABLE_RELOAD_NEEDED);
if (reloadRequestKeys != null) {
for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) {
TableReloadRequest request = engine.getDataService().getTableReloadRequest(
reloadRequestKey);
if (engine.getDataService().insertReloadEvent(request,
reloadRequestKey.getReceivedFromNodeId() != null)) {
log.info(
"Inserted table reload request from config data router for node {} and trigger {}",
reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId());
}
}
}
}


private String tableName(String tableName) {
return TableConstants.getTableName(engine != null ? engine.getTablePrefix() : "sym",
tableName);
Expand Down
Expand Up @@ -47,6 +47,8 @@ public interface IDataService {

public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key);

public List<TableReloadRequest> getTableReloadRequestToProcess(final String sourceNodeId);

public String reloadNode(String nodeId, boolean reverseLoad, String createBy);

public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName);
Expand All @@ -67,6 +69,8 @@ public interface IDataService {

public void insertReloadEvents(Node targetNode, boolean reverse);

public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests);

public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

public long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
Expand Down

0 comments on commit a4786a3

Please sign in to comment.