Skip to content

Commit

Permalink
0003598: Order rows in initial load for table with self-referencing
Browse files Browse the repository at this point in the history
foreign key
  • Loading branch information
erilong committed Jun 11, 2018
1 parent 8a7bd07 commit 84cc52f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 9 deletions.
Expand Up @@ -62,7 +62,9 @@
import org.jumpmind.db.io.DatabaseXmlUtil;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.ForeignKey;
import org.jumpmind.db.model.PlatformColumn;
import org.jumpmind.db.model.Reference;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.DdlBuilderFactory;
Expand Down Expand Up @@ -2392,6 +2394,18 @@ class SelectFromTableSource implements IExtractDataReaderSource {
private TriggerRouter triggerRouter;

private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory;

private String overrideSelectSql;

private boolean isSelfReferencingFk;

private int selfRefLevel;

private String selfRefParentColumnName;

private String selfRefChildColumnName;

boolean isFirstRow;

public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch,
SelectFromTableEvent event) {
Expand Down Expand Up @@ -2454,6 +2468,8 @@ protected CsvData selectNext() {
if (this.currentInitialLoadEvent == null && selectFromTableEventsToSend.size() > 0) {
this.currentInitialLoadEvent = selectFromTableEventsToSend.remove(0);
TriggerHistory history = this.currentInitialLoadEvent.getTriggerHistory();
this.isSelfReferencingFk = false;
this.isFirstRow = true;
if (this.currentInitialLoadEvent.containsData()) {
data = this.currentInitialLoadEvent.getData();
this.currentInitialLoadEvent = null;
Expand All @@ -2474,19 +2490,47 @@ protected CsvData selectNext() {
.getRouter().getRouterId(), history, false, true);
this.targetTable = columnsAccordingToTriggerHistory.lookup(triggerRouter
.getRouter().getRouterId(), history, true, false);

this.startNewCursor(history, triggerRouter,
this.currentInitialLoadEvent.getInitialLoadSelect());

this.overrideSelectSql = currentInitialLoadEvent.getInitialLoadSelect();
if (overrideSelectSql != null && overrideSelectSql.trim().toUpperCase().startsWith("WHERE")) {
overrideSelectSql = overrideSelectSql.trim().substring(5);
}

}
if (StringUtils.isBlank(overrideSelectSql) || (overrideSelectSql != null && overrideSelectSql.equals("1=1"))) {
ForeignKey fk = this.sourceTable.getSelfReferencingForeignKey();
if (fk != null) {
Reference[] refs = fk.getReferences();
if (refs.length == 1) {
this.isSelfReferencingFk = true;
this.selfRefParentColumnName = refs[0].getLocalColumnName();
this.selfRefChildColumnName = refs[0].getForeignColumnName();
this.selfRefLevel = 0;
log.info("Ordering rows for table {} using self-referencing foreign key {} -> {}",
this.sourceTable.getName(), this.selfRefParentColumnName, this.selfRefChildColumnName);
} else {
log.warn("Unable to order rows for self-referencing foreign key because it contains multiple columns");
}
}
}

this.startNewCursor(history, triggerRouter);
}
}

if (this.cursor != null) {
data = this.cursor.next();
if (data == null) {
closeCursor();
if (isSelfReferencingFk && !this.isFirstRow) {
this.selfRefLevel++;
this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter);
this.isFirstRow = true;
} else {
this.currentInitialLoadEvent = null;
}
data = next();
} else if (this.isFirstRow) {
this.isFirstRow = false;
}
}

Expand All @@ -2497,15 +2541,28 @@ protected void closeCursor() {
if (this.cursor != null) {
this.cursor.close();
this.cursor = null;
this.currentInitialLoadEvent = null;
}
}

protected void startNewCursor(final TriggerHistory triggerHistory,
final TriggerRouter triggerRouter, String overrideSelectSql) {
if (overrideSelectSql != null && overrideSelectSql.trim().toUpperCase().startsWith("WHERE")) {
overrideSelectSql = overrideSelectSql.trim().substring(5);
final TriggerRouter triggerRouter) {

if (isSelfReferencingFk) {
if (selfRefLevel == 0) {
overrideSelectSql = selfRefParentColumnName + " is null";
} else {
String refSql= "select " + selfRefChildColumnName + " from " + this.sourceTable.getFullyQualifiedTableName() +
" where " + selfRefParentColumnName;
overrideSelectSql = selfRefParentColumnName + " in (";

for (int i = 1; i < selfRefLevel; i++) {
overrideSelectSql += refSql + " in (";
}
overrideSelectSql += refSql + " is null)" + StringUtils.repeat(")", selfRefLevel - 1);
}
log.info("Querying level {} for table {}: {}", selfRefLevel, sourceTable.getName(), overrideSelectSql);
}

String sql = symmetricDialect.createInitialLoadSqlFor(
this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable,
triggerHistory,
Expand Down
Expand Up @@ -793,7 +793,7 @@ public ForeignKey getSelfReferencingForeignKey() {
for (int idx = 0; idx < getForeignKeyCount(); idx++) {
ForeignKey fk = getForeignKey(idx);

if (this.equals(fk.getForeignTable())) {
if (this.getName().equals(fk.getForeignTableName())) {
return fk;
}
}
Expand Down

0 comments on commit 84cc52f

Please sign in to comment.