Skip to content

Commit

Permalink
Merge pull request #164 from object/master
Browse files Browse the repository at this point in the history
Add support for Akka 1.5
  • Loading branch information
Horusiath committed Mar 11, 2023
2 parents 48e27f1 + cec5d81 commit 6f7189f
Show file tree
Hide file tree
Showing 18 changed files with 37 additions and 34 deletions.
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "5.0.100",
"version": "6.0.114",
"rollForward" : "minor"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Cluster.Sharding" Version="1.4.27" />
<PackageReference Include="Akka.Cluster.Sharding" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Cluster.Sharding/ClusterSingleton.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ open Akkling
/// Spawns an actor in cluster singleton mode.
/// </summary>
/// <param name="stopMessage">Message used to stop an actor</param>
/// <param name="factory">Actor system used to spawn an actor</param>
/// <param name="system">Actor system used to spawn an actor</param>
/// <param name="name">Actor singleton name.</param>
/// <param name="props">Props used to build an actor.</param>
let spawnSingleton (stopMessage: obj) (system: ActorSystem) (name: string) (props: Props<'Message>) : IActorRef<'Message> =
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.DistributedData/Akkling.DistributedData.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.DistributedData" Version="1.4.27" />
<PackageReference Include="Akka.DistributedData" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Hocon/Akkling.Hocon.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<Compile Include="Akka.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.4.46" />
<PackageReference Include="Akka" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Persistence/Akkling.Persistence.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence" Version="1.4.27" />
<PackageReference Include="Akka.Persistence" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Streams.TestKit/Akkling.Streams.TestKit.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Streams.TestKit" Version="1.4.27" />
<PackageReference Include="Akka.Streams.TestKit" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Akkling.Streams/Akkling.Streams.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Streams" Version="1.4.27" />
<PackageReference Include="Akka.Streams" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions src/Akkling.Streams/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ module Flow =
/// The task completes with success when received complete message from upstream or cancel
/// from downstream. It fails with the same error when received error message from
/// downstream.
let inline watchTermination (matFn: 'mat -> Async<unit> -> 'mat2) (flow) : Flow<_, _, 'mat2> =
let inline watchTermination (matFn: 'mat -> Async<Done> -> 'mat2) (flow) : Flow<_, _, 'mat2> =
FlowOperations.WatchTermination(flow, Func<_,_,_>(fun m t -> matFn m (t |> Async.AwaitTask)))

/// Materializes to IFlowMonitor that allows monitoring of the the current flow. All events are propagated
Expand Down Expand Up @@ -593,7 +593,7 @@ module Flow =

Retry.Create(flow, fun s ->
match retryWith s with
| Some (i,s) -> Akka.Util.Option<_>(struct(i,s))
| Some (i,s) -> Akka.Util.Option.Create(struct(i,s))
| None -> Unchecked.defaultof<_>)
|> Flow.FromGraph // I convert IGraph to Flow here in order to map it below. Should I?
|> map (fun (struct(x, s)) ->
Expand Down Expand Up @@ -656,7 +656,7 @@ module Flow =
let retryFlow =
Retry.Create(flow, fun ((i, _) as state) ->
match retryWith state with
| Some newState -> Akka.Util.Option<_> (struct (i, (i, newState)))
| Some newState -> Akka.Util.Option.Create (struct (i, (i, newState)))
| None -> Akka.Util.Option<_>.None)
|> Flow.FromGraph
|> map (fun (struct(x, _)) ->
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.Streams/Graph.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module Graph =
GraphDsl.Create(shape1, shape2, shape3, shape4, shape5, Func<_,_,_,_,_,_>(combineFn), Func<_,_,_,_,_,_,_>(builder))

/// Executes provided graph using provided materializer.
let run (mat: #IMaterializer) (graph: #IRunnableGraph<'mat>) =
let run (mat: IMaterializer) (graph: #IRunnableGraph<'mat>) =
graph.Run mat

/// Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.Streams/Prolog.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module Prolog =

let inline toCsOption (o: 'v option) : Akka.Util.Option<'v> =
match o with
| Some v -> Akka.Util.Option<'v>(v)
| Some v -> Akka.Util.Option.Create(v)
| None -> Akka.Util.Option<'v>.None

let inline ofCsOption (o: Akka.Util.Option<'v>) = if o.HasValue then Some o.Value else None
Expand Down
6 changes: 3 additions & 3 deletions src/Akkling.Streams/Sink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ module Sink =
let inline fanoutPublisher<'t> : Sink<'t, IPublisher<'t>> = Sink.FanoutPublisher<'t>()

/// A sink that will consume the stream and discard the elements.
let inline ignore<'t> : Sink<'t, Async<unit>> = Sink.Ignore<'t>().MapMaterializedValue(Func<_,_>(Async.AwaitTask))
let inline ignore<'t> : Sink<'t, Async<Akka.Done>> = Sink.Ignore<'t>().MapMaterializedValue(Func<_,_>(Async.AwaitTask))

/// A sink that will invoke the given function for each received element.
/// The sink is materialized into an Async computation will be completed with success when reaching the
/// normal end of the stream, or completed with a failure if there is a failure signaled in
/// the stream.
let inline forEach (fn: 't -> unit) : Sink<'t, Async<unit>> = Sink.ForEach(Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))
let inline forEach (fn: 't -> unit) : Sink<'t, Async<Akka.Done>> = Sink.ForEach(Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))

/// A sink that will invoke the given function
/// to each of the elements as they pass in.
/// The sink is materialized into an Async computation.
let inline forEachParallel (parallelism: int) (fn: 't -> unit) : Sink<'t, Async<unit>> = Sink.ForEachParallel(parallelism, Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))
let inline forEachParallel (parallelism: int) (fn: 't -> unit) : Sink<'t, Async<Akka.Done>> = Sink.ForEachParallel(parallelism, Action<_>(fn)).MapMaterializedValue(Func<_,_>(Async.AwaitTask))

/// A sink that will invoke the given folder function for every received element,
/// giving it its previous output (or the given zero value) and the element as input.
Expand Down
14 changes: 7 additions & 7 deletions src/Akkling.Streams/Source.fs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ module Source =
let unfold (fn: 's -> struct('s * 'e) option) (state: 's) : Source<'e, unit> =
Source.Unfold(state, Func<_,_>(fun x ->
match fn x with
| Some tuple -> Akka.Util.Option<_> tuple
| Some tuple -> Akka.Util.Option.Create tuple
| None -> Akka.Util.Option<_>.None
)).MapMaterializedValue(Func<_,_>(ignore))

Expand All @@ -120,7 +120,7 @@ module Source =
async {
let! r = fn x
match r with
| Some tuple -> return Akka.Util.Option<_> tuple
| Some tuple -> return Akka.Util.Option.Create tuple
| None -> return Akka.Util.Option<_>.None }
|> Async.StartAsTask<_>)).MapMaterializedValue(Func<_,_>(ignore))

Expand Down Expand Up @@ -563,7 +563,7 @@ module Source =
/// The task completes with success when received complete message from upstream or cancel
/// from downstream. It fails with the same error when received error message from
/// downstream.
let inline watchTermination (matFn: 'mat -> Async<unit> -> 'mat2) (source) : Source<'t, 'mat2> =
let inline watchTermination (matFn: 'mat -> Async<Done> -> 'mat2) (source) : Source<'t, 'mat2> =
SourceOperations.WatchTermination(source, Func<_,_,_>(fun m t -> matFn m (t |> Async.AwaitTask)))

/// Materializes to IFlowMonitor that allows monitoring of the the current flow. All events are propagated
Expand Down Expand Up @@ -691,7 +691,7 @@ module Source =

/// Connect this source to a sink and run it. The returned value is the materialized value of the sink
let inline runWith (mat: #IMaterializer) (sink: #IGraph<SinkShape<'t>, 'mat2>) (source: Source<'t, 'mat>) : 'mat2 =
source.RunWith(sink, mat)
source.RunWith(sink, mat :> IMaterializer)

/// Shortcut for running this source with a fold function.
/// The given function is invoked for every received element, giving it its previous
Expand All @@ -700,7 +700,7 @@ module Source =
/// function evaluation when the input stream ends, or completed with Failure
/// if there is a failure signaled in the stream.
let inline runFold (mat: #IMaterializer) (folder: 'state -> 't -> 'state) (zero: 'state) (source: Source<'t, 'mat>) : Async<'state> =
source.RunAggregate(zero, Func<_,_,_>(folder), mat) |> Async.AwaitTask
source.RunAggregate(zero, Func<_,_,_>(folder), mat :> IMaterializer) |> Async.AwaitTask

/// Shortcut for running this surce with a reduce function.
/// The given function is invoked for every received element, giving it its previous
Expand All @@ -709,15 +709,15 @@ module Source =
/// function evaluation when the input stream ends, or completed with Failure
/// if there is a failure signaled in the stream.
let inline runReduce (mat: #IMaterializer) (folder: 't -> 't -> 't) (source: Source<'t, 'mat>) : Async<'t> =
source.RunSum(Func<_,_,_>(folder), mat) |> Async.AwaitTask
source.RunSum(Func<_,_,_>(folder), mat :> IMaterializer) |> Async.AwaitTask

/// Shortcut for running this source with a foreach procedure. The given procedure is invoked
/// for each received element.
/// The returned Async computation will be completed with Success when reaching the
/// normal end of the stream, or completed with Failure if there is a failure signaled in
/// the stream.
let inline runForEach (mat: #IMaterializer) (fn: 't -> unit) (source: Source<'t, 'mat>) : Async<unit> =
source.RunForeach(Action<_>(fn), mat) |> Async.AwaitTask
source.RunForeach(Action<_>(fn), mat :> IMaterializer) |> Async.AwaitTask

/// A MergeHub is a special streaming hub that is able to collect streamed elements from a
/// dynamic set of producers
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.Streams/SubFlow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ module SubFlow =
/// The task completes with success when received complete message from upstream or cancel
/// from downstream. It fails with the same error when received error message from
/// downstream.
let inline watchTermination (matFn: 'mat -> Async<unit> -> 'mat2) (subFlow) : SubFlow<_, _, 'mat2> =
let inline watchTermination (matFn: 'mat -> Async<Akka.Done> -> 'mat2) (subFlow) : SubFlow<_, _, 'mat2> =
SubFlowOperations.WatchTermination(subFlow, Func<_,_,_>(fun m t -> matFn m (t |> Async.AwaitTask)))

/// Detaches upstream demand from downstream demand without detaching the
Expand Down
2 changes: 1 addition & 1 deletion src/Akkling.TestKit/Akkling.TestKit.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<ProjectReference Include="..\Akkling\Akkling.fsproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.TestKit.Xunit2" Version="1.4.27" />
<PackageReference Include="Akka.TestKit.Xunit2" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions src/Akkling/Actors.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ type Actor<'Message> =
abstract UnstashAll : unit -> unit

/// <summary>
/// Sets or clears a timeout before <see="ReceiveTimeout"/> message will be send to an actor.
/// Sets or clears a timeout before <see cref="ReceiveTimeout"/> message will be send to an actor.
/// </summary>
abstract SetReceiveTimeout : TimeSpan option -> unit

/// <summary>
/// Schedules a message to be transmited in specified delay.
/// Schedules a message to be transmitted in specified delay.
/// </summary>
abstract Schedule<'Scheduled> : TimeSpan -> IActorRef<'Scheduled> -> 'Scheduled -> ICancelable

/// <summary>
/// Schedules a message to be repeatedly transmited, starting at specified delay with provided intervals.
/// Schedules a message to be repeatedly transmitted, starting at specified delay with provided intervals.
/// </summary>
abstract ScheduleRepeatedly<'Scheduled> : TimeSpan -> TimeSpan -> IActorRef<'Scheduled> -> 'Scheduled -> ICancelable

Expand Down
4 changes: 2 additions & 2 deletions src/Akkling/Akkling.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
<Compile Include="IO.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.4.46" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.4.27" />
<PackageReference Include="Akka" Version="1.5.0" />
<PackageReference Include="Akka.Serialization.Hyperion" Version="1.5.0" />
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>
11 changes: 7 additions & 4 deletions tests/Akkling.Tests/Akkling.Tests.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net50</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
</PropertyGroup>
<ItemGroup>
Expand Down Expand Up @@ -30,9 +30,12 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="FsCheck.Xunit" Version="2.14.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Update="FSharp.Core" Version="6.0.4" />
</ItemGroup>
</Project>

0 comments on commit 6f7189f

Please sign in to comment.