Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
mmichalek committed Jun 15, 2017
2 parents a74f1af + a2416d9 commit 2cf88b4
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 10 deletions.
Expand Up @@ -74,7 +74,7 @@ It will be used in the generated database trigger to populate the EXTERNAL_DATA
Excluded Column Names:: Specify a comma-delimited list of columns that should not be synchronized from this table.
Included Column Names:: Specify a comma-delimited list of columns only should be synchronized from this table.
Sync Key Names:: Specify a comma-delimited list of columns that should be used as the key for synchronization operations. By default, if not specified, then the primary key of the table will be used.
Channel Expression:: An expression that will be used to capture the channel id in the trigger. This expression will only be used if the channel_id is set to 'dynamic.'
Channel Expression:: An expression that will be used to capture the channel id in the trigger. This expression will only be used if the channel_id is set to 'dynamic'. The variable "$(schemaName)" can be used, which is replaced with the source schema of the table.

.Sample Triggers
====
Expand Down
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.db.oracle;

import static org.apache.commons.lang.StringUtils.isBlank;

import java.text.ParseException;
import java.util.Date;

Expand Down Expand Up @@ -80,6 +82,9 @@ protected void buildSqlReplacementTokens() {
@Override
protected boolean doesTriggerExistOnPlatform(String catalog, String schema, String tableName,
String triggerName) {
if (isBlank(schema)) {
schema = platform.getDefaultSchema();
}
return platform.getSqlTemplate().queryForInt("select count(*) " + SQL_SELECT_TRIGGERS,
new Object[] { triggerName, tableName, schema }) > 0;
}
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
Expand All @@ -60,6 +61,8 @@ public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter {

protected boolean needsBinaryConversion;

protected boolean useDefaultDataWriter;

public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings,
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
super(platform, settings);
Expand All @@ -68,6 +71,11 @@ public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSett
}

public void write(CsvData data) {
if (useDefaultDataWriter) {
super.write(data);
return;
}

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
Expand Down Expand Up @@ -182,6 +190,13 @@ protected void endCopy() {
}
}

@Override
public void start(Batch batch) {
super.start(batch);
IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch");
useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag();
}

@Override
public boolean start(Table table) {
return super.start(table);
Expand Down
Expand Up @@ -107,7 +107,7 @@ private Constants() {
public static final String CHANNEL_FILESYNC_RELOAD = "filesync_reload";

public static final String CHANNEL_DYNAMIC = "dynamic";

public static final String PUSH_JOB_TIMER = "job.push";

public static final String PULL_JOB_TIMER = "job.pull";
Expand Down
Expand Up @@ -95,6 +95,8 @@ public String toString() {
private Map<ProcessStatus, Date> statusStartHistory;

private Date endTime;

private long totalDataCount = 0;

public ProcessInfo() {
this(new ProcessInfoKey("", "", null));
Expand Down Expand Up @@ -168,6 +170,7 @@ public void setBatchCount(long batchCount) {

public void incrementCurrentDataCount() {
this.currentDataCount++;
this.totalDataCount++;
}

public void incrementBatchCount() {
Expand Down Expand Up @@ -370,6 +373,14 @@ public static ThreadData getThreadData(long threadId) {
}
}

public long getTotalDataCount() {
return totalDataCount;
}

public void setTotalDataCount(long totalDataCount) {
this.totalDataCount = totalDataCount;
}

static public class ThreadData {

public ThreadData(String threadName, String stackTrace) {
Expand Down
Expand Up @@ -98,6 +98,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement

final String CTX_KEY_FLUSH_JOBS_NEEDED = "FlushJobs."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED = "FlushNodeGroupLink."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

Expand Down Expand Up @@ -243,6 +246,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
routingContext.put(CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.put(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_JOB)) {
routingContext.put(CTX_KEY_FLUSH_JOBS_NEEDED, Boolean.TRUE);
}
Expand Down Expand Up @@ -654,6 +661,12 @@ public void contextCommitted(SimpleRouterContext routingContext) {
log.info("About to refresh the cache of node security because new configuration came through the data router");
engine.getNodeService().flushNodeAuthorizedCache();
}

if (routingContext.get(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED) != null) {
log.info("About to refresh the cache of node group link because new configuration came through the data router");
engine.getConfigurationService().clearCache();
engine.getNodeService().flushNodeGroupCache();
}

}
}
Expand Down
Expand Up @@ -1738,10 +1738,10 @@ public CsvData next() {

String initialLoadSelect = data.getRowData();
if (initialLoadSelect == null && triggerRouter.getTrigger().isStreamRow()) {
if (sourceTable == null) {
//if (sourceTable == null) {
sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter
.getRouter().getRouterId(), triggerHistory, false, true);
}
// }
Column[] columns = sourceTable.getPrimaryKeyColumns();
DmlStatement dmlStmt = platform.createDmlStatement(DmlType.WHERE, sourceTable, null);
String[] pkData = data.getParsedData(CsvData.PK_DATA);
Expand All @@ -1751,6 +1751,10 @@ public CsvData next() {
row.put(columns[i].getName(), pkData[i]);
}
initialLoadSelect = dmlStmt.buildDynamicSql(batch.getBinaryEncoding(), row, false, true, columns);
if (initialLoadSelect.endsWith(platform.getDatabaseInfo().getSqlCommandDelimiter())) {
initialLoadSelect = initialLoadSelect.substring(0,
initialLoadSelect.length() - platform.getDatabaseInfo().getSqlCommandDelimiter().length());
}
}

SelectFromTableEvent event = new SelectFromTableEvent(targetNode,
Expand Down
Expand Up @@ -964,7 +964,6 @@ public Table nextTable() {
return table;
}
};

DataProcessor processor = new DataProcessor(reader, null, listener, "data load from stage") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
Expand Down Expand Up @@ -1043,6 +1042,8 @@ public void beforeBatchEnd(DataContext context) {
public boolean beforeBatchStarted(DataContext context) {
this.currentBatch = null;
Batch batch = context.getBatch();
context.remove("currentBatch");

if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED)
|| (batch.getChannelId() != null && batch.getChannelId().equals(
Constants.CHANNEL_CONFIG))) {
Expand All @@ -1059,6 +1060,8 @@ public boolean beforeBatchStarted(DataContext context) {
this.batchesProcessed.add(incomingBatch);
if (incomingBatchService.acquireIncomingBatch(incomingBatch)) {
this.currentBatch = incomingBatch;
context.put("currentBatch", this.currentBatch);

return true;
}
}
Expand Down
Expand Up @@ -241,7 +241,7 @@ public void insertTableReloadRequest(TableReloadRequest request) {
request.getLastUpdateTime(), request.getSourceNodeId(),
request.getTargetNodeId(), request.getTriggerId(),
request.getRouterId(), request.isCreateTable() ? 1 : 0,
request.isDeleteFirst() ? 1 : 0 });
request.isDeleteFirst() ? 1 : 0, request.getChannelId() });
}

public TableReloadRequest getTableReloadRequest(final TableReloadRequestKey key) {
Expand Down Expand Up @@ -405,13 +405,27 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa

if (isFullLoad) {
triggerHistories = triggerRouterService.getActiveTriggerHistories();
if (reloadRequests != null && reloadRequests.size() == 1) {
String channelId = reloadRequests.get(0).getChannelId();
if (channelId != null) {
List<TriggerHistory> channelTriggerHistories = new ArrayList<TriggerHistory>();

for (TriggerHistory history : triggerHistories) {
if (channelId.equals(engine.getTriggerRouterService().getTriggerById(history.getTriggerId()).getChannelId())) {
channelTriggerHistories.add(history);
}
}
triggerHistories = channelTriggerHistories;
}
}
}
else {
for (TableReloadRequest reloadRequest : reloadRequests) {
triggerHistories.addAll(engine.getTriggerRouterService()
.getActiveTriggerHistories(new Trigger(reloadRequest.getTriggerId(), null)));
}
}

processInfo.setDataCount(triggerHistories.size());

Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
Expand Down
Expand Up @@ -31,7 +31,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("selectTableReloadRequest", "select reload_select, before_custom_sql, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first) values (?,?,?,?,?,?,?,?,?,?,?)");
putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first, channel_id) values (?,?,?,?,?,?,?,?,?,?,?,?)");

putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, before_custom_sql=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

Expand Down
Expand Up @@ -216,6 +216,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
|| !parameterService
.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) {
okayToProcess = true;
batch.setErrorFlag(existingBatch.isErrorFlag());
existingBatch.setStatus(Status.LD);
log.info("Retrying batch {}", batch.getNodeBatchId());
} else if (existingBatch.getStatus() == Status.IG) {
Expand Down
Expand Up @@ -576,7 +576,7 @@ protected synchronized void reOpenRegistration(String nodeId, String remoteHost,
// node table, but not in node security.
// lets go ahead and try to insert into node security.
sqlTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] {
nodeId, password, nodeService.findNode(nodeId).getNodeId() });
nodeId, password, nodeService.findIdentityNodeId() });
log.info("Registration was opened for {}", nodeId);
} else if (updateCount == 0) {
log.warn("Registration was already enabled for {}. No need to reenable it", nodeId);
Expand Down
7 changes: 7 additions & 0 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -193,6 +193,7 @@
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp of when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp of when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for file_trigger" />
</table>

<table name="file_trigger_router" description="Maps a file trigger to a router.">
Expand All @@ -205,6 +206,7 @@
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for file_trigger_router" />
<foreign-key foreignTable="file_trigger" name="fk_ftr_2_ftrg">
<reference local="trigger_id" foreign="trigger_id" />
</foreign-key>
Expand Down Expand Up @@ -729,6 +731,7 @@
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for router" />
<foreign-key foreignTable="node_group_link" name="fk_rt_2_grp_lnk">
<reference local="source_node_group_id" foreign="source_node_group_id" />
<reference local="target_node_group_id" foreign="target_node_group_id" />
Expand Down Expand Up @@ -785,6 +788,7 @@
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" description="Timestamp when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for transform_table" />
<foreign-key foreignTable="node_group_link" name="fk_tt_2_grp_lnk">
<reference local="source_node_group_id" foreign="source_node_group_id" />
<reference local="target_node_group_id" foreign="target_node_group_id" />
Expand All @@ -803,6 +807,7 @@
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" description="Timestamp when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for transform_column" />
</table>

<table name="trigger" description="Configures database triggers that capture changes in the database. Configuration of which triggers are generated for which tables is stored here. Triggers are created in a node's database if the source_node_group_id of a router is mapped to a row in this table.">
Expand Down Expand Up @@ -842,6 +847,7 @@
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for trigger" />
<foreign-key foreignTable="channel" name="fk_trg_2_chnl">
<reference local="channel_id" foreign="channel_id" />
</foreign-key>
Expand Down Expand Up @@ -885,6 +891,7 @@
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
<column name="description" type="LONGVARCHAR" description="Optional notes and comments for trigger_router" />
<foreign-key foreignTable="trigger" name="fk_tr_2_trg">
<reference local="trigger_id" foreign="trigger_id" />
</foreign-key>
Expand Down

0 comments on commit 2cf88b4

Please sign in to comment.