Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.10' into 3.11
Browse files Browse the repository at this point in the history
Conflicts:
	symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
	symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractJavaDriverSqlTemplate.java
  • Loading branch information
erilong committed Nov 12, 2019
2 parents d6ec612 + b93b722 commit 8652308
Show file tree
Hide file tree
Showing 18 changed files with 170 additions and 58 deletions.
Expand Up @@ -410,8 +410,11 @@ public String getInitialLoadTwoPassLobLengthSql(Column column, boolean isFirstPa
} else if (isFirstPass) {
return "(t." + quote + column.getName() + quote + " is null or " +
"dbms_lob.getlength(t." + quote + column.getName() + quote + ") <= 4000)";
} else if (parameterService.is(ParameterConstants.EXTRACT_CHECK_ROW_SIZE, false)) {
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") between 4001 and " + parameterService.getLong(ParameterConstants.EXTRACT_ROW_MAX_LENGTH, 1000000000);
}
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") > 4000";

}

@Override
Expand Down
Expand Up @@ -176,6 +176,9 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS = "initial.load.defer.create.constraints";
public final static String INITIAL_LOAD_RECURSION_SELF_FK = "initial.load.recursion.self.fk";

public final static String EXTRACT_CHECK_ROW_SIZE = "extract.check.row.size";
public final static String EXTRACT_ROW_MAX_LENGTH = "extract.row.max.length";

public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults";
public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys";
public final static String CREATE_TABLE_WITHOUT_INDEXES = "create.table.without.indexes";
Expand Down
Expand Up @@ -1054,7 +1054,8 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
try {
new DataProcessor(dataReader, writer, listener, "extract").process(ctx);
} catch (Exception e) {
if ((e instanceof ProtocolException || (e instanceof SQLException && ((SQLException) e).getErrorCode() == 6502)) &&
if ((e instanceof ProtocolException || (e.getCause() != null && e.getCause() instanceof SQLException
&& ((SQLException) e.getCause()).getErrorCode() == 6502)) &&
!configurationService.getNodeChannel(currentBatch.getChannelId(), false).getChannel().isContainsBigLob()) {
log.warn(e.getMessage());
log.info("Re-attempting extraction for batch {} with contains_big_lobs temporarily enabled for channel {}",
Expand Down Expand Up @@ -2263,16 +2264,19 @@ protected void checkSendDeferredConstraints(ExtractRequest request, List<Extract
List<TriggerHistory> histories = triggerRouterService.getActiveTriggerHistories(triggerRouterService.getTriggerById(request.getTriggerId()));
if (histories != null && histories.size() > 0) {
for (TriggerHistory history : histories) {
Data data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(request.getLoadId()),
history, trigger.getChannelId(), null, null);
data.setNodeList(targetNode.getNodeId());
dataService.insertData(data);
if (childRequests != null) {
for (ExtractRequest childRequest : childRequests) {
data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(childRequest.getLoadId()),
history, trigger.getChannelId(), null, null);
data.setNodeList(childRequest.getNodeId());
dataService.insertData(data);
Channel channel = configurationService.getChannel(trigger.getReloadChannelId());
if (!channel.isFileSyncFlag()) {
Data data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(request.getLoadId()),
history, trigger.getChannelId(), null, null);
data.setNodeList(targetNode.getNodeId());
dataService.insertData(data);
if (childRequests != null) {
for (ExtractRequest childRequest : childRequests) {
data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(childRequest.getLoadId()),
history, trigger.getChannelId(), null, null);
data.setNodeList(childRequest.getNodeId());
dataService.insertData(data);
}
}
}
}
Expand Down Expand Up @@ -2971,10 +2975,31 @@ protected void startNewCursor(final TriggerHistory triggerHistory,
final boolean objectValuesWillNeedEscaped = !getSymmetricDialect().getTriggerTemplate()
.useTriggerTemplateForColumnTemplatesDuringInitialLoad();
final boolean[] isColumnPositionUsingTemplate = getSymmetricDialect().getColumnPositionUsingTemplate(sourceTable, triggerHistory);
final boolean checkRowLength = parameterService.is(ParameterConstants.EXTRACT_CHECK_ROW_SIZE, false);
final long rowMaxLength = parameterService.getLong(ParameterConstants.EXTRACT_ROW_MAX_LENGTH, 1000000000);
log.debug(sql);

this.cursor = getSymmetricDialect().getPlatform().getSqlTemplate().queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
public Data mapRow(Row row) {
if (checkRowLength) {
// Account for double byte characters and encoding
long rowSize = row.getLength() * 2;

if (rowSize > rowMaxLength) {
StringBuffer pkValues = new StringBuffer();
int i = 0;
Object[] rowValues = row.values().toArray();
for (String name : sourceTable.getPrimaryKeyColumnNames()) {
pkValues.append(name).append("=").append(rowValues[i]);
i++;
}
log.warn("Extract row max size exceeded, keys [" + pkValues.toString() + "], size=" + rowSize);
Data data = new Data(0, null, "", DataEventType.SQL, triggerHistory
.getSourceTableName(), null, triggerHistory, batch.getChannelId(),
null, null);
return data;
}
}
String csvRow = null;
if (selectedAsCsv) {
csvRow = row.stringValue();
Expand All @@ -2997,7 +3022,7 @@ public Data mapRow(Row row) {
null, null);
return data;
}
});
}, checkRowLength && sourceTable.containsLobColumns(symmetricDialect.getPlatform()) && !sourceTable.getNameLowerCase().startsWith(symmetricDialect.getTablePrefix()));
}

public boolean requiresLobsSelectedFromSource(CsvData data) {
Expand Down
Expand Up @@ -1466,9 +1466,14 @@ private Map<Integer, ExtractRequest> insertLoadBatchesForReload(Node targetNode,

firstBatchId = firstBatchId > 0 ? firstBatchId : startBatchId;

updateTableReloadStatusDataCounts(platform.supportsMultiThreadedTransactions() ? null : transaction,
loadId, firstBatchId, endBatchId, numberOfBatches, rowCount);
if (table.getNameLowerCase().startsWith(symmetricDialect.getTablePrefix() + "_" + TableConstants.SYM_FILE_SNAPSHOT)) {
TableReloadStatus reloadStatus = getTableReloadStatusByLoadId(loadId);
startBatchId = reloadStatus.getStartDataBatchId();
}

updateTableReloadStatusDataCounts(platform.supportsMultiThreadedTransactions() ? null : transaction,
loadId, firstBatchId, endBatchId, numberOfBatches, rowCount);

ExtractRequest request = engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(),
triggerRouter, startBatchId, endBatchId, loadId, table.getName(), rowCount, parentRequestId);
if (parentRequestId == 0) {
Expand Down
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.CancellationException;

import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.Batch;
Expand Down Expand Up @@ -128,9 +127,11 @@ protected IDataWriter buildWriter() {

@Override
public void close() {
while (!inError && batches.size() > 0 && table != null) {
while (!inError && batches.size() > 0) {
startNewBatch();
end(this.table);
if(table != null) {
end(this.table);
}
end(this.batch, false);
log.debug("Batch {} is empty", new Object[] { batch.getNodeBatchId() });
Statistics stats = closeCurrentDataWriter();
Expand Down Expand Up @@ -336,11 +337,9 @@ protected void startNewBatch() {
this.currentDataWriter.start(batch);
processInfo.incrementBatchCount();

if (table == null) {
throw new SymmetricException(
"'table' cannot null while starting new batch. Batch: " + outgoingBatch + ". Check trigger/router configs.");
if(table != null) {
this.currentDataWriter.start(table);
}
this.currentDataWriter.start(table);
}

}
Expand Up @@ -1682,12 +1682,7 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table,
if (verifyInDatabase) {
Channel channel = configurationService.getChannel(trigger.getChannelId());
if (channel == null) {
errorMessage = String
.format("Trigger %s had an unrecognized channel_id of '%s'. Please check to make sure the channel exists. Creating trigger on the '%s' channel",
trigger.getTriggerId(), trigger.getChannelId(),
Constants.CHANNEL_DEFAULT);
log.error(errorMessage);
trigger.setChannelId(Constants.CHANNEL_DEFAULT);
log.warn("Trigger '{}' has a channel of '{}' not found in sym_channel table", trigger.getTriggerId(), trigger.getChannelId());
}
}

Expand Down
16 changes: 16 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -2713,3 +2713,19 @@ job.log.miner.period.time.ms=10000
# Tags: postgres
# Type: boolean
postgres.security.definer=false

# Determines whether the size of a LOB value should be checked before extracting, to prevent
# a JVM crash that can occur when the size of a LOB exceeds the maximum size of a Java array
# (2^31 - 1 elements).
#
# DatabaseOverridable: true
# Tags: extract
# Type: boolean
extract.check.row.size=false

# Used when extract.check.row.size is true as the upper limit to check each row's size against.
# If a row's size exceeds this limit, the row is skipped and a warning is logged.
# DatabaseOverridable: true
# Tags: extract
extract.row.max.length=1000000000

Expand Up @@ -6,22 +6,12 @@
public abstract class AbstractJavaDriverSqlTemplate extends AbstractSqlTemplate {

public abstract String getDatabaseProductName();

// @Override
// public byte[] queryForBlob(String sql, Object... args) {
// return null;
// }

/**
 * Queries for a single BLOB value.
 * <p>
 * Stub implementation: always returns {@code null}. Subclasses backed by a
 * driver that can actually retrieve BLOBs must override this method.
 *
 * @param jdbcTypeCode the java.sql.Types code of the target column
 * @param jdbcTypeName the driver-specific type name of the target column
 * @return always {@code null} in this base implementation
 */
@Override
public byte[] queryForBlob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) {
return null;
}

// @Override
// public String queryForClob(String sql, Object... args) {
// return null;
// }

@Override
public String queryForClob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) {
return null;
Expand All @@ -42,6 +32,11 @@ public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,
return null;
}

/**
 * Opens a read cursor for the given SQL.
 * <p>
 * Stub implementation: always returns {@code null} regardless of
 * {@code returnLobObjects}. Subclasses that support cursor-based reads
 * must override this method.
 *
 * @param returnLobObjects when true, callers expect LOB columns to be
 *        returned as Blob/Clob handles -- ignored by this stub
 * @return always {@code null} in this base implementation
 */
@Override
public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper, boolean returnLobObjects) {
return null;
}

@Override
public int update(boolean autoCommit, boolean failOnError, int commitRate, ISqlResultsListener listener,
String... sql) {
Expand Down
Expand Up @@ -112,7 +112,10 @@ public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,
int[] types) {
return queryForCursor(sql, mapper, args, types);
}


/**
 * Opens a read cursor for the given SQL, optionally requesting that LOB
 * columns be returned as Blob/Clob handles instead of materialized values.
 * This implementation ignores the {@code returnLobObjects} hint and falls
 * back to the plain two-argument overload; implementations with real LOB
 * streaming support should override it.
 * <p>
 * NOTE(review): the previous body invoked this same three-argument overload
 * with identical arguments -- unconditional infinite recursion that would
 * throw StackOverflowError on first use. Delegating to the two-argument
 * overload terminates and preserves the non-LOB behavior; confirm the
 * intended delegation target against the enclosing class.
 */
public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper, boolean returnLobObjects) {
    return queryForCursor(sql, mapper);
}
/**
 * Runs the given SQL with no bind parameters and returns all result rows.
 * Delegates to the parameterized overload with null args and types.
 */
public List<Row> query(String sql) {
return query(sql, (Object[])null, (int[]) null);
}
Expand Down
Expand Up @@ -69,6 +69,8 @@ public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,

public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper);

public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper, boolean returnLobObjects);

public List<Row> query(String sql);

public List<Row> query(String sql, Object[] params, int[] types);
Expand Down
25 changes: 25 additions & 0 deletions symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
Expand Up @@ -23,22 +23,28 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Date;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.jumpmind.exception.IoException;
import org.jumpmind.exception.ParseException;
import org.jumpmind.util.FormatUtils;
import org.jumpmind.util.LinkedCaseInsensitiveMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Row extends LinkedCaseInsensitiveMap<Object> {

private static final long serialVersionUID = 1L;

protected Logger log = LoggerFactory.getLogger(getClass());

public Row(int numberOfColumns) {
super(numberOfColumns);
}
Expand Down Expand Up @@ -290,4 +296,23 @@ public String[] toStringArray(String[] keys) {
return values;
}

/**
 * Estimates the total length of all column values in this row; used by
 * extraction to decide whether a row is too large to extract safely.
 * <p>
 * BLOB and CLOB columns report their declared length without materializing
 * the data; every other value contributes the length of its string form.
 * SQL NULL values contribute nothing.
 *
 * @return the summed length of the row's values; columns whose length
 *         cannot be determined (SQLException from the LOB handle) are
 *         skipped with a warning
 */
public long getLength() {
    long length = 0;

    for (Map.Entry<String, Object> entry : this.entrySet()) {
        try {
            Object obj = entry.getValue();
            if (obj == null) {
                // SQL NULL has no length; the original code would have
                // thrown a NullPointerException on obj.toString() here.
                continue;
            }
            if (obj instanceof Blob) {
                length += ((Blob) obj).length();
            } else if (obj instanceof Clob) {
                length += ((Clob) obj).length();
            } else {
                length += obj.toString().length();
            }
        } catch (SQLException se) {
            log.warn("Unable to determine length of row, failure on column {}", entry.getKey(), se);
        }
    }
    return length;
}
}
Expand Up @@ -56,19 +56,22 @@ public class JdbcSqlReadCursor<T> implements ISqlReadCursor<T> {

protected IConnectionHandler connectionHandler;

protected boolean returnLobObjects;

/** No-arg constructor; leaves all fields at their defaults (no connection is opened). */
public JdbcSqlReadCursor() {
}

public JdbcSqlReadCursor(JdbcSqlTemplate sqlTemplate, ISqlRowMapper<T> mapper, String sql,
Object[] values, int[] types) {
this(sqlTemplate, mapper, sql, values, types, null);
this(sqlTemplate, mapper, sql, values, types, null, false);
}

public JdbcSqlReadCursor(JdbcSqlTemplate sqlTemplate, ISqlRowMapper<T> mapper, String sql,
Object[] values, int[] types, IConnectionHandler connectionHandler) {
Object[] values, int[] types, IConnectionHandler connectionHandler, boolean returnLobObjects) {
this.sqlTemplate = sqlTemplate;
this.mapper = mapper;
this.connectionHandler = connectionHandler;
this.returnLobObjects = returnLobObjects;

try {
c = sqlTemplate.getDataSource().getConnection();
Expand Down Expand Up @@ -132,7 +135,7 @@ public T next() {
rsColumnCount = rsMetaData.getColumnCount();
}

Row row = getMapForRow(rs, rsMetaData, rsColumnCount, sqlTemplate.getSettings().isReadStringsAsBytes());
Row row = getMapForRow(rs, rsMetaData, rsColumnCount, sqlTemplate.getSettings().isReadStringsAsBytes(), returnLobObjects);
T value = mapper.mapRow(row);
if (value != null) {
return value;
Expand All @@ -145,11 +148,11 @@ public T next() {
}

protected static Row getMapForRow(ResultSet rs, ResultSetMetaData argResultSetMetaData,
int columnCount, boolean readStringsAsBytes) throws SQLException {
int columnCount, boolean readStringsAsBytes, boolean returnLobObjects) throws SQLException {
Row mapOfColValues = new Row(columnCount);
for (int i = 1; i <= columnCount; i++) {
String key = JdbcSqlTemplate.lookupColumnName(argResultSetMetaData, i);
Object obj = JdbcSqlTemplate.getResultSetValue(rs, argResultSetMetaData, i, readStringsAsBytes);
Object obj = JdbcSqlTemplate.getResultSetValue(rs, argResultSetMetaData, i, readStringsAsBytes, returnLobObjects);
mapOfColValues.put(key, obj);
}
return mapOfColValues;
Expand Down

0 comments on commit 8652308

Please sign in to comment.