Skip to content

Commit

Permalink
development check-in
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 25, 2011
1 parent 656c41e commit 86125cf
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 21 deletions.
Expand Up @@ -1945,7 +1945,11 @@ protected void writeColumnDefaultValue(Table table, Column column) {
protected void printDefaultValue(Object defaultValue, int typeCode) {
if (defaultValue != null) {
boolean shouldUseQuotes = !TypeMap.isNumericType(typeCode);

if (shouldUseQuotes && defaultValue instanceof String) {
String value = (String)defaultValue;
shouldUseQuotes = !(value.startsWith("'") && value.endsWith("'"));
}

if (shouldUseQuotes) {
// characters are only escaped when within a string literal
print(getPlatformInfo().getValueQuoteToken());
Expand Down
Expand Up @@ -256,7 +256,11 @@ protected void printDefaultValue(Object defaultValue, int typeCode) {
String defaultValueStr = defaultValue.toString();
boolean shouldUseQuotes = !TypeMap.isNumericType(typeCode)
&& !defaultValueStr.startsWith("TO_DATE(");

if (shouldUseQuotes && defaultValue instanceof String) {
String value = (String)defaultValue;
shouldUseQuotes = !(value.startsWith("'") && value.endsWith("'"));
}

if (shouldUseQuotes) {
// characters are only escaped when within a string literal
print(getPlatformInfo().getValueQuoteToken());
Expand Down
Expand Up @@ -21,6 +21,8 @@ public class Parameters extends HashMap<String, String> {

public final static String DB_STREAMING_FETCH_SIZE = "db.jdbc.streaming.results.fetch.size";

public final static String LOADER_CREATE_TABLE_IF_DOESNT_EXIST = "dataloader.create.if.table.doesnt.exist";

public final static String LOADER_MAX_ROWS_BEFORE_COMMIT = "dataloader.max.rows.before.commit";

public final static String LOADER_MAX_ROWS_BEFORE_BATCH_FLUSH = "dataloader.max.rows.before.batch.flush";
Expand Down
Expand Up @@ -65,16 +65,17 @@ protected int forEachTableInBatch(boolean processBatch, Batch batch, T readerCon
do {
table = dataReader.nextTable(readerContext);
if (table != null) {
boolean processTable = false;
if (processBatch) {
processBatch |= dataWriter.switchTables(table);
processTable = dataWriter.switchTables(table);
}
dataRow += forEachDataInTable(processBatch, batch, readerContext, writerContext);
dataRow += forEachDataInTable(processTable, batch, readerContext, writerContext);
}
} while (table != null);
return dataRow;
}

protected int forEachDataInTable(boolean processBatch, Batch batch, T readerContext,
protected int forEachDataInTable(boolean processTable, Batch batch, T readerContext,
T writerContext) {
int dataRow = 0;
Data data = null;
Expand All @@ -83,7 +84,7 @@ protected int forEachDataInTable(boolean processBatch, Batch batch, T readerCont
if (data != null) {
try {
dataRow++;
if (processBatch) {
if (processTable) {
dataWriter.writeData(data);
}
} catch (Exception ex) {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.jumpmind.symmetric.core.process.IDataWriter;
import org.jumpmind.symmetric.core.sql.DataIntegrityViolationException;
import org.jumpmind.symmetric.core.sql.ISqlTransaction;
import org.jumpmind.symmetric.core.sql.SqlScript;
import org.jumpmind.symmetric.core.sql.StatementBuilder;
import org.jumpmind.symmetric.core.sql.StatementBuilder.DmlType;

Expand Down Expand Up @@ -55,6 +56,8 @@ public class SqlDataWriter implements IDataWriter<DataContext> {
protected Batch batch;

protected DataContext ctx;

protected boolean exitBatchMode = false;

public SqlDataWriter(IDbPlatform platform) {
this(platform, new Settings());
Expand Down Expand Up @@ -96,9 +99,16 @@ public void open(DataContext context) {

public boolean switchTables(Table sourceTable) {
if (sourceTable != null) {
this.targetTable = platform.findTable(sourceTable.getCatalogName(),
sourceTable.getSchemaName(), sourceTable.getTableName(), true).copy();
if (this.targetTable != null) {
Table tableAtTarget = platform.findTable(sourceTable.getCatalogName(),
sourceTable.getSchemaName(), sourceTable.getTableName(), true);
if (tableAtTarget == null && settings.autoCreateTable) {
new SqlScript(platform.getAlterScriptFor(sourceTable), platform).execute();
tableAtTarget = platform.findTable(sourceTable.getCatalogName(),
sourceTable.getSchemaName(), sourceTable.getTableName(), true);
}

if (tableAtTarget != null) {
this.targetTable = tableAtTarget.copy();
this.targetTable.reOrderColumns(sourceTable.getColumns(),
settings.usePrimaryKeysFromSource);
return true;
Expand All @@ -117,6 +127,7 @@ public boolean switchTables(Table sourceTable) {
public void startBatch(Batch batch) {
this.statementBuilder = null;
this.batch = batch;
this.exitBatchMode = false;
}

public void writeData(Data data) {
Expand Down Expand Up @@ -167,6 +178,8 @@ protected void writeData(Data data, boolean batchMode) {

protected void handleDataIntegrityViolationException(DataIntegrityViolationException ex) {
if (transaction.isInBatchMode()) {
log.log(LogLevel.WARN, "Exiting batch mode for the rest of batch %d", batch.getBatchId());
this.exitBatchMode = true;
resendFailedDataInNonBatchMode();
} else {
throw ex;
Expand Down Expand Up @@ -208,6 +221,8 @@ protected void populateSettings(Parameters parameters) {
settings.usePrimaryKeysFromSource = parameters.is(Parameters.DB_USE_PKS_FROM_SOURCE, true);
settings.dontIncludeKeysInUpdateStatement = parameters.is(
Parameters.LOADER_DONT_INCLUDE_PKS_IN_UPDATE, false);
settings.autoCreateTable = parameters.is(Parameters.LOADER_CREATE_TABLE_IF_DOESNT_EXIST,
false);
}

protected boolean filterData(Data data, DataContext ctx) {
Expand Down Expand Up @@ -310,7 +325,7 @@ protected String[] getPkData(Data data) {
}

protected void executeInsertSql(Data data, boolean batchMode) {
transaction.setInBatchMode(batchMode);
transaction.setInBatchMode(batchMode && !exitBatchMode);
if (requireNewStatement(data)) {
this.statementBuilder = getStatementBuilder(DmlType.INSERT, null,
targetTable.getColumns());
Expand All @@ -320,7 +335,7 @@ protected void executeInsertSql(Data data, boolean batchMode) {
}

protected int executeDeleteSql(Data data, boolean batchMode) {
transaction.setInBatchMode(batchMode);
transaction.setInBatchMode(batchMode && !exitBatchMode);
if (requireNewStatement(data)) {
this.statementBuilder = getStatementBuilder(DmlType.DELETE,
targetTable.getPrimaryKeyColumnsArray(), targetTable.getColumns());
Expand Down Expand Up @@ -497,6 +512,8 @@ public static class Settings {

protected boolean dontIncludeKeysInUpdateStatement;

protected boolean autoCreateTable;

}

}
2 changes: 1 addition & 1 deletion future/symmetric3-csv/pom.xml
Expand Up @@ -5,7 +5,7 @@
<artifactId>symmetric3-csv</artifactId>
<packaging>jar</packaging>
<version>3.0.0-SNAPSHOT</version>
<name>csv</name>
<name>symmetric3-csv</name>
<inceptionYear>2010</inceptionYear>

<description>
Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.jumpmind.symmetric.core.model.Column;
import org.jumpmind.symmetric.core.model.Index;
import org.jumpmind.symmetric.core.model.Table;
import org.jumpmind.symmetric.core.model.TypeMap;
import org.jumpmind.symmetric.jdbc.db.DatabaseMetaDataWrapper;
import org.jumpmind.symmetric.jdbc.db.IJdbcDbPlatform;
import org.jumpmind.symmetric.jdbc.db.JdbcModelReader;
Expand Down Expand Up @@ -151,9 +150,7 @@ protected Column readColumn(DatabaseMetaDataWrapper metaData, Map<String, Object
column.setDefaultValue(timestamp.toString());
}
}
} else if (TypeMap.isTextType(column.getTypeCode())) {
column.setDefaultValue(unescape(column.getDefaultValue(), "'", "''"));
}
}
return column;
}

Expand Down
Expand Up @@ -188,6 +188,8 @@ public Integer execute(Connection con) throws SQLException {
if (!failOnError) {
log.log(LogLevel.WARN, "%s. Failed to execute: %s.",
ex.getMessage(), sql);
} else {
throw new SqlException(ex);
}
}
}
Expand Down
Expand Up @@ -6,6 +6,9 @@

import javax.sql.DataSource;

import org.jumpmind.symmetric.core.common.Log;
import org.jumpmind.symmetric.core.common.LogFactory;
import org.jumpmind.symmetric.core.common.LogLevel;
import org.jumpmind.symmetric.core.db.IDbPlatform;
import org.jumpmind.symmetric.core.db.TableNotFoundException;
import org.jumpmind.symmetric.core.io.IoUtils;
Expand All @@ -22,6 +25,8 @@

public class TableCopy {

static final Log logger = LogFactory.getLog(TableCopy.class);

protected DataSource source;
protected DataSource target;
protected IDbPlatform targetPlatform;
Expand All @@ -45,6 +50,8 @@ public TableCopy(TableCopyProperties properties) {
Table table = sourcePlatform.findTable(tableName);
if (table != null) {
String condition = properties.getConditionForTable(tableName);
table.setSchemaName(null);
table.setCatalogName(null);
tablesToRead.add(new TableToExtract(table, condition));
} else {
throw new TableNotFoundException(tableName);
Expand All @@ -58,10 +65,12 @@ public void copy() {

public void copy(List<TableToExtract> tables) {
for (TableToExtract tableToRead : tables) {
SqlTableDataReader reader = new SqlTableDataReader(this.sourcePlatform, new Batch(),
tableToRead);
SqlDataWriter writer = new SqlDataWriter(this.targetPlatform, parameters);
DataProcessor<DataContext> processor = new DataProcessor<DataContext>(reader, writer);
logger.log(LogLevel.INFO,
String.format("Copying %s", tableToRead.getTable().getTableName()));
Batch batch = new Batch();
DataProcessor<DataContext> processor = new DataProcessor<DataContext>(
new SqlTableDataReader(this.sourcePlatform, batch, tableToRead),
new SqlDataWriter(this.targetPlatform, parameters));
processor.process();
}
}
Expand Down Expand Up @@ -97,7 +106,9 @@ public static void main(String[] args) {
if (propFile.exists() && !propFile.isDirectory()) {
TableCopyProperties properties = new TableCopyProperties(propFile);
new TableCopy(properties).copy();

} else {
System.err.println(String.format("Could not find the properties file named %s",
args[0]));
}
} else {
System.err
Expand Down

0 comments on commit 86125cf

Please sign in to comment.