Skip to content

Commit

Permalink
Add filtering to File_By_Line
Browse files Browse the repository at this point in the history
  • Loading branch information
jdunkerley committed Jan 22, 2024
1 parent b33b5b2 commit ab8ec9e
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,25 @@ type File_By_Line

Arguments
- predicate: The predicate to filter by.
filter : (Text -> Boolean) -> File_By_Line
filter : Text | (Text -> Boolean) -> File_By_Line
filter self predicate =
## Create the predicate
new_filter = case predicate of
_:Text -> FileLineReader.createContainsFilter predicate self.charset
_ -> FileLineReader.wrapBooleanFilter predicate self.charset

## Find the index of the first line matching the new index.
end_at = if self.limit_lines.is_nothing then -1 else self.limit_lines-1
first_index = FileLineReader.findFirstNewFilter self.java_file self.row_map end_at self.charset self.filter_func new_filter
new_row_map = LongArrayList.new
new_row_map.add first_index

## Merge the two predicates together.
new_predicate = if self.filter_func.is_nothing then predicate else (text -> self.filter_func text && predicate text)
new_predicate = if self.filter_func.is_nothing then new_filter else
FileLineReader.mergeTwoFilters self.filter_func new_filter

## Need to ensure the parent limit is obeyed - for now we just use it but should be applied on level above
File_By_Line.Reader self.file self.encoding self.limit_lines new_predicate self.row_map
File_By_Line.Reader self.file self.encoding self.limit_lines new_predicate new_row_map

## ADVANCED
Exports the row_map
Expand All @@ -176,7 +188,7 @@ type File_By_Line
Reads a specific line from the file.
read_line : File_By_Line->Integer->Any->Any
read_line file:File_By_Line line:Integer=0 ~default=Nothing = File_Error.handle_java_exceptions file.file <|
FileLineReader.scanAndReadLine file.java_file file.row_map line file.charset file.filter_func . if_nothing default
FileLineReader.readSingleLine file.java_file file.row_map line file.charset file.filter_func . if_nothing default

## PRIVATE
Performs an action on each line in the file.
Expand All @@ -187,7 +199,7 @@ for_each_lines file:File_By_Line start_at:Integer end_at:(Integer|Nothing) actio
charset = file.charset

## First if we haven't read the found the start_at line we need to find that.
if start_at >= row_map.getSize then FileLineReader.scanAndReadLine java_file row_map start_at charset file.filter_func
if start_at >= row_map.getSize then FileLineReader.readSingleLine java_file row_map start_at charset file.filter_func

## Now we can read the lines we need.
if row_map.getOrLast start_at >= java_file.length then Error.throw (Index_Out_Of_Bounds.Error start_at row_map.getSize) else
Expand Down
144 changes: 133 additions & 11 deletions std-bits/base/src/main/java/org/enso/base/FileLineReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,60 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.enso.base.arrays.LongArrayList;
import org.enso.polyglot.common_utils.Core_Text_Utils;
import org.graalvm.polyglot.Context;
import com.ibm.icu.text.Normalizer2;

public class FileLineReader {
public static class ByteArrayOutputStreamWithContains extends ByteArrayOutputStream {
public ByteArrayOutputStreamWithContains(int size) {
super(size);
}

// ** Creates a preloaded stream from a byte array. */
public static ByteArrayOutputStreamWithContains fromByteArray(byte[] bytes) {
var stream = new ByteArrayOutputStreamWithContains(0);
stream.buf = bytes;
stream.count = bytes.length;
return stream;
}

public boolean contains(byte[] bytes) {
if (bytes.length > count) {
return false;
}
for (int i = 0; i < count - bytes.length; i++) {
boolean found = true;
for (int j = 0; j < bytes.length; j++) {
if (buf[i + j] != bytes[j]) {
found = false;
break;
}
}
if (found) {
return true;
}
}
return false;
}
}

private static class CancellationToken {
public boolean isCancelled = false;

public void cancel() {
isCancelled = true;
}
}

private static final Logger LOGGER = Logger.getLogger("enso-file-line-reader");

// ** Amount of data to read at a time for a single line (4KB). */
Expand Down Expand Up @@ -104,8 +148,12 @@ private static String readLineByIndex(File file, LongArrayList rowMap, int index
}

// ** Scans forward in a file and returns the line at the given index. */
public static String scanAndReadLine(
File file, LongArrayList rowMap, int index, Charset charset, Function<String, Boolean> filter)
public static String readSingleLine(
File file,
LongArrayList rowMap,
int index,
Charset charset,
Function<ByteArrayOutputStreamWithContains, String> filter)
throws IOException {
int size = rowMap.getSize();
if (index != -1 && size > index) {
Expand All @@ -122,7 +170,7 @@ public static List<String> readLines(
int startAt,
int endAt,
Charset charset,
Function<String, Boolean> filter)
Function<ByteArrayOutputStreamWithContains, String> filter)
throws IOException {
List<String> result = new ArrayList<>();
forEachLine(file, rowMap, startAt, endAt, charset, filter, (index, line) -> result.add(line));
Expand All @@ -145,11 +193,21 @@ public static String forEachLine(
int startAt,
int endAt,
Charset charset,
Function<String, Boolean> filter,
BiFunction<Integer, String, Boolean> action)
Function<ByteArrayOutputStreamWithContains, String> filter,
BiConsumer<Integer, String> action)
throws IOException {
LOGGER.log(Level.INFO, "forEachLine: {0} {1}", new Object[] {startAt, endAt});
return innerForEachLine(file, rowMap, startAt, endAt, charset, filter, action, new CancellationToken());
}

private static String innerForEachLine(
File file,
LongArrayList rowMap,
int startAt,
int endAt,
Charset charset,
Function<ByteArrayOutputStreamWithContains, String> filter,
BiConsumer<Integer, String> action,
CancellationToken cancellationToken) throws IOException {
if (startAt >= rowMap.getSize()) {
throw new IndexOutOfBoundsException(startAt);
}
Expand All @@ -162,7 +220,7 @@ public static String forEachLine(
}

boolean readAll = filter != null || action != null || endAt == -1;
var outputStream = new ByteArrayOutputStream(128);
var outputStream = new ByteArrayOutputStreamWithContains(128);
String output = null;

try (var stream = new FileInputStream(file)) {
Expand All @@ -173,7 +231,7 @@ public static String forEachLine(
var buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, bufferSize);

// Loop until we either reach the required record or run out of data.
while ((endAt == -1 || index <= endAt) && (truncated || buffer.hasRemaining())) {
while (!cancellationToken.isCancelled && (endAt == -1 || index <= endAt) && (truncated || buffer.hasRemaining())) {
var linePosition = buffer.position() + position;

// Read a line.
Expand All @@ -183,14 +241,14 @@ public static String forEachLine(

if (success || !truncated) {
String line = null;
if (filter == null || filter.apply(line = outputStream.toString(charset))) {
if (filter == null || (line = filter.apply(outputStream)) != null) {
if (index >= rowMap.getSize()) {
rowMap.add(linePosition);
}

if (action != null) {
line = line == null ? outputStream.toString(charset) : line;
action.apply(index, line);
action.accept(index, line);
}

if (index == endAt) {
Expand Down Expand Up @@ -232,4 +290,68 @@ public static String forEachLine(
return output;
}
}

//** Scans forward in a file reading line by line until it finds a line that matches the new filter. */
public static long findFirstNewFilter(
File file,
LongArrayList rowMap,
int endAt,
Charset charset,
Function<ByteArrayOutputStreamWithContains, String> filter,
Function<ByteArrayOutputStreamWithContains, String> newFilter) throws IOException {
final CancellationToken token = new CancellationToken();
final List<Long> result = new ArrayList<>();
BiConsumer<Integer, String> action = (index, line) -> {
var bytes = line.getBytes(charset);
var outputStream = ByteArrayOutputStreamWithContains.fromByteArray(bytes);
if (newFilter.apply(outputStream) != null) {
result.add(rowMap.get(index));
token.cancel();
}
};
innerForEachLine(file, rowMap, 0, endAt, charset, filter, action, token);
return result.isEmpty() ? rowMap.get(rowMap.getSize() - 1) : result.get(0);
}

// ** Creates a filter that checks if the line contains the given string. */
public static Function<ByteArrayOutputStreamWithContains, String> createContainsFilter(
String contains, Charset charset) {
if (isUnicodeCharset(charset)) {
var nfcVersion = Normalizer2.getNFCInstance().normalize(contains);
var nfdVersion = Normalizer2.getNFDInstance().normalize(contains);
if (!nfcVersion.equals(nfdVersion)) {
// Need to use Unicode normalization for equality.
return (outputStream) -> {
var line = outputStream.toString(charset);
return Core_Text_Utils.compare_normalized(contains, line)==0 ? line : null;
};
}
}

var bytes = contains.getBytes(charset);
return (outputStream) -> outputStream.contains(bytes) ? outputStream.toString(charset) : null;
}

// ** Wraps an Enso function filter in a FileLineReader filter. */
public static Function<ByteArrayOutputStreamWithContains, String> wrapBooleanFilter(
Function<String, Boolean> filter, Charset charset) {
return (outputStream) -> {
var line = outputStream.toString(charset);
return filter.apply(line) ? line : null;
};
}

// Joins two filters together. */
public static Function<ByteArrayOutputStreamWithContains, String> mergeTwoFilters(
Function<ByteArrayOutputStreamWithContains, String> first,
Function<ByteArrayOutputStreamWithContains, String> second) {
return (outputStream) -> {
var first_result = first.apply(outputStream);
return first_result != null ? second.apply(outputStream) : null;
};
}

private static boolean isUnicodeCharset(Charset charset) {
return charset == StandardCharsets.UTF_8 || charset == StandardCharsets.UTF_16 || charset == StandardCharsets.UTF_16BE || charset == StandardCharsets.UTF_16LE;
}
}

0 comments on commit ab8ec9e

Please sign in to comment.