Permalink
Browse files

Properly reading offsets snapshot on startup

  • Loading branch information...
1 parent 2cda832 commit cf2a12311e25e61b2ba1f774dabce1081a34981e @ayende committed Oct 7, 2012
@@ -22,7 +22,7 @@ static void Main()
{
DirPath = "ScribedEvents",
StreamSource = new FileStreamSource("ScribedEvents"),
- MaxTimeToWaitForFlush = TimeSpan.FromMilliseconds(1000)
+ MaxTimeToWaitForFlush = TimeSpan.FromMilliseconds(200)
});
var sp = Stopwatch.StartNew();
@@ -51,8 +51,11 @@ public Stream OpenRead(string file)
public void DeleteOnClose(string file)
{
var path = Path.Combine(basePath, file);
-
- using (new FileStream(LastFileVersion(path), FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, FileOptions.DeleteOnClose))
+
+ var lastFileVersion = LastFileVersion(path);
+ if (File.Exists(lastFileVersion) == false)
+ return;
+ using (new FileStream(lastFileVersion, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, FileOptions.DeleteOnClose))
{
}
}
@@ -98,7 +101,7 @@ public string GetLatestName(string file)
public bool Exists(string file)
{
var path = Path.Combine(basePath, file);
- return File.Exists(path);
+ return File.Exists(LastFileVersion(path));
}
@@ -17,6 +17,7 @@ public interface IStreamSource
void RenameToLatest(string newFilePath, string path);
string GetLatestName(string path);
+
bool Exists(string offsetsPath);
}
}
@@ -82,7 +82,7 @@ public PersistedEventsStorage(PersistedOptions options)
private void ReadAllFromDisk()
{
- file.Position = ReadOffsets(streamSource.GetLatestName(dataPath));
+ file.Position = ReadOffsets(Path.GetFileName(streamSource.GetLatestName(dataPath)));
while (true)
{
using (var reader = new BinaryReader(file, Encoding.UTF8, leaveOpen: true))
@@ -504,12 +504,6 @@ private void FlushToDisk(ICollection<Action<Exception>> tasksToNotify, bool clos
{
taskCompletionSource(null);
}
- if(closing ||
- (eventsCount - lastWriteSnapshotCount) > options.WritesBetweenOffsetSnapshots ||
- (eventsCount > lastWriteSnapshotCount) && (DateTime.UtcNow - lastWriteSnapshotTime) > options.MinTimeForOffsetSnapshots)
- {
- FlushOffsets();
- }
}
catch (Exception e)
{
@@ -521,11 +515,17 @@ private void FlushToDisk(ICollection<Action<Exception>> tasksToNotify, bool clos
}
finally
{
-
tasksToNotify.Clear();
lastWrite = DateTime.UtcNow;
hadWrites = false;
}
+
+ if (closing ||
+ (eventsCount - lastWriteSnapshotCount) > options.WritesBetweenOffsetSnapshots ||
+ (eventsCount > lastWriteSnapshotCount) && (DateTime.UtcNow - lastWriteSnapshotTime) > options.MinTimeForOffsetSnapshots)
+ {
+ FlushOffsets();
+ }
}
private long ReadOffsets(string currentFileName)
@@ -588,7 +588,7 @@ private void FlushOffsets()
}
writer.Flush();
}
- streamSource.DeleteIfExists(offsetsPath);
+ streamSource.DeleteOnClose(offsetsPath);
streamSource.RenameToLatest(offsetsPath + ".new", offsetsPath);
}

0 comments on commit cf2a123

Please sign in to comment.