Skip to content

Commit 2f11089

Browse files
cjmctaguekeith-turner
authored andcommitted
fixes #867 - Stream support for Scanners (#884)
1 parent 3302fd6 commit 2f11089

File tree

4 files changed

+270
-0
lines changed

4 files changed

+270
-0
lines changed

modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java

+10
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,21 @@
1515

1616
package org.apache.fluo.api.client.scanner;
1717

18+
import java.util.stream.Stream;
19+
import java.util.stream.StreamSupport;
20+
1821
import org.apache.fluo.api.data.RowColumnValue;
1922

2023
/**
2124
* @since 1.0.0
2225
*/
2326
public interface CellScanner extends Iterable<RowColumnValue> {
2427

28+
/**
29+
* @since 1.2.0
30+
*/
31+
default Stream<RowColumnValue> stream() {
32+
return StreamSupport.stream(spliterator(), false);
33+
}
34+
2535
}

modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java

+12
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package org.apache.fluo.api.client.scanner;
1717

18+
import java.util.stream.Stream;
19+
import java.util.stream.StreamSupport;
20+
1821
import org.apache.fluo.api.data.Bytes;
1922
import org.apache.fluo.api.data.ColumnValue;
2023

@@ -24,12 +27,21 @@
2427
public interface ColumnScanner extends Iterable<ColumnValue> {
2528

2629
/**
30+
* @since 1.0.0
2731
* @return the row for all column values
2832
*/
2933
Bytes getRow();
3034

3135
/**
36+
* @since 1.0.0
3237
* @return the row for all column values decoded as UTF-8
3338
*/
3439
String getsRow();
40+
41+
/**
42+
* @since 1.2.0
43+
*/
44+
default Stream<ColumnValue> stream() {
45+
return StreamSupport.stream(spliterator(), false);
46+
}
3547
}

modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java

+10
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,19 @@
1515

1616
package org.apache.fluo.api.client.scanner;
1717

18+
import java.util.stream.Stream;
19+
import java.util.stream.StreamSupport;
20+
1821
/**
1922
* @since 1.0.0
2023
*/
2124
public interface RowScanner extends Iterable<ColumnScanner> {
2225

26+
/**
27+
* @since 1.2.0
28+
*/
29+
default Stream<ColumnScanner> stream() {
30+
return StreamSupport.stream(spliterator(), false);
31+
}
32+
2333
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
package org.apache.fluo.api.client.scanner;
16+
17+
import java.util.ArrayList;
18+
import java.util.Collection;
19+
import java.util.HashSet;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.Objects;
23+
import java.util.Set;
24+
import java.util.stream.Collectors;
25+
26+
import org.apache.fluo.api.data.Bytes;
27+
import org.apache.fluo.api.data.Column;
28+
import org.apache.fluo.api.data.ColumnValue;
29+
import org.apache.fluo.api.data.RowColumnValue;
30+
import org.junit.Assert;
31+
import org.junit.Test;
32+
33+
/**
34+
* @since 1.2.0
35+
*/
36+
public class ScannerStreamTest {
37+
38+
@Test
39+
public void testCellScannerStream() {
40+
Set<RowColumnValue> rowCols = new HashSet<>();
41+
Set<RowColumnValue> empty = new HashSet<>();
42+
43+
rowCols.add(new RowColumnValue("r1", new Column("f1", "q1"), "v1"));
44+
rowCols.add(new RowColumnValue("r1", new Column("f2", "q3"), "v2"));
45+
rowCols.add(new RowColumnValue("r2", new Column("f1", "q1"), "v3"));
46+
rowCols.add(new RowColumnValue("r2", new Column("f1", "q2"), "v4"));
47+
rowCols.add(new RowColumnValue("r4", new Column("f2", "q5"), "v5"));
48+
49+
CellScannerImpl cellScanner = new CellScannerImpl(rowCols);
50+
51+
Set<RowColumnValue> expected =
52+
rowCols.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f2"))
53+
.collect(Collectors.toSet());
54+
Set<RowColumnValue> actualSubSet =
55+
cellScanner.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f2"))
56+
.collect(Collectors.toSet());
57+
58+
Assert.assertNotEquals(empty, actualSubSet);
59+
Assert.assertNotEquals(empty, cellScanner.stream().collect(Collectors.toSet()));
60+
Assert.assertEquals(rowCols, cellScanner.stream().collect(Collectors.toSet()));
61+
Assert.assertEquals(expected, actualSubSet);
62+
}
63+
64+
@Test
65+
public void testColumnScannerStream() {
66+
Set<ColumnValue> colsVal = new HashSet<>();
67+
Set<ColumnValue> empty = new HashSet<>();
68+
69+
Bytes row = Bytes.of("123");
70+
colsVal.add(new ColumnValue(new Column("f1", "q1"), Bytes.of("v1")));
71+
colsVal.add(new ColumnValue(new Column("f2", "q3"), Bytes.of("v2")));
72+
colsVal.add(new ColumnValue(new Column("f1", "q1"), Bytes.of("v3")));
73+
colsVal.add(new ColumnValue(new Column("f1", "q2"), Bytes.of("v4")));
74+
colsVal.add(new ColumnValue(new Column("f2", "q5"), Bytes.of("v5")));
75+
76+
ColumnScanner colScanner = new ColumnScannerImpl(row, colsVal);
77+
78+
Set<ColumnValue> expected =
79+
colsVal.stream().filter(cv -> cv.getColumn().getsFamily().equals("f2"))
80+
.collect(Collectors.toSet());
81+
Set<ColumnValue> colSubSet =
82+
colScanner.stream().filter(cv -> cv.getColumn().getsFamily().equals("f2"))
83+
.collect(Collectors.toSet());
84+
85+
Assert.assertNotEquals(empty, colSubSet);
86+
Assert.assertNotEquals(empty, colScanner.stream().collect(Collectors.toSet()));
87+
Assert.assertEquals(colsVal, colScanner.stream().collect(Collectors.toSet()));
88+
Assert.assertEquals(expected, colSubSet);
89+
}
90+
91+
@Test
92+
public void testRowScannerStream() {
93+
List<ColumnScanner> rows = new ArrayList<>();
94+
Set<ColumnValue> cv1 = new HashSet<>();
95+
Set<ColumnValue> cv2 = new HashSet<>();
96+
97+
Bytes row1 = Bytes.of("555555555");
98+
cv1.add(new ColumnValue(new Column("firstname"), Bytes.of("Chris")));
99+
cv1.add(new ColumnValue(new Column("lastname"), Bytes.of("McTague")));
100+
cv1.add(new ColumnValue(new Column("age"), Bytes.of("21")));
101+
102+
rows.add(new ColumnScannerImpl(row1, cv1));
103+
104+
Bytes row2 = Bytes.of("55234234");
105+
cv2.add(new ColumnValue(new Column("firstname"), Bytes.of("Hulk")));
106+
cv2.add(new ColumnValue(new Column("lastname"), Bytes.of("Hogan")));
107+
cv2.add(new ColumnValue(new Column("age"), Bytes.of("60")));
108+
109+
rows.add(new ColumnScannerImpl(row2, cv2));
110+
111+
RowScannerImpl rsi = new RowScannerImpl(rows);
112+
113+
Set<Person> people = rsi.stream().map(cs -> toPerson(cs)).collect(Collectors.toSet());
114+
115+
Set<Person> expected = new HashSet<>();
116+
expected.add(new Person("Chris", "McTague", 21, 555555555));
117+
expected.add(new Person("Hulk", "Hogan", 60, 55234234));
118+
119+
Assert.assertEquals(expected, people);
120+
}
121+
122+
private static Person toPerson(ColumnScanner cs) {
123+
Person p = new Person();
124+
p.id = Integer.parseInt(cs.getsRow());
125+
126+
for (ColumnValue cv : cs) {
127+
switch (cv.getColumn().getsFamily()) {
128+
case "firstname": {
129+
p.firstname = cv.getsValue();
130+
break;
131+
}
132+
case "lastname": {
133+
p.lastname = cv.getsValue();
134+
break;
135+
}
136+
case "age": {
137+
p.age = Integer.parseInt(cv.getsValue());
138+
break;
139+
}
140+
default: {
141+
throw new IllegalArgumentException("unknown column " + cv.getColumn());
142+
}
143+
}
144+
}
145+
return p;
146+
}
147+
148+
private static class CellScannerImpl implements CellScanner {
149+
private Collection<RowColumnValue> rcv;
150+
151+
CellScannerImpl(Collection<RowColumnValue> rcv) {
152+
this.rcv = rcv;
153+
}
154+
155+
@Override
156+
public Iterator<RowColumnValue> iterator() {
157+
return rcv.iterator();
158+
}
159+
}
160+
161+
private static class RowScannerImpl implements RowScanner {
162+
private Collection<ColumnScanner> scan;
163+
164+
RowScannerImpl(Collection<ColumnScanner> scan) {
165+
this.scan = scan;
166+
}
167+
168+
@Override
169+
public Iterator<ColumnScanner> iterator() {
170+
return scan.iterator();
171+
}
172+
}
173+
174+
private static class ColumnScannerImpl implements ColumnScanner {
175+
private Collection<ColumnValue> scan;
176+
private Bytes row;
177+
178+
ColumnScannerImpl(Bytes row, Collection<ColumnValue> scan) {
179+
this.row = row;
180+
this.scan = scan;
181+
}
182+
183+
@Override
184+
public Iterator<ColumnValue> iterator() {
185+
return scan.iterator();
186+
}
187+
188+
@Override
189+
public Bytes getRow() {
190+
return row;
191+
}
192+
193+
@Override
194+
public String getsRow() {
195+
return row.toString();
196+
}
197+
}
198+
199+
private static class Person {
200+
private String firstname;
201+
private String lastname;
202+
private int age;
203+
private int id;
204+
205+
Person() {
206+
this.firstname = "";
207+
this.lastname = "";
208+
this.age = 0;
209+
this.id = 0;
210+
}
211+
212+
Person(String f, String l, int a, int id) {
213+
this.firstname = f;
214+
this.lastname = l;
215+
this.age = a;
216+
this.id = id;
217+
}
218+
219+
@Override
220+
public int hashCode() {
221+
return Objects.hash(firstname, lastname, age, id);
222+
}
223+
224+
@Override
225+
public boolean equals(Object o) {
226+
if (o == this) {
227+
return true;
228+
}
229+
230+
if (o instanceof Person) {
231+
Person p = (Person) o;
232+
return (this.firstname.equals(p.firstname)) && (this.lastname.equals(p.lastname))
233+
&& (this.age == p.age) && (this.id == p.id);
234+
}
235+
return false;
236+
}
237+
}
238+
}

0 commit comments

Comments
 (0)