Skip to content

Commit

Permalink
Harden Option<T> by disallowing null value (#6426)
Browse files Browse the repository at this point in the history
* Harden `Option<T>` by disallowing null value

* Update API Verify list

* Make Create method aggresively inlined
  • Loading branch information
Arkatufus committed Feb 21, 2023
1 parent 9779a37 commit e5d8c30
Show file tree
Hide file tree
Showing 21 changed files with 68 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Expand Up @@ -965,7 +965,7 @@ async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath)
{
var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout);
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
new Option<IActorRef>(entityRef)));
Option<IActorRef>.Create(entityRef)));
}
catch (ActorNotFoundException ex)
{
Expand Down
Expand Up @@ -5102,10 +5102,12 @@ namespace Akka.Util
public struct Option<T>
{
public static readonly Akka.Util.Option<T> None;
[System.ObsoleteAttribute("Use Option<T>.Create() instead")]
public Option(T value) { }
public bool HasValue { get; }
public bool IsEmpty { get; }
public T Value { get; }
public static Akka.Util.Option<T> Create(T value) { }
public bool Equals(Akka.Util.Option<T> other) { }
public override bool Equals(object obj) { }
public Akka.Util.Option<TNew> FlatSelect<TNew>(System.Func<T, Akka.Util.Option<TNew>> mapper) { }
Expand Down
Expand Up @@ -5110,10 +5110,12 @@ namespace Akka.Util
public struct Option<T>
{
public static readonly Akka.Util.Option<T> None;
[System.ObsoleteAttribute("Use Option<T>.Create() instead")]
public Option(T value) { }
public bool HasValue { get; }
public bool IsEmpty { get; }
public T Value { get; }
public static Akka.Util.Option<T> Create(T value) { }
public bool Equals(Akka.Util.Option<T> other) { }
public override bool Equals(object obj) { }
public Akka.Util.Option<TNew> FlatSelect<TNew>(System.Func<T, Akka.Util.Option<TNew>> mapper) { }
Expand Down
Expand Up @@ -5102,10 +5102,12 @@ namespace Akka.Util
public struct Option<T>
{
public static readonly Akka.Util.Option<T> None;
[System.ObsoleteAttribute("Use Option<T>.Create() instead")]
public Option(T value) { }
public bool HasValue { get; }
public bool IsEmpty { get; }
public T Value { get; }
public static Akka.Util.Option<T> Create(T value) { }
public bool Equals(Akka.Util.Option<T> other) { }
public override bool Equals(object obj) { }
public Akka.Util.Option<TNew> FlatSelect<TNew>(System.Func<T, Akka.Util.Option<TNew>> mapper) { }
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs
Expand Up @@ -813,7 +813,7 @@ public Option<IActorRef> ClusterResultAggregator()
{
Sys.ActorSelection(new RootActorPath(GetAddress(Roles.First())) / "user" / ("result" + Step))
.Tell(new Identify(Step), IdentifyProbe.Ref);
return new Option<IActorRef>(IdentifyProbe.ExpectMsg<ActorIdentity>().Subject);
return Option<IActorRef>.Create(IdentifyProbe.ExpectMsg<ActorIdentity>().Subject);
}

public void CreateResultAggregator(string title, int expectedResults, bool includeInHistory)
Expand All @@ -826,7 +826,7 @@ public void CreateResultAggregator(string title, int expectedResults, bool inclu
if (includeInHistory && Settings.Infolog)
{
aggregator.Tell(new ReportTo(new Option<IActorRef>(ClusterResultHistory.Value)));
aggregator.Tell(new ReportTo(Option<IActorRef>.Create(ClusterResultHistory.Value)));
}
else
{
Expand Down Expand Up @@ -1180,7 +1180,7 @@ public void ExerciseJoinRemove(string title, TimeSpan duration)
var sys = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
MuteLog(sys);
Akka.Cluster.Cluster.Get(sys).JoinSeedNodes(SeedNodes.Select(x => GetAddress(x)));
nextAs = new Option<ActorSystem>(sys);
nextAs = Option<ActorSystem>.Create(sys);
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterHeartbeat.cs
Expand Up @@ -634,7 +634,7 @@ public Option<IImmutableSet<UniqueAddress>> MyReceivers
{
if (_myReceivers.IsEmpty)
{
_myReceivers = new Option<IImmutableSet<UniqueAddress>>(Receivers(SelfAddress));
_myReceivers = Option<IImmutableSet<UniqueAddress>>.Create(Receivers(SelfAddress));
}

return _myReceivers;
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/FlowRecoverSpec.cs
Expand Up @@ -40,7 +40,7 @@ public void A_Recover_must_recover_when_there_is_a_handler()
throw Ex;
return x;
})
.Recover(_ => new Option<int>(0))
.Recover(_ => Option<int>.Create(0))
.RunWith(this.SinkProbe<int>(), Materializer)
.RequestNext(1)
.RequestNext(2)
Expand Down Expand Up @@ -77,7 +77,7 @@ public void A_Recover_must_not_influence_stream_when_there_is_no_exception()
{
Source.From(Enumerable.Range(1, 3))
.Select(x => x)
.Recover(_ => new Option<int>(0))
.Recover(_ => Option<int>.Create(0))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(3)
.ExpectNext(1, 2, 3)
Expand All @@ -92,7 +92,7 @@ public void A_Recover_must_finish_stream_if_it_is_empty()
{
Source.Empty<int>()
.Select(x => x)
.Recover(_ => new Option<int>(0))
.Recover(_ => Option<int>.Create(0))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectComplete();
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/LastElementSpec.cs
Expand Up @@ -33,7 +33,7 @@ public void A_stream_via_LastElement_should_materialize_to_the_last_element_emit
.ExpectNext(1, 2, 3)
.ExpectComplete();

lastElement.AwaitResult(TimeSpan.FromSeconds(1)).Should().Be(new Option<int>(3));
lastElement.AwaitResult(TimeSpan.FromSeconds(1)).Should().Be(Option<int>.Create(3));
}

[Fact]
Expand All @@ -58,7 +58,7 @@ public void A_stream_via_LastElement_should_materialize_to_the_last_element_emit
{
var t = Source.UnfoldInfinite(1, n => n >= 3 ? throw new Exception() : (n + 1, n + 1))
.ViaMaterialized(new LastElement<int>(), Keep.Right)
.ToMaterialized(Sink.Aggregate<int, Option<int>>(Option<int>.None, (_, o) => new Option<int>(o)), Keep.Both)
.ToMaterialized(Sink.Aggregate<int, Option<int>>(Option<int>.None, (_, o) => Option<int>.Create(o)), Keep.Both)
.Run(Sys.Materializer());

var lastElement = t.Item1;
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/PagedSourceSpec.cs
Expand Up @@ -37,7 +37,7 @@ public MultiplesOfTwoPage(int? size = null)
var indices = Enumerable.Range(key * _itemsPerPage, _itemsPerPage);
var filteredIndices = _size.HasValue ? indices.Where(x => x < _size.Value) : indices;

return Task.FromResult(new PagedSource.Page<int, int>(filteredIndices.Select(x => x * 2), new Option<int>(key + 1)));
return Task.FromResult(new PagedSource.Page<int, int>(filteredIndices.Select(x => x * 2), Option<int>.Create(key + 1)));
}
}

Expand Down Expand Up @@ -65,7 +65,7 @@ public class IndexedStringPages : Akka.TestKit.Xunit2.TestKit
private readonly Source<string, NotUsed> _source = PagedSource.Create
(
1,
i => Task.FromResult(new PagedSource.Page<string, int>(Page(i), new Option<int>(i + 1)))
i => Task.FromResult(new PagedSource.Page<string, int>(Page(i), Option<int>.Create(i + 1)))
);

private static IEnumerable<string> Page(int key)
Expand Down Expand Up @@ -107,7 +107,7 @@ public class LinkedIntPages : Akka.TestKit.Xunit2.TestKit
var items = t.Item1;
var next = t.Item2;
return Task.FromResult(new PagedSource.Page<int, string>(items, next == "" ? Option<string>.None : new Option<string>(next)));
return Task.FromResult(new PagedSource.Page<int, string>(items, next == "" ? Option<string>.None : Option<string>.Create(next)));
}
);

Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs
Expand Up @@ -44,10 +44,10 @@ public void QueueSink_should_send_the_elements_as_result_of_future()
{
var expected = new List<Option<int>>
{
new Option<int>(1),
new Option<int>(2),
new Option<int>(3),
new Option<int>()
Option<int>.Create(1),
Option<int>.Create(2),
Option<int>.Create(3),
Option<int>.None
};
var queue = Source.From(expected.Where(o => o.HasValue).Select(o => o.Value))
.RunWith(Sink.Queue<int>(), _materializer);
Expand All @@ -74,7 +74,7 @@ public void QueueSink_should_allow_to_have_only_one_future_waiting_for_result_in
sub.SendNext(1);
future.PipeTo(TestActor);
ExpectMsg(new Option<int>(1));
ExpectMsg(Option<int>.Create(1));
sub.SendComplete();
queue.PullAsync();
Expand All @@ -94,7 +94,7 @@ public void QueueSink_should_wait_for_next_element_from_upstream()
ExpectNoMsg(_pause);
sub.SendNext(1);
ExpectMsg(new Option<int>(1));
ExpectMsg(Option<int>.Create(1));
sub.SendComplete();
queue.PullAsync();
}, _materializer);
Expand Down Expand Up @@ -146,7 +146,7 @@ public void QueueSink_should_timeout_future_when_stream_cannot_provide_data()
ExpectNoMsg(_pause);
sub.SendNext(1);
ExpectMsg(new Option<int>(1));
ExpectMsg(Option<int>.Create(1));
sub.SendComplete();
queue.PullAsync();
}, _materializer);
Expand All @@ -163,7 +163,7 @@ public void QueueSink_should_fail_pull_future_when_stream_is_completed()
queue.PullAsync().PipeTo(TestActor);
sub.SendNext(1);
ExpectMsg(new Option<int>(1));
ExpectMsg(Option<int>.Create(1));
sub.SendComplete();
var result = queue.PullAsync().Result;
Expand Down Expand Up @@ -195,7 +195,7 @@ public void QueueSink_should_keep_on_sending_even_after_the_buffer_has_been_full
for (var i = 1; i <= streamElementCount; i++)
{
queue.PullAsync().PipeTo(TestActor);
ExpectMsg(new Option<int>(i));
ExpectMsg(Option<int>.Create(i));
}
queue.PullAsync().PipeTo(TestActor);
ExpectMsg(Option<int>.None);
Expand All @@ -214,12 +214,12 @@ public void QueueSink_should_work_with_one_element_buffer()
queue.PullAsync().PipeTo(TestActor);
sub.SendNext(1); // should pull next element
ExpectMsg(new Option<int>(1));
ExpectMsg(Option<int>.Create(1));
queue.PullAsync().PipeTo(TestActor);
ExpectNoMsg(); // element requested but buffer empty
sub.SendNext(2);
ExpectMsg(new Option<int>(2));
ExpectMsg(Option<int>.Create(2));
sub.SendComplete();
var future = queue.PullAsync();
Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/UnfoldFlowSpec.cs
Expand Up @@ -59,7 +59,7 @@ public void UnfoldFlow_should_unfold_Collatz_conjecture_with_a_sequence_of_111_e
.Recover(ex =>
{
if (ex == _done)
return new Option<(int, int)>((1, 1));
return Option<(int, int)>.Create((1, 1));
return Option<(int, int)>.None;
}),
Expand Down Expand Up @@ -100,7 +100,7 @@ public void UnfoldFlow_should_unfold_Collatz_conjecture_with_a_sequence_of_111_e
.Recover(ex =>
{
if (ex == _done)
return new Option<(int, int)>((1, 1));
return Option<(int, int)>.Create((1, 1));
return Option<(int, int)>.None;
}), _timeout)
Expand Down Expand Up @@ -287,7 +287,7 @@ public class WithFunction : Akka.TestKit.Xunit2.TestKit
public WithFunction()
{
var controlledFlow = Flow.FromSinkAndSource(this.SinkProbe<int>(), this.SourceProbe<int>(), Keep.Both);
_source = SourceGen.UnfoldFlowWith(1, controlledFlow, n => new Option<(int, int)>((n + 1, n)), _timeout);
_source = SourceGen.UnfoldFlowWith(1, controlledFlow, n => Option<(int, int)>.Create((n + 1, n)), _timeout);
}

[Fact]
Expand All @@ -299,9 +299,9 @@ public void UnfoldFlow_should_unfold_Collatz_conjecture_with_a_sequence_of_111_e
return Option<(int, int)>.None;

if (x % 2 == 0)
return new Option<(int, int)>((x / 2, x));
return Option<(int, int)>.Create((x / 2, x));

return new Option<(int, int)>((x * 3 + 1, x));
return Option<(int, int)>.Create((x * 3 + 1, x));
}

var source = SourceGen.UnfoldFlowWith(27, Flow.FromFunction<int, int>(x => x), Map, _timeout);
Expand Down
12 changes: 6 additions & 6 deletions src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs
Expand Up @@ -240,7 +240,7 @@ public async Task A_UnfoldResourceAsyncSource_must_continue_when_strategy_is_res
switch (next)
{
case int n:
return Task.FromResult(new Option<int>(n));
return Task.FromResult(Option<int>.Create(n));
case TestException e:
throw e;
default:
Expand Down Expand Up @@ -274,7 +274,7 @@ public async Task A_UnfoldResourceAsyncSource_must_continue_when_strategy_is_res
switch (next)
{
case int n:
return Task.FromResult(new Option<int>(n));
return Task.FromResult(Option<int>.Create(n));
case TestException e:
return Task.FromException<Option<int>>(e);
default:
Expand Down Expand Up @@ -316,7 +316,7 @@ public async Task A_UnfoldResourceAsyncSource_must_close_and_open_stream_again_w
}
return reader.MoveNext() && reader.Current != null
? Task.FromResult(new Option<int>((int)reader.Current))
? Task.FromResult(Option<int>.Create((int)reader.Current))
: Task.FromResult(Option<int>.None);
},
_ => Task.FromResult(Done.Instance))
Expand Down Expand Up @@ -352,7 +352,7 @@ public async Task A_UnfoldResourceAsyncSource_must_close_and_open_stream_again_w
}
return reader.MoveNext() && reader.Current != null
? Task.FromResult(new Option<int>((int)reader.Current))
? Task.FromResult(Option<int>.Create((int)reader.Current))
: Task.FromResult(Option<int>.None);
},
_ => Task.FromResult(Done.Instance))
Expand Down Expand Up @@ -490,7 +490,7 @@ public async Task A_UnfoldResourceAsyncSource_must_close_resource_when_stream_is
var p = Source.UnfoldResourceAsync(
() => Task.FromResult(closePromise),
// a slow trickle of elements that never ends
_ => FutureTimeoutSupport.After(TimeSpan.FromMilliseconds(100), Sys.Scheduler, () => Task.FromResult(new Option<string>("element"))),
_ => FutureTimeoutSupport.After(TimeSpan.FromMilliseconds(100), Sys.Scheduler, () => Task.FromResult(Option<string>.Create("element"))),
tcs =>
{
tcs.SetResult("Closed");
Expand All @@ -517,7 +517,7 @@ public async Task A_UnfoldResourceAsyncSource_must_close_resource_when_stream_is
var closePromise = new TaskCompletionSource<string>();
var probe = Source.UnfoldResourceAsync(
() => Task.FromResult(closePromise),
_ => Task.FromResult(new Option<string>("whatever")),
_ => Task.FromResult(Option<string>.Create("whatever")),
tcs =>
{
tcs.SetResult("Closed");
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams/Dsl/AccumulateWhileUnchanged.cs
Expand Up @@ -37,7 +37,7 @@ public Logic(AccumulateWhileUnchanged<TElement, TProperty> accumulateWhileUnchan
var nextState = accumulateWhileUnchanged._propertyExtractor(nextElement);
if (!_currentState.HasValue)
_currentState = new Option<TProperty>(nextState);
_currentState = Option<TProperty>.Create(nextState);
if (EqualityComparer<TProperty>.Default.Equals(_currentState.Value, nextState))
{
Expand All @@ -50,7 +50,7 @@ public Logic(AccumulateWhileUnchanged<TElement, TProperty> accumulateWhileUnchan
_buffer.Clear();
_buffer.Add(nextElement);
Push(accumulateWhileUnchanged.Out, result);
_currentState = new Option<TProperty>(nextState);
_currentState = Option<TProperty>.Create(nextState);
}
}, onUpstreamFinish: () =>
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Dsl/Internal/GraphImpl.cs
Expand Up @@ -91,7 +91,7 @@ public static class ModuleExtractor
public static Option<IModule> Unapply<TShape, TMat>(IGraph<TShape, TMat> graph) where TShape : Shape
{
var module = graph as IModule;
return module != null ? new Option<IModule>(module) : Option<IModule>.None;
return module != null ? Option<IModule>.Create(module) : Option<IModule>.None;
}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Dsl/LastElement.cs
Expand Up @@ -30,7 +30,7 @@ public Logic(LastElement<T> lastElement, TaskCompletionSource<Option<T>> complet
SetHandler(lastElement.In, onPush: () =>
{
var element = Grab(lastElement.In);
currentElement = new Option<T>(element);
currentElement = Option<T>.Create(element);
Push(lastElement.Out, element);
}, onUpstreamFinish: () =>
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Dsl/PagedSource.cs
Expand Up @@ -52,7 +52,7 @@ public Page(IEnumerable<T> items, Option<TKey> nextKey)
var pageSource =
Source.UnfoldAsync
(
new Option<TKey>(firstKey),
Option<TKey>.Create(firstKey),
async key =>
{
var page = key.HasValue ? await pageFactory(key.Value) : new Page<T, TKey>(Enumerable.Empty<T>(), Option<TKey>.None);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Implementation/Sinks.cs
Expand Up @@ -716,7 +716,7 @@ public Logic(QueueSink<T> stage, int maxBuffer) : base(stage.Shape)
{
_stage = stage;
_maxBuffer = maxBuffer;
_currentRequest = new Option<TaskCompletionSource<Option<T>>>();
_currentRequest = Option<TaskCompletionSource<Option<T>>>.None;

SetHandler(stage.In, this);
}
Expand Down

0 comments on commit e5d8c30

Please sign in to comment.