Skip to content

Commit

Permalink
0001262: Locally suspended channels are ignored when a node is pushin…
Browse files Browse the repository at this point in the history
…g during extraction.

Clean up debugging comments.
  • Loading branch information
abrougher committed Jun 7, 2013
1 parent 531117d commit eae4814
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 60 deletions.
Expand Up @@ -119,7 +119,7 @@ public class DataExtractorService extends AbstractService implements IDataExtrac
private IStatisticManager statisticManager;

private IStagingManager stagingManager;

private Map<Long, Semaphore> locks = new HashMap<Long, Semaphore>();

public DataExtractorService(IParameterService parameterService,
Expand Down Expand Up @@ -264,7 +264,7 @@ private void addPurgeCriteriaToConfigurationTables(String sourceTableName, Strin

private List<OutgoingBatch> filterBatchesForExtraction(OutgoingBatches batches,
ChannelMap suspendIgnoreChannelsList) {

if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) {
batches.filterBatchesForChannel(Constants.CHANNEL_FILESYNC);
}
Expand Down Expand Up @@ -316,7 +316,7 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, IOu
if (batches.containsBatches()) {

List<OutgoingBatch> activeBatches = filterBatchesForExtraction(batches,
targetTransport.getSuspendIgnoreChannelLists(configurationService));
targetTransport.getSuspendIgnoreChannelLists(configurationService, targetNode));

if (activeBatches.size() > 0) {
extract(processInfo, targetNode, targetTransport, activeBatches);
Expand Down Expand Up @@ -354,17 +354,17 @@ protected void extract(ProcessInfo processInfo, Node targetNode, IOutgoingTransp

for (int i = 0; i < activeBatches.size(); i++) {
currentBatch = activeBatches.get(i);

processInfo.incrementBatchCount();
processInfo.setCurrentBatchId(currentBatch.getBatchId());

currentBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, currentBatch);

if (dataWriter == null) {
dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
targetTransport.openWriter(), targetNode.requires13Compatiblity());
}

processInfo.setStatus(ProcessInfo.Status.EXTRACTING);
currentBatch = extractOutgoingBatch(processInfo, targetNode, dataWriter, currentBatch,
streamToFileEnabled);
Expand Down Expand Up @@ -490,7 +490,7 @@ public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNo
transformExtractWriter.close();
}
} else {
if (currentBatch.getStatus() == Status.NE ||
if (currentBatch.getStatus() == Status.NE ||
!isPreviouslyExtracted(currentBatch)) {
int maxPermits = parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS);
Semaphore lock = null;
Expand Down Expand Up @@ -547,7 +547,7 @@ public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNo
long dataEventCount = currentBatch.getDataEventCount();
long insertEventCount = currentBatch.getInsertEventCount();
currentBatch = requeryIfEnoughTimeHasPassed(ts, currentBatch);

// preserve in the case of a reload event
if (dataEventCount > currentBatch.getDataEventCount()) {
currentBatch.setDataEventCount(dataEventCount);
Expand Down Expand Up @@ -757,7 +757,7 @@ class SelectFromSymDataSource implements IExtractDataReaderSource {
private OutgoingBatch outgoingBatch;

private Table targetTable;

private Table sourceTable;

private TriggerHistory lastTriggerHistory;
Expand All @@ -781,7 +781,7 @@ public SelectFromSymDataSource(OutgoingBatch outgoingBatch, Node sourceNode, Nod
public Batch getBatch() {
return batch;
}

public Table getSourceTable() {
return sourceTable;
}
Expand Down Expand Up @@ -838,13 +838,13 @@ public CsvData next() {
if ((lastTriggerHistory == null || lastTriggerHistory
.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId())) {
this.sourceTable = lookupAndOrderColumnsAccordingToTriggerHistory(
routerId, triggerHistory, false);
routerId, triggerHistory, false);
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
routerId, triggerHistory, true);
this.requiresLobSelectedFromSource = trigger.isUseStreamLobs();
}
data.setNoBinaryOldData(requiresLobSelectedFromSource ||

data.setNoBinaryOldData(requiresLobSelectedFromSource ||
symmetricDialect.getName().equals(DatabaseNamesConstants.MSSQL));
} else {
log.error(
Expand Down Expand Up @@ -890,7 +890,7 @@ class SelectFromTableSource implements IExtractDataReaderSource {
private Batch batch;

private Table targetTable;

private Table sourceTable;

private List<SelectFromTableEvent> selectFromTableEventsToSend;
Expand Down Expand Up @@ -928,7 +928,7 @@ protected void init(Batch batch, List<SelectFromTableEvent> initialLoadEvents) {
this.batch.getTargetNodeId());
}
}

public Table getSourceTable() {
return sourceTable;
}
Expand Down Expand Up @@ -972,7 +972,7 @@ protected CsvData selectNext() {
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
(String) data.getAttribute(CsvData.ATTRIBUTE_ROUTER_ID), history, true);
} else {
this.triggerRouter = this.currentInitialLoadEvent.getTriggerRouter();
this.triggerRouter = this.currentInitialLoadEvent.getTriggerRouter();
if (this.routingContext == null) {
NodeChannel channel = batch != null ? configurationService.getNodeChannel(
batch.getChannelId(), false) : new NodeChannel(this.triggerRouter
Expand Down Expand Up @@ -1096,11 +1096,11 @@ public Node getNode() {
public boolean containsData() {
return data != null;
}

public String getInitialLoadSelect() {
return initialLoadSelect;
}

}
}

}
Expand Up @@ -25,12 +25,13 @@
import java.io.OutputStream;

import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IConfigurationService;

public interface IOutgoingTransport {

public BufferedWriter openWriter();

public OutputStream openStream();

public void close();
Expand All @@ -41,6 +42,7 @@ public interface IOutgoingTransport {
* This returns a (combined) list of suspended or ignored channels. In
* addition, it will optionally do a reservation in the case of a Push
* request
* @param targetNode
*/
public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService);
public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, Node targetNode);
}
Expand Up @@ -34,6 +34,7 @@
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.IoConstants;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.RegistrationRequiredException;
import org.jumpmind.symmetric.transport.AuthenticationException;
Expand All @@ -45,9 +46,9 @@
public class HttpOutgoingTransport implements IOutgoingWithResponseTransport {

static final String CRLF = "\r\n";

private String boundary;

private URL url;

private OutputStream os;
Expand Down Expand Up @@ -108,12 +109,12 @@ private void closeReader() {
reader = null;
}
}

private void closeOutputStream(boolean closeQuietly) {
if (os != null) {
try {
if (fileUpload) {
IOUtils.write(CRLF + "--" + boundary + "--" + CRLF, os);
IOUtils.write(CRLF + "--" + boundary + "--" + CRLF, os);
}
os.flush();
} catch (IOException ex) {
Expand All @@ -132,7 +133,7 @@ private void closeOutputStream(boolean closeQuietly) {
}
}
}

private void closeWriter(boolean closeQuietly) {
if (writer != null) {
try {
Expand Down Expand Up @@ -162,7 +163,7 @@ private void closeWriter(boolean closeQuietly) {
* Before streaming data to the remote node, make sure it is ok to. We have
* found that we can be more efficient on a push by relying on HTTP
* keep-alive.
*
*
* @throws IOException
* @throws {@link ConnectionRejectedException}
* @throws {@link AuthenticationException}
Expand Down Expand Up @@ -263,7 +264,7 @@ private void analyzeResponseCode(int code) throws IOException {
}

public BufferedReader readResponse() throws IOException {
closeWriter(false);
closeWriter(false);
closeOutputStream(false);
analyzeResponseCode(connection.getResponseCode());
this.reader = HttpTransportManager.getReaderFrom(connection);
Expand All @@ -274,7 +275,7 @@ public boolean isOpen() {
return connection != null;
}

public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService) {
public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, Node targetNode) {

HttpURLConnection connection = requestReservation();

Expand All @@ -289,6 +290,13 @@ public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurati
suspendIgnoreChannelsList.addSuspendChannels(suspends);
suspendIgnoreChannelsList.addIgnoreChannels(ignores);

ChannelMap localSuspendIgnoreChannelsList = configurationService
.getSuspendIgnoreChannelLists(targetNode.getNodeId());
suspendIgnoreChannelsList.addSuspendChannels(
localSuspendIgnoreChannelsList.getSuspendChannels());
suspendIgnoreChannelsList.addIgnoreChannels(
localSuspendIgnoreChannelsList.getIgnoreChannels());

return suspendIgnoreChannelsList;
}

Expand Down
Expand Up @@ -20,21 +20,22 @@
*/
package org.jumpmind.symmetric.transport.internal;

import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;

public class InternalOutgoingTransport implements IOutgoingTransport {

BufferedWriter writer = null;

OutputStream os = null;

ChannelMap map = null;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void close() {
public boolean isOpen() {
return open;
}

public OutputStream openStream() {
return os;
}
Expand All @@ -73,7 +74,7 @@ public BufferedWriter openWriter() {
return writer;
}

public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService) {
public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, Node targetNode) {
return map;
}

Expand Down
Expand Up @@ -21,24 +21,25 @@

package org.jumpmind.symmetric.transport.internal;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.TransportUtils;

public class InternalOutgoingWithResponseTransport implements IOutgoingWithResponseTransport {

BufferedWriter writer = null;

BufferedReader reader = null;

OutputStream os = null;

boolean open = true;
Expand All @@ -48,7 +49,7 @@ public class InternalOutgoingWithResponseTransport implements IOutgoingWithRespo
this.writer = TransportUtils.toWriter(os);
this.reader = TransportUtils.toReader(respIs);
}

public OutputStream openStream() {
return os;
}
Expand All @@ -73,7 +74,7 @@ public BufferedWriter openWriter() {
return writer;
}

public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService) {
public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, Node targetNode) {
return configurationService.getSuspendIgnoreChannelLists();
}
}
Expand Up @@ -168,7 +168,7 @@ protected String getNativeDefaultValue(Column column) {
@Override
protected void dropTable(Table table, StringBuilder ddl, boolean temporary, boolean recreate) {
writeQuotationOnStatement(ddl);
ddl.append("IF EXISTS (SELECT 1 FROM " + table.getCatalog() + ".dbo.sysobjects WHERE type = 'U' AND name = "); // ADB catalog
ddl.append("IF EXISTS (SELECT 1 FROM " + table.getCatalog() + ".dbo.sysobjects WHERE type = 'U' AND name = ");
printAlwaysSingleQuotedIdentifier(getTableName(table.getName()), ddl);
println(")", ddl);
println("BEGIN", ddl);
Expand Down
Expand Up @@ -109,11 +109,11 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
protected Column readColumn(DatabaseMetaDataWrapper metaData, Map<String,Object> values) throws SQLException {
Column column = super.readColumn(metaData, values);

if ((column.getMappedTypeCode() == Types.NUMERIC) && (column.getSizeAsInt() == 19) // ADB reads numeric back to bigint
if ((column.getMappedTypeCode() == Types.NUMERIC) && (column.getSizeAsInt() == 19)
&& (column.getScale() == 0)) {
// Back-mapping to BIGINT
column.setMappedTypeCode(Types.BIGINT);
} else if ((column.getMappedTypeCode() == Types.NUMERIC) && (column.getSizeAsInt() == 12) // ADB reads numeric back to bigint
} else if ((column.getMappedTypeCode() == Types.NUMERIC) && (column.getSizeAsInt() == 12)
&& (column.getScale() == 0)) {
// Back-mapping to INTEGER
column.setMappedTypeCode(Types.INTEGER);
Expand Down

0 comments on commit eae4814

Please sign in to comment.