diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/DbImport.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/DbImport.java index 19c8f9cb3b..1746236d20 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/DbImport.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/DbImport.java @@ -29,7 +29,6 @@ import javax.sql.DataSource; -import org.jumpmind.db.model.Database; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.platform.JdbcDatabasePlatformFactory; @@ -158,10 +157,14 @@ protected Conflict buildConflictSettings() { protected DatabaseWriterSettings buildDatabaseWriterSettings() { DatabaseWriterSettings settings = new DatabaseWriterSettings(); - settings.setMaxRowsBeforeCommit(commitRate); + settings.setMaxRowsBeforeCommit(commitRate); settings.setDefaultConflictSetting(buildConflictSettings()); settings.setUsePrimaryKeysFromSource(false); + settings.setAlterTable(alterTables); + settings.setCreateTableDropFirst(dropIfExists); + settings.setCreateTableFailOnError(!forceImport); settings.setDatabaseWriterFilters(databaseWriterFilters); + settings.setCreateTableAlterCaseToMatchDatabaseDefault(alterCaseToMatchDatabaseDefaultCase); if (forceImport) { settings.addErrorHandler(new DatabaseWriterErrorIgnorer()); } @@ -181,24 +184,7 @@ protected void importTablesFromCsv(InputStream in, String tableName) { dataProcessor.process(); } - protected void importTablesFromXml(InputStream in) { - - // TODO should probably handle database creation in xml/data reader writer. - in.mark(Integer.MAX_VALUE); - - Database database = platform.readDatabaseFromXml(in, alterCaseToMatchDatabaseDefaultCase); - if (alterTables) { - platform.alterDatabase(database, forceImport); - } else { - platform.createDatabase(database, dropIfExists, forceImport); - } - - try { - in.reset(); - } catch (IOException e) { - throw new IoException(e); - } - + protected void importTablesFromXml(InputStream in) { XmlDataReader reader = new XmlDataReader(in); DatabaseWriter writer = new DatabaseWriter(platform, buildDatabaseWriterSettings()); DataProcessor dataProcessor = new DataProcessor(reader, writer); diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index e1ed4faecd..45b8c079ab 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -296,7 +296,7 @@ auto.sync.triggers=true # Type: boolean auto.upgrade=true -# Send symmetricds changes to client nodes when configuration changes. +# Capture and send SymmetricDS configuration changes to client nodes. # # Tags: general # Type: boolean @@ -305,6 +305,7 @@ auto.sync.configuration=true # Whether triggers should fire when changes sync into the registration server # from child nodes # +# DatabaseOverridable: true # Tags: general # Type: boolean auto.sync.configuration.on.incoming.at.registration.server=true diff --git a/symmetric-db/src/main/java/org/jumpmind/db/io/DatabaseXmlUtil.java b/symmetric-db/src/main/java/org/jumpmind/db/io/DatabaseXmlUtil.java index 64e3d1e37b..af354ecfdc 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/io/DatabaseXmlUtil.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/io/DatabaseXmlUtil.java @@ -29,6 +29,7 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Reader; +import java.io.StringWriter; import java.io.Writer; import org.apache.commons.io.IOUtils; @@ -54,7 +55,7 @@ public class DatabaseXmlUtil { public static final String DTD_PREFIX = "http://db.apache.org/torque/dtd/database"; - + private DatabaseXmlUtil() { } @@ -118,9 +119,6 @@ public static Database read(Reader reader, boolean validate) { try { boolean done = false; Database database = null; - Table table = null; - ForeignKey fk = null; - IIndex index = null; XmlPullParser parser = XmlPullParserFactory.newInstance().newPullParser(); parser.setInput(reader); @@ -142,6 +140,45 @@ public static Database read(Reader reader, boolean validate) { } } } else if (name.equalsIgnoreCase("table")) { + Table table = nextTable(parser); + if (table != null) { + database.addTable(table); + } + } + break; + case XmlPullParser.END_TAG: + name = parser.getName(); + if (name.equalsIgnoreCase("database")) { + done = true; + } + break; + } + eventType = parser.next(); + } + + if (validate) { + database.initialize(); + } + return database; + } catch (XmlPullParserException e) { + throw new IoException(e); + } catch (IOException e) { + throw new IoException(e); + } + } + + public static Table nextTable(XmlPullParser parser) { + try { + Table table = null; + ForeignKey fk = null; + IIndex index = null; + boolean done = false; + int eventType = parser.getEventType(); + while (eventType != XmlPullParser.END_DOCUMENT && !done) { + switch (eventType) { + case XmlPullParser.START_TAG: + String name = parser.getName(); + if (name.equalsIgnoreCase("table")) { table = new Table(); for (int i = 0; i < parser.getAttributeCount(); i++) { String attributeName = parser.getAttributeName(i); @@ -150,7 +187,6 @@ public static Database read(Reader reader, boolean validate) { table.setName(attributeValue); } } - database.addTable(table); } else if (name.equalsIgnoreCase("column")) { Column column = new Column(); for (int i = 0; i < parser.getAttributeCount(); i++) { @@ -239,7 +275,7 @@ public static Database read(Reader reader, boolean validate) { break; case XmlPullParser.END_TAG: name = parser.getName(); - if (name.equalsIgnoreCase("database")) { + if (name.equalsIgnoreCase("table")) { done = true; } else if (name.equalsIgnoreCase("index") || name.equalsIgnoreCase("unique")) { @@ -251,13 +287,13 @@ public static Database read(Reader reader, boolean validate) { } break; } - eventType = parser.next(); - } - if (validate) { - database.initialize(); + if (!done) { + eventType = parser.next(); + } } - return database; + + return table; } catch (XmlPullParserException e) { throw new IoException(e); } catch (IOException e) { @@ -334,6 +370,12 @@ public static void write(Database model, Writer output) { throw new IoException(e); } } + + public static String toXml(Table table) { + StringWriter writer = new StringWriter(); + write(table, writer); + return writer.toString(); + } public static void write(Table table, Writer output) { diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java index c545f1b583..b3324228ec 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java @@ -633,45 +633,53 @@ public Database readDatabaseFromXml(String filePath, boolean alterCaseToMatchDat } finally { IOUtils.closeQuietly(is); } + } + + public void alterCaseToMatchDatabaseDefaultCase(Database database) { + Table[] tables = database.getTables(); + for (Table table : tables) { + alterCaseToMatchDatabaseDefaultCase(table); + } } + + public void alterCaseToMatchDatabaseDefaultCase(Table table) { + boolean storesUpperCase = isStoresUpperCaseIdentifiers(); + if (!FormatUtils.isMixedCase(table.getName())) { + table.setName(storesUpperCase ? table.getName().toUpperCase() : table.getName() + .toLowerCase()); + } - public Database readDatabaseFromXml(InputStream is, boolean alterCaseToMatchDatabaseDefaultCase) { - InputStreamReader reader = new InputStreamReader(is); - Database database = DatabaseXmlUtil.read(reader); - if (alterCaseToMatchDatabaseDefaultCase) { - boolean storesUpperCase = isStoresUpperCaseIdentifiers(); - Table[] tables = database.getTables(); - for (Table table : tables) { - if (!FormatUtils.isMixedCase(table.getName())) { - table.setName(storesUpperCase ? table.getName().toUpperCase() : table.getName() - .toLowerCase()); - } - - Column[] columns = table.getColumns(); - for (Column column : columns) { - if (!FormatUtils.isMixedCase(column.getName())) { - column.setName(storesUpperCase ? column.getName().toUpperCase() : column - .getName().toLowerCase()); - } - } + Column[] columns = table.getColumns(); + for (Column column : columns) { + if (!FormatUtils.isMixedCase(column.getName())) { + column.setName(storesUpperCase ? column.getName().toUpperCase() : column.getName() + .toLowerCase()); + } + } - IIndex[] indexes = table.getIndices(); - for (IIndex index : indexes) { - if (!FormatUtils.isMixedCase(index.getName())) { - index.setName(storesUpperCase ? index.getName().toUpperCase() : index - .getName().toLowerCase()); - } + IIndex[] indexes = table.getIndices(); + for (IIndex index : indexes) { + if (!FormatUtils.isMixedCase(index.getName())) { + index.setName(storesUpperCase ? index.getName().toUpperCase() : index.getName() + .toLowerCase()); + } - IndexColumn[] indexColumns = index.getColumns(); - for (IndexColumn indexColumn : indexColumns) { - if (!FormatUtils.isMixedCase(indexColumn.getName())) { - indexColumn.setName(storesUpperCase ? indexColumn.getName() - .toUpperCase() : indexColumn.getName().toLowerCase()); - } - } + IndexColumn[] indexColumns = index.getColumns(); + for (IndexColumn indexColumn : indexColumns) { + if (!FormatUtils.isMixedCase(indexColumn.getName())) { + indexColumn.setName(storesUpperCase ? indexColumn.getName().toUpperCase() + : indexColumn.getName().toLowerCase()); } } } + } + + public Database readDatabaseFromXml(InputStream is, boolean alterCaseToMatchDatabaseDefaultCase) { + InputStreamReader reader = new InputStreamReader(is); + Database database = DatabaseXmlUtil.read(reader); + if (alterCaseToMatchDatabaseDefaultCase) { + alterCaseToMatchDatabaseDefaultCase(database); + } return database; } diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/IDatabasePlatform.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/IDatabasePlatform.java index 1a6b2c225c..9ddfedcb18 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/IDatabasePlatform.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/IDatabasePlatform.java @@ -127,6 +127,10 @@ public Object[] getObjectValues(BinaryEncoding encoding, String[] values, public Database readDatabaseFromXml(String filePath, boolean alterCaseToMatchDatabaseDefaultCase); public Database readDatabaseFromXml(InputStream in, boolean alterCaseToMatchDatabaseDefaultCase); + + public void alterCaseToMatchDatabaseDefaultCase(Table table); + + public void alterCaseToMatchDatabaseDefaultCase(Database database); public boolean isLob(int type); diff --git a/symmetric-db/src/test/java/org/jumpmind/db/io/DatabaseIOTest.java b/symmetric-db/src/test/java/org/jumpmind/db/io/DatabaseXmlUtilTest.java similarity index 95% rename from symmetric-db/src/test/java/org/jumpmind/db/io/DatabaseIOTest.java rename to symmetric-db/src/test/java/org/jumpmind/db/io/DatabaseXmlUtilTest.java index 3692aded3a..514191bfba 100644 --- a/symmetric-db/src/test/java/org/jumpmind/db/io/DatabaseIOTest.java +++ b/symmetric-db/src/test/java/org/jumpmind/db/io/DatabaseXmlUtilTest.java @@ -6,7 +6,7 @@ import org.jumpmind.db.model.Table; import org.junit.Test; -public class DatabaseIOTest { +public class DatabaseXmlUtilTest { @Test public void testReadXml() { diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/XmlDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/XmlDataReader.java index d7f3966cf6..55d4fc7641 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/XmlDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/XmlDataReader.java @@ -23,17 +23,21 @@ import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; +import org.jumpmind.db.io.DatabaseXmlUtil; import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.util.BinaryEncoding; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.CsvUtils; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.IDataReader; @@ -47,14 +51,12 @@ public class XmlDataReader extends AbstractDataReader implements IDataReader { protected Reader reader; protected DataContext context; protected Batch batch; - protected Table lastTable; protected Table table; - protected CsvData data; protected String sourceNodeId; protected int lineNumber = 0; protected XmlPullParser parser; protected Statistics statistics = new Statistics(); - protected Object next = null; + protected List next = new ArrayList(); public XmlDataReader(InputStream is) { this(toReader(is)); @@ -70,17 +72,19 @@ public void open(DataContext context) { this.context = context; this.parser = XmlPullParserFactory.newInstance().newPullParser(); this.parser.setInput(reader); - this.next = readNext(); + readNext(); } catch (XmlPullParserException e) { throw new RuntimeException(e); } } - protected Object readNext() { + protected void readNext() { try { boolean nullValue = false; Map rowData = new LinkedHashMap(); String columnName = null; + CsvData data = null; + Table table = null; int eventType = parser.next(); while (eventType != XmlPullParser.END_DOCUMENT) { switch (eventType) { @@ -100,8 +104,10 @@ protected Object readNext() { String name = parser.getName(); if ("row".equalsIgnoreCase(name)) { - data = new CsvData(); - table.removeAllColumns(); + data = new CsvData(); + if (table != null) { + table.removeAllColumns(); + } data.setDataEventType(DataEventType.INSERT); } else if ("field".equalsIgnoreCase(name)) { for (int i = 0; i < parser.getAttributeCount(); i++) { @@ -114,17 +120,28 @@ protected Object readNext() { } } } else if ("table_data".equalsIgnoreCase(name)) { - batch = new Batch(); + Batch batch = new Batch(); batch.setBinaryEncoding(BinaryEncoding.BASE64); + next.add(batch); table = new Table(); for (int i = 0; i < parser.getAttributeCount(); i++) { String attributeName = parser.getAttributeName(i); String attributeValue = parser.getAttributeValue(i); if ("name".equalsIgnoreCase(attributeName)) { table.setName(attributeValue); - } + } } - return batch; + next.add(table); + } else if ("table".equalsIgnoreCase(name)) { + Batch batch = new Batch(); + batch.setBinaryEncoding(BinaryEncoding.BASE64); + next.add(batch); + table = DatabaseXmlUtil.nextTable(parser); + next.add(table); + String xml = DatabaseXmlUtil.toXml(table); + data = new CsvData(DataEventType.CREATE); + data.putCsvData(CsvData.ROW_DATA, CsvUtils.escapeCsvData(xml)); + next.add(data); } break; @@ -139,14 +156,14 @@ protected Object readNext() { String[] columnValues = rowData.values().toArray( new String[rowData.values().size()]); data.putParsedData(CsvData.ROW_DATA, columnValues); - if (lastTable == null || !lastTable.equals(table)) { - lastTable = table; - return table; - } else { - return data; + if (this.table == null || !this.table.equals(table)) { + next.add(table); } + next.add(data); } else if ("table_data".equalsIgnoreCase(name)) { - batch.setComplete(true); + if (batch != null) { + batch.setComplete(true); + } } else if ("field".equalsIgnoreCase(name)) { columnName = null; nullValue = false; @@ -157,7 +174,6 @@ protected Object readNext() { eventType = parser.next(); } - return null; } catch (IOException ex) { throw new IoException(ex); } catch (XmlPullParserException ex) { @@ -166,56 +182,45 @@ protected Object readNext() { } public Batch nextBatch() { - if (next instanceof Batch) { - this.batch = (Batch) next; - next = null; - return batch; - } else { - next = readNext(); - if (next instanceof Batch) { - this.batch = (Batch) next; - next = null; - return batch; + do { + readNext(); + if (next.size() > 0) { + Object o = next.remove(0); + if (o instanceof Batch) { + batch = (Batch) o; + return batch; + } } - } + } while (next.size() > 0); return null; } public Table nextTable() { - if (next instanceof Table) { - this.table = (Table) next; - next = data; - } else if (next instanceof Batch) { - return null; - } else { - next = readNext(); - if (next instanceof Table) { - this.table = (Table) next; - next = data; - } else { - this.table = null; + this.table = null; + do { + readNext(); + if (next.size() > 0) { + Object o = next.remove(0); + if (o instanceof Table) { + this.table = (Table) o; + break; + } } - } + } while (next.size() > 0); - if (this.table == null) { + if (this.table == null && batch != null) { batch.setComplete(true); } + return this.table; } public CsvData nextData() { - if (next instanceof CsvData) { - CsvData data = (CsvData) next; - next = null; - return data; - } else { - next = readNext(); - if (next instanceof CsvData) { - CsvData data = (CsvData) next; - next = null; - return data; - } + readNext(); + if (next.size() > 0 && next.get(0) instanceof CsvData) { + return (CsvData) next.remove(0); } + return null; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java index 3daa6146e3..128518ec53 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java @@ -795,10 +795,19 @@ protected boolean create(CsvData data) { try { statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); xml = data.getParsedData(CsvData.ROW_DATA)[0]; - log.info("About to create table using the following definition: ", xml); + log.info("About to create table using the following definition: {}", xml); StringReader reader = new StringReader(xml); - Database db = (Database) DatabaseXmlUtil.read(reader, false); - platform.alterTables(false, db.getTables()); + Database db = DatabaseXmlUtil.read(reader, false); + if (writerSettings.isCreateTableAlterCaseToMatchDatabaseDefault()) { + platform.alterCaseToMatchDatabaseDefaultCase(db); + } + + if (writerSettings.isAlterTable()) { + platform.alterDatabase(db, !writerSettings.isCreateTableFailOnError()); + } else { + platform.createDatabase(db, writerSettings.isCreateTableDropFirst(), !writerSettings.isCreateTableFailOnError()); + } + platform.resetCachedTableModel(); statistics.get(batch).increment(DataWriterStatisticConstants.CREATECOUNT); return true; diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterSettings.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterSettings.java index 02a25d88a6..139c00118f 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterSettings.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterSettings.java @@ -37,6 +37,14 @@ public class DatabaseWriterSettings { protected boolean usePrimaryKeysFromSource = true; protected Conflict defaultConflictSetting; + + protected boolean createTableFailOnError = true; + + protected boolean alterTable = true; + + protected boolean createTableDropFirst = false; + + protected boolean createTableAlterCaseToMatchDatabaseDefault = false; protected Map conflictSettingsByChannel; @@ -47,6 +55,30 @@ public class DatabaseWriterSettings { protected List databaseWriterErrorHandlers; protected List resolvedData; + + public boolean isAlterTable() { + return alterTable; + } + + public void setAlterTable(boolean alterTable) { + this.alterTable = alterTable; + } + + public boolean isCreateTableDropFirst() { + return createTableDropFirst; + } + + public void setCreateTableDropFirst(boolean createTableDropFirst) { + this.createTableDropFirst = createTableDropFirst; + } + + public boolean isCreateTableFailOnError() { + return createTableFailOnError; + } + + public void setCreateTableFailOnError(boolean createTableFailOnError) { + this.createTableFailOnError = createTableFailOnError; + } public long getMaxRowsBeforeCommit() { return maxRowsBeforeCommit; @@ -79,6 +111,15 @@ public Conflict getDefaultConflictSetting() { public void setDefaultConflictSetting(Conflict defaultConflictSetting) { this.defaultConflictSetting = defaultConflictSetting; } + + public boolean isCreateTableAlterCaseToMatchDatabaseDefault() { + return createTableAlterCaseToMatchDatabaseDefault; + } + + public void setCreateTableAlterCaseToMatchDatabaseDefault( + boolean createTableAlterCaseToMatchDatabaseDefault) { + this.createTableAlterCaseToMatchDatabaseDefault = createTableAlterCaseToMatchDatabaseDefault; + } public Map getConflictSettingsByChannel() { return conflictSettingsByChannel;