[Proposal] Log-based storage for grain components #7691

Open
ReubenBond opened this issue Apr 16, 2022 · 12 comments

@ReubenBond
Member

ReubenBond commented Apr 16, 2022

Implement extensible, per-grain log-based storage for grain components such that the state of each component can be implemented as a log-based, snapshotable state machine. Log entries for each component are multiplexed onto the per-grain log. Snapshots for each component are merged together and stored as a single unit. Components of a grain can register their state machines by name. This should be the primary storage mechanism used by grains.

[Image: multiple components' logs multiplexed onto a single per-grain log, with per-component snapshots merged into a single unit]

Each color in the above image represents a differently named state machine. Each numbered box represents a log entry for the corresponding state machine. The picture attempts to demonstrate how multiple components' logs are multiplexed onto a single shared log per grain as well as how snapshots of each component are merged to form one unit. It shows three example state machines: Reminders, IPersistentState<T>, and IPersistentList<T>.

Benefits and use cases

Atomic updates across multiple components

By multiplexing the logs of each component state machine onto a single per-grain log, atomic updates involving multiple components can be implemented. For example, if grain reminders (proposal to follow) and grain state are both implemented using this approach, a reminder method can atomically modify state and schedule a reminder, and similarly modify state and delete a reminder.

Here is an example snippet for a theoretical game where a player can consume an item which has a limited-time effect on the player (eg, imagine an in-game buff). The item application and reminder scheduling occur atomically: there is no need to schedule the reminder to remove the effect before applying the effect. Similarly, in the reminder handler, there is no need to first check that the effect was actually applied before removing it and the reminder.

// Consume an in-game consumable item
async Task ConsumeItem(ItemDetails item)
{
  // Apply the effect, promising to remove it after its effect duration elapses.
  item.ApplyEffect(_playerState);
  reminders.Schedule(
    name: "removeEffect-" + Guid.NewGuid(),
    state: item,
    dueTime: DateTime.UtcNow + item.EffectDuration);
  
  await WriteStateAsync(); // Persist all pending changes to storage
}

async Task ReceiveReminder(string reminderName, object state)
{
  if (reminderName.StartsWith("removeEffect-"))
  {
    // Remove the item's effect and the reminder.
    var item = (ItemDetails)state;
    item.RemoveEffect(_playerState);
    reminders.Remove(reminderName);
    await WriteStateAsync(); 
  }
}

This can greatly simplify the act of writing fault-oblivious code when no external I/O (eg, calling into other grains) is involved. For cases where an operation crosses grain boundaries, Orleans Transactions can be used. The log-based approach can be applied to transactions, too: Sebastian Burckhardt created a branch of the Orleans-Tx repository which implemented transactions on top of event sourcing. I believe that is a good approach.

Efficient writes to large state machines

We typically advise developers to keep their grain state relatively small if it is going to be updated frequently, since each update involves serializing the entire state and writing it to storage. This log-based approach helps to relax this requirement in some cases. For example, instead of only offering the opaque blob form of grain state which we offer today, we could support reliable lists, dictionaries, hash maps, and other data structures. Instead of writing an entire snapshot of the data structure to storage on every operation, we can record individual operations, such as Insert object X at index 5. This is a more efficient approach than storing the entire data structure on each operation.

Very large data structures can also be implemented using this approach: rather than loading a snapshot of the entire data structure into memory during activation and then replaying the log, no snapshot is needed at all. The data structure can be stored in independent storage (eg, an Azure Table Storage partition), with a version number which is atomically updated as each log entry (storage operation) is applied. Log entries which have not been applied (where the log entry id is greater than the version number in storage) are applied before any subsequent operations.

Generally speaking, any data structure can be implemented on top of log-based storage, and this is how the majority of reliable data stores (databases, file systems, etc) are implemented: using a write-ahead log. That write-ahead log is often stored separately from the main data structure (eg, a B-Tree).

The key to this approach is that log entries are written prior to being applied to the data structure, log entries are always applied in-order, and either operations are idempotent or there is a way to ensure that they are only applied once (for example by including a version number in both the log entry and on the data structure).
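
To illustrate the idempotency requirement, here is a minimal sketch of version-gated log application. It assumes a hypothetical IExternalStore abstraction (eg, an Azure Table Storage partition with a version column) which is not part of the proposed API surface; the store bumps its version atomically with each applied operation, so re-applying the log after a crash has no effect on entries which were already applied.

// Hypothetical external store: the data structure lives outside the grain snapshot,
// alongside a version number which is updated atomically with each applied operation.
public interface IExternalStore
{
    Task<long> GetAppliedVersionAsync();
    Task ApplyAsync(ListOperation operation, long version);
}

// Example operation, eg "Insert object X at index 5".
public sealed record ListOperation(string Kind, int Index, byte[] Payload);

public sealed class ExternalListApplier
{
    private readonly IExternalStore _store;
    public ExternalListApplier(IExternalStore store) => _store = store;

    // Replays log entries in order, skipping entries at or below the stored version
    // so that re-applying the log after a crash remains idempotent.
    public async Task ApplyPendingAsync(IEnumerable<(long Version, ListOperation Operation)> logEntries)
    {
        var appliedVersion = await _store.GetAppliedVersionAsync();
        foreach (var (version, operation) in logEntries)
        {
            if (version <= appliedVersion) continue; // already applied: skip

            // The store bumps its stored version in the same atomic write as the
            // operation, ensuring each log entry takes effect exactly once.
            await _store.ApplyAsync(operation, version);
        }
    }
}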

Enhanced reminders and workflows

The reminder service will be modified to record only a grain id and the due time of the first reminder for that grain. For further extensibility, we will also include a property bag (eg, imagine a priority property becomes useful). The reminders table contains one entry per grain, indicating the minimum of the due times of all reminders on the grain. The details of a grain's reminders will be stored in the grain's own state. When a reminder becomes due, the reminder service calls the ReminderGrainExtension, which finds and fires the due reminders. The ReminderGrainExtension is responsible for removing unnecessary reminders from the reminder table by calling the reminder service. The extension is also responsible for calling IRemindable.ReceiveReminder on the grain, in order to support the existing reminder interface. This mechanism also allows us to enhance the functionality of reminders in the future, eg to support various retry mechanisms in a scalable manner. The reminder grain extension can also be used to support external callers managing reminders for a grain without having to add additional grain methods.

[Image: the simplified reminder table]

The above image shows a depiction of the simplified reminder table. See the corresponding issue: #7573 and comment: #7573 (comment). This can allow a developer to modify reminders atomically with respect to state.
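
As a rough sketch (not a committed schema), a simplified reminder table entry might look like the following: one row per grain, the earliest due time, and an extensible property bag. The type and property names here are illustrative only.

[GenerateSerializer]
public sealed class ReminderTableEntry
{
    // The grain which owns one or more reminders.
    [Id(0)] public GrainId GrainId { get; set; }

    // The minimum due time across all of the grain's reminders; this is all the
    // reminder service needs in order to know when to next call the grain.
    [Id(1)] public DateTime MinimumDueTime { get; set; }

    // Extensible metadata, eg a future "priority" property.
    [Id(2)] public Dictionary<string, string> Properties { get; set; } = new();
}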

Hopefully, I can put out a workflows proposal soon, too.

Indexes

The Orleans.Indexing project implements indexing using workflows (see Indexing in an Actor-Oriented Database).

Exactly-once stream processing

We had an internal proposal some time back from a partner team to simplify the implementation of exactly-once processing by means of a state machine which automatically saved StreamSequenceTokens alongside state updates. One issue at the time was that user-defined state types (eg IPersistentState<MyData>) are not extensible.

Event Sourcing

Orleans.EventSourcing (from Geo-Distribution of Actor-Based Services) already offers log-based storage at the grain level. JournaledGrain offers an event sourcing abstraction of a single log per grain, with events (log entries) handled by application code. By contrast, this proposal supports multiple, named state machines which have their events multiplexed onto a single log. The goal is to allow stateful components within each grain to use log-based storage; exposing an event sourcing abstraction to application code can still be supported (by registering a state machine for the application code), but it is not the primary goal. This proposal could be implemented by extending/modifying Orleans.EventSourcing, and JournaledGrain could be re-implemented on top of this approach.

Efficient, improved extensibility

Having all storage reads and writes co-located in the same multiplexed log and amalgamated snapshots affords a good deal of extensibility in an efficient manner. A component can register itself as a state machine without incurring any IO overhead on the grain if it does not emit any log entries. This improves startup time because instead of a grain needing to read from potentially many locations, it only needs to read from two (which could be one, using append-only blobs): a snapshot blob and a range-query on a log.

API Design

The key design points are:

  • Components implemented using state machines have synchronous APIs for making updates, with updates being persisted
    by calling a grain-wide WriteStateAsync() method. There is no API for undoing an update.
  • Each grain has a single state machine manager which can be used to register named state machines, persist pending log entries from registered state machines, and checkpoint state.
  • Each log entry and snapshot for a given state machine contains a monotonically increasing version number.
  • State machines are responsible for serializing their own log entries and snapshots. The infrastructure provides framing, but the contents are opaque to the infrastructure.
  • State machines are responsible for registering themselves with the state machine manager.
  • The state machine manager requests log entries from registered state machines when it itself is asked to persist the state of the grain.
  • The state machine manager is responsible for eventually purging unnecessary log entries. Any log entry for a given state machine with a version number lower than the most recent snapshot for that state machine is unnecessary and is eligible to be purged.
  • The state machine manager is responsible for reading snapshots from storage and providing them to the corresponding state machine.

To make it a little more concrete, here is an example of what some of the underlying interfaces and data types might look like:

public interface IStateMachineManager
{
    void RegisterStateMachine(string name, IStateMachine stateMachine);
    ValueTask SnapshotAsync();
    ValueTask WriteStateAsync();
}

public interface IStateMachineStorage
{
    MemoryPool<byte> MemoryPool { get; }
    void AppendEntry(LogEntry entry);
    void AppendEntries(ICollection<LogEntry> entries);
    ValueTask WaitForCommit(StateMachineVersion minVersion);
    void RequestSnapshot();
}

public interface IStateMachine
{
    string Name { get; }
    void OnInitialize(IStateMachineStorage storage);
    ValueTask OnRestoreSnapshotAsync(Snapshot snapshot);
    ValueTask OnReplayLogEntryAsync(LogEntry logEntry);
    Snapshot CreateSnapshot();
}

public readonly struct LogEntry
{
    public string StateMachineName { get; init; }
    public StateMachineVersion Version { get; init; }
    public ReadOnlySequence<byte> Data { get; init; }
}

public readonly struct Snapshot
{
    public string StateMachineName { get; init; }
    public StateMachineVersion Version { get; init; }
    public ReadOnlySequence<byte> Data { get; init; }
}

public readonly struct StateMachineVersion
{
    public StateMachineVersion(long version) { Value = version; }
    public long Value { get; init; }
}
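
As a usage sketch (the names below are illustrative, not part of the proposal), a component such as a durable counter could be built on these interfaces: log entries record increments, and the snapshot records the current total.

public sealed class DurableCounter : IStateMachine
{
    private IStateMachineStorage _storage;
    private long _value;
    private StateMachineVersion _version = new(0);

    public string Name => "counter";
    public long Value => _value;

    public void OnInitialize(IStateMachineStorage storage) => _storage = storage;

    // Synchronous update: record the operation as a pending log entry. The change
    // becomes durable when the grain calls IStateMachineManager.WriteStateAsync().
    public void Increment(long amount)
    {
        _value += amount;
        _version = new StateMachineVersion(_version.Value + 1);
        _storage.AppendEntry(new LogEntry
        {
            StateMachineName = Name,
            Version = _version,
            Data = new ReadOnlySequence<byte>(BitConverter.GetBytes(amount)),
        });
    }

    public ValueTask OnRestoreSnapshotAsync(Snapshot snapshot)
    {
        _value = BitConverter.ToInt64(snapshot.Data.ToArray(), 0);
        _version = snapshot.Version;
        return default;
    }

    public ValueTask OnReplayLogEntryAsync(LogEntry logEntry)
    {
        _value += BitConverter.ToInt64(logEntry.Data.ToArray(), 0);
        _version = logEntry.Version;
        return default;
    }

    public Snapshot CreateSnapshot() => new()
    {
        StateMachineName = Name,
        Version = _version,
        Data = new ReadOnlySequence<byte>(BitConverter.GetBytes(_value)),
    };
}

During activation, the component registers itself via IStateMachineManager.RegisterStateMachine("counter", counter); the manager restores the latest snapshot, replays any newer log entries, and collects pending entries whenever WriteStateAsync() is called.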

Further enhancements

Auto-persisting state and greatly improving throughput

If we do implement this, then we can reduce the amount of code a developer needs to write by ensuring that state is persisted automatically after every grain call, before the response is sent to the caller. By doing this, we can also enable an optimization to amortize storage writes over multiple grain calls: while a previous write operation is in-flight, other grain calls are allowed to execute, and their write operations are batched together. This can greatly increase throughput for calls which involve storage writes, however it increases the scope for some potential complications.

If a grain does not ensure that state is persisted before performing an operation which might have an observable side-effect (such as calling another grain, passing the current non-durable state), then errors could result if the grain is unable to persist that state (for example, because of a system crash or database failure). Developers can mitigate this by explicitly calling WriteStateAsync() before any such operation. This same issue (modify state and act on it before persisting it) exists today: a grain call might modify some state, try to persist it, and fail due to a transient network error. A subsequent grain call may then run, assume the in-memory state was durably persisted (since the developer was sure to call WriteStateAsync() every time they modified state), and erroneously perform some unsafe operation.

Since the state machine manager can detect state changes by asking all state machines if they have any changes, calls to WriteStateAsync() can become no-ops if there are no changes to persist. Therefore, it would be relatively cheap to automatically call WriteStateAsync() before invoking any method in addition to afterwards.
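
For example, here is a minimal sketch of the mitigation described above (the grain interface and methods are hypothetical): persist explicitly before any externally visible side effect, and let automatic persistence handle the rest.

async Task TransferItem(IInventoryGrain recipient, ItemDetails item)
{
    _playerState.RemoveItem(item);

    // Make the removal durable before it becomes observable to another grain;
    // otherwise a crash after the call could resurrect the item locally even
    // though the recipient has already received it.
    await WriteStateAsync();

    await recipient.ReceiveItem(item);
    // Any further changes are persisted automatically once this call completes,
    // before the response is sent to the caller.
}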

Supporting transactions more broadly

This proposal allows for atomic operations across multiple components in a grain, but it does not support atomicity across multiple grains. For that, we have Orleans Transactions, a scalable implementation of distributed ACID transactions (see Transactions for Distributed Actors in the Cloud, Actor-Oriented Database Systems, and Resurrecting Middle-Tier Distributed Transactions).

Orleans already allows multiple ITransactionalState<TState> instances within a grain to participate in a transaction. Developers must decide at design-time whether some state is going to need transactions or not (since they are stored differently). Additionally, we do not currently support non-transactional access to ITransactionalState<TState>. If state machine operations do not have externally visible side-effects, then a transaction can be rolled back by applying the most recent snapshot and all subsequent logs from prior to the transaction. This rollback can be performed by the state machine manager. This may allow a developer to design their system without transactions in mind and then add transactions as-needed. We could gradually enhance various systems with transaction support. For example, a scheduled financial transaction could operate on multiple grains and unschedule itself atomically.
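
To sketch the rollback idea (illustrative only, built on the interfaces above rather than a committed design): the state machine manager restores the latest snapshot and replays only the log entries recorded before the aborted transaction began.

async ValueTask RollbackAsync(
    IStateMachine stateMachine,
    Snapshot latestSnapshot,
    IEnumerable<LogEntry> log,
    StateMachineVersion transactionStartVersion)
{
    // Reset the state machine to its last durable snapshot.
    await stateMachine.OnRestoreSnapshotAsync(latestSnapshot);

    // Re-apply everything recorded after the snapshot but before the transaction began.
    foreach (var entry in log)
    {
        if (entry.Version.Value > latestSnapshot.Version.Value &&
            entry.Version.Value < transactionStartVersion.Value)
        {
            await stateMachine.OnReplayLogEntryAsync(entry);
        }
    }
}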

@rjygraham

@ReubenBond I think this proposal looks great and while it may not be perfect I don't see anything glaring after my first pass through.

Some initial thoughts/reactions/questions:

  • Will we be able to register multiple state machine managers with different underlying storage mechanisms in a single silo?
  • Will state machine managers be able to utilize different storage mechanisms for log entries vs snapshots?
  • How will the state machine manager determine when it can remove unnecessary log entries? While purging old log entries is ideal in many cases, there are scenarios where specific grains may need to retain full fidelity of their logs, and therefore this purge strategy logic likely needs to be externalized from the state machine manager.

Some additional (selfish) thoughts/musings taking a step back to view this proposal as it fits into the bigger picture:

  • For running this on Azure:
    • We need 1st class support for writing log entries/snapshots to Azure Cosmos DB (not in Orleans.Contrib)
    • We need 1st class support for Azure Cosmos DB change feed as an Orleans stream provider (not in Orleans.Contrib)
      • Can potentially be used for building/updating materialized views and other processing logic
  • Multi-cluster support, log-based storage, with multi-region storage systems (Azure Cosmos DB) can offer:
    • Ability to run global active/active large-scale systems where grains can be activated with full-fidelity on clusters nearest the client. Consider a B2C application where customer starts on US West Coast and travels to Europe with layover in US East Coast. Grains could be activated in US West region -> US East region -> EU region. Activations on cluster in new region tell activations on clusters in other regions to deactivate.
    • Near turn-key disaster recovery/business continuity for many large-scale systems

@ReubenBond
Member Author

ReubenBond commented Apr 18, 2022

Good to hear & thanks for the feedback!

Will we be able to register multiple state machine managers with different underlying storage mechanisms in a single silo?

I would resist this strongly, and I think that's an important point to be strict on. The grain's state is a unit of atomicity. It's important for the programming model to be easy for developers to understand. That doesn't mean that state must all be stored using the same mechanism (see above under Efficient writes to large state machines), but it means the log + snapshots do.

Will state machine managers be able to utilize different storage mechanisms for log entries vs snapshots?

Sure, no reason why not. It's slightly more efficient to co-locate them, since that can potentially reduce recovery to a single read operation, instead of two (log + snapshot).

How will the state machine manager determine when it can remove unnecessary log entries?

Any log entry older than the most recent snapshot is eligible to be purged. If components need their full history, then they should independently write them to somewhere else.

We need 1st class support for writing log entries/snapshots to Azure Cosmos DB (not in Orleans.Contrib)

Yes, we should pull it in. It's out of scope, but it would be ideal.

We need 1st class support for Azure Cosmos DB change feed as an Orleans stream provider

I think that's too far out of scope. I don't see the Change Feed as being appropriate for Orleans Streams. Maybe there's something which can be done there, and I understand the benefits, but it's worth discussing on a separate issue.

Multi-cluster support, log-based storage, with multi-region storage systems (Azure Cosmos DB) can offer

Cosmos DB only supports eventual consistency writes to different regions for an account with multiple write regions. I also consider this out-of-scope: discussions like this can get off-track very easily. We can consider some kind of multi-region support for non-Global-Single-Instance grains further down the line. We're working on initial multi-clustering support with the help of internal partners.

@jsteinich
Contributor

Sebastian Burckhardt created a branch of the Orleans-Tx repository which implemented transactions on top of event sourcing

Is this publicly available somewhere? I know that the current transactions can be thought of as a series of events on the grains involved, but maintaining the coordination / knowing when the transaction has completed are still very important.

Would this open the door to reminder scheduling that ties into the transaction? Seems like it sort of does, but it's not clear to me that one state machine triggering a rollback would necessarily trigger another state machine to do the same.

This proposal does seem to make alternative storage mechanisms less viable. While you can still write what you want, you won't get the benefits that this proposal adds.
Personally, we've been using transactions so we're already tied to storage; however, we've still been able to customize via a custom storage provider. That is also still possible, but only having a sequence of bytes makes that either more limiting or requires extra processing.
This also connects back to indexing via storage-backed (inactive) indexes. That's one of the main reasons that we have custom storage, and it is also a common question for which the current default answer is to not use Orleans storage.

@ReubenBond
Member Author

Is this publicly available somewhere? I know that the current transactions can be thought of as a series of events on the grains involved, but maintaining the coordination / knowing when the transaction has completed are still very important.

It's not, but it exists in a private, archived repository - we might be able to dig it up if you're interested.

Would this open the door to reminder scheduling that ties into the transaction? Seems like it sort of does, but it's not clear to me that one state machine triggering a rollback would necessarily trigger another state machine to do the same.

That's my hope. It would be contingent on it being implemented.

This proposal does seem to make alternative storage mechanisms less viable. While you can still write what you want, you won't get the benefits that this proposal adds.

Agreed, but I think the result is worthwhile. We could still use simple key-value storage, but writes would first be written to the grain's log before being eventually propagated to the KV storage.

@ElanHasson
Contributor

ElanHasson commented May 5, 2022

Would this open the door to reminder scheduling that ties into the transaction? Seems like it sort of does, but it's not clear to me that one state machine triggering a rollback would necessarily trigger another state machine to do the same.

Does this mean (pseudo-code) the below would remove the scheduled reminder in addition to the work done in WriteStuff?

conductor.StartTx()
grain1.WriteStuff()
grain2.ScheduleReminderForSomething()
grain3.CallMeToRollbackTxn()

A thought that comes to mind: what happens if the scheduled reminder's dueTime comes before the transaction is committed?

@ReubenBond
Member Author

If the dueTime is before the transaction is committed, it will fire once the transaction is committed (based on the Reminders V2 proposal notes). If it didn't, then the effect of the transaction would be visible from outside the transaction. Under the implementation that I'm considering, there would be some effect which is arguably visible: the call to the reminder service would occur before the transaction is committed (no later than before Prepare completes), so the grain could be woken spuriously if the transaction aborts. A subsequent call to the reminder service will remove the grain if there end up being no reminders scheduled (the txn aborts and there are no other pending reminders).

@rafikiassumani-msft added the area-persistence and epic labels and removed the Needs: triage 🔍 label on Jun 2, 2022
@rafikiassumani-msft added this to the Backlog milestone on Jun 2, 2022
@ghost

ghost commented Jun 2, 2022

We've moved this issue to the Backlog. This means that it is not going to be worked on for the coming release. We review items in the backlog at the end of each milestone/release and depending on the team's priority we may reconsider this issue for the following milestone.

@randrewy

randrewy commented Jun 6, 2022

That's a great proposal! First of all, having separate states for grain and components is cool by itself, and enabling atomicity between all of them is even cooler.

Though I have the same concerns as @jsteinich. This makes EventSourcing a superior pattern, and EventSourcing is a little bit verbose, with all those extra events that need to be declared and extra process functions.

It is a great improvement for those who already use ES, but I think it could improve the workflow for those who use a simple KV store as well. Or maybe it already does and I got this all wrong :)
Maybe you can provide some extra details about what is happening in item.ApplyEffect(_playerState); from the example, to illustrate the difference between the KV approach

class Item {
    public void ApplyEffect(PlayerState state)
    {
        state.Effects.Add(this.effectData);
    }
}

versus ES approach:

class Item {
    public void ApplyEffect(PlayerState state) { <???> }
}

So here are some questions/thoughts:

We could still use simple key-value storage, but writes would first be written to the grain's log before being eventually propagated to the KV storage.

Does that mean that a new storage will be (implicitly) added to store ES events in addition to the existing storage? And will the user have to manually write to that storage, or will some auto-mappers exist (1)?

This log-based approach helps to relax this requirement <...> we could support reliable lists, dictionaries, hash maps, and other data structures. Instead of writing an entire snapshot of the data structure to storage on every operation, we can record individual operations, such as Insert object X at index 5. This is a more efficient approach than storing the entire data structure on each operation.

It looks like, if those collections are supported, it would be possible to adapt those internal operation-events to be used with any kind of storage.

The goal is to allow stateful components within each grain to use log-based storage; exposing an event sourcing abstraction to application code can still be supported (by registering a state machine for the application code), but it is not the primary goal

So this proposal aims to hide all ES-related stuff under the hood? How will that work?

This shared log is stored as a continuous collection of events, correct? That means that all states are loaded at once for all components, which is a good part. But what about grain extensions? They can be used as a lazy-loaded component, and it is tempting to use the same IStateMachineStorage mechanism for them as well, to continue to provide state atomicity. But that will tie the extension state to the grain state, so it will be loaded with all other states regardless of whether the extension will ever be installed on the grain.

Though as far as I know it is not possible to use persistence inside grain extensions, I hope that will change some day and this issue will arise.

(1):
If we are talking about Orleans 4, then its serialization attributes, coupled with reliable collections, could lead to auto-mappers from simple KV to ES events, something like

[GenerateSerializer]
public class ChatState
{
    [Id(0)]
    public EventSourcedList<ChatMessage> Messages { get; set; }

    public ChatState(IStateMachineStorage storage)
    {
        Messages = new(storage);
    }
}

ChatState grainChatState;
grainChatState.Messages.Add(msg); // -> will lead to an auto-generated event ListItemAddedEvent { Id = 0, Item = msg }

@omariom

omariom commented Jun 9, 2022

Speaking of WALs and KV stores, have you considered FASTER Log or FASTER KV?
https://github.com/microsoft/FASTER/

@ReubenBond
Member Author

@omariom

Speaking of WALs and KV stores, have you considered FASTER Log or FASTER KV?

Yep, I think that if we do this, a FASTER-based backend would be great to have. The biggest concerns I have are around multiplexing many grain logs onto a small number of FasterLog instances: whether or not we could make it efficient and scalable without too much effort. For example, there are questions such as whether (and how) to support dynamically increasing the number of log partitions (one FasterLog instance per grain would be too costly), and how to divide responsibilities among the hosts in the cluster. One approach might be to start with a fixed number of log partitions and add support for increasing that number later - with a directory (KV) mapping grains to logs. There would be a lot of design and implementation work required to make it production-worthy, but there is solid potential there. cc @badrishc
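
To sketch the fixed-partition idea (the type name and the hash used here are assumptions, not a design): each grain's log is deterministically assigned to one of a fixed number of shared log partitions, with a directory only becoming necessary once repartitioning is supported.

public sealed class FixedLogPartitionMap
{
    private readonly int _partitionCount;

    public FixedLogPartitionMap(int partitionCount) => _partitionCount = partitionCount;

    // Deterministically assign each grain's log to one of the shared log partitions.
    // GetUniformHashCode() is assumed here to be a stable hash of the grain id.
    public int GetPartition(GrainId grainId)
        => (int)(grainId.GetUniformHashCode() % (uint)_partitionCount);
}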

@ReubenBond
Member Author

@randrewy apologies - I missed your response earlier.

Maybe you can provide some extra details about what is happening in item.ApplyEffect(_playerState); from the example to illustrate the difference between KV approach

I think I see where the confusion lies: ApplyEffect here has nothing to do with ES, it's just some pretend method which applies the effect of an item (eg, a stamina boost) to a player. It just updates the player's state. The WriteStateAsync() call after it persists that state to storage (first by writing a log entry and eventually, if necessary, by flushing that entry to KV storage).
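
For illustration (this is not code from the proposal, and the types are made up), ApplyEffect could be as simple as an ordinary mutation of the player's state:

class ItemDetails
{
    public EffectData Effect { get; set; }
    public TimeSpan EffectDuration { get; set; }

    // Plain state mutation: no events are declared by application code.
    public void ApplyEffect(PlayerState state) => state.Effects.Add(Effect);
    public void RemoveEffect(PlayerState state) => state.Effects.Remove(Effect);
}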

So this proposal aims to hide all ES-related stuff under the hood? How will that work?

The same way it works for any database. There's no "event sourcing" from the developer's perspective, the write-ahead logging is just an implementation detail of the individual components (such as a durable collection, durable state, reminders, and workflow storage).

This shared log is stored as a continuous collection of events, correct? That means that all states are loaded at once for all components, which is a good part. But what about grain extensions? They can be used as a lazy-loaded component, and it is tempting to use the same IStateMachineStorage mechanism for them as well, to continue to provide state atomicity. But that will tie the extension state to the grain state, so it will be loaded with all other states regardless of whether the extension will ever be installed on the grain.

Correct, it's a collection of events. The "continuous" part doesn't really gel with me since they are logically only continuous for a given component. Each component's log is multiplexed onto the grain's log, and each of those can be compacted. One simple compaction strategy for a component which is no longer present (eg, a grain extension which isn't loaded) might be to shift all existing log entries to the end of the log. That simple strategy might not work for transactions, but that's a separate concern.

For grain extensions, you might have choices other than leaving unclaimed logs as-is: eg, perhaps the log manager can resolve & activate a component by name (requiring that all components be registered with some kind of factory that can inspect the name and activate the component).

@randrewy

@ReubenBond no worries, you've just laid it all out for me. I was too focused on ES :)

Just curious now about that WAL-like behaviour implementation. Will wait for the PR to see the code :)
