Navigation Menu

Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
jumpmind-josh committed May 25, 2018
2 parents 47042f6 + 86d2696 commit d190861
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 27 deletions.
Expand Up @@ -645,22 +645,10 @@ public synchronized boolean start(boolean startJobs) {
setup();
if (isConfigured()) {
Node node = nodeService.findIdentity();
if (node != null && (!node.getExternalId().equals(getParameterService().getExternalId())
|| !node.getNodeGroupId().equals(
getParameterService().getNodeGroupId()))) {
if (parameterService.is(ParameterConstants.NODE_COPY_MODE_ENABLED,
false)) {
registrationService.requestNodeCopy();
} else {
throw new SymmetricException(
"The configured state does not match recorded database state. The recorded external id is '%s' while the configured external id is '%s'. The recorded node group id is '%s' while the configured node group id is '%s'",
new Object[] { node.getExternalId(),
getParameterService().getExternalId(),
node.getNodeGroupId(),
getParameterService().getNodeGroupId() });
}
isInitialized = true;
} else if (node != null) {
checkSystemIntegrity(node);
isInitialized = true;

if (node != null) {

log.info(
"Starting registered node [group={}, id={}, nodeId={}]",
Expand Down Expand Up @@ -738,6 +726,31 @@ public synchronized boolean start(boolean startJobs) {
return started;
}

protected void checkSystemIntegrity(Node node) {
if (node != null && (!node.getExternalId().equals(getParameterService().getExternalId())
|| !node.getNodeGroupId().equals(getParameterService().getNodeGroupId()))) {
if (parameterService.is(ParameterConstants.NODE_COPY_MODE_ENABLED, false)) {
registrationService.requestNodeCopy();
} else {
throw new SymmetricException(
"The configured state does not match recorded database state. The recorded external id is '%s' while the configured external id is '%s'. The recorded node group id is '%s' while the configured node group id is '%s'",
new Object[] { node.getExternalId(),
getParameterService().getExternalId(),
node.getNodeGroupId(),
getParameterService().getNodeGroupId() });
}
}

boolean useExtractJob = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB, true);
boolean streamToFile = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED, false);
if (useExtractJob && !streamToFile) {
throw new SymmetricException(String.format("Node '%s' is configured with confilcting parameters which may result in replication stopping and/or empty load batches. "
+ "One of these two parameters needs to be changed: %s=%s and %s=%s",
node.getNodeId(), ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB, useExtractJob, ParameterConstants.STREAM_TO_FILE_ENABLED,
streamToFile));
}
}

public String getEngineDescription(String msg) {
if (lastRestartTime == null) {
return "";
Expand Down
Expand Up @@ -383,9 +383,9 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map<
}
}
} else {
log.error("Can't process load for '{}' because of confilcting parameters: {}={} and {}={}", load.getTargetNodeId(),
ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB, useExtractJob, ParameterConstants.STREAM_TO_FILE_ENABLED,
streamToFile);
throw new SymmetricException(String.format("Node '%s' can't process load for '%s' because of confilcting parameters: %s=%s and %s=%s",
source.getNodeId(), load.getTargetNodeId(), ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB, useExtractJob, ParameterConstants.STREAM_TO_FILE_ENABLED,
streamToFile));
}
}

Expand Down
Expand Up @@ -821,14 +821,8 @@ protected void processTableStructureChanges(Database currentModel, Database desi

Table realTargetTable = getRealTargetTableFor(desiredModel, sourceTable, targetTable);

dropTemporaryTable(tempTable, ddl);
createTemporaryTable(tempTable, ddl);
writeCopyDataStatement(sourceTable, tempTable, ddl);
/*
* Note that we don't drop the indices here because the DROP TABLE will take
* care of that Likewise, foreign keys have already been dropped as necessary
*/
dropTable(sourceTable, ddl, false, true);
renameTable(sourceTable, tempTable, ddl);

createTable(realTargetTable, ddl, false, true);
if (canMigrateData) {
writeCopyDataStatement(tempTable, targetTable, ddl);
Expand All @@ -839,6 +833,18 @@ protected void processTableStructureChanges(Database currentModel, Database desi
ddl.append(tableDdl);
}
}

protected void renameTable(Table sourceTable, Table tempTable, StringBuilder ddl) {
dropTemporaryTable(tempTable, ddl);
createTemporaryTable(tempTable, ddl);
writeCopyDataStatement(sourceTable, tempTable, ddl);
/*
* Note that we don't drop the indices here because the DROP
* TABLE will take care of that Likewise, foreign keys have
* already been dropped as necessary
*/
dropTable(sourceTable, ddl, false, true);
}

protected Database copy(Database currentModel) {
try {
Expand Down
Expand Up @@ -22,7 +22,11 @@

import java.sql.Connection;
import java.sql.Types;
import java.util.Collection;
import java.util.Iterator;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.alter.*;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.ForeignKey;
Expand Down Expand Up @@ -167,6 +171,13 @@ protected String mapDefaultValue(Object defaultValue, int typeCode) {
return super.mapDefaultValue(defaultValue, typeCode);
}

@Override
protected void writeColumnDefaultValue(Table table, Column column, StringBuilder ddl) {
ddl.append("(");
super.writeColumnDefaultValue(table, column, ddl);
ddl.append(")");
}

@Override
protected void createTable(Table table, StringBuilder ddl, boolean temporary, boolean recreate) {
// SQL Lite does not allow auto increment columns on a composite primary key. Solution is to turn off
Expand All @@ -179,4 +190,47 @@ protected void createTable(Table table, StringBuilder ddl, boolean temporary, bo
super.createTable(table, ddl, temporary, recreate);
}

@Override
protected void renameTable(Table sourceTable, Table tempTable, StringBuilder ddl) {
dropTemporaryTable(tempTable, ddl);

for(IIndex index : sourceTable.getIndices()) {
writeExternalIndexDropStmt(tempTable, index, ddl);
}

ddl.append("ALTER TABLE ");
ddl.append(getFullyQualifiedTableNameShorten(sourceTable));
ddl.append(" RENAME TO ");
ddl.append(getFullyQualifiedTableNameShorten(tempTable));
printEndOfStatement(ddl);
}

@Override
protected void processTableStructureChanges(Database currentModel, Database desiredModel, Collection<TableChange> changes, StringBuilder ddl) {
for (Iterator<TableChange> changeIt = changes.iterator(); changeIt.hasNext();) {
TableChange change = changeIt.next();

if (change instanceof AddColumnChange) {
AddColumnChange addColumnChange = (AddColumnChange) change;
processChange(currentModel, desiredModel, addColumnChange, ddl);
changeIt.remove();
}
}

super.processTableStructureChanges(currentModel, desiredModel, changes, ddl);
}

/*
* Processes the addition of a column to a table.
*/
protected void processChange(Database currentModel, Database desiredModel,
AddColumnChange change, StringBuilder ddl) {
ddl.append("ALTER TABLE ");
ddl.append(getFullyQualifiedTableNameShorten(change.getChangedTable()));
printIndent(ddl);
ddl.append("ADD ");
writeColumn(change.getChangedTable(), change.getNewColumn(), ddl);
printEndOfStatement(ddl);
change.apply(currentModel, delimitedIdentifierModeOn);
}
}

0 comments on commit d190861

Please sign in to comment.