Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reading and parsing optimizations: CalculateAverage_spullara #21

Merged
merged 22 commits into from Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions calculate_average_spullara.sh
@@ -0,0 +1,21 @@
#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# Extra JVM flags for this entry. Empty here, so the JVM runs with its defaults.
JAVA_OPTS=""
# $JAVA_OPTS is expanded unquoted: an empty value disappears from the command line
# and a value holding several flags is word-split into separate arguments.
# `time` reports how long the java process took.
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_spullara

50 changes: 50 additions & 0 deletions src/main/java/dev/morling/onebrc/CalculateAverage_naive.java
@@ -0,0 +1,50 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

/**
 * Straightforward single-threaded baseline: reads {@code ./measurements.txt} line by line,
 * splits each line on {@code ';'} and aggregates min / max / sum / count per station name
 * in a sorted map, then prints the elapsed milliseconds followed by the map.
 */
public class CalculateAverage_naive {

    /** Aggregated measurements of one station. */
    record Result(double min, double max, double sum, long count) {
    }

    /**
     * Aggregates the measurements file and prints the elapsed time and the per-station results.
     *
     * @param args ignored
     * @throws FileNotFoundException if {@code ./measurements.txt} cannot be opened
     * @throws java.io.UncheckedIOException if reading or closing the file fails
     */
    public static void main(String[] args) throws FileNotFoundException {
        long start = System.currentTimeMillis();
        // Opened outside the try block so that FileNotFoundException propagates unchanged.
        BufferedReader reader = new BufferedReader(new FileReader("./measurements.txt"));
        // try-with-resources: the original never closed the reader.
        try (reader) {
            var results = reader.lines()
                    .map(l -> l.split(";"))
                    .collect(Collectors.toMap(
                            parts -> parts[0],
                            parts -> {
                                double temperature = Double.parseDouble(parts[1]);
                                return new Result(temperature, temperature, temperature, 1);
                            },
                            (oldResult, newResult) -> {
                                double min = Math.min(oldResult.min(), newResult.min());
                                double max = Math.max(oldResult.max(), newResult.max());
                                double sum = oldResult.sum() + newResult.sum();
                                long count = oldResult.count() + newResult.count();
                                return new Result(min, max, sum, count);
                            }, ConcurrentSkipListMap::new));
            System.out.println(System.currentTimeMillis() - start);
            System.out.println(results);
        }
        catch (java.io.IOException e) {
            // Only close() can raise a checked IOException here; lines() already wraps read errors.
            throw new java.io.UncheckedIOException(e);
        }
    }
}
Expand Up @@ -52,7 +52,7 @@ private double round(double value) {

public static void main(String[] args) throws IOException {

// long before = System.currentTimeMillis();
// long before = System.currentTimeMillis();

Map<String, Measurement> resultMap = Files.lines(Path.of(FILE)).parallel()
.map(record -> {
Expand All @@ -73,7 +73,7 @@ public static void main(String[] args) throws IOException {
resultMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", ")));
System.out.println("}");

// System.out.println("Took: " + (System.currentTimeMillis() - before));
// System.out.println("Took: " + (System.currentTimeMillis() - before));

}
}
232 changes: 232 additions & 0 deletions src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java
@@ -0,0 +1,232 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Computes min / mean / max per station by splitting the input file into line-aligned
 * segments, memory-mapping and parsing each segment in parallel into its own
 * {@link ByteArrayToResultMap}, and finally merging the per-segment maps into one sorted map.
 */
public class CalculateAverage_spullara {
    private static final String FILE = "./measurements.txt";

    /** Starting size of the station-name scratch buffer; it is doubled whenever a name does not fit. */
    private static final int INITIAL_NAME_BUFFER_SIZE = 64;

    /**
     * Upper bound for the nominal size of one segment. {@code FileChannel.map} rejects regions
     * larger than {@code Integer.MAX_VALUE} bytes; the 64 KiB of slack leaves room for the
     * partial line a segment is extended by to reach the next newline.
     */
    private static final long MAX_SEGMENT_BYTES = Integer.MAX_VALUE - (1L << 16);

    /*
     * My results on this computer:
     *
     * CalculateAverage: 2m37.788s
     * CalculateAverage_royvanrijn: 0m29.639s
     * CalculateAverage_spullara: 0m2.013s
     *
     */

    /**
     * Processes the measurements file and prints the elapsed time, the number of lines
     * processed and the sorted per-station results.
     *
     * @param args optional; {@code args[0]} is the input path, defaulting to {@code ./measurements.txt}
     * @throws IOException if the file cannot be read while computing the segments
     */
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        var filename = args.length == 0 ? FILE : args[0];
        var file = new File(filename);
        long start = System.currentTimeMillis();

        var totalLines = new AtomicInteger();
        var results = getFileSegments(file).stream()
                .map(segment -> processSegment(filename, segment, totalLines))
                .parallel()
                .toList();

        var resultMap = results.stream()
                .flatMap(partition -> partition.getAll().stream())
                .collect(Collectors.toMap(
                        // Decode explicitly as UTF-8 instead of relying on the platform charset.
                        e -> new String(e.key(), java.nio.charset.StandardCharsets.UTF_8),
                        Entry::value,
                        CalculateAverage_spullara::merge,
                        TreeMap::new));

        System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms");
        System.out.println("Lines processed: " + totalLines);
        System.out.println(resultMap);
    }

    /**
     * Memory-maps one segment and aggregates every {@code name;temperature} line in it.
     *
     * @param filename   path of the input file
     * @param segment    line-aligned byte range to process
     * @param totalLines shared counter that receives the number of lines parsed in this segment
     * @return the per-station aggregates of this segment
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be opened or mapped
     */
    private static ByteArrayToResultMap processSegment(String filename, FileSegment segment, AtomicInteger totalLines) {
        var resultMap = new ByteArrayToResultMap();
        try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) {
            var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.end() - segment.start());
            var buffer = new byte[INITIAL_NAME_BUFFER_SIZE];
            int lines = 0;
            int startLine;
            // All positions below are relative to the mapped buffer, so the loops are bounded by
            // the buffer limit and not by the absolute file offset of the segment end.
            int limit = bb.limit();
            while ((startLine = bb.position()) < limit) {
                int currentPosition = startLine;
                byte b;
                int offset = 0;
                // Station name: every byte up to the ';' separator.
                while (currentPosition < limit && (b = bb.get(currentPosition++)) != ';') {
                    if (offset == buffer.length) {
                        buffer = Arrays.copyOf(buffer, buffer.length * 2);
                    }
                    buffer[offset++] = b;
                }
                // Temperature: parsed as an integer number of tenths, the '.' is skipped.
                int temp = 0;
                int negative = 1;
                outer:
                while (currentPosition < limit && (b = bb.get(currentPosition++)) != '\n') {
                    switch (b) {
                        case '-':
                            negative = -1;
                            // intentional fall-through: nothing else to do for the sign
                        case '.':
                            break;
                        case '\r':
                            // Skip the '\n' that follows the '\r'.
                            currentPosition++;
                            break outer;
                        default:
                            temp = 10 * temp + (b - '0');
                    }
                }
                temp *= negative;
                double finalTemp = temp / 10.0;
                resultMap.putOrMerge(buffer, 0, offset,
                        () -> new Result(finalTemp),
                        measurement -> merge(measurement, finalTemp, finalTemp, finalTemp, 1));
                lines++;
                // A trailing '\r' without '\n' may have advanced one past the end; clamp it.
                bb.position(Math.min(currentPosition, limit));
            }
            totalLines.addAndGet(lines);
            return resultMap;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Splits the file into consecutive, non-overlapping segments that each end right after a
     * newline (or at end of file). One segment per available processor is used, or more when
     * that would make a segment too large to be memory-mapped.
     *
     * @param file the input file
     * @return the segments in file order; a segment may be empty
     * @throws IOException if the file cannot be read
     */
    private static List<FileSegment> getFileSegments(File file) throws IOException {
        int numberOfSegments = Runtime.getRuntime().availableProcessors();
        long fileSize = file.length();
        long minimumSegments = fileSize / MAX_SEGMENT_BYTES + 1;
        if (minimumSegments > numberOfSegments) {
            numberOfSegments = Math.toIntExact(minimumSegments);
        }
        long segmentSize = fileSize / numberOfSegments;
        List<FileSegment> segments = new ArrayList<>(numberOfSegments);
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
            // Each segment starts exactly where the previous one ended, which keeps segments
            // line-aligned and disjoint even when a line is longer than the nominal segment size.
            long segStart = 0;
            for (int i = 0; i < numberOfSegments; i++) {
                long nominalEnd = (i == numberOfSegments - 1) ? fileSize : (i + 1) * segmentSize;
                long segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, Math.max(nominalEnd, segStart), fileSize);

                segments.add(new FileSegment(segStart, segEnd));
                segStart = segEnd;
            }
        }
        return segments;
    }

    /** Folds {@code value} into {@code v} (mutating and returning {@code v}). */
    private static Result merge(Result v, Result value) {
        return merge(v, value.min, value.max, value.sum, value.count);
    }

    /**
     * Folds a min candidate, max candidate, sum increment and count increment into {@code v}.
     *
     * @return {@code v}, after mutation
     */
    private static Result merge(Result v, double value, double value1, double value2, long value3) {
        v.min = Math.min(v.min, value);
        v.max = Math.max(v.max, value1);
        v.sum += value2;
        v.count += value3;
        return v;
    }

    /**
     * Advances {@code location} to just past the next {@code '\n'} at or after it, never
     * beyond {@code fileSize}. Nothing is read and {@code location} is returned unchanged
     * when {@code i == skipSegment}.
     *
     * @param i           index of the segment being computed
     * @param skipSegment segment index for which no adjustment is made
     * @param raf         file to scan
     * @param location    offset at which scanning starts
     * @param fileSize    exclusive upper bound for the scan
     * @return the adjusted offset
     * @throws IOException if seeking or reading fails
     */
    private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException {
        if (i != skipSegment) {
            raf.seek(location);
            while (location < fileSize) {
                location++;
                if (raf.read() == '\n')
                    break;
            }
        }
        return location;
    }
}

/**
 * Mutable running aggregate (min, max, sum, count) of the measurements seen for one key.
 */
class Result {
    double min, max, sum;
    long count;

    /** Starts an aggregate from a single measurement. */
    Result(double value) {
        min = max = sum = value;
        this.count = 1;
    }

    /** Formats as {@code min/mean/max}, each rounded to one decimal place. */
    @Override
    public String toString() {
        return round(min) + "/" + round(sum / count) + "/" + round(max);
    }

    /** Rounds {@code v} to one decimal place using {@link Math#round(double)}. */
    double round(double v) {
        return Math.round(v * 10.0) / 10.0;
    }

}

/** Outcome of a table lookup: the slot reached and the value stored there ({@code null} if free). */
record Pair(int slot, Result slotValue) {
}

/** One occupied table entry: the key bytes and their aggregate. */
record Entry(byte[] key, Result value) {
}

/** Byte range {@code [start, end)} of a file. */
record FileSegment(long start, long end) {
}

/**
 * Fixed-capacity open-addressing hash table from byte-array keys to {@link Result}s,
 * using linear probing. Keys are given as a range of a caller-owned array and are copied
 * on first insertion. The table never grows; it is not safe for concurrent use.
 */
class ByteArrayToResultMap {
    /** Number of slots; a power of two so that {@code hash & (MAPSIZE - 1)} selects a slot. */
    public static final int MAPSIZE = 1024 * 128;
    final Result[] slots = new Result[MAPSIZE];
    final byte[][] keys = new byte[MAPSIZE][];

    /** Polynomial (base 31) hash of {@code a[fromIndex .. fromIndex + length)}. */
    private int hashCode(byte[] a, int fromIndex, int length) {
        int result = 0;
        int end = fromIndex + length;
        for (int i = fromIndex; i < end; i++) {
            result = 31 * result + a[i];
        }
        return result;
    }

    /**
     * Finds the slot holding the key {@code key[offset .. offset + size)}, or the first free
     * slot on its probe sequence if the key is absent.
     */
    private Pair getPair(byte[] key, int offset, int size) {
        int hash = hashCode(key, offset, size);
        int slot = hash & (slots.length - 1);
        var slotValue = slots[slot];
        // Linear probe for open slot.
        // The last argument of Arrays.equals is an exclusive END INDEX, not a length, so the
        // probed range must end at offset + size for lookups with a non-zero offset to match.
        while (slotValue != null && (keys[slot].length != size || !Arrays.equals(keys[slot], 0, size, key, offset, offset + size))) {
            slot = (slot + 1) & (slots.length - 1);
            slotValue = slots[slot];
        }
        return new Pair(slot, slotValue);
    }

    /**
     * Inserts the value produced by {@code supplier} if the key is absent, otherwise passes
     * the existing value to {@code merge}.
     *
     * @param key      array containing the key bytes; the relevant range is copied on insertion
     * @param offset   index of the first key byte
     * @param size     number of key bytes
     * @param supplier creates the value for a new key
     * @param merge    updates the value of an existing key in place
     */
    public void putOrMerge(byte[] key, int offset, int size, Supplier<Result> supplier, Consumer<Result> merge) {
        Pair result = getPair(key, offset, size);
        Result value = result.slotValue();
        if (value == null) {
            int slot = result.slot();
            slots[slot] = supplier.get();
            byte[] bytes = new byte[size];
            System.arraycopy(key, offset, bytes, 0, size);
            keys[slot] = bytes;
        }
        else {
            merge.accept(value);
        }
    }

    /** Returns every occupied entry, in slot order. */
    public List<Entry> getAll() {
        List<Entry> result = new ArrayList<>();
        for (int i = 0; i < slots.length; i++) {
            Result slotValue = slots[i];
            if (slotValue != null) {
                result.add(new Entry(keys[i], slotValue));
            }
        }
        return result;
    }
}