Skip to content

Commit

Permalink
[BEAM-4303] Enforce ErrorProne analysis in examples project
Browse files (browse the repository at this point in the history)
  • Loading branch information
cademarkegard committed May 30, 2018
1 parent b414da9 commit c714f26
Show file tree
Hide file tree
Showing 20 changed files with 84 additions and 67 deletions.
2 changes: 1 addition & 1 deletion examples/java/build.gradle
Expand Up @@ -19,7 +19,7 @@
import groovy.json.JsonOutput

apply from: project(":").file("build_rules.gradle")
applyJavaNature()
applyJavaNature(failOnWarning: true)

description = "Apache Beam :: Examples :: Java"
ext.summary = """Apache Beam SDK provides a simple, Java-based
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.examples;

import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
Expand Down Expand Up @@ -100,7 +102,7 @@ public void processElement(ProcessContext c) {
}

// Split the line into words.
String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);
Iterable<String> words = Splitter.onPattern(ExampleUtils.TOKENIZER_PATTERN).split(c.element());

// Output each word encountered into the output PCollection.
for (String word : words) {
Expand Down
Expand Up @@ -20,6 +20,8 @@
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
Expand Down Expand Up @@ -57,7 +59,7 @@ public class StreamingWordExtract {
static class ExtractWords extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);
Iterable<String> words = Splitter.onPattern(ExampleUtils.TOKENIZER_PATTERN).split(c.element());
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
Expand Down Expand Up @@ -87,12 +89,9 @@ public void processElement(ProcessContext c) {
}

static TableSchema getSchema() {
return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
// Compose the list of TableFieldSchema from tableSchema.
{
add(new TableFieldSchema().setName("string_field").setType("STRING"));
}
});
return new TableSchema().setFields(new ArrayList<>(ImmutableList.of(
new TableFieldSchema().setName("string_field").setType("STRING")
)));
}
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.examples.complete;

import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import java.io.File;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -219,7 +220,7 @@ public PCollection<KV<String, KV<URI, Double>>> expand(
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
String line = c.element().getValue();
for (String word : line.split("\\W+")) {
for (String word : Splitter.onPattern("\\W+").split(line)) {
// Log INFO messages when the word “love” is found.
if ("love".equalsIgnoreCase(word)) {
LOG.info("Found {}", word.toLowerCase());
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -148,10 +149,11 @@ static class ExtractTimestamps extends DoFn<String, String> {

@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
String[] items = c.element().split(",");
if (items.length > 0) {
List<String> items = Splitter.on(',').splitToList(c.element());

if (items.size() > 0) {
try {
String timestamp = items[0];
String timestamp = items.get(0);
c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
} catch (IllegalArgumentException e) {
// Skip the invalid input.
Expand All @@ -171,21 +173,21 @@ static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {

@ProcessElement
public void processElement(ProcessContext c) {
String[] items = c.element().split(",");
if (items.length < 48) {
List<String> items = Splitter.on(',').splitToList(c.element());
if (items.size() < 48) {
// Skip the invalid input.
return;
}
// extract the sensor information for the lanes from the input string fields.
String timestamp = items[0];
String stationId = items[1];
String freeway = items[2];
String direction = items[3];
Integer totalFlow = tryIntParse(items[7]);
String timestamp = items.get(0);
String stationId = items.get(1);
String freeway = items.get(2);
String direction = items.get(3);
Integer totalFlow = tryIntParse(items.get(7));
for (int i = 1; i <= 8; ++i) {
Integer laneFlow = tryIntParse(items[6 + 5 * i]);
Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]);
Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]);
Integer laneFlow = tryIntParse(items.get(6 + 5 * i));
Double laneAvgOccupancy = tryDoubleParse(items.get(7 + 5 * i));
Double laneAvgSpeed = tryDoubleParse(items.get(8 + 5 * i));
if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
return;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -417,7 +418,7 @@ private static String tryParseString(String[] inputItems, int index) {
* Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
*/
private static Map<String, String> buildStationInfo() {
Map<String, String> stations = new Hashtable<>();
Map<String, String> stations = new LinkedHashMap<>();
stations.put("1108413", "SDRoute1"); // from freeway 805 S
stations.put("1108699", "SDRoute2"); // from freeway 78 E
stations.put("1108702", "SDRoute2");
Expand Down
Expand Up @@ -76,7 +76,7 @@ public class HourlyTeamScore extends UserScore {

private static DateTimeFormatter minFmt =
DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm")
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));


/**
Expand Down
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.beam.examples.complete.game;

import com.google.common.base.Splitter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToText;
Expand Down Expand Up @@ -129,15 +131,15 @@ static class ParseEventFn extends DoFn<String, GameActionInfo> {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("GOT " + c.element());
String[] components = c.element().split(",");
List<String> components = Splitter.on(',').splitToList(c.element());
try {
String user = components[0].trim();
String team = components[1].trim();
Integer score = Integer.parseInt(components[2].trim());
Long timestamp = Long.parseLong(components[3].trim());
String user = components.get(0).trim();
String team = components.get(1).trim();
Integer score = Integer.parseInt(components.get(2).trim());
Long timestamp = Long.parseLong(components.get(3).trim());
GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
c.output(gInfo);
} catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
} catch (IndexOutOfBoundsException | NumberFormatException e) {
numParseErrors.inc();
LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
}
Expand Down
Expand Up @@ -351,6 +351,7 @@ public static void publishDataToFile(String fileName, int numMessages, int delay
out.println(message);
}
} catch (Exception e) {
System.err.print("Error in writing generated events to file");
e.printStackTrace();
} finally {
out.flush();
Expand Down
Expand Up @@ -31,5 +31,5 @@ public class GameConstants {

public static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));
}
Expand Up @@ -52,7 +52,7 @@ public class WriteToText<InputT>

private static final DateTimeFormatter formatter =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));

protected String filenamePrefix;
protected Map<String, FieldFn<InputT>> fieldFn;
Expand Down Expand Up @@ -98,7 +98,7 @@ public void processElement(ProcessContext c, BoundedWindow window) {
* A {@link DoFn} that writes elements to files with names deterministically derived from the
* lower and upper bounds of their key (an {@link IntervalWindow}).
*/
protected class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
protected static class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {

private final String filenamePrefix;

Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Splitter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -370,12 +371,12 @@ public FormatTotalFlow(String triggerType) {

@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
String[] values = c.element().getValue().split(",");
List<String> values = Splitter.on(",").splitToList(c.element().getValue());
TableRow row = new TableRow()
.set("trigger_type", triggerType)
.set("freeway", c.element().getKey())
.set("total_flow", Integer.parseInt(values[0]))
.set("number_of_records", Long.parseLong(values[1]))
.set("total_flow", Integer.parseInt(values.get(0)))
.set("number_of_records", Long.parseLong(values.get(1)))
.set("window", window.toString())
.set("isFirst", c.pane().isFirst())
.set("isLast", c.pane().isLast())
Expand All @@ -393,17 +394,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] laneInfo = c.element().split(",");
if ("timestamp".equals(laneInfo[0])) {
List<String> laneInfo = Splitter.on(",").splitToList(c.element());
if ("timestamp".equals(laneInfo.get(0))) {
// Header row
return;
}
if (laneInfo.length < 48) {
if (laneInfo.size() < 50) {
//Skip the invalid input.
return;
}
String freeway = laneInfo[2];
Integer totalFlow = tryIntegerParse(laneInfo[7]);
String freeway = laneInfo.get(2);
Integer totalFlow = tryIntegerParse(laneInfo.get(7));
// Ignore the records with total flow 0 to easily understand the working of triggers.
// Skip the records with total flow -1 since they are invalid input.
if (totalFlow == null || totalFlow <= 0) {
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.subprocess.utils;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
Expand Down Expand Up @@ -150,7 +152,7 @@ public static void createDirectoriesOnWorker(SubProcessConfiguration configurati

public static String readLineOfLogFile(Path path) {

try (BufferedReader br = new BufferedReader(new FileReader(path.toString()))) {
try (BufferedReader br = Files.newBufferedReader(Paths.get(path.toString()), UTF_8)) {
return br.readLine();
} catch (FileNotFoundException e) {
LOG.error("Error reading the first line of file", e);
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.examples;

import avro.shaded.com.google.common.collect.Iterables;
import com.google.common.base.Splitter;
import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.StandardCharsets;
Expand All @@ -37,7 +39,7 @@ public class DebuggingWordCountTest {

private String getFilePath(String filePath) {
if (filePath.contains(":")) {
return filePath.replace("\\", "/").split(":")[1];
return Iterables.get(Splitter.on(':').split(filePath.replace("\\", "/")), 1);
}
return filePath;
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.equalTo;

import com.google.common.base.MoreObjects;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.ArrayList;
Expand Down Expand Up @@ -169,7 +170,7 @@ private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) t
SortedMap<String, Long> expectedWordCounts = new TreeMap<>();
for (String line :
inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
String[] words = line.split(ExampleUtils.TOKENIZER_PATTERN);
Iterable<String> words = Splitter.onPattern(ExampleUtils.TOKENIZER_PATTERN).split(line);

for (String word : words) {
if (!word.isEmpty()) {
Expand Down Expand Up @@ -215,9 +216,9 @@ public boolean matchesSafely(PipelineResult pipelineResult) {
// Since the windowing is nondeterministic we only check the sums
actualCounts = new TreeMap<>();
for (String line : outputLines) {
String[] splits = line.split(": ");
String word = splits[0];
long count = Long.parseLong(splits[1]);
List<String> splits = Splitter.on(": ").splitToList(line);
String word = splits.get(0);
long count = Long.parseLong(splits.get(1));
actualCounts.merge(word, count, (a, b) -> a + b);
}

Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.complete;

import com.google.common.base.Splitter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -146,8 +147,8 @@ public void testWindowedAutoComplete() {
private static List<CompletionCandidate> parseList(String... entries) {
List<CompletionCandidate> all = new ArrayList<>();
for (String s : entries) {
String[] countValue = s.split(":");
all.add(new CompletionCandidate(countValue[0], Integer.valueOf(countValue[1])));
List<String> countValue = Splitter.on(':').splitToList(s);
all.add(new CompletionCandidate(countValue.get(0), Integer.valueOf(countValue.get(1))));
}
return all;
}
Expand Down
Expand Up @@ -91,10 +91,10 @@ public void testParseEventFn() throws Exception {
DoFnTester.of(new ParseEventFn());

List<GameActionInfo> results = parseEventFn.processBundle(GAME_EVENTS_ARRAY);
Assert.assertEquals(results.size(), 8);
Assert.assertEquals(results.get(0).getUser(), "user0_MagentaKangaroo");
Assert.assertEquals(results.get(0).getTeam(), "MagentaKangaroo");
Assert.assertEquals(results.get(0).getScore(), new Integer(3));
Assert.assertEquals(8, results.size());
Assert.assertEquals("user0_MagentaKangaroo", results.get(0).getUser());
Assert.assertEquals("MagentaKangaroo", results.get(0).getTeam());
Assert.assertEquals(Integer.valueOf(3), results.get(0).getScore());
}

/** Tests ExtractAndSumScore("user"). */
Expand Down
Expand Up @@ -68,12 +68,12 @@ public void testFormatCounts() throws Exception {
KV.of(4, Long.MAX_VALUE),
KV.of(5, Long.MIN_VALUE) };
results = formatCountsFn.processBundle(input);
Assert.assertEquals(results.size(), 3);
Assert.assertEquals(results.get(0).get("month"), 3);
Assert.assertEquals(results.get(0).get("tornado_count"), 0L);
Assert.assertEquals(results.get(1).get("month"), 4);
Assert.assertEquals(results.get(1).get("tornado_count"), Long.MAX_VALUE);
Assert.assertEquals(results.get(2).get("month"), 5);
Assert.assertEquals(results.get(2).get("tornado_count"), Long.MIN_VALUE);
Assert.assertEquals(3, results.size());
Assert.assertEquals(3, results.get(0).get("month"));
Assert.assertEquals(0L, results.get(0).get("tornado_count"));
Assert.assertEquals(4, results.get(1).get("month"));
Assert.assertEquals(Long.MAX_VALUE, results.get(1).get("tornado_count"));
Assert.assertEquals(5, results.get(2).get("month"));
Assert.assertEquals(Long.MIN_VALUE, results.get(2).get("tornado_count"));
}
}

0 comments on commit c714f26

Please sign in to comment.