Added filecsvdatawriter
chenson42 committed Dec 27, 2011
1 parent cce3ffe commit 930f5fd
Showing 13 changed files with 536 additions and 17 deletions.
symmetric/symmetric-assemble/TODO.txt (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,7 @@
 + Introduce handler for conflict resolution. Implement default handler to start with
 + Setting for whether to update all columns or only the columns that changed
 + plug in transform database writer
-* LobHandler for Oracle
++ LobHandler for Oracle
 * Add settings to channel
 * Lookup/choose datawriter based on channel settings
 * clean up error handling in loader service
@@ -0,0 +1,26 @@
package org.jumpmind.symmetric.io;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;

import org.jumpmind.exception.IoException;

public class FileIoResource implements IoResource {

    File file;

    public FileIoResource(File file) {
        this.file = file;
    }

    public InputStream open() {
        try {
            return new FileInputStream(file);
        } catch (FileNotFoundException e) {
            throw new IoException(e);
        }
    }

}
@@ -0,0 +1,9 @@
package org.jumpmind.symmetric.io;

import java.io.InputStream;

public interface IoResource {

    public InputStream open();

}
@@ -0,0 +1,18 @@
package org.jumpmind.symmetric.io;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class MemoryIoResource implements IoResource {

    byte[] buffer;

    public MemoryIoResource(byte[] buffer) {
        this.buffer = buffer;
    }

    public InputStream open() {
        return new ByteArrayInputStream(buffer);
    }

}
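For orientation (this note and the snippet below are not part of the commit), both IoResource implementations hand back a fresh InputStream on each open() call. A minimal usage sketch, where the file path is a hypothetical example:

import java.io.File;
import java.io.InputStream;

import org.jumpmind.symmetric.io.FileIoResource;
import org.jumpmind.symmetric.io.IoResource;
import org.jumpmind.symmetric.io.MemoryIoResource;

public class IoResourceExample {
    public static void main(String[] args) throws Exception {
        // Wrap a file on disk; open() throws IoException if the file is missing.
        IoResource fromDisk = new FileIoResource(new File("batch.csv")); // hypothetical path
        // Wrap an in-memory byte array; each open() returns an independent stream.
        IoResource fromMemory = new MemoryIoResource("hello".getBytes("UTF-8"));
        InputStream in = fromMemory.open();
        System.out.println(in.read()); // prints 104, the byte value of 'h'
        in.close();
    }
}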
@@ -10,8 +10,5 @@ public interface IDataReader extends IDataResource {
     public Table nextTable();
 
     public CsvData nextData();
 
-    // TODO think about streaming big data
-    //public InputStream getStream();
-
 }
@@ -0,0 +1,5 @@
package org.jumpmind.symmetric.io.data.reader;

public class CapturedDataReader {

}
@@ -4,6 +4,7 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.io.StringReader;
@@ -34,7 +35,7 @@
 public class CsvDataReader implements IDataReader {
 
     protected Log log = LogFactory.getLog(getClass());
-
+
     protected Reader reader;
     protected Map<Batch, Statistics> statistics = new HashMap<Batch, Statistics>();
     protected CsvReader csvReader;
@@ -51,6 +52,18 @@ public CsvDataReader(StringBuilder input) {
         this(new BufferedReader(new StringReader(input.toString())));
     }
 
+    public CsvDataReader(InputStream is) {
+        this(toReader(is));
+    }
+
+    protected static Reader toReader(InputStream is) {
+        try {
+            return new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        } catch (IOException ex) {
+            throw new IoException(ex);
+        }
+    }
+
     public CsvDataReader(String input) {
         this(new BufferedReader(new StringReader(input)));
     }
@@ -68,8 +81,8 @@ public CsvDataReader(File file) {
             throw new IoException(ex);
         }
     }
-    public <R extends IDataReader,W extends IDataWriter> void open(DataContext<R,W> context) {
+
+    public <R extends IDataReader, W extends IDataWriter> void open(DataContext<R, W> context) {
         this.context = context;
         this.csvReader = CsvUtils.getCsvReader(reader);
         this.next = readNext();
@@ -90,11 +103,13 @@ protected Object readNext() {
         if (batch == null) {
             bytesRead += csvReader.getRawRecord().length();
         } else {
-            statistics.get(batch).increment(CsvReaderStatistics.READ_BYTE_COUNT, csvReader.getRawRecord().length() + bytesRead);
+            statistics.get(batch).increment(CsvReaderStatistics.READ_BYTE_COUNT,
+                    csvReader.getRawRecord().length() + bytesRead);
             bytesRead = 0;
         }
         if (tokens[0].equals(CsvConstants.BATCH)) {
-            Batch batch = new Batch(Long.parseLong(tokens[1]), channelId, binaryEncoding, sourceNodeId);
+            Batch batch = new Batch(Long.parseLong(tokens[1]), channelId, binaryEncoding,
+                    sourceNodeId);
             statistics.put(batch, new CsvReaderStatistics());
             return batch;
         } else if (tokens[0].equals(CsvConstants.NODEID)) {
@@ -109,7 +124,8 @@ protected Object readNext() {
             catalogName = StringUtils.isBlank(tokens[1]) ? null : tokens[1];
         } else if (tokens[0].equals(CsvConstants.TABLE)) {
             String tableName = tokens[1];
-            table = tables.get(Table.getFullyQualifiedTableName(catalogName, schemaName, tableName));
+            table = tables.get(Table.getFullyQualifiedTableName(catalogName, schemaName,
+                    tableName));
             if (table != null) {
                 return table;
             } else {
@@ -135,22 +151,25 @@ protected Object readNext() {
         } else if (tokens[0].equals(CsvConstants.INSERT)) {
             CsvData data = new CsvData();
             data.setDataEventType(DataEventType.INSERT);
-            data.putParsedData(CsvData.ROW_DATA, Arrays.copyOfRange(tokens, 1, tokens.length));
+            data.putParsedData(CsvData.ROW_DATA,
+                    Arrays.copyOfRange(tokens, 1, tokens.length));
             return data;
         } else if (tokens[0].equals(CsvConstants.OLD)) {
             parsedOldData = Arrays.copyOfRange(tokens, 1, tokens.length);
         } else if (tokens[0].equals(CsvConstants.UPDATE)) {
             CsvData data = new CsvData();
             data.setDataEventType(DataEventType.UPDATE);
-            data.putParsedData(CsvData.ROW_DATA, Arrays.copyOfRange(tokens, 1, table.getColumnCount() + 1));
-            data.putParsedData(CsvData.PK_DATA, Arrays.copyOfRange(tokens, table.getColumnCount() + 1,
-                    tokens.length));
+            data.putParsedData(CsvData.ROW_DATA,
+                    Arrays.copyOfRange(tokens, 1, table.getColumnCount() + 1));
+            data.putParsedData(CsvData.PK_DATA,
+                    Arrays.copyOfRange(tokens, table.getColumnCount() + 1, tokens.length));
             data.putParsedData(CsvData.OLD_DATA, parsedOldData);
             return data;
         } else if (tokens[0].equals(CsvConstants.DELETE)) {
             CsvData data = new CsvData();
             data.setDataEventType(DataEventType.DELETE);
-            data.putParsedData(CsvData.PK_DATA, Arrays.copyOfRange(tokens, 1, tokens.length));
+            data.putParsedData(CsvData.PK_DATA,
+                    Arrays.copyOfRange(tokens, 1, tokens.length));
             data.putParsedData(CsvData.OLD_DATA, parsedOldData);
             return data;
         } else if (tokens[0].equals(CsvConstants.SQL)) {
@@ -167,7 +186,7 @@ protected Object readNext() {
             CsvData data = new CsvData();
             data.setDataEventType(DataEventType.CREATE);
             data.putCsvData(CsvData.ROW_DATA, tokens[1]);
-            return data;
+            return data;
         } else {
             log.info("Unable to handle unknown csv values: " + Arrays.toString(tokens));
         }
@@ -239,7 +258,7 @@ public void close() {
             csvReader.close();
         }
     }
-
+
     public Map<Batch, Statistics> getStatistics() {
         return statistics;
     }
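For reference (not part of the diff), a payload in the shape readNext() consumes could look like the lines below. The lowercase keywords stand in for the CsvConstants tokens checked above, and the node id, channel, table, and values are illustrative assumptions:

nodeid,00001
binary,BASE64
channel,default
batch,42
catalog,
schema,
table,item
keys,item_id
columns,item_id,price
insert,1,10.00
old,1,10.00
update,1,12.00,1
delete,1
commit,42

With the two-column table above, the update line carries the new row (1,12.00) followed by the primary key (1), matching how readNext() splits the tokens at table.getColumnCount() + 1.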
@@ -0,0 +1,5 @@
package org.jumpmind.symmetric.io.data.reader;

public class TableDataReader {

}
@@ -0,0 +1,181 @@
package org.jumpmind.symmetric.io.data.writer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.BinaryEncoding;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvConstants;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.util.Statistics;

abstract public class AbstractCsvDataWriter implements IDataWriter {

    protected DataContext<? extends IDataReader, ? extends IDataWriter> context;

    protected Batch batch;

    protected Table table;

    protected Set<Table> processedTables = new HashSet<Table>();

    protected String delimiter = ",";

    protected boolean flushNodeId = true;

    protected Map<Batch, Statistics> statistics = new HashMap<Batch, Statistics>();

    protected List<ICsvDataWriterListener> listeners;

    public AbstractCsvDataWriter(List<ICsvDataWriterListener> listeners) {
        this.listeners = listeners;
    }

    public <R extends IDataReader, W extends IDataWriter> void open(DataContext<R, W> context) {
        this.context = context;
    }

    public void close() {
    }

    public void start(Batch batch) {
        this.statistics.put(batch, new Statistics());
        this.batch = batch;

        if (listeners != null) {
            for (ICsvDataWriterListener listener : listeners) {
                listener.start(batch);
            }
        }

        if (flushNodeId) {
            String sourceNodeId = batch.getSourceNodeId();
            if (StringUtils.isNotBlank(sourceNodeId)) {
                println(CsvConstants.NODEID, sourceNodeId);
            }
            BinaryEncoding binaryEncoding = batch.getBinaryEncoding();
            if (binaryEncoding != null) {
                println(CsvConstants.BINARY, binaryEncoding.name());
            }
            flushNodeId = false;
        }
        if (StringUtils.isNotBlank(batch.getChannelId())) {
            println(CsvConstants.CHANNEL, batch.getChannelId());
        }
        println(CsvConstants.BATCH, Long.toString(batch.getBatchId()));
    }

    public boolean start(Table table) {
        this.table = table;
        String catalogName = table.getCatalog();
        println(CsvConstants.CATALOG, StringUtils.isNotBlank(catalogName) ? catalogName : "");
        String schemaName = table.getSchema();
        println(CsvConstants.SCHEMA, StringUtils.isNotBlank(schemaName) ? schemaName : "");
        println(CsvConstants.TABLE, table.getName());
        if (!processedTables.contains(table)) {
            println(CsvConstants.KEYS, table.getPrimaryKeyColumns());
            println(CsvConstants.COLUMNS, table.getColumns());
        }
        this.processedTables.add(table);
        return true;
    }

    public void write(CsvData data) {
        switch (data.getDataEventType()) {
        case INSERT:
            println(CsvConstants.INSERT, data.getCsvData(CsvData.ROW_DATA));
            break;

        case UPDATE:
            String oldData = data.getCsvData(CsvData.OLD_DATA);
            if (StringUtils.isNotBlank(oldData)) {
                println(CsvConstants.OLD, oldData);
            }
            println(CsvConstants.UPDATE, data.getCsvData(CsvData.ROW_DATA),
                    data.getCsvData(CsvData.PK_DATA));
            break;

        case DELETE:
            oldData = data.getCsvData(CsvData.OLD_DATA);
            if (StringUtils.isNotBlank(oldData)) {
                println(CsvConstants.OLD, oldData);
            }
            println(CsvConstants.DELETE, data.getCsvData(CsvData.PK_DATA));
            break;

        case SQL:
            println(CsvConstants.SQL, data.getCsvData(CsvData.ROW_DATA));
            break;
        }
    }

    public void end(Table table) {
    }

    final public void end(Batch batch, boolean inError) {
        println(CsvConstants.COMMIT, Long.toString(batch.getBatchId()));

        endBatch(batch);

        if (listeners != null) {
            for (ICsvDataWriterListener listener : listeners) {
                notifyEndBatch(batch, listener);
            }
        }
    }

    abstract protected void endBatch(Batch batch);

    abstract protected void notifyEndBatch(Batch batch, ICsvDataWriterListener listener);

    protected int println(String key, List<Column> columns) {
        return println(key, columns.toArray(new Column[columns.size()]));
    }

    protected int println(String key, Column[] columns) {
        StringBuilder buffer = new StringBuilder(key);
        for (int i = 0; i < columns.length; i++) {
            buffer.append(delimiter);
            buffer.append(columns[i].getName());
        }
        println(buffer.toString());
        return buffer.length();
    }

    abstract protected void print(Batch batch, String data);

    protected int println(String... data) {
        StringBuilder buffer = new StringBuilder();
        for (int i = 0; i < data.length; i++) {
            if (i != 0) {
                buffer.append(delimiter);
            }
            buffer.append(data[i]);
        }
        buffer.append("\n");
        print(batch, buffer.toString());
        return buffer.length();
    }

    public void setDelimiter(String delimiter) {
        this.delimiter = delimiter;
    }

    public String getDelimiter() {
        return delimiter;
    }

    public Map<Batch, Statistics> getStatistics() {
        return statistics;
    }

}
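To make the extension points concrete, here is a minimal in-memory subclass sketch (not part of this commit; the class name and buffering behavior are invented for illustration). It collects every protocol line that println() routes through print():

package org.jumpmind.symmetric.io.data.writer;

import org.jumpmind.symmetric.io.data.Batch;

public class BufferedCsvDataWriter extends AbstractCsvDataWriter {

    private final StringBuilder out = new StringBuilder();

    public BufferedCsvDataWriter() {
        super(null); // no listeners registered
    }

    protected void print(Batch batch, String data) {
        out.append(data); // capture each delimited protocol line
    }

    protected void endBatch(Batch batch) {
        // nothing to flush for an in-memory buffer
    }

    protected void notifyEndBatch(Batch batch, ICsvDataWriterListener listener) {
        // only listener.start(Batch) is visible in this commit, so batch
        // completion notification is left as a no-op in this sketch
    }

    public String getOutput() {
        return out.toString();
    }
}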
