Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Use SSCAN for DeleteAll by default to avoid pulling large sets into memory. #257

Merged
merged 3 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/ServiceStack.Redis/Generic/RedisTypedClient.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,25 @@ async Task IEntityStoreAsync<T>.DeleteByIdsAsync(IEnumerable ids, CancellationTo

async Task IEntityStoreAsync<T>.DeleteAllAsync(CancellationToken token)
{
var ids = await AsyncClient.GetAllItemsFromSetAsync(this.TypeIdsSetKey, token).ConfigureAwait(false);
var urnKeys = ids.Map(t => client.UrnKey<T>(t));
if (urnKeys.Count > 0)
await DeleteAllAsync(0,RedisConfig.DeleteAllBatchSize, token).ConfigureAwait(false);
}

private async Task DeleteAllAsync(ulong cursor, int pageSize, CancellationToken token)
{
var callCount = 0;
while (cursor != 0 || callCount == 0)
{
await AsyncClient.RemoveEntryAsync(urnKeys.ToArray(), token).ConfigureAwait(false);
await AsyncClient.RemoveEntryAsync(new[] { this.TypeIdsSetKey }, token).ConfigureAwait(false);
var scanResult = await AsyncNative.SScanAsync(this.TypeIdsSetKey, cursor, pageSize, token: token).ConfigureAwait(false);
callCount++;
cursor = scanResult.Cursor;
var ids = scanResult.Results.Select(x => Encoding.UTF8.GetString(x)).ToList();
var urnKeys = ids.Map(t => client.UrnKey<T>(t));
if (urnKeys.Count > 0)
{
await AsyncClient.RemoveEntryAsync(urnKeys.ToArray(), token).ConfigureAwait(false);
}
}
await AsyncClient.RemoveEntryAsync(new[] { this.TypeIdsSetKey }, token).ConfigureAwait(false);
}

async ValueTask<List<T>> IRedisTypedClientAsync<T>.GetValuesAsync(List<string> keys, CancellationToken token)
Expand Down
26 changes: 19 additions & 7 deletions src/ServiceStack.Redis/Generic/RedisTypedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,28 @@ public void DeleteByIds(IEnumerable ids)
}
}

public void DeleteAll()
private void DeleteAll(ulong cursor, int pageSize)
{
var ids = client.GetAllItemsFromSet(this.TypeIdsSetKey);
var urnKeys = ids.Map(t => client.UrnKey<T>(t));
if (urnKeys.Count > 0)
var callCount = 0;
while (cursor != 0 || callCount == 0)
{

this.RemoveEntry(urnKeys.ToArray());
this.RemoveEntry(this.TypeIdsSetKey);
var scanResult = client.SScan(this.TypeIdsSetKey, cursor, pageSize);
callCount++;
cursor = scanResult.Cursor;
var ids = scanResult.Results.Select(x => Encoding.UTF8.GetString(x)).ToList();
var urnKeys = ids.Map(t => client.UrnKey<T>(t));
if (urnKeys.Count > 0)
{
this.RemoveEntry(urnKeys.ToArray());
}
}

this.RemoveEntry(this.TypeIdsSetKey);
}

public void DeleteAll()
{
DeleteAll(0,RedisConfig.DeleteAllBatchSize);
}

#endregion
Expand Down
22 changes: 17 additions & 5 deletions src/ServiceStack.Redis/RedisClient.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -658,15 +658,27 @@ async Task IEntityStoreAsync.DeleteByIdsAsync<T>(ICollection ids, CancellationTo
}

async Task IEntityStoreAsync.DeleteAllAsync<T>(CancellationToken token)
{
await DeleteAllAsync<T>(0, RedisConfig.DeleteAllBatchSize, token).ConfigureAwait(false);
}

private async Task DeleteAllAsync<T>(ulong cursor, int pageSize, CancellationToken token)
{
var typeIdsSetKey = this.GetTypeIdsSetKey<T>();
var ids = await AsAsync().GetAllItemsFromSetAsync(typeIdsSetKey, token).ConfigureAwait(false);
if (ids.Count > 0)
var callCount = 0;
while (cursor != 0 || callCount == 0)
{
var urnKeys = ids.ToList().ConvertAll(UrnKey<T>);
await AsAsync().RemoveEntryAsync(urnKeys.ToArray(), token).ConfigureAwait(false);
await AsAsync().RemoveAsync(typeIdsSetKey, token).ConfigureAwait(false);
var scanResult = await NativeAsync.SScanAsync(typeIdsSetKey, cursor, pageSize, token: token).ConfigureAwait(false);
callCount++;
cursor = scanResult.Cursor;
var ids = scanResult.Results.Select(x => x.FromUtf8Bytes());
var urnKeys = ids.Map(t => AsAsync().UrnKey<T>(t));
if (urnKeys.Count > 0)
{
await AsAsync().RemoveEntryAsync(urnKeys.ToArray(), token).ConfigureAwait(false);
}
}
await AsAsync().RemoveEntryAsync(new[] { typeIdsSetKey }, token).ConfigureAwait(false);
}

ValueTask<List<string>> IRedisClientAsync.SearchSortedSetAsync(string setId, string start, string end, int? skip, int? take, CancellationToken token)
Expand Down
23 changes: 18 additions & 5 deletions src/ServiceStack.Redis/RedisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -830,15 +830,28 @@ public void DeleteByIds<T>(ICollection ids)
}

public void DeleteAll<T>()
{
DeleteAll<T>(0,RedisConfig.DeleteAllBatchSize);
}

private void DeleteAll<T>(ulong cursor, int pageSize)
{
var typeIdsSetKey = this.GetTypeIdsSetKey<T>();
var ids = this.GetAllItemsFromSet(typeIdsSetKey);
if (ids.Count > 0)
var callCount = 0;
while (cursor != 0 || callCount == 0)
{
var urnKeys = ids.ToList().ConvertAll(UrnKey<T>);
this.RemoveEntry(urnKeys.ToArray());
this.Remove(typeIdsSetKey);
var scanResult = this.SScan(typeIdsSetKey, cursor, pageSize);
callCount++;
cursor = scanResult.Cursor;
var ids = scanResult.Results.Select(x => x.FromUtf8Bytes());
var urnKeys = ids.Map(t => this.UrnKey(t));
if (urnKeys.Count > 0)
{
this.RemoveEntry(urnKeys.ToArray());
}
}

this.RemoveEntry(typeIdsSetKey);
}

public RedisClient CloneClient()
Expand Down
5 changes: 5 additions & 0 deletions src/ServiceStack.Redis/RedisConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public class RedisConfig
/// </summary>
public static int BufferPoolMaxSize = 500000;

/// <summary>
/// The DeleteAll Batch Size is the number of keys returned each SSCAN when using DeleteAll on the RedisTypedClient.
/// </summary>
public static int DeleteAllBatchSize = 1000;

/// <summary>
/// Whether Connections to Master hosts should be verified they're still master instances (default true)
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,46 @@ public async Task Can_Delete_All_Items()
Assert.That(await RedisTyped.GetByIdAsync("key"), Is.Null);

}

[Test]
public async Task Can_Delete_All_Items_multiple_batches()
{
// Clear previous usage
await RedisAsync.DeleteAsync(RedisRaw.GetTypeIdsSetKey<CacheRecord>());
var cachedRecord = new CacheRecord
{
Id = "key",
Children = {
new CacheRecordChild { Id = "childKey", Data = "data" }
}
};

var exists = RedisRaw.Exists(RedisRaw.GetTypeIdsSetKey(typeof(CacheRecord)));
Assert.That(exists, Is.EqualTo(0));

await RedisTyped.StoreAsync(cachedRecord);

exists = RedisRaw.Exists(RedisRaw.GetTypeIdsSetKey(typeof(CacheRecord)));
Assert.That(exists, Is.EqualTo(1));

RedisConfig.DeleteAllBatchSize = 5;

for (int i = 0; i < 50; i++)
{
cachedRecord.Id = "key" + i;
await RedisTyped.StoreAsync(cachedRecord);
}

Assert.That(await RedisTyped.GetByIdAsync("key"), Is.Not.Null);

await RedisTyped.DeleteAllAsync();

Assert.That(await RedisTyped.GetByIdAsync("key"), Is.Null);

exists = RedisRaw.Exists(RedisRaw.GetTypeIdsSetKey(typeof(CacheRecord)));
Assert.That(exists, Is.EqualTo(0));

}
}

}
38 changes: 38 additions & 0 deletions tests/ServiceStack.Redis.Tests/Generic/RedisTypedClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,45 @@ public void Can_Delete_All_Items()
RedisTyped.DeleteAll();

Assert.That(RedisTyped.GetById("key"), Is.Null);
}

[Test]
public void Can_Delete_All_Items_multiple_batches()
{
// Clear previous usage
Redis.Delete(Redis.GetTypeIdsSetKey(typeof(CacheRecord)));
var cachedRecord = new CacheRecord
{
Id = "key",
Children = {
new CacheRecordChild { Id = "childKey", Data = "data" }
}
};

var exists = Redis.Exists(Redis.GetTypeIdsSetKey(typeof(CacheRecord)));
Assert.That(exists, Is.EqualTo(0));

RedisTyped.Store(cachedRecord);

exists = Redis.Exists(Redis.GetTypeIdsSetKey(typeof(CacheRecord)));

Assert.That(exists, Is.EqualTo(1));

RedisConfig.DeleteAllBatchSize = 5;

for (int i = 0; i < 50; i++)
{
cachedRecord.Id = "key" + i;
RedisTyped.Store(cachedRecord);
}

Assert.That(RedisTyped.GetById("key"), Is.Not.Null);

RedisTyped.DeleteAll();

exists = Redis.Exists(Redis.GetTypeIdsSetKey(typeof(CacheRecord)));
Assert.That(exists, Is.EqualTo(0));
Assert.That(RedisTyped.GetById("key"), Is.Null);
}
}

Expand Down