Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Will write offsets to disk

  • Loading branch information...
commit 2cda832eb7a8ac2e875ceec3fb9812cb17a4b66c 1 parent a3c13fe
@ayende authored
View
2  Rhino.Events.Tests/Compaction.cs
@@ -48,7 +48,7 @@ public void CanWriteAfterCompaction()
Assert.Equal(1, s.ReadRaw("users/1").Count());
- Assert.Equal(1, Directory.GetFiles("TestScribe").Length);
+ Assert.Equal(1, Directory.GetFiles("TestScribe","*.events.*").Length);
}
}
}
View
8 Rhino.Events.Tryouts/Program.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
@@ -14,13 +15,14 @@ class Program
static volatile bool run = true;
private static int count = 0;
- static void Main(string[] args)
+ static void Main()
{
+
var scribe = new Scribe(new PersistedOptions
{
DirPath = "ScribedEvents",
- StreamSource = new FileStreamSource(),
- MaxTimeToWaitForFlushingToDisk = TimeSpan.FromSeconds(1)
+ StreamSource = new FileStreamSource("ScribedEvents"),
+ MaxTimeToWaitForFlush = TimeSpan.FromMilliseconds(1000)
});
var sp = Stopwatch.StartNew();
View
11 Rhino.Events/ClassDiagram1.cd
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="utf-8"?>
+<ClassDiagram MajorVersion="1" MinorVersion="1" MembersFormat="FullSignature">
+ <Interface Name="Rhino.Events.IScribe">
+ <Position X="4.5" Y="1.75" Width="5.25" />
+ <TypeIdentifier>
+ <HashCode>AAAAAAEAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAgAAEA=</HashCode>
+ <FileName>IScribe.cs</FileName>
+ </TypeIdentifier>
+ </Interface>
+ <Font Name="Segoe UI" Size="9" />
+</ClassDiagram>
View
24 Rhino.Events/Data/PersistedOptions.cs
@@ -10,18 +10,26 @@ public class PersistedOptions
public IStreamSource StreamSource { get; set; }
public bool AllowRecovery { get; set; }
- public int WeakMaxSize { get; set; }
- public int HardMaxSize { get; set; }
- public int CheckOncePer { get; set; }
+ public int CacheWeakMaxSize { get; set; }
+ public int CacheHardMaxSize { get; set; }
+ public int ClearCacheAfterSetCalledTimes { get; set; }
- public TimeSpan MaxTimeToWaitForFlushingToDisk { get; set; }
+ public TimeSpan IdleTime { get; set; }
+
+ public TimeSpan MaxTimeToWaitForFlush { get; set; }
+
+ public int WritesBetweenOffsetSnapshots { get; set; }
+ public TimeSpan MinTimeForOffsetSnapshots { get; set; }
public PersistedOptions()
{
- CheckOncePer = 1000;
- HardMaxSize = 100000;
- WeakMaxSize = 25000;
- MaxTimeToWaitForFlushingToDisk = TimeSpan.FromMinutes(3);
+ ClearCacheAfterSetCalledTimes = 1000;
+ CacheHardMaxSize = 100000;
+ CacheWeakMaxSize = 25000;
+ MaxTimeToWaitForFlush = TimeSpan.FromMilliseconds(200);
+ IdleTime = TimeSpan.FromMinutes(3);
+ WritesBetweenOffsetSnapshots = 15000;
+ MinTimeForOffsetSnapshots = TimeSpan.FromMinutes(3);
}
}
}
View
14 Rhino.Events/IScribe.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace Rhino.Events
+{
+ public interface IScribe : IDisposable
+ {
+ IEnumerable<object> ReadRaw(string streamId, bool untilLastSnapshot = true);
+ Task EnqueueEventAsync(string streamId, object @event);
+ Task EnqueueSnapshotAsync(string streamId, object @event);
+ Task EnqueueDeleteAsync(string streamId);
+ }
+}
View
4 Rhino.Events/Impl/JsonDataCache.cs
@@ -50,7 +50,7 @@ public void Set(long pos, T val)
var currentSet = Interlocked.Increment(ref sets);
- if (cache.Count <= options.WeakMaxSize || currentSet% options.CheckOncePer!= 0)
+ if (cache.Count <= options.CacheWeakMaxSize || currentSet% options.ClearCacheAfterSetCalledTimes!= 0)
return;
// release the strong references to them, but keep the weak ones
@@ -59,7 +59,7 @@ public void Set(long pos, T val)
source.Value.Data = null;
}
- if(cache.Count <= options.HardMaxSize)
+ if(cache.Count <= options.CacheHardMaxSize)
return;
foreach (var source in cache.OrderBy(x => x.Value.Usage).Take(cache.Count / 4))
View
2  Rhino.Events/Rhino.Events.csproj
@@ -44,6 +44,7 @@
<Compile Include="Data\EventState.cs" />
<Compile Include="Data\StreamInformation.cs" />
<Compile Include="Impl\ExceptionAggregator.cs" />
+ <Compile Include="IScribe.cs" />
<Compile Include="Scribe.cs" />
<Compile Include="Server\HttpServer.cs" />
<Compile Include="Impl\JsonDataCache.cs" />
@@ -63,6 +64,7 @@
<Compile Include="Util\LogProviders\NLogLogManager.cs" />
</ItemGroup>
<ItemGroup>
+ <None Include="ClassDiagram1.cd" />
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
View
10 Rhino.Events/Scribe.cs
@@ -9,14 +9,6 @@
namespace Rhino.Events
{
- public interface IScribe : IDisposable
- {
- IEnumerable<object> ReadRaw(string streamId, bool untilLastSnapshot = true);
- Task EnqueueEventAsync(string streamId, object @event);
- Task EnqueueSnapshotAsync(string streamId, object @event);
- Task EnqueueDeleteAsync(string streamId);
- }
-
public class Scribe : IScribe
{
readonly PersistedEventsStorage eventsStorage;
@@ -40,7 +32,7 @@ public Scribe(string dir)
: this(new PersistedOptions
{
AllowRecovery = true,
- StreamSource = new FileStreamSource(),
+ StreamSource = new FileStreamSource(dir),
DirPath = dir
})
{
View
2  Rhino.Events/Server/HttpServer.cs
@@ -18,7 +18,7 @@ public HttpServer()
{
data = new PersistedEventsStorage(new PersistedOptions
{
- StreamSource = new FileStreamSource(),
+ StreamSource = new FileStreamSource("Data"),
DirPath = "Data",
AllowRecovery = true
});
View
50 Rhino.Events/Storage/FileStreamSource.cs
@@ -12,6 +12,8 @@ namespace Rhino.Events.Storage
{
public class FileStreamSource : IStreamSource
{
+ private readonly string basePath;
+
private class FlushingToDiskFileStream: FileStream
{
public FlushingToDiskFileStream(string path, FileMode mode, FileAccess access, FileShare share)
@@ -25,22 +27,31 @@ public override void Flush()
}
}
- public Stream OpenReadWrite(string path)
+ public FileStreamSource(string basePath)
+ {
+ this.basePath = basePath;
+ if (Directory.Exists(basePath) == false)
+ Directory.CreateDirectory(basePath);
+
+ }
+
+ public Stream OpenReadWrite(string file)
{
- var dir = Path.GetDirectoryName(path);
- if (Directory.Exists(dir) == false)
- Directory.CreateDirectory(dir);
+ var path = Path.Combine(basePath, file);
var fileStream = new FlushingToDiskFileStream(LastFileVersion(path), FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read | FileShare.Delete);
return new BufferedStream( fileStream, 32*1024 );
}
- public Stream OpenRead(string path)
+ public Stream OpenRead(string file)
{
+ var path = Path.Combine(basePath, file);
return new BufferedStream(File.Open(LastFileVersion(path), FileMode.Open, FileAccess.Read, FileShare.Delete | FileShare.ReadWrite));
}
- public void DeleteOnClose(string path)
+ public void DeleteOnClose(string file)
{
+ var path = Path.Combine(basePath, file);
+
using (new FileStream(LastFileVersion(path), FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, FileOptions.DeleteOnClose))
{
}
@@ -51,14 +62,19 @@ public void Flush(Stream stream)
stream.Flush();
}
- public void DeleteIfExists(string path)
+ public void DeleteIfExists(string file)
{
+ var path = Path.Combine(basePath, file);
+
if (File.Exists(path))
File.Delete(path);
}
- public void RenameToLatest(string newFilePath, string path)
+ public void RenameToLatest(string newFile, string file)
{
+ var path = Path.Combine(basePath, file);
+ var newFilePath = Path.Combine(basePath, newFile);
+
var fileVersion = LastFileVersion(path);
var extension = Path.GetExtension(fileVersion);
Debug.Assert(extension != null);
@@ -72,9 +88,23 @@ public void RenameToLatest(string newFilePath, string path)
pathCache.TryRemove(path, out _);
}
- readonly ConcurrentDictionary<string,string> pathCache = new ConcurrentDictionary<string, string>();
+ public string GetLatestName(string file)
+ {
+ var path = Path.Combine(basePath, file);
+
+ return LastFileVersion(path);
+ }
+
+ public bool Exists(string file)
+ {
+ var path = Path.Combine(basePath, file);
+ return File.Exists(path);
+
+ }
+
+ readonly ConcurrentDictionary<string,string> pathCache = new ConcurrentDictionary<string, string>();
- public string LastFileVersion(string path)
+ private string LastFileVersion(string path)
{
return pathCache.GetOrAdd(path, s =>
{
View
5 Rhino.Events/Storage/IStreamSource.cs
@@ -4,7 +4,7 @@ namespace Rhino.Events.Storage
{
public interface IStreamSource
{
- Stream OpenReadWrite(string path);
+ Stream OpenReadWrite(string file);
Stream OpenRead(string path);
@@ -15,5 +15,8 @@ public interface IStreamSource
void DeleteIfExists(string path);
void RenameToLatest(string newFilePath, string path);
+
+ string GetLatestName(string path);
+ bool Exists(string offsetsPath);
}
}
View
122 Rhino.Events/Storage/PersistedEventsStorage.cs
@@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
+using System.Net.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -23,8 +24,9 @@ public class PersistedEventsStorage : IDisposable
private const long Deleted = -2;
private readonly IStreamSource streamSource;
- private readonly string path;
- private readonly Task writerThread;
+ private readonly string dataPath = "data.events";
+ private readonly string offsetsPath = "data.offsets";
+ private readonly Task writerTask;
private readonly ManualResetEventSlim hasItems = new ManualResetEventSlim(false);
private readonly ConcurrentQueue<WriteState> itemsToWrite = new ConcurrentQueue<WriteState>();
private readonly CancellationTokenSource cts = new CancellationTokenSource();
@@ -36,6 +38,10 @@ public class PersistedEventsStorage : IDisposable
private BinaryWriter binaryWriter;
private Stream file;
+
+ private long lastWriteSnapshotCount;
+ private DateTime lastWriteSnapshotTime;
+
private long eventsCount;
private long deleteCount;
@@ -55,16 +61,13 @@ public PersistedEventsStorage(PersistedOptions options)
cache = new JsonDataCache<PersistedEvent>(options);
this.options = options;
streamSource = options.StreamSource;
- path = Path.Combine(options.DirPath, "data.events");
- file = streamSource.OpenReadWrite(path);
+ file = streamSource.OpenReadWrite(dataPath);
ReadAllFromDisk();
- MaxDurationForFlush = TimeSpan.FromMilliseconds(200);
-
binaryWriter = new BinaryWriter(file, Encoding.UTF8, leaveOpen: true);
- writerThread = Task.Factory.StartNew(() =>
+ writerTask = Task.Factory.StartNew(() =>
{
try
{
@@ -79,6 +82,7 @@ public PersistedEventsStorage(PersistedOptions options)
private void ReadAllFromDisk()
{
+ file.Position = ReadOffsets(streamSource.GetLatestName(dataPath));
while (true)
{
using (var reader = new BinaryReader(file, Encoding.UTF8, leaveOpen: true))
@@ -92,6 +96,8 @@ private void ReadAllFromDisk()
}
catch (EndOfStreamException)
{
+ lastWriteSnapshotCount = eventsCount;
+ lastWriteSnapshotTime = DateTime.UtcNow;
return;
}
catch (Exception e)
@@ -118,6 +124,7 @@ private void ReadAllFromDisk()
});
break;
case EventState.Delete:
+ deleteCount++;
var streamInformation = new StreamInformation
{
LastPosition = Deleted,
@@ -135,7 +142,6 @@ private void ReadAllFromDisk()
}
}
-
}
public IEnumerable<EventData> Read(string id)
@@ -153,7 +159,7 @@ public IEnumerable<EventData> Read(string id)
if(cachedReadStreams.TryDequeue(out stream) == false)
{
- stream = streamSource.OpenRead(path);
+ stream = streamSource.OpenRead(dataPath);
}
}
finally
@@ -273,14 +279,14 @@ private void WriteToDisk()
{
if (cts.IsCancellationRequested)
{
- FlushToDisk(tasksToNotify);
+ FlushToDisk(tasksToNotify, closing: true);
return;
}
WriteState item;
if (hadWrites)
{
- if ((DateTime.UtcNow - lastWrite) > MaxDurationForFlush)
+ if ((DateTime.UtcNow - lastWrite) > options.MaxTimeToWaitForFlush)
{
// we have to flush to disk now, because we have writes and nothing else is forthcoming
// or we have so many writes, that we need to flush to clear the buffers
@@ -301,7 +307,7 @@ private void WriteToDisk()
{
// we waited enough time to be pretty sure we are idle
// and can run compaction without too much problems.
- if (hasItems.Wait(options.MaxTimeToWaitForFlushingToDisk, cts.Token) == false)
+ if (hasItems.Wait(options.IdleTime, cts.Token) == false)
{
Compact();
continue;
@@ -344,12 +350,12 @@ public void Compact()
{
var newPositions = new Dictionary<string, StreamInformation>(StringComparer.InvariantCultureIgnoreCase);
- var newFilePath = path + ".compacting";
+ var newFilePath = dataPath + ".compacting";
streamSource.DeleteIfExists(newFilePath);
using (var newFile = streamSource.OpenReadWrite(newFilePath))
using (var newWriter = new BinaryWriter(newFile))
- using (var current = streamSource.OpenRead(path))
+ using (var current = streamSource.OpenRead(dataPath))
using (var reader = new BinaryReader(current))
{
while (true)
@@ -397,7 +403,7 @@ public void Compact()
try
{
newFile.Close();
- streamSource.DeleteOnClose(path);
+ streamSource.DeleteOnClose(dataPath);
Stream result;
while (cachedReadStreams.TryDequeue(out result))
@@ -408,11 +414,13 @@ public void Compact()
binaryWriter.Dispose();
file.Dispose();
- streamSource.RenameToLatest(newFilePath, path);
-
- file = streamSource.OpenReadWrite(path);
+ streamSource.RenameToLatest(newFilePath, dataPath);
+ file = streamSource.OpenReadWrite(dataPath);
binaryWriter = new BinaryWriter(file, Encoding.UTF8, leaveOpen: true);
-
+
+ streamSource.DeleteIfExists("data.offsets");
+ FlushOffsets();
+
idToPos.Clear();
foreach (var val in newPositions)
{
@@ -486,9 +494,7 @@ private void HandleFlushSuccessful(WriteState item, long currentPos)
item.TaskCompletionSource.SetResult(null);
}
- public TimeSpan MaxDurationForFlush { get; set; }
-
- private void FlushToDisk(ICollection<Action<Exception>> tasksToNotify)
+ private void FlushToDisk(ICollection<Action<Exception>> tasksToNotify, bool closing = false)
{
try
{
@@ -498,6 +504,12 @@ private void FlushToDisk(ICollection<Action<Exception>> tasksToNotify)
{
taskCompletionSource(null);
}
+ if(closing ||
+ (eventsCount - lastWriteSnapshotCount) > options.WritesBetweenOffsetSnapshots ||
+ (eventsCount > lastWriteSnapshotCount) && (DateTime.UtcNow - lastWriteSnapshotTime) > options.MinTimeForOffsetSnapshots)
+ {
+ FlushOffsets();
+ }
}
catch (Exception e)
{
@@ -516,6 +528,70 @@ private void FlushToDisk(ICollection<Action<Exception>> tasksToNotify)
}
}
+ private long ReadOffsets(string currentFileName)
+ {
+ long position;
+ if (streamSource.Exists(offsetsPath) == false)
+ return 0;
+ using (var offsets = streamSource.OpenRead(offsetsPath))
+ using(var reader = new BinaryReader(offsets))
+ {
+ var fileName = reader.ReadString();
+ if(fileName != currentFileName)
+ {
+ // wrong file, skipping
+ streamSource.DeleteOnClose(offsetsPath);
+ return 0;
+ }
+ position = reader.ReadInt64();
+
+ while (true)
+ {
+ string key;
+ try
+ {
+ key = reader.ReadString();
+ }
+ catch (EndOfStreamException)
+ {
+ break;
+ }
+ var pos = reader.ReadInt64();
+ var count = reader.ReadInt32();
+
+ idToPos[key] = new StreamInformation
+ {
+ LastPosition = pos,
+ StreamLength = count
+ };
+ }
+ }
+
+ return position;
+ }
+
+ private void FlushOffsets()
+ {
+ lastWriteSnapshotCount = eventsCount;
+ lastWriteSnapshotTime = DateTime.UtcNow;
+
+ using(var offsets = streamSource.OpenReadWrite( offsetsPath + ".new"))
+ using(var writer = new BinaryWriter(offsets))
+ {
+ writer.Write(Path.GetFileName(streamSource.GetLatestName(dataPath)));
+ writer.Write(file.Position);
+ foreach (var streamInformation in idToPos)
+ {
+ writer.Write(streamInformation.Key);
+ writer.Write(streamInformation.Value.LastPosition);
+ writer.Write(streamInformation.Value.StreamLength);
+ }
+ writer.Flush();
+ }
+ streamSource.DeleteIfExists(offsetsPath);
+ streamSource.RenameToLatest(offsetsPath + ".new", offsetsPath);
+ }
+
public Task EnqueueAsync(string id, EventState state, JObject data, JObject metadata)
{
AssertValidState();
@@ -537,7 +613,7 @@ public void Dispose()
{
var e = new ExceptionAggregator();
e.Execute(cts.Cancel);
- e.Execute(writerThread.Wait);
+ e.Execute(writerTask.Wait);
e.Execute(binaryWriter.Dispose);
e.Execute(file.Dispose);
e.Execute(cache.Dispose);
Please sign in to comment.
Something went wrong with that request. Please try again.