Skip to content

Commit

Permalink
0002088: The addition of a new symds configuration table makes regist…
Browse files Browse the repository at this point in the history
…ration incompatible with previous versions
  • Loading branch information
chenson42 committed Jan 11, 2015
1 parent feb4cdd commit 3335821
Showing 1 changed file with 87 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,107 +114,111 @@ public boolean start(Table table) {
if (this.targetTable==null && hasFilterThatHandlesMissingTable(table)) {
this.targetTable = table;
}
if (this.targetTable != null) {
return true;
} else if (writerSettings.isIgnoreMissingTables() ||
batch.getBatchId() == IoConstants.IGNORE_TABLES_BATCH) {
String qualifiedName = sourceTable.getFullyQualifiedTableName();
if (!missingTables.contains(qualifiedName)) {
log.warn("Did not find the {} table in the target database", qualifiedName);
missingTables.add(qualifiedName);
}
return false;
} else {
// The first data should fail because the table will not be found
return true;
}

/* The first data that requires a target table should fail because the table will not be found */
return true;
}

public void write(CsvData data) {
write(data, false);
}

protected void write(CsvData data, boolean fallback) {
context.put(CONFLICT_ERROR, null);
if (data.requiresTable() &&
(targetTable == null && data.getDataEventType() != DataEventType.SQL)) {
// if we cross batches and the table isn't specified, then
// use the last table we used
start(context.getLastParsedTable());
}
if (targetTable != null || !data.requiresTable() ||
(targetTable == null && data.getDataEventType() == DataEventType.SQL)) {
try {
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {
LoadStatus loadStatus = LoadStatus.SUCCESS;
switch (data.getDataEventType()) {
case UPDATE:
loadStatus = update(data, writerSettings.isApplyChangesOnly(), true);
break;
case INSERT:
loadStatus = insert(data);
break;
case DELETE:
loadStatus = delete(data, true);
break;
case BSH:
script(data);
break;
case SQL:
sql(data);
break;
case CREATE:
create(data);
break;
default:
break;
}
/* If the startTable has been called and the targetTable is required then check
* to see if the writer has been configured to ignore this data event
*/
if (sourceTable != null && targetTable == null &&
data.requiresTable() && (writerSettings.isIgnoreMissingTables()
|| batch.getBatchId() == IoConstants.IGNORE_TABLES_BATCH)) {
String qualifiedName = sourceTable.getFullyQualifiedTableName();
if (!missingTables.contains(qualifiedName)) {
log.warn("Did not find the {} table in the target database", qualifiedName);
missingTables.add(qualifiedName);
}
} else {
context.put(CONFLICT_ERROR, null);
if (data.requiresTable()
&& (targetTable == null && data.getDataEventType() != DataEventType.SQL)) {
// if we cross batches and the table isn't specified, then
// use the last table we used
start(context.getLastParsedTable());
}
if (targetTable != null || !data.requiresTable()
|| (targetTable == null && data.getDataEventType() == DataEventType.SQL)) {
try {
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {
LoadStatus loadStatus = LoadStatus.SUCCESS;
switch (data.getDataEventType()) {
case UPDATE:
loadStatus = update(data, writerSettings.isApplyChangesOnly(), true);
break;
case INSERT:
loadStatus = insert(data);
break;
case DELETE:
loadStatus = delete(data, true);
break;
case BSH:
script(data);
break;
case SQL:
sql(data);
break;
case CREATE:
create(data);
break;
default:
break;
}

if (loadStatus == LoadStatus.CONFLICT) {
if (conflictResolver != null && !fallback) {
conflictResolver.needsResolved(this, data, loadStatus);
} else {
throw new ConflictException(data, targetTable, false, writerSettings.pickConflict(targetTable, batch));
if (loadStatus == LoadStatus.CONFLICT) {
if (conflictResolver != null && !fallback) {
conflictResolver.needsResolved(this, data, loadStatus);
} else {
throw new ConflictException(data, targetTable, false,
writerSettings.pickConflict(targetTable, batch));
}
}
}

uncommittedCount++;

lastData = data;
uncommittedCount++;

filterAfter(data);
lastData = data;

checkForEarlyCommit();
filterAfter(data);

}
checkForEarlyCommit();

} catch (IgnoreBatchException ex) {
rollback();
throw ex;
} catch (RuntimeException ex) {
if (filterError(data, ex)) {
if (!(ex instanceof SqlException)) {
/*
* SQL exceptions should have already been logged
*/
logFailureDetails(ex, data, false);
}

} catch (IgnoreBatchException ex) {
rollback();
throw ex;
} else {
uncommittedCount++;
statistics.get(batch).increment(DataWriterStatisticConstants.IGNORECOUNT);
checkForEarlyCommit();
} catch (RuntimeException ex) {
if (filterError(data, ex)) {
if (!(ex instanceof SqlException)) {
/*
* SQL exceptions should have already been logged
*/
logFailureDetails(ex, data, false);
}
throw ex;
} else {
uncommittedCount++;
statistics.get(batch).increment(DataWriterStatisticConstants.IGNORECOUNT);
checkForEarlyCommit();
}
}
}
} else {
if (sourceTable != null) {
// If the source table was found but the target table is still unknown throw an exception
throw new SqlException(String.format("Could not find the target table %s",
sourceTable.getFullyQualifiedTableName()));
} else {
throw new SqlException("The target table was not specified");
if (sourceTable != null) {
// If the source table was found but the target table is
// still unknown throw an exception
throw new SqlException(String.format("Could not find the target table %s",
sourceTable.getFullyQualifiedTableName()));
} else {
throw new SqlException("The target table was not specified");
}
}
}
}
Expand Down

0 comments on commit 3335821

Please sign in to comment.