Skip to content

Commit

Permalink
Akka integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Onur Gumus committed Dec 7, 2019
1 parent 401cae3 commit a57252d
Show file tree
Hide file tree
Showing 17 changed files with 513 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .config/dotnet-tools.json
Expand Up @@ -3,7 +3,7 @@
"isRoot": true,
"tools": {
"paket": {
"version": "5.226.0",
"version": "5.240.0",
"commands": [
"paket"
]
Expand Down
8 changes: 4 additions & 4 deletions FBlazorShop.Web.BlazorClient/Checkout.fs
Expand Up @@ -56,7 +56,7 @@ let update remote message (model : Model, commonState : Common.State) =
{model with CurrentAddress = address;}
| Some _ -> validateModelForAddressForced address

match (message, (model,commonState)) with
match (message, (model,commonState.Authentication)) with
| SetAddressName value, _ ->
{ model.CurrentAddress with Name = value}
|> validateModelForAddress
Expand Down Expand Up @@ -91,14 +91,14 @@ let update remote message (model : Model, commonState : Common.State) =

| OrderPlaced order, ({ ValidatedAddress = None } , _) ->
model.CurrentAddress |> validateModelForAddressForced, Cmd.ofMsg (OrderPlaced order) , Cmd.none
| OrderPlaced order, (_, { Authentication = Common.AuthState.Failed}) ->
| OrderPlaced order, (_, Common.AuthState.Failed) ->
let c = Cmd.ofMsg(OrderPlaced order)
model, c, Common.authenticationRequested
| OrderPlaced order,(_,{Authentication = Common.AuthState.Success auth}) ->
| OrderPlaced order,(_,Common.AuthState.Success auth) ->
let order = {order with DeliveryAddress = model.CurrentAddress}
let cmd = Cmd.ofAsync remote.placeOrder (auth.Token, order) OrderAccepted raise
model, cmd, Cmd.none

| _, (_, Common.AuthState.NotTried)
| OrderAccepted _ , _ -> invalidOp "should not happen"

open Bolero.Html
Expand Down
16 changes: 8 additions & 8 deletions FBlazorShop.Web.BlazorClient/OrderDetail.fs
Expand Up @@ -15,16 +15,16 @@ type Message =
let reloadCmd = Cmd.ofMsg Reload

let loadPeriodically remote token id =
let doWork i =
async{
do! Async.Sleep 4000;
return! remote.getOrderWithStatus (token,i)
let doWork i =
async{
do! Async.Sleep 400000;
return! remote.getOrderWithStatus (token,i)
}
Cmd.ofAsync doWork id (fun m -> OrderLoaded(id,m)) (fun _ -> OrderLoaded(0,None))

let init id ={ Order = None; Key = 0}, Cmd.ofMsg (OrderLoaded id)

let update remote message (model : Model, commonModel: Common.State) =
let update remote message (model : Model, commonModel: Common.State) =
match message, commonModel.Authentication with
| Reload, Common.AuthState.Success auth -> model, loadPeriodically remote auth.Token (model.Key), Cmd.none
| OrderLoaded(key,_) , Common.AuthState.NotTried -> { model with Key = key }, Cmd.none, Cmd.none
Expand All @@ -38,15 +38,15 @@ let map markers =
comp<Map> ["Zoom" => 13.0; "Markers" => markers ] []

type OrderDetail = Template<"wwwroot\OrderDetail.html">
let view (model : Model) dispatch =
let view (model : Model) dispatch =
div [ attr.``class`` "main"][
cond model.Order <| function
| Some x ->
| Some x ->
OrderDetail()
.OrderCreatedTimeToLongDateString(x.Order.CreatedTime.ToLongDateString())
.StatusText(x.StatusText)
.OrderReview(OrderReview.view x.Order dispatch)
.Map(map (x.MapMarkers)) .Elt()

| _ -> text "Loading..."
]
5 changes: 4 additions & 1 deletion FBlazorShop.Web/FBlazorShop.Web.fsproj
Expand Up @@ -17,6 +17,10 @@
<DefineConstants>WASM</DefineConstants>
<PlatformTarget>x86</PlatformTarget>
</PropertyGroup>
<ItemGroup>
<Content Remove="Shared\**" />
<EmbeddedResource Remove="Shared\**" />
</ItemGroup>
<ItemGroup>
<Compile Include="Services.fs" />
<Compile Include="Startup.fs" />
Expand All @@ -32,7 +36,6 @@
<ProjectReference Include="..\FBlazorShop\FBlazorShop.fsproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Shared\" />
<Folder Include="wwwroot\_content\FBlazorShop.Web.BlazorClient\" />
</ItemGroup>
<Import Project="..\.paket\Paket.Restore.targets" />
Expand Down
11 changes: 1 addition & 10 deletions FBlazorShop.Web/Program.fs
@@ -1,17 +1,8 @@
namespace FBlazorShop.Web

open System
open System.Collections.Generic
open System.IO
open System.Linq
open System.Threading.Tasks
open Microsoft.AspNetCore
open Microsoft.AspNetCore.Hosting
open Microsoft.Extensions.Configuration
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging
open Microsoft.Extensions.DependencyInjection;
open Microsoft.Extensions.Hosting;
open FBlazorShop.EF

module Program =
Expand All @@ -32,7 +23,7 @@ module Program =
let db = scope.ServiceProvider.GetRequiredService<PizzaStoreContext>()
if db.Database.EnsureCreated() then
Seed.initialize db
Actor.init() |> ignore
host.Run()

exitCode
12 changes: 6 additions & 6 deletions FBlazorShop.Web/Startup.fs
Expand Up @@ -18,16 +18,16 @@ type Startup() =
#if DEBUG
.AddHotReload(templateDir = "../FBlazorShop.Web.BlazorClient")
#endif
.AddRemoting<Services.PizzaService>()
.AddEF("Data Source=pizza.db")
.AddRemoting<Services.PizzaService>()
.AddEF("Data Source=pizza.db")
.SetupServices()
.AddMvc()

.AddRazorRuntimeCompilation() |> ignore
#if !WASM
#if !WASM
services.AddServerSideBlazor()|> ignore
#endif


// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
member _.Configure(app: IApplicationBuilder, env: IWebHostEnvironment) =
Expand All @@ -43,7 +43,7 @@ type Startup() =
#if WASM
.UseClientSideBlazorFiles<FBlazorShop.Web.BlazorClient.Main.Startup>()
#endif
.UseStaticFiles()
.UseStaticFiles()
.UseEndpoints(fun endpoints ->
#if !WASM
endpoints.MapBlazorHub() |> ignore
Expand Down
11 changes: 7 additions & 4 deletions FBlazorShop.Web/paket.references
@@ -1,10 +1,13 @@
FSharp.Core
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Sqlite
Bolero
Bolero.Server
Bolero
Bolero.Server
Bolero.HotReload.Server
Microsoft.AspNetCore.Blazor
Microsoft.AspNetCore.Blazor.Server
Microsoft.AspNetCore.Blazor
Microsoft.AspNetCore.Blazor.Server
Microsoft.AspNetCore.Mvc.Razor.RuntimeCompilation
JWT
Akka.Cluster.Sharding
Akkling
Akka.Persistence.Sqlite
Binary file modified FBlazorShop.Web/pizza.db
Binary file not shown.
Binary file added FBlazorShop.Web/pizza.db-shm
Binary file not shown.
Binary file added FBlazorShop.Web/pizza.db-wal
Binary file not shown.
144 changes: 144 additions & 0 deletions FBlazorShop/Actor.fs
@@ -0,0 +1,144 @@
module Actor

open Akka.Persistence.Sqlite
open Akkling
open Akka.Cluster.Tools
open Akka.Cluster.Tools.Singleton
open Akkling.Persistence
open FBlazorShop.App.Model
open Akka.Persistence.Query
open Akka.Persistence.Query.Sql
open Akka.Streams
open Akka.Persistence.Journal
open System.Collections.Immutable


let configWithPort port =
let config = Configuration.parse ("""
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
// "System.Object" = hyperion
}
}
remote {
helios.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = """ + port.ToString() + """
}
}
cluster {
auto-down-unreachable-after = 5s
// seed-nodes = [ "akka.tcp://cluster-system@localhost:12345" ]
// sharding.remember-entities = true
}
persistence{
query.journal.sql {
# Implementation class of the SQL ReadJournalProvider
class = "Akka.Persistence.Query.Sql.SqlReadJournalProvider, Akka.Persistence.Query.Sql"
# Absolute path to the write journal plugin configuration entry that this
# query journal will connect to.
# If undefined (or "") it will connect to the default journal as specified by the
# akka.persistence.journal.plugin property.
write-plugin = ""
# The SQL write journal is notifying the query side as soon as things
# are persisted, but for efficiency reasons the query side retrieves the events
# in batches that sometimes can be delayed up to the configured `refresh-interval`.
refresh-interval = 1s
# How many events to fetch in one query (replay) and keep buffered until they
# are delivered downstreams.
max-buffer-size = 100
}
journal {
plugin = "akka.persistence.journal.sqlite"
sqlite
{
connection-string = "Data Source=pizza.db;"
auto-initialize = on
event-adapters.tagger = "Actor+Tagger, FBlazorShop"
event-adapter-bindings {
"Actor+Message, FBlazorShop" = tagger
}
}
}
snapshot-store{
plugin = "akka.persistence.snapshot-store.sqlite"
sqlite {
auto-initialize = on
connection-string = "Data Source=pizza.db"
}
}
}
}
""")
config.WithFallback(ClusterSingletonManager.DefaultConfig())

type Command = PlaceOrder of Order
type Event = OrderPlaced of Order

type Message =
| Command of Command
| Event of Event

let deft = ImmutableHashSet.Create("default")

type Tagger () =
interface IWriteEventAdapter with
member _.Manifest _ = ""
member _.ToJournal evt =
match evt with
| :? Message ->
box <| Tagged(evt, deft)
| _ -> evt


let system = System.create "cluster-system" (configWithPort 0)
Akka.Cluster.Cluster.Get(system).SelfAddress
|> Akka.Cluster.Cluster.Get(system).Join

System.Threading.Thread.Sleep(2000)

SqlitePersistence.Get(system) |> ignore

let readJournal = PersistenceQuery.Get(system).ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);

let source = readJournal.EventsByTag("default")
let mat = ActorMaterializer.Create(system);
System.Threading.Thread.Sleep(2000)
source.RunForeach((fun e ->System.Console.WriteLine(e)), mat) |> ignore

let actorProp (mailbox : Eventsourced<_>)=
let rec set (state : Order option) =
actor {
let! (msg) = mailbox.Receive()
match msg with
| Event (OrderPlaced o) when mailbox.IsRecovering () ->
return! o |> Some |> set
| Command(PlaceOrder o) ->
return o |> OrderPlaced |> Event |> Persist
| Persisted mailbox (Event(OrderPlaced o)) ->
return! o |> Some |> set
| _ -> invalidOp "not supported"
}
set None




let orderFactory =
(AkklingHelpers.entityFactoryFor system "Order"
<| propsPersist actorProp
<| None).RefFor AkklingHelpers.DEFAULT_SHARD

let init () = mat, system


64 changes: 64 additions & 0 deletions FBlazorShop/AkklingHelpers.fs
@@ -0,0 +1,64 @@
module AkklingHelpers

open System
open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence
open Akka.Persistence.Sqlite

open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion
open Akka.Serialization

[<Literal>]
let DEFAULT_SHARD = "default-shard"

type internal TypedMessageExtractor<'Envelope, 'Message>(extractor: 'Envelope -> string*string*'Message) =
interface IMessageExtractor with
member this.ShardId message =
match message with
| :? 'Envelope as env ->
let shardId, _, _ = (extractor(env))
shardId
| :? Akka.Cluster.Sharding.ShardRegion.StartEntity as se -> printfn "%A" se.EntityId; DEFAULT_SHARD
| _ -> invalidOp <| message.ToString()
member this.EntityId message =
match message with
| :? 'Envelope as env ->
let _, entityId, _ = (extractor(env))
entityId
| _ ->printfn "kkj"; "entity-1"
member this.EntityMessage message =
match message with
| :? 'Envelope as env ->
let _, _, msg = (extractor(env))
box msg
| _ -> null


// HACK over persistent actors
type FunPersistentShardingActor<'Message>(actor : Eventsourced<'Message> -> Effect<'Message>) as this =
inherit FunPersistentActor<'Message>(actor)
// sharded actors are produced in path like /user/{name}/{shardId}/{entityId}, therefore "{name}/{shardId}/{entityId}" is peristenceId of an actor
let pid = this.Self.Path.Parent.Parent.Name + "/" + this.Self.Path.Parent.Name + "/" + this.Self.Path.Name
override this.PersistenceId = pid

// this function hacks persistent functional actors props by replacing them with dedicated sharded version using different PeristenceId strategy
let internal adjustPersistentProps (props: Props<'Message>) : Props<'Message> =
if props.ActorType = typeof<FunPersistentActor<'Message>>
then { props with ActorType = typeof<FunPersistentShardingActor<'Message>> }
else props
let entityFactoryFor (system: ActorSystem) (name: string) (props: Props<'Message>) (shardSettings:ClusterShardingSettings option) : EntityFac<'Message> =

let clusterSharding = ClusterSharding.Get(system)
let adjustedProps = adjustPersistentProps props
let shardRegion =
clusterSharding.Start(name, adjustedProps.ToProps(),
defaultArg shardSettings (ClusterShardingSettings.Create(system)), new TypedMessageExtractor<_,_>(EntityRefs.entityRefExtractor))
{ ShardRegion = shardRegion; TypeName = name }
2 changes: 2 additions & 0 deletions FBlazorShop/FBlazorShop.fsproj
Expand Up @@ -4,6 +4,8 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<Compile Include="AkklingHelpers.fs" />
<Compile Include="Actor.fs" />
<Compile Include="OrderService.fs" />
<Compile Include="Extensions.fs" />
</ItemGroup>
Expand Down

0 comments on commit a57252d

Please sign in to comment.