Skip to content

Commit

Permalink
+Distinct
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 2, 2018
1 parent 66fce5a commit 9f92c3f
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ finally
- `Collect` - collect items into a custom collection and emit the collection at the end
- `ConcatMap` - concatenate in order the inner async sequences mapped from the main sequence
- `ConcatWith` - concatenate in order with another async sequence
- `Distinct` - makes sure only distinct elements get relayed
- `DistinctUntilChanged` - relays an element only if it is distinct from the previous item
- `Debounce` - wait a bit after each item and emit them if no newer item arrived from the source
- `DefaultIfEmpty` - return a fallback value if the source async sequence turns out to be empty
- `DoOnNext` - execute an action when an item becomes available
Expand Down
1 change: 1 addition & 0 deletions async-enumerable-dotnet-test/AsyncEnumerableTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ static AsyncEnumerableTest()
Defaults.Add(typeof(Func<long, Exception, bool>), (Func<long, Exception, bool>)((v, w) => false));
Defaults.Add(typeof(Func<long, Exception, Task<bool>>), (Func<long, Exception, Task<bool>>)((v, w) => Task.FromResult(false)));
Defaults.Add(typeof(Func<long, Task<bool>>), (Func<long, Task<bool>>)(v => Task.FromResult(false)));
Defaults.Add(typeof(Func<ISet<int>>), (Func<ISet<int>>)(() => null));

Defaults.Add(typeof(Func<IAsyncEnumerable<int>, IAsyncEnumerable<int>>), (Func<IAsyncEnumerable<int>, IAsyncEnumerable<int>>)(w => w));

Expand Down
69 changes: 69 additions & 0 deletions async-enumerable-dotnet-test/DistinctTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) David Karnok & Contributors.
// Licensed under the Apache 2.0 License.
// See LICENSE file in the project root for full license information.

using Xunit;
using async_enumerable_dotnet;
using System.Collections.Generic;

namespace async_enumerable_dotnet_test
{
public class DistinctTest
{
[Fact]
public async void Empty()
{
await AsyncEnumerable.Empty<int>()
.Distinct()
.AssertResult();
}

[Fact]
public async void Normal()
{
await AsyncEnumerable.Range(1, 5)
.Distinct()
.AssertResult(1, 2, 3, 4, 5);
}

[Fact]
public async void Redundant()
{
await AsyncEnumerable.FromArray(1, 2, 3, 2, 1, 4, 5, 1, 5)
.Distinct()
.AssertResult(1, 2, 3, 4, 5);
}

[Fact]
public async void KeySelector()
{
await AsyncEnumerable.Range(1, 5)
.Distinct(v => v % 3)
.AssertResult(1, 2, 3);
}

[Fact]
public async void EqualityComparer()
{
await AsyncEnumerable.Range(1, 5)
.Distinct(EqualityComparer<int>.Default)
.AssertResult(1, 2, 3, 4, 5);
}

[Fact]
public async void KeySelector_EqualityComparer()
{
await AsyncEnumerable.Range(1, 5)
.Distinct(v => v % 3, EqualityComparer<long>.Default)
.AssertResult(1, 2, 3);
}

[Fact]
public async void Custom_Set()
{
await AsyncEnumerable.Range(1, 5)
.Distinct(v => (v % 3), () => new HashSet<long>())
.AssertResult(1, 2, 3);
}
}
}
80 changes: 80 additions & 0 deletions async-enumerable-dotnet/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1616,5 +1616,85 @@ public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAs
return new BufferBoundaryExact<TSource, TOther, TCollection>(source, boundary, collectionSupplier, maxSize);
}

/// <summary>
/// Checks if the source items have been seen before by checking if
/// they are already successfully added to a HashSet or not, ensuring
/// only distinct source items pass through.
/// </summary>
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
/// <param name="source">The source to have distinct items only of.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source)
{
return Distinct(source, v => v, () => new HashSet<TSource>());
}

/// <summary>
/// Checks if the source items have been seen before by checking if
/// they are already successfully added to a HashSet or not, ensuring
/// only distinct source items pass through.
/// </summary>
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
/// <param name="source">The source to have distinct items only of.</param>
/// <param name="comparer">The comparer for comparing items in the HashSet</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
{
RequireNonNull(comparer, nameof(comparer));
return Distinct(source, v => v, () => new HashSet<TSource>(comparer));
}

/// <summary>
/// Checks if the source items have been seen before by checking if
/// a key extracted from such items is in a HashSet or not, ensuring
/// only distinct source items pass through.
/// </summary>
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
/// <typeparam name="TKey">The key type used for comparing items.</typeparam>
/// <param name="source">The source to have distinct items only of.</param>
/// <param name="keySelector">The function that receives the source item and should return a key value for distinct comparison.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
{
return Distinct(source, keySelector, () => new HashSet<TKey>());
}

/// <summary>
/// Checks if the source items have been seen before by checking if
/// a key extracted from such items is in a HashSet or not, ensuring
/// only distinct source items pass through.
/// </summary>
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
/// <typeparam name="TKey">The key type used for comparing items.</typeparam>
/// <param name="source">The source to have distinct items only of.</param>
/// <param name="keySelector">The function that receives the source item and should return a key value for distinct comparison.</param>
/// <param name="keyComparer">The comparer for comparing keys in the HashSet</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> keyComparer)
{
RequireNonNull(keyComparer, nameof(keyComparer));
return Distinct(source, keySelector, () => new HashSet<TKey>(keyComparer));
}

/// <summary>
/// Checks if the source items have been seen before by checking
/// a key extracted from such items agains a set of keys and
/// drops such items, ensuring that only distinct source items pass
/// through.
/// </summary>
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
/// <typeparam name="TKey">The key type used for comparing items.</typeparam>
/// <param name="source">The source to have distinct items only of.</param>
/// <param name="keySelector">The function that receives the source item and should return a key value for distinct comparison.</param>
/// <param name="setSupplier">The function that should provide a set implementation whose <see cref="ISet{T}.Add(T)"/> method's return
/// value decided is the source item should pass or not.</param>
/// <returns>The new IAsyncEnumerable sequence</returns>
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<ISet<TKey>> setSupplier)
{
RequireNonNull(source, nameof(source));
RequireNonNull(keySelector, nameof(keySelector));
RequireNonNull(setSupplier, nameof(setSupplier));
return new Distinct<TSource, TKey>(source, keySelector, setSupplier);
}
}
}
83 changes: 83 additions & 0 deletions async-enumerable-dotnet/impl/Distinct.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) David Karnok & Contributors.
// Licensed under the Apache 2.0 License.
// See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace async_enumerable_dotnet.impl
{
internal sealed class Distinct<TSource, TKey> : IAsyncEnumerable<TSource>
{
private readonly IAsyncEnumerable<TSource> _source;

private readonly Func<TSource, TKey> _keySelector;

private readonly Func<ISet<TKey>> _collectionSupplier;

public Distinct(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<ISet<TKey>> collectionSupplier)
{
_source = source;
_keySelector = keySelector;
_collectionSupplier = collectionSupplier;
}

public IAsyncEnumerator<TSource> GetAsyncEnumerator()
{
ISet<TKey> collection;

try
{
collection = _collectionSupplier();
}
catch (Exception ex)
{
return new Error<TSource>.ErrorEnumerator(ex);
}
return new DistinctEnumerator(_source.GetAsyncEnumerator(), _keySelector, collection);
}

private sealed class DistinctEnumerator : IAsyncEnumerator<TSource>
{
private readonly IAsyncEnumerator<TSource> _source;

private readonly Func<TSource, TKey> _keySelector;

private ISet<TKey> _collection;

public TSource Current => _source.Current;

public DistinctEnumerator(IAsyncEnumerator<TSource> source, Func<TSource, TKey> keySelector, ISet<TKey> collection)
{
_source = source;
_keySelector = keySelector;
_collection = collection;
}

public ValueTask DisposeAsync()
{
_collection = default;
return _source.DisposeAsync();
}

public async ValueTask<bool> MoveNextAsync()
{
for (; ; )
{
if (await _source.MoveNextAsync())
{
if (_collection.Add(_keySelector(_source.Current)))
{
return true;
}
}
else
{
return false;
}
}
}
}
}
}

0 comments on commit 9f92c3f

Please sign in to comment.