Skip to content

Commit

Permalink
0000770: Improve DbImport so that it incorporates the SymmetricDS dat…
Browse files Browse the repository at this point in the history
…a loader
  • Loading branch information
chenson42 committed Aug 12, 2012
1 parent d596d27 commit f79bb2a
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 133 deletions.
37 changes: 21 additions & 16 deletions symmetric-client/src/main/java/org/jumpmind/symmetric/DbFill.java
Expand Up @@ -87,17 +87,22 @@ private void fillTables(Table... tables) {
for (Table table : tables) {
DmlStatement statement = platform.createDmlStatement(DmlType.INSERT, table);

Column[] columns = table.getColumns();
Object[] values = generateRandomValues(insertedColumns, table);
for (int j = 0; j < columns.length; j++) {
String qualifiedColumnName = table.getName() + "." + columns[j].getName();
insertedColumns.put(qualifiedColumnName, values[j]);
Column[] tableColumns = table.getColumns();
Object[] columnValues = generateRandomValues(insertedColumns, table);
for (int j = 0; j < tableColumns.length; j++) {
insertedColumns.put(table.getQualifiedColumnName(tableColumns[j]),
columnValues[j]);
}
int rows = sqlTemplate.update(statement.getSql(), values);
if (log.isDebugEnabled()) {
log.debug("{} inserted by {} in {}", new Object[] { rows,
getClass().getSimpleName(), table.getFullyQualifiedTableName() });

Column[] statementColumns = statement.getMetaData();
Object[] statementValues = new Object[statementColumns.length];
for (int j = 0; j < statementColumns.length; j++) {
statementValues[j] = insertedColumns.get(table
.getQualifiedColumnName(statementColumns[j]));
}

sqlTemplate.update(statement.getSql(), statementValues);

}

insertedColumns.clear();
Expand Down Expand Up @@ -136,9 +141,9 @@ private Object[] generateRandomValues(Map<String, Object> insertedColumns, Table
} else if (type == Types.INTEGER) {
objectValue = randomInt();
} else if (type == Types.BIT) {
objectValue = randomBit();
objectValue = randomBit();
} else if (type == Types.SMALLINT) {
objectValue = randomSmallInt();
objectValue = randomSmallInt();
} else if (type == Types.TINYINT) {
objectValue = randomTinyInt();
} else if (type == Types.NUMERIC || type == Types.DECIMAL || type == Types.FLOAT
Expand All @@ -163,10 +168,10 @@ private Object[] generateRandomValues(Map<String, Object> insertedColumns, Table
}
return list.toArray();
}

private Object randomSmallInt() {
// TINYINT (-32768 32767)
return new Integer(new java.util.Random().nextInt(65535) - 32768);
// TINYINT (-32768 32767)
return new Integer(new java.util.Random().nextInt(65535) - 32768);
}

private Object randomTinyInt() {
Expand Down Expand Up @@ -216,10 +221,10 @@ private Date randomDate() {
private Integer randomInt() {
return new Integer(new java.util.Random().nextInt(1000000));
}

private Integer randomBit() {
return new Integer(new java.util.Random().nextInt(1));
}
}

protected String getSchemaToUse() {
if (StringUtils.isBlank(schema)) {
Expand Down
75 changes: 40 additions & 35 deletions symmetric-client/src/main/java/org/jumpmind/symmetric/DbImport.java
Expand Up @@ -26,12 +26,10 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.commons.io.IOUtils;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DdlException;
Expand All @@ -40,10 +38,11 @@
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.DmlStatement.DmlType;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.SqlScript;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.reader.CsvTableDataReader;
import org.jumpmind.symmetric.io.data.reader.SqlDataReader;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.DetectConflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.ResolveConflict;
Expand Down Expand Up @@ -88,9 +87,9 @@ public enum Format {
private boolean ignoreCollisions = false;

private boolean alterCaseToMatchDatabaseDefaultCase = false;

private boolean alterTables = false;

private boolean dropIfExists = false;

protected IDatabasePlatform platform;
Expand All @@ -105,24 +104,33 @@ public DbImport(IDatabasePlatform platform) {
public DbImport(DataSource dataSource) {
platform = JdbcDatabasePlatformFactory.createNewPlatformInstance(dataSource, null, true);
}

public void importTables(String importData, String tableName) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(importData.getBytes());
importTables(in, tableName);
in.close();
}

public void importTables(String importData) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(importData.getBytes());
importTables(in);
in.close();
public void importTables(String importData, String tableName) {
try {
ByteArrayInputStream in = new ByteArrayInputStream(importData.getBytes());
importTables(in, tableName);
in.close();
} catch (IOException e) {
throw new IoException(e);
}
}

public void importTables(String importData) {
try {
ByteArrayInputStream in = new ByteArrayInputStream(importData.getBytes());
importTables(in);
in.close();
} catch (IOException e) {
throw new IoException(e);
}

}

public void importTables(InputStream in) throws IOException {
public void importTables(InputStream in) {
importTables(in, null);
}

public void importTables(InputStream in, String tableName) throws IOException {
public void importTables(InputStream in, String tableName) {
if (format == Format.SQL) {
importTablesFromSql(in);
} else if (format == Format.CSV) {
Expand Down Expand Up @@ -158,7 +166,7 @@ protected DatabaseWriterSettings buildDatabaseWriterSettings() {
return settings;
}

protected void importTablesFromCsv(InputStream in, String tableName) throws IOException {
protected void importTablesFromCsv(InputStream in, String tableName) {
Table table = platform.readTableFromDatabase(catalog, schema, tableName);
if (table == null) {
throw new RuntimeException("Unable to find table");
Expand All @@ -172,13 +180,13 @@ protected void importTablesFromCsv(InputStream in, String tableName) throws IOEx
}

protected void importTablesFromXml(InputStream in) {
Database database = platform.readDatabaseFromXml(in, alterCaseToMatchDatabaseDefaultCase);
Database database = platform.readDatabaseFromXml(in, alterCaseToMatchDatabaseDefaultCase);
if (alterTables) {
platform.alterDatabase(database, forceImport);
platform.alterDatabase(database, forceImport);
} else {
platform.createDatabase(database, dropIfExists, forceImport);
}

// TODO: read in data from XML also
}

Expand Down Expand Up @@ -270,14 +278,11 @@ protected void importTablesFromSymXml(InputStream in) {

}

protected void importTablesFromSql(InputStream in) throws IOException {
// TODO: SqlScript should be able to stream from standard input to run
// large SQL script
List<String> lines = IOUtils.readLines(in);

SqlScript script = new SqlScript(lines, platform.getSqlTemplate(), true,
SqlScript.QUERY_ENDS, platform.getSqlScriptReplacementTokens());
script.execute();
protected void importTablesFromSql(InputStream in) {
SqlDataReader reader = new SqlDataReader(in);
DatabaseWriter writer = new DatabaseWriter(platform, buildDatabaseWriterSettings());
DataProcessor dataProcessor = new DataProcessor(reader, writer);
dataProcessor.process();
}

public Format getFormat() {
Expand Down Expand Up @@ -355,27 +360,27 @@ public void setIgnoreCollisions(boolean ignoreConflicts) {
public boolean isIgnoreCollisions() {
return ignoreCollisions;
}

public void setReplaceRows(boolean replaceRows) {
this.replaceRows = replaceRows;
}

public boolean isReplaceRows() {
return replaceRows;
}

public void setAlterTables(boolean alterTables) {
this.alterTables = alterTables;
}

public boolean isAlterTables() {
return alterTables;
}

public void setDropIfExists(boolean dropIfExists) {
this.dropIfExists = dropIfExists;
}

public boolean isDropIfExists() {
return dropIfExists;
}
Expand Down
Expand Up @@ -102,15 +102,11 @@ public void exportThenImportCsv() throws Exception {
IDatabasePlatform platform = engine.getSymmetricDialect().getPlatform();
Database testTables = platform.readDatabaseFromXml("/test-dbimport.xml", true);
Table table = testTables.findTable("test_db_import_1", false);

DbImport reCreateTablesImport = new DbImport(ds);
reCreateTablesImport.setFormat(DbImport.Format.XML);
reCreateTablesImport.setDropIfExists(true);
reCreateTablesImport.setAlterCaseToMatchDatabaseDefaultCase(true);
reCreateTablesImport.importTables(getClass().getResourceAsStream("/test-dbimport.xml"));


recreateImportTable();

final int RECORD_COUNT = 100;

DbFill fill = new DbFill(platform);
fill.setRecordCount(RECORD_COUNT);
fill.fillTables(table.getName());
Expand All @@ -120,23 +116,69 @@ public void exportThenImportCsv() throws Exception {
export.setNoCreateInfo(true);
export.setNoData(false);
String csvOutput = export.exportTables(new String[] { table.getName() });
reCreateTablesImport.importTables(getClass().getResourceAsStream("/test-dbimport.xml"));

recreateImportTable();

DbImport importCsv = new DbImport(ds);
importCsv.setFormat(DbImport.Format.CSV);
importCsv.importTables(csvOutput, table.getName());

DmlStatement dml = new DmlStatement(DmlType.COUNT, table.getCatalog(), table.getSchema(), table.getName(), null, table.getColumns(), false, null, null);

DmlStatement dml = new DmlStatement(DmlType.COUNT, table.getCatalog(), table.getSchema(),
table.getName(), null, table.getColumns(), false, null, null);
Assert.assertEquals(RECORD_COUNT, platform.getSqlTemplate().queryForInt(dml.getSql()));

// TODO test error

// TODO test replace

// TODO test ignore

// TODO test force
}

protected void recreateImportTable() throws Exception {
ISymmetricEngine engine = getSymmetricEngine();
DataSource ds = engine.getDataSource();
DbImport reCreateTablesImport = new DbImport(ds);
reCreateTablesImport.setFormat(DbImport.Format.XML);
reCreateTablesImport.setDropIfExists(true);
reCreateTablesImport.setAlterCaseToMatchDatabaseDefaultCase(true);
reCreateTablesImport.importTables(getClass().getResourceAsStream("/test-dbimport.xml"));
}

protected void assertCountDbImportTableRecords(int expected) {
ISymmetricEngine engine = getSymmetricEngine();
IDatabasePlatform platform = engine.getSymmetricDialect().getPlatform();
Database testTables = platform.readDatabaseFromXml("/test-dbimport.xml", true);
Table table = testTables.findTable("test_db_import_1", false);
DmlStatement dml = new DmlStatement(DmlType.COUNT, table.getCatalog(), table.getSchema(),
table.getName(), null, table.getColumns(), false, null, null);
Assert.assertEquals(expected, platform.getSqlTemplate().queryForInt(dml.getSql()));
}

@Test
public void importSqlData() throws Exception {
ISymmetricEngine engine = getSymmetricEngine();
DataSource ds = engine.getDataSource();

recreateImportTable();

assertCountDbImportTableRecords(0);

DbImport importCsv = new DbImport(ds);
importCsv.setFormat(DbImport.Format.SQL);
importCsv.importTables(getClass().getResourceAsStream("/test-dbimport-1-good.sql"));

assertCountDbImportTableRecords(5);

// TODO test error

// TODO test replace

// TODO test ignore

// TODO test force

}

}
@@ -0,0 +1,6 @@
-- Test Script
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (963261,'HvSSVaJVyVCYDZrUirFqQJjWyQglXyEFvTsETaIGxGxRRSaYrp','RBvLcOuBlIXrhOmFXzmUNTVpUPGPVRUbeISJmUUlJLmaFlaXFY','j','j',{d '1989-09-21'},{ts '1997-11-06 08:02:33.324000000'},0,48440,0.83,0.5306078975099197);
INSERT INTO NO_TABLE (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (963416,'CwBsJsyxThbvZQnrfqZkMEJBrnnSYOoBHAtxcScQmaojHYBDeo','LeSWSyKoPGACBlXgmXGDzzjBrInbZQJzjULTbPgWISOMKvGBIA','E','X',{d '2002-06-28'},{ts '1983-11-13 18:32:48.510000000'},0,991474,0.99,0.25859735923859273);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (970293,'lmLGiYcgBqesYhHekppknCflOHwMJidwlcBsELKUefXVCgNjdX','jfUejWwMSgZAGIRsAKdtFXKEnBYZmHSDltfsEEgchQOlEqoPfh','G','H',{d '1980-12-02'},{ts '2014-01-18 14:29:47.687000000'},0,860563,0.20,0.7443149905394701);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (982811,'QrjhBYcEEnfOXmNpnRuMJmZPfZtJjYCBrhgwODXyfhaPEKkuct','ZdiUYvqNzxaNKsglNbhSWWGxNlBbyJbQWnHoZHbDJgGIoGQKRK','G','v',{d '1979-11-12'},{ts '1987-11-24 10:37:00.462000000'},0,928805,0.81,0.8746972467103193);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (997658,'TiUGEHICEplStWjGquHWDkYNhNRwLhYdsTABLQrkWwryRqXyiy','pSxDPMAtpXjFaairTeqMyOEuqSMQxuAUTUDoEcRSXWQicKdATS','n','o',{d '1978-01-19'},{ts '1985-03-05 15:43:22.248000000'},0,575117,0.52,0.36237140348235275);
6 changes: 6 additions & 0 deletions symmetric-client/src/test/resources/test-dbimport-1-good.sql
@@ -0,0 +1,6 @@
-- Test Script
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (963261,'HvSSVaJVyVCYDZrUirFqQJjWyQglXyEFvTsETaIGxGxRRSaYrp','RBvLcOuBlIXrhOmFXzmUNTVpUPGPVRUbeISJmUUlJLmaFlaXFY','j','j',{d '1989-09-21'},{ts '1997-11-06 08:02:33.324000000'},0,48440,0.83,0.5306078975099197);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (963416,'CwBsJsyxThbvZQnrfqZkMEJBrnnSYOoBHAtxcScQmaojHYBDeo','LeSWSyKoPGACBlXgmXGDzzjBrInbZQJzjULTbPgWISOMKvGBIA','E','X',{d '2002-06-28'},{ts '1983-11-13 18:32:48.510000000'},0,991474,0.99,0.25859735923859273);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (970293,'lmLGiYcgBqesYhHekppknCflOHwMJidwlcBsELKUefXVCgNjdX','jfUejWwMSgZAGIRsAKdtFXKEnBYZmHSDltfsEEgchQOlEqoPfh','G','H',{d '1980-12-02'},{ts '2014-01-18 14:29:47.687000000'},0,860563,0.20,0.7443149905394701);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (982811,'QrjhBYcEEnfOXmNpnRuMJmZPfZtJjYCBrhgwODXyfhaPEKkuct','ZdiUYvqNzxaNKsglNbhSWWGxNlBbyJbQWnHoZHbDJgGIoGQKRK','G','v',{d '1979-11-12'},{ts '1987-11-24 10:37:00.462000000'},0,928805,0.81,0.8746972467103193);
INSERT INTO TEST_DB_IMPORT_1 (ID,STRING_VALUE,STRING_REQUIRED_VALUE,CHAR_VALUE,CHAR_REQUIRED_VALUE,DATE_VALUE,TIME_VALUE,BOOLEAN_VALUE,INTEGER_VALUE,DECIMAL_VALUE,DOUBLE_VALUE) VALUES (997658,'TiUGEHICEplStWjGquHWDkYNhNRwLhYdsTABLQrkWwryRqXyiy','pSxDPMAtpXjFaairTeqMyOEuqSMQxuAUTUDoEcRSXWQicKdATS','n','o',{d '1978-01-19'},{ts '1985-03-05 15:43:22.248000000'},0,575117,0.52,0.36237140348235275);
2 changes: 1 addition & 1 deletion symmetric-client/src/test/resources/test-dbimport.xml
Expand Up @@ -14,7 +14,7 @@
required="true" />
<column name="date_value" type="DATE" />
<column name="time_value" type="TIMESTAMP" />
<column name="boolean_value" type="BIT" size="1" />
<column name="boolean_value" type="BOOLEANINT" size="1" />
<column name="integer_value" type="INTEGER" />
<column name="decimal_value" type="DECIMAL" size="10,2" />
<column name="double_value" type="DOUBLE" />
Expand Down
4 changes: 4 additions & 0 deletions symmetric-db/src/main/java/org/jumpmind/db/model/Table.java
Expand Up @@ -897,6 +897,10 @@ public static String getFullyQualifiedTableName(String catalogName, String schem
String tableName) {
return getFullyQualifiedTableName(catalogName, schemaName, tableName, null);
}

public String getQualifiedColumnName(Column column) {
return getFullyQualifiedTableName() + "." + column.getName();
}

public static String getQualifiedTablePrefix(String catalogName, String schemaName) {
return getQualifiedTablePrefix(catalogName, schemaName, null);
Expand Down
Expand Up @@ -68,7 +68,9 @@ public AbstractTableDataReader(Batch batch, String catalogName, String schemaNam
String tableName, Reader reader) {
this.reader = reader;
this.batch = batch;
this.table = new Table(catalogName, schemaName, tableName);
if (StringUtils.isNotBlank(tableName)) {
this.table = new Table(catalogName, schemaName, tableName);
}
}

public AbstractTableDataReader(BinaryEncoding binaryEncoding, String catalogName,
Expand Down Expand Up @@ -123,9 +125,9 @@ public void open(DataContext context) {
abstract protected void init();

abstract protected CsvData readNext();

abstract protected void finish();

protected CsvData buildCsvData(String[] tokens, DataEventType dml) {
statistics.increment(DataReaderStatistics.READ_BYTE_COUNT, logDebugAndCountBytes(tokens));
return new CsvData(dml, tokens);
Expand All @@ -136,7 +138,7 @@ public CsvData nextData() {
CsvData data = readNext();
if (data != null) {
lineNumber++;
context.put(CTX_LINE_NUMBER, lineNumber);
context.put(CTX_LINE_NUMBER, lineNumber);
return data;
} else {
batch.setComplete(true);
Expand Down

0 comments on commit f79bb2a

Please sign in to comment.