
Commit

Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git into 3.9
jumpmind-josh committed May 10, 2018
2 parents fcf6cbb + e18734a commit 927d5e4
Showing 11 changed files with 136 additions and 110 deletions.
@@ -276,6 +276,15 @@ protected boolean checkPrerequsites(boolean force) {
}
}

+        if (jobDefinition.getNodeGroupId() != null
+                && !jobDefinition.getNodeGroupId().equals("ALL")
+                && !jobDefinition.getNodeGroupId().equals(engine.getNodeService().findIdentity().getNodeGroupId())){
+            log.info("Job should be only run on node groups '{}' but this is '{}'",
+                    jobDefinition.getNodeGroupId(),
+                    engine.getNodeService().findIdentity().getNodeGroupId());
+            return false;
+        }
+
return true;
}
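
A note on the guard added above: a job now declines to run when it is pinned to a node group other than the local node's, with null and the literal "ALL" acting as wildcards. A minimal standalone sketch of the same predicate, using plain strings in place of the real JobDefinition and INodeService APIs:

    public class NodeGroupGuard {

        /**
         * Mirrors the check added to AbstractJob: a job pinned to
         * jobNodeGroupId may run only on nodes of that group, while a
         * null pin or the literal "ALL" matches every group.
         */
        public static boolean shouldRun(String jobNodeGroupId, String localNodeGroupId) {
            return jobNodeGroupId == null
                    || jobNodeGroupId.equals("ALL")
                    || jobNodeGroupId.equals(localNodeGroupId);
        }

        public static void main(String[] args) {
            System.out.println(shouldRun(null, "store"));   // true: job not pinned
            System.out.println(shouldRun("ALL", "store"));  // true: explicit wildcard
            System.out.println(shouldRun("corp", "store")); // false: wrong group
        }
    }
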

@@ -65,7 +65,7 @@ protected AbstractJob instantiateJavaJob(JobDefinition jobDefinition, ISymmetric
return (AbstractJob) constructor.newInstance(jobDefinition.getJobName(), engine, taskScheduler);
}
}
-        // look for 3 arg constructor of engine, taskScheduler.
+        // look for 2 arg constructor of engine, taskScheduler.
for (Constructor constructor : constructors) {
if (constructor.getParameterTypes().length == 2
&& constructor.getParameterTypes()[0].isAssignableFrom(ISymmetricEngine.class)
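
For context, the surrounding factory method tries a 3-argument (jobName, engine, taskScheduler) constructor first and only then falls back to the 2-argument form shown here; the comment is corrected to say so. A self-contained sketch of that reflective lookup, assuming a hypothetical Job class rather than the real AbstractJob hierarchy:

    import java.lang.reflect.Constructor;

    public class ConstructorFallback {

        static class Job {
            Job(String name, Runnable task) {
                System.out.println("2-arg constructor picked for " + name);
            }
        }

        /** Scan declared constructors for a (String, Runnable) signature. */
        static Job instantiate(String name, Runnable task) throws Exception {
            for (Constructor<?> c : Job.class.getDeclaredConstructors()) {
                Class<?>[] p = c.getParameterTypes();
                if (p.length == 2 && p[0] == String.class
                        && p[1].isAssignableFrom(Runnable.class)) {
                    c.setAccessible(true);
                    return (Job) c.newInstance(name, task);
                }
            }
            throw new IllegalStateException("no suitable constructor on " + Job.class);
        }

        public static void main(String[] args) throws Exception {
            instantiate("purge.job", () -> { });
        }
    }
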
@@ -110,52 +110,6 @@ public static File createSnapshot(ISymmetricEngine engine) {
tmpDir.mkdirs();
log.info("Creating snapshot file in " + tmpDir.getAbsolutePath());

-        File logDir = null;
-
-        String parameterizedLogDir = parameterService.getString("server.log.dir");
-        if (isNotBlank(parameterizedLogDir)) {
-            logDir = new File(parameterizedLogDir);
-        }
-
-        if (logDir != null && logDir.exists()) {
-            log.info("Using server.log.dir setting as the location of the log files");
-        } else {
-            logDir = new File("logs");
-
-            if (!logDir.exists()) {
-                Map<File, Layout> matches = findSymmetricLogFile();
-                if (matches != null && matches.size() == 1) {
-                    logDir = matches.keySet().iterator().next().getParentFile();
-                }
-            }
-
-            if (!logDir.exists()) {
-                logDir = new File("../logs");
-            }
-
-            if (!logDir.exists()) {
-                logDir = new File("target");
-            }
-
-            if (logDir.exists()) {
-                File[] files = logDir.listFiles();
-                if (files != null) {
-                    for (File file : files) {
-                        String lowerCaseFileName = file.getName().toLowerCase();
-                        if (lowerCaseFileName.contains(".log")
-                                && (lowerCaseFileName.contains("symmetric") || lowerCaseFileName.contains("wrapper"))) {
-                            try {
-                                FileUtils.copyFileToDirectory(file, tmpDir);
-                            } catch (IOException e) {
-                                log.warn("Failed to copy " + file.getName() + " to the snapshot directory", e);
-                            }
-                        }
-                    }
-                }
-            }
-
-        }
-
FileWriter fwriter = null;
try {
fwriter = new FileWriter(new File(tmpDir, "config-export.csv"));
@@ -270,6 +224,7 @@ public static File createSnapshot(ISymmetricEngine engine) {
extract(export, 5000, "order by relative_dir, file_name", new File(tmpDir, "sym_file_snapshot.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT));

+        export.setIgnoreMissingTables(true);
extract(export, new File(tmpDir, "sym_console_event.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_CONSOLE_EVENT));

@@ -375,6 +330,52 @@ public static File createSnapshot(ISymmetricEngine engine) {
IOUtils.closeQuietly(fos);
}

+        File logDir = null;
+
+        String parameterizedLogDir = parameterService.getString("server.log.dir");
+        if (isNotBlank(parameterizedLogDir)) {
+            logDir = new File(parameterizedLogDir);
+        }
+
+        if (logDir != null && logDir.exists()) {
+            log.info("Using server.log.dir setting as the location of the log files");
+        } else {
+            logDir = new File("logs");
+        }
+
+        if (!logDir.exists()) {
+            Map<File, Layout> matches = findSymmetricLogFile();
+            if (matches != null && matches.size() == 1) {
+                logDir = matches.keySet().iterator().next().getParentFile();
+            }
+        }
+
+        if (!logDir.exists()) {
+            logDir = new File("../logs");
+        }
+
+        if (!logDir.exists()) {
+            logDir = new File("target");
+        }
+
+        if (logDir.exists()) {
+            log.info("Copying log files into snapshot file");
+            File[] files = logDir.listFiles();
+            if (files != null) {
+                for (File file : files) {
+                    String lowerCaseFileName = file.getName().toLowerCase();
+                    if (lowerCaseFileName.contains(".log")
+                            && (lowerCaseFileName.contains("symmetric") || lowerCaseFileName.contains("wrapper"))) {
+                        try {
+                            FileUtils.copyFileToDirectory(file, tmpDir);
+                        } catch (IOException e) {
+                            log.warn("Failed to copy " + file.getName() + " to the snapshot directory", e);
+                        }
+                    }
+                }
+            }
+        }

File jarFile = null;
try {
jarFile = new File(getSnapshotDirectory(engine), tmpDir.getName() + ".zip");
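
The relocated block above (it previously ran near the top of createSnapshot, as the earlier removal hunk shows) resolves the log directory by probing fallbacks in order: the server.log.dir parameter, ./logs, ../logs, then ./target, and copies any *.log files whose names mention symmetric or wrapper. A condensed standalone sketch of that resolve-then-copy idiom; the directory names come from the diff, the rest is illustrative:

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    public class LogCollector {

        /** Return the first existing candidate directory, or null if none exist. */
        static File resolveLogDir(String configured) {
            String[] candidates = { configured, "logs", "../logs", "target" };
            for (String candidate : candidates) {
                if (candidate != null && !candidate.trim().isEmpty()) {
                    File dir = new File(candidate);
                    if (dir.exists()) {
                        return dir;
                    }
                }
            }
            return null;
        }

        /** Copy matching log files into snapshotDir, warning rather than failing. */
        static void copyLogs(File logDir, File snapshotDir) {
            File[] files = logDir.listFiles();
            if (files == null) {
                return;
            }
            for (File file : files) {
                String name = file.getName().toLowerCase();
                if (name.contains(".log")
                        && (name.contains("symmetric") || name.contains("wrapper"))) {
                    try {
                        Files.copy(file.toPath(),
                                new File(snapshotDir, file.getName()).toPath(),
                                StandardCopyOption.REPLACE_EXISTING);
                    } catch (IOException e) {
                        System.err.println("Failed to copy " + file.getName() + ": " + e);
                    }
                }
            }
        }

        public static void main(String[] args) {
            File logDir = resolveLogDir(System.getProperty("server.log.dir"));
            if (logDir != null) {
                File snapshotDir = new File("snapshot-tmp");
                snapshotDir.mkdirs();
                copyLogs(logDir, snapshotDir);
            }
        }
    }
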
@@ -541,9 +542,12 @@ public synchronized Enumeration<Object> keys() {
runtimeProperties.setProperty("data.id.min", Long.toString(engine.getDataService().findMinDataId()));
runtimeProperties.setProperty("data.id.max", Long.toString(engine.getDataService().findMaxDataId()));

runtimeProperties.put("jvm.title", Runtime.class.getPackage().getImplementationTitle());
runtimeProperties.put("jvm.vendor", Runtime.class.getPackage().getImplementationVendor());
runtimeProperties.put("jvm.version", Runtime.class.getPackage().getImplementationVersion());
String jvmTitle = Runtime.class.getPackage().getImplementationTitle();
runtimeProperties.put("jvm.title", jvmTitle != null ? jvmTitle : "Unknown");
String jvmVendor = Runtime.class.getPackage().getImplementationVendor();
runtimeProperties.put("jvm.vendor", jvmVendor != null ? jvmVendor : "Unknown");
String jvmVersion = Runtime.class.getPackage().getImplementationVersion();
runtimeProperties.put("jvm.version", jvmVersion != null ? jvmVersion : "Unknown");
RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
List<String> arguments = runtimeMxBean.getInputArguments();
runtimeProperties.setProperty("jvm.arguments", arguments.toString());
@@ -340,11 +340,19 @@ protected ISqlReadCursor<Data> prepareCursor() {

this.currentGap = dataGaps.remove(0);

-        return sqlTemplate.queryForCursor(sql, new ISqlRowMapper<Data>() {
+        ISqlRowMapper<Data> dataMapper = new ISqlRowMapper<Data>() {
            public Data mapRow(Row row) {
                return engine.getDataService().mapData(row);
            }
-        }, args, types);
+        };
+
+        try {
+            return sqlTemplate.queryForCursor(sql, dataMapper, args, types);
+        } catch (RuntimeException e) {
+            log.info("Failed to execute query, but will try again,", e);
+            AppUtils.sleep(1000);
+            return sqlTemplate.queryForCursor(sql, dataMapper, args, types);
+        }

}
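
Extracting the row mapper into a local variable is what lets the query run twice: once normally, and once more after a one-second pause if the first attempt throws. The same retry-once shape as a generic standalone sketch; the Supplier-based helper is an illustration, not SymmetricDS API:

    import java.util.function.Supplier;

    public class RetryOnce {

        /** Run the action; on a RuntimeException, pause and try exactly once more. */
        static <T> T retryOnce(Supplier<T> action, long pauseMs) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                System.out.println("First attempt failed, retrying: " + e.getMessage());
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                return action.get(); // a second failure propagates to the caller
            }
        }

        public static void main(String[] args) {
            final int[] attempts = { 0 };
            String result = retryOnce(() -> {
                if (++attempts[0] == 1) {
                    throw new IllegalStateException("transient failure");
                }
                return "ok on attempt " + attempts[0];
            }, 100);
            System.out.println(result);
        }
    }
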

@@ -55,6 +55,7 @@
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
@@ -520,9 +521,15 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa

if (reloadRequests != null && reloadRequests.size() > 0) {
for (TableReloadRequest request : reloadRequests) {
-                    transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), loadId, new Date(),
+                    int rowsAffected = transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), loadId, new Date(),
                            request.getTargetNodeId(), request.getSourceNodeId(), request.getTriggerId(),
-                            request.getRouterId(), request.getCreateTime());
+                            request.getRouterId(), request.getCreateTime());
+                    if (rowsAffected == 0) {
+                        throw new SymmetricException(String.format("Failed to update a table_reload_request for loadId '%s' "
+                                + "targetNodeId '%s' sourceNodeId '%s' triggerId '%s' routerId '%s' createTime '%s'",
+                                loadId, request.getTargetNodeId(), request.getSourceNodeId(), request.getTriggerId(),
+                                request.getRouterId(), request.getCreateTime()));
+                    }
}
log.info("Table reload request(s) for load id " + loadId + " have been processed.");
}
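
The new rowsAffected check turns a silent no-op update into a hard failure. The same idea in plain JDBC, as a minimal sketch; the schema is invented for the demo, and the in-memory database assumes the H2 driver on the classpath:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class UpdateCountCheck {

        /** Update a row and fail loudly if nothing matched. */
        static void markProcessed(Connection con, long loadId) throws SQLException {
            String sql = "update table_reload_request set processed = 1 where load_id = ?";
            try (PreparedStatement ps = con.prepareStatement(sql)) {
                ps.setLong(1, loadId);
                int rowsAffected = ps.executeUpdate();
                if (rowsAffected == 0) {
                    throw new SQLException(String.format(
                            "Failed to update table_reload_request for loadId '%s'", loadId));
                }
            }
        }

        public static void main(String[] args) throws SQLException {
            try (Connection con = DriverManager.getConnection("jdbc:h2:mem:demo");
                    Statement ddl = con.createStatement()) {
                ddl.execute("create table table_reload_request (load_id bigint, processed int)");
                try {
                    markProcessed(con, 42); // no such row yet, so this throws
                } catch (SQLException expected) {
                    System.out.println(expected.getMessage());
                }
            }
        }
    }
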
@@ -78,7 +78,19 @@ public long purgeOutgoing(boolean force) {
Calendar retentionCutoff = Calendar.getInstance();
retentionCutoff.add(Calendar.MINUTE,
-parameterService.getInt(ParameterConstants.PURGE_RETENTION_MINUTES));
-        rowsPurged += purgeOutgoing(retentionCutoff, force);
+
+        try {
+            rowsPurged += purgeOutgoing(retentionCutoff, force);
+        } catch (Exception ex) {
+            try {
+                log.info("Failed to execute purge, but will try again,", ex);
+                rowsPurged += purgeOutgoing(retentionCutoff, force);
+            } catch (Exception e) {
+                log.error("Failed to execute purge, so aborting,", ex);
+            }
+        } finally {
+            log.info("The outgoing purge process has completed");
+        }

List<IPurgeListener> purgeListeners = extensionService.getExtensionPointList(IPurgeListener.class);
for (IPurgeListener purgeListener : purgeListeners) {
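
Note that this retry differs from the cursor retry earlier in the commit: here a second failure is logged and swallowed rather than rethrown, and the finally block reports completion either way. A compact sketch of that attempt, retry, abort flow, using a hypothetical helper:

    public class RetryThenAbort {

        /** Try the task; on failure retry once; on a second failure log and give up. */
        static void runWithOneRetry(Runnable task) {
            try {
                task.run();
            } catch (Exception ex) {
                try {
                    System.out.println("Failed, trying again: " + ex.getMessage());
                    task.run();
                } catch (Exception e) {
                    // Swallow: log the second failure instead of propagating it.
                    System.err.println("Failed again, aborting: " + e.getMessage());
                }
            } finally {
                System.out.println("The task has completed");
            }
        }

        public static void main(String[] args) {
            final int[] calls = { 0 };
            runWithOneRetry(() -> {
                calls[0]++;
                throw new IllegalStateException("failure #" + calls[0]);
            });
        }
    }
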
@@ -104,35 +116,29 @@ public long purgeIncoming(boolean force) {

public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
long rowsPurged = 0;
-        try {
-            if (force || clusterService.lock(ClusterConstants.PURGE_OUTGOING)) {
-                try {
-                    log.info("The outgoing purge process is about to run for data older than {}",
-                            SimpleDateFormat.getDateTimeInstance()
-                                    .format(retentionCutoff.getTime()));
-                    // VoltDB doesn't support capture, or subselects. So we'll just be purging heartbeats
-                    // by date here.
-                    if (getSymmetricDialect().getName().equalsIgnoreCase(DatabaseNamesConstants.VOLTDB)) {
-                        rowsPurged += purgeOutgoingByRetentionCutoff(retentionCutoff);
-                    } else {
-                        rowsPurged += purgeStrandedBatches();
-                        rowsPurged += purgeDataRows(retentionCutoff);
-                        rowsPurged += purgeOutgoingBatch(retentionCutoff);
-                        rowsPurged += purgeStranded(retentionCutoff);
-                        rowsPurged += purgeExtractRequests();
-                    }
-                } finally {
-                    if (!force) {
-                        clusterService.unlock(ClusterConstants.PURGE_OUTGOING);
-                    }
-                }
-            } else {
-                log.debug("Could not get a lock to run an outgoing purge");
-            }
-        } catch (Exception ex) {
-            log.error("", ex);
-        } finally {
-            log.info("The outgoing purge process has completed");
-        }
+        if (force || clusterService.lock(ClusterConstants.PURGE_OUTGOING)) {
+            try {
+                log.info("The outgoing purge process is about to run for data older than {}",
+                        SimpleDateFormat.getDateTimeInstance()
+                                .format(retentionCutoff.getTime()));
+                // VoltDB doesn't support capture, or subselects. So we'll just be purging heartbeats
+                // by date here.
+                if (getSymmetricDialect().getName().equalsIgnoreCase(DatabaseNamesConstants.VOLTDB)) {
+                    rowsPurged += purgeOutgoingByRetentionCutoff(retentionCutoff);
+                } else {
+                    rowsPurged += purgeStrandedBatches();
+                    rowsPurged += purgeDataRows(retentionCutoff);
+                    rowsPurged += purgeOutgoingBatch(retentionCutoff);
+                    rowsPurged += purgeStranded(retentionCutoff);
+                    rowsPurged += purgeExtractRequests();
+                }
+            } finally {
+                if (!force) {
+                    clusterService.unlock(ClusterConstants.PURGE_OUTGOING);
+                }
+            }
+        } else {
+            log.debug("Could not get a lock to run an outgoing purge");
+        }
return rowsPurged;
}
@@ -31,14 +31,7 @@
import java.sql.Array;
import java.sql.Timestamp;
import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
+import java.util.*;
import java.util.regex.Pattern;

import javax.sql.DataSource;
@@ -95,7 +88,7 @@ public abstract class AbstractDatabasePlatform implements IDatabasePlatform {

protected IDdlBuilder ddlBuilder;

-    protected Map<String, Table> tableCache = new HashMap<String, Table>();
+    protected Map<String, Table> tableCache = Collections.synchronizedMap(new HashMap<String, Table>());

private long lastTimeCachedModelClearedInMs = System.currentTimeMillis();

@@ -315,10 +308,8 @@ public Table readTableFromDatabase(String catalogName, String schemaName, String
}

public void resetCachedTableModel() {
-        synchronized (this.getClass()) {
-            this.tableCache = new HashMap<String, Table>();
-            lastTimeCachedModelClearedInMs = System.currentTimeMillis();
-        }
+        this.tableCache = Collections.synchronizedMap(new HashMap<String, Table>());
+        lastTimeCachedModelClearedInMs = System.currentTimeMillis();
}

public Table getTableFromCache(String tableName, boolean forceReread) {
@@ -333,16 +324,14 @@ public Table getTableFromCache(String catalogName, String schemaName, String tab
String key = Table.getFullyQualifiedTableName(catalogName, schemaName, tableName);
Table retTable = model != null ? model.get(key) : null;
if (retTable == null || forceReread) {
-            synchronized (this.getClass()) {
-                try {
-                    Table table = readTableFromDatabase(catalogName, schemaName, tableName);
-                    tableCache.put(key, table);
-                    retTable = table;
-                } catch (RuntimeException ex) {
-                    throw ex;
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-            }
+            try {
+                Table table = readTableFromDatabase(catalogName, schemaName, tableName);
+                tableCache.put(key, table);
+                retTable = table;
+            } catch (RuntimeException ex) {
+                throw ex;
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
}
return retTable;
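
Replacing the synchronized (this.getClass()) blocks with a Collections.synchronizedMap makes each individual get and put thread-safe while dropping the class-wide lock. The check-then-act sequence in getTableFromCache can still race, but benignly: two threads may both re-read the table and the second put simply wins. A small sketch of the cache shape, with SimpleTable standing in for the real Table model:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class TableCacheSketch {

        static class SimpleTable {
            final String name;
            SimpleTable(String name) { this.name = name; }
        }

        // Individual operations are synchronized; compound ones are not.
        private Map<String, SimpleTable> tableCache =
                Collections.synchronizedMap(new HashMap<String, SimpleTable>());

        SimpleTable getTableFromCache(String key, boolean forceReread) {
            SimpleTable table = tableCache.get(key);
            if (table == null || forceReread) {
                // Benign race: two threads may both rebuild; the last put wins.
                table = readTableFromDatabase(key);
                tableCache.put(key, table);
            }
            return table;
        }

        void resetCachedTableModel() {
            // Swap in a fresh map instead of locking while clearing.
            tableCache = Collections.synchronizedMap(new HashMap<String, SimpleTable>());
        }

        private SimpleTable readTableFromDatabase(String key) {
            return new SimpleTable(key); // placeholder for a real metadata read
        }

        public static void main(String[] args) {
            TableCacheSketch cache = new TableCacheSketch();
            System.out.println(cache.getTableFromCache("public.sym_node", false).name);
        }
    }
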
@@ -51,7 +51,10 @@ public SqliteJdbcSqlTemplate(DataSource dataSource, SqlTemplateSettings settings
@Override
public boolean isUniqueKeyViolation(Throwable ex) {
SQLException sqlEx = findSQLException(ex);
-        return (sqlEx != null && sqlEx.getMessage() != null && sqlEx.getMessage().contains("[SQLITE_CONSTRAINT]"));
+        return (sqlEx != null && sqlEx.getMessage() != null && (
+                sqlEx.getMessage().contains("[SQLITE_CONSTRAINT]") ||
+                sqlEx.getMessage().contains("[SQLITE_CONSTRAINT_PRIMARYKEY]") ||
+                sqlEx.getMessage().contains("[SQLITE_CONSTRAINT_UNIQUE]")));
}

@Override
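
The broadened match reflects that the SQLite JDBC driver reports constraint failures as bracketed result codes in the exception message, and newer driver versions emit the extended codes SQLITE_CONSTRAINT_PRIMARYKEY and SQLITE_CONSTRAINT_UNIQUE instead of the bare SQLITE_CONSTRAINT. A standalone sketch of that message-based detection; the sample message text is illustrative:

    import java.sql.SQLException;

    public class SqliteConstraintSniffer {

        /** Detect a unique-key violation from SQLite's message text. */
        static boolean isUniqueKeyViolation(Throwable ex) {
            SQLException sqlEx = findSQLException(ex);
            if (sqlEx == null || sqlEx.getMessage() == null) {
                return false;
            }
            String msg = sqlEx.getMessage();
            return msg.contains("[SQLITE_CONSTRAINT]")
                    || msg.contains("[SQLITE_CONSTRAINT_PRIMARYKEY]")
                    || msg.contains("[SQLITE_CONSTRAINT_UNIQUE]");
        }

        /** Walk the cause chain looking for the underlying SQLException. */
        static SQLException findSQLException(Throwable ex) {
            for (Throwable t = ex; t != null; t = t.getCause()) {
                if (t instanceof SQLException) {
                    return (SQLException) t;
                }
            }
            return null;
        }

        public static void main(String[] args) {
            SQLException cause = new SQLException(
                    "[SQLITE_CONSTRAINT_UNIQUE] A UNIQUE constraint failed (demo.id)");
            System.out.println(isUniqueKeyViolation(new RuntimeException(cause))); // true
        }
    }
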
4 changes: 2 additions & 2 deletions symmetric-server/src/main/deploy/samples/corp-000.properties
@@ -40,7 +40,7 @@ db.driver=org.h2.Driver
# The JDBC URL used to connect to the database
#db.url=jdbc:mysql://localhost/corp?tinyInt1isBit=false
#db.url=jdbc:oracle:thin:@127.0.0.1:1521:corp
-#db.url=jdbc:postgresql://localhost/corp?protocolVersion=2&stringtype=unspecified
+#db.url=jdbc:postgresql://localhost/corp?stringtype=unspecified
#db.url=jdbc:derby:corp;create=true
#db.url=jdbc:hsqldb:file:corp;shutdown=true
#db.url=jdbc:jtds:sqlserver://localhost:1433;useCursors=true;bufferMaxMemory=10240;lobBuffer=5242880
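
(The dropped protocolVersion=2 option forced the long-obsolete v2 wire protocol; current PostgreSQL JDBC drivers speak only v3, so the option no longer does anything useful, which is presumably why all three sample URLs remove it.)
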
@@ -76,4 +76,4 @@ job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000
# Kick off initial load
-initial.load.create.first=true
\ No newline at end of file
+initial.load.create.first=true
@@ -40,7 +40,7 @@ db.driver=org.h2.Driver
# The JDBC URL used to connect to the database
#db.url=jdbc:mysql://localhost/store001?tinyInt1isBit=false
#db.url=jdbc:oracle:thin:@127.0.0.1:1521:store001
-#db.url=jdbc:postgresql://localhost/store001?protocolVersion=2&stringtype=unspecified
+#db.url=jdbc:postgresql://localhost/store001?stringtype=unspecified
#db.url=jdbc:derby:store001;create=true
#db.url=jdbc:hsqldb:file:store001;shutdown=true
#db.url=jdbc:jtds:sqlserver://localhost:1433/store001;useCursors=true;bufferMaxMemory=10240;lobBuffer=5242880
@@ -40,7 +40,7 @@ db.driver=org.h2.Driver
# The JDBC URL used to connect to the database
#db.url=jdbc:mysql://localhost/store002?tinyInt1isBit=false
#db.url=jdbc:oracle:thin:@127.0.0.1:1521:store002
-#db.url=jdbc:postgresql://localhost/store002?protocolVersion=2&stringtype=unspecified
+#db.url=jdbc:postgresql://localhost/store002?stringtype=unspecified
#db.url=jdbc:derby:store002;create=true
#db.url=jdbc:hsqldb:file:store002;shutdown=true
#db.url=jdbc:jtds:sqlserver://localhost:1433/store002;useCursors=true;bufferMaxMemory=10240;lobBuffer=5242880
