Skip to content

Commit

Permalink
0003314: Support row-by-row stream lob for Sybase
Browse files Browse the repository at this point in the history
  • Loading branch information
klementinastojanovska committed Nov 14, 2017
1 parent cb4132a commit c9365e1
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
Expand Up @@ -47,7 +47,7 @@ public AseTriggerTemplate(ISymmetricDialect symmetricDialect) {
stringColumnTemplate = "case when $(tableAlias)." + quote + "$(columnName)" + quote + " is null then null else '\"' + str_replace(str_replace($(tableAlias)." + quote + "$(columnName)" + quote + ",'\\','\\\\'),'\"','\\\"') + '\"' end" ;
numberColumnTemplate = "case when $(tableAlias)." + quote + "$(columnName)" + quote + " is null then null else ('\"' + convert(varchar,$(tableAlias)." + quote + "$(columnName)" + quote + ") + '\"') end" ;
datetimeColumnTemplate = "case when $(tableAlias)." + quote + "$(columnName)" + quote + " is null then null else ('\"' + str_replace(convert(varchar,$(tableAlias)." + quote + "$(columnName)" + quote + ",102),'.','-') + ' ' + right('00'+convert(varchar,datepart(HOUR,$(tableAlias)." + quote + "$(columnName)" + quote + ")),2)+':'+right('00'+convert(varchar,datepart(MINUTE,$(tableAlias)." + quote + "$(columnName)" + quote + ")),2)+':'+right('00'+convert(varchar,datepart(SECOND,$(tableAlias)." + quote + "$(columnName)" + quote + ")),2)+'.'+right('000'+convert(varchar,datepart(MILLISECOND,$(tableAlias)." + quote + "$(columnName)" + quote + ")),3) + '\"') end" ;
clobColumnTemplate = "case when datalength($(origTableAlias)." + quote + "$(columnName)" + quote + ") is null or datalength($(origTableAlias)." + quote + "$(columnName)" + quote + ")=0 then null else '\"' + str_replace(str_replace(cast($(origTableAlias)." + quote + "$(columnName)" + quote + " as varchar(16384)),'\\','\\\\'),'\"','\\\"') + '\"' end" ;
clobColumnTemplate = "case when datalength($(origTableAlias)." + quote + "$(columnName)" + quote + ") is null or datalength($(origTableAlias)." + quote + "$(columnName)" + quote + ")=0 then null when char_length($(origTableAlias)." + quote + "$(columnName)" + quote + ") > 16384 then '\"\\b\"' else '\"' + str_replace(str_replace(cast($(origTableAlias)." + quote + "$(columnName)" + quote + " as varchar(16384)),'\\','\\\\'),'\"','\\\"') + '\"' end" ;
blobColumnTemplate = "case when $(origTableAlias)." + quote + "$(columnName)" + quote + " is null then null else '\"' + bintostr(convert(varbinary(16384),$(origTableAlias)." + quote + "$(columnName)" + quote + ")) + '\"' end" ;
binaryColumnTemplate = "case when $(tableAlias)." + quote + "$(columnName)" + quote + " is null then null else '\"' + bintostr(convert(varbinary(16384),$(tableAlias)." + quote + "$(columnName)" + quote + ")) + '\"' end" ;
imageColumnTemplate = "case when datalength($(origTableAlias)." + quote + "$(columnName)" + quote + ") is null or datalength($(origTableAlias)." + quote + "$(columnName)" + quote + ")=0 then null else '\"' + bintostr(convert(varbinary(16384),$(origTableAlias)." + quote + "$(columnName)" + quote + ")) + '\"' end" ;
Expand Down
Expand Up @@ -1764,6 +1764,30 @@ protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest re
batches, channel.getMaxBatchSize(), processInfo);
return multiBatchStatingWriter;
}

protected boolean tableContainsLobs(Table table) {
Column[] columns = table.getColumns();
for (Column c : columns) {
if (platform.isLob(c.getJdbcTypeCode())) {
return true;
}
}
return false;
}

protected boolean lobColumnsMoreThan16k(Table table, CsvData data) {
String[] colNames = table.getColumnNames();
Map<String, String> colMap = data.toColumnNameValuePairs(colNames, CsvData.ROW_DATA);
List<Column> lobColumns = platform.getLobColumns(table);

for (Column c : lobColumns) {
String value = colMap.get(c.getName());
if (value != null && value.equals("\b")) {
return true;
}
}
return false;
}

class ExtractRequestMapper implements ISqlRowMapper<ExtractRequest> {
public ExtractRequest mapRow(Row row) {
Expand Down Expand Up @@ -1870,6 +1894,8 @@ public CsvData next() {
if (data == null) {
reloadSource.close();
reloadSource = null;
} else {
this.requiresLobSelectedFromSource = this.reloadSource.requiresLobsSelectedFromSource(data);
}
lastTriggerHistory = null;
}
Expand Down Expand Up @@ -1916,8 +1942,7 @@ public CsvData next() {
data = (Data) this.reloadSource.next();
this.sourceTable = reloadSource.getSourceTable();
this.targetTable = this.reloadSource.getTargetTable();
this.requiresLobSelectedFromSource = this.reloadSource
.requiresLobsSelectedFromSource();
this.requiresLobSelectedFromSource = this.reloadSource.requiresLobsSelectedFromSource(data);

if (data == null) {
data = (Data)next();
Expand All @@ -1940,7 +1965,14 @@ public CsvData next() {
routerId, triggerHistory, false, true);
this.targetTable = columnsAccordingToTriggerHistory.lookup(
routerId, triggerHistory, true, false);
this.requiresLobSelectedFromSource = trigger == null ? false : trigger.isUseStreamLobs();
if (tableContainsLobs(sourceTable)) {
if ((data.getRowData() != null && lobColumnsMoreThan16k(sourceTable, data))
|| trigger.isUseStreamLobs()) {
this.requiresLobSelectedFromSource = true;
}
} else {
this.requiresLobSelectedFromSource = false;
}
}

data.setNoBinaryOldData(requiresLobSelectedFromSource
Expand Down Expand Up @@ -2022,7 +2054,7 @@ public CsvData next() {
return data;
}

public boolean requiresLobsSelectedFromSource() {
public boolean requiresLobsSelectedFromSource(CsvData data) {
return requiresLobSelectedFromSource;
}

Expand Down Expand Up @@ -2222,11 +2254,15 @@ public Data mapRow(Row row) {
});
}

public boolean requiresLobsSelectedFromSource() {
public boolean requiresLobsSelectedFromSource(CsvData data) {
if (this.currentInitialLoadEvent != null
&& this.currentInitialLoadEvent.getTriggerRouter() != null) {
return this.currentInitialLoadEvent.getTriggerRouter().getTrigger()
.isUseStreamLobs();
if (tableContainsLobs(sourceTable)) {
if (data != null && lobColumnsMoreThan16k(sourceTable, data)) {
return true;
}
}
return this.currentInitialLoadEvent.getTriggerRouter().getTrigger().isUseStreamLobs();
} else {
return false;
}
Expand Down
Expand Up @@ -172,7 +172,7 @@ public Map<Batch, Statistics> getStatistics() {
}

protected CsvData enhanceWithLobsFromSourceIfNeeded(Table table, CsvData data) {
if (this.currentSource.requiresLobsSelectedFromSource()
if (this.currentSource.requiresLobsSelectedFromSource(data)
&& (data.getDataEventType() == DataEventType.UPDATE || data.getDataEventType() == DataEventType.INSERT)) {
List<Column> lobColumns = platform.getLobColumns(table);
if (lobColumns.size() > 0) {
Expand Down
Expand Up @@ -40,7 +40,7 @@ public interface IExtractDataReaderSource {

public CsvData next();

public boolean requiresLobsSelectedFromSource();
public boolean requiresLobsSelectedFromSource(CsvData data);

public void close();

Expand Down
Expand Up @@ -118,7 +118,7 @@ protected String[] toStringData(Row row, Column[] columns) {
return stringValues;
}

public boolean requiresLobsSelectedFromSource() {
public boolean requiresLobsSelectedFromSource(CsvData data) {
return streamLobs;
}

Expand Down

0 comments on commit c9365e1

Please sign in to comment.