Skip to content

Commit

Permalink
suspend ignore changed to Set
Browse files Browse the repository at this point in the history
  • Loading branch information
mhanes committed Sep 26, 2009
1 parent df70aea commit 2252347
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 95 deletions.
Expand Up @@ -22,6 +22,8 @@
package org.jumpmind.symmetric.service;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.DataEventAction;
Expand Down Expand Up @@ -64,4 +66,6 @@ public interface IConfigurationService {

public void autoConfigDatabase(boolean force);

public Map<String, Set<String>> getSuspendIgnoreChannelLists(String nodeId);

}
Expand Up @@ -26,6 +26,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.Channel;
Expand All @@ -35,6 +37,7 @@
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.web.WebConstants;
import org.springframework.jdbc.core.RowMapper;

public class ConfigurationService extends AbstractService implements IConfigurationService {
Expand Down Expand Up @@ -228,6 +231,30 @@ public Object mapRow(ResultSet rs, int num) throws SQLException {
}
}

public Map<String, Set<String>> getSuspendIgnoreChannelLists(final String nodeId) {

Map<String, Set<String>> channels = new HashMap<String, Set<String>>();

Set<String> suspendChannels = new TreeSet<String>();
channels.put(WebConstants.SUSPENDED_CHANNELS, suspendChannels);

Set<String> ignoreChannels = new TreeSet<String>();
channels.put(WebConstants.IGNORED_CHANNELS, ignoreChannels);

List<NodeChannel> ncs = getNodeChannels(nodeId);

for (NodeChannel nc : ncs) {
if (nc.isSuspended()) {
suspendChannels.add(nc.getId());
}
if (nc.isIgnored()) {
ignoreChannels.add(nc.getId());
}
}

return channels;
}

public void setDefaultChannels(List<Channel> defaultChannels) {
this.defaultChannels = defaultChannels;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.transport.http.HttpTransportManager;
Expand All @@ -39,6 +40,8 @@ public class TransportManagerFactoryBean implements FactoryBean {

private IParameterService parameterService;

private IConfigurationService configurationService;

public Object getObject() throws Exception {
String transport = parameterService.getString(ParameterConstants.TRANSPORT_TYPE);
if (Constants.PROTOCOL_HTTP.equalsIgnoreCase(transport)) {
Expand All @@ -57,7 +60,7 @@ public boolean verify(String s, SSLSession sslsession) {
return false;
}
});
return new HttpTransportManager(nodeService, parameterService);
return new HttpTransportManager(nodeService, parameterService, configurationService);
} else if (Constants.PROTOCOL_INTERNAL.equalsIgnoreCase(transport)) {
return new InternalTransportManager(nodeService, parameterService);
} else {
Expand All @@ -81,4 +84,12 @@ public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}

public IConfigurationService getConfigurationService() {
return configurationService;
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}

}
Expand Up @@ -29,6 +29,8 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPInputStream;

import org.apache.commons.lang.StringUtils;
Expand All @@ -39,6 +41,7 @@
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.transport.AbstractTransportManager;
Expand All @@ -57,10 +60,13 @@ public class HttpTransportManager extends AbstractTransportManager implements IT
protected static final Log logger = LogFactory.getLog(HttpTransportManager.class);

private INodeService nodeService;
private IConfigurationService configurationService;

public HttpTransportManager(INodeService nodeService, IParameterService paramService) {
public HttpTransportManager(INodeService nodeService, IParameterService paramService,
IConfigurationService configurationService) {
super(paramService);
this.nodeService = nodeService;
this.configurationService = configurationService;
}

public boolean sendAcknowledgement(Node remote, List<IncomingBatch> list, Node local) throws IOException {
Expand All @@ -71,7 +77,7 @@ public boolean sendAcknowledgement(Node remote, List<IncomingBatch> list, Node l
return true;
}

public void writeAcknowledgement(OutputStream out, List<IncomingBatch> list) throws IOException {
public void writeAcknowledgement(OutputStream out, List<IncomingBatch> list) throws IOException {
writeMessage(out, getAcknowledgementData(nodeService.findIdentity().getNodeId(), list));
}

Expand Down Expand Up @@ -100,7 +106,18 @@ public void writeMessage(OutputStream out, String data) throws IOException {
}

public IIncomingTransport getPullTransport(Node remote, Node local) throws IOException {
return new HttpIncomingTransport(createGetConnectionFor(new URL(buildURL("pull", remote, local))));
HttpURLConnection conn = createGetConnectionFor(new URL(buildURL("pull", remote, local)));
Map<String, Set<String>> suspendIgnoreChannels = configurationService.getSuspendIgnoreChannelLists(remote
.getNodeId());
if (suspendIgnoreChannels.get(WebConstants.SUSPENDED_CHANNELS).size() > 0) {
conn.addRequestProperty(WebConstants.SUSPENDED_CHANNELS, StringUtils.join(suspendIgnoreChannels
.get(WebConstants.SUSPENDED_CHANNELS), ','));
}
if (suspendIgnoreChannels.get(WebConstants.IGNORED_CHANNELS).size() > 0) {
conn.addRequestProperty(WebConstants.IGNORED_CHANNELS, StringUtils.join(suspendIgnoreChannels
.get(WebConstants.IGNORED_CHANNELS), ','));
}
return new HttpIncomingTransport(conn);
}

public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) throws IOException {
Expand All @@ -111,9 +128,10 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local)
}

public IIncomingTransport getRegisterTransport(Node node) throws IOException {
return new HttpIncomingTransport(createGetConnectionFor(new URL(buildRegistrationUrl(parameterService.getRegistrationUrl(), node))));
return new HttpIncomingTransport(createGetConnectionFor(new URL(buildRegistrationUrl(parameterService
.getRegistrationUrl(), node))));
}

public static String buildRegistrationUrl(String baseUrl, Node node) throws IOException {
StringBuilder builder = new StringBuilder(baseUrl);
builder.append("/registration?");
Expand Down Expand Up @@ -179,5 +197,4 @@ private String addNodeId(String base, String nodeId, String connector) {
sb.append(nodeId);
return sb.toString();
}

}
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
Expand Down Expand Up @@ -93,49 +94,17 @@ public void doFilter(final ServletRequest req, final ServletResponse resp, final
protected void buildSuspendIgnoreResponseHeaders(final String nodeId, final ServletResponse resp) {
HttpServletResponse httpResponse = (HttpServletResponse) resp;

Map<String, String> channels = getSuspendIgnoreChannels(nodeId);

if (channels.get(WebConstants.SUSPENDED_CHANNELS) != null) {
httpResponse.setHeader(WebConstants.SUSPENDED_CHANNELS, channels.get(WebConstants.SUSPENDED_CHANNELS));
Map<String, Set<String>> suspendIgnoreChannels = configurationService.getSuspendIgnoreChannelLists(nodeId);
if (suspendIgnoreChannels.get(WebConstants.SUSPENDED_CHANNELS).size() > 0) {
httpResponse.setHeader(WebConstants.SUSPENDED_CHANNELS, StringUtils.join(suspendIgnoreChannels
.get(WebConstants.SUSPENDED_CHANNELS), ','));
}

if (channels.get(WebConstants.IGNORED_CHANNELS) != null) {
httpResponse.setHeader(WebConstants.IGNORED_CHANNELS, channels.get(WebConstants.IGNORED_CHANNELS));
if (suspendIgnoreChannels.get(WebConstants.IGNORED_CHANNELS).size() > 0) {
httpResponse.setHeader(WebConstants.IGNORED_CHANNELS, StringUtils.join(suspendIgnoreChannels
.get(WebConstants.IGNORED_CHANNELS), ','));
}
}

protected Map<String, String> getSuspendIgnoreChannels(final String nodeId) {

Map<String, String> channels = new HashMap<String, String>();

StringBuffer suspendChannelsBuffer = new StringBuffer();
StringBuffer ignoreChannelsBuffer = new StringBuffer();

List<NodeChannel> ncs = configurationService.getNodeChannels(nodeId);

for (NodeChannel nc : ncs) {
if (nc.isSuspended()) {
suspendChannelsBuffer.append(nc.getId()).append(',');
}
if (nc.isIgnored()) {
ignoreChannelsBuffer.append(nc.getId()).append(',');
}
}

String suspendChannels = StringUtils.trimToNull(suspendChannelsBuffer.toString());
String ignoreChannels = StringUtils.trimToNull(ignoreChannelsBuffer.toString());

if (suspendChannels != null) {
channels.put(WebConstants.SUSPENDED_CHANNELS, StringUtils.strip(suspendChannels, ","));
}

if (ignoreChannels != null) {
channels.put(WebConstants.IGNORED_CHANNELS, StringUtils.strip(ignoreChannels, ","));
}

return channels;
}

@Override
protected ILog getLog() {
return log;
Expand Down
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -33,6 +33,7 @@
<bean id="transportManager" class="org.jumpmind.symmetric.transport.TransportManagerFactoryBean" scope="singleton">
<property name="nodeService" ref="nodeService" />
<property name="parameterService" ref="parameterService" />
<property name="configurationService" ref="configurationService" />
</bean>

<bean id="parameterService" class="org.jumpmind.symmetric.service.impl.ParameterService" scope="singleton">
Expand Down
@@ -0,0 +1,106 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
package org.jumpmind.symmetric.web;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.impl.ConfigurationService;
import org.jumpmind.symmetric.test.AbstractDatabaseTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ConfigurationServiceTest extends AbstractDatabaseTest {

public IConfigurationService configurationService;

public ConfigurationServiceTest() throws Exception {
super();
}

public ConfigurationServiceTest(String dbName) {
super(dbName);
}

@Before
public void setUp() {
configurationService = (IConfigurationService) find(Constants.CONFIG_SERVICE);
}

@Test()
public void testGetSuspendIgnoreChannels() throws Exception {
// IParameterService parameterService = getParameterService();
// parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS,
// 3);

String nodeId = "00000";

Map<String, Set<String>> result = configurationService.getSuspendIgnoreChannelLists(nodeId);
Assert.assertEquals(0, result.size());

ConfigurationService configurationService = (ConfigurationService) find(Constants.CONFIG_SERVICE);

List<NodeChannel> ncs = configurationService.getNodeChannels(nodeId);

NodeChannel nc = ncs.get(1);
String channelId = ncs.get(1).getId();

nc.setSuspended(true);
configurationService.saveNodeChannelControl(nc, false);

result = configurationService.getSuspendIgnoreChannelLists(nodeId);
;

Assert.assertEquals(1, result.size());
// Assert.assertTrue(channelId.equals(result.get(WebConstants.SUSPENDED_CHANNELS)));
Assert.assertTrue(result.get(WebConstants.SUSPENDED_CHANNELS).contains(nc.getId()));

nc = ncs.get(0);
nc.setSuspended(true);

configurationService.saveNodeChannelControl(nc, false);

// String channelIds = ncs.get(0).getId() + "," + ncs.get(1).getId();
result = configurationService.getSuspendIgnoreChannelLists(nodeId);

Assert.assertEquals(1, result.size());
Assert.assertTrue(result.get(WebConstants.SUSPENDED_CHANNELS).contains(ncs.get(0).getId()));
Assert.assertTrue(result.get(WebConstants.SUSPENDED_CHANNELS).contains(ncs.get(1).getId()));
// Assert.assertTrue(channelIds.equals(result.get(WebConstants.SUSPENDED_CHANNELS)));

nc.setIgnored(true);
configurationService.saveNodeChannelControl(nc, false);
result = configurationService.getSuspendIgnoreChannelLists(nodeId);

Assert.assertEquals(2, result.size());
Assert.assertTrue(result.get(WebConstants.SUSPENDED_CHANNELS).contains(ncs.get(0).getId()));
Assert.assertTrue(result.get(WebConstants.SUSPENDED_CHANNELS).contains(ncs.get(1).getId()));
// Assert.assertTrue(channelIds.equals(result.get(WebConstants.SUSPENDED_CHANNELS)));
Assert.assertTrue(result.get(WebConstants.IGNORED_CHANNELS).contains(nc.getId()));
// Assert.assertTrue(nc.getId().equals(result.get(WebConstants.IGNORED_CHANNELS)));

}

}

0 comments on commit 2252347

Please sign in to comment.