Skip to content

Commit

Permalink
Add fsync strategy for persistent bloom filter
Browse files Browse the repository at this point in the history
- takes more precise control over flushing
- tries to share the IOPS nicely with other writes
- avoids possible complications around using msync with zfs
  • Loading branch information
timothycoleman committed Feb 18, 2022
1 parent bfdb4db commit 10b7c43
Show file tree
Hide file tree
Showing 29 changed files with 1,400 additions and 416 deletions.
@@ -1,7 +1,7 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using EventStore.Core.Index.Hashes;
using NUnit.Framework;

Expand Down
@@ -1,4 +1,4 @@
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using NUnit.Framework;

namespace EventStore.Core.Tests.DataStructures {
Expand Down
@@ -1,11 +1,44 @@
using System;
using System.Collections.Generic;
using System.IO;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using EventStore.Core.Index.Hashes;
using NUnit.Framework;

namespace EventStore.Core.Tests.DataStructures {
public class memory_mapped_file_stream_bloom_filter : SpecificationWithDirectoryPerTestFixture {
/// <summary>
/// Selects which persistence implementation backs the filter under test.
/// </summary>
public enum PersistenceStrategy {
    /// <summary>Back the filter with <c>MemoryMappedFilePersistence</c>.</summary>
    MemoryMapped,

    /// <summary>Back the filter with <c>FileStreamPersistence</c>.</summary>
    FileStream,
}

[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.FileStream)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.FileStream)]
public class persistent_stream_bloom_filter : SpecificationWithDirectoryPerTestFixture {
private readonly PersistenceStrategy _forCreate;
private readonly PersistenceStrategy _forOpen;

// Records which persistence strategy to use when creating a filter and which
// to use when re-opening an existing one, so every combination can be exercised.
public persistent_stream_bloom_filter(PersistenceStrategy forCreate, PersistenceStrategy forOpen) =>
    (_forCreate, _forOpen) = (forCreate, forOpen);

// Builds the filter under test. The persistence strategy is the "create" one
// when a new file is being created and the "open" one otherwise.
PersistentStreamBloomFilter GenSut(string path, bool create, long size, ILongHasher<string> hasher) {
    var strategy = create ? _forCreate : _forOpen;

    switch (strategy) {
        case PersistenceStrategy.MemoryMapped:
            return new PersistentStreamBloomFilter(
                new MemoryMappedFilePersistence(size, path, create),
                hasher: hasher);

        case PersistenceStrategy.FileStream:
            return new PersistentStreamBloomFilter(
                new FileStreamPersistence(size, path, create),
                hasher: hasher);

        default:
            throw new ArgumentOutOfRangeException();
    }
}

private static string GenerateCharset() {
var charset = "";
for (var c = 'a'; c <= 'z'; c++) {
Expand Down Expand Up @@ -43,15 +76,22 @@ public class memory_mapped_file_stream_bloom_filter : SpecificationWithDirectory
return strings.ToArray();
}

[TestFixture]
private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
private MemoryMappedFileStreamBloomFilter _filter;
[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.MemoryMapped, PersistenceStrategy.FileStream)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.MemoryMapped)]
[TestFixture(PersistenceStrategy.FileStream, PersistenceStrategy.FileStream)]
private class with_fixed_size_filter : persistent_stream_bloom_filter {
private PersistentStreamBloomFilter _filter;
private string _path;

// Forwards the create/open strategy pair to the base fixture unchanged.
public with_fixed_size_filter(
    PersistenceStrategy forCreate,
    PersistenceStrategy forOpen)
    : base(forCreate, forOpen) { }

[SetUp]
public void SetUp() {
_path = GetTempFilePath();
_filter = new MemoryMappedFileStreamBloomFilter(_path, create: true, MemoryMappedFileBloomFilter.MinSizeKB * 1000, hasher: null);
_filter = GenSut(_path, create: true, BloomFilterAccessor.MinSizeKB * 1000, hasher: null);
}

[TearDown]
Expand All @@ -63,11 +103,23 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
[Test]
public void can_close_and_reopen() {
_filter.Add("hello");
_filter.Flush();
_filter.Dispose();
using var newFilter = new MemoryMappedFileStreamBloomFilter(_path, create: false, MemoryMappedFileBloomFilter.MinSizeKB * 1000, hasher: null);
using var newFilter = GenSut(_path, create: false, BloomFilterAccessor.MinSizeKB * 1000, hasher: null);
Assert.IsTrue(newFilter.MightContain("hello"));
}

[Test]
public void can_detect_incorrect_size() {
    // write something, flush it and release the filter
    _filter.Add("hello");
    _filter.Flush();
    _filter.Dispose();

    // re-opening with a size one larger than the size used at creation must be rejected
    var mismatchedSize = BloomFilterAccessor.MinSizeKB * 1000 + 1;
    Assert.Throws<SizeMismatchException>(() => {
        using (GenSut(_path, create: false, mismatchedSize, hasher: null)) {
        }
    });
}

[Test]
public void creates_correct_header() {
_filter.Dispose();
Expand All @@ -82,7 +134,7 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
var numBits = binaryReader.ReadInt64();
Assert.AreEqual( 0x01, version);
Assert.AreEqual( 0, corruptionRebuildCount);
Assert.AreEqual( MemoryMappedFileBloomFilter.MinSizeKB * 1000 * 8, numBits);
Assert.AreEqual(BloomFilterAccessor.MinSizeKB * 1000 * 8, numBits);
}

[Test]
Expand All @@ -99,10 +151,10 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {

[Test, Combinatorial]
public void has_false_positives_with_probability_p(
[Values(MemoryMappedFileBloomFilter.MinSizeKB*1000,2*MemoryMappedFileBloomFilter.MinSizeKB*1000)] long size,
[Values(BloomFilterAccessor.MinSizeKB*1000,2* BloomFilterAccessor.MinSizeKB*1000)] long size,
[Values(0.001,0.02,0.05,0.1,0.2)] double p
) {
using var filter = new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, size, hasher: null);
using var filter = GenSut(GetTempFilePath(), create: true, size, hasher: null);
var n = (int) filter.CalculateOptimalNumItems(p);

var random = new Random(123);
Expand Down Expand Up @@ -157,8 +209,8 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {

[Test, Category("LongRunning")]
public void always_returns_true_when_an_item_was_added([Range(10_000, 100_000, 13337)] long size) {
using var filter = new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, size, hasher: null);
var strings = GenerateRandomStrings((int)filter.CalculateOptimalNumItems(MemoryMappedFileBloomFilter.RecommendedFalsePositiveProbability), 100);
using var filter = GenSut(GetTempFilePath(), create: true, size, hasher: null);
var strings = GenerateRandomStrings((int)filter.CalculateOptimalNumItems(PersistentBloomFilter.RecommendedFalsePositiveProbability), 100);

//no items added yet
foreach (var s in strings) {
Expand All @@ -180,25 +232,25 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
[Test]
public void throws_argument_out_of_range_exception_when_given_negative_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, -1, hasher: null));
GenSut(GetTempFilePath(), create: true, size: -1, hasher: null));
}

[Test]
public void throws_argument_out_of_range_exception_when_given_zero_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, 0, hasher: null));
GenSut(GetTempFilePath(), create: true, size: 0, hasher: null));
}

[Test]
public void throws_argument_out_of_range_exception_when_size_less_than_min_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, MemoryMappedFileBloomFilter.MinSizeKB * 1000 - 1, hasher: null));
GenSut(GetTempFilePath(), create: true, size: BloomFilterAccessor.MinSizeKB * 1000 - 1, hasher: null));
}

[Test]
public void throws_argument_out_of_range_exception_when_size_greater_than_max_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, MemoryMappedFileBloomFilter.MaxSizeKB * 1000 + 1, hasher: null));
GenSut(GetTempFilePath(), create: true, size: BloomFilterAccessor.MaxSizeKB * 1000 + 1, hasher: null));
}
}
}
@@ -0,0 +1,23 @@
using EventStore.Core.DataStructures.ProbabilisticFilter;
using Xunit;

namespace EventStore.Core.XUnit.Tests.DataStructures.ProbabilisticFilter {
/// <summary>
/// Checks that <c>AlignedMemory</c> returns a pointer aligned to the requested
/// boundary and, for sizes that fit in a span, a writable span of the requested length.
/// </summary>
public unsafe class AlignedMemoryTests {
    [Theory]
    [InlineData(1, 1)]
    [InlineData(3, 8)]
    [InlineData(8, 3)]
    [InlineData(30_000, 8 * 1024)]
    [InlineData(int.MaxValue + 8L, 8 * 1024)]
    public void Works(long size, int alignTo) {
        using var memory = new AlignedMemory(size, alignTo);

        // the address must be a whole multiple of the requested alignment
        var misalignment = (long)memory.Pointer % alignTo;
        Assert.True(misalignment == 0);

        // the span checks only apply to sizes up to int.MaxValue
        if (size > int.MaxValue) {
            return;
        }

        Assert.Equal(size, memory.AsSpan().Length);
        memory.AsSpan().Clear(); // can write to the span
    }
}
}
@@ -0,0 +1,181 @@
using System;
using EventStore.Core.DataStructures.ProbabilisticFilter;
using Xunit;

#pragma warning disable xUnit1026 // Theory methods should use all of their parameters
namespace EventStore.Core.XUnit.Tests.DataStructures.ProbabilisticFilter {
public unsafe class BloomFilterAccessorTests {
// Creates an accessor for a filter of the given logical size using the
// BloomFilterIntegrity cache line, hash and page sizes. If no dirty-page
// callback is supplied, a callback that does nothing is used.
BloomFilterAccessor GenSut(long logicalFilterSize, BloomFilterAccessor.OnPageDirty onPageDirty = null) =>
    new BloomFilterAccessor(
        logicalFilterSize: logicalFilterSize,
        cacheLineSize: BloomFilterIntegrity.CacheLineSize,
        hashSize: BloomFilterIntegrity.HashSize,
        pageSize: BloomFilterIntegrity.PageSize,
        onPageDirty: onPageDirty ?? (_ => { }),
        log: Serilog.Log.Logger);

// The accessor exposes the geometry it was constructed with.
[Fact]
public void CorrectProperties() {
    var accessor = GenSut(10 * 1024);

    Assert.Equal(BloomFilterIntegrity.CacheLineSize, accessor.CacheLineSize);
    Assert.Equal(BloomFilterIntegrity.HashSize, accessor.HashSize);
    Assert.Equal(BloomFilterIntegrity.PageSize, accessor.PageSize);
}

// The logical size is reported back unchanged, both in bytes and in bits.
[Theory]
[InlineData(10_000)]
[InlineData(10_001)]
[InlineData(256_000_000)]
public void CalculatesLogicalSize(int size) {
    var sut = GenSut(size);

    Assert.Equal(size, sut.LogicalFilterSize);
    // Multiply as a long: 256_000_000 * 8 is already within ~5% of int.MaxValue,
    // so any larger case would silently wrap in 32-bit arithmetic.
    Assert.Equal(size * 8L, sut.LogicalFilterSizeBits);
}

[Theory]
// Every 60 bytes of logical filter occupy 64 bytes in the file,
// plus 64 bytes for the header,
// plus another 64 bytes when the size lands exactly on the boundary
// (unnecessary, but changing it would be breaking).
[InlineData(60 * 500, 64 * 500 + 64 + 64, "")]
[InlineData(60 * 500 - 1, 64 * 500 + 64, "one less byte would need one less cache line")]
[InlineData(60 * 500 + 59, 64 * 500 + 64 + 64, "adding 59 more bytes still fits")]
[InlineData(60 * 500 + 60, 64 * 500 + 64 + 64 + 64, "but a 60th byte requires an extra cache line")]
public void CalculatesFileSize(int size, int expected, string detail) {
    const int cacheLineSize = 64;
    var accessor = GenSut(size);

    Assert.Equal(expected, accessor.FileSize);
    // the file is always a whole number of cache lines
    Assert.True(accessor.FileSize % cacheLineSize == 0);
    Assert.Equal(expected / cacheLineSize, accessor.NumCacheLines);
}

// The exact page count is not important (within reason); what matters is that
// the final bit of the filter always lands inside the file and on an existing page.
[Fact]
public void CanGetPageForLastBit() {
    for (int filterSize = 10_000; filterSize < 40_000; filterSize++) {
        var accessor = GenSut(filterSize);

        var finalBit = filterSize * 8 - 1;
        var finalBytePosition = accessor.GetBytePositionInFile(finalBit);
        var finalPage = accessor.GetPageNumber(finalBytePosition);

        Assert.True(finalBytePosition < accessor.FileSize);
        Assert.True(finalPage < accessor.NumPages);
        // at most one spare page after the one holding the final bit
        Assert.True(finalPage >= accessor.NumPages - 2);
    }
}

// Bit positions map to byte offsets in the file, starting after the first 64 bytes.
[Fact]
public void CalculatesBytePositionInFile() {
    const int firstDataByte = 64;
    var accessor = GenSut(10_000);

    // the first eight bits share the first data byte
    Assert.Equal(firstDataByte, accessor.GetBytePositionInFile(0));
    Assert.Equal(firstDataByte, accessor.GetBytePositionInFile(1));
    Assert.Equal(firstDataByte, accessor.GetBytePositionInFile(7));
    Assert.Equal(firstDataByte + 1, accessor.GetBytePositionInFile(8));

    // after 60 data bytes the position skips the 4 bytes used for the hash
    Assert.Equal(firstDataByte + 59, accessor.GetBytePositionInFile(60 * 8 - 1));
    Assert.Equal(firstDataByte + 64, accessor.GetBytePositionInFile(60 * 8));
}

// Checks the (offset, size) reported for each page of a small filter:
// page 0 begins at offset 64 and ends at the 8 KiB mark; page 1 holds the remainder.
[Fact]
public void CalculatesPagePositionInFile() {
    var sut = GenSut(10_000);
    var expectedFirstPageSize = 8 * 1024 - 64;
    var expectedLastPageSize = 2560;
    Assert.Equal((64, expectedFirstPageSize), sut.GetPagePositionInFile(0));
    Assert.Equal((8 * 1024, expectedLastPageSize), sut.GetPagePositionInFile(1));
    // expected value first, actual second, so a failure message reads the right way round
    Assert.Equal(64 + expectedFirstPageSize + expectedLastPageSize, sut.FileSize);
    // there is no third page
    Assert.Throws<ArgumentOutOfRangeException>(() => sut.GetPagePositionInFile(2));
}

// Same page-position check for a filter whose file offsets exceed the range of a 32-bit int.
[Fact]
public void CalculatesPagePositionInFileLarge() {
    var accessor = GenSut(4_000_000_000);

    // first page
    Assert.Equal((64, 8 * 1024 - 64), accessor.GetPagePositionInFile(0));

    // a page far into the file
    Assert.Equal((4_266_663_936, 2816), accessor.GetPagePositionInFile(520_833));
}

// Byte positions map to page numbers, with the boundary at 8 KiB.
[Fact]
public void CalculatesPageNumber() {
    const int pageBoundary = 8 * 1024;
    var accessor = GenSut(10_000);

    Assert.Equal(0, accessor.GetPageNumber(0));
    Assert.Equal(0, accessor.GetPageNumber(pageBoundary - 1));
    Assert.Equal(1, accessor.GetPageNumber(pageBoundary));
    Assert.Equal(1, accessor.GetPageNumber(accessor.FileSize));
}

// Setting a bit makes it readable, leaves its neighbour untouched and
// reports the page containing it through the dirty-page callback.
[Fact]
public void CanSetAndTestBits() {
    long lastDirtyPage = -1;
    var accessor = GenSut(10_000, page => lastDirtyPage = page);
    using var memory = new AlignedMemory(accessor.FileSize, 64);
    accessor.Pointer = memory.Pointer;
    accessor.FillWithZeros();

    // nothing is set to begin with
    Assert.False(accessor.IsBitSet(0));

    accessor.SetBit(0);

    Assert.True(accessor.IsBitSet(0));
    Assert.False(accessor.IsBitSet(1));
    Assert.Equal(0, lastDirtyPage);
}

// Sets and reads back the first and the last bit of a ~4GB filter, checking that
// bit positions beyond the 32-bit range work and dirty the expected page.
[Fact]
public void CanSetAndTestBitsLargeFile() {
    // Bits are indexed from zero, so the last valid index is (size in bits) - 1.
    const long lastBit = 4_000_000_000L * 8 - 1;

    var dirtyPage = -1L;
    var sut = GenSut(4_000_000_000, pageNumber => dirtyPage = pageNumber);
    using var mem = new AlignedMemory(sut.FileSize, 64);
    sut.Pointer = mem.Pointer;
    // NOTE(review): the memory is not zero-filled here, so only bits that were
    // explicitly set are asserted on - presumably to avoid touching ~4GB; confirm.

    sut.SetBit(0);
    Assert.True(sut.IsBitSet(0));
    Assert.Equal(0, dirtyPage);

    sut.SetBit(lastBit);
    Assert.True(sut.IsBitSet(lastBit));
    Assert.Equal(520_833, dirtyPage);
}

// A filter that was only modified through the accessor verifies without throwing.
[Fact]
public void CanVerifySuccessfully() {
    var accessor = GenSut(10_000);
    using var memory = new AlignedMemory(accessor.FileSize, 64);
    accessor.Pointer = memory.Pointer;
    accessor.FillWithZeros();

    accessor.SetBit(0);

    accessor.Verify(corruptionRebuildCount: 0, corruptionThreshold: 0);
}

// Verification passes while the filter is only modified through the accessor,
// and throws CorruptedHashException once a byte has been changed behind its back
// and the corruption threshold is 0.
[Fact]
public void CanFailToVerify() {
var sut = GenSut(10_000);
using var mem = new AlignedMemory(sut.FileSize, 64);
sut.Pointer = mem.Pointer;
sut.FillWithZeros();

// take a reference to the byte at file position 64
ref var b = ref sut.ReadBytes(bytePositionInFile: 64, count: 1)[0];
Assert.False(b.IsBitSet(0));
sut.SetBit(0); // set the bit through the filter
Assert.True(b.IsBitSet(0)); // affected our reference

// nothing has been tampered with yet, so this must not throw
sut.Verify(0, 0);

// presumably this leaves the stored hash stale, which is what Verify detects - confirm
b = b.SetBit(2); // setting the bit directly not through the accessor
b = b.SetBit(3);

// corruption doesn't meet 5% threshold
sut.Verify(corruptionRebuildCount: 0, corruptionThreshold: 5);

Assert.Throws<CorruptedHashException>(() => {
// corruption does meet 0% threshold
sut.Verify(0, 0);
});

}

// Assigning a pointer that is not a multiple of 64 is rejected with a descriptive message.
[Fact]
public void ComplainsAboutUnalignedPointer() {
    var accessor = GenSut(10_000);

    var thrown = Assert.Throws<InvalidOperationException>(() => {
        accessor.Pointer = (byte*)123;
    });

    Assert.Equal("Pointer 123 is not aligned to a cacheline (64)", thrown.Message);
}
}
}

0 comments on commit 10b7c43

Please sign in to comment.