A reactive, write-through observable cache for F# backed by Rx.NET. Supports CRUD operations with correlation IDs, grouped eviction, and automatic database persistence.

    dotnet add package ObservableCache

ObservableCache sits between your application and your database. Operations flow in as an IObservable<Input>, items are cached in memory, and persistence to the database is handled automatically on eviction. Each operation carries a CorrelationId so results can be matched back to the caller.
- Create / Update — items are written to the in-memory cache immediately and persisted to the database when evicted.
- Read — items are served from the cache if present; otherwise fetched from the database and cached.
- Delete — items are removed from the cache immediately and deleted from the database.
- Eviction — items are evicted (and persisted) after a configurable idle `TimeSpan` with no activity, or immediately on delete.
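
The behaviour listed above can be pictured with a small synchronous model. This is only a sketch for building intuition, not the library's implementation: `ToyCache` and its members are hypothetical names, the "database" is a plain `Dictionary`, time is passed in explicitly instead of using Rx timers, and `Put` simply replaces the item rather than applying an update message.

```fsharp
open System
open System.Collections.Generic

/// Toy, synchronous model of the semantics described above (hypothetical, not library code).
type ToyCache<'Id, 'Item when 'Id : equality>(idle: TimeSpan, db: Dictionary<'Id, 'Item>) =
    // Each cached entry remembers when it was last touched.
    let cache = Dictionary<'Id, 'Item * DateTime>()

    /// Create / Update: write to memory only; the database is untouched for now.
    member _.Put(id: 'Id, item: 'Item, now: DateTime) =
        cache.[id] <- (item, now)

    /// Read: serve from memory if present, otherwise load from the database and cache it.
    member _.Read(id: 'Id, now: DateTime) : 'Item option =
        match cache.TryGetValue id with
        | true, (item, _) ->
            cache.[id] <- (item, now)
            Some item
        | _ ->
            match db.TryGetValue id with
            | true, item ->
                cache.[id] <- (item, now)
                Some item
            | _ -> None

    /// Delete: remove from memory and from the database right away.
    member _.Delete(id: 'Id) =
        cache.Remove id |> ignore
        db.Remove id |> ignore

    /// Eviction: persist and drop every entry that has been idle for at least `idle`.
    member _.EvictIdle(now: DateTime) =
        let expired =
            cache
            |> Seq.filter (fun kv -> now - snd kv.Value >= idle)
            |> Seq.map (fun kv -> kv.Key, fst kv.Value)
            |> Seq.toList // materialise before mutating the dictionary
        for (id, item) in expired do
            db.[id] <- item
            cache.Remove id |> ignore

    member _.IsCached(id: 'Id) = cache.ContainsKey id

// Usage: an item reaches the "database" only once it has been idle long enough.
let db = Dictionary<int, string>()
let cache = ToyCache<int, string>(TimeSpan.FromSeconds 30.0, db)
let t0 = DateTime(2024, 1, 1)

cache.Put(1, "draft", t0)
printfn "persisted before eviction: %b" (db.ContainsKey 1)
cache.EvictIdle(t0.AddSeconds(31.0))
printfn "persisted after eviction: %b" (db.ContainsKey 1)
```

In the real library the idle timeout is driven by the observable pipeline rather than by an explicit `EvictIdle` call; the model only shows when data moves between memory and the database.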
`obsCache` takes an `IObservable<Input>` and returns an `IObservable<Guid * CacheOutput>`.

    open ObservableCache
    open System
    open System.Reactive.Subjects
    open FSharp.Control.Reactive

    let inputSubject = new Subject<Input<MyItem, Guid, MyItemMsg>>()

    let outputObservable =
        obsCache
            saveToDatabase              // MyItem -> IObservable<Result<MyItem, string>>
            loadFromDatabase            // Guid -> IObservable<Result<MyItem, string>>
            deleteFromDatabase          // Guid -> IObservable<Result<unit, string>>
            applyMessage                // MyItemMsg -> MyItem -> MyItem
            (TimeSpan.FromSeconds 30.0)
            inputSubject

`createHelperFunctions` returns four typed dispatch functions and the raw output observable; it is the recommended entry point for most use cases.

    open ObservableCache
    open System
    open System.Threading.Tasks

    let createItem, readItem, updateItem, deleteItem, outputObservable =
        createHelperFunctions
            saveToDatabase              // MyItem -> IObservable<Result<MyItem, string>>
            loadFromDatabase            // Guid -> IObservable<Result<MyItem, string>>
            deleteFromDatabase          // Guid -> IObservable<Result<unit, string>>
            applyMessage                // MyItemMsg -> MyItem -> MyItem
            (TimeSpan.FromSeconds 30.0)

    // Dispatch functions return Tasks.
    // itemId, item, and msg stand for values supplied by your application.
    let created: Task<Result<MyItem, string>> = createItem (itemId, item)
    let read: Task<Result<MyItem, string>> = readItem itemId
    let updated: Task<Result<MyItem, string>> = updateItem (itemId, msg)
    let deleted: Task<Result<unit, string>> = deleteItem itemId

    // outputObservable emits all cache operations as (CorrelationId * CacheOutput) pairs

| Type | Description |
|---|---|
| `CacheInput<'Item, 'ItemId, 'ItemMsg>` | Discriminated union of `CreateItem`, `ReadItem`, `UpdateItem`, `DeleteItem` |
| `Input<'Item, 'ItemId, 'ItemMsg>` | A `CacheInput` with a `CorrelationId: Guid` |
| `CacheOutput<'ItemId, 'Item>` | `CreateItemOnDB`, `ReadItemOnDB`, `UpdateItemOnDB`, `DeleteItemOnDB`, each carrying the `'ItemId` and a `Result` |
| `Output<'ItemId, 'Item>` | A `CacheOutput` with a `CorrelationId: Guid` |
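
To make the table more concrete, here is a rough sketch of what types with these names could look like, and how a reply can be matched to its request by correlation ID. The case payloads and the `Payload` field name are assumptions inferred from the table and from the dispatch function signatures; the library's actual definitions may differ. `tryFindReply` is a hypothetical helper, not part of the package.

```fsharp
open System

// Hypothetical stand-ins that mirror the table; the real definitions may differ.
type CacheInput<'Item, 'ItemId, 'ItemMsg> =
    | CreateItem of 'ItemId * 'Item
    | ReadItem of 'ItemId
    | UpdateItem of 'ItemId * 'ItemMsg
    | DeleteItem of 'ItemId

type Input<'Item, 'ItemId, 'ItemMsg> =
    { CorrelationId: Guid
      Payload: CacheInput<'Item, 'ItemId, 'ItemMsg> } // field name is an assumption

type CacheOutput<'ItemId, 'Item> =
    | CreateItemOnDB of 'ItemId * Result<'Item, string>
    | ReadItemOnDB of 'ItemId * Result<'Item, string>
    | UpdateItemOnDB of 'ItemId * Result<'Item, string>
    | DeleteItemOnDB of 'ItemId * Result<unit, string>

/// Pick out the first output whose correlation ID matches the request.
let tryFindReply (correlationId: Guid) (outputs: seq<Guid * CacheOutput<'ItemId, 'Item>>) =
    outputs
    |> Seq.tryPick (fun (cid, output) -> if cid = correlationId then Some output else None)

// Usage: build a request, then find its reply among unrelated outputs.
let requestId = Guid.NewGuid()
let request: Input<string, int, string> =
    { CorrelationId = requestId; Payload = ReadItem 42 }

let outputs: (Guid * CacheOutput<int, string>) list =
    [ (Guid.NewGuid(), CreateItemOnDB (7, Ok "other"))
      (requestId, ReadItemOnDB (42, Ok "hello")) ]

match tryFindReply request.CorrelationId outputs with
| Some (ReadItemOnDB (itemId, Ok item)) -> printfn "read %d: %s" itemId item
| Some other -> printfn "unexpected reply: %A" other
| None -> printfn "no reply yet"
```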
| Function | Signature |
|---|---|
| `obsCache` | `('Item -> IObservable<Result<'Item, string>>) -> ('ItemId -> IObservable<Result<'Item, string>>) -> ('ItemId -> IObservable<Result<unit, string>>) -> ('ItemMsg -> 'Item -> 'Item) -> TimeSpan -> IObservable<Input<'Item, 'ItemId, 'ItemMsg>> -> IObservable<Guid * CacheOutput<'ItemId, 'Item>>` |
| `createHelperFunctions` | Same first five parameters; returns `(('ItemId * 'Item) -> Task<Result<'Item, string>>) * ('ItemId -> Task<Result<'Item, string>>) * (('ItemId * 'ItemMsg) -> Task<Result<'Item, string>>) * ('ItemId -> Task<Result<unit, string>>) * IObservable<Guid * CacheOutput<'ItemId, 'Item>>` |
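
The step from an `IObservable<Guid * ...>` to Task-returning functions can be illustrated with a correlation ID and a `TaskCompletionSource`. The sketch below is one possible way to build such a bridge and is not necessarily how `createHelperFunctions` works internally. `MiniSubject` and `dispatch` are hypothetical names, and `MiniSubject` is a bare-bones stand-in for an Rx subject so the example runs without any packages.

```fsharp
open System
open System.Threading.Tasks

/// Bare-bones hot observable so the sketch runs without Rx.NET (hypothetical helper).
type MiniSubject<'T>() =
    let observers = ResizeArray<IObserver<'T>>()
    member _.OnNext(value: 'T) =
        // Iterate over a snapshot so observers may unsubscribe while being notified.
        for o in observers.ToArray() do
            o.OnNext value
    interface IObservable<'T> with
        member _.Subscribe(observer) =
            observers.Add observer
            { new IDisposable with
                member _.Dispose() = observers.Remove observer |> ignore }

/// Send one request and complete a Task with the first reply carrying the same correlation ID.
let dispatch (send: Guid * 'Req -> unit) (replies: IObservable<Guid * 'Reply>) (request: 'Req) : Task<'Reply> =
    let correlationId = Guid.NewGuid()
    let tcs = TaskCompletionSource<'Reply>()
    let observer =
        { new IObserver<Guid * 'Reply> with
            member _.OnNext(pair) =
                let cid, reply = pair
                if cid = correlationId then tcs.TrySetResult reply |> ignore
            member _.OnError(ex) = tcs.TrySetException(ex) |> ignore
            member _.OnCompleted() = tcs.TrySetCanceled() |> ignore }
    // Subscribe before sending so a synchronous reply is not missed.
    let subscription = replies.Subscribe observer
    // Unsubscribe as soon as the task finishes, whatever the outcome.
    tcs.Task.ContinueWith(
        Action<Task<'Reply>>(fun _ -> subscription.Dispose()),
        TaskContinuationOptions.ExecuteSynchronously)
    |> ignore
    send (correlationId, request)
    tcs.Task

// Usage with a made-up backend that answers each request by upper-casing it.
let replies = MiniSubject<Guid * string>()
let send (cid: Guid, text: string) =
    replies.OnNext((cid, text.ToUpperInvariant()))

let reply = dispatch send replies "hello"
printfn "%s" reply.Result
```

Because every caller filters on its own correlation ID, several requests can share one output stream without seeing each other's results.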