Skip to content

Commit

Permalink
Fix race condition where PersistentBlobCache wasn't guaranteeing objects
Browse files Browse the repository at this point in the history
were fully serialized / deserialized on ctor and Dispose()
  • Loading branch information
anaisbetts committed Dec 30, 2011
1 parent 7ea4467 commit 89ed763
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 9 deletions.
5 changes: 3 additions & 2 deletions Akavache/IBlobCache.cs
Expand Up @@ -21,8 +21,9 @@ public interface IFilesystemProvider
/// <param name="mode">The file mode</param>
/// <param name="access">The required access privileges</param>
/// <param name="share">The allowed file sharing modes.</param>
/// <param name="scheduler">The scheduler to schedule the open under.</param>
/// <returns>A Future result representing the Open file.</returns>
IObservable<Stream> SafeOpenFileAsync(string path, FileMode mode, FileAccess access, FileShare share);
IObservable<Stream> SafeOpenFileAsync(string path, FileMode mode, FileAccess access, FileShare share, IScheduler scheduler);

/// <summary>
/// Create a directory and its parents. If the directory already
Expand Down Expand Up @@ -82,7 +83,7 @@ public interface IBlobCache : IDisposable
/// <summary>
/// Invalidate all entries in the cache (i.e. clear it). Note that
/// this method is blocking and incurs a significant performance
/// penalty if used while the cache is being used on other threads.
/// penalty if used while the cache is being used on other threads.
/// </summary>
void InvalidateAll();

Expand Down
4 changes: 2 additions & 2 deletions Akavache/PersistentBlobCache.cs
Expand Up @@ -268,7 +268,7 @@ AsyncSubject<byte[]> FetchOrWriteBlobFromDisk(string key, object byteData, bool
var ms = new MemoryStream();

var scheduler = synchronous ? System.Reactive.Concurrency.Scheduler.Immediate : Scheduler;
filesystem.SafeOpenFileAsync(GetPathForKey(key), FileMode.Open, FileAccess.Read, FileShare.Read)
filesystem.SafeOpenFileAsync(GetPathForKey(key), FileMode.Open, FileAccess.Read, FileShare.Read, scheduler)
.SelectMany(x => x.CopyToAsync(ms, scheduler))
.SelectMany(x => AfterReadFromDiskFilter(ms.ToArray(), scheduler))
.Catch<byte[], FileNotFoundException>(ex => Observable.Throw<byte[]>(new KeyNotFoundException()))
Expand All @@ -285,7 +285,7 @@ AsyncSubject<byte[]> WriteBlobToDisk(string key, byte[] byteData, bool synchrono

var files = Observable.Zip(
BeforeWriteToDiskFilter(byteData, scheduler).Select(x => new MemoryStream(x)),
filesystem.SafeOpenFileAsync(GetPathForKey(key), FileMode.Create, FileAccess.Write, FileShare.None),
filesystem.SafeOpenFileAsync(GetPathForKey(key), FileMode.Create, FileAccess.Write, FileShare.None, scheduler),
(from, to) => new { from, to }
);

Expand Down
5 changes: 3 additions & 2 deletions Akavache/SimpleFilesystemProvider.cs
@@ -1,13 +1,14 @@
using System;
using System.IO;
using System.Reactive.Concurrency;

namespace Akavache
{
public class SimpleFilesystemProvider : IFilesystemProvider
{
public IObservable<Stream> SafeOpenFileAsync(string path, FileMode mode, FileAccess access, FileShare share)
public IObservable<Stream> SafeOpenFileAsync(string path, FileMode mode, FileAccess access, FileShare share, IScheduler scheduler)
{
return Utility.SafeOpenFileAsync(path, mode, access, share);
return Utility.SafeOpenFileAsync(path, mode, access, share, scheduler);
}

public void CreateRecursive(string path)
Expand Down
7 changes: 4 additions & 3 deletions Akavache/Utility.cs
Expand Up @@ -37,8 +37,9 @@ public static string GetMd5Hash(string input)
}
}

public static IObservable<FileStream> SafeOpenFileAsync(string path, FileMode mode, FileAccess access, FileShare share)
public static IObservable<FileStream> SafeOpenFileAsync(string path, FileMode mode, FileAccess access, FileShare share, IScheduler scheduler = null)
{
scheduler = scheduler ?? RxApp.TaskpoolScheduler;
return Observable.Create<FileStream>(subj =>
{
try
Expand All @@ -56,10 +57,10 @@ public static IObservable<FileStream> SafeOpenFileAsync(string path, FileMode mo
} else
{
#if SILVERLIGHT
return Observable.Start(() => new FileStream(path, mode, access, share, 4096), RxApp.TaskpoolScheduler)
return Observable.Start(() => new FileStream(path, mode, access, share, 4096), scheduler)
.Subscribe(subj);
#else
return Observable.Start(() => new FileStream(path, mode, access, share, 4096, true), RxApp.TaskpoolScheduler)
return Observable.Start(() => new FileStream(path, mode, access, share, 4096, true), scheduler)
.Subscribe(subj);
#endif
}
Expand Down

0 comments on commit 89ed763

Please sign in to comment.