Skip to content

Commit

Permalink
Added binary_encoding to incoming_error. Added a queryForRow method to sqltemplate
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 29, 2012
1 parent 5b6b75c commit 328db2f
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 101 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.io.Serializable;
import java.util.Date;

import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;

Expand All @@ -43,6 +44,8 @@ public class IncomingError implements Serializable {
private String targetSchemaName;

private String targetTableName;

private BinaryEncoding binaryEncoding;

private DataEventType eventType;

Expand All @@ -51,9 +54,9 @@ public class IncomingError implements Serializable {
private String primaryKeyColumnNames;

private CsvData csvData = new CsvData();

private boolean resolveIgnore = false;

private Date createTime = new Date();

private Date lastUpdateTime = new Date();
Expand Down Expand Up @@ -101,125 +104,149 @@ public void setResolveData(String resolveData) {
}

/* getters and setters */

// NOTE(review): this capture is a unified-diff rendering without +/- markers;
// each changed accessor appears twice (pre- and post-image copies). The copies
// tagged "duplicate diff copy" below are diff residue, not real duplicate
// definitions — verify against the repository file before compiling.

/** @return the id of the batch this error belongs to */
public long getBatchId() {
return batchId;
}

public void setBatchId(long batchId) {
this.batchId = batchId;
}
// (duplicate diff copy of the batchId accessors)
public long getBatchId() {
return batchId;
}

public void setBatchId(long batchId) {
this.batchId = batchId;
}

/** @return the node id associated with this incoming error */
public String getNodeId() {
return nodeId;
}

public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

// (duplicate diff copy)
public String getNodeId() {
return nodeId;
}
/** @return the failed row number recorded for this error */
public long getFailedRowNumber() {
return failedRowNumber;
}

// (duplicate diff copy)
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public void setFailedRowNumber(long failedRowNumber) {
this.failedRowNumber = failedRowNumber;
}

// (duplicate diff copy)
public long getFailedRowNumber() {
return failedRowNumber;
}
/** @return the catalog name of the table being loaded when the error occurred */
public String getTargetCatalogName() {
return targetCatalogName;
}

// (duplicate diff copy)
public void setFailedRowNumber(long failedRowNumber) {
this.failedRowNumber = failedRowNumber;
}
public void setTargetCatalogName(String targetCatalogName) {
this.targetCatalogName = targetCatalogName;
}

// (duplicate diff copy)
public String getTargetCatalogName() {
return targetCatalogName;
}
/** @return the schema name of the table being loaded */
public String getTargetSchemaName() {
return targetSchemaName;
}

// (duplicate diff copy)
public void setTargetCatalogName(String targetCatalogName) {
this.targetCatalogName = targetCatalogName;
}
public void setTargetSchemaName(String targetSchemaName) {
this.targetSchemaName = targetSchemaName;
}

// (duplicate diff copy)
public String getTargetSchemaName() {
return targetSchemaName;
}
/** @return the name of the table being loaded */
public String getTargetTableName() {
return targetTableName;
}

// (duplicate diff copy)
public void setTargetSchemaName(String targetSchemaName) {
this.targetSchemaName = targetSchemaName;
}
public void setTargetTableName(String targetTableName) {
this.targetTableName = targetTableName;
}

// (duplicate diff copy)
public String getTargetTableName() {
return targetTableName;
}
/** @return the type of data event (insert/update/delete/etc.) for the failed row */
public DataEventType getEventType() {
return eventType;
}

// (duplicate diff copy)
public void setTargetTableName(String targetTableName) {
this.targetTableName = targetTableName;
}
public void setEventType(DataEventType eventType) {
this.eventType = eventType;
}

// (duplicate diff copy)
public DataEventType getEventType() {
return eventType;
}
/** @return the captured CSV data for the failed row (never null — field is initialized) */
public CsvData getCsvData() {
return csvData;
}

// (duplicate diff copy)
public void setEventType(DataEventType eventType) {
this.eventType = eventType;
}
public void setCsvData(CsvData csvData) {
this.csvData = csvData;
}

// (duplicate diff copy)
public CsvData getCsvData() {
return csvData;
}
/** @return the resolve-ignore flag — presumably "skip this row on retry"; confirm against resolver logic */
public boolean isResolveIgnore() {
return resolveIgnore;
}

// (duplicate diff copy)
public void setCsvData(CsvData csvData) {
this.csvData = csvData;
}
public void setResolveIgnore(boolean resolveIgnore) {
this.resolveIgnore = resolveIgnore;
}

// (duplicate diff copy)
public boolean isResolveIgnore() {
return resolveIgnore;
}
/** @return when this error record was created (defaults to construction time) */
public Date getCreateTime() {
return createTime;
}

// (duplicate diff copy)
public void setResolveIgnore(boolean resolveIgnore) {
this.resolveIgnore = resolveIgnore;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

// (duplicate diff copy)
public Date getCreateTime() {
return createTime;
}
/** @return time of last update (defaults to construction time) */
public Date getLastUpdateTime() {
return lastUpdateTime;
}

// (duplicate diff copy)
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public void setLastUpdateTime(Date lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}

// (duplicate diff copy)
public Date getLastUpdateTime() {
return lastUpdateTime;
}
/** @return identifier of who/what last updated this record */
public String getLastUpdateBy() {
return lastUpdateBy;
}

// (duplicate diff copy)
public void setLastUpdateTime(Date lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
public void setLastUpdateBy(String lastUpdateBy) {
this.lastUpdateBy = lastUpdateBy;
}

// (duplicate diff copy)
public String getLastUpdateBy() {
return lastUpdateBy;
}
/** @return the failed line number recorded for this error */
public long getFailedLineNumber() {
return failedLineNumber;
}

// (duplicate diff copy)
public void setLastUpdateBy(String lastUpdateBy) {
this.lastUpdateBy = lastUpdateBy;
}
public void setFailedLineNumber(long failedLineNumber) {
this.failedLineNumber = failedLineNumber;
}

// (duplicate diff copy)
public long getFailedLineNumber() {
return failedLineNumber;
}
/** @return the table's column names as a comma-separated list */
public String getColumnNames() {
return columnNames;
}

// (duplicate diff copy)
public void setFailedLineNumber(long failedLineNumber) {
this.failedLineNumber = failedLineNumber;
}
public void setColumnNames(String columnNames) {
this.columnNames = columnNames;
}

// (duplicate diff copy)
public String getColumnNames() {
return columnNames;
}
/** @return the primary key column names as a comma-separated list */
public String getPrimaryKeyColumnNames() {
return primaryKeyColumnNames;
}

// (duplicate diff copy)
public void setColumnNames(String columnNames) {
this.columnNames = columnNames;
}
public void setPrimaryKeyColumnNames(String primaryKeyColumnNames) {
this.primaryKeyColumnNames = primaryKeyColumnNames;
}

// (duplicate diff copy)
public String getPrimaryKeyColumnNames() {
return primaryKeyColumnNames;
}
/**
 * Splits the comma-separated {@code columnNames} value into individual names.
 *
 * @return the parsed column names, or null when no column names have been set
 */
public String[] getParsedColumnNames() {
    return columnNames == null ? null : columnNames.split(",");
}

// (duplicate diff copy of setPrimaryKeyColumnNames — diff residue, see repository file)
public void setPrimaryKeyColumnNames(String primaryKeyColumnNames) {
this.primaryKeyColumnNames = primaryKeyColumnNames;
}
/**
 * Splits the comma-separated {@code primaryKeyColumnNames} value into
 * individual names.
 *
 * @return the parsed primary key column names, or null when none have been set
 */
public String[] getParsedPrimaryKeyColumnNames() {
    return primaryKeyColumnNames == null ? null : primaryKeyColumnNames.split(",");
}

/**
 * Sets the type of encoding the source system used for binary column data
 * in the failed batch (schema default is HEX).
 */
public void setBinaryEncoding(BinaryEncoding binaryEncoding) {
this.binaryEncoding = binaryEncoding;
}

/** @return the binary encoding of the source batch's binary data; may be null if never set */
public BinaryEncoding getBinaryEncoding() {
return binaryEncoding;
}

}
Expand Up @@ -42,6 +42,7 @@
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -521,8 +522,10 @@ public void insertIncomingError(IncomingError incomingError) {
sqlTemplate.update(getSql("insertIncomingErrorSql"), incomingError.getBatchId(),
incomingError.getNodeId(), incomingError.getFailedRowNumber(),
incomingError.getFailedLineNumber(), incomingError.getTargetCatalogName(),
incomingError.getTargetSchemaName(), incomingError.getTargetTableName(),
incomingError.getEventType().getCode(), incomingError.getColumnNames(),
incomingError.getTargetSchemaName(), incomingError.getTargetTableName(),
incomingError.getEventType().getCode(),
incomingError.getBinaryEncoding().name(),
incomingError.getColumnNames(),
incomingError.getPrimaryKeyColumnNames(), incomingError.getRowData(),
incomingError.getOldData(), incomingError.getResolveData(),
incomingError.getResolveData(), incomingError.getCreateTime(),
Expand Down Expand Up @@ -577,6 +580,7 @@ public IncomingError mapRow(Row rs) {
incomingError.setTargetSchemaName(rs.getString("target_schema_name"));
incomingError.setTargetTableName(rs.getString("target_table_name"));
incomingError.setEventType(DataEventType.getEventType(rs.getString("event_type")));
incomingError.setBinaryEncoding(BinaryEncoding.valueOf(rs.getString("binary_encoding")));
incomingError.setColumnNames(rs.getString("column_names"));
incomingError.setPrimaryKeyColumnNames(rs.getString("pk_column_names"));
incomingError.setRowData(rs.getString("row_data"));
Expand Down Expand Up @@ -753,6 +757,7 @@ public void batchInError(DataContext context, Exception ex) {
error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context
.getTable().getPrimaryKeyColumns()));
error.setCsvData(context.getData());
error.setBinaryEncoding(context.getBatch().getBinaryEncoding());
error.setEventType(context.getData().getDataEventType());
error.setFailedLineNumber(this.currentBatch.getFailedLineNumber());
error.setFailedRowNumber(this.currentBatch.getFailedRowNumber());
Expand All @@ -767,9 +772,9 @@ public void batchInError(DataContext context, Exception ex) {
}
incomingBatchService.updateIncomingBatch(this.currentBatch);
} catch (Exception e) {
log.error("Failed to record status of batch {} because {}",
log.error("Failed to record status of batch {}",
this.currentBatch != null ? this.currentBatch.getNodeBatchId() : context
.getBatch().getNodeBatchId(), e.getMessage());
.getBatch().getNodeBatchId(), e);
}
}

Expand Down
Expand Up @@ -40,12 +40,12 @@ public class DataLoaderServiceSqlMap extends AbstractSqlMap {

putSql("selectIncomingErrorSql",
"select batch_id, node_id, failed_row_number, failed_line_number, target_catalog_name, target_schema_name, " +
"target_table_name, event_type, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"target_table_name, event_type, binary_encoding, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"create_time, last_update_by, last_update_time from $(incoming_error) where batch_id = ? and node_id = ?");

putSql("selectCurrentIncomingErrorSql",
"select e.batch_id, e.node_id, e.failed_row_number, e.failed_line_number, e.target_catalog_name, e.target_schema_name, " +
"e.target_table_name, e.event_type, e.column_names, e.pk_column_names, e.row_data, e.old_data, e.resolve_data, e.resolve_ignore, " +
"e.target_table_name, e.event_type, e.binary_encoding, e.column_names, e.pk_column_names, e.row_data, e.old_data, e.resolve_data, e.resolve_ignore, " +
"e.create_time, e.last_update_by, e.last_update_time " +
"from $(incoming_error) e inner join $(incoming_batch) b on b.batch_id = e.batch_id " +
"and b.node_id = e.node_id and b.failed_row_number = e.failed_row_number " +
Expand All @@ -54,8 +54,8 @@ public class DataLoaderServiceSqlMap extends AbstractSqlMap {
putSql("insertIncomingErrorSql",
"insert into $(incoming_error) " +
"(batch_id, node_id, failed_row_number, failed_line_number, target_catalog_name, target_schema_name, " +
"target_table_name, event_type, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"create_time, last_update_by, last_update_time) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
"target_table_name, event_type, binary_encoding, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"create_time, last_update_by, last_update_time) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

putSql("updateIncomingErrorSql",
"update $(incoming_error) set resolve_data = ?, resolve_ignore = ? " +
Expand Down
Expand Up @@ -515,6 +515,7 @@
<column name="target_schema_name" type="VARCHAR" size="50" description="The schema name for the table being loaded." />
<column name="target_table_name" type="VARCHAR" size="50" required="true" description="The table name for the table being loaded." />
<column name="event_type" type="CHAR" size="1" required="true" description="The type of event captured by this entry. For triggers, this is the change that occurred, which is 'I' for insert, 'U' for update, or 'D' for delete. Other events include: 'R' for reloading the entire table (or subset of the table) to the node; 'S' for running dynamic SQL at the node, which is used for adhoc administration." />
<column name="binary_encoding" type="VARCHAR" size="10" required="true" default="HEX" description="The type of encoding the source system used for encoding binary data." />
<column name="column_names" type="LONGVARCHAR" required="true" description="The column names defined on the table. The column names are stored in comma-separated values (CSV) format." />
<column name="pk_column_names" type="LONGVARCHAR" required="true" description="The primary key column names defined on the table. The column names are stored in comma-separated values (CSV) format." />
<column name="row_data" type="LONGVARCHAR" description="The row data from the batch as captured from the source. The column values are stored in comma-separated values (CSV) format." />
Expand Down
Expand Up @@ -104,6 +104,14 @@ public List<Row> query(String sql) {
public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, Object... args) {
return query(sql, mapper, args, null);
}

/**
 * Runs the query and returns the single result as a {@link Row}, using an
 * identity mapper so the raw row is handed back untransformed.
 *
 * @param sql  the query to execute
 * @param args positional bind arguments for the query
 * @return the mapped row from queryForObject
 */
public Row queryForRow(String sql, Object... args) {
    ISqlRowMapper<Row> identityMapper = new ISqlRowMapper<Row>() {
        public Row mapRow(Row row) {
            return row;
        }
    };
    return queryForObject(sql, identityMapper, args);
}

@SuppressWarnings("unchecked")
public <T, W> Map<T, W> query(String sql, String keyCol, String valueCol, Object[] args,
Expand Down
Expand Up @@ -26,6 +26,8 @@ public interface ISqlTemplate {
public String queryForString(String sql, Object... args);

public long queryForLong(String sql, Object... args);

public Row queryForRow(String sql, Object... args);

public Map<String, Object> queryForMap(String sql, Object... params);

Expand Down

0 comments on commit 328db2f

Please sign in to comment.