
ARCHIVED -- Event Store Improvements for v4 #1307

Closed
jeremydmiller opened this issue Jul 7, 2019 · 25 comments

@jeremydmiller
Member

jeremydmiller commented Jul 7, 2019

THIS ISSUE HAS BEEN REPLACED BY #1608.

WIP: I just got back from vacation and got to thinking about the event store after finally getting enough rest for once.

Big Existing Issues

Other ideas for improvements

  • Maybe the async daemon gets completely rewritten with Rx Extensions as opposed to TPL Dataflow. I like the Dataflow lib personally, but an easy way to deal with the async daemon and multi-tenancy is to split streams by tenant and use a separate document session per tenant as necessary. Either way, I want the async daemon a bit more optimized for rebuilds and regular projections

  • Possibly do either a sample app or a pre-built app that hosts the async daemon process. We could go super slim or build out full blown Azure and/or AWS infrastructure for monitoring and maybe an admin UI. Some kind of support for clustering the daemon with failover. Some kind of support for triggering rebuilds? I'm already dreading the arguments over exactly what technology stack to use, but oh well.

  • Possibly a different pre-built application that incorporates some kind of service bus or queuing mechanism to pipe events captured in the DocumentSession through a listener to a queue, where the projections would be built by some kind of lightweight (or just the real thing in a slightly different mode) async daemon. We'd have to deal with some message sequencing to make that work, but it's possible. Not a slam dunk 'cause some projection types have to be singletons because they're stateful

  • Projection snapshotting. Like maybe you take a snapshot of an aggregate every 5 events and store that so that on-demand domain aggregations are much faster. Kind of a hybrid between live and on demand. Plenty of folks have asked for this over the years.

  • Add extra, extending interfaces on top of IProjection that could refine the behavior of the async daemon for better efficiency. Stuff like, "does it need event metadata at all, or just the event data?" or "does it aggregate one stream at a time" that might change how the async daemon would work, especially for rebuilds.

  • I would like to see us do a full replacement for both the existing Aggregator implementation and possibly ViewProjection. I've got some ideas for this, but haven't written anything down yet. Don't scream at me yet ;)

  • Possibly do adapters so you can use existing projection libs like Liquid Projections from within Marten
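To make the snapshotting idea above concrete, here's a minimal sketch of what an every-N-events policy could look like. Everything here is hypothetical -- no SnapshotPolicy type exists in Marten; this just illustrates the mechanics:

```csharp
// Hypothetical sketch: snapshot an aggregate every N events so
// aggregation only has to replay the tail of the stream.
public class SnapshotPolicy
{
    public int EveryNEvents { get; }

    public SnapshotPolicy(int everyNEvents) => EveryNEvents = everyNEvents;

    // Called after appending events; streamVersion is the new stream version
    public bool ShouldSnapshot(long streamVersion) =>
        streamVersion % EveryNEvents == 0;
}

// e.g. with new SnapshotPolicy(5), versions 5, 10, 15... trigger a snapshot,
// and aggregating at version 17 would load the version-15 snapshot and
// replay only events 16 and 17.
```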

@oskardudycz
Collaborator

oskardudycz commented Jul 9, 2019

@jeremydmiller Thanks for this write-up! Here are my thoughts, plus some other ideas I've been thinking about:

Async Daemon

The internals of its implementation are still enigmatic to me, so it's hard for me to give more detailed answers. So first I'd propose providing good documentation and samples for it (we still lack those; afaik it's still only your blog post on the topic).

  • Multi-Tenancy - In my previous commercial project, where we used Conjoined Tenancy extensively, not being able to rebuild projections was definitely an issue. So from my perspective that's really important, although I'm not sure about the adoption of that feature - so how much our users need it.

  • Restart - I don't fully get the description of this feature. It sounds needed, although it's hard for me to judge the complexity.

  • Tombstone placeholders - same here; I'm not able to judge the complexity. The feature looks nice to have, but not a top priority to me.

  • Clustering, Reactive Extensions, Performance, etc. - All of those features are great, and I'd gladly participate in implementing them, although my biggest concern is our capacity: whether, within our timeboxes, we're able to deliver something production ready in a set period of time. Having clustering and fully fledged async support won't be easy; it's scope for a separate project/library even. It might end up with us writing our own Kafka/RabbitMQ (plus monitoring, metrics, failover strategy, etc.). I know that with a good plan and work breakdown, and focus as a group, we'd be able to deliver something simple and production ready, but the question is whether people would use it. My preference is that we focus on the Async Daemon being the projection rebuild mechanism or the integration point for the outside world, but keep it as thin, simple and performant as possible. Imho, instead of making it bigger, we could look at the possibility of using already-built tools or providing integrations with existing messaging solutions. I love the idea of using Reactive Extensions; that could make integration with others easier.

I like the idea of the pre-built apps and samples with some integration with other tools. Regarding which cloud? Dunno, I still believe that Azure is a poor man's AWS, but on the other hand, the .NET community is in love with Microsoft tools like MSSQL and others, so it would probably be better to start with Azure. Maybe some cooperation with Microsoft would give us grants or at least marketing?

Partitioning/Multi Tenancy

Imho that's a must have. As I've gathered recently, performance is one of people's fears about Event Sourcing. Also, when I'm explaining Marten to new people, there is always the question of how a single events table will handle big load. Although I think those fears are exaggerated, I see the point of making our store more performant and also giving people hard numbers: "yes, we can do it, see, there is no point in being afraid".
So built-in partitioning, and checking whether TimescaleDB is really as performant as the Internet describes it, would help us deliver that (there is an issue placed by @cocowalla that I'm still unable to get to and check: #1262).
I was also thinking about a new multi-tenancy mode for the Event Store - "Table per Stream". That would give the possibility of logically splitting the tables into smaller chunks, so we'd have at least a similar topic/partition split to what Kafka/RabbitMQ have. It might turn out that partitioning would in fact be such a tenancy.

Projections

I fully agree, the current ViewProjection mechanism is hard to maintain. I'm currently working on #1302. I've already started some small unification of the projections mechanism to make it (at least from the abstractions perspective) more generic. I was thinking that might be a good start for discussions around the potential refactoring? I could provide my first PoC, and from that concrete proposal we could work to make it right?
Or if you prefer to come up with the initial changes yourself, then I could focus on fixing this one issue and leave the rest for you. What do you think?

About projection snapshotting - it would be nice to give some flexible way of snapshotting. I'm not sure that taking a snapshot once per few events would be a huge benefit, but if we give, e.g., the possibility to define that a snapshot should be taken once per day, or via some other custom filter expression - then imho that might be a huge benefit.

I think two other types of projections are also low hanging fruit and would be good "marketing" for coexistence with/migration from ORMs:

  • flat table projection - with that, people might use Marten as the Event Store and EF/Dapper for the read model, or have a mixed solution, like some modules doing Event Sourcing with Marten and a read model in an ORM, and some modules (like admin ones) fully on EF,
  • transform projection - something similar to what we have right now for transforming events, but storing the new state of the record in a separate row. That would imho be a great solution for keeping the history of records, which is quite a common issue in "traditional systems" (so you have the regular, most recent state of the entity in a table plus a separate table with the record's history).
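As a rough illustration of the flat table projection idea (all names here are invented; Marten has no such API today), the projection would emit plain relational rows rather than JSON documents:

```csharp
// Hypothetical flat-table projection: events become plain SQL rows that
// EF/Dapper (or anything else) can read without knowing about Marten.
public class InvoiceFlatTableProjection
{
    // Assumed shape: one handler per event type, each producing a SQL
    // statement against an ordinary "invoices" table.
    public string Handle(InvoiceIssued e) =>
        "insert into invoices (id, customer_id, total) values (@Id, @CustomerId, @Total)";

    public string Handle(InvoicePaid e) =>
        "update invoices set paid = true where id = @InvoiceId";
}
```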

I'm all up for making this pluggable for other solutions (eg. Liquid projections) 👍 .

Event Metadata

I think it's a must have. For sure it should be optional, but for distributed systems things like a CorrelationID are a must. I was also thinking about giving the user the possibility to decide that metadata will be mapped by convention to event fields (eg. Version, Timestamp). That could be a huge relief for Event Sourced aggregates. See more in my comment: #1299 (comment)
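The convention mapping described above might look something like this (assumed behavior, not implemented today): if an event class exposes properties matching metadata names, Marten would populate them on load:

```csharp
// Hypothetical convention: properties named Version/Timestamp would be
// filled from event metadata when the event is loaded, so Event Sourced
// aggregates don't have to track them by hand.
public class AccountOpened
{
    public Guid AccountId { get; set; }

    // would be populated from the event's stream version (assumed convention)
    public long Version { get; set; }

    // would be populated from the event's timestamp (assumed convention)
    public DateTimeOffset Timestamp { get; set; }
}
```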

I think it would be worth checking how NEventStore handles that - as far as I know, they have quite a good implementation of metadata.

Other things that I consider

Integration with messaging systems

It's not easy for those systems to always keep the ordering of events, and it's rare for them to have "exactly once delivery" semantics. Normally consumers need to handle idempotency themselves.

Currently Marten doesn't allow putting events in out of order (so eg. 2nd, 1st, 3rd). We'd need to change the current versioning mechanism to allow that, plus projection rebuilds.

Imho it shouldn't be super hard to deliver a first option giving the user the possibility to set the version number for imported events.
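A sketch of what that import option could look like -- note that the version: overload is hypothetical; today's Events.Append() does not accept explicit version numbers:

```csharp
// Hypothetical import API: preserve the original version numbers of
// externally-sourced events, even when they arrive out of order.
session.Events.Append(streamId, version: 2, new ItemShipped());
session.Events.Append(streamId, version: 1, new ItemOrdered());
session.Events.Append(streamId, version: 3, new ItemDelivered());
await session.SaveChangesAsync();
```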

We discussed some time ago that a mechanism similar to the Async Daemon might also be a potential option for that.

Integration points with other Event Stores / UI

I'd like to create the integration point as I described here: #1194 (comment) and discussed with @gregoryyoung. So start with exposing our event store features as an Atom feed, then maybe provide some Swagger-like simple UI (which might also be used for the document part).

Long Version for Events

#1080 - imho this is a must have for version 4.0 if we'd like to make it high scale.

@jeremydmiller what are your thoughts?

I probably forgot something, so I might add more later.

@oskardudycz oskardudycz added this to the 4.0 milestone Jul 10, 2019
@jeremydmiller
Member Author

@oskardudycz I say we just convert to long ids for 4.0. Will have to explicitly test for the migration scripts, but that was coming regardless.

@oskardudycz
Collaborator

@jeremydmiller great 👍

@jeremydmiller
Member Author

More on the async daemon

  • For rebuilds of any kind of aggregated document, we could hugely optimize perf by doing lookaheads for which projections should be deleted and hence not rebuilt at all
  • Rebuilds of per-stream aggregates could be very heavily optimized by fetching stream by stream
  • As much as possible, we want projections to expose exactly which events they consume, because that heavily optimizes data fetching
  • We should try to use more information about the projections to customize the various queues and future Rx operator combinations

Convention Based Projections Concept

The main idea here is to allow users more flexibility to do whatever it is they need to do, with less code ceremony and easier-to-author code. Drop mandatory base classes and interfaces (they're still there, just wrapped around your code). Marten itself will use some kind of dynamic code generation (a la Jasper or Lamar from Jeremy's prior work) to create an IProjection implementation around your code.

The following shows some of the possible method signatures and the hopefully minimal set of optional attributes:


// The existence of this event will cause the aggregate to be deleted
// Exposing this will allow the async daemon and projection rebuilds to be optimized
[DeletedBy(typeof(SomeEventType))]

// or by marker interface:
public interface IDeletedBy<TEvent>{}



// OR maybe--->
[Publishes(typeof(Type), AggregatedBy.Stream)]
[Publishes(typeof(Type), AggregatedBy.Tenant)]
[Publishes(typeof(Type), AggregatedBy.Event)]

// the aggregation across events is done some other way
// like maybe by "region" or "business line"
[Publishes(typeof(Type), AggregatedBy.Other)]

// or via a marker interface as an alternative:
public interface IPublishes<T>
{
    AggregatedBy AggregatedBy { get; }
}







public class MyProjection
{
    // this would be used to pluck the identity of the published
    // document out of an event object
    // the event type could be an interface, abstract type, or individual
    // concrete event type
    // If using this mechanism, the projection around this class would
    // be responsible for loading the existing projected document in the 
    // course of updates
    public Guid/int/long/string Identity(EventType @event)
    {
        // 
    }

    // Alternative to Identity; this time you'd do whatever is needed to load the projected document
    public SomeAggregate Find(SomeEventType @event, IDocumentSession session);
    // or do it async, preferably
    public Task<SomeAggregate> Find(SomeEventType @event, IDocumentSession session);

    // actually apply the event, somehow. All of these would be valid options
    // EventType could be the specific event concrete type, a common interface,
    // or a base type. Could also use Event<T> for metadata as well
    public void Apply(EventType @event, ProjectedDocumentType projection);
    public Task Apply(EventType @event, ProjectedDocumentType projection);
    public void Apply(Event<EventType> @event, ProjectedDocumentType projection);
    public Task Apply(Event<EventType> @event, ProjectedDocumentType projection);

    public void Apply(EventType @event, ProjectedDocumentType projection, IDocumentSession session);

    // maybe allow method injection from the app's IoC container
    public void Apply(EventType @event, ProjectedDocumentType projection, IDocumentSession session, [FromServices] ISomeServiceInYourApp service);


    // Not sure this is 100% necessary, but know if the projected document would be deleted
    public bool ShouldBeDeleted(Event<T> @event);
    public bool ShouldBeDeleted(EventType @event);


    // UNKNOWN --> optimize for using Partial updates vs. full blown get the existing document and updating

}

@ericgreenmix
Contributor

ericgreenmix commented Jul 31, 2019

Anything that would increase the performance of rebuilding projections in the Async Daemon, would be huge for us. Snapshotting and the performance optimizations for rebuilding that @jeremydmiller was mentioning would be great.

For context, we currently have >3 million events in our event store and are now storing ~25k new events per day. Our rebuilding performance has noticeably gotten worse as the number of events increases.

I am definitely willing to help contribute to any of these event store improvements for v4.

@jacobpovar
Contributor

Some observations based on our usage of Marten.

  • Event metadata

  • Snapshots, as mentioned above

  • HTTP subscription API or ATOM feed.

  • Allow reading events without the need to deserialize them into CLR objects. This would be helpful in cases like streaming directly to an HTTP response. Another example would be inspecting event metadata to see if an event should be deserialized and processed further

  • Ability to store the event progression value outside the event store's mt_event_progression table. Imagine you are storing read models in a separate database; then you'd want to save the last sequence value near the read models, within the same transaction.

  • Stream archiving. One of the fears that prevents people from using Event Sourcing is that the system will become slow because of the need to process obsolete data.

  • Provide guidance on an event versioning strategy. Maybe by upcasting, being both a simple and widely used solution. This is a pretty complex topic and can be implemented on top of existing capabilities, so I'm not sure Marten should try to cover all possible solutions. A simple demo in the docs would be a good start

  • Daemon clustering looks promising

  • As a general observation, it would be great if some of Marten's ES internals were more flexible or customizable. For example, we had to copy most of the daemon source into our codebase to modify how the SQL queries are composed. Did the same for the event type mapping strategy. However, most users won't need to do something like this. It's only a wish :)
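The "progression near the read models" item above can be sketched with plain Npgsql against the read model database; the projection_progress table and column names are made up for illustration:

```csharp
// Commit the read model changes and the last processed event sequence in
// the SAME transaction, so they can never drift apart after a crash.
await using var conn = new NpgsqlConnection(readModelConnectionString);
await conn.OpenAsync();
await using var tx = await conn.BeginTransactionAsync();

// 1. apply the read model updates produced by the projection
// ... elided ...

// 2. record how far this projection has processed, next to the read models
await using var cmd = new NpgsqlCommand(
    "update projection_progress set last_seq_id = @seq where name = @name",
    conn, tx);
cmd.Parameters.AddWithValue("seq", lastSequence);
cmd.Parameters.AddWithValue("name", "OrderSummaryProjection");
await cmd.ExecuteNonQueryAsync();

await tx.CommitAsync();
```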

I'm glad to help with some of these improvements. First one will probably be metadata.

@jeremydmiller
Member Author

jeremydmiller commented Oct 8, 2020

Re-booting V4 Event Store Work

Alright, time to get this thing rolling again. V4 is heavily in flight on the Document Db side, and it's shortly going to come down to just the Event Sourcing work. This comment pretty well only covers projections. I'll have to follow up another day with metadata improvements, snapshotting, archiving, sharding, up/downcasters, and everything else I missed here...

Here's some miscellaneous things:

  • If SaveChanges() fails with events, add "tombstone" events to mark where there should have been events. That's gonna make the async daemon in database polling mode run a bit more efficiently
  • More readily utilize the Patch API in the projections for efficiency, but "know" when the projections have to use a full document update instead. This has been a problem in the past.
  • If using optimistic concurrency for the append events operations on a session, maybe do a database row lock on the stream row. Maybe do that as well for Serializable transactions? Might help folks that are having issues with concurrency

Projections

Here are what I think are the main points of change and goals for the projections in V4:

  1. Improve performance and scalability all the way around
  2. Standardize the definition of all projections on an improved, extended version of ViewProjection<T>
  3. BREAKING: Make IProjection an internal detail of Marten that is generated at runtime based on a ViewProjection type
  4. BREAKING: Eliminate all support for Aggregator and other non-ViewProjection projection types

Projection Patterns

I could really use some feedback from real users on this section please

  • Aggregate a document view for a stream <-- this seems to be the main usage of projections, and where I think most of the optimization effort is going on V4
  • New proposal for V4: A Process() model just for registering listeners to events that do not make any state changes to the projected documents. Basically a hook to do event stream processing from the async daemon (or its replacement)
  • New proposal for V4: Stream INSERT, UPDATE, DELETE statements to flat relational tables.
  • Project individual events to individual documents. This has been supported since the early days, but does anybody use this?
  • Write events to arbitrary aggregations. Does anyone actually need this? If they do, could we say that it's two stage, project to an aggregate per stream, then use some kind of materialized view for the cross-stream aggregation?

Defining Projections

  • Support the existing ViewProjection<T> API
  • Add explicit methods for using the Patch API as part of projections (more on this below)
  • New proposal for V4: As a complement to the ViewProjection<T> syntax for folks like me who don't like all the Lambdas, support conventional methods on your ViewProjection<T> class like this (think about how .Net Core's StartUp methods work):
    // actually apply the event, somehow. All of these would be valid options
    // EventType could be the specific event concrete type, a common interface,
    // or a base type. Could also use Event<T> for metadata as well
    public void Apply(EventType @event, ProjectedDocumentType projection);
    public Task Apply(EventType @event, ProjectedDocumentType projection);
    public void Apply(Event<EventType> @event, ProjectedDocumentType projection);
    public Task Apply(Event<EventType> @event, ProjectedDocumentType projection);

    public void Apply(EventType @event, ProjectedDocumentType projection, IDocumentSession session);

    // maybe allow method injection from the app's IoC container
    public void Apply(EventType @event, ProjectedDocumentType projection, IDocumentSession session, [FromServices] ISomeServiceInYourApp service);

Projection "Modes"

Projections are applied and calculated at different times, and there's some significant opportunities for optimization. From my notes, projections will be processed in these modes:

  • Inline -- either synchronously or asynchronously as part of SaveChanges() / SaveChangesAsync() much like it is today
  • Live -- built upon demand just like it is today
  • Async Streaming -- building in the background as events are streaming in
  • Rebuild -- rebuilding the projected documents. There's a different set of optimization opportunities here

Now, one at a time:

Inline Projections

  • There will be a dynamically generated implementation of today's IProjection to perform the inline projections
  • Short circuit any other activity and just issue a DELETE for the projected document if the incoming events mark it as deleted
  • If any live projections are active when SaveChanges() is called, the unit of work should first reserve stream versions in the database and apply these expected versions to the individual events so that the inline projections can use the proper event metadata (there's a bug in GitHub about this we can knock out in v4)
  • Optimization: If all the events for a stream in a unit of work can be done through Patch() operations, just do that to apply the changes. If even one event type requires a full projected document fetch, you gotta fetch the current version of the projected document and apply the changes to it in memory.

Live

  • There will be a dynamically generated implementation of today's IProjection to perform the live projections
  • Skips any possible deletes or patches in the projection definition

Async Projections

This is the big one for v4. All of the notes here are for the "project a single aggregated document for a stream" type of projection so far:

  • Use or at least opt into a 2nd level document cache for the projected documents. Thinking you'd use an LRU cache for the projected documents to avoid having to do so many repetitive reads
  • The new async daemon might not be using IDocumentSession and instead might be digging into lower level internals to try to be more efficient
  • We will parallelize the application of events by stream

When receiving a "page" of new events in the async projection runner:

  1. Group into different streams
  2. For each stream:
    1. Determine if the projected document is already in the 2nd level cache
    2. See if the projected document would have a DELETE triggered by any of the events being applied for that stream.
    3. Determine if the events could all be applied by the Patch API
  3. For each stream / projected document that needs to be loaded but isn't in the 2nd level cache already, issue a LoadMany() operation to fetch the documents
  4. For all other streams, start building out the IStorageOperation objects necessary to make the database changes. This would actually start in a background Task as part of step 2 in this list
  5. After fetching all the necessary projected documents, start those streams creating IStorageOperation objects in a separate Task
  6. Gather all the IStorageOperation objects, and persist them in one unit of work
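The steps above could be sketched roughly like this (every helper and type name here is hypothetical; this only shows the intended control flow):

```csharp
// Rough sketch of processing one "page" of events in the async daemon.
async Task ProcessPageAsync(IReadOnlyList<IEvent> page)
{
    // 1. group the incoming events by stream
    var byStream = page.GroupBy(e => e.StreamId).ToList();

    // 2. per stream: skip loads for cached, delete-only, or patch-only streams
    var idsToLoad = byStream
        .Where(g => !cache.TryGetValue(g.Key, out _))   // 2a. not in 2nd level cache
        .Where(g => !WouldBeDeleted(g))                 // 2b. no DELETE triggered
        .Where(g => !CanUsePatchApiOnly(g))             // 2c. not pure Patch ops
        .Select(g => g.Key)
        .ToArray();

    // 3. one round trip to fetch everything that actually needs loading
    var docs = await LoadManyAsync(idsToLoad);

    // 4-5. build IStorageOperation objects per stream (could run in parallel
    // Tasks once the needed documents are in hand)
    var operations = byStream.SelectMany(g => BuildOperations(g, docs, cache));

    // 6. persist all the gathered operations in one unit of work
    await PersistAsync(operations);
}
```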

Rebuild Mode

  • Optimization: First find out which streams in the database should be deleted anyway, and avoid doing anything for those streams. I.e., don't rebuild a projected document that you're gonna delete. Duh.
  • Optimization: Might have to add additional columns to the mt_streams table to track this, but when rebuilding aggregated documents, rebuild stream by stream so one running view projector is fetching and building distinct streams at a time. That dramatically reduces the number of database reads and writes
  • Optimization: Rebuild the documents in a separate table, then switch over to the real table and continue from there

Async Daemon

For v4, let's assume that we're only building out a much improved version of the current Async Daemon that depends on polling the database. For v4+, we should consider alternatives using queueing, messaging, cloud technologies, CDC replication from Postgresql, etc.

Some major things:

  • Use leader election so that only one async daemon process per projection/shard is running across the active application nodes. That's a huge blocker for many shops. My intention to do this w/o requiring a boatload of new infrastructure is to rely very heavily on Postgresql advisory locks. The Jasper durability agent is similar to what I'm wanting to do here.
  • Use some kind of new database table to "trigger" rebuilds to the async daemon, so that whatever async daemon is running, it can see the changes to that table and switch into Rebuild mode.
  • Shard the event store some way so that we can parallelize more of the async daemon processing for scalability.
  • Use the lower level Channels model instead of TPL Dataflow? Honestly just because I'm curious?
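The advisory-lock approach to leader election can be sketched with Npgsql directly; pg_try_advisory_lock is a real Postgresql function, while the lock key and RunDaemonAsync are placeholders:

```csharp
// Only the node that wins the session-level advisory lock runs the daemon
// for this projection/shard; the lock releases automatically if the
// connection drops, letting another node take over.
const long daemonLockKey = 421307; // arbitrary app-chosen key

await using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync();

await using var cmd = new NpgsqlCommand("select pg_try_advisory_lock(@key)", conn);
cmd.Parameters.AddWithValue("key", daemonLockKey);

var isLeader = (bool)(await cmd.ExecuteScalarAsync())!;
if (isLeader)
{
    // keep conn open for as long as this node is the leader
    await RunDaemonAsync();
}
else
{
    // not the leader: back off and retry later
}
```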

@lahma
Contributor

lahma commented Oct 8, 2020

@jeremydmiller is there a way to break things early, like giving us the ability to check how our API usage is "wrong" for the future and future-proof a bit? Or is it a "reset your database with 4.0" kind of thing? We are building a system with the event store that should hit production in the coming months 🙂

I love the ideas and improvements, just worried about the migration path.

@jeremydmiller
Member Author

@lahma Hang on, I wasn't finished with the previous comment yet ;-) And we do generally worry about any breaking changes -- especially to the database structure, but this is a full point release.

@jeremydmiller
Member Author

@jacobpovar If you're still interested, I think we're gonna have to talk about some of your items. I'd wanna know why you needed to customize so many things.

@oskardudycz
Collaborator

oskardudycz commented Oct 8, 2020

@jeremydmiller I already had a call with @jacobpovar; he showed me what they're doing and what his pain points are. I can try to pass that knowledge on to you ;)

@lahma when we have the public API and implementation stabilized, then for sure we'll publish a prerelease and try to find early adopters to get feedback 👍

@malscent
Contributor

malscent commented Oct 8, 2020

One pain point for me is being able to externally define events and how to apply them to projections that exist in another assembly.

I.e., the projection is in Assembly A and the event is in Assembly B, which depends on Assembly A.

Without inverting this dependency (A depending on B instead of B depending on A), it is currently not possible to define how that projection should handle the new event.

Looking at the ViewProjection API and other suggestions, I don't see a method to do so. It may be that I'm just not familiar enough with the ViewProjection API, but if it is possible, an example would be good.

I would also like to see a way to initiate a rebuild via the Async Daemon through an IDocumentSession.

@jeremydmiller
Member Author

@malscent Why are you needing to do that is my question.

@malscent
Contributor

malscent commented Oct 8, 2020

@jeremydmiller which issue?

  1. This allows for a modular design where the projections exist in one assembly, but new events and their effects to those projections exist in other modules.

  2. To programmatically express changes in the building of a projection. If, for example, a bug is discovered, a change can be made to data/code and then the affected projections can be marked as needing to be rebuilt.

Also, another thing I have been experimenting with is the ability to define event transforms for obsoleting events, where I can define a method that will translate one event type into another event type, for the purpose of building a projection, should the replacement of an event be necessary.
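That kind of transform is usually called upcasting; a minimal sketch (the event types and the split of a name field are invented for illustration):

```csharp
// Hypothetical upcaster: translate an obsolete event into its replacement
// while reading, so projections only ever see the new shape.
public class OrderPlacedUpcaster
{
    public OrderPlacedV2 Upcast(OrderPlacedV1 old)
    {
        // V2 split a single CustomerName into First/Last (assumed change)
        var parts = old.CustomerName.Split(' ', 2);
        return new OrderPlacedV2
        {
            OrderId = old.OrderId,
            FirstName = parts[0],
            LastName = parts.Length > 1 ? parts[1] : "",
        };
    }
}
```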

@jeremydmiller
Member Author

#1. Why do you need to build your assembly references that way? How are you wanting things to be plugged in? Would it be good enough if projections could use base types or interfaces? That negates some of the original optimizations I'd intended, but at least it would work.

For #2, maybe introduce projection versions? That could be a way to trigger a rebuild when it's detected.

for the translation, @oskardudycz has some ideas and thoughts on that one

@malscent
Contributor

malscent commented Oct 8, 2020

@jeremydmiller We built our software with a "Core" module that contains a large amount of functionality with core events and projections. From there, because we wanted our software installations to be highly customizable, we built "Modules" hosted in separate assemblies that can be added to the project or not, depending upon the necessary functionality. Marten works with this fairly well, as when it encounters an event on a stream that cannot be projected, it simply ignores it. So we can add/remove modules as necessary to add/remove functionality based on what our customers need.

The real struggle, however, is defining what a projection should do with an event. To tackle this, I built a customized projection that uses inheritance to determine which "Apply" method to use. However, this is severely limited, in that all my events must behave in pre-defined ways, or else the projection doesn't know how to use them.

I would like to be able to define how an event is applied to a projection alongside the event definition. Even if that is just an "Apply" method on the event that takes the projection as a parameter and returns the projection. (Granted, this will struggle with private members of a projection, but if that is a requirement, reflection can always be used?)
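The shape described here could look like this marker interface (purely a sketch, not a Marten API): the event carries its own apply logic, so a module assembly can introduce new events without the projection class knowing about them:

```csharp
// The event itself knows how to apply to the projection
public interface IApplyTo<TProjection>
{
    TProjection Apply(TProjection current);
}

// defined in a downstream "module" assembly, invisible to the projection
public class DiscountGranted : IApplyTo<OrderSummary>
{
    public decimal Amount { get; set; }

    public OrderSummary Apply(OrderSummary current)
    {
        current.Total -= Amount;
        return current;
    }
}
```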

@jeremydmiller
Member Author

Having some kind of Apply(document) on a base type or interface would definitely work for the v4 design I've got in mind. If that's acceptable.

Don't use reflection. If need be, use [InternalsVisibleTo]. In .Net Core world, that just takes the assembly name. No strong naming necessary anymore.

@malscent
Contributor

malscent commented Oct 8, 2020

Absolutely. The idea is to try to divorce the logic of applying events from the projection and allow it to be defined with the events. So having some kind of interface I can implement on the event that would override the projection's apply method would be great.

@wastaz
Contributor

wastaz commented Oct 9, 2020

Make IProjection an internal detail of Marten that is generated at runtime based on a ViewProjection type

This will break some of our code. But that is fine, I hate that code anyway. As long as there are ways for us to do the same things, then I certainly won't mind not having to use this clunky interface.

Write events to arbitrary aggregations. Does anyone actually need this? If they do, could we say that it's two stage, project to an aggregate per stream, then use some kind of materialized view for the cross-stream aggregation?

I think that the answer is a resounding yes. Cross-stream aggregates are very common, at least for us. We have more of them than we have 1 stream-1 document projections. We also have projections that project 1 event to several documents.

Imho, 1 stream-1 document projections are the "easy mode" that obviously has to work well. But being able to do cross-stream projections or fan-out projections in a nice way is a must for the projection support to be actually usable IRL.

@wastaz
Contributor

wastaz commented Oct 9, 2020

And as for 2-stage projections to solve cross-stream aggregation... eh... I don't think that's gonna be very nice to work with? But I'd need an example to understand what you are getting at, because I might be misunderstanding you @jeremydmiller

@jeremydmiller
Copy link
Member Author

@wastaz Hey man, that's the kind of feedback I needed.

For the aggregation across streams, how do you identify the proper aggregate identity for the individual events? Some kind of well-known field within the event? Something arbitrary? Could you determine that through the event JSON, or does it have to be in code?

For the 2-stage aggregation, I was thinking about finally adding some kind of background map/reduce process, but I haven't thought through the mechanics much.

The 1 event to 1 document pattern is easy at least.

And I had it in mind that you hated the IProjection interface :-). There are some serious optimizations we could do if the projections were more declaratively described, then code generated, especially for rebuilding projections and for live vs. async vs. inline.

@wastaz
Copy link
Contributor

wastaz commented Oct 9, 2020

@jeremydmiller Good that it's useful! :)

For the aggregation across streams, how do you identify the proper aggregate identity for the individual events? Some kind of well known field within the event? Something arbitrary? Could you determine that through the event json, or does it have to be within code?

In basically all cases we have some identifying field in the events that we can use to "twist the angle" of the projection. To give you a better sense of some common cases we have: we have an application that handles accounts. A customer can have several accounts, and each account contains 1-n periods. We have (among some others as well, of course) these projections:

  1. A simple account stream projection, used for example when showing detailed account info about that account.
  2. A customer projection. This aggregates data from all accounts that the customer has. Each account has a CustomerId in its initial event that signifies the account owner, so adding data to the customer projection just becomes keying on the CustomerId instead of the stream id. This is a true cross-stream projection.
  3. Each period has state as well that is interesting to show the customer, or our customer service, if they "drill down" into the data. Here we project to a document per period (so each account stream creates several period documents as projections). Again, the events concerning periods contain a period id, so it's just a matter of keying the document on the period id instead of the stream id.
  4. There are of course transactions on an account. We show these both to the customer and to customer service in different views; however, our "transactions" don't always work the way a user would expect to view them. Many transaction types here are the easy kind of projection, one event -> one document. But some transactions get "split" into several documents. So one event -> multiple documents.

As you can see, we do a lot of projecting in different dimensions. Part of this is because of an early choice we made about stream boundaries that we are now looking at eventually refactoring. However, being able to "twist the angle" of projections in order to do cross-stream, 1-1, 1-n projections etc. has been very helpful in getting to the point where we can now talk about refactoring some of these things. And looking at what we are doing and how our problems look, I'm convinced that even after the refactoring we will still want to do some of these things.

What is common, though, is that in basically every case I list, we determine the aggregate identity via some field in the event JSON. The exception is case 4: when we project to multiple documents from the same event, we generate new ids for the projections (since the document ids need to be unique), but keep an indexed [DuplicateField] property containing a back-link to an id from the event so we can trace it back.
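For reference, keying a projection on a field from the event body rather than the stream id looks roughly like this with the Marten 3.x ViewProjection API (the `AccountOpened` and `CustomerView` types and their property names are illustrative):

```csharp
using System;
using Marten.Events.Projections;

public class AccountOpened
{
    public Guid AccountId { get; set; }
    public Guid CustomerId { get; set; }
}

public class CustomerView
{
    public Guid Id { get; set; }
    public int AccountCount { get; set; }
}

// Cross-stream projection: every account stream contributes to a single
// document per customer, keyed on the CustomerId carried in the event
// body instead of the stream id.
public class CustomerViewProjection : ViewProjection<CustomerView, Guid>
{
    public CustomerViewProjection()
    {
        ProjectEvent<AccountOpened>(e => e.CustomerId, Apply);
    }

    private void Apply(CustomerView view, AccountOpened @event)
    {
        view.AccountCount++;
    }
}
```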

For the 2-stage aggregation, I was thinking about finally adding some kind of background map/reduce process, but I havent' thought through the mechanics much.

I think we could make do with some kind of map-reduce. It's not impossible at least.

And I had it in mind that you hated the IProjection interface:-).

Honestly, "hate" is a strong word. It's my escape hatch. We try to use the "simplest" way to do each projection there is: ITransform, AggregateStream, ViewProjection, etc. IProjection is what we reach for when everything else fails. So for case 4, that is an IProjection. It's an annoying thing to work with, but one thing that has been very nice with Marten is that there are ways to project at different levels of abstraction. There's not just a raw low-level IProjection, or just a high-level AggregateStream; there are several "steps" of abstraction, and you can start at the highest and successively drop lower as you need, until you are down at IProjection. I know "there are many ways to do it" isn't usually hailed as the best design (and is a hassle to maintain), but it is very pragmatic when done right. I won't blame you for trying to unify the projection stuff a bit more though. I'd probably also try to do that in your shoes.

@jeremydmiller
Copy link
Member Author

@wastaz,

Okay, a couple of thoughts here.

"Exploding" the root event into multiple documents:

I think this is gonna be relatively simple for the main async / inline / live flow, you'd just have a method with a signature like:

public TargetDocument[] Create(EventType @event)
{
  // explode it out
}

That'd be a little problematic for optimizing the rebuilding of the projection, but I'm not sure you'd be doing that very often. That might be a case where it'd be easiest to "rebuild" the projection by running a document transform instead, and it wouldn't hurt if we added more documentation, examples, and possibly some explicit support for that in v4. If you did want to rebuild the projected data, I think we could do a producer/consumer that fetches the raw events in the "producer", then has the "consumer" explode them out into the proper IStorageOperation objects that are then batched and persisted.
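A rough shape for that producer/consumer rebuild, sketched here with System.Threading.Channels (the `EventPage`, `FetchEventPagesAsync`, `CreateOperations`, and `PersistBatchAsync` names are placeholders, not real Marten APIs):

```csharp
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Producer fetches raw events in pages; consumer "explodes" each event
// into zero or more storage operations and persists them in batches.
var channel = Channel.CreateBounded<EventPage>(capacity: 4);

var producer = Task.Run(async () =>
{
    await foreach (var page in FetchEventPagesAsync())
    {
        await channel.Writer.WriteAsync(page);
    }
    channel.Writer.Complete();
});

var consumer = Task.Run(async () =>
{
    await foreach (var page in channel.Reader.ReadAllAsync())
    {
        // One event may fan out into many operations here
        var operations = page.Events
            .SelectMany(e => projection.CreateOperations(e))
            .ToList();

        await PersistBatchAsync(operations);
    }
});

await Task.WhenAll(producer, consumer);
```

The bounded channel keeps the producer from racing too far ahead of the database writes, which matters when rebuilding tens of millions of events.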

One of the next things I wanna design out is how big event stores can have their projected views rebuilt with no down time. I have some thoughts, but it's too late in my night to get much out here.

One event to one document

I think this is pretty easy, but I'd still want it to be as declarative as possible so we can get some optimizations around it.

Aggregate across the streams

If we can do something like this for the definition:

// This is strictly to capture the JSONPath to the value in each of the events
// that would identify the aggregate. This does NOT require any kind of common base
// type
AggregateByEventDataField<T>(T baseType, Expression<Func<T, object>> identity);

Then we could do similar optimizations in the async daemon to the ones I was outlining in the "aggregate by stream" notes. In rebuilds, we could parallelize the different aggregates by maybe first reaching into the events and finding the unique aggregate ids. If we were really good, and in some cases there were a finite number of unique aggregate ids (like by region, or country, or something without a lot of cardinality), you could run a parallel async daemon for each aggregate id and get much more done concurrently.
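The low-cardinality case described above could be sketched like this (all of the names here are placeholders for illustration, not a committed API):

```csharp
using System.Linq;
using System.Threading.Tasks;

// First discover the distinct aggregate ids, then run an independent
// rebuild for each one in parallel. This pays off when cardinality is
// low (e.g. region or country), so each partition is substantial.
var aggregateIds = await FindDistinctAggregateIdsAsync();

var rebuilds = aggregateIds.Select(id => Task.Run(async () =>
{
    // Each "mini daemon" only reads the events belonging to its own
    // aggregate id, so the partitions can proceed fully in parallel
    // without coordinating on shared documents.
    await foreach (var @event in FetchEventsForAggregateAsync(id))
    {
        ApplyToAggregate(id, @event);
    }
}));

await Task.WhenAll(rebuilds);
```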

@ericgreenmix
Copy link
Contributor

ericgreenmix commented Oct 12, 2020

I am still digesting a lot of this, but a couple things.

  1. We heavily use cross-stream aggregates. A large majority of our projections are of this type, and I have a feeling that most real-world production scenarios that use event sourcing end up this way. We currently use the ViewProjection for this, keyed off a property on the event. So we end up with something like the following:
ProjectEvent<EventCreated>(Apply);
ProjectEvent<ResponseCreated>(e => e.EventId, Apply);
ProjectEvent<ResponseDeleted>(e => e.EventId, Apply);
  2. I may be confused, but I think you were saying that the ViewProjection needs conventional methods, which it already supports. This is actually how we do all of our projections currently. So from the snippet above, our Apply methods look like this:
public void Apply(EventStats model, EventCreated @event)
{
    model.DoChangesHere ....
}
public void Apply(EventStats model, ResponseCreated @event)
{
    model.DoChangesHere ....
}
...
  3. Early on in Marten (before the ViewProjection existed) we built our own projection type from IProjection, using the Patch API. We hit a big issue when it came to rebuilding projections that use the Patch API (with the Async Daemon), since changes made through the Patch API are not tracked (or at least weren't then) in the Identity Map. So if one event came in that used Patch, and then another in the same page needed that patched data, things would break.
    Are the changes to using Patch for inline projections going to avoid this issue?

  4. One of the next things I wanna design out is how big event stores can have their projected views rebuilt with no down time. I have some thoughts, but it's too late in my night to get much out here.

This would be huge for us. Right now it is totally infeasible for us to rebuild any projections. We have 20+ million events in our store and it can take hours to rebuild anything. Our current approach is creating new versions of the projection and letting those catch up before switching the code over to use them. So all of the performance improvements to the async daemon sound amazing.

@jeremydmiller
Copy link
Member Author

@ericgreenmix,

1.) Yeah, got that from @wastaz. It's just a matter of thinking through how to optimize that pattern.
2.) Shame on me, I didn't realize that. I've by and large stayed away from the ViewProjection code in the past. So let's just say that continues, with a very different internal implementation ;-)
3.) I tried to address that a little bit above. The general idea is that you're probably going to have to define both a full document Apply and an optional Patch step in your projection. The projection mechanics will need to look ahead at the current page of events to determine if it can use the Patch alternative path, to avoid mixing and matching Patch and Apply. So basically it's either gotta be all Patch or all Apply. That'll allow us to use the more efficient Patch() without getting screwed over by what you were describing.
4.) More on that maybe tomorrow afternoon after I clear a biggish client demo :) It's mostly going to be the async daemon getting a lot smarter
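As a hypothetical illustration of that all-Patch-or-all-Apply rule (the pairing convention and the `EventStats`/`ResponseCreated` types are illustrative, not a committed v4 API; `session.Patch<T>(id).Increment(...)` is Marten's existing patching API):

```csharp
using System;
using Marten;

public class EventStatsProjection
{
    // Full-document path: the projected document is loaded, mutated
    // in memory, and stored back through the identity map.
    public void Apply(EventStats model, ResponseCreated @event)
    {
        model.ResponseCount++;
    }

    // Optional patch path: a server-side increment that never loads
    // the document. Only usable when every event in the current page
    // can go through Patch, so patched state is never read back
    // mid-page through the identity map.
    public void Patch(IDocumentSession session, ResponseCreated @event)
    {
        session.Patch<EventStats>(@event.EventId)
            .Increment(x => x.ResponseCount);
    }
}

public class EventStats
{
    public Guid Id { get; set; }
    public int ResponseCount { get; set; }
}

public class ResponseCreated
{
    public Guid EventId { get; set; }
}
```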

@jeremydmiller jeremydmiller changed the title Event Store Improvements for v4 ARCHIVED -- Event Store Improvements for v4 Nov 18, 2020