Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-2594: Apply new code style to storm-rocketmq #2194

Merged
merged 1 commit into from Jul 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 14 additions & 14 deletions docs/storm-rocketmq.md
Expand Up @@ -6,18 +6,18 @@ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/)
## Read from Topic
The spout included in this package for reading data from a topic.

### RocketMQSpout
To use the `RocketMQSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
### RocketMqSpout
To use the `RocketMqSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
RocketMqSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.

```java
Properties properties = new Properties();
properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);

RocketMQSpout spout = new RocketMQSpout(properties);
RocketMqSpout spout = new RocketMqSpout(properties);
```


Expand Down Expand Up @@ -51,18 +51,18 @@ public interface TopicSelector extends Serializable {
`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.


### RocketMQBolt
To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
### RocketMqBolt
To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`.

```java
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);

properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);

RocketMQBolt insertBolt = new RocketMQBolt()
RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
Expand All @@ -76,19 +76,19 @@ We support trident persistent state that can be used with trident topologies. To
TopicSelector selector = new DefaultTopicSelector(topic);

Properties properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);

RocketMQState.Options options = new RocketMQState.Options()
RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);

StateFactory factory = new RocketMQStateFactory(options);
StateFactory factory = new RocketMqStateFactory(options);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory, fields,
new RocketMQStateUpdater(), new Fields());
new RocketMqStateUpdater(), new Fields());
```

Expand Up @@ -22,14 +22,14 @@
import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.rocketmq.RocketMQConfig;
import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.rocketmq.SpoutConfig;
import org.apache.storm.rocketmq.bolt.RocketMQBolt;
import org.apache.storm.rocketmq.bolt.RocketMqBolt;
import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
import org.apache.storm.rocketmq.common.selector.TopicSelector;
import org.apache.storm.rocketmq.spout.RocketMQSpout;
import org.apache.storm.rocketmq.spout.RocketMqSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

Expand All @@ -49,17 +49,17 @@ public static StormTopology buildTopology(String nameserverAddr, String topic){
properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);

RocketMQSpout spout = new RocketMQSpout(properties);
RocketMqSpout spout = new RocketMqSpout(properties);

WordCounter bolt = new WordCounter();

TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);

properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);

RocketMQBolt insertBolt = new RocketMQBolt()
RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
Expand Down
Expand Up @@ -22,14 +22,14 @@
import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.rocketmq.RocketMQConfig;
import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
import org.apache.storm.rocketmq.common.selector.TopicSelector;
import org.apache.storm.rocketmq.trident.state.RocketMQState;
import org.apache.storm.rocketmq.trident.state.RocketMQStateFactory;
import org.apache.storm.rocketmq.trident.state.RocketMQStateUpdater;
import org.apache.storm.rocketmq.trident.state.RocketMqState;
import org.apache.storm.rocketmq.trident.state.RocketMqStateFactory;
import org.apache.storm.rocketmq.trident.state.RocketMqStateUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.state.StateFactory;
Expand All @@ -55,20 +55,20 @@ public static StormTopology buildTopology(String nameserverAddr, String topic){
TopicSelector selector = new DefaultTopicSelector(topic);

Properties properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);

RocketMQState.Options options = new RocketMQState.Options()
RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);

StateFactory factory = new RocketMQStateFactory(options);
StateFactory factory = new RocketMqStateFactory(options);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory, fields,
new RocketMQStateUpdater(), new Fields());
new RocketMqStateUpdater(), new Fields());

return topology.build();
}
Expand Down
28 changes: 14 additions & 14 deletions external/storm-rocketmq/README.md
Expand Up @@ -6,18 +6,18 @@ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/)
## Read from Topic
The spout included in this package for reading data from a topic.

### RocketMQSpout
To use the `RocketMQSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
### RocketMqSpout
To use the `RocketMqSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
RocketMqSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.

```java
Properties properties = new Properties();
properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);

RocketMQSpout spout = new RocketMQSpout(properties);
RocketMqSpout spout = new RocketMqSpout(properties);
```


Expand Down Expand Up @@ -51,18 +51,18 @@ public interface TopicSelector extends Serializable {
`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.


### RocketMQBolt
To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
### RocketMqBolt
To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`.

```java
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);

properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);

RocketMQBolt insertBolt = new RocketMQBolt()
RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
Expand All @@ -76,20 +76,20 @@ We support trident persistent state that can be used with trident topologies. To
TopicSelector selector = new DefaultTopicSelector(topic);

Properties properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);

RocketMQState.Options options = new RocketMQState.Options()
RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);

StateFactory factory = new RocketMQStateFactory(options);
StateFactory factory = new RocketMqStateFactory(options);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory, fields,
new RocketMQStateUpdater(), new Fields());
new RocketMqStateUpdater(), new Fields());
```


Expand Down
3 changes: 0 additions & 3 deletions external/storm-rocketmq/pom.xml
Expand Up @@ -74,9 +74,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>100</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.rocketmq;

import org.apache.rocketmq.common.message.MessageExt;
Expand Down
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.rocketmq;

import java.nio.charset.StandardCharsets;
Expand All @@ -25,7 +26,7 @@ public class DefaultMessageBodySerializer implements MessageBodySerializer {
* Currently, we just convert string to bytes using UTF-8 charset.
* Note: in this way, object.toString() method is invoked.
* @param body RocketMQ Message body
* @return
* @return serialized byte[]
*/
@Override
public byte[] serialize(Object body) {
Expand Down
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.rocketmq;

import java.util.Map;
Expand All @@ -24,14 +25,20 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* An implementation of MessageRetryManager
* An implementation of MessageRetryManager.
*/
public class DefaultMessageRetryManager implements MessageRetryManager{
public class DefaultMessageRetryManager implements MessageRetryManager {
private Map<String,ConsumerMessage> cache = new ConcurrentHashMap<>(500);
private BlockingQueue<ConsumerMessage> queue;
private int maxRetry;
private int ttl;

/**
* DefaultMessageRetryManager Constructor.
* @param queue messages BlockingQueue
* @param maxRetry max retry times
* @param ttl TTL
*/
public DefaultMessageRetryManager(BlockingQueue<ConsumerMessage> queue, int maxRetry, int ttl) {
this.queue = queue;
this.maxRetry = maxRetry;
Expand All @@ -53,10 +60,12 @@ public void run() {
}, period, period);
}

@Override
public void ack(String id) {
cache.remove(id);
}

@Override
public void fail(String id) {
ConsumerMessage message = cache.remove(id);
if (message == null) {
Expand All @@ -70,16 +79,13 @@ public void fail(String id) {
}
}

@Override
public void mark(ConsumerMessage message) {
message.setTimestamp(System.currentTimeMillis());
cache.put(message.getId(), message);
}

/**
* Whether the message need retry.
* @param message
* @return
*/
@Override
public boolean needRetry(ConsumerMessage message) {
return message.getRetries() < maxRetry;
}
Expand Down
Expand Up @@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.rocketmq;

import java.io.Serializable;

/**
* RocketMQ message body serializer
* RocketMQ message body serializer.
*/
public interface MessageBodySerializer extends Serializable{
public interface MessageBodySerializer extends Serializable {
byte[] serialize(Object body);
}
Expand Up @@ -15,35 +15,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.rocketmq;

/**
* Interface for messages retry manager
* Interface for messages retry manager.
*/
public interface MessageRetryManager {
/**
* Remove from the cache. Message with the id is successful.
* @param id
* @param id message id
*/
void ack(String id);

/**
* Remove from the cache. Message with the id is failed.
* Invoke retry logics if necessary.
* @param id
* @param id message id
*/
void fail(String id);

/**
* Mark message in the cache.
* @param message
* @param message message
*/
void mark(ConsumerMessage message);

/**
* Whether the message need retry.
* @param message
* @return
* @param message ConsumerMessage
* @return true if need retry, otherwise false
*/
boolean needRetry(ConsumerMessage message);

Expand Down