Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Akka 1.5 #164

Merged
merged 2 commits into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "5.0.100",
"version": "6.0.114",
"rollForward" : "minor"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Cluster.Sharding" Version="1.4.27" />
<PackageReference Include="Akka.Cluster.Sharding" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Cluster.Sharding/ClusterSingleton.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ open Akkling
/// Spawns an actor in cluster singleton mode.
/// </summary>
/// <param name="stopMessage">Message used to stop an actor</param>
/// <param name="factory">Actor system used to spawn an actor</param>
/// <param name="system">Actor system used to spawn an actor</param>
/// <param name="name">Actor singleton name.</param>
/// <param name="props">Props used to build an actor.</param>
let spawnSingleton (stopMessage: obj) (system: ActorSystem) (name: string) (props: Props<'Message>) : IActorRef<'Message> =
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.DistributedData/Akkling.DistributedData.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.DistributedData" Version="1.4.27" />
<PackageReference Include="Akka.DistributedData" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Hocon/Akkling.Hocon.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<Compile Include="Akka.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.4.46" />
<PackageReference Include="Akka" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Persistence/Akkling.Persistence.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence" Version="1.4.27" />
<PackageReference Include="Akka.Persistence" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Streams.TestKit/Akkling.Streams.TestKit.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Streams.TestKit" Version="1.4.27" />
<PackageReference Include="Akka.Streams.TestKit" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Streams/Akkling.Streams.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Streams" Version="1.4.27" />
<PackageReference Include="Akka.Streams" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions src/Akkling.Streams/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ module Flow =
/// The task completes with success when received complete message from upstream or cancel
/// from downstream. It fails with the same error when received error message from
/// downstream.
let inline watchTermination (matFn: 'mat -> Async<unit> -> 'mat2) (flow) : Flow<_, _, 'mat2> =
let inline watchTermination (matFn: 'mat -> Async<Done> -> 'mat2) (flow) : Flow<_, _, 'mat2> =
FlowOperations.WatchTermination(flow, Func<_,_,_>(fun m t -> matFn m (t |> Async.AwaitTask)))

/// Materializes to IFlowMonitor that allows monitoring of the the current flow. All events are propagated
Expand Down Expand Up @@ -593,7 +593,7 @@ module Flow =

Retry.Create(flow, fun s ->
match retryWith s with
| Some (i,s) -> Akka.Util.Option<_>(struct(i,s))
| Some (i,s) -> Akka.Util.Option.Create(struct(i,s))
| None -> Unchecked.defaultof<_>)
|> Flow.FromGraph // I convert IGraph to Flow here in order to map it below. Should I?
|> map (fun (struct(x, s)) ->
Expand Down Expand Up @@ -656,7 +656,7 @@ module Flow =
let retryFlow =
Retry.Create(flow, fun ((i, _) as state) ->
match retryWith state with
| Some newState -> Akka.Util.Option<_> (struct (i, (i, newState)))
| Some newState -> Akka.Util.Option.Create (struct (i, (i, newState)))
| None -> Akka.Util.Option<_>.None)
|> Flow.FromGraph
|> map (fun (struct(x, _)) ->
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.Streams/Graph.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module Graph =
GraphDsl.Create(shape1, shape2, shape3, shape4, shape5, Func<_,_,_,_,_,_>(combineFn), Func<_,_,_,_,_,_,_>(builder))

/// Executes provided graph using provided materializer.
let run (mat: #IMaterializer) (graph: #IRunnableGraph<'mat>) =
let run (mat: IMaterializer) (graph: #IRunnableGraph<'mat>) =
graph.Run mat

/// Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.Streams/Prolog.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module Prolog =

let inline toCsOption (o: 'v option) : Akka.Util.Option<'v> =
match o with
| Some v -> Akka.Util.Option<'v>(v)
| Some v -> Akka.Util.Option.Create(v)
| None -> Akka.Util.Option<'v>.None

let inline ofCsOption (o: Akka.Util.Option<'v>) = if o.HasValue then Some o.Value else None
Expand Down
6 changes: 3 additions & 3 deletions src/Akkling.Streams/Sink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ module Sink =
let inline fanoutPublisher<'t> : Sink<'t, IPublisher<'t>> = Sink.FanoutPublisher<'t>()

/// A sink that will consume the stream and discard the elements.
let inline ignore<'t> : Sink<'t, Async<unit>> = Sink.Ignore<'t>().MapMaterializedValue(Func<_,_>(Async.AwaitTask))
let inline ignore<'t> : Sink<'t, Async<Akka.Done>> = Sink.Ignore<'t>().MapMaterializedValue(Func<_,_>(Async.AwaitTask))

/// A sink that will invoke the given function for each received element.
/// The sink is materialized into an Async computation will be completed with success when reaching the
/// normal end of the stream, or completed with a failure if there is a failure signaled in
/// the stream.
let inline forEach (fn: 't -> unit) : Sink<'t, Async<unit>> = Sink.ForEach(Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))
let inline forEach (fn: 't -> unit) : Sink<'t, Async<Akka.Done>> = Sink.ForEach(Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))

/// A sink that will invoke the given function
/// to each of the elements as they pass in.
/// The sink is materialized into an Async computation.
let inline forEachParallel (parallelism: int) (fn: 't -> unit) : Sink<'t, Async<unit>> = Sink.ForEachParallel(parallelism, Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))
let inline forEachParallel (parallelism: int) (fn: 't -> unit) : Sink<'t, Async<Akka.Done>> = Sink.ForEachParallel(parallelism, Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))

/// A sink that will invoke the given folder function for every received element,
/// giving it its previous output (or the given zero value) and the element as input.
Expand Down
14 changes: 7 additions & 7 deletions src/Akkling.Streams/Source.fs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ module Source =
let unfold (fn: 's -> struct('s * 'e) option) (state: 's) : Source<'e, unit> =
Source.Unfold(state, Func<_,_>(fun x ->
match fn x with
| Some tuple -> Akka.Util.Option<_> tuple
| Some tuple -> Akka.Util.Option.Create tuple
| None -> Akka.Util.Option<_>.None
)).MapMaterializedValue(Func<_,_>(ignore))

Expand All @@ -120,7 +120,7 @@ module Source =
async {
let! r = fn x
match r with
| Some tuple -> return Akka.Util.Option<_> tuple
| Some tuple -> return Akka.Util.Option.Create tuple
| None -> return Akka.Util.Option<_>.None }
|> Async.StartAsTask<_>)).MapMaterializedValue(Func<_,_>(ignore))

Expand Down Expand Up @@ -563,7 +563,7 @@ module Source =
/// The task completes with success when received complete message from upstream or cancel
/// from downstream. It fails with the same error when received error message from
/// downstream.
let inline watchTermination (matFn: 'mat -> Async<unit> -> 'mat2) (source) : Source<'t, 'mat2> =
let inline watchTermination (matFn: 'mat -> Async<Done> -> 'mat2) (source) : Source<'t, 'mat2> =
SourceOperations.WatchTermination(source, Func<_,_,_>(fun m t -> matFn m (t |> Async.AwaitTask)))

/// Materializes to IFlowMonitor that allows monitoring of the the current flow. All events are propagated
Expand Down Expand Up @@ -691,7 +691,7 @@ module Source =

/// Connect this source to a sink and run it. The returned value is the materialized value of the sink
let inline runWith (mat: #IMaterializer) (sink: #IGraph<SinkShape<'t>, 'mat2>) (source: Source<'t, 'mat>) : 'mat2 =
source.RunWith(sink, mat)
source.RunWith(sink, mat :> IMaterializer)

/// Shortcut for running this source with a fold function.
/// The given function is invoked for every received element, giving it its previous
Expand All @@ -700,7 +700,7 @@ module Source =
/// function evaluation when the input stream ends, or completed with Failure
/// if there is a failure signaled in the stream.
let inline runFold (mat: #IMaterializer) (folder: 'state -> 't -> 'state) (zero: 'state) (source: Source<'t, 'mat>) : Async<'state> =
source.RunAggregate(zero, Func<_,_,_>(folder), mat) |> Async.AwaitTask
source.RunAggregate(zero, Func<_,_,_>(folder), mat :> IMaterializer) |> Async.AwaitTask

/// Shortcut for running this surce with a reduce function.
/// The given function is invoked for every received element, giving it its previous
Expand All @@ -709,15 +709,15 @@ module Source =
/// function evaluation when the input stream ends, or completed with Failure
/// if there is a failure signaled in the stream.
let inline runReduce (mat: #IMaterializer) (folder: 't -> 't -> 't) (source: Source<'t, 'mat>) : Async<'t> =
source.RunSum(Func<_,_,_>(folder), mat) |> Async.AwaitTask
source.RunSum(Func<_,_,_>(folder), mat :> IMaterializer) |> Async.AwaitTask

/// Shortcut for running this source with a foreach procedure. The given procedure is invoked
/// for each received element.
/// The returned Async computation will be completed with Success when reaching the
/// normal end of the stream, or completed with Failure if there is a failure signaled in
/// the stream.
let inline runForEach (mat: #IMaterializer) (fn: 't -> unit) (source: Source<'t, 'mat>) : Async<unit> =
source.RunForeach(Action<_>(fn), mat) |> Async.AwaitTask
source.RunForeach(Action<_>(fn), mat :> IMaterializer) |> Async.AwaitTask

/// A MergeHub is a special streaming hub that is able to collect streamed elements from a
/// dynamic set of producers
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.Streams/SubFlow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ module SubFlow =
/// The task completes with success when received complete message from upstream or cancel
/// from downstream. It fails with the same error when received error message from
/// downstream.
let inline watchTermination (matFn: 'mat -> Async<unit> -> 'mat2) (subFlow) : SubFlow<_, _, 'mat2> =
let inline watchTermination (matFn: 'mat -> Async<Akka.Done> -> 'mat2) (subFlow) : SubFlow<_, _, 'mat2> =
SubFlowOperations.WatchTermination(subFlow, Func<_,_,_>(fun m t -> matFn m (t |> Async.AwaitTask)))

/// Detaches upstream demand from downstream demand without detaching the
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.TestKit/Akkling.TestKit.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.TestKit.Xunit2" Version="1.4.27" />
<PackageReference Include="Akka.TestKit.Xunit2" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions src/Akkling/Actors.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ type Actor<'Message> =
abstract UnstashAll : unit -> unit

/// <summary>
/// Sets or clears a timeout before <see="ReceiveTimeout"/> message will be send to an actor.
/// Sets or clears a timeout before <see cref="ReceiveTimeout"/> message will be send to an actor.
/// </summary>
abstract SetReceiveTimeout : TimeSpan option -> unit

/// <summary>
/// Schedules a message to be transmited in specified delay.
/// Schedules a message to be transmitted in specified delay.
/// </summary>
abstract Schedule<'Scheduled> : TimeSpan -> IActorRef<'Scheduled> -> 'Scheduled -> ICancelable

/// <summary>
/// Schedules a message to be repeatedly transmited, starting at specified delay with provided intervals.
/// Schedules a message to be repeatedly transmitted, starting at specified delay with provided intervals.
/// </summary>
abstract ScheduleRepeatedly<'Scheduled> : TimeSpan -> TimeSpan -> IActorRef<'Scheduled> -> 'Scheduled -> ICancelable

Expand Down
4 changes: 2 additions & 2 deletions src/Akkling/Akkling.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
<Compile Include="IO.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.4.46" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.27" />
<PackageReference Include="Akka" Version="1.5.0" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
11 changes: 7 additions & 4 deletions tests/Akkling.Tests/Akkling.Tests.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net50</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
</PropertyGroup>
<ItemGroup>
Expand Down Expand Up @@ -30,9 +30,12 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="FsCheck.Xunit" Version="2.14.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>