Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Commit

Permalink
Add jstorm-utility module
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyan.feng committed Oct 10, 2014
1 parent dd922b8 commit 86bf393
Show file tree
Hide file tree
Showing 13 changed files with 1,377 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "jstorm-utility/rocket-mq"]
path = jstorm-utility/rocket-mq
url = https://github.com/rocketmq/rocketmq-storm
1 change: 1 addition & 0 deletions jstorm-utility/rocket-mq
Submodule rocket-mq added at 372e9d
21 changes: 21 additions & 0 deletions jstorm-utility/transaction_meta_spout/conf/topology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

meta.consumer.group: "S_BATCH_META_TEST"
meta.topic: "jae_log"
meta.subexpress: "*"
meta.nameserver: null
#type "yyyyMMddhhmmss"
#meta.start.timestamp: "20140701000000"
meta.start.timestamp: null
meta.batch.message.num: 1024
meta.max.fail.times: 10
meta.rebalance.frequency.sec: 60


storm.cluster.mode: "local"
#storm.cluster.mode: "distributed"

topology.name: "batch_test"
topology.spout.parallel: 1
topology.bolt.parallel: 1
topology.message.timeout.secs: 300
topology.workers: 10
68 changes: 68 additions & 0 deletions jstorm-utility/transaction_meta_spout/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.aloha</groupId>
<artifactId>aloha-utility</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>transaction_meta_spout</artifactId>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.taobao.metaq.final</groupId>
<artifactId>metaq-client</artifactId>
<version>3.1.3</version>
</dependency>
<!--
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>3.0.1</version>
</dependency>
-->

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.alibaba.jstorm.batch.example;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.IPostCommit;
import com.alibaba.jstorm.utils.JStormUtils;

public class BatchMetaRebalance implements IBasicBolt, IPostCommit {
/** */
private static final long serialVersionUID = 2991323223385556163L;

private static final Logger LOG = Logger
.getLogger(BatchMetaRebalance.class);

public static final String BOLT_NAME = BatchMetaRebalance.class
.getSimpleName();

public static final String REBALANCE_STREAM_ID = "rebalance";

private transient AtomicBoolean isNeedRebalance;
private transient ScheduledExecutorService scheduExec;

@Override
public void prepare(Map stormConf, TopologyContext context) {
isNeedRebalance = new AtomicBoolean(false);
CheckRebalanceTimer timer = new CheckRebalanceTimer();

int rebalanceTimeInterval = JStormUtils.parseInt(
stormConf.get("meta.rebalance.frequency.sec"), 3600);

long now = System.currentTimeMillis();
long next = (now/(rebalanceTimeInterval * 1000) + 1) * rebalanceTimeInterval * 1000;
long diff = (next - now )/1000;

// Calendar start = Calendar.getInstance();
//
// start.add(Calendar.HOUR_OF_DAY, 1);
//
// start.set(Calendar.MINUTE, 30);
// start.set(Calendar.SECOND, 0);
// start.set(Calendar.MILLISECOND, 0);
// long startMillis = start.getTimeInMillis();
//
// long now = System.currentTimeMillis();
//
// long diff = (startMillis - now) / (1000);

ScheduledExecutorService scheduExec = null;

scheduExec = Executors.newSingleThreadScheduledExecutor();
scheduExec.scheduleAtFixedRate(timer, diff, rebalanceTimeInterval,
TimeUnit.SECONDS);

LOG.info("Successfully init rebalance timer");
}

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
LOG.warn("Receive unkonw message");
}

@Override
public void cleanup() {
scheduExec.shutdown();
LOG.info("Successfully do cleanup");
}

@Override
public void postCommit(BatchId id, BasicOutputCollector collector) {
if (isNeedRebalance.get() == true) {
isNeedRebalance.set(false);
collector.emit(REBALANCE_STREAM_ID, new Values(id));
LOG.info("Emit rebalance command");
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(REBALANCE_STREAM_ID, new Fields("BatchId"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}

public class CheckRebalanceTimer implements Runnable {
public void run() {
BatchMetaRebalance.this.isNeedRebalance.set(true);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.alibaba.jstorm.batch.example;

import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.IBatchSpout;
import com.alibaba.jstorm.batch.meta.MetaSimpleClient;
import com.alibaba.jstorm.batch.meta.MetaSpoutConfig;
import com.alibaba.jstorm.batch.util.BatchCommon;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.rocketmq.common.message.MessageExt;

public class BatchMetaSpout implements IBatchSpout{
private static final long serialVersionUID = 5720810158625748041L;

private static final Logger LOG = Logger.getLogger(BatchMetaSpout.class);

public static final String SPOUT_NAME = BatchMetaSpout.class.getSimpleName();

private Map conf;

private String taskName;
private int taskIndex;
private int taskParallel;

private transient MetaSimpleClient metaClient;
private MetaSpoutConfig metaSpoutConfig;



public BatchMetaSpout(MetaSpoutConfig metaSpoutConfig) {
this.metaSpoutConfig = metaSpoutConfig;
}

public void initMetaClient() throws Exception {
ClusterState zkClient = BatchCommon.getZkClient(conf);
metaClient = new MetaSimpleClient(metaSpoutConfig, zkClient, taskIndex,
taskParallel);

metaClient.init();

LOG.info("Successfully init meta client " + taskName);
}

@Override
public void prepare(Map stormConf, TopologyContext context) {
this.conf = stormConf;

taskName = context.getThisComponentId() + "_" + context.getThisTaskId();

taskIndex = context.getThisTaskIndex();

taskParallel = context.getComponentTasks(context.getThisComponentId())
.size();

try {
initMetaClient();
} catch (Exception e) {
LOG.info("Failed to init Meta Client,", e);
throw new RuntimeException(e);
}

LOG.info(taskName + " successfully do prepare ");
}

public void emitBatch(BatchId batchId, BasicOutputCollector collector) {
List<MessageExt> msgs = metaClient.fetchOneBatch();
for (MessageExt msgExt : msgs) {
collector.emit(new Values(batchId, msgExt));
}
}

@Override
public void execute(Tuple input, BasicOutputCollector collector) {

String streamId = input.getSourceStreamId();
if (streamId.equals(BatchMetaRebalance.REBALANCE_STREAM_ID)) {
try {
metaClient.rebalanceMqList();
} catch (Exception e) {
LOG.warn("Failed to do rebalance operation", e);
}
}else {
BatchId batchId = (BatchId) input.getValue(0);

emitBatch(batchId, collector);
}

}

@Override
public void cleanup() {
metaClient.cleanup();

}

@Override
public byte[] commit(BatchId id) throws FailedException {
return metaClient.commit(id);
}

@Override
public void revert(BatchId id, byte[] commitResult) {
metaClient.revert(id, commitResult);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("BatchId", "MessageExt"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}




}
Loading

0 comments on commit 86bf393

Please sign in to comment.