forked from thaingo/crunch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MaterializeToMapTest.java
60 lines (49 loc) · 1.79 KB
/
MaterializeToMapTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.cloudera.crunch;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import static junit.framework.Assert.assertTrue;
import com.cloudera.crunch.impl.mem.MemPipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.PTypeFamily;
import com.google.common.collect.ImmutableList;
/**
 * Checks that {@code materializeToMap()} produces the expected key/value
 * entries, both for an in-memory table and for a table computed by an
 * {@link MRPipeline}.
 */
public class MaterializeToMapTest {

  /**
   * Expected table contents. Each pair's key equals its index in this list,
   * which {@link #assertMatches(Map)} relies on when looking up the expected
   * value for a key.
   */
  static final ImmutableList<Pair<Integer,String>> kvPairs =
      ImmutableList.of(
          Pair.of(0, "a"),
          Pair.of(1, "b"),
          Pair.of(2, "c"),
          Pair.of(3, "e"));

  /**
   * Asserts that {@code m} holds exactly the entries listed in
   * {@link #kvPairs}.
   *
   * <p>The entry count is verified first so that an empty or partially
   * populated map cannot pass vacuously; every key is then range-checked
   * before it is used as an index into {@link #kvPairs}, so an unexpected key
   * produces an assertion failure with a message instead of an
   * {@code IndexOutOfBoundsException}.
   *
   * @param m the materialized map to verify
   */
  public void assertMatches(Map<Integer,String> m) {
    assertTrue("materialized map is null", m != null);
    assertTrue(
        "expected " + kvPairs.size() + " entries but found " + m.size() + ": " + m,
        m.size() == kvPairs.size());

    for (Map.Entry<Integer,String> entry : m.entrySet()) {
      Integer key = entry.getKey();
      assertTrue(
          "unexpected key " + key + " in materialized map",
          key != null && key >= 0 && key < kvPairs.size());

      String expected = kvPairs.get(key).second();
      String actual = entry.getValue();
      assertTrue(
          "key " + key + ": expected \"" + expected + "\" but found \"" + actual + "\"",
          expected.equals(actual));
    }
  }

  /** Materializes an in-memory table built directly from {@link #kvPairs}. */
  @Test
  public void testMemMaterializeToMap() {
    assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
  }

  /**
   * Maps an input string to a pair whose value is the string itself and whose
   * key is 0, 1, 2 or 3 for "a", "b", "c" or "e" respectively. Any other
   * input is keyed with -1.
   */
  private static class Set1Mapper extends MapFn<String,Pair<Integer,String>> {
    @Override
    public Pair<Integer, String> map(String input) {
      int key = -1;
      if (input.equals("a")) {
        key = 0;
      } else if (input.equals("b")) {
        key = 1;
      } else if (input.equals("c")) {
        key = 2;
      } else if (input.equals("e")) {
        key = 3;
      }
      return Pair.of(key, input);
    }
  }

  /**
   * Reads a temporary copy of {@code set1.txt} through an {@link MRPipeline},
   * keys every line with {@link Set1Mapper}, and verifies the materialized map.
   *
   * <p>NOTE(review): this assumes {@code set1.txt} contains exactly the lines
   * "a", "b", "c" and "e" -- confirm against the test resource.
   *
   * @throws IOException declared because creating the temporary input copy may
   *     fail -- presumably raised by {@code FileHelper}; verify
   */
  @Test
  public void testMRMaterializeToMap() throws IOException {
    Pipeline p = new MRPipeline(MaterializeToMapTest.class);
    String inputFile = FileHelper.createTempCopyOf("set1.txt");
    PCollection<String> c = p.readTextFile(inputFile);
    PTypeFamily tf = c.getTypeFamily();
    PTable<Integer,String> t =
        c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
    Map<Integer, String> m = t.materializeToMap();
    assertMatches(m);
  }
}