Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0001637: Refactor databasewriter into abstractdatabasewriter and defa…
…ultdatabasewriter so we can subclass off of abstract databasewriter
  • Loading branch information
chenson42 committed Mar 7, 2014
1 parent e46e07d commit 41eeb80
Show file tree
Hide file tree
Showing 17 changed files with 909 additions and 744 deletions.
Expand Up @@ -12,12 +12,12 @@
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MySqlBulkDatabaseWriter extends DatabaseWriter {
public class MySqlBulkDatabaseWriter extends DefaultDatabaseWriter {


protected NativeJdbcExtractor jdbcExtractor;
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultTransformWriterConflictResolver;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
Expand Down Expand Up @@ -64,13 +64,13 @@ public IDataWriter getDataWriter(final String sourceNodeId,
final ISymmetricDialect symmetricDialect, TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {
DatabaseWriter writer = new DatabaseWriter(symmetricDialect.getPlatform(),
DefaultDatabaseWriter writer = new DefaultDatabaseWriter(symmetricDialect.getPlatform(),
new DefaultTransformWriterConflictResolver(transformWriter) {
@Override
protected void beforeResolutionAttempt(Conflict conflict) {
if (conflict.getPingBack() != PingBack.OFF) {
DatabaseWriter writer = transformWriter
.getNestedWriterOfType(DatabaseWriter.class);
DefaultDatabaseWriter writer = transformWriter
.getNestedWriterOfType(DefaultDatabaseWriter.class);
ISqlTransaction transaction = writer.getTransaction();
if (transaction != null) {
symmetricDialect.enableSyncTriggers(transaction);
Expand All @@ -81,8 +81,8 @@ protected void beforeResolutionAttempt(Conflict conflict) {
@Override
protected void afterResolutionAttempt(Conflict conflict) {
if (conflict.getPingBack() == PingBack.SINGLE_ROW) {
DatabaseWriter writer = transformWriter
.getNestedWriterOfType(DatabaseWriter.class);
DefaultDatabaseWriter writer = transformWriter
.getNestedWriterOfType(DefaultDatabaseWriter.class);
ISqlTransaction transaction = writer.getTransaction();
if (transaction != null) {
symmetricDialect.disableSyncTriggers(transaction, sourceNodeId);
Expand Down
Expand Up @@ -66,7 +66,7 @@
import org.jumpmind.symmetric.io.data.writer.Conflict.PingBack;
import org.jumpmind.symmetric.io.data.writer.Conflict.ResolveConflict;
import org.jumpmind.symmetric.io.data.writer.ConflictException;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
Expand Down Expand Up @@ -920,7 +920,7 @@ public void batchInError(DataContext context, Throwable ex) {
error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context
.getTable().getPrimaryKeyColumns()));
error.setCsvData(context.getData());
error.setCurData((String)context.get(DatabaseWriter.CUR_DATA));
error.setCurData((String)context.get(DefaultDatabaseWriter.CUR_DATA));
error.setBinaryEncoding(context.getBatch().getBinaryEncoding());
error.setEventType(context.getData().getDataEventType());
error.setFailedLineNumber(this.currentBatch.getFailedLineNumber());
Expand Down
Expand Up @@ -25,7 +25,7 @@

import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.NestedDataWriter;
import org.jumpmind.util.Context;

Expand Down Expand Up @@ -121,12 +121,12 @@ public void setLastParsedTable(Table lastParsedTable) {
public ISqlTransaction findTransaction() {
ISqlTransaction transaction = null;
if (writer instanceof NestedDataWriter) {
DatabaseWriter dbWriter = ((NestedDataWriter)writer).getNestedWriterOfType(DatabaseWriter.class);
DefaultDatabaseWriter dbWriter = ((NestedDataWriter)writer).getNestedWriterOfType(DefaultDatabaseWriter.class);
if (dbWriter != null) {
transaction = dbWriter.getTransaction();
}
} else if (writer instanceof DatabaseWriter) {
transaction = ((DatabaseWriter) writer).getTransaction();
} else if (writer instanceof DefaultDatabaseWriter) {
transaction = ((DefaultDatabaseWriter) writer).getTransaction();
}
return transaction;
}
Expand Down
Expand Up @@ -58,7 +58,7 @@
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.DetectConflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.ResolveConflict;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterErrorIgnorer;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
Expand Down Expand Up @@ -196,28 +196,28 @@ protected void importTablesFromCsv(InputStream in, String tableName) {

CsvTableDataReader reader = new CsvTableDataReader(BinaryEncoding.HEX, table.getCatalog(),
table.getSchema(), table.getName(), in);
DatabaseWriter writer = new DatabaseWriter(platform, buildDatabaseWriterSettings());
DefaultDatabaseWriter writer = new DefaultDatabaseWriter(platform, buildDatabaseWriterSettings());
DataProcessor dataProcessor = new DataProcessor(reader, writer, "import");
dataProcessor.process();
}

protected void importTablesFromXml(InputStream in) {
XmlDataReader reader = new XmlDataReader(in);
DatabaseWriter writer = new DatabaseWriter(platform, buildDatabaseWriterSettings());
DefaultDatabaseWriter writer = new DefaultDatabaseWriter(platform, buildDatabaseWriterSettings());
DataProcessor dataProcessor = new DataProcessor(reader, writer, "import");
dataProcessor.process();
}

protected void importTablesFromSymXml(InputStream in) {
SymXmlDataReader reader = new SymXmlDataReader(in);
DatabaseWriter writer = new DatabaseWriter(platform, buildDatabaseWriterSettings());
DefaultDatabaseWriter writer = new DefaultDatabaseWriter(platform, buildDatabaseWriterSettings());
DataProcessor dataProcessor = new DataProcessor(reader, writer, "import");
dataProcessor.process();
}

protected void importTablesFromSql(InputStream in) {
SqlDataReader reader = new SqlDataReader(in);
DatabaseWriter writer = new DatabaseWriter(platform, buildDatabaseWriterSettings());
DefaultDatabaseWriter writer = new DefaultDatabaseWriter(platform, buildDatabaseWriterSettings());
DataProcessor dataProcessor = new DataProcessor(reader, writer, "import");
dataProcessor.process();
}
Expand Down

0 comments on commit 41eeb80

Please sign in to comment.