Skip to content

Commit

Permalink
Remove unsafe implicit conversion operators in AtomicBoolean and …
Browse files Browse the repository at this point in the history
…`AtomicReference<T>` (#6429)

* removed unsafe conversion operators from `AtomicBoolean` and `AtomicReference`

close #6392

* api approvals

* fixed PhiAccrualFailureDetector

* added safe `implicit` operators back
  • Loading branch information
Aaronontheweb committed Feb 22, 2023
1 parent ac3c06a commit e58b041
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 56 deletions.
Expand Up @@ -4958,7 +4958,6 @@ namespace Akka.Util
public bool CompareAndSet(bool expected, bool newValue) { }
public bool GetAndSet(bool newValue) { }
public static bool op_Implicit(Akka.Util.AtomicBoolean atomicBoolean) { }
public static Akka.Util.AtomicBoolean op_Implicit(bool value) { }
}
public class AtomicReference<T>
where T : class
Expand All @@ -4970,7 +4969,6 @@ namespace Akka.Util
public bool CompareAndSet(T expected, T newValue) { }
public T GetAndSet(T newValue) { }
public static T op_Implicit(Akka.Util.AtomicReference<T> atomicReference) { }
public static Akka.Util.AtomicReference<T> op_Implicit(T value) { }
}
public class static BitArrayHelpers
{
Expand Down
Expand Up @@ -4965,7 +4965,6 @@ namespace Akka.Util
public bool CompareAndSet(bool expected, bool newValue) { }
public bool GetAndSet(bool newValue) { }
public static bool op_Implicit(Akka.Util.AtomicBoolean atomicBoolean) { }
public static Akka.Util.AtomicBoolean op_Implicit(bool value) { }
}
public class AtomicReference<T>
where T : class
Expand All @@ -4977,7 +4976,6 @@ namespace Akka.Util
public bool CompareAndSet(T expected, T newValue) { }
public T GetAndSet(T newValue) { }
public static T op_Implicit(Akka.Util.AtomicReference<T> atomicReference) { }
public static Akka.Util.AtomicReference<T> op_Implicit(T value) { }
}
public class static BitArrayHelpers
{
Expand Down
Expand Up @@ -4958,7 +4958,6 @@ namespace Akka.Util
public bool CompareAndSet(bool expected, bool newValue) { }
public bool GetAndSet(bool newValue) { }
public static bool op_Implicit(Akka.Util.AtomicBoolean atomicBoolean) { }
public static Akka.Util.AtomicBoolean op_Implicit(bool value) { }
}
public class AtomicReference<T>
where T : class
Expand All @@ -4970,7 +4969,6 @@ namespace Akka.Util
public bool CompareAndSet(T expected, T newValue) { }
public T GetAndSet(T newValue) { }
public static T op_Implicit(Akka.Util.AtomicReference<T> atomicReference) { }
public static Akka.Util.AtomicReference<T> op_Implicit(T value) { }
}
public class static BitArrayHelpers
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs
Expand Up @@ -458,7 +458,7 @@ public PhiObserver()
_log.Warning("Detected phi value of infinity for [{0}] - ", node);
var (history, time) = _cluster.FailureDetector.GetFailureDetector(node) switch
{
PhiAccrualFailureDetector fd => (fd.state.History, fd.state.TimeStamp),
PhiAccrualFailureDetector fd => (fd.State.History, fd.State.TimeStamp),
_ => (HeartbeatHistory.Apply(1), null)
};
_log.Warning("PhiValues: (Timestamp={0}, Mean={1}, Variance={2}, StdDeviation={3}, Intervals=[{4}])",time,
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -264,7 +264,7 @@ public void Join(Address address)
/// <returns>Task which completes, once current cluster node reaches <see cref="MemberStatus.Up"/> state.</returns>
public Task JoinAsync(Address address, CancellationToken token = default)
{
if (_isTerminated)
if (_isTerminated.Value)
throw new ClusterJoinFailedException("Cluster has already been terminated");

if (IsUp)
Expand Down Expand Up @@ -340,7 +340,7 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes)
/// <param name="token">TBD</param>
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default)
{
if (_isTerminated)
if (_isTerminated.Value)
throw new ClusterJoinFailedException("Cluster has already been terminated");

if (IsUp)
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs
Expand Up @@ -31,14 +31,14 @@ public DefaultFailureDetectorRegistry(Func<FailureDetector> factory)

private readonly Func<FailureDetector> _factory;

private AtomicReference<ImmutableDictionary<T, FailureDetector>> _resourceToFailureDetector = new AtomicReference<ImmutableDictionary<T, FailureDetector>>(ImmutableDictionary<T, FailureDetector>.Empty);
private readonly AtomicReference<ImmutableDictionary<T, FailureDetector>> _resourceToFailureDetector = new AtomicReference<ImmutableDictionary<T, FailureDetector>>(ImmutableDictionary<T, FailureDetector>.Empty);

private readonly object _failureDetectorCreationLock = new object();

private ImmutableDictionary<T, FailureDetector> ResourceToFailureDetector
{
get { return _resourceToFailureDetector; }
set { _resourceToFailureDetector = value; }
get { return _resourceToFailureDetector.Value; }
set { _resourceToFailureDetector.Value = value; }
}

#endregion
Expand Down
24 changes: 12 additions & 12 deletions src/core/Akka.Remote/PhiAccrualFailureDetector.cs
Expand Up @@ -69,7 +69,7 @@ public PhiAccrualFailureDetector(double threshold, int maxSampleSize, TimeSpan m
_minStdDeviation = minStdDeviation;
_acceptableHeartbeatPause = acceptableHeartbeatPause;
_firstHeartbeatEstimate = firstHeartbeatEstimate;
state = new State(FirstHeartBeat, null);
_state = new AtomicReference<AccrualState>(new AccrualState(FirstHeartBeat, null));
}

/// <summary>
Expand All @@ -90,7 +90,7 @@ public PhiAccrualFailureDetector(Config config, EventStream ev)
_minStdDeviation = config.GetTimeSpan("min-std-deviation", null);
_acceptableHeartbeatPause = config.GetTimeSpan("acceptable-heartbeat-pause", null);
_firstHeartbeatEstimate = config.GetTimeSpan("heartbeat-interval", null);
state = new State(FirstHeartBeat, null);
_state = new AtomicReference<AccrualState>(new AccrualState(FirstHeartBeat, null));
EventStream = ev ?? Option<EventStream>.None;
}

Expand Down Expand Up @@ -128,14 +128,14 @@ private HeartbeatHistory FirstHeartBeat
/// <summary>
/// Uses volatile memory and immutability for lockless concurrency.
/// </summary>
internal class State
internal class AccrualState
{
/// <summary>
/// TBD
/// </summary>
/// <param name="history">TBD</param>
/// <param name="timeStamp">TBD</param>
public State(HeartbeatHistory history, long? timeStamp)
public AccrualState(HeartbeatHistory history, long? timeStamp)
{
TimeStamp = timeStamp;
History = history;
Expand All @@ -152,12 +152,12 @@ public State(HeartbeatHistory history, long? timeStamp)
public long? TimeStamp { get; private set; }
}

private AtomicReference<State> _state;
private readonly AtomicReference<AccrualState> _state;

internal State state
internal AccrualState State
{
get { return _state; }
set { _state = value; }
get { return _state.Value; }
set { _state.Value = value; }
}

/// <summary>
Expand All @@ -173,7 +173,7 @@ public override bool IsAvailable
/// </summary>
public override bool IsMonitoring
{
get { return state.TimeStamp.HasValue; }
get { return State.TimeStamp.HasValue; }
}

/// <summary>
Expand All @@ -182,7 +182,7 @@ public override bool IsMonitoring
public override void HeartBeat()
{
var timestamp = _clock();
var oldState = state;
var oldState = State;
HeartbeatHistory newHistory;

if (!oldState.TimeStamp.HasValue)
Expand All @@ -208,7 +208,7 @@ public override void HeartBeat()
else newHistory = oldState.History;
}

var newState = new State(newHistory, timestamp);
var newState = new AccrualState(newHistory, timestamp);

//if we won the race then update else try again
if(!_state.CompareAndSet(oldState, newState)) HeartBeat();
Expand Down Expand Up @@ -236,7 +236,7 @@ internal double CurrentPhi
/// <returns>TBD</returns>
internal double Phi(long timestamp)
{
var oldState = state;
var oldState = State;
var oldTimestamp = oldState.TimeStamp;

if (!oldTimestamp.HasValue)
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs
Expand Up @@ -549,15 +549,15 @@ public override void PreStart()
{
_cancelCallback.Value = GetAsyncCallback<NotUsed>(_ => CompleteStage());

if (_cancelled)
if (_cancelled.Value)
CompleteStage();
else
ScheduleRepeatedly("TickTimer", _stage._initialDelay, _stage._interval);
}

protected internal override void OnTimer(object timerKey)
{
if (IsAvailable(_stage.Out) && !_cancelled)
if (IsAvailable(_stage.Out) && !_cancelled.Value)
Push(_stage.Out, _stage._tick);
}

Expand All @@ -567,7 +567,7 @@ public void Cancel()
_cancelCallback.Value?.Invoke(NotUsed.Instance);
}

public bool IsCancellationRequested => _cancelled;
public bool IsCancellationRequested => _cancelled.Value;

public CancellationToken Token { get; }

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/CoordinatedShutdown.cs
Expand Up @@ -333,7 +333,7 @@ public void AddTask(string phase, string taskName, Func<Task<Done>> task)
/// <param name="hook">A task that will be executed during shutdown.</param>
internal void AddClrShutdownHook(Func<Task<Done>> hook)
{
if (!_clrHooksStarted)
if (!_clrHooksStarted.Value)
{
_clrShutdownTasks.TryAdd(hook);
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Pattern/CircuitBreakerState.cs
Expand Up @@ -210,7 +210,7 @@ protected override void EnterInternal()
/// <returns>TBD</returns>
public override string ToString()
{
return string.Format(CultureInfo.InvariantCulture, "Half-Open currently testing call for success = {0}", (_lock == true));
return string.Format(CultureInfo.InvariantCulture, "Half-Open currently testing call for success = {0}", (_lock.Value == true));
}
}

Expand Down
14 changes: 0 additions & 14 deletions src/core/Akka/Util/AtomicBoolean.cs
Expand Up @@ -71,8 +71,6 @@ public bool GetAndSet(bool newValue)
return Interlocked.Exchange(ref _value, newValue ? _trueValue : _falseValue) == _trueValue;
}

#region Conversion operators

/// <summary>
/// Performs an implicit conversion from <see cref="AtomicBoolean"/> to <see cref="System.Boolean"/>.
/// </summary>
Expand All @@ -82,18 +80,6 @@ public bool GetAndSet(bool newValue)
{
return atomicBoolean.Value;
}

/// <summary>
/// Performs an implicit conversion from <see cref="System.Boolean"/> to <see cref="AtomicBoolean"/>.
/// </summary>
/// <param name="value">The boolean to convert</param>
/// <returns>The result of the conversion.</returns>
public static implicit operator AtomicBoolean(bool value)
{
return new AtomicBoolean(value);
}

#endregion
}
}

13 changes: 0 additions & 13 deletions src/core/Akka/Util/AtomicReference.cs
Expand Up @@ -75,7 +75,6 @@ public T GetAndSet(T newValue)
return Interlocked.Exchange(ref atomicValue, newValue);
}

#region Conversion operators

/// <summary>
/// Performs an implicit conversion from <see cref="AtomicReference{T}"/> to <typeparamref name="T"/>.
Expand All @@ -86,18 +85,6 @@ public T GetAndSet(T newValue)
{
return atomicReference.Value;
}

/// <summary>
/// Performs an implicit conversion from <typeparamref name="T"/> to <see cref="AtomicReference{T}"/>.
/// </summary>
/// <param name="value">The reference to convert</param>
/// <returns>The result of the conversion.</returns>
public static implicit operator AtomicReference<T>(T value)
{
return new AtomicReference<T>(value);
}

#endregion
}
}

0 comments on commit e58b041

Please sign in to comment.