Skip to content

Commit

Permalink
0001547: Add Send Schema Feature to NodePanel. Right click on list of…
Browse files Browse the repository at this point in the history
… nodes to synchronize table schema.
  • Loading branch information
chenson42 committed Jan 27, 2014
1 parent 551e6aa commit 9b7416f
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 28 deletions.
Expand Up @@ -32,7 +32,9 @@ private Constants() {

public static final long VIRTUAL_BATCH_FOR_REGISTRATION = -9999;

public static final String NONE_TOKEN = "$(none)";
public static final String NONE_TOKEN = "$(none)";

public static final String DATA_CONTEXT_ENGINE = "engine";

public static final String DATA_CONTEXT_TARGET_NODE = "targetNode";

Expand Down
Expand Up @@ -100,7 +100,7 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction,

public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loadId, String createBy);

public void insertCreateEvent(Node targetNode, TriggerRouter triggerRouter, TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy);
public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy);

/**
* Count the number of data ids in a range
Expand Down
Expand Up @@ -180,6 +180,8 @@ public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories);

public TriggerHistory findTriggerHistoryForGenericSync();

public void clearCache();

}
Expand Up @@ -374,6 +374,7 @@ protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo processInf
Throwable error = null;
try {
Node targetNode = nodeService.findIdentity();
ctx.put(Constants.DATA_CONTEXT_ENGINE, engine);
if (targetNode != null) {
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE_ID, targetNode.getNodeId());
Expand Down
Expand Up @@ -427,8 +427,8 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c
targetNode)) {
String xml = symmetricDialect.getCreateTableXML(triggerHistory,
triggerRouter);
insertCreateEvent(transaction, targetNode, triggerRouter, triggerHistory,
xml, true, loadId, createBy);
insertCreateEvent(transaction, targetNode, triggerHistory, xml,
true, loadId, createBy);
if (!transactional) {
transaction.commit();
}
Expand Down Expand Up @@ -585,19 +585,8 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode,
}
}

private TriggerHistory findTriggerHistoryForGenericSync() {
String triggerTableName = TableConstants.getTableName(tablePrefix,
TableConstants.SYM_TRIGGER);
TriggerHistory history = engine.getTriggerRouterService().findTriggerHistory(null, null, triggerTableName
.toUpperCase());
if (history == null) {
history = engine.getTriggerRouterService().findTriggerHistory(null, null, triggerTableName);
}
return history;
}

public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loadId, String createBy) {
TriggerHistory history = findTriggerHistoryForGenericSync();
TriggerHistory history = engine.getTriggerRouterService().findTriggerHistoryForGenericSync();
boolean useReloadChannel = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL);
Data data = new Data(history.getSourceTableName(), DataEventType.SQL,
CsvUtils.escapeCsvData(sql), null, history, useReloadChannel && isLoad ? Constants.CHANNEL_RELOAD : Constants.CHANNEL_CONFIG, null, null);
Expand All @@ -612,7 +601,7 @@ public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loa

public void insertSqlEvent(ISqlTransaction transaction, Node targetNode, String sql,
boolean isLoad, long loadId, String createBy) {
TriggerHistory history = findTriggerHistoryForGenericSync();
TriggerHistory history = engine.getTriggerRouterService().findTriggerHistoryForGenericSync();
insertSqlEvent(transaction, history, Constants.CHANNEL_CONFIG, targetNode, sql, isLoad, loadId, createBy);
}

Expand Down Expand Up @@ -647,12 +636,12 @@ public void checkForAndUpdateMissingChannelIds(long firstDataId, long lastDataId
}
}

public void insertCreateEvent(final Node targetNode, final TriggerRouter triggerRouter,
TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy) {
public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHistory,
String xml, boolean isLoad, long loadId, String createBy) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
insertCreateEvent(transaction, targetNode, triggerRouter, triggerHistory, xml, isLoad, loadId, createBy);
insertCreateEvent(transaction, targetNode, triggerHistory, xml, isLoad, loadId, createBy);
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
Expand All @@ -670,20 +659,19 @@ public void insertCreateEvent(final Node targetNode, final TriggerRouter trigger
}

public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy) {
Trigger trigger = triggerRouter.getTrigger();
TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy) {
Data data = new Data(
triggerHistory.getSourceTableName(),
DataEventType.CREATE,
CsvUtils.escapeCsvData(xml),
null,
triggerHistory,
parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL) && isLoad ? Constants.CHANNEL_RELOAD
: trigger.getChannelId(), null, null);
: Constants.CHANNEL_CONFIG, null, null);
try {
if (isLoad) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
triggerRouter.getRouter().getRouterId(), isLoad, loadId, createBy, Status.NE);
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE);
} else {
data.setNodeList(targetNode.getNodeId());
insertData(transaction, data);
Expand Down Expand Up @@ -893,7 +881,7 @@ private void insertNodeSecurityUpdate(ISqlTransaction transaction, String nodeId

public void sendScript(String nodeId, String script, boolean isLoad) {
Node targetNode = engine.getNodeService().findNode(nodeId);
TriggerHistory history = findTriggerHistoryForGenericSync();
TriggerHistory history = engine.getTriggerRouterService().findTriggerHistoryForGenericSync();
Data data = new Data(history.getSourceTableName(), DataEventType.BSH,
CsvUtils.escapeCsvData(script), null, history, Constants.CHANNEL_CONFIG, null, null);
data.setNodeList(nodeId);
Expand Down Expand Up @@ -927,7 +915,7 @@ public boolean sendSchema(String nodeId, String catalogName, String schemaName,
for (TriggerRouter triggerRouter : triggerRouters) {
eventCount++;
String xml = symmetricDialect.getCreateTableXML(triggerHistory, triggerRouter);
insertCreateEvent(targetNode, triggerRouter, triggerHistory, xml, false, -1, null);
insertCreateEvent(targetNode, triggerHistory, xml, false, -1, null);
}
}

Expand Down
Expand Up @@ -1796,6 +1796,18 @@ public void addExtraConfigTable(String table) {
public Map<Trigger, Exception> getFailedTriggers() {
return this.failureListener.getFailures();
}

public TriggerHistory findTriggerHistoryForGenericSync() {
String triggerTableName = TableConstants.getTableName(tablePrefix,
TableConstants.SYM_TRIGGER);
TriggerHistory history = findTriggerHistory(null, null, triggerTableName
.toUpperCase());
if (history == null) {
history = findTriggerHistory(null, null, triggerTableName);
}
return history;
}


public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories) {
Expand Down
Expand Up @@ -62,8 +62,7 @@ public enum DataEventType {
* An event the indicates that the data payload is going to be a Java bean shell script that is to be run at the
* client.
*/
BSH("B"),

BSH("B"),

UNKNOWN("U");

Expand Down
Expand Up @@ -871,6 +871,7 @@ protected boolean script(CsvData data) {
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("SOURCE_NODE_ID", batch.getSourceNodeId());
variables.put("TARGET_NODE_ID", batch.getTargetNodeId());
variables.putAll(context.getContext());
ISqlTemplate template = platform.getSqlTemplate();
Class<?> templateClass = template.getClass();
if (templateClass.getSimpleName().equals("JdbcSqlTemplate")) {
Expand Down
4 changes: 4 additions & 0 deletions symmetric-util/src/main/java/org/jumpmind/util/Context.java
Expand Up @@ -44,4 +44,8 @@ public Set<String> keySet() {
return context.keySet();
}

public Map<String, Object> getContext() {
return context;
}

}

0 comments on commit 9b7416f

Please sign in to comment.