Skip to content

Commit

Permalink
start of syncing ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 4, 2007
1 parent 42d1182 commit cd2f0a6
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 7 deletions.
Expand Up @@ -46,5 +46,7 @@ public class CsvConstants {
public static final String COMMIT = "commit";

public static final String SQL = "sql";

public static final String DDL = "ddl";

}
Expand Up @@ -21,7 +21,10 @@

package org.jumpmind.symmetric.db;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URL;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand Down Expand Up @@ -415,6 +418,35 @@ public String getCreateSymmetricDDL() {
return platform.getCreateTablesSql(db, true, true);
}

public String getCreateTableSQL(Trigger trig) {
Table table = getMetaDataFor(null, trig.getSourceSchemaName(), trig.getSourceTableName(), true);
String sql = null;
try {
StringWriter buffer = new StringWriter();
platform.getSqlBuilder().setWriter(buffer);
platform.getSqlBuilder().createTable(cachedModel, table);
sql = buffer.toString();
}
catch(IOException e) { }
return sql;
}

public String getCreateTableXML(Trigger trig) {
Table table = getMetaDataFor(null, trig.getSourceSchemaName(), trig.getSourceTableName(), true);
Database db = new Database();
db.addTable(table);
StringWriter buffer = new StringWriter();
DatabaseIO xmlWriter = new DatabaseIO();
xmlWriter.write(db, buffer);
return buffer.toString();
}

public void createTables(String xml) {
StringReader reader = new StringReader(xml);
Database db = new DatabaseIO().read(reader);
platform.createTables(db, true, true);
}

protected boolean prefixConfigDatabase(Database targetTables) {
try {
String tblPrefix = this.tablePrefix.toLowerCase() + "_";
Expand Down
Expand Up @@ -98,6 +98,12 @@ public void initTrigger(DataEventType dml, Trigger config,

public String getCreateSymmetricDDL();

public String getCreateTableXML(Trigger trig);

public String getCreateTableSQL(Trigger trig);

public void createTables(String xml);

public String getSelectLastInsertIdSql(String sequenceName);

public long insertWithGeneratedKey(final String sql, final String sequenceName);
Expand Down
@@ -0,0 +1,16 @@
package org.jumpmind.symmetric.extract.csv;

import java.io.BufferedWriter;
import java.io.IOException;

import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

public class StreamDDLDataCommand extends AbstractStreamDataCommand {

public void execute(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
Util.write(writer, CsvConstants.DDL, DELIMITER, data.getRowData());
writer.newLine();
}
}
Expand Up @@ -133,7 +133,11 @@ public void load() throws IOException {
break;
} else if (tokens[0].equals(CsvConstants.SQL)) {
if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
runSql(tokens);
runSql(csvReader.getRawRecord());
}
} else if (tokens[0].equals(CsvConstants.DDL)) {
if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
runDdl(csvReader.getRawRecord());
}
} else if (tokens[0].equals(CsvConstants.BINARY)) {
try {
Expand Down Expand Up @@ -256,10 +260,20 @@ protected int delete(String[] tokens) {
return rows;
}

protected void runSql(String[] tokens) {
protected void runSql(String sql) {
stats.incrementStatementCount();
logger.debug("Running SQL: " + tokens[1]);
jdbcTemplate.execute(tokens[1]);
if (logger.isDebugEnabled()) {
logger.debug("Running SQL: " + sql);
}
jdbcTemplate.execute(sql);
}

protected void runDdl(String xml) {
stats.incrementStatementCount();
if (logger.isDebugEnabled()) {
logger.debug("Running DDL: " + xml);
}
dbDialect.createTables(xml);
}

protected String[] parseKeys(String[] tokens, int startIndex) {
Expand Down
Expand Up @@ -53,7 +53,12 @@ public enum DataEventType implements ICoded {
* An event that indicates that the data payload has a sql statement that needs to be executed.
* This is more of a remote control feature (that would have been very handy in past lives).
*/
SQL("S");
SQL("S"),

/**
* An event that indicates that the data payload is a table creation.
*/
DDL("DDL");

private String code;

Expand All @@ -76,6 +81,8 @@ public static DataEventType getEventType(String s) {
return DataEventType.RELOAD;
} else if (s.equals("S")) {
return DataEventType.SQL;
} else if (s.equals("DDL")) {
return DataEventType.DDL;
}
return null;
}
Expand Down
Expand Up @@ -75,6 +75,8 @@ public class DataService extends AbstractService implements IDataService {
private String insertIntoDataEventSql;

private boolean deleteFirstForReload;

private boolean createFirstForReload;

public void insertReloadEvent(final Node targetNode, final Trigger trigger) {
final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
Expand All @@ -85,14 +87,24 @@ public void insertReloadEvent(final Node targetNode, final Trigger trigger) {
}

public void createPurgeEvent(final Node targetNode, final Trigger trigger) {
final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
final String sql = dbDialect.createPurgeSqlFor(targetNode, trigger);
String sql = dbDialect.createPurgeSqlFor(targetNode, trigger);
createSqlEvent(targetNode, trigger, sql);
}

public void createSqlEvent(final Node targetNode, final Trigger trigger, String sql) {
TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.SQL, sql, null,
history);
insertDataEvent(data, targetNode.getNodeId());
}

public void createTableEvent(final Node targetNode, final Trigger trigger, String xml) {
TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.DDL, xml, null,
history);
insertDataEvent(data, targetNode.getNodeId());
}

public long insertData(final Data data) {
return dbDialect.insertWithGeneratedKey(insertIntoDataSql, "sym_data_data_id_seq",
new PreparedStatementCallback() {
Expand Down Expand Up @@ -140,6 +152,12 @@ public String reloadNode(String nodeId) {
List<Trigger> triggers = configurationService.getActiveTriggersForReload(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId());

if (createFirstForReload) {
for (Trigger trigger : triggers) {
String xml = dbDialect.getCreateTableXML(trigger);
createTableEvent(targetNode, trigger, xml);
}
}
if (deleteFirstForReload) {
for (ListIterator<Trigger> iterator = triggers.listIterator(triggers.size()); iterator.hasPrevious();) {
Trigger trigger = iterator.previous();
Expand Down Expand Up @@ -288,4 +306,8 @@ public void setDeleteFirstForReload(boolean deleteFirstForReload) {
this.deleteFirstForReload = deleteFirstForReload;
}

public void setCreateFirstForReload(boolean createFirstForReload) {
this.createFirstForReload = createFirstForReload;
}

}
3 changes: 3 additions & 0 deletions symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -72,6 +72,9 @@ symmetric.runtime.trigger.prefix=
# Set this if tables should be purged prior to an initial load
symmetric.runtime.initial.load.delete.first=false

# Set this if tables should be cerated prior to an initial load
symmetric.runtime.initial.load.create.first=false

# Sets both the connection and read timeout on the internal HttpUrlConnection
symmetric.runtime.http.timeout.ms=600000

Expand Down
2 changes: 2 additions & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -267,6 +267,8 @@
scope="singleton">
<property name="deleteFirstForReload"
value="${symmetric.runtime.initial.load.delete.first}" />
<property name="createFirstForReload"
value="${symmetric.runtime.initial.load.create.first}" />
<property name="jdbcTemplate" ref="jdbcTemplate" />
<property name="configurationService" ref="configurationService" />
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
Expand Down

0 comments on commit cd2f0a6

Please sign in to comment.