Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fsyncing bloomfilter #3425

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,7 +1,7 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using EventStore.Core.Index.Hashes;
using NUnit.Framework;

Expand Down
@@ -1,4 +1,4 @@
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using NUnit.Framework;

namespace EventStore.Core.Tests.DataStructures {
Expand Down
@@ -1,11 +1,44 @@
using System;
using System.Collections.Generic;
using System.IO;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using EventStore.Core.Index.Hashes;
using NUnit.Framework;

namespace EventStore.Core.Tests.DataStructures {
public class memory_mapped_file_stream_bloom_filter : SpecificationWithDirectoryPerTestFixture {
// Selects which persistence implementation GenSut wraps in the filter under test.
public enum PersistenceStrategy {
// GenSut builds the filter on a MemoryMappedFilePersistence
MemoryMapped,
// GenSut builds the filter on a FileStreamPersistence
FileStream,
}

[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.FileStream)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.FileStream)]
public class persistent_stream_bloom_filter : SpecificationWithDirectoryPerTestFixture {
private readonly PersistenceStrategy _forCreate;
private readonly PersistenceStrategy _forOpen;

// Captures the strategy used when a test creates a filter (forCreate) and the one used
// when a test opens an existing filter (forOpen); GenSut picks between them.
public persistent_stream_bloom_filter(PersistenceStrategy forCreate, PersistenceStrategy forOpen) {
	(_forCreate, _forOpen) = (forCreate, forOpen);
}

// Builds the filter under test. The persistence implementation is chosen from the fixture's
// create-strategy when `create` is true and from its open-strategy otherwise.
// Throws ArgumentOutOfRangeException for a strategy value that has no case below.
PersistentStreamBloomFilter GenSut(string path, bool create, long size, ILongHasher<string> hasher) {
	var strategy = create ? _forCreate : _forOpen;

	switch (strategy) {
		case PersistenceStrategy.MemoryMapped:
			return new PersistentStreamBloomFilter(
				new MemoryMappedFilePersistence(size, path, create),
				hasher: hasher);

		case PersistenceStrategy.FileStream:
			return new PersistentStreamBloomFilter(
				new FileStreamPersistence(size, path, create),
				hasher: hasher);

		default:
			throw new ArgumentOutOfRangeException();
	}
}

private static string GenerateCharset() {
var charset = "";
for (var c = 'a'; c <= 'z'; c++) {
Expand Down Expand Up @@ -43,15 +76,22 @@ public class memory_mapped_file_stream_bloom_filter : SpecificationWithDirectory
return strings.ToArray();
}

[TestFixture]
private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
private MemoryMappedFileStreamBloomFilter _filter;
[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.FileStream)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.FileStream)]
private class with_fixed_size_filter : persistent_stream_bloom_filter {
private PersistentStreamBloomFilter _filter;
private string _path;

// Forwards the create/open persistence strategies supplied by the TestFixture attributes
// to the base fixture, which stores them for GenSut.
public with_fixed_size_filter(PersistenceStrategy forCreate, PersistenceStrategy forOpen)
: base(forCreate, forOpen) {
}

// Runs before each test: picks a new temp file path and creates a filter at that path.
// NOTE(review): this span appears to carry both sides of the diff; the
// MemoryMappedFileStreamBloomFilter assignment looks like the removed line and the
// GenSut assignment its replacement. Confirm against the merged file.
[SetUp]
public void SetUp() {
_path = GetTempFilePath();
_filter = new MemoryMappedFileStreamBloomFilter(_path, create: true, MemoryMappedFileBloomFilter.MinSizeKB * 1000, hasher: null);
_filter = GenSut(_path, create: true, BloomFilterAccessor.MinSizeKB * 1000, hasher: null);
}

[TearDown]
Expand All @@ -63,11 +103,23 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
// Adds an item, flushes and disposes the filter, then opens the same path again
// (create: false) and checks that the item is still reported as possibly present.
// NOTE(review): the two `using var newFilter` declarations appear to be the removed and
// added sides of the diff; only one can exist in the real file. Confirm against the merged file.
[Test]
public void can_close_and_reopen() {
_filter.Add("hello");
_filter.Flush();
_filter.Dispose();
using var newFilter = new MemoryMappedFileStreamBloomFilter(_path, create: false, MemoryMappedFileBloomFilter.MinSizeKB * 1000, hasher: null);
using var newFilter = GenSut(_path, create: false, BloomFilterAccessor.MinSizeKB * 1000, hasher: null);
Assert.IsTrue(newFilter.MightContain("hello"));
}

// Persists a filter, then tries to open the same file while asking for a size that is
// one larger than the size it was created with; opening must fail with SizeMismatchException.
[Test]
public void can_detect_incorrect_size() {
	_filter.Add("hello");
	_filter.Flush();
	_filter.Dispose();

	Assert.Throws<SizeMismatchException>(() => {
		// the filter is disposed straight away should the open unexpectedly succeed
		using (GenSut(_path, create: false, BloomFilterAccessor.MinSizeKB * 1000 + 1, hasher: null)) {
		}
	});
}

[Test]
public void creates_correct_header() {
_filter.Dispose();
Expand All @@ -82,7 +134,7 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
var numBits = binaryReader.ReadInt64();
Assert.AreEqual( 0x01, version);
Assert.AreEqual( 0, corruptionRebuildCount);
Assert.AreEqual( MemoryMappedFileBloomFilter.MinSizeKB * 1000 * 8, numBits);
Assert.AreEqual(BloomFilterAccessor.MinSizeKB * 1000 * 8, numBits);
}

[Test]
Expand All @@ -99,10 +151,10 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {

[Test, Combinatorial]
public void has_false_positives_with_probability_p(
[Values(MemoryMappedFileBloomFilter.MinSizeKB*1000,2*MemoryMappedFileBloomFilter.MinSizeKB*1000)] long size,
[Values(BloomFilterAccessor.MinSizeKB*1000,2* BloomFilterAccessor.MinSizeKB*1000)] long size,
[Values(0.001,0.02,0.05,0.1,0.2)] double p
) {
using var filter = new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, size, hasher: null);
using var filter = GenSut(GetTempFilePath(), create: true, size, hasher: null);
var n = (int) filter.CalculateOptimalNumItems(p);

var random = new Random(123);
Expand Down Expand Up @@ -157,8 +209,8 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {

[Test, Category("LongRunning")]
public void always_returns_true_when_an_item_was_added([Range(10_000, 100_000, 13337)] long size) {
using var filter = new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, size, hasher: null);
var strings = GenerateRandomStrings((int)filter.CalculateOptimalNumItems(MemoryMappedFileBloomFilter.RecommendedFalsePositiveProbability), 100);
using var filter = GenSut(GetTempFilePath(), create: true, size, hasher: null);
var strings = GenerateRandomStrings((int)filter.CalculateOptimalNumItems(PersistentBloomFilter.RecommendedFalsePositiveProbability), 100);

//no items added yet
foreach (var s in strings) {
Expand All @@ -180,25 +232,25 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
// Creating a filter with a size of -1 must throw ArgumentOutOfRangeException.
// NOTE(review): the `new MemoryMappedFileStreamBloomFilter(...)` line appears to be the
// removed side of the diff and the `GenSut(...)` line its replacement. Confirm against the merged file.
[Test]
public void throws_argument_out_of_range_exception_when_given_negative_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, -1, hasher: null));
GenSut(GetTempFilePath(), create: true, size: -1, hasher: null));
}

// Creating a filter with a size of 0 must throw ArgumentOutOfRangeException.
// NOTE(review): the `new MemoryMappedFileStreamBloomFilter(...)` line appears to be the
// removed side of the diff and the `GenSut(...)` line its replacement. Confirm against the merged file.
[Test]
public void throws_argument_out_of_range_exception_when_given_zero_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, 0, hasher: null));
GenSut(GetTempFilePath(), create: true, size: 0, hasher: null));
}

// Creating a filter one below `MinSizeKB * 1000` must throw ArgumentOutOfRangeException.
// NOTE(review): the `new MemoryMappedFileStreamBloomFilter(...)` line appears to be the
// removed side of the diff and the `GenSut(...)` line its replacement. Confirm against the merged file.
[Test]
public void throws_argument_out_of_range_exception_when_size_less_than_min_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, MemoryMappedFileBloomFilter.MinSizeKB * 1000 - 1, hasher: null));
GenSut(GetTempFilePath(), create: true, size: BloomFilterAccessor.MinSizeKB * 1000 - 1, hasher: null));
}

// Creating a filter one above `MaxSizeKB * 1000` must throw ArgumentOutOfRangeException.
// NOTE(review): the `new MemoryMappedFileStreamBloomFilter(...)` line appears to be the
// removed side of the diff and the `GenSut(...)` line its replacement. Confirm against the merged file.
[Test]
public void throws_argument_out_of_range_exception_when_size_greater_than_max_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, MemoryMappedFileBloomFilter.MaxSizeKB * 1000 + 1, hasher: null));
GenSut(GetTempFilePath(), create: true, size: BloomFilterAccessor.MaxSizeKB * 1000 + 1, hasher: null));
}
}
}
@@ -0,0 +1,23 @@
using EventStore.Core.DataStructures.ProbabilisticFilter;
using Xunit;

namespace EventStore.Core.XUnit.Tests.DataStructures.ProbabilisticFilter {
// Tests for AlignedMemory: the pointer it exposes must be a multiple of the requested
// alignment, and (for sizes a span can represent) the span must cover exactly `size` bytes.
public unsafe class AlignedMemoryTests {
	[Theory]
	[InlineData(1, 1)]
	[InlineData(3, 8)]
	[InlineData(8, 3)]
	[InlineData(30_000, 8 * 1024)]
	[InlineData(int.MaxValue + 8L, 8 * 1024)]
	public void Works(long size, int alignTo) {
		using var sut = new AlignedMemory(size, alignTo);

		var address = (long)sut.Pointer;
		Assert.True(address % alignTo == 0);

		// the span checks below are only performed for sizes that fit in an int
		if (size > int.MaxValue) {
			return;
		}

		Assert.Equal(size, sut.AsSpan().Length);
		sut.AsSpan().Clear(); // can write to the span
	}
}
}
@@ -0,0 +1,181 @@
using System;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using Xunit;

#pragma warning disable xUnit1026 // Theory methods should use all of their parameters
namespace EventStore.Core.XUnit.Tests.DataStructures.ProbabilisticFilter {
public unsafe class BloomFilterAccessorTests {
// Creates the accessor under test for a logical filter of the given size, using the
// cache line, hash and page sizes defined by BloomFilterIntegrity and the global Serilog logger.
// When no dirty-page callback is supplied a no-op callback is passed instead.
BloomFilterAccessor GenSut(long logicalFilterSize, BloomFilterAccessor.OnPageDirty onPageDirty = null) {
	BloomFilterAccessor.OnPageDirty callback = onPageDirty ?? (_ => { });

	return new BloomFilterAccessor(
		logicalFilterSize: logicalFilterSize,
		cacheLineSize: BloomFilterIntegrity.CacheLineSize,
		hashSize: BloomFilterIntegrity.HashSize,
		pageSize: BloomFilterIntegrity.PageSize,
		onPageDirty: callback,
		log: Serilog.Log.Logger);
}

// The accessor must expose the cache line, hash and page sizes it was constructed with.
[Fact]
public void CorrectProperties() {
	const int filterSize = 10 * 1024;
	var accessor = GenSut(filterSize);

	Assert.Equal(BloomFilterIntegrity.CacheLineSize, accessor.CacheLineSize);
	Assert.Equal(BloomFilterIntegrity.HashSize, accessor.HashSize);
	Assert.Equal(BloomFilterIntegrity.PageSize, accessor.PageSize);
}

// The accessor must report the logical size it was given, in bytes and in bits.
[Theory]
[InlineData(10_000)]
[InlineData(10_001)]
[InlineData(256_000_000)]
public void CalculatesLogicalSize(int size) {
	var sut = GenSut(size);

	Assert.Equal(size, sut.LogicalFilterSize);
	// multiply in 64-bit arithmetic: `size * 8` as int wraps silently once size reaches
	// 268_435_456, which would make the expectation wrong for any larger case added later
	Assert.Equal(size * 8L, sut.LogicalFilterSizeBits);
}

[Theory]
// the file holds 64 bytes for every 60 bytes of logical filter,
// plus 64 bytes for the header,
// and another 64 bytes when the size lands exactly on a boundary (unnecessary, but changing it would be breaking)
[InlineData(60 * 500, 64 * 500 + 64 + 64, "")]
[InlineData(60 * 500 - 1, 64 * 500 + 64, "one less byte would need one less cache line")]
[InlineData(60 * 500 + 59, 64 * 500 + 64 + 64, "adding 59 more bytes still fits")]
[InlineData(60 * 500 + 60, 64 * 500 + 64 + 64 + 64, "but a 60th byte requires an extra cache line")]
public void CalculatesFileSize(int size, int expected, string detail) {
	// `detail` only labels the case in the test output; it is intentionally unused here
	const int cacheLine = 64;
	var accessor = GenSut(size);

	Assert.Equal(expected, accessor.FileSize);
	Assert.True(accessor.FileSize % cacheLine == 0);
	Assert.Equal(expected / cacheLine, accessor.NumCacheLines);
}

// The exact page count is not pinned down (within reason); what matters is that the last
// bit of the filter always lands inside the file and on one of the final two pages.
[Fact]
public void CanGetPageForLastBit() {
	const int firstSize = 10_000;
	const int endSize = 40_000;

	for (var filterSize = firstSize; filterSize < endSize; filterSize++) {
		var accessor = GenSut(filterSize);

		var finalBit = filterSize * 8 - 1;
		var finalByte = accessor.GetBytePositionInFile(finalBit);
		var finalPage = accessor.GetPageNumber(finalByte);

		Assert.True(finalByte < accessor.FileSize);
		Assert.True(finalPage < accessor.NumPages);
		Assert.True(finalPage >= accessor.NumPages - 2);
	}
}

// Maps logical bit positions to byte offsets in the file: data starts at offset 64 and
// every cache line carries 60 bytes of filter data.
[Fact]
public void CalculatesBytePositionInFile() {
	const int dataStart = 64;
	const int dataBytesPerCacheLine = 60;
	var accessor = GenSut(10_000);

	// bits 0..7 share the first data byte
	Assert.Equal(dataStart, accessor.GetBytePositionInFile(0));
	Assert.Equal(dataStart, accessor.GetBytePositionInFile(1));
	Assert.Equal(dataStart, accessor.GetBytePositionInFile(7));
	Assert.Equal(dataStart + 1, accessor.GetBytePositionInFile(8));

	// the next data byte after the 60th skips the 4 bytes used for the hash
	Assert.Equal(dataStart + 59, accessor.GetBytePositionInFile(dataBytesPerCacheLine * 8 - 1));
	Assert.Equal(dataStart + 64, accessor.GetBytePositionInFile(dataBytesPerCacheLine * 8));
}

// Checks the (position, size) reported for each page of a 10_000 byte filter, that the
// page sizes plus the 64 bytes before the first page add up to the file size, and that
// asking for a page past the end throws ArgumentOutOfRangeException.
[Fact]
public void CalculatesPagePositionInFile() {
	var sut = GenSut(10_000);
	// the first page is reported as starting at offset 64, so it is 64 bytes short of 8 KiB
	var expectedFirstPageSize = 8 * 1024 - 64;
	var expectedLastPageSize = 2560;
	Assert.Equal((64, expectedFirstPageSize), sut.GetPagePositionInFile(0));
	Assert.Equal((8 * 1024, expectedLastPageSize), sut.GetPagePositionInFile(1));
	// expected value first, actual second, so a failure is reported the right way round
	Assert.Equal(64 + expectedFirstPageSize + expectedLastPageSize, sut.FileSize);
	Assert.Throws<ArgumentOutOfRangeException>(() => sut.GetPagePositionInFile(2));
}

// Same page-position check for a filter whose file offsets no longer fit in 32 bits.
[Fact]
public void CalculatesPagePositionInFileLarge() {
	var accessor = GenSut(4_000_000_000);

	var firstPage = accessor.GetPagePositionInFile(0);
	Assert.Equal((64, 8 * 1024 - 64), firstPage);

	var lastPage = accessor.GetPagePositionInFile(520_833);
	Assert.Equal((4_266_663_936, 2816), lastPage);
}

// Byte offsets below 8 KiB belong to page 0; offsets from 8 KiB up to (and here including)
// the file size belong to page 1 for a 10_000 byte filter.
[Fact]
public void CalculatesPageNumber() {
	const int pageBoundary = 8 * 1024;
	var accessor = GenSut(10_000);

	Assert.Equal(0, accessor.GetPageNumber(0));
	Assert.Equal(0, accessor.GetPageNumber(pageBoundary - 1));
	Assert.Equal(1, accessor.GetPageNumber(pageBoundary));
	Assert.Equal(1, accessor.GetPageNumber(accessor.FileSize));
}

// Setting a bit makes exactly that bit readable as set and reports its page (0) as dirty.
[Fact]
public void CanSetAndTestBits() {
	var lastDirtyPage = -1L;
	var accessor = GenSut(10_000, page => lastDirtyPage = page);

	// back the accessor with zeroed, cache-line aligned memory
	using var memory = new AlignedMemory(accessor.FileSize, 64);
	accessor.Pointer = memory.Pointer;
	accessor.FillWithZeros();

	Assert.False(accessor.IsBitSet(0));

	accessor.SetBit(0);

	Assert.True(accessor.IsBitSet(0));
	Assert.False(accessor.IsBitSet(1));
	Assert.Equal(0, lastDirtyPage);
}

// Sets and reads back the first and the last bit of a 4_000_000_000 byte filter, checking
// that bit positions and page numbers beyond the 32-bit range are handled.
// Note: this allocates the whole file size in memory (more than 4 GB).
[Fact]
public void CanSetAndTestBitsLargeFile() {
	const long filterSize = 4_000_000_000;
	// bit indices are zero-based, so the last valid bit is one below the bit count;
	// `filterSize * 8` itself would address a bit past the end of the logical filter
	const long lastBit = filterSize * 8 - 1;

	var dirtyPage = -1L;
	var sut = GenSut(filterSize, pageNumber => dirtyPage = pageNumber);
	using var mem = new AlignedMemory(sut.FileSize, 64);
	sut.Pointer = mem.Pointer;

	sut.SetBit(0);
	Assert.True(sut.IsBitSet(0));
	Assert.Equal(0, dirtyPage);

	sut.SetBit(lastBit);
	Assert.True(sut.IsBitSet(lastBit));
	// 520_833 is also the page CalculatesPagePositionInFileLarge uses as the final page
	Assert.Equal(520_833, dirtyPage);
}

// A bit set through the accessor must leave the filter in a state that Verify accepts,
// even with a corruption threshold of 0.
[Fact]
public void CanVerifySuccessfully() {
	var accessor = GenSut(10_000);
	using var memory = new AlignedMemory(accessor.FileSize, 64);
	accessor.Pointer = memory.Pointer;
	accessor.FillWithZeros();

	accessor.SetBit(0);

	accessor.Verify(0, 0);
}

// Modifies filter data behind the accessor's back and checks that Verify tolerates the
// damage while it is below the given threshold and throws CorruptedHashException once
// the threshold is 0.
[Fact]
public void CanFailToVerify() {
var sut = GenSut(10_000);
using var mem = new AlignedMemory(sut.FileSize, 64);
sut.Pointer = mem.Pointer;
sut.FillWithZeros();

// alias the byte at file position 64, the byte that holds bit 0
// (CalculatesBytePositionInFile asserts that mapping)
ref var b = ref sut.ReadBytes(bytePositionInFile: 64, count: 1)[0];
Assert.False(b.IsBitSet(0));
sut.SetBit(0); // set the bit through the filter
Assert.True(b.IsBitSet(0)); // affected our reference

// nothing has been tampered with yet, so verification passes even at a 0 threshold
sut.Verify(0, 0);

// presumably this leaves the stored hash stale because the accessor is bypassed -
// TODO confirm against BloomFilterAccessor
b = b.SetBit(2); // setting the bit directly not through the accessor
b = b.SetBit(3);

// corruption doesn't meet 5% threshold
sut.Verify(corruptionRebuildCount: 0, corruptionThreshold: 5);

Assert.Throws<CorruptedHashException>(() => {
// corruption does meet 0% threshold
sut.Verify(0, 0);
});

}

// Assigning a pointer that is not a multiple of the cache line size must be rejected
// with an InvalidOperationException carrying the exact message below.
[Fact]
public void ComplainsAboutUnalignedPointer() {
	var accessor = GenSut(10_000);
	Action assignUnaligned = () => accessor.Pointer = (byte*)123;

	var thrown = Assert.Throws<InvalidOperationException>(assignUnaligned);

	Assert.Equal("Pointer 123 is not aligned to a cacheline (64)", thrown.Message);
}
}
}