Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-2802] Support multi-byte custom separator in TextIO #3779

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
Expand Down Expand Up @@ -63,7 +64,8 @@
* PCollection}, apply {@link TextIO#readAll()}.
*
* <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
* one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n').
* one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n',
* or by a custom separator; see {@link TextIO.Read#withSeparator}).
*
* <h3>Filepattern expansion and watching</h3>
*
Expand Down Expand Up @@ -255,7 +257,8 @@ public static <UserT> TypedWrite<UserT> writeCustomType() {
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@Nullable abstract ValueProvider<String> getFilepattern();
@Nullable
abstract ValueProvider<String> getFilepattern();
abstract Compression getCompression();

@Nullable
Expand All @@ -266,6 +269,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<String>

abstract boolean getHintMatchesManyFiles();
abstract EmptyMatchTreatment getEmptyMatchTreatment();
@Nullable
abstract byte[] getSeparator();

abstract Builder toBuilder();

Expand All @@ -278,6 +283,7 @@ abstract Builder setWatchForNewFilesTerminationCondition(
TerminationCondition<?, ?> condition);
abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
abstract Builder setSeparator(byte[] separator);

abstract Read build();
}
Expand Down Expand Up @@ -360,6 +366,37 @@ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
return toBuilder().setEmptyMatchTreatment(treatment).build();
}

/**
 * Sets the custom separator to be used in place of the default ones ('\r', '\n' or '\r\n').
 *
 * <p>The separator is validated before it is stored: it must be non-null, must contain at
 * least one byte, and must not self-overlap (see {@link #separatorSelfOverlaps}). The array is
 * copied, so later modifications of the caller's array do not affect this transform.
 *
 * @param separator the raw bytes that delimit two consecutive records
 * @return a new {@link Read} identical to this one except for the separator
 */
public Read withSeparator(byte[] separator) {
  checkNotNull(separator,
      "need to provide a non null separator to TextIO.Read.withSeparator()");
  // A zero-length separator matches trivially at every byte position, so it cannot delimit
  // records; reject it up front instead of letting the reader misbehave.
  checkArgument(separator.length > 0, "separator must not be empty");
  checkArgument(!separatorSelfOverlaps(separator),
      "separator that can self overlap are not supported");
  // Defensive copy: arrays are mutable and the transform must not change after construction.
  return toBuilder().setSeparator(separator.clone()).build();
}

/**
 * Tells whether {@code separator} self-overlaps, i.e. whether some non-empty proper prefix of
 * the separator is also a suffix of it (s = vu = wv with u and w non empty).
 *
 * <p>Every position at which the first byte re-occurs is examined as a possible start of the
 * overlapping suffix; stopping at the first re-occurrence would miss separators such as
 * {@code "abacab"}, whose overlap {@code "ab"} starts at a later re-occurrence.
 *
 * @param separator the separator bytes to inspect; must not be null
 * @return {@code true} if a proper prefix of {@code separator} is also a suffix of it,
 *     {@code false} otherwise (including for empty and single-byte separators)
 */
static boolean separatorSelfOverlaps(byte[] separator) {
  // "shift" is the length of w in s = wv: the candidate suffix v starts at index shift.
  for (int shift = 1; shift < separator.length; shift++) {
    if (separator[shift] != separator[0]) {
      // v must start with the first byte of s, so this shift cannot produce an overlap.
      continue;
    }
    // First byte matches; compare the remaining bytes of the suffix with the prefix.
    int j = shift + 1;
    while (j < separator.length && separator[j] == separator[j - shift]) {
      j++;
    }
    if (j == separator.length) {
      // The whole suffix starting at "shift" equals the prefix of the same length.
      return true;
    }
  }
  return false;
}

@Override
public PCollection<String> expand(PBegin input) {
checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
Expand All @@ -370,7 +407,8 @@ public PCollection<String> expand(PBegin input) {
ReadAll readAll =
readAll()
.withCompression(getCompression())
.withEmptyMatchTreatment(getEmptyMatchTreatment());
.withEmptyMatchTreatment(getEmptyMatchTreatment())
.withSeparator(getSeparator());
if (getWatchForNewFilesInterval() != null) {
TerminationCondition<String, ?> readAllCondition =
ignoreInput(getWatchForNewFilesTerminationCondition());
Expand All @@ -383,7 +421,8 @@ public PCollection<String> expand(PBegin input) {

// Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() {
return CompressedSource.from(new TextSource(getFilepattern(), getEmptyMatchTreatment()))
return CompressedSource
.from(new TextSource(getFilepattern(), getEmptyMatchTreatment(), getSeparator()))
.withCompression(getCompression());
}

Expand All @@ -401,7 +440,11 @@ public void populateDisplayData(DisplayData.Builder builder) {
.withLabel("Treatment of filepatterns that match no files"))
.addIfNotNull(
DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
.withLabel("Interval to watch for new files"));
.withLabel("Interval to watch for new files"))
.addIfNotNull(
DisplayData.item("separator", Arrays.toString(getSeparator()))
.withLabel("Custom separator to split records"));

}
}

Expand All @@ -421,6 +464,8 @@ public abstract static class ReadAll

abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract long getDesiredBundleSizeBytes();
@Nullable
abstract byte[] getSeparator();

abstract Builder toBuilder();

Expand All @@ -432,7 +477,7 @@ abstract Builder setWatchForNewFilesTerminationCondition(
TerminationCondition<String, ?> condition);
abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);

abstract Builder setSeparator(byte[] separator);
abstract ReadAll build();
}

Expand Down Expand Up @@ -471,6 +516,10 @@ ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
}

/**
 * Returns a copy of this transform that splits records on {@code separator}.
 *
 * <p>No validation is performed here; the value is handed to the builder as is.
 */
ReadAll withSeparator(byte[] separator) {
  final ReadAll withCustomSeparator = toBuilder().setSeparator(separator).build();
  return withCustomSeparator;
}

@Override
public PCollection<String> expand(PCollection<String> input) {
Match.Filepatterns matchFilepatterns =
Expand All @@ -487,34 +536,40 @@ public PCollection<String> expand(PCollection<String> input) {
new ReadAllViaFileBasedSource<>(
new IsSplittableFn(getCompression()),
getDesiredBundleSizeBytes(),
new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment())))
.setCoder(StringUtf8Coder.of());
new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment(),
getSeparator()))).setCoder(StringUtf8Coder.of());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.add(
builder
.add(
DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"));
.withLabel("Compression Type"))
.addIfNotNull(
DisplayData.item("separator", Arrays.toString(getSeparator()))
.withLabel("Custom separator to split records"));
}

private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {
private final Compression compression;
private final EmptyMatchTreatment emptyMatchTreatment;
private byte[] separator;

private CreateTextSourceFn(
Compression compression, EmptyMatchTreatment emptyMatchTreatment) {
Compression compression, EmptyMatchTreatment emptyMatchTreatment, byte[] separator) {
this.compression = compression;
this.emptyMatchTreatment = emptyMatchTreatment;
this.separator = separator;
}

@Override
public FileBasedSource<String> apply(String input) {
return CompressedSource.from(
new TextSource(StaticValueProvider.of(input), emptyMatchTreatment))
new TextSource(StaticValueProvider.of(input), emptyMatchTreatment, separator))
.withCompression(compression);
}
}
Expand Down
95 changes: 66 additions & 29 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,38 +39,44 @@
* <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
*
* <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
* {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
* {@code \r\n} as the separator. This source is not strict and supports decoding the last record
* even if it is not delimited. Finally, no records are decoded if the stream is empty.
*
* <p>This source supports reading from any arbitrary byte position within the stream. If the
* starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
* starting position is not {@code 0}, then bytes are skipped until the first separator is found
* representing the beginning of the first record to be decoded.
*/
@VisibleForTesting
class TextSource extends FileBasedSource<String> {
TextSource(ValueProvider<String> fileSpec) {
this(fileSpec, EmptyMatchTreatment.DISALLOW);
byte[] separator;

/**
 * Creates a source for {@code fileSpec} using {@link EmptyMatchTreatment#DISALLOW} and the
 * given record separator.
 *
 * @param fileSpec the file specification, forwarded unchanged to the three-argument constructor
 * @param separator the separator bytes; {@code null} makes the reader fall back to the default
 *     '\n', '\r' and '\r\n' handling
 */
TextSource(ValueProvider<String> fileSpec, byte[] separator) {
this(fileSpec, EmptyMatchTreatment.DISALLOW, separator);
}

TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment) {
/**
 * Creates a source for {@code fileSpec} with an explicit empty-match treatment and record
 * separator.
 *
 * @param fileSpec the file specification, passed to the {@link FileBasedSource} constructor
 * @param emptyMatchTreatment passed to the {@link FileBasedSource} constructor
 * @param separator the separator bytes kept on this source and handed to the readers it
 *     creates; {@code null} makes the reader fall back to the default '\n', '\r' and '\r\n'
 *     handling
 */
TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment,
byte[] separator) {
// NOTE(review): 1L is presumably the minimum bundle size -- confirm against FileBasedSource.
super(fileSpec, emptyMatchTreatment, 1L);
this.separator = separator;
}

private TextSource(MatchResult.Metadata metadata, long start, long end) {
/**
 * Creates a source restricted to the {@code [start, end)} range of a single file; used by
 * {@code createForSubrangeOfFile} so that sub-sources keep the separator of their parent.
 *
 * @param metadata metadata of the file, passed to the {@link FileBasedSource} constructor
 * @param start start offset, passed to the {@link FileBasedSource} constructor
 * @param end end offset, passed to the {@link FileBasedSource} constructor
 * @param separator the separator bytes kept on this source; may be {@code null}
 */
private TextSource(MatchResult.Metadata metadata, long start, long end, byte[] separator) {
// NOTE(review): 1L is presumably the minimum bundle size -- confirm against FileBasedSource.
super(metadata, 1L, start, end);
this.separator = separator;
}

@Override
protected FileBasedSource<String> createForSubrangeOfFile(
MatchResult.Metadata metadata,
long start,
long end) {
return new TextSource(metadata, start, end);
return new TextSource(metadata, start, end, separator);

}

@Override
protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
return new TextBasedReader(this);
return new TextBasedReader(this, separator);
}

@Override
Expand All @@ -80,7 +86,7 @@ public Coder<String> getOutputCoder() {

/**
* A {@link FileBasedReader FileBasedReader}
* which can decode records delimited by newline characters.
* which can decode records delimited by separator characters.
*
* <p>See {@link TextSource} for further details.
*/
Expand All @@ -97,10 +103,12 @@ static class TextBasedReader extends FileBasedReader<String> {
private volatile boolean elementIsPresent;
private String currentValue;
private ReadableByteChannel inChannel;
private byte[] separator;

private TextBasedReader(TextSource source) {
private TextBasedReader(TextSource source, byte[] separator) {
super(source);
buffer = ByteString.EMPTY;
this.separator = separator;
}

@Override
Expand Down Expand Up @@ -132,11 +140,17 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
this.inChannel = channel;
// If the first offset is greater than zero, we need to skip bytes until we see our
// first separator.
if (getCurrentSource().getStartOffset() > 0) {
long startOffset = getCurrentSource().getStartOffset();
if (startOffset > 0) {
checkState(channel instanceof SeekableByteChannel,
"%s only supports reading from a SeekableByteChannel when given a start offset"
+ " greater than 0.", TextSource.class.getSimpleName());
long requiredPosition = getCurrentSource().getStartOffset() - 1;
long requiredPosition = startOffset - 1;
if (separator != null && startOffset >= separator.length) {
// we need to move the offset back by separator.length bytes (the worst case) to be sure
// to see all the bytes of the separator in the call to findSeparatorBounds() below
requiredPosition = startOffset - separator.length;
}
((SeekableByteChannel) channel).position(requiredPosition);
findSeparatorBounds();
buffer = buffer.substring(endOfSeparatorInBuffer);
Expand All @@ -147,13 +161,13 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
}

/**
* Locates the start position and end position of the next delimiter. Will
* consume the channel till either EOF or the delimiter bounds are found.
* Locates the start position and end position of the next separator. Will
* consume the channel till either EOF or the separator bounds are found.
*
* <p>This fills the buffer and updates the positions as follows:
* <pre>{@code
* ------------------------------------------------------
* | element bytes | delimiter bytes | unconsumed bytes |
* | element bytes | separator bytes | unconsumed bytes |
* ------------------------------------------------------
* 0 start of end of buffer
* separator separator size
Expand All @@ -170,23 +184,46 @@ private void findSeparatorBounds() throws IOException {

byte currentByte = buffer.byteAt(bytePositionInBuffer);

if (currentByte == '\n') {
startOfSeparatorInBuffer = bytePositionInBuffer;
endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
break;
} else if (currentByte == '\r') {
startOfSeparatorInBuffer = bytePositionInBuffer;
endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;

if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
currentByte = buffer.byteAt(bytePositionInBuffer + 1);
if (currentByte == '\n') {
endOfSeparatorInBuffer += 1;
if (separator == null) {
//default separator
if (currentByte == '\n') {
startOfSeparatorInBuffer = bytePositionInBuffer;
endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
break;
} else if (currentByte == '\r') {
startOfSeparatorInBuffer = bytePositionInBuffer;
endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;

if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
currentByte = buffer.byteAt(bytePositionInBuffer + 1);
if (currentByte == '\n') {
endOfSeparatorInBuffer += 1;
}
}
break;
}
} else {
// user defined separator
int i = 0;
// initialize separator not found
startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
while ((i <= separator.length - 1) && (currentByte == separator[i])) {
//read next byte
i++;
if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 1)) {
currentByte = buffer.byteAt(bytePositionInBuffer + i);
} else {
//corner case: separator truncated at the end of the file
startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
break;
}
}
if (i == separator.length) {
// all bytes of separator found
endOfSeparatorInBuffer = bytePositionInBuffer + i;
break;
}
break;
}

// Move to the next byte in buffer.
bytePositionInBuffer += 1;
}
Expand Down
Loading