Skip to content

Commit

Permalink
Merge branch '1.x-branch' of https://github.com/apache/storm into 1.x…
Browse files Browse the repository at this point in the history
…-branch
  • Loading branch information
srdo committed Mar 15, 2016
2 parents e2251c4 + f0abfff commit e701a2a
Show file tree
Hide file tree
Showing 84 changed files with 2,225 additions and 344 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ metastore_db
.settings/
.project
.classpath
logs
build
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,32 @@
## 1.0.0
* STORM-971: Metric for messages lost due to kafka retention
* STORM-1608: Fix stateful topology acking behavior
* STORM-1609: Netty Client is not best effort delivery on failed Connection
* STORM-1620: Update curator to fix CURATOR-209
* STORM-1469: Decommission SimpleTransportPlugin and configuration
* STORM-1469: Adding Plain Sasl Transport Plugin
* STORM-1588: Do not add event logger details if number of event loggers is zero
* STORM-1606: print the information of testcase which is on failure
* STORM-1436: Set Travis Heap size to fit in memory limits in travis builds.
* STORM-1529: Change default worker temp directory location for workers
* STORM-1543: DRPCSpout should always try to reconnect disconnected DRPCInvocationsClient
* STORM-1561: Supervisor should relaunch worker if assignments have changed
* STORM-1601: Check if /backpressure/storm-id node exists before requesting children
* STORM-1574: Better handle backpressure exception etc.
* STORM-1587: Avoid NPE while prining Metrics
* STORM-1570: Storm SQL support for nested fields and array
* STORM-1576: fix ConcurrentModificationException in addCheckpointInputs
* STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed
* STORM-1488: UI Topology Page component last error timestamp is from 1970
* STORM-1542: Remove profile action retry in case of non-zero exit code
* STORM-1540: Fix Debug/Sampling for Trident
* STORM-1569: Allowing users to specify the nimbus thrift server queue size.
* STORM-1552: Fix topology event sampling log dir
* STORM-1511: min/max operations support on a trident stream
* STORM-1522: REST API throws invalid worker log links
* STORM-1532: Fix readCommandLineOpts to parse JSON correctly
* STORM-1541: Change scope of 'hadoop-minicluster' to test
* STORM-1539: Improve Storm ACK-ing performance
* STORM-1519: Storm syslog logging not confirming to RFC5426 3.1
* STORM-1533: IntegerValidator for metric consumer parallelism hint
* STORM-1534: Pick correct version of jackson-annotations jar
Expand Down Expand Up @@ -265,6 +293,7 @@

## 0.10.1

* STORM-1596: Do not use single Kerberos TGT instance between multiple threads
* STORM-1481: avoid Math.abs(Integer) get a negative value
* STORM-1121: Deprecate test only configuraton nimbus.reassign
* STORM-1180: FLUX logo wasn't appearing quite right
Expand Down
1 change: 1 addition & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ under the License.
* Boyang Jerry Peng ([@jerrypeng](https://github.com/jerrypeng))
* Zhuo Liu ([@zhuoliu](https://github.com/zhuoliu))
* Haohui Mai ([@haohui](https://github.com/haohui))
* Longda Feng ([@longda](https://github.com/longdafeng))

## Contributors

Expand Down
4 changes: 2 additions & 2 deletions bin/flight.bash
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

JDKPATH="/home/y/share/yjava_jdk/java"
JDKPATH=$JAVA_HOME
BINPATH="/usr/bin"
USER=`whoami`

Expand Down Expand Up @@ -59,7 +59,7 @@ function dump_record {

function jstack_record {
FILENAME=jstack-$1-${NOW}.txt
$BINPATH/jstack $1 > "$2/${FILENAME}"
$BINPATH/jstack $1 > "$2/${FILENAME}" 2>&1
}

function jmap_record {
Expand Down
4 changes: 4 additions & 0 deletions bin/storm-config.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ if not defined STORM_LOG_DIR (
@rem retrieve storm.log4j2.conf.dir from conf file
@rem

if not defined CMD_TEMP_FILE (
set CMD_TEMP_FILE=tmpfile
)

"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value storm.log4j2.conf.dir > %CMD_TEMP_FILE%

FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
Expand Down
2 changes: 1 addition & 1 deletion bin/storm.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
)

if "%c-opt%"=="second" (
set config-options=%config-options%=%1
set config-options=%config-options%=%~1
set c-opt=
goto start
)
Expand Down
2 changes: 1 addition & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ topology.min.replication.count: 1
topology.max.replication.wait.time.sec: 60
nimbus.credential.renewers.freq.secs: 600
nimbus.impersonation.authorizer: "org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer"

nimbus.queue.size: 100000
scheduler.display.resource: false

### ui.* configs are for the master
Expand Down
4 changes: 4 additions & 0 deletions dev-tools/travis/print-errors-from-test-reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def print_error_reports_from_report_file(file_path):
if fail is not None:
print_detail_information(testcase, fail)

failure = testcase.find("failure")
if failure is not None:
print_detail_information(testcase, failure)


def main(report_dir_path):
for test_report in glob.iglob(report_dir_path + '/*.xml'):
Expand Down
4 changes: 3 additions & 1 deletion dev-tools/travis/travis-script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ cd ${STORM_SRC_ROOT_DIR}

# We should be concerned that Travis CI could be very slow because it uses VM
export STORM_TEST_TIMEOUT_MS=150000
# Travis only has 3GB of memory, lets use 1GB for build, and 1.5GB for forked JVMs
export MAVEN_OPTS="-Xmx1024m"

mvn --batch-mode test -fae -Pnative,all-tests -Prat -pl $2
mvn --batch-mode test -fae -Pnative,all-tests -Prat -pl "$2"
BUILD_RET_VAL=$?

for dir in `find . -type d -and -wholename \*/target/\*-reports`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public static void printMetrics(C client, String name) throws Exception {
long acked = 0;
long failed = 0;
for (ExecutorSummary exec: info.get_executors()) {
if ("spout".equals(exec.get_component_id())) {
if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
SpoutStats stats = exec.get_stats().get_specific().get_spout();
Map<String, Long> failedMap = stats.get_failed().get(":all-time");
Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;
Expand All @@ -33,6 +35,7 @@
* every 100 ms. The ts field can be used in tuple time based windowing.
*/
public class RandomIntegerSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(RandomIntegerSpout.class);
private SpoutOutputCollector collector;
private Random rand;
private long msgId = 0;
Expand All @@ -51,6 +54,16 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
@Override
public void nextTuple() {
Utils.sleep(100);
collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId));
collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId), msgId);
}

@Override
public void ack(Object msgId) {
LOG.debug("Got ACK for msgId : " + msgId);
}

@Override
public void fail(Object msgId) {
LOG.debug("Got FAIL for msgId : " + msgId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.storm.starter.spout;

import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
* This spout generates random whole numbers with given {@code maxNumber} value as maximum with the given {@code fields}.
*
*/
public class RandomNumberGeneratorSpout implements IBatchSpout {
private final Fields fields;
private final int batchSize;
private final int maxNumber;
private final Map<Long, List<List<Object>>> batches = new HashMap<>();

public RandomNumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
this.fields = fields;
this.batchSize = batchSize;
this.maxNumber = maxNumber;
}

@Override
public void open(Map conf, TopologyContext context) {
}

@Override
public void emitBatch(long batchId, TridentCollector collector) {
List<List<Object>> values = null;
if(batches.containsKey(batchId)) {
values = batches.get(batchId);
} else {
values = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
List<Object> numbers = new ArrayList<>();
for (int x=0; x<fields.size(); x++) {
numbers.add(ThreadLocalRandom.current().nextInt(0, maxNumber + 1));
}
values.add(numbers);
}
batches.put(batchId, values);
}
for (List<Object> value : values) {
collector.emit(value);
}
}

@Override
public void ack(long batchId) {
batches.remove(batchId);
}

@Override
public void close() {

}

@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}

@Override
public Fields getOutputFields() {
return fields;
}
}

0 comments on commit e701a2a

Please sign in to comment.