Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds into 3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 26, 2017
2 parents 8d2dc8a + 84a6f89 commit 058ae25
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 62 deletions.
1 change: 1 addition & 0 deletions symmetric-assemble/src/.gitignore
@@ -0,0 +1 @@
/.DS_Store
Expand Up @@ -11,4 +11,6 @@ private ContextConstants() {

public static final String GUID = "guid";

public static final String FILE_SYNC_FAST_SCAN_TRACK_TIME = "file.sync.fast.scan.track.time";

}
Expand Up @@ -72,7 +72,7 @@ public FileTriggerFileModifiedListener(FileTriggerRouter fileTriggerRouter, Date

public void onStart(final FileAlterationObserver observer) {
long lastModified = observer.getDirectory().lastModified();
if ((fromDate != null && lastModified > fromDate.getTime()) && lastModified <= toDate.getTime()) {
if (fromDate == null || ((fromDate != null && lastModified > fromDate.getTime()) && lastModified <= toDate.getTime())) {
modifiedDirs.put(".", new DirectorySnapshot(fileTriggerRouter));
}
}
Expand Down
Expand Up @@ -90,6 +90,9 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt

final String CTX_KEY_REINITIALIZED = "Reinitialized."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED = "FileSyncTriggers."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

private ISymmetricEngine engine;

Expand Down Expand Up @@ -127,6 +130,7 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
recordConflictFlushNeeded(context, table);
recordNodeSecurityFlushNeeded(context, table);
recordNodeFlushNeeded(context, table, data);
recordFileSyncTriggersNeeded(context, table, data);
}

private void recordGroupletFlushNeeded(DataContext context, Table table) {
Expand Down Expand Up @@ -218,6 +222,12 @@ private void recordNodeFlushNeeded(DataContext context, Table table, CsvData dat
}
}
}

/**
 * Flags the batch context when the incoming row changes the file sync
 * parameter, so that file sync triggers can be rebuilt when the sync ends.
 */
private void recordFileSyncTriggersNeeded(DataContext context, Table table, CsvData data) {
    if (!isFileSyncTriggersNeeded(table, data)) {
        return;
    }
    context.put(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED, true);
}

private boolean isSyncTriggersNeeded(DataContext context, Table table) {
boolean autoSync = engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED) ||
Expand Down Expand Up @@ -257,6 +267,12 @@ private boolean isJobManagerRestartNeeded(Table table, CsvData data) {
&& data.getCsvData(CsvData.ROW_DATA) != null
&& data.getCsvData(CsvData.ROW_DATA).contains("job.");
}

/**
 * Returns true when the change is a sym_parameter row whose data mentions the
 * "file.sync.enable" parameter, meaning file sync triggers may need rebuilding.
 */
private boolean isFileSyncTriggersNeeded(Table table, CsvData data) {
    if (!matchesTable(table, TableConstants.SYM_PARAMETER)) {
        return false;
    }
    // Fetch the row data once instead of twice.
    String rowData = data.getCsvData(CsvData.ROW_DATA);
    return rowData != null && rowData.contains("file.sync.enable");
}

private boolean isTransformFlushNeeded(Table table) {
return matchesTable(table, TableConstants.SYM_TRANSFORM_COLUMN)
Expand Down Expand Up @@ -319,6 +335,14 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
context.remove(CTX_KEY_RESYNC_TABLE_NEEDED);
}

if (context.get(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers for file snapshot because the file sync parameter has changed");
Table fileSnapshotTable = engine.getDatabasePlatform()
.getTableFromCache(TableConstants.getTableName(engine.getTablePrefix(), TableConstants.SYM_FILE_SNAPSHOT), false);
engine.getTriggerRouterService().syncTriggers(fileSnapshotTable, false);
context.remove(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED);
}
}

@Override
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -101,6 +102,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement

final String CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED = "FlushNodeGroupLink."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED = "FileSyncTriggers."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

Expand Down Expand Up @@ -471,6 +475,11 @@ protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaDat
} else if (tableMatches(dataMetaData, TableConstants.SYM_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
} else if (tableMatches(dataMetaData, TableConstants.SYM_PARAMETER)) {
if (dataMetaData.getData().getCsvData(CsvData.ROW_DATA) != null
&& dataMetaData.getData().getCsvData(CsvData.ROW_DATA).contains("file.sync.enable")) {
routingContext.put(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED, Boolean.TRUE);
}
}

}
Expand Down Expand Up @@ -668,6 +677,14 @@ public void contextCommitted(SimpleRouterContext routingContext) {
engine.getConfigurationService().clearCache();
engine.getNodeService().flushNodeGroupCache();
}

if (routingContext.get(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED) != null
&& engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers for file snapshot because the file sync parameter has changed");
Table fileSnapshotTable = engine.getDatabasePlatform()
.getTableFromCache(TableConstants.getTableName(engine.getTablePrefix(), TableConstants.SYM_FILE_SNAPSHOT), false);
engine.getTriggerRouterService().syncTriggers(fileSnapshotTable, false);
}

}
}
Expand Down
Expand Up @@ -1476,10 +1476,10 @@ public RemoteNodeStatuses queueWork(boolean force) {
if (identity != null) {
if (force || clusterService.lock(ClusterConstants.INITIAL_LOAD_EXTRACT)) {
try {
Map<String, String> nodes = getExtractRequestNodes();
for (Map.Entry<String, String> entry : nodes.entrySet()) {
queue(entry.getKey(), entry.getValue(), statuses);
}
List<NodeQueuePair> nodes = getExtractRequestNodes();
for (NodeQueuePair pair : nodes) {
queue(pair.getNodeId(), pair.getQueue(), statuses);
}
} finally {
if (!force) {
clusterService.unlock(ClusterConstants.INITIAL_LOAD_EXTRACT);
Expand All @@ -1501,17 +1501,44 @@ protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) {
}
}

public Map<String, String> getExtractRequestNodes() {
return sqlTemplate.queryForMap(getSql("selectNodeIdsForExtractSql"), "node_id", "queue",
public List<NodeQueuePair> getExtractRequestNodes() {
return sqlTemplate.query(getSql("selectNodeIdsForExtractSql"), new NodeQueuePairMapper(),
ExtractStatus.NE.name());
}

private class NodeQueuePair {
private String nodeId;
private String queue;
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
}

/**
 * Maps one row of the extract-request query (columns "node_id" and "queue")
 * onto a NodeQueuePair.
 */
class NodeQueuePairMapper implements ISqlRowMapper<NodeQueuePair> {
    @Override
    public NodeQueuePair mapRow(Row row) {
        NodeQueuePair result = new NodeQueuePair();
        result.setNodeId(row.getString("node_id"));
        result.setQueue(row.getString("queue"));
        return result;
    }
}

/**
 * Loads the pending (NE status) extract requests assigned to the node and
 * queue of the given communication channel.
 */
public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication) {
    return sqlTemplate.query(getSql("selectExtractRequestForNodeSql"), new ExtractRequestMapper(),
            nodeCommunication.getNodeId(), nodeCommunication.getQueue(),
            ExtractRequest.ExtractStatus.NE.name());
}

@Override
public void resetExtractRequest(OutgoingBatch batch) {
ISqlTransaction transaction = null;
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.file.DirectorySnapshot;
Expand All @@ -69,7 +70,6 @@
import org.jumpmind.symmetric.model.FileTrigger;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.Lock;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeCommunication;
import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType;
Expand Down Expand Up @@ -178,9 +178,15 @@ protected void trackChanges(ProcessInfo processInfo, boolean useCrc) {
}

protected void trackChangesFastScan(ProcessInfo processInfo, boolean useCrc) {
long ctxTime = engine.getContextService().getLong(ContextConstants.FILE_SYNC_FAST_SCAN_TRACK_TIME);
Date ctxDate = new Date(ctxTime);
if (ctxTime == 0) {
ctxDate = null;
}
Date currentDate = new Date();

boolean isLocked = engine.getClusterService().lock(ClusterConstants.FILE_SYNC_SCAN);
Lock lock = engine.getClusterService().findLocks().get(ClusterConstants.FILE_SYNC_SCAN);
log.debug("File tracker range of " + lock.getLastLockTime() + " to " + lock.getLockTime() + ", isLocked=" + isLocked);
log.debug("File tracker range of " + ctxDate + " to " + currentDate + ", isLocked=" + isLocked);
int maxRowsBeforeCommit = engine.getParameterService().getInt(ParameterConstants.DATA_LOADER_MAX_ROWS_BEFORE_COMMIT);

try {
Expand All @@ -189,8 +195,8 @@ protected void trackChangesFastScan(ProcessInfo processInfo, boolean useCrc) {
if (fileTriggerRouter.isEnabled()) {
FileAlterationObserver observer = new FileAlterationObserver(fileTriggerRouter.getFileTrigger().getBaseDir(),
fileTriggerRouter.getFileTrigger().createIOFileFilter());
FileTriggerFileModifiedListener listener = new FileTriggerFileModifiedListener(fileTriggerRouter, lock.getLastLockTime(),
lock.getLockTime(), processInfo, useCrc, new FileModifiedCallback(maxRowsBeforeCommit) {
FileTriggerFileModifiedListener listener = new FileTriggerFileModifiedListener(fileTriggerRouter, ctxDate,
currentDate, processInfo, useCrc, new FileModifiedCallback(maxRowsBeforeCommit) {
public void commit(DirectorySnapshot dirSnapshot) {
saveDirectorySnapshot(fileTriggerRouter, dirSnapshot);
}
Expand All @@ -201,6 +207,7 @@ public DirectorySnapshot getLastDirectorySnapshot(String relativeDir) {
}, engine);
observer.addListener(listener);
observer.checkAndNotify();
engine.getContextService().save(ContextConstants.FILE_SYNC_FAST_SCAN_TRACK_TIME, String.valueOf(currentDate.getTime()));
}
}
engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_SCAN);
Expand Down
Expand Up @@ -575,6 +575,36 @@ public void test12RegisteredDataWriterFilter() {
.getExtensionService().getExtensionPointMap(IDatabaseWriterFilter.class).get("registeredNodeGroupTestDataFilter");
assertTrue(registeredNodeGroupFilter.getNumberOfTimesCalled() > 0);
}

/**
 * Verifies that an incoming batch containing a "NaN" value for a numeric
 * column (index 9 of TEST_COLUMNS) loads end to end through the CSV protocol
 * and lands in OK status.
 */
@Test
public void test13NanValue() throws Exception {
    // Build a row with two text columns populated and "NaN" in column 9;
    // all remaining columns stay null.
    String[] insertValues = new String[TEST_COLUMNS.length];
    insertValues[2] = insertValues[4] = "incoming test";
    insertValues[9] = "NaN";

    // Compose a CSV batch in memory: node id, channel, batch id, then the
    // table schema definition.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    CsvWriter writer = getWriter(out);
    writer.writeRecord(new String[] { CsvConstants.NODEID,
            TestConstants.TEST_CLIENT_EXTERNAL_ID });
    writer.writeRecord(new String[] { CsvConstants.CHANNEL,
            TestConstants.TEST_CHANNEL_ID });
    String nextBatchId = getNextBatchId();
    writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
    writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);

    // One insert row followed by the commit marker.
    insertValues[0] = getNextId();
    writer.write(CsvConstants.INSERT);
    writer.writeRecord(insertValues, true);

    writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
    writer.close();
    load(out);

    // The loaded batch must report OK status on the expected channel.
    IncomingBatch batch = getIncomingBatchService().findIncomingBatch(batchId,
            TestConstants.TEST_CLIENT_EXTERNAL_ID);
    assertEquals(batch.getStatus(), IncomingBatch.Status.OK, "Wrong status. " + printDatabase());
    assertEquals(batch.getChannelId(), TestConstants.TEST_CHANNEL_ID, "Wrong channel. " + printDatabase());
}

protected CsvWriter getWriter(OutputStream out) {
CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
Expand Down
Expand Up @@ -456,11 +456,17 @@ protected Object parseFloat(String value) {
}

/**
 * Parses a numeric string into a BigDecimal after cleaning it.
 *
 * "NaN" has no BigDecimal representation, so it is passed through unchanged
 * as a String for the dialect-specific writer to handle.
 */
protected Object parseBigDecimal(String value) {
    value = cleanNumber(value);
    // NaN cannot be represented by BigDecimal; return the String as-is.
    if (value != null && value.equals("NaN")) {
        return value;
    }
    // The number will have either one period or one comma for the decimal
    // point, but BigDecimal requires a period.
    return new BigDecimal(value.replace(',', '.'));
}

Expand Down Expand Up @@ -652,11 +658,11 @@ public Map<String, String> parseQualifiedTableName(String tableName) {
}

/**
 * Returns a copy of the given table with every non-LOB column marked as part
 * of the primary key. LOB columns are skipped — NOTE(review): presumably
 * because LOB values cannot serve as comparable key values; confirm.
 * The input table is not modified.
 */
public Table makeAllColumnsPrimaryKeys(Table table) {
    Table result = table.copy();
    for (Column column : result.getColumns()) {
        if (!isLob(column.getMappedTypeCode())) {
            column.setPrimaryKey(true);
        }
    }
    return result;
}
Expand Down
Expand Up @@ -20,12 +20,17 @@
*/
package org.jumpmind.db.platform.postgresql;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;

import javax.sql.DataSource;

import org.jumpmind.db.platform.DatabaseInfo;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.SqlTemplateSettings;
import org.jumpmind.db.sql.SymmetricLobHandler;
import org.springframework.jdbc.core.StatementCreatorUtils;

public class PostgreSqlJdbcSqlTemplate extends JdbcSqlTemplate {

Expand Down Expand Up @@ -54,4 +59,9 @@ public String getSelectLastInsertIdSql(String sequenceName) {
// PostgreSQL identity/serial columns do not accept an explicit NULL to
// request a generated value, so report that NULL binding is not allowed.
protected boolean allowsNullForIdentityColumn() {
    return false;
}

/**
 * Binds a NaN parameter value as Float.NaN with SQL type FLOAT, ignoring the
 * declared argType, since PostgreSQL accepts NaN for floating point columns.
 * NOTE(review): the base JdbcSqlTemplate implementation presumably binds NULL
 * or a plain value instead — confirm against the superclass.
 */
@Override
protected void setNanOrNull(PreparedStatement ps, int i, Object arg, int argType) throws SQLException {
    StatementCreatorUtils.setParameterValue(ps, i, Types.FLOAT, Float.NaN);
}
}

0 comments on commit 058ae25

Please sign in to comment.