Skip to content
This repository has been archived by the owner on Feb 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #32 from imranmomin/develop
Browse files Browse the repository at this point in the history
3.0.0-beta
  • Loading branch information
imranmomin committed Dec 25, 2018
2 parents 1e4c7d5 + bbb3883 commit 358f3eb
Show file tree
Hide file tree
Showing 54 changed files with 2,235 additions and 1,001 deletions.
10 changes: 10 additions & 0 deletions Hangfire.AzureDocumentDB.sln.DotSettings
@@ -0,0 +1,10 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateStaticReadonly/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=azuredocumentdb/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=documentdb/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Doucment/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Enqueued/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=hangfire/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=hashs/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=storedprocedure/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Upsert/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
14 changes: 0 additions & 14 deletions Hangfire.AzureDocumentDB/App.config

This file was deleted.

111 changes: 73 additions & 38 deletions Hangfire.AzureDocumentDB/CountersAggregator.cs
Expand Up @@ -10,6 +10,7 @@
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

using Hangfire.Azure.Helper;
using Hangfire.Azure.Documents;

namespace Hangfire.Azure
Expand All @@ -18,77 +19,111 @@ namespace Hangfire.Azure
internal class CountersAggregator : IServerComponent
#pragma warning restore 618
{
private static readonly ILog logger = LogProvider.For<CountersAggregator>();
private const string DISTRIBUTED_LOCK_KEY = "countersaggragator";
private static readonly TimeSpan defaultLockTimeout = TimeSpan.FromMinutes(2);
private readonly ILog logger = LogProvider.For<CountersAggregator>();
private const string DISTRIBUTED_LOCK_KEY = "locks:counters:aggragator";
private readonly TimeSpan defaultLockTimeout;
private readonly DocumentDbStorage storage;
private readonly FeedOptions queryOptions = new FeedOptions { MaxItemCount = 1000 };
private readonly Uri spDeleteDocumentsUri;

public CountersAggregator(DocumentDbStorage storage) => this.storage = storage ?? throw new ArgumentNullException(nameof(storage));
public CountersAggregator(DocumentDbStorage storage)
{
this.storage = storage ?? throw new ArgumentNullException(nameof(storage));
defaultLockTimeout = TimeSpan.FromSeconds(30) + storage.Options.CountersAggregateInterval;
spDeleteDocumentsUri = UriFactory.CreateStoredProcedureUri(storage.Options.DatabaseName, storage.Options.CollectionName, "deleteDocuments");
}

public void Execute(CancellationToken cancellationToken)
{
logger.Debug("Aggregating records in 'Counter' table.");

using (new DocumentDbDistributedLock(DISTRIBUTED_LOCK_KEY, defaultLockTimeout, storage))
{
List<Counter> rawCounters = storage.Client.CreateDocumentQuery<Counter>(storage.CollectionUri, queryOptions)
.Where(c => c.Type == CounterTypes.Raw && c.DocumentType == DocumentTypes.Counter)
.AsEnumerable()
.ToList();
logger.Trace("Aggregating records in 'Counter' table.");

Dictionary<string, (int Sum, DateTime? ExpireOn)> counters = rawCounters.GroupBy(c => c.Key)
.ToDictionary(k => k.Key, v => (Sum: v.Sum(c => c.Value), ExpireOn: v.Max(c => c.ExpireOn)));
List<Counter> rawCounters = storage.Client.CreateDocumentQuery<Counter>(storage.CollectionUri)
.Where(c => c.DocumentType == DocumentTypes.Counter && c.Type == CounterTypes.Raw)
.ToQueryResult();

Array.ForEach(counters.Keys.ToArray(), key =>
Dictionary<string, (int Value, DateTime? ExpireOn, List<Counter> Counters)> counters = rawCounters.GroupBy(c => c.Key)
.ToDictionary(k => k.Key, v => (Value: v.Sum(c => c.Value), ExpireOn: v.Max(c => c.ExpireOn), Counters: v.ToList()));

foreach (string key in counters.Keys)
{
cancellationToken.ThrowIfCancellationRequested();

if (counters.TryGetValue(key, out var data))
{
Counter aggregated = storage.Client.CreateDocumentQuery<Counter>(storage.CollectionUri, queryOptions)
.Where(c => c.Type == CounterTypes.Aggregrate && c.DocumentType == DocumentTypes.Counter && c.Key == key)
.AsEnumerable()
.FirstOrDefault();
Counter aggregated;
string id = $"{key}:{CounterTypes.Aggregate}".GenerateHash();
Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, id);

if (aggregated == null)
try
{
aggregated = new Counter
Task<DocumentResponse<Counter>> readTask = storage.Client.ReadDocumentWithRetriesAsync<Counter>(uri, cancellationToken: cancellationToken);
readTask.Wait(cancellationToken);

if (readTask.Result.StatusCode == HttpStatusCode.OK)
{
Key = key,
Type = CounterTypes.Aggregrate,
Value = data.Sum,
ExpireOn = data.ExpireOn
};
aggregated = readTask.Result;
aggregated.Value += data.Value;
aggregated.ExpireOn = new[] { aggregated.ExpireOn, data.ExpireOn }.Max();
}
else
{
logger.Warn($"Document with ID: {id} is a {readTask.Result.Document.Type.ToString()} type which could not be aggregated");
continue;
}
}
else
catch (AggregateException ex) when (ex.InnerException is DocumentClientException clientException)
{
aggregated.Value += data.Sum;
aggregated.ExpireOn = data.ExpireOn;
if (clientException.StatusCode == HttpStatusCode.NotFound)
{
aggregated = new Counter
{
Id = id,
Key = key,
Type = CounterTypes.Aggregate,
Value = data.Value,
ExpireOn = data.ExpireOn
};
}
else
{
logger.ErrorException("Error while reading document", ex.InnerException);
continue;
}
}

Task<ResourceResponse<Document>> task = storage.Client.UpsertDocumentAsync(storage.CollectionUri, aggregated);
Task<ResourceResponse<Document>> task = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, aggregated, cancellationToken: cancellationToken);
Task continueTask = task.ContinueWith(t =>
{
if (t.Result.StatusCode == HttpStatusCode.Created || t.Result.StatusCode == HttpStatusCode.OK)
{
string[] deleteCounterIds = rawCounters.Where(c => c.Key == key).Select(c => c.Id).ToArray();
Array.ForEach(deleteCounterIds, id =>
int deleted = 0;
ProcedureResponse response;
string ids = string.Join(",", data.Counters.Select(c => $"'{c.Id}'").ToArray());
string sql = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Counter} AND doc.counter_type = {(int)CounterTypes.Raw} AND doc.id IN ({ids})";
do
{
Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, id);
storage.Client.DeleteDocumentAsync(uri).Wait(cancellationToken);
});
logger.Trace($"Total {deleteCounterIds.Length} records from the 'Counter:{aggregated.Key}' were aggregated.");
Task<StoredProcedureResponse<ProcedureResponse>> procedureTask = storage.Client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spDeleteDocumentsUri, sql);
procedureTask.Wait(cancellationToken);
response = procedureTask.Result;
deleted += response.Affected;
} while (response.Continuation); // if the continuation is true; run the procedure again
logger.Trace($"Total {deleted} records from the 'Counter:{aggregated.Key}' were aggregated.");
}
}, cancellationToken, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current);

continueTask.Wait(cancellationToken);
}
});
}

logger.Trace("Records from the 'Counter' table aggregated.");
}

logger.Trace("Records from the 'Counter' table aggregated.");
cancellationToken.WaitHandle.WaitOne(storage.Options.CountersAggregateInterval);
}

Expand Down

0 comments on commit 358f3eb

Please sign in to comment.