Skip to content

Commit

Permalink
Rename some io classes after feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jan 13, 2012
1 parent 905abe1 commit a3ef50c
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 75 deletions.
Expand Up @@ -24,7 +24,7 @@
import java.util.Date;

import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.reader.CsvReaderStatistics;
import org.jumpmind.symmetric.io.data.reader.DataReaderStatistics;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterStatistics;
import org.jumpmind.util.Statistics;

Expand Down Expand Up @@ -103,7 +103,7 @@ public IncomingBatch(Batch batch) {

public void setValues(Statistics readerStatistics, Statistics writerStatistics,
boolean isSuccess) {
byteCount = readerStatistics.get(CsvReaderStatistics.READ_BYTE_COUNT);
byteCount = readerStatistics.get(DataReaderStatistics.READ_BYTE_COUNT);
filterMillis = writerStatistics.get(DatabaseWriterStatistics.FILTERMILLIS);
databaseMillis = writerStatistics.get(DatabaseWriterStatistics.DATABASEMILLIS);
statementCount = writerStatistics.get(DatabaseWriterStatistics.STATEMENTCOUNT);
Expand Down
Expand Up @@ -52,12 +52,12 @@
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.reader.IBatchCsvDataSource;
import org.jumpmind.symmetric.io.data.reader.SourcedCsvDataReader;
import org.jumpmind.symmetric.io.data.reader.TextualCsvDataReader;
import org.jumpmind.symmetric.io.data.writer.CsvDataWriter;
import org.jumpmind.symmetric.io.data.writer.FileCsvDataWriter;
import org.jumpmind.symmetric.io.data.writer.ICsvDataWriterListener;
import org.jumpmind.symmetric.io.data.reader.IExtractBatchSource;
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.writer.ProtocolDataWriter;
import org.jumpmind.symmetric.io.data.writer.StagingDataWriter;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Data;
Expand Down Expand Up @@ -193,10 +193,10 @@ public void extractConfigurationStandalone(Node node, Writer writer, String... t
}

InitialLoadSource source = new InitialLoadSource(batch, initialLoadEvents);
SourcedCsvDataReader dataReader = new SourcedCsvDataReader(
ExtractDataReader dataReader = new ExtractDataReader(
this.symmetricDialect.getPlatform(), source);
CsvDataWriter dataWriter = new CsvDataWriter(writer);
DataProcessor<SourcedCsvDataReader, CsvDataWriter> processor = new DataProcessor<SourcedCsvDataReader, CsvDataWriter>(
ProtocolDataWriter dataWriter = new ProtocolDataWriter(writer);
DataProcessor<ExtractDataReader, ProtocolDataWriter> processor = new DataProcessor<ExtractDataReader, ProtocolDataWriter>(
dataReader, dataWriter);
processor.process();

Expand Down Expand Up @@ -289,9 +289,9 @@ public List<OutgoingBatch> extract(Node node, IOutgoingTransport targetTransport
if (streamToFileEnabled) {
long memoryThresholdInBytes = parameterService
.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
extractWriter = new FileCsvDataWriter(new File(
extractWriter = new StagingDataWriter(new File(
System.getProperty("java.io.tmpdir")), memoryThresholdInBytes,
new ICsvDataWriterListener() {
new IProtocolDataWriterListener() {
public void start(Batch batch) {
}

Expand All @@ -300,7 +300,7 @@ public void end(Batch batch, IoResource resource) {
}
});
} else {
extractWriter = new CsvDataWriter(targetTransport.open());
extractWriter = new ProtocolDataWriter(targetTransport.open());
}

OutgoingBatch currentBatch = null;
Expand Down Expand Up @@ -329,7 +329,7 @@ public void end(Batch batch, IoResource resource) {
.getExtractCount() + 1);
outgoingBatchService.updateOutgoingBatch(outgoingBatch);

IDataReader dataReader = new SourcedCsvDataReader(
IDataReader dataReader = new ExtractDataReader(
symmetricDialect.getPlatform(),
new SelectBatchSource(outgoingBatch));

Expand All @@ -355,9 +355,9 @@ public void end(Batch batch, IoResource resource) {
IoResource extractedBatch = extractedBatchesHandle
.get(outgoingBatch.getBatchId());
if (extractedBatch != null) {
IDataReader dataReader = new TextualCsvDataReader(
IDataReader dataReader = new ProtocolDataReader(
extractedBatch.open());
IDataWriter dataWriter = new CsvDataWriter(
IDataWriter dataWriter = new ProtocolDataWriter(
targetTransport.open());
new DataProcessor<IDataReader, IDataWriter>(dataReader,
dataWriter).process();
Expand Down Expand Up @@ -505,7 +505,7 @@ public boolean containsData() {

}

class SelectBatchSource implements IBatchCsvDataSource {
class SelectBatchSource implements IExtractBatchSource {

public SelectBatchSource(OutgoingBatch outgoingBatch) {
// TODO Auto-generated constructor stub
Expand Down Expand Up @@ -560,7 +560,7 @@ public CsvData mapRow(Row row) {

}

class InitialLoadSource implements IBatchCsvDataSource {
class InitialLoadSource implements IExtractBatchSource {

private Batch batch;

Expand Down
Expand Up @@ -48,12 +48,12 @@
import org.jumpmind.symmetric.io.data.IDataProcessorListener;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.reader.TextualCsvDataReader;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.FileCsvDataWriter;
import org.jumpmind.symmetric.io.data.writer.ICsvDataWriterListener;
import org.jumpmind.symmetric.io.data.writer.StagingDataWriter;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.TransformDatabaseWriter;
import org.jumpmind.symmetric.load.ConfigurationChangedFilter;
Expand Down Expand Up @@ -244,23 +244,23 @@ protected List<IncomingBatch> loadDataAndReturnBatches(String sourceNodeId,
final List<IDataReader> readersForDatabaseLoader = new ArrayList<IDataReader>();
long totalNetworkMillis = System.currentTimeMillis();
if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
IDataReader dataReader = new TextualCsvDataReader(transport.open());
IDataReader dataReader = new ProtocolDataReader(transport.open());
long memoryThresholdInBytes = parameterService
.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
IDataWriter dataWriter = new FileCsvDataWriter(new File(
IDataWriter dataWriter = new StagingDataWriter(new File(
System.getProperty("java.io.tmpdir")), memoryThresholdInBytes,
new ICsvDataWriterListener() {
new IProtocolDataWriterListener() {
public void start(Batch batch) {
}

public void end(Batch batch, IoResource resource) {
readersForDatabaseLoader.add(new TextualCsvDataReader(resource));
readersForDatabaseLoader.add(new ProtocolDataReader(resource));
}
});
new DataProcessor<IDataReader, IDataWriter>(dataReader, dataWriter).process();
totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis;
} else {
readersForDatabaseLoader.add(new TextualCsvDataReader(transport.open()));
readersForDatabaseLoader.add(new ProtocolDataReader(transport.open()));
}

DatabaseWriterSettings settings = buildDatabaseWriterSettings();
Expand Down
Expand Up @@ -2,7 +2,7 @@

import org.jumpmind.util.Statistics;

public class CsvReaderStatistics extends Statistics {
public class DataReaderStatistics extends Statistics {

public static final String READ_BYTE_COUNT = "READ_BYTE_COUNT";

Expand Down
Expand Up @@ -22,30 +22,30 @@
import org.jumpmind.util.CollectionUtils;
import org.jumpmind.util.Statistics;

public class SourcedCsvDataReader implements IDataReader {
public class ExtractDataReader implements IDataReader {

protected Map<Batch, Statistics> statistics = new HashMap<Batch, Statistics>();

protected IDatabasePlatform platform;

protected List<IBatchCsvDataSource> sourcesToUse;
protected List<IExtractBatchSource> sourcesToUse;

protected IBatchCsvDataSource currentSource;
protected IExtractBatchSource currentSource;

protected Batch batch;

protected Table table;

protected CsvData data;

public SourcedCsvDataReader(IDatabasePlatform platform, IBatchCsvDataSource source) {
this.sourcesToUse = new ArrayList<IBatchCsvDataSource>();
public ExtractDataReader(IDatabasePlatform platform, IExtractBatchSource source) {
this.sourcesToUse = new ArrayList<IExtractBatchSource>();
this.sourcesToUse.add(source);
this.platform = platform;
}

public SourcedCsvDataReader(IDatabasePlatform platform, List<IBatchCsvDataSource> sources) {
this.sourcesToUse = new ArrayList<IBatchCsvDataSource>(sources);
public ExtractDataReader(IDatabasePlatform platform, List<IExtractBatchSource> sources) {
this.sourcesToUse = new ArrayList<IExtractBatchSource>(sources);
this.platform = platform;
}

Expand Down
Expand Up @@ -4,7 +4,7 @@
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;

public interface IBatchCsvDataSource {
public interface IExtractBatchSource {

public Batch getBatch();

Expand Down
Expand Up @@ -33,7 +33,7 @@
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.util.Statistics;

public class TextualCsvDataReader implements IDataReader {
public class ProtocolDataReader implements IDataReader {

protected Log log = LogFactory.getLog(getClass());

Expand All @@ -50,15 +50,15 @@ public class TextualCsvDataReader implements IDataReader {
protected String sourceNodeId;
protected BinaryEncoding binaryEncoding;

public TextualCsvDataReader(StringBuilder input) {
public ProtocolDataReader(StringBuilder input) {
this(new BufferedReader(new StringReader(input.toString())));
}

public TextualCsvDataReader(InputStream is) {
public ProtocolDataReader(InputStream is) {
this(toReader(is));
}

public TextualCsvDataReader(IoResource ioResource) {
public ProtocolDataReader(IoResource ioResource) {
this.ioResource = ioResource;
}

Expand All @@ -70,15 +70,15 @@ protected static Reader toReader(InputStream is) {
}
}

public TextualCsvDataReader(String input) {
public ProtocolDataReader(String input) {
this(new BufferedReader(new StringReader(input)));
}

public TextualCsvDataReader(Reader reader) {
public ProtocolDataReader(Reader reader) {
this.reader = reader;
}

public TextualCsvDataReader(File file) {
public ProtocolDataReader(File file) {
try {
FileInputStream fis = new FileInputStream(file);
InputStreamReader in = new InputStreamReader(fis, "UTF-8");
Expand Down Expand Up @@ -112,14 +112,14 @@ protected Object readNext() {
if (batch == null) {
bytesRead += csvReader.getRawRecord().length();
} else {
statistics.get(batch).increment(CsvReaderStatistics.READ_BYTE_COUNT,
statistics.get(batch).increment(DataReaderStatistics.READ_BYTE_COUNT,
csvReader.getRawRecord().length() + bytesRead);
bytesRead = 0;
}
if (tokens[0].equals(CsvConstants.BATCH)) {
Batch batch = new Batch(Long.parseLong(tokens[1]), channelId, binaryEncoding,
sourceNodeId);
statistics.put(batch, new CsvReaderStatistics());
statistics.put(batch, new DataReaderStatistics());
return batch;
} else if (tokens[0].equals(CsvConstants.NODEID)) {
this.sourceNodeId = tokens[1];
Expand Down
Expand Up @@ -18,7 +18,7 @@
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.util.Statistics;

abstract public class AbstractCsvDataWriter implements IDataWriter {
abstract public class AbstractProtocolDataWriter implements IDataWriter {

protected DataContext<? extends IDataReader, ? extends IDataWriter> context;

Expand All @@ -34,9 +34,9 @@ abstract public class AbstractCsvDataWriter implements IDataWriter {

protected Map<Batch, Statistics> statistics = new HashMap<Batch, Statistics>();

protected List<ICsvDataWriterListener> listeners;
protected List<IProtocolDataWriterListener> listeners;

public AbstractCsvDataWriter(List<ICsvDataWriterListener> listeners) {
public AbstractProtocolDataWriter(List<IProtocolDataWriterListener> listeners) {
this.listeners = listeners;
}

Expand All @@ -52,7 +52,7 @@ public void start(Batch batch) {
this.batch = batch;

if (listeners != null) {
for (ICsvDataWriterListener listener : listeners) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(batch);
}
}
Expand Down Expand Up @@ -127,15 +127,15 @@ final public void end(Batch batch, boolean inError) {
endBatch(batch);

if (listeners != null) {
for (ICsvDataWriterListener listener : listeners) {
for (IProtocolDataWriterListener listener : listeners) {
notifyEndBatch(batch, listener);
}
}
}

abstract protected void endBatch(Batch batch);

abstract protected void notifyEndBatch(Batch batch, ICsvDataWriterListener listener);
abstract protected void notifyEndBatch(Batch batch, IProtocolDataWriterListener listener);

protected int println(String key, List<Column> columns) {
return println(key, columns.toArray(new Column[columns.size()]));
Expand Down
Expand Up @@ -3,7 +3,7 @@
import org.jumpmind.symmetric.io.IoResource;
import org.jumpmind.symmetric.io.data.Batch;

public interface ICsvDataWriterListener {
public interface IProtocolDataWriterListener {

public void start(Batch batch);

Expand Down
Expand Up @@ -8,15 +8,15 @@
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch;

public class CsvDataWriter extends AbstractCsvDataWriter {
public class ProtocolDataWriter extends AbstractProtocolDataWriter {

private BufferedWriter writer;

public CsvDataWriter(Writer writer) {
public ProtocolDataWriter(Writer writer) {
this(null, writer);
}

public CsvDataWriter(List<ICsvDataWriterListener> listeners, Writer writer) {
public ProtocolDataWriter(List<IProtocolDataWriterListener> listeners, Writer writer) {
super(listeners);
if (writer instanceof BufferedWriter) {
this.writer = (BufferedWriter) writer;
Expand All @@ -35,7 +35,7 @@ protected void endBatch(Batch batch) {
}

@Override
protected void notifyEndBatch(Batch batch, ICsvDataWriterListener listener) {
protected void notifyEndBatch(Batch batch, IProtocolDataWriterListener listener) {
}

@Override
Expand Down

0 comments on commit a3ef50c

Please sign in to comment.