Skip to content

Commit

Permalink
CAMEL-10849: Salesforce: subscription channel ...
Browse files Browse the repository at this point in the history
...created per component

Changes the `replayId` value from `Integer` to `Long`. The range of the
`replayId` is explicitly specified in the Salesforce documentation it's
safer to make it as big as can be.
  • Loading branch information
zregvart committed Feb 22, 2017
1 parent 9a5357e commit cdf97cb
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 38 deletions.
Expand Up @@ -483,7 +483,7 @@ The Salesforce component supports 42 endpoint options which are listed below:
| backoffIncrement | common | | long | Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
| batchId | common | | String | Bulk API Batch ID
| contentType | common | | ContentType | Bulk API content type one of XML CSV ZIP_XML ZIP_CSV
| defaultReplayId | common | | Integer | Default replayId setting if no value is found in link initialReplayIdMap
| defaultReplayId | common | | Long | Default replayId setting if no value is found in link initialReplayIdMap
| format | common | | PayloadFormat | Payload format to use for Salesforce API calls either JSON or XML defaults to JSON
| httpClient | common | | SalesforceHttpClient | Custom Jetty Http Client to use to connect to Salesforce.
| includeDetails | common | | Boolean | Include details in Salesforce1 Analytics report defaults to false.
Expand Down
Expand Up @@ -161,9 +161,9 @@ public class SalesforceEndpointConfig implements Cloneable {

// Streaming API properties
@UriParam
private Integer defaultReplayId;
private Long defaultReplayId;
@UriParam
private Map<String, Integer> initialReplayIdMap;
private Map<String, Long> initialReplayIdMap;

// Approval API properties
private ApprovalRequest approval;
Expand Down Expand Up @@ -615,7 +615,7 @@ public Map<String, Object> toValueMap() {
return Collections.unmodifiableMap(valueMap);
}

public Integer getDefaultReplayId() {
public Long getDefaultReplayId() {
return defaultReplayId;
}

Expand All @@ -624,18 +624,18 @@ public Integer getDefaultReplayId() {
*
* @param defaultReplayId
*/
public void setDefaultReplayId(Integer defaultReplayId) {
public void setDefaultReplayId(Long defaultReplayId) {
this.defaultReplayId = defaultReplayId;
}

public Map<String, Integer> getInitialReplayIdMap() {
public Map<String, Long> getInitialReplayIdMap() {
return Optional.ofNullable(initialReplayIdMap).orElse(Collections.emptyMap());
}

/**
* Replay IDs to start from per channel name.
*/
public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) {
public void setInitialReplayIdMap(Map<String, Long> initialReplayIdMap) {
this.initialReplayIdMap = initialReplayIdMap;
}

Expand Down
Expand Up @@ -63,16 +63,26 @@
*/
public class CometDReplayExtension extends Adapter {
private static final String EXTENSION_NAME = "replay";
private final ConcurrentMap<String, Integer> dataMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>();
private final AtomicBoolean supported = new AtomicBoolean();

public void addTopicReplayId(final String topicName, final int replayId) {
public void addTopicReplayId(final String topicName, final long replayId) {
dataMap.put(topicName, replayId);
}

@Override
public boolean rcv(ClientSession session, Message.Mutable message) {
Integer replayId = (Integer)message.get(EXTENSION_NAME);
final Object value = message.get(EXTENSION_NAME);

final Long replayId;
if (value instanceof Long) {
replayId = (Long)value;
} else if (value instanceof Number) {
replayId = ((Number)value).longValue();
} else {
replayId = null;
}

if (this.supported.get() && replayId != null) {
try {
dataMap.put(message.getChannel(), replayId);
Expand Down
Expand Up @@ -415,28 +415,28 @@ public void onMessage(ClientSessionChannel channel, Message message) {
void setupReplay(final SalesforceEndpoint endpoint) {
final String topicName = endpoint.getTopicName();

final Optional<Integer> replayId = determineReplayIdFor(endpoint, topicName);
final Optional<Long> replayId = determineReplayIdFor(endpoint, topicName);
if (replayId.isPresent()) {
final String channelName = getChannelName(topicName);

REPLAY_EXTENSION.addTopicReplayId(channelName, replayId.get());
}
}

static Optional<Integer> determineReplayIdFor(final SalesforceEndpoint endpoint, final String topicName) {
static Optional<Long> determineReplayIdFor(final SalesforceEndpoint endpoint, final String topicName) {
final String channelName = getChannelName(topicName);

final SalesforceComponent component = endpoint.getComponent();

final SalesforceEndpointConfig endpointConfiguration = endpoint.getConfiguration();
final Map<String, Integer> endpointInitialReplayIdMap = endpointConfiguration.getInitialReplayIdMap();
final Integer endpointReplayId = endpointInitialReplayIdMap.getOrDefault(topicName, endpointInitialReplayIdMap.get(channelName));
final Integer endpointDefaultReplayId = endpointConfiguration.getDefaultReplayId();
final Map<String, Long> endpointInitialReplayIdMap = endpointConfiguration.getInitialReplayIdMap();
final Long endpointReplayId = endpointInitialReplayIdMap.getOrDefault(topicName, endpointInitialReplayIdMap.get(channelName));
final Long endpointDefaultReplayId = endpointConfiguration.getDefaultReplayId();

final SalesforceEndpointConfig componentConfiguration = component.getConfig();
final Map<String, Integer> componentInitialReplayIdMap = componentConfiguration.getInitialReplayIdMap();
final Integer componentReplayId = componentInitialReplayIdMap.getOrDefault(topicName, componentInitialReplayIdMap.get(channelName));
final Integer componentDefaultReplayId = componentConfiguration.getDefaultReplayId();
final Map<String, Long> componentInitialReplayIdMap = componentConfiguration.getInitialReplayIdMap();
final Long componentReplayId = componentInitialReplayIdMap.getOrDefault(topicName, componentInitialReplayIdMap.get(channelName));
final Long componentDefaultReplayId = componentConfiguration.getDefaultReplayId();

// the endpoint values have priority over component values, and the default values posteriority
// over give topic values
Expand Down
Expand Up @@ -35,13 +35,13 @@ public class SubscriptionHelperTest {

@Test
public void shouldSupportInitialConfigMapWithTwoKeySyntaxes() throws Exception {
final Map<String, Integer> initialReplayIdMap = new HashMap<>();
initialReplayIdMap.put("my-topic-1", 10);
initialReplayIdMap.put("/topic/my-topic-1", 20);
initialReplayIdMap.put("/topic/my-topic-2", 30);
final Map<String, Long> initialReplayIdMap = new HashMap<>();
initialReplayIdMap.put("my-topic-1", 10L);
initialReplayIdMap.put("/topic/my-topic-1", 20L);
initialReplayIdMap.put("/topic/my-topic-2", 30L);

final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
config.setDefaultReplayId(14);
config.setDefaultReplayId(14L);
config.setInitialReplayIdMap(initialReplayIdMap);

final SalesforceComponent component = mock(SalesforceComponent.class);
Expand All @@ -52,25 +52,25 @@ public void shouldSupportInitialConfigMapWithTwoKeySyntaxes() throws Exception {
when(component.getConfig()).thenReturn(new SalesforceEndpointConfig());

assertEquals("Expecting replayId for `my-topic-1` to be 10, as short topic names have priority",
Optional.of(10), determineReplayIdFor(endpoint, "my-topic-1"));
Optional.of(10L), determineReplayIdFor(endpoint, "my-topic-1"));

assertEquals("Expecting replayId for `my-topic-2` to be 30, the only one given", Optional.of(30),
assertEquals("Expecting replayId for `my-topic-2` to be 30, the only one given", Optional.of(30L),
determineReplayIdFor(endpoint, "my-topic-2"));

assertEquals("Expecting replayId for `my-topic-3` to be 14, the default", Optional.of(14),
assertEquals("Expecting replayId for `my-topic-3` to be 14, the default", Optional.of(14L),
determineReplayIdFor(endpoint, "my-topic-3"));
}

@Test
public void precedenceShouldBeFollowed() {
final SalesforceEndpointConfig componentConfig = new SalesforceEndpointConfig();
componentConfig.setDefaultReplayId(1);
componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 2));
componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-2", 3));
componentConfig.setDefaultReplayId(1L);
componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 2L));
componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-2", 3L));

final SalesforceEndpointConfig endpointConfig = new SalesforceEndpointConfig();
endpointConfig.setDefaultReplayId(4);
endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 5));
endpointConfig.setDefaultReplayId(4L);
endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 5L));

final SalesforceComponent component = mock(SalesforceComponent.class);
when(component.getConfig()).thenReturn(componentConfig);
Expand All @@ -80,17 +80,17 @@ public void precedenceShouldBeFollowed() {
when(endpoint.getConfiguration()).thenReturn(endpointConfig);

assertEquals("Expecting replayId for `my-topic-1` to be 5, as endpoint configuration has priority",
Optional.of(5), determineReplayIdFor(endpoint, "my-topic-1"));
Optional.of(5L), determineReplayIdFor(endpoint, "my-topic-1"));

assertEquals("Expecting replayId for `my-topic-2` to be 3, as endpoint does not configure it",
Optional.of(3), determineReplayIdFor(endpoint, "my-topic-2"));
Optional.of(3L), determineReplayIdFor(endpoint, "my-topic-2"));

assertEquals("Expecting replayId for `my-topic-3` to be 4, as it is endpoint's default",
Optional.of(4), determineReplayIdFor(endpoint, "my-topic-3"));
Optional.of(4L), determineReplayIdFor(endpoint, "my-topic-3"));

endpointConfig.setDefaultReplayId(null);

assertEquals("Expecting replayId for `my-topic-3` to be 1, as it is component's default when endpoint does not have a default",
Optional.of(1), determineReplayIdFor(endpoint, "my-topic-3"));
Optional.of(1L), determineReplayIdFor(endpoint, "my-topic-3"));
}
}
Expand Up @@ -574,7 +574,7 @@ public static class SalesforceEndpointConfigNestedConfiguration {
*
* @param defaultReplayId
*/
private Integer defaultReplayId;
private Long defaultReplayId;
/**
* Replay IDs to start from per channel name.
*/
Expand Down Expand Up @@ -909,11 +909,11 @@ public void setMaxBackoff(Long maxBackoff) {
this.maxBackoff = maxBackoff;
}

public Integer getDefaultReplayId() {
public Long getDefaultReplayId() {
return defaultReplayId;
}

public void setDefaultReplayId(Integer defaultReplayId) {
public void setDefaultReplayId(Long defaultReplayId) {
this.defaultReplayId = defaultReplayId;
}

Expand Down

0 comments on commit cdf97cb

Please sign in to comment.