Skip to content

Commit

Permalink
更新 DiskCacheManager 应付并发情况
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivony committed Apr 27, 2016
1 parent 8216a72 commit c7f597e
Showing 1 changed file with 62 additions and 7 deletions.
69 changes: 62 additions & 7 deletions Ivony.Caching/DiskCacheManager.cs
@@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Ivony.Caching
Expand Down Expand Up @@ -55,23 +57,49 @@ internal void AssignCacheDirectory()

private object _sync = new object();

private Dictionary<string, Task> actionTasks = new Dictionary<string, Task>();
private ConcurrentDictionary<string, Task> actionTasks = new ConcurrentDictionary<string, Task>();


/// <summary>
/// 读取一个流
/// </summary>
/// <param name="cacheKey">缓存键</param>
/// <returns></returns>
public Task<Stream> ReadStream( string cacheKey )
public async Task<Stream> ReadStream( string cacheKey )
{
var filepath = Path.Combine( CurrentDirectory, cacheKey );
if ( File.Exists( filepath ) == false )
return null;


var task = ReadStream( File.OpenRead( filepath ) );
return task;
var cancellation = new CancellationTokenSource();
var newTask = Task.Run( async () =>
{
await Task.Yield();
return await ReadStream( File.OpenRead( filepath ) );
}, cancellation.Token );


var task = actionTasks.AddOrUpdate( cacheKey, newTask, ( key, item ) => item ?? newTask );
if ( newTask != task ) //如果任务未能加入队列,则直接取消
cancellation.Cancel();

try
{
await task;
}
finally
{
actionTasks.TryUpdate( cacheKey, null, task );
}


var readTask = task as Task<Stream>;
if ( readTask != null ) //如果当前正在读,则以当前读取结果返回。
return readTask.Result;

else //如果当前正在写,等写完后再读取一次。
return await ReadStream( cacheKey );

}

Expand Down Expand Up @@ -104,13 +132,40 @@ public Task WriteStream( string cacheKey, byte[] data )
}


public Task WriteStream( string cacheKey, MemoryStream data )
public async Task WriteStream( string cacheKey, MemoryStream data )
{
var filepath = Path.Combine( CurrentDirectory, cacheKey );
Directory.CreateDirectory( CurrentDirectory );

var task = WriteStream( File.OpenWrite( filepath ), data );
return task;



var cancellation = new CancellationTokenSource();
var newTask = Task.Run( async () =>
{
await Task.Yield();
await WriteStream( File.OpenWrite( filepath ), data );
}, cancellation.Token );


var task = actionTasks.AddOrUpdate( cacheKey, newTask, ( key, item ) => item ?? newTask );

try
{
await task;
}
finally
{
actionTasks.TryUpdate( cacheKey, null, task );
}


if ( newTask != task ) //如果任务未能加入队列,则直接取消然后再尝试一次
{
cancellation.Cancel();
await WriteStream( cacheKey, data );
}

}


Expand Down

0 comments on commit c7f597e

Please sign in to comment.