Skip to content

Commit

Permalink
Cherry-pick to Release/1.1: Fix edgehub queue len metric (#4952) (#4990)
Browse files Browse the repository at this point in the history
cherry-pick: #4952
  • Loading branch information
nyanzebra committed May 14, 2021
1 parent b18d090 commit 4aab90b
Show file tree
Hide file tree
Showing 13 changed files with 707 additions and 6 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public static void CommitStarted(Checkpointer checkpointer, int successfulCount,

public static void CommitFinished(Checkpointer checkpointer)
{
Log.LogInformation((int)EventIds.CommitFinished, "[CheckpointerCommitFinishedo] {context}", GetContextString(checkpointer));
Log.LogInformation((int)EventIds.CommitFinished, "[CheckpointerCommitFinished] {context}", GetContextString(checkpointer));
}

public static void Close(Checkpointer checkpointer)
Expand All @@ -203,7 +203,13 @@ public static class Metrics
"Number of messages pending to be processed for the endpoint",
new List<string> { "endpoint", "priority", MetricsConstants.MsTelemetry });

public static void SetQueueLength(Checkpointer checkpointer) => QueueLength.Set(checkpointer.Proposed - checkpointer.Offset, new[] { checkpointer.EndpointId, checkpointer.Priority, bool.TrueString });
public static void SetQueueLength(Checkpointer checkpointer) => SetQueueLength(CalculateQueueLength(checkpointer), checkpointer.EndpointId, checkpointer.Priority);

public static void SetQueueLength(double length, string endpointId, string priority) => QueueLength.Set(length, new[] { endpointId, priority, bool.TrueString });

private static double CalculateQueueLength(Checkpointer checkpointer) => CalculateQueueLength(checkpointer.Proposed - checkpointer.Offset);

private static double CalculateQueueLength(long length) => Math.Max(length, 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,20 @@ namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
class ColumnFamilyDbStore : IDbStore
{
readonly IRocksDb db;
private ulong count;

public ColumnFamilyDbStore(IRocksDb db, ColumnFamilyHandle handle)
{
this.db = Preconditions.CheckNotNull(db, nameof(db));
this.Handle = Preconditions.CheckNotNull(handle, nameof(handle));

var iterator = db.NewIterator(this.Handle);
this.count = 0;
while (iterator.Valid())
{
this.count += 1;
iterator = iterator.Next();
}
}

internal ColumnFamilyHandle Handle { get; }
Expand Down Expand Up @@ -49,20 +58,23 @@ public async Task<Option<byte[]>> Get(byte[] key, CancellationToken cancellation
return returnValue;
}

public Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
public async Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));
Preconditions.CheckNotNull(value, nameof(value));

Action operation = () => this.db.Put(key, value, this.Handle);
return operation.ExecuteUntilCancelled(cancellationToken);
await operation.ExecuteUntilCancelled(cancellationToken);
this.count += 1;
}

public Task Remove(byte[] key, CancellationToken cancellationToken)
public async Task Remove(byte[] key, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));

Action operation = () => this.db.Remove(key, this.Handle);
return operation.ExecuteUntilCancelled(cancellationToken);
await operation.ExecuteUntilCancelled(cancellationToken);
this.count -= 1;
}

public async Task<Option<(byte[] key, byte[] value)>> GetLastEntry(CancellationToken cancellationToken)
Expand Down Expand Up @@ -128,6 +140,8 @@ public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> callback, Can
return this.IterateBatch(iterator => iterator.SeekToFirst(), batchSize, callback, cancellationToken);
}

public Task<ulong> Count() => Task.FromResult(this.count);

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,7 @@ public Task Remove(byte[] key, CancellationToken cancellationToken)
{
return this.dbStore.Remove(key, cancellationToken);
}

public Task<ulong> Count() => this.dbStore.Count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntit
cancellationToken);
}

public Task<ulong> Count() => this.entityStore.Count();

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, Cancellatio
public Task<bool> Contains(TK key, CancellationToken cancellationToken)
=> this.dbStore.Contains(key, cancellationToken);

public Task<ulong> Count() => this.dbStore.Count();

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ public interface IKeyValueStore<TK, TV> : IDisposable
Task IterateBatch(int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken);

Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken);

Task<ulong> Count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ public interface ISequentialStore<T> : IDisposable
Task<bool> RemoveFirst(Func<long, T, Task<bool>> predicate, CancellationToken cancellationToken);

Task<IEnumerable<(long, T)>> GetBatch(long startingOffset, int batchSize, CancellationToken cancellationToken);

Task<ulong> Count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public async Task Remove(byte[] key, CancellationToken cancellationToken)
}
}

public Task<ulong> Count() => Task.FromResult((ulong)this.keyValues.Count);

public void Dispose()
{
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> callback
public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
=> this.IterateBatch(Option.None<TK>(), batchSize, callback, cancellationToken);

public Task<ulong> Count() => this.underlyingStore.Count();

Task IterateBatch(Option<TK> startKey, int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckRange(batchSize, 1, nameof(batchSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ public void Dispose()
public Task IterateBatch(int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask;

public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask;

public Task<ulong> Count() => Task.FromResult(0UL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public async Task<IEnumerable<(long, T)>> GetBatch(long startingOffset, int batc
return batch;
}

public Task<ulong> Count() => this.entityStore.Count();

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,7 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntit
Func<CancellationToken, Task> iterateWithTimeout = cts => this.underlyingKeyValueStore.IterateBatch(startKey, batchSize, perEntityCallback, cts);
return iterateWithTimeout.TimeoutAfter(cancellationToken, this.timeout);
}

public Task<ulong> Count() => this.underlyingKeyValueStore.Count();
}
}

0 comments on commit 4aab90b

Please sign in to comment.