Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Renamed EventStorage to EventBusAndStorage

  • Loading branch information...
commit 5570d26e309eef3750f5ec11861dce8a083b40c2 1 parent ee77ffa
@Thorium authored
Showing with 0 additions and 145 deletions.
  1. +0 −79 CommandSide/EventStorage.fs
  2. +0 −66 CommandSide/EventStorageRx.fs
View
79 CommandSide/EventStorage.fs
@@ -1,79 +0,0 @@
-module EventStorage
-
-open Domain
-open Events
-open System
-
-/// Infrastructure to save and restore data from some storage
-type IRepository =
- abstract Save : AggregateRoot -> string
- abstract GetHistoryById<'t when 't :> AggregateRoot> : Guid -> ('t -> 't)
-
-/// Infrastructure to generic event storage system
-type EventStoreMethod =
-| SaveEvents of Guid * Event list
-| GetEventsForAggregate of Guid * AsyncReplyChannel<Event list>
-| Quit
-
-/// Container to capsulate events
-type EventDescriptor(id:Guid, eventData:Event) =
- member x.Id = id
- member x.EventData = eventData
-
-///Used just to notify others if anyone would be interested
-let public EventBus = new Microsoft.FSharp.Control.Event<Event>()
-let public MonitorEvents (eventHandle: Events.Event -> unit) = EventBus.Publish |> Observable.add(eventHandle)
-
-/// Custom implementation of in-memory time async event storage. Using message passing.
-type EventStorage() =
- let eventstorage = MailboxProcessor.Start(fun ev ->
- let rec msgPassing history =
- async { let! k = ev.Receive()
- match k with
- | Quit -> return ()
- | SaveEvents(id, events) ->
-
- let storeAndPublish evt =
- EventBus.Trigger evt
- EventDescriptor(id, evt)
-
- let descriptors = events |> List.map (storeAndPublish)
-
- return! msgPassing(descriptors @ history)
- | GetEventsForAggregate(id, reply) ->
-
- let evts = history
- |> List.filter(fun i -> i.Id=id)
- |> List.map (fun i -> i.EventData)
- reply.Reply(evts)
- return! msgPassing(history)
- }
- msgPassing [])
-
- interface IRepository with
- member x.Save (item:AggregateRoot) =
- eventstorage.Post(SaveEvents(item.Id, item.GetUncommittedChanges))
- "saved"
-
- member x.GetHistoryById id =
- eventstorage.PostAndReply(fun rep -> GetEventsForAggregate(id,rep))
- |> List.rev
- |> LoadsFromHistory
-
- member x.Quit =
- eventstorage.Post(Quit)
- do Console.WriteLine("Storage exited.")
-
- member x.ShowItemHistory id =
- eventstorage.PostAndReply(fun rep -> GetEventsForAggregate(id,rep))
- |> List.rev
- |> List.iter Console.WriteLine
-
-(*
-// Tests for Interactive:
-let storage = new EventStorage()
-let id = System.Guid.NewGuid()
-(storage :> IRepository).Save (new InventoryItem(id, CreateType.New, "testi"))
-storage.ShowItemHistory id
-storage.Quit
-*)
View
66 CommandSide/EventStorageRx.fs
@@ -1,66 +0,0 @@
-/// Event storage using reactive extensions (Rx)
-module EventStorage
-
-open Domain
-open Events
-open System
-
-/// Infrastructure to save and restore data from some storage
-type IRepository =
- abstract Save : AggregateRoot -> string
- abstract GetHistoryById<'t when 't :> AggregateRoot> : Guid -> ('t -> 't)
-
-/// Container to capsulate events
-type EventDescriptor(id:Guid, eventData:Event) =
- member x.Id = id
- member x.EventData = eventData
-
-///Used just to notify others if anyone would be interested
-let private eventBusSubject = new System.Collections.Generic.Subject<Event>()
-let public EventBus = eventBusSubject :> IObservable<Event>
-let public MonitorEvents (eventHandle: Events.Event -> unit) = EventBus.Subscribe(eventHandle)
-
-/// Custom implementation of in-memory time async event storage. Using message passing.
-type EventStorage() =
-
- let eventstorage = new System.Collections.Generic.ReplaySubject<EventDescriptor>()
-
- let SaveEvents id events =
- let storeAndPublish evt =
- eventBusSubject.OnNext evt
- EventDescriptor(id, evt)
- |> eventstorage.OnNext
-
- events |> List.iter storeAndPublish
-
- let GetEventsForAggregate id =
- eventstorage
- |> Observable.filter(fun i -> i.Id=id)
- |> Observable.map (fun i -> i.EventData)
-
- interface IRepository with
- member x.Save (item:AggregateRoot) =
- SaveEvents item.Id item.GetUncommittedChanges
- "saved"
-
- member x.GetHistoryById id =
- let currentEvents =
- let currentEventList = new System.Collections.Generic.List<Event>()
- GetEventsForAggregate id
- |> Observable.subscribe(fun e -> currentEventList.Add(e))
- |> ignore
- currentEventList.ToArray() |> Array.toList
-
- LoadsFromHistory currentEvents
-
- member x.ShowItemHistory id =
- GetEventsForAggregate id
- |> Observable.subscribe(Console.WriteLine)
- |> ignore
-(*
-// Tests for Interactive:
-let storage = new EventStorage()
-let id = System.Guid.NewGuid()
-(storage :> IRepository).Save (new InventoryItem(id, CreateType.New, "testi"))
-storage.ShowItemHistory id
-*)
Please sign in to comment.
Something went wrong with that request. Please try again.