Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0001626: Create event should not store XML is row data
  • Loading branch information
erilong committed Apr 21, 2014
1 parent 60fd0f1 commit 6aad093
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 22 deletions.
Expand Up @@ -100,7 +100,7 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction,

public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loadId, String createBy);

public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy);
public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy);

/**
* Count the number of data ids in a range
Expand Down
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.Semaphore;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.io.DatabaseXmlUtil;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.DdlBuilderFactory;
Expand All @@ -55,6 +57,7 @@
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.DataProcessor;
Expand Down Expand Up @@ -1385,6 +1388,16 @@ public CsvData next() {
triggerHistory.getSourceTableName(),
triggerHistory.getTriggerHistoryId() });
}
if (data.getDataEventType() == DataEventType.CREATE && StringUtils.isBlank(data.getCsvData(CsvData.ROW_DATA))) {
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
routerId, triggerHistory, true, true);
Database db = new Database();
db.setName("dataextractor");
db.setCatalog(targetTable.getCatalog());
db.setSchema(targetTable.getSchema());
db.addTable(targetTable);
data.setRowData(CsvUtils.escapeCsvData(DatabaseXmlUtil.toXml(db)));
}
}

lastTriggerHistory = triggerHistory;
Expand Down
Expand Up @@ -465,7 +465,7 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c
if (triggerRouter.getInitialLoadOrder() >= 0
&& engine.getGroupletService().isTargetEnabled(triggerRouter,
targetNode)) {
insertCreateEvent(transaction, targetNode, triggerHistory, null, true,
insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), true,
loadId, createBy);
if (!transactional) {
transaction.commit();
Expand Down Expand Up @@ -702,12 +702,12 @@ public void checkForAndUpdateMissingChannelIds(long firstDataId, long lastDataId
}
}

public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHistory, String xml,
public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHistory, String routerId,
boolean isLoad, long loadId, String createBy) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
insertCreateEvent(transaction, targetNode, triggerHistory, xml, isLoad, loadId,
insertCreateEvent(transaction, targetNode, triggerHistory, routerId, isLoad, loadId,
createBy);
transaction.commit();
} catch (Error ex) {
Expand All @@ -726,20 +726,20 @@ public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHisto
}

public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy) {
TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy) {

Trigger trigger = engine.getTriggerRouterService().getTriggerById(
triggerHistory.getTriggerId(), false);
String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine
.getConfigurationService().getChannels(false));

Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.CREATE,
CsvUtils.escapeCsvData(xml), null, triggerHistory, isLoad ? reloadChannelId
null, null, triggerHistory, isLoad ? reloadChannelId
: Constants.CHANNEL_CONFIG, null, null);
try {
if (isLoad) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE);
routerId, isLoad, loadId, createBy, Status.NE);
} else {
data.setNodeList(targetNode.getNodeId());
insertData(transaction, data);
Expand Down Expand Up @@ -996,8 +996,7 @@ public boolean sendSchema(String nodeId, String catalogName, String schemaName,
.getTriggerHistoryId());
for (TriggerRouter triggerRouter : triggerRouters) {
eventCount++;
String xml = symmetricDialect.getCreateTableXML(triggerHistory, triggerRouter);
insertCreateEvent(targetNode, triggerHistory, xml, false, -1, null);
insertCreateEvent(targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), false, -1, null);
}
}

Expand Down
Expand Up @@ -25,15 +25,12 @@
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.io.DatabaseXmlUtil;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvConstants;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.util.Statistics;
Expand Down Expand Up @@ -186,16 +183,7 @@ public void write(CsvData data) {
break;

case CREATE:
String xml = data.getCsvData(CsvData.ROW_DATA);
if (StringUtils.isBlank(xml)) {
Database db = new Database();
db.setName("datawriter");
db.setCatalog(table.getCatalog());
db.setSchema(table.getSchema());
db.addTable(table);
xml = CsvUtils.escapeCsvData(DatabaseXmlUtil.toXml(db));
}
println(CsvConstants.CREATE, xml);
println(CsvConstants.CREATE, data.getCsvData(CsvData.ROW_DATA));
break;

case BSH:
Expand Down

0 comments on commit 6aad093

Please sign in to comment.