Skip to content

Commit

Permalink
[MINOR][DOC] Fix nits in JavaStreamingTestExample
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Fix some nits discussed in #11776 (comment)
use !rdd.isEmpty instead of rdd.count > 0
use static instead of AtomicInteger
remove unneeded "throws Exception"

## How was this patch tested?

manual tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #11821 from zhengruifeng/je_fix.
  • Loading branch information
zhengruifeng authored and srowen committed Mar 18, 2016
1 parent 0f1015f commit 53f32a2
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
* batches processed exceeds `numBatchesTimeout`.
*/
public class JavaStreamingTestExample {

private static int timeoutCounter = 0;

public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaStreamingTestExample " +
Expand All @@ -76,7 +79,7 @@ public static void main(String[] args) {
JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
new Function<String, BinarySample>() {
@Override
public BinarySample call(String line) throws Exception {
public BinarySample call(String line) {
String[] ts = line.split(",");
boolean label = Boolean.valueOf(ts[0]);
double value = Double.valueOf(ts[1]);
Expand All @@ -94,22 +97,21 @@ public BinarySample call(String line) throws Exception {
// $example off$

// Stop processing if test becomes significant or we time out
final Accumulator<Integer> timeoutCounter =
ssc.sparkContext().accumulator(numBatchesTimeout);
timeoutCounter = numBatchesTimeout;

out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
@Override
public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
timeoutCounter.add(-1);
public void call(JavaRDD<StreamingTestResult> rdd) {
timeoutCounter -= 1;

long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() {
@Override
public Boolean call(StreamingTestResult v) throws Exception {
public Boolean call(StreamingTestResult v) {
return v.pValue() < 0.05;
}
}).count();
}).isEmpty();

if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
if (timeoutCounter <= 0 || anySignificant) {
rdd.context().stop();
}
}
Expand Down

0 comments on commit 53f32a2

Please sign in to comment.