Permalink
Browse files

adjusting File and Azure stores

  • Loading branch information...
1 parent 3cc24ec commit f8ee245de6041ebb610c38bcbece9b140ba07996 @abdullin abdullin committed Mar 12, 2013
Showing with 81 additions and 37 deletions.
  1. +61 −17 Cqrs.Azure/AppendOnly/BlobAppendOnlyStore.cs
  2. +20 −20 Cqrs.Portable/TapeStorage/FileAppendOnlyStore.cs
View
78 Cqrs.Azure/AppendOnly/BlobAppendOnlyStore.cs
@@ -8,6 +8,7 @@
using System.Threading;
using Lokad.Cqrs.TapeStorage;
using Microsoft.WindowsAzure.StorageClient;
+using Microsoft.WindowsAzure.StorageClient.Protocol;
namespace Lokad.Cqrs.AppendOnly
{
@@ -53,7 +54,8 @@ public void InitializeReader()
LoadCaches();
}
- long _storeVersion = 0;
+ long _storeVersion;
+ int _pageSizeMultiplier = 1024 * 512;
public void Append(string streamName, byte[] data, long expectedStreamVersion = -1)
{
@@ -84,14 +86,14 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
}
}
- public IEnumerable<DataWithVersion> ReadRecords(string streamName, long startingFrom, int maxCount)
+ public IEnumerable<DataWithVersion> ReadRecords(string streamName, long afterVersion, int maxCount)
{
- return _cache.ReadRecords(streamName, startingFrom, maxCount);
+ return _cache.ReadRecords(streamName, afterVersion, maxCount);
}
- public IEnumerable<DataWithKey> ReadRecords(long startingFrom, int maxCount)
+ public IEnumerable<DataWithKey> ReadRecords(long afterVersion, int maxCount)
{
- return _cache.ReadRecords(startingFrom, maxCount);
+ return _cache.ReadRecords(afterVersion, maxCount);
}
@@ -111,13 +113,13 @@ public void ResetStore()
{
Close();
_cache.Clear(() =>
- {
- var blobs = _container.ListBlobs().OfType<CloudPageBlob>().Where(item => item.Uri.ToString().EndsWith(".dat"));
-
- blobs
- .AsParallel().ForAll(i => i.DeleteIfExists());
- _storeVersion = 0;
- });
+ {
+ var blobs = _container.ListBlobs().OfType<CloudPageBlob>().Where(item => item.Uri.ToString().EndsWith(".dat"));
+
+ blobs
+ .AsParallel().ForAll(i => i.DeleteIfExists());
+ _storeVersion = 0;
+ });
}
public long GetCurrentVersion()
@@ -131,23 +133,65 @@ IEnumerable<StorageFrameDecoded> EnumerateHistory()
// load indexes
// build and save missing indexes
var datFiles = _container
- .ListBlobs()
+ .ListBlobs(new BlobRequestOptions()
+ {
+ BlobListingDetails = BlobListingDetails.Metadata
+ })
.OrderBy(s => s.Uri.ToString())
- .OfType<CloudPageBlob>();
+
+ .OfType<CloudPageBlob>()
+ .Where(s => s.Name.EndsWith(".dat"));
foreach (var fileInfo in datFiles)
{
- using (var stream = new MemoryStream(fileInfo.DownloadByteArray()))
+
+ var bytes = fileInfo.DownloadByteArray();
+ bool potentiallyNonTruncatedChunk = bytes.Length % _pageSizeMultiplier == 0;
+ long lastValidPosition = 0;
+ using (var stream = new MemoryStream(bytes))
{
StorageFrameDecoded result;
while (StorageFramesEvil.TryReadFrame(stream, out result))
{
+ lastValidPosition = stream.Position;
yield return result;
}
+
+
+ }
+ var haveSomethingToTruncate = bytes.Length - lastValidPosition >= 512;
+ if (potentiallyNonTruncatedChunk & haveSomethingToTruncate)
+ {
+ var trunc = lastValidPosition;
+ var remainder = lastValidPosition % 512;
+ if (remainder > 0)
+ {
+ trunc += 512 - remainder;
+ }
+ Trace.WriteLine(string.Format("Truncating {0} to {1}", fileInfo.Name, trunc));
+ _container.GetPageBlobReference(fileInfo.Name + ".bak").CopyFromBlob(fileInfo);
+ SetLength(fileInfo, trunc);
}
}
}
+ static void SetLength(CloudPageBlob blob, long newLength, int timeout = 10000)
+ {
+ var credentials = blob.ServiceClient.Credentials;
+
+ var requestUri = blob.Uri;
+ if (credentials.NeedsTransformUri)
+ requestUri = new Uri(credentials.TransformUri(requestUri.ToString()));
+
+ var request = BlobRequest.SetProperties(requestUri, timeout, blob.Properties, null, newLength);
+ request.Timeout = timeout;
+
+ credentials.SignRequest(request);
+
+ using (request.GetResponse()) { }
+ }
+
+
void LoadCaches()
{
@@ -181,9 +225,9 @@ void EnsureWriterExists(long version)
var fileName = string.Format("{0:00000000}-{1:yyyy-MM-dd-HHmmss}.dat", version, DateTime.UtcNow);
var blob = _container.GetPageBlobReference(fileName);
- blob.Create(1024 * 512);
+ blob.Create(_pageSizeMultiplier);
- _currentWriter = new AppendOnlyStream(512, (i, bytes) => blob.WritePages(bytes, i), 1024 * 512);
+ _currentWriter = new AppendOnlyStream(512, (i, bytes) => blob.WritePages(bytes, i), _pageSizeMultiplier);
}
static void CreateIfNotExists(CloudBlobContainer container, TimeSpan timeout)
View
40 Cqrs.Portable/TapeStorage/FileAppendOnlyStore.cs
@@ -15,14 +15,14 @@ public class FileAppendOnlyStore : IAppendOnlyStore
// used to synchronize access between threads within a process
-
+
// used to prevent writer access to store to other processes
FileStream _lock;
FileStream _currentWriter;
readonly LockingInMemoryCache _cache = new LockingInMemoryCache();
// caches
-
+
public void Initialize()
{
@@ -91,15 +91,15 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
// should be locked
try
{
-
+
_cache.Append(streamName, data, _storeVersion + 1, streamVersion =>
- {
- EnsureWriterExists(_storeVersion);
- PersistInFile(streamName, data, streamVersion);
- }, expectedStreamVersion);
+ {
+ EnsureWriterExists(_storeVersion);
+ PersistInFile(streamName, data, streamVersion);
+ }, expectedStreamVersion);
_storeVersion += 1;
-
+
}
catch (AppendOnlyStoreConcurrencyException)
{
@@ -114,7 +114,7 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
Close();
throw;
}
-
+
}
void PersistInFile(string key, byte[] buffer, long streamVersion)
@@ -133,15 +133,15 @@ void EnsureWriterExists(long storeVersion)
_currentWriter = File.OpenWrite(Path.Combine(_info.FullName, fileName));
}
- public IEnumerable<DataWithVersion> ReadRecords(string streamName, long startingFrom, int maxCount)
+ public IEnumerable<DataWithVersion> ReadRecords(string streamName, long afterVersion, int maxCount)
{
- return _cache.ReadRecords(streamName, startingFrom, maxCount);
+ return _cache.ReadRecords(streamName, afterVersion, maxCount);
}
- public IEnumerable<DataWithKey> ReadRecords(long startingFrom, int maxCount)
+ public IEnumerable<DataWithKey> ReadRecords(long afterVersion, int maxCount)
{
- return _cache.ReadRecords(startingFrom, maxCount);
-
+ return _cache.ReadRecords(afterVersion, maxCount);
+
}
bool _closed;
@@ -153,20 +153,20 @@ public void Close()
{
_currentWriter = null;
_closed = true;
-
+
}
}
public void ResetStore()
{
Close();
-
+
_cache.Clear(() =>
- {
- Directory.Delete(_info.FullName, true);
- _storeVersion = 0;
- });
+ {
+ Directory.Delete(_info.FullName, true);
+ _storeVersion = 0;
+ });
Initialize();
}

0 comments on commit f8ee245

Please sign in to comment.