Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.8' into 3.9
Browse files Browse the repository at this point in the history
Conflicts:
	symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java
	symmetric-server/src/main/deploy/conf/sym_service.conf
  • Loading branch information
chenson42 committed Jun 14, 2017
2 parents 0c775f7 + 29dc47e commit a9da3d2
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 9 deletions.
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
Expand All @@ -60,6 +61,8 @@ public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter {

protected boolean needsBinaryConversion;

protected boolean useDefaultDataWriter;

public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings,
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
super(platform, settings);
Expand All @@ -68,6 +71,11 @@ public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSett
}

public void write(CsvData data) {
if (useDefaultDataWriter) {
super.write(data);
return;
}

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
Expand Down Expand Up @@ -182,6 +190,13 @@ protected void endCopy() {
}
}

@Override
public void start(Batch batch) {
    super.start(batch);
    // If this incoming batch is being retried after a previous failure,
    // fall back to the default (row-by-row) writer instead of bulk copy.
    // NOTE(review): assumes the loader put the IncomingBatch under the
    // "currentBatch" context key before starting the batch — confirm with caller.
    IncomingBatch incoming = (IncomingBatch) context.get("currentBatch");
    useDefaultDataWriter = incoming != null && incoming.isErrorFlag();
}

@Override
public boolean start(Table table) {
return super.start(table);
Expand Down
Expand Up @@ -107,7 +107,7 @@ private Constants() {
public static final String CHANNEL_FILESYNC_RELOAD = "filesync_reload";

public static final String CHANNEL_DYNAMIC = "dynamic";

public static final String PUSH_JOB_TIMER = "job.push";

public static final String PULL_JOB_TIMER = "job.pull";
Expand Down
Expand Up @@ -102,6 +102,8 @@ public String toString() {
private Map<Status, Date> statusStartHistory;

private Date endTime;

private long totalDataCount = 0;

public ProcessInfo() {
this(new ProcessInfoKey("", "", null));
Expand Down Expand Up @@ -175,6 +177,7 @@ public void setBatchCount(long batchCount) {

/**
 * Advances both the per-process and the cumulative row counters by one.
 * The current count tracks progress of the active unit of work, while the
 * total count accumulates across the life of this ProcessInfo.
 */
public void incrementCurrentDataCount() {
    currentDataCount++;
    totalDataCount++;
}

public void incrementBatchCount() {
Expand Down Expand Up @@ -377,6 +380,14 @@ public static ThreadData getThreadData(long threadId) {
}
}

/**
 * @return the cumulative number of data rows counted by this process
 *         (never reset by per-batch progress updates)
 */
public long getTotalDataCount() {
    return this.totalDataCount;
}

/**
 * Overrides the cumulative data-row counter.
 *
 * @param totalDataCount the new cumulative row count
 */
public void setTotalDataCount(long totalDataCount) {
    this.totalDataCount = totalDataCount;
}

static public class ThreadData {

public ThreadData(String threadName, String stackTrace) {
Expand Down
Expand Up @@ -98,6 +98,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement

final String CTX_KEY_FLUSH_JOBS_NEEDED = "FlushJobs."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED = "FlushNodeGroupLink."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

Expand Down Expand Up @@ -243,6 +246,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
routingContext.put(CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.put(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_JOB)) {
routingContext.put(CTX_KEY_FLUSH_JOBS_NEEDED, Boolean.TRUE);
}
Expand Down Expand Up @@ -654,6 +661,12 @@ public void contextCommitted(SimpleRouterContext routingContext) {
log.info("About to refresh the cache of node security because new configuration came through the data router");
engine.getNodeService().flushNodeAuthorizedCache();
}

if (routingContext.get(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED) != null) {
log.info("About to refresh the cache of node group link because new configuration came through the data router");
engine.getConfigurationService().clearCache();
engine.getNodeService().flushNodeGroupCache();
}

}
}
Expand Down
Expand Up @@ -1738,10 +1738,10 @@ public CsvData next() {

String initialLoadSelect = data.getRowData();
if (initialLoadSelect == null && triggerRouter.getTrigger().isStreamRow()) {
if (sourceTable == null) {
//if (sourceTable == null) {
sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter
.getRouter().getRouterId(), triggerHistory, false, true);
}
// }
Column[] columns = sourceTable.getPrimaryKeyColumns();
DmlStatement dmlStmt = platform.createDmlStatement(DmlType.WHERE, sourceTable, null);
String[] pkData = data.getParsedData(CsvData.PK_DATA);
Expand All @@ -1751,6 +1751,10 @@ public CsvData next() {
row.put(columns[i].getName(), pkData[i]);
}
initialLoadSelect = dmlStmt.buildDynamicSql(batch.getBinaryEncoding(), row, false, true, columns);
if (initialLoadSelect.endsWith(platform.getDatabaseInfo().getSqlCommandDelimiter())) {
initialLoadSelect = initialLoadSelect.substring(0,
initialLoadSelect.length() - platform.getDatabaseInfo().getSqlCommandDelimiter().length());
}
}

SelectFromTableEvent event = new SelectFromTableEvent(targetNode,
Expand Down
Expand Up @@ -964,7 +964,6 @@ public Table nextTable() {
return table;
}
};

DataProcessor processor = new DataProcessor(reader, null, listener, "data load from stage") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
Expand Down Expand Up @@ -1043,6 +1042,8 @@ public void beforeBatchEnd(DataContext context) {
public boolean beforeBatchStarted(DataContext context) {
this.currentBatch = null;
Batch batch = context.getBatch();
context.remove("currentBatch");

if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED)
|| (batch.getChannelId() != null && batch.getChannelId().equals(
Constants.CHANNEL_CONFIG))) {
Expand All @@ -1059,6 +1060,8 @@ public boolean beforeBatchStarted(DataContext context) {
this.batchesProcessed.add(incomingBatch);
if (incomingBatchService.acquireIncomingBatch(incomingBatch)) {
this.currentBatch = incomingBatch;
context.put("currentBatch", this.currentBatch);

return true;
}
}
Expand Down
Expand Up @@ -241,7 +241,7 @@ public void insertTableReloadRequest(TableReloadRequest request) {
request.getLastUpdateTime(), request.getSourceNodeId(),
request.getTargetNodeId(), request.getTriggerId(),
request.getRouterId(), request.isCreateTable() ? 1 : 0,
request.isDeleteFirst() ? 1 : 0 });
request.isDeleteFirst() ? 1 : 0, request.getChannelId() });
}

public TableReloadRequest getTableReloadRequest(final TableReloadRequestKey key) {
Expand Down Expand Up @@ -405,13 +405,27 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa

if (isFullLoad) {
triggerHistories = triggerRouterService.getActiveTriggerHistories();
if (reloadRequests != null && reloadRequests.size() == 1) {
String channelId = reloadRequests.get(0).getChannelId();
if (channelId != null) {
List<TriggerHistory> channelTriggerHistories = new ArrayList<TriggerHistory>();

for (TriggerHistory history : triggerHistories) {
if (channelId.equals(engine.getTriggerRouterService().getTriggerById(history.getTriggerId()).getChannelId())) {
channelTriggerHistories.add(history);
}
}
triggerHistories = channelTriggerHistories;
}
}
}
else {
for (TableReloadRequest reloadRequest : reloadRequests) {
triggerHistories.addAll(engine.getTriggerRouterService()
.getActiveTriggerHistories(new Trigger(reloadRequest.getTriggerId(), null)));
}
}

processInfo.setDataCount(triggerHistories.size());

Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
Expand Down
Expand Up @@ -31,7 +31,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("selectTableReloadRequest", "select reload_select, before_custom_sql, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first) values (?,?,?,?,?,?,?,?,?,?,?)");
putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first, channel_id) values (?,?,?,?,?,?,?,?,?,?,?,?)");

putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, before_custom_sql=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

Expand Down
Expand Up @@ -216,6 +216,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
|| !parameterService
.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) {
okayToProcess = true;
batch.setErrorFlag(existingBatch.isErrorFlag());
existingBatch.setStatus(Status.LD);
log.info("Retrying batch {}", batch.getNodeBatchId());
} else if (existingBatch.getStatus() == Status.IG) {
Expand Down
Expand Up @@ -576,7 +576,7 @@ protected synchronized void reOpenRegistration(String nodeId, String remoteHost,
// node table, but not in node security.
// lets go ahead and try to insert into node security.
sqlTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] {
nodeId, password, nodeService.findNode(nodeId).getNodeId() });
nodeId, password, nodeService.findIdentityNodeId() });
log.info("Registration was opened for {}", nodeId);
} else if (updateCount == 0) {
log.warn("Registration was already enabled for {}. No need to reenable it", nodeId);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.exception.ParseException;
import org.jumpmind.symmetric.io.IoConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
Expand Down Expand Up @@ -150,9 +151,27 @@ public void write(CsvData data) {
if (targetTable != null || !data.requiresTable()
|| (targetTable == null && data.getDataEventType() == DataEventType.SQL)) {
try {

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {

switch (data.getDataEventType()) {
case UPDATE:
case INSERT:
if (sourceTable.getColumnCount() != data.getParsedData(CsvData.ROW_DATA).length) {
throw new ParseException(String.format("The (%s) table's column count (%d) does not match the data's column count (%d)", sourceTable.getName(), sourceTable.getColumnCount(), data.getParsedData(CsvData.ROW_DATA).length));
}
break;
case DELETE:
if (sourceTable.getPrimaryKeyColumnCount() != data.getParsedData(CsvData.PK_DATA).length) {
throw new ParseException(String.format("The (%s) table's pk column count (%d) does not match the data's pk column count (%d)", sourceTable.getName(), sourceTable.getPrimaryKeyColumnCount(), data.getParsedData(CsvData.PK_DATA).length));
}
break;
default:
break;
}

LoadStatus loadStatus = LoadStatus.SUCCESS;
switch (data.getDataEventType()) {
case UPDATE:
Expand Down
5 changes: 3 additions & 2 deletions symmetric-server/src/main/deploy/conf/sym_service.conf
Expand Up @@ -29,8 +29,9 @@ wrapper.java.additional.14=-Djava.net.preferIPv4Stack=true
wrapper.java.additional.15=-Dcom.sun.management.jmxremote
wrapper.java.additional.16=-Dcom.sun.management.jmxremote.authenticate=false
wrapper.java.additional.17=-Dcom.sun.management.jmxremote.port=31418
wrapper.java.additional.18=-Dcom.sun.management.jmxremote.ssl=false
wrapper.java.additional.19=-Djava.rmi.server.hostname=localhost
wrapper.java.additional.18=-Dcom.sun.management.jmxremote.rmi.port=31418
wrapper.java.additional.19=-Dcom.sun.management.jmxremote.ssl=false
wrapper.java.additional.20=-Djava.rmi.server.hostname=localhost

# Initial Java Heap Size (in MB)
wrapper.java.initmemory=256
Expand Down

0 comments on commit a9da3d2

Please sign in to comment.