Skip to content

Commit

Permalink
Fix CsvDataReader bug
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Dec 26, 2011
1 parent 622b1e1 commit 8009ff6
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 24 deletions.
Expand Up @@ -351,7 +351,7 @@ public Object[] getObjectValues(BinaryEncoding encoding, String[] values,
objectValue = Hex.decodeHex(value.toCharArray());
}
} else if (type == Types.TIME) {
objectValue = Time.valueOf(value);
objectValue = new Time(Timestamp.valueOf(value).getTime());
} else if (type == Types.ARRAY) {
objectValue = createArray(column, value);
}
Expand Down
Expand Up @@ -191,6 +191,10 @@ public int getColumnCount() {
return columns.size();
}

public int getPrimaryKeyColumnCount() {
return getPrimaryKeyColumns().length;
}

/**
* Returns the column at the specified position.
*
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void putAttribute(String attributeName, Object attributeValue) {

@SuppressWarnings("unchecked")
public <T> T getAttribute(String attributeName) {
return attributes == null ? null : (T)attributes.get(attributeName);
return attributes == null ? null : (T) attributes.get(attributeName);
}

public void removeData(String key) {
Expand Down Expand Up @@ -129,7 +129,13 @@ public boolean[] getChangedDataIndicators() {
String[] oldData = getParsedData(OLD_DATA);
for (int i = 0; i < newData.length; i++) {
if (oldData != null && oldData.length > i) {
changes[i] = !newData[i].equals(oldData[i]);
if (newData[i] == null) {
changes[i] = oldData[i] != null;
} else if (oldData[i] == null) {
changes[i] = newData[i] != null;
} else {
changes[i] = !newData[i].equals(oldData[i]);
}
} else {
changes[i] = true;
}
Expand Down
Expand Up @@ -99,7 +99,7 @@ public static int write(Writer writer, String... data) throws IOException {

writer.write(buffer.toString());
if (log.isDebugEnabled()) {
log.debug("BufferWriting", buffer);
log.debug(buffer.toString());
}
return buffer.length();
}
Expand Down
Expand Up @@ -83,6 +83,9 @@ protected Object readNext() {
String[] parsedOldData = null;
long bytesRead = 0;
while (csvReader.readRecord()) {
if (log.isDebugEnabled()) {
log.debug(csvReader.getRawRecord());
}
String[] tokens = csvReader.getValues();
if (batch == null) {
bytesRead += csvReader.getRawRecord().length();
Expand All @@ -106,8 +109,7 @@ protected Object readNext() {
catalogName = StringUtils.isBlank(tokens[1]) ? null : tokens[1];
} else if (tokens[0].equals(CsvConstants.TABLE)) {
String tableName = tokens[1];
table = tables.get(Table.getFullyQualifiedTableName(tableName, schemaName,
catalogName, ""));
table = tables.get(Table.getFullyQualifiedTableName(catalogName, schemaName, tableName));
if (table != null) {
return table;
} else {
Expand All @@ -126,7 +128,7 @@ protected Object readNext() {
&& keys.contains(tokens[i]));
table.addColumn(column);
}
tables.put(table.getFullyQualifiedTableName(""), table);
tables.put(table.getFullyQualifiedTableName(), table);
return table;
} else if (tokens[0].equals(CsvConstants.COMMIT)) {
return null;
Expand Down Expand Up @@ -156,6 +158,16 @@ protected Object readNext() {
data.setDataEventType(DataEventType.SQL);
data.putCsvData(CsvData.ROW_DATA, tokens[1]);
return data;
} else if (tokens[0].equals(CsvConstants.BSH)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.BSH);
data.putCsvData(CsvData.ROW_DATA, tokens[1]);
return data;
} else if (tokens[0].equals(CsvConstants.CREATE)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.CREATE);
data.putCsvData(CsvData.ROW_DATA, tokens[1]);
return data;
} else {
log.info("Unable to handle unknown csv values: " + Arrays.toString(tokens));
}
Expand Down
Expand Up @@ -127,15 +127,10 @@ public void start(Batch batch) {
}

public boolean start(Table table) {
this.lastData = null;
this.currentDmlStatement = null;
this.sourceTable = table;
this.targetTable = lookupTableAtTarget(table);
if (this.targetTable != null) {
this.targetTable = targetTable.copy();
this.targetTable.reOrderColumns(sourceTable.getColumns(),
this.batchSettings.isUsePrimaryKeysFromSource());
this.transaction.allowInsertIntoAutoIncrementColumns(true, this.targetTable);
}

if (this.targetTable != null || hasFilterThatHandlesMissingTable(table)) {
return true;
} else {
Expand Down Expand Up @@ -573,9 +568,9 @@ protected boolean hasFilterThatHandlesMissingTable(Table table) {
return false;
}

protected Table lookupTableAtTarget(Table table) {
table = platform.getTableFromCache(table.getCatalog(), table.getSchema(), table.getName(),
false);
protected Table lookupTableAtTarget(Table sourceTable) {
Table table = platform.getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(),
sourceTable.getName(), false);
if (table != null) {
Column[] columns = table.getColumns();
for (Column column : columns) {
Expand All @@ -586,6 +581,10 @@ protected Table lookupTableAtTarget(Table table) {
}
column.setTypeCode(typeCode);
}
table = table.copy();
table.reOrderColumns(sourceTable.getColumns(),
this.batchSettings.isUsePrimaryKeysFromSource());
this.transaction.allowInsertIntoAutoIncrementColumns(true, this.targetTable);
}
return table;
}
Expand Down
Expand Up @@ -76,6 +76,85 @@ public void testSimpleRead() {

reader.close();
}

@Test
public void testTableContextSwitch() {
String nodeId= "1";
long batchId = 1;
String channelId = "test";
StringBuilder builder = beginCsv(nodeId);
beginBatch(builder, batchId, channelId);
putTableN(builder, 1, true);
putInsert(builder, 4);
putTableN(builder, 2, true);
putInsert(builder, 4);
putTableN(builder, 1, false);
putInsert(builder, 2);
putTableN(builder, 2, false);
putInsert(builder, 2);
endCsv(builder);

CsvDataReader reader = new CsvDataReader(builder);
DataContext<CsvDataReader, IDataWriter> ctx = new DataContext<CsvDataReader, IDataWriter>(reader, null);
reader.open(ctx);

Batch batch = reader.nextBatch();
Assert.assertNotNull(batch);

Table table = reader.nextTable();
Assert.assertNotNull(table);
Assert.assertEquals(2, table.getColumnCount());
Assert.assertEquals(1, table.getPrimaryKeyColumnCount());
Assert.assertEquals("test1", table.getName());

int dataCount = 0;
while (reader.nextData() != null) {
dataCount++;
}

Assert.assertEquals(4, dataCount);

table = reader.nextTable();
Assert.assertNotNull(table);
Assert.assertEquals(2, table.getColumnCount());
Assert.assertEquals(1, table.getPrimaryKeyColumnCount());
Assert.assertEquals("test2", table.getName());

dataCount = 0;
while (reader.nextData() != null) {
dataCount++;
}

Assert.assertEquals(4, dataCount);

table = reader.nextTable();
Assert.assertNotNull(table);
Assert.assertEquals(2, table.getColumnCount());
Assert.assertEquals(1, table.getPrimaryKeyColumnCount());
Assert.assertEquals("test1", table.getName());

dataCount = 0;
while (reader.nextData() != null) {
dataCount++;
}

Assert.assertEquals(2, dataCount);

table = reader.nextTable();
Assert.assertNotNull(table);
Assert.assertEquals(2, table.getColumnCount());
Assert.assertEquals(1, table.getPrimaryKeyColumnCount());
Assert.assertEquals("test2", table.getName());

dataCount = 0;
while (reader.nextData() != null) {
dataCount++;
}

Assert.assertEquals(2, dataCount);


}

protected StringBuilder beginCsv(String nodeId) {
StringBuilder builder = new StringBuilder();
Expand Down
Expand Up @@ -25,7 +25,6 @@

import javax.sql.DataSource;

import org.jumpmind.log.Log4jLog;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.logging.ILog;
Expand Down Expand Up @@ -53,11 +52,7 @@ public class AbstractDatabaseTest extends AbstractTest {

private String database = TestSetupUtil.getRootDbTypes(DatabaseTestSuite.DEFAULT_TEST_PREFIX)[0];

static boolean standalone = true;

static {
org.jumpmind.log.LogFactory.setLogClass(Log4jLog.class);
}
static boolean standalone = true;

public void init(String database) {
this.database = database;
Expand Down
Expand Up @@ -21,9 +21,14 @@

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.jumpmind.log.Log4jLog;

abstract public class AbstractTest {

static {
org.jumpmind.log.LogFactory.setLogClass(Log4jLog.class);
}

protected Level setLoggingLevelForTest(Level level) {
Level old = Logger.getLogger(getClass()).getLevel();
Logger.getLogger("org.jumpmind").setLevel(level);
Expand Down
Expand Up @@ -1066,7 +1066,7 @@ public void testCaseSensitiveTableNames() {
"Table name in mixed case was not synced");
}

@Test(timeout = 120000)
@Test //(timeout = 120000)
public void testSyncShellCommand() throws Exception {
logTestRunning();
IDataService rootDataService = AppUtils.find(Constants.DATA_SERVICE, getRootEngine());
Expand Down
4 changes: 4 additions & 0 deletions symmetric/symmetric-server/src/test/resources/log4j.xml
Expand Up @@ -12,6 +12,10 @@
<category name="org.jumpmind">
<priority value="INFO" />
</category>

<category name="org.jumpmind.symmetric.io">
<priority value="DEBUG" />
</category>

<category name="org.jumpmind.db.sql.jdbc">
<priority value="DEBUG" />
Expand Down

0 comments on commit 8009ff6

Please sign in to comment.