Skip to content

Commit

Permalink
Forbid using separators that can self-overlap
Browse files Browse the repository at this point in the history
  • Loading branch information
echauchot committed Sep 1, 2017
1 parent 43e9faa commit a01a231
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
24 changes: 24 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,33 @@ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
* Set the custom separator to be used in place of the default ones ('\r', '\n' or '\r\n').
*/
public Read withSeparator(byte[] separator) {
checkNotNull(separator,
"need to provide a non null separator to TextIO.Read.withSeparator()");
checkArgument(!separatorSelfOverlaps(separator),
"separator that can self overlap are not supported");
return toBuilder().setSeparator(separator).build();
}

static boolean separatorSelfOverlaps(byte[] separator) {
int i = 1;
// s self-overlaps if v exists such as s = vu = wv with u and w non empty
// search for the first byte of v in wv
while (i < separator.length && separator[i] != separator[0]) {
i++;
}
if (i < separator.length) {
// found, compare next possible v bytes
int j = i + 1;
while (j < separator.length && (separator[j] == separator[j - i])) {
j++;
}
if (j >= separator.length) {
return true;
}
}
return false;
}

@Override
public PCollection<String> expand(PBegin input) {
checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,25 @@ private void runTestRead(String[] expected) throws Exception {
p.run();
}

@Test
public void testSeparatorSelfOverlaps(){
assertFalse(TextIO.Read.separatorSelfOverlaps(new byte[]{'a', 'b', 'c'}));
assertFalse(TextIO.Read.separatorSelfOverlaps(new byte[]{'c', 'a', 'b', 'd', 'a', 'b'}));
assertFalse(TextIO.Read.separatorSelfOverlaps(new byte[]{'a', 'b', 'c', 'a', 'b', 'd'}));
assertTrue(TextIO.Read.separatorSelfOverlaps(new byte[]{'a', 'b', 'a'}));
assertTrue(TextIO.Read.separatorSelfOverlaps(new byte[]{'a', 'b', 'c', 'a', 'b'}));
}

@Test
@Category(NeedsRunner.class)
public void testReadStringsWithCustomSeparator() throws Exception {
final String[] inputStrings = new String[] {
// incomplete separator
"To be, or not to be: that |is the question: ",
// incomplete separator
"To be, or not to be: that *is the question: ",
// complete separator
"Whether 'tis nobler in the mind to suffer ||",
"Whether 'tis nobler in the mind to suffer |*",
//truncated separator
"The slings and arrows of outrageous fortune,|" };

Expand All @@ -233,9 +244,10 @@ public void testReadStringsWithCustomSeparator() throws Exception {
}
}
String[] expected = new String[] {
"To be, or not to be: that |is the question: Whether 'tis nobler in the mind to suffer ",
"To be, or not to be: that |is the question: To be, or not to be: that *is the question: "
+ "Whether 'tis nobler in the mind to suffer ",
"The slings and arrows of outrageous fortune,|" };
TextIO.Read read = TextIO.read().from(filename).withSeparator(new byte[] {'|', '|'});
TextIO.Read read = TextIO.read().from(filename).withSeparator(new byte[] {'|', '*'});

PCollection<String> output = p.apply(read);

Expand All @@ -245,8 +257,8 @@ public void testReadStringsWithCustomSeparator() throws Exception {

@Test
public void testSplittingSourceWithCustomSeparator() throws Exception {
TextSource source = prepareSource("asdf||hjkl||xyz".getBytes(StandardCharsets.UTF_8),
new byte[] { '|', '|' });
TextSource source = prepareSource("asdf|*hjkl|*xyz".getBytes(StandardCharsets.UTF_8),
new byte[] { '|', '*' });
SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
}

Expand Down

0 comments on commit a01a231

Please sign in to comment.