Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -100,6 +101,10 @@ public Integer getScore() {
return this.score;
}

public Long getTimestamp() {
return this.timestamp;
}

public String getKey(String keyname) {
if ("team".equals(keyname)) {
return this.team;
Expand All @@ -108,8 +113,35 @@ public String getKey(String keyname) {
}
}

public Long getTimestamp() {
return this.timestamp;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || o.getClass() != this.getClass()) {
return false;
}

GameActionInfo gameActionInfo = (GameActionInfo) o;

if (!this.getUser().equals(gameActionInfo.getUser())) {
return false;
}

if (!this.getTeam().equals(gameActionInfo.getTeam())) {
return false;
}

if (!this.getScore().equals(gameActionInfo.getScore())) {
return false;
}

return this.getTimestamp().equals(gameActionInfo.getTimestamp());
}

@Override
public int hashCode() {
return Objects.hash(user, team, score, timestamp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.complete.game;

import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
Expand All @@ -28,13 +29,11 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -69,6 +68,19 @@ public class UserScoreTest implements Serializable {
static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
static final List<String> GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2);

static final List<GameActionInfo> GAME_ACTION_INFO_LIST =
Lists.newArrayList(
new GameActionInfo("user0_MagentaKangaroo", "MagentaKangaroo", 3, 1447955630000L),
new GameActionInfo("user13_ApricotQuokka", "ApricotQuokka", 15, 1447955630000L),
new GameActionInfo("user6_AmberNumbat", "AmberNumbat", 11, 1447955630000L),
new GameActionInfo("user7_AlmondWallaby", "AlmondWallaby", 15, 1447955630000L),
new GameActionInfo(
"user7_AndroidGreenKookaburra", "AndroidGreenKookaburra", 12, 1447955630000L),
new GameActionInfo(
"user7_AndroidGreenKookaburra", "AndroidGreenKookaburra", 11, 1447955630000L),
new GameActionInfo("user19_BisqueBilby", "BisqueBilby", 6, 1447955630000L),
new GameActionInfo("user19_BisqueBilby", "BisqueBilby", 8, 1447955630000L));

static final List<KV<String, Integer>> USER_SUMS =
Arrays.asList(
KV.of("user0_MagentaKangaroo", 3),
Expand All @@ -92,21 +104,20 @@ public class UserScoreTest implements Serializable {
/** Test the {@link ParseEventFn} {@link org.apache.beam.sdk.transforms.DoFn}. */
@Test
public void testParseEventFn() throws Exception {
DoFnTester<String, GameActionInfo> parseEventFn = DoFnTester.of(new ParseEventFn());
PCollection<String> input = p.apply(Create.of(GAME_EVENTS));
PCollection<GameActionInfo> output = input.apply(ParDo.of(new ParseEventFn()));

PAssert.that(output).containsInAnyOrder(GAME_ACTION_INFO_LIST);

List<GameActionInfo> results = parseEventFn.processBundle(GAME_EVENTS_ARRAY);
Assert.assertEquals(8, results.size());
Assert.assertEquals("user0_MagentaKangaroo", results.get(0).getUser());
Assert.assertEquals("MagentaKangaroo", results.get(0).getTeam());
Assert.assertEquals(Integer.valueOf(3), results.get(0).getScore());
p.run().waitUntilFinish();
}

/** Tests ExtractAndSumScore("user"). */
@Test
@Category(ValidatesRunner.class)
public void testUserScoreSums() throws Exception {

PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
PCollection<String> input = p.apply(Create.of(GAME_EVENTS));

PCollection<KV<String, Integer>> output =
input
Expand All @@ -125,7 +136,7 @@ public void testUserScoreSums() throws Exception {
@Category(ValidatesRunner.class)
public void testTeamScoreSums() throws Exception {

PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
PCollection<String> input = p.apply(Create.of(GAME_EVENTS));

PCollection<KV<String, Integer>> output =
input
Expand Down