
pkg/util/eventagg: general aggregation framework for reduction of event cardinality #119416

Merged

merged 2 commits into cockroachdb:master from the antenna-poc-sql-stats branch on May 21, 2024

Conversation

@abarganier abarganier (Contributor) commented Feb 20, 2024

Reviewer note: review commit-wise

The eventagg package is (currently) a proof of concept ("POC") that aims to provide an easy-to-use library that standardizes the way in which we aggregate Observability event data in CRDB. The goal is to eventually emit that data as "exhaust" from CRDB, which downstream systems can consume to build Observability features that do not rely on CRDB's own availability to aid in debugging & investigations. Additionally, we want to provide facilities for code within CRDB to consume this same data, such that it can also power features internally.

This pull request contains the work to create the aggregation mechanism in pkg/util/eventagg.

These facilities provide a way to aggregate notable events, reducing cardinality before further processing and/or structured logging.

In addition to the framework, a toy SQL Stats example is provided in pkg/sql/sqlstats/aggregate.go, which shows the current developer experience when using the APIs.

See pkg/util/eventagg/doc.go for more details.

Since this feature is currently experimental, it's gated by the COCKROACH_ENABLE_STRUCTURED_EVENTS environment variable, which is disabled by default.


Release note: none

Epic: CRDB-35919

blathers-crl bot commented Feb 20, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity (Member)

This change is Reviewable

@abarganier abarganier force-pushed the antenna-poc-sql-stats branch 2 times, most recently from d2dfb31 to 4376801 on February 20, 2024 21:50
@petermattis petermattis (Collaborator) left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @abarganier, @dhartunian, and @xinhaoz)


pkg/aggregate/aggregate.go line 1 at r1 (raw file):

package aggregate

Placing this package at the top-level of the package directory feels wrong to me. This seems more like a sibling to packages like util/log and util/trace. The name aggregate also needs additional thinking to make it clear that this is applying to observability.


pkg/aggregate/aggregate.go line 10 at r1 (raw file):

// tenantAggregators contains each tenant-specific eventAggregator instance,
// allowing isolation of event aggregation to the tenant level.

Nit: I think bucketing might be a more accurate term than isolation here.


pkg/aggregate/aggregate.go line 26 at r1 (raw file):

// that their event will be handled properly, so long as they populate the
// event data properly and have an eventAggConsumer implemented to handle said
// event.

I'm not sure I grasp what this architecture will buy us. I'm specifically referring to having a central mechanism to push events and an internal routing system. Have you considered modeling this aggregation more along the lines of how metrics are handled? Yes, there is a central repository of metrics, but there isn't a central IncrementMetric call. Instead, the caller registers a metric and has a struct on which they can perform operations. What would that look like here? Well, you'd have something along the lines of the MapMerge struct, with the ability to push events into the struct and facilities for performing periodic processing on the aggregated data.
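To make that concrete, here is a rough sketch of the metrics-style shape (all names here are made up for illustration, not a concrete proposal):

```
package sqlstats // illustrative placement

import "sync"

// stmtStats is a placeholder aggregate value.
type stmtStats struct {
	count int64
}

// StmtStatsAgg is caller-owned, like a metric: the package that defines it
// pushes events into it directly and periodically processes the aggregated
// data. There is no central PushEvent call or internal routing.
type StmtStatsAgg struct {
	mu    sync.Mutex
	stats map[string]*stmtStats
}

func NewStmtStatsAgg() *StmtStatsAgg {
	return &StmtStatsAgg{stats: make(map[string]*stmtStats)}
}

// Record merges a single statement execution into the aggregate.
func (a *StmtStatsAgg) Record(fingerprint string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	s, ok := a.stats[fingerprint]
	if !ok {
		s = &stmtStats{}
		a.stats[fingerprint] = s
	}
	s.count++
}

// Drain returns the aggregated data and resets the struct, for whatever
// periodic processing the owner wires up.
func (a *StmtStatsAgg) Drain() map[string]*stmtStats {
	a.mu.Lock()
	defer a.mu.Unlock()
	out := a.stats
	a.stats = make(map[string]*stmtStats)
	return out
}
```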


pkg/aggregate/aggregate.go line 87 at r2 (raw file):

// AggEvent represents an event that's able to be pushed to an EventAggregator.
type AggEvent interface {

The central PushEvent bottleneck is forcing everything to adhere to this type. If MapMerge were a generic data structure I don't think that would be necessary (or the constraints would be significantly reduced).


pkg/aggregate/map_merge.go line 34 at r2 (raw file):

	valueConstructor func() V
	mapFn            func(AggEvent) (K, V)
	mergeFn          func(other V, toMerge V) V

This is essentially a map-reduce operation. Your map function maps an event to a bucket, and a reduce function should take an event and merge it with the existing data in the bucket. I think we'll want to make this easier and more ergonomic than what is present here. I think I mentioned elsewhere that the merge functions will be very repetitive, which makes them a candidate for code generation of some sort (e.g. from a proto definition). I think we could use proto annotations to extract the grouping key from the proto as well.
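For example, the generated code for an annotated proto might boil down to something like this (purely hypothetical output; no such generator or annotations exist yet):

```
// Hypothetical proto-generated struct; field comments show the imagined
// annotations that would drive code generation.
type StmtStatistics struct {
	Fingerprint     string // annotated as the grouping key
	Count           int64  // annotated merge strategy: sum
	MaxLatencyNanos int64  // annotated merge strategy: max
}

// GroupingKey extracts the annotated grouping key.
func (s *StmtStatistics) GroupingKey() string { return s.Fingerprint }

// Merge folds o into s using the per-field merge strategies.
func (s *StmtStatistics) Merge(o *StmtStatistics) {
	s.Count += o.Count
	if o.MaxLatencyNanos > s.MaxLatencyNanos {
		s.MaxLatencyNanos = o.MaxLatencyNanos
	}
}
```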


pkg/aggregate/stmt_stats.go line 22 at r2 (raw file):

// StmtStatEvent is an AggEvent representing statistics related to a single statement
// execution.
type StmtStatEvent struct {

I'm guessing that this is here for demonstration purposes, but just want to note that we should avoid centralized registration of events. That is how we end up with facilities like crdb_internal.go which is a monstrosity at this point. Event types should be decentralized.


pkg/aggregate/stmt_stats.go line 39 at r2 (raw file):

// here. In reality, this would be a much more robust data structure containing many
// aggregated statistics.
type stmtStats struct {

It seems repetitive to have both a struct (stmtStats) and a proto (eventpb.ExampleStmtStatistics) that contain the same state.

@abarganier abarganier (Contributor, Author) left a comment


Thanks for the feedback, I found it very useful. I've mostly rewritten the POC in the latest patch to best address the feedback given. PTAL when you have a moment.

I'd recommend checking out pkg/util/eventagg/event_agg.go first, then pkg/util/eventagg/map_reduce.go, and then branch out from there.

I've also added a doc.go for the purposes of the POC to outline the latest thinking, and acknowledge challenges/problems that are being punted until later.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/aggregate/aggregate.go line 1 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Placing this package at the top-level of the package directory feels wrong to me. This seems more like a sibling to packages like util/log and util/trace. The name aggregate also needs additional thinking to make it clear that this is applying to observability.

Agreed, placing it at the same level as logging/tracing is a better spot for it.

I moved it and took a stab at renaming it to pkg/util/logagg, the idea being that these aggregations eventually generate logs.


pkg/aggregate/aggregate.go line 10 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Nit: I think bucketing might be a more accurate term than isolation here.

Agreed, although I ended up doing away with this mechanism entirely.


pkg/aggregate/aggregate.go line 26 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I'm not sure I grasp what this architecture will buy us. I'm specifically referring to having a central mechanism to push events and an internal routing system. Have you considered modeling this aggregation more along the lines of how metrics are handled? Yes, there is a central repository of metrics, but there isn't a central IncrementMetric call. Instead, the caller registers a metric and has a struct on which they can perform operations. What would that look like here? Well, you'd have something along the lines of the MapMerge struct, with the ability to push events into the struct and facilities for performing periodic processing on the aggregated data.

I agree, following the example of pkg/util/metric is much more flexible. I've scrapped the old thinking and mostly rewritten the entire thing.

To start, I went ahead and created a mechanism that's more in line with the example set forth by metrics. The new thinking is that the MapReduceAggregator is the core component of all aggregations. Clients can create and configure their own MapReduce operations from within their own package and use them as needed. Extraction of keys, and the logic to aggregate two events together, are defined via struct tags (using code gen). A plugin architecture allows users of the library to define how they want the aggregation results consumed, and there can be many consumers.

I've outlined the latest thinking in doc.go. I feel this is a step in the right direction. Let me know what you think.
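For example, usage might look roughly like this, based on the constructor and PushEvent shapes quoted in the review comments below (the consumer helper here is an assumption):

```
// Sketch of client-side wiring within the owning package. Any number of
// flush consumers can be attached; newLogWriteConsumer is illustrative.
agg := eventagg.NewMapReduceAggregator[*MergeableStmtStat](
	"sql.stmt.stats",      // name of this aggregation
	newLogWriteConsumer(), // plugin consumer for flushed results (assumed helper)
)
// Hot paths push events; merging by grouping key happens internally.
agg.PushEvent(ctx, stmtStat)
```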


pkg/aggregate/aggregate.go line 87 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

The central PushEvent bottleneck is forcing everything to adhere to this type. If MapMerge were a generic data structure I don't think that would be necessary (or the constraints would be significantly reduced).

Good point - I wrote this prior to getting generics involved. The latest patch no longer requires this.


pkg/aggregate/map_merge.go line 34 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

This is essentially a map-reduce operation. Your map function maps an event to a bucket, and a reduce function should take an event and merge it with the existing data in the bucket. I think we'll want to make this easier and more ergonomic than what is present here. I think I mentioned elsewhere that the merge functions will be very repetitive, which makes them a candidate for code generation of some sort (e.g. from a proto definition). I think we could use proto annotations to extract the grouping key from the proto as well.

I think the code gen idea is a good one. In the latest patch I went ahead and sketched out how something like this might work. Let me know your thoughts.


pkg/aggregate/stmt_stats.go line 22 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I'm guessing that this is here for demonstration purposes, but just want to note that we should avoid centralized registration of events. That is how we end up with facilities like crdb_internal.go which is a monstrosity at this point. Event types should be decentralized.

Agreed - in the latest patch, users of the library define events and construct aggregations in their own packages.


pkg/aggregate/stmt_stats.go line 39 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

It seems repetitive to have both a struct (stmtStats) and a proto (eventpb.ExampleStmtStatistics) that contain the same state.

Agreed. I've sketched some ideas about how to remove the necessity for protobuf usage for structured events in the latest patch. Let me know what you think.

@abarganier abarganier (Contributor, Author) left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/aggregate/aggregate.go line 1 at r1 (raw file):

Previously, abarganier (Alex Barganier) wrote…

Agreed, placing it at the same level as logging/tracing is a better spot for it.

I moved it and took a stab at renaming it to pkg/util/logagg, the idea being that these aggregations eventually generate logs.

Ah, stale reply. It's been moved to pkg/util/eventagg

@petermattis petermattis (Collaborator) left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @abarganier, @dhartunian, and @xinhaoz)


pkg/sql/appstatspb/app_stats.go line 27 at r3 (raw file):

	result := make([]byte, 0, 8)
	encoding.EncodeUint64Ascending(result, uint64(s))
	return hex.EncodeToString(result)

This seems like a complicated way to write fmt.Sprintf("%016x", s). Am I missing something?


pkg/sql/sqlstats/aggregate.go line 32 at r3 (raw file):

// Within the struct definition, we also note some more interesting & challenging
// examples which require more thinking.
type MergeableStmtStat struct {

Nit: I'd omit the Mergeable prefix here. That is implied by this struct implementing the eventagg.Mergeable interface. And in general we won't want separate X and MergeableX types.


pkg/sql/sqlstats/aggregate.go line 57 at r3 (raw file):

	// Can we find a way to use code generation to turn the 'set_merge' annotation into a
	// map[int32]struct{} field, where the generated code merges the SQLInstanceIDs of the
	// incoming event into the aggregate set of SQLInstanceIDs?

I'd do this by defining a type Int32Set []int32 and defining a Merge operation on this type. The generated code can then look to see if the type for a field has an appropriate Merge operation defined and use it. This could also potentially be a type Int32Set map[int32]struct{} to make the merging easier.
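For example (sketch):

```
// A set type whose Merge method generated code could discover and invoke.
type Int32Set map[int32]struct{}

// Merge folds the contents of other into s.
func (s Int32Set) Merge(other Int32Set) {
	for v := range other {
		s[v] = struct{}{}
	}
}
```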


pkg/sql/sqlstats/aggregate.go line 65 at r3 (raw file):

	// We could also simply build the event with these types from the start, where you'd merge
	// the two histograms. However, in the case of the singular incoming event, having to
	// construct a histogram for a single value feels heavy.

Sounds like it would be useful for the aggregation type to sometimes differ from the event type.


pkg/sql/sqlstats/aggregate.go line 82 at r3 (raw file):

// comments about this in the MergeableStmtStat struct definition)
func (m *MergeableStmtStat) Key() string {
	return fmt.Sprintf("%s", m.StmtFingerprintID)

Nit: this could be m.StmtFingerprintID.String().


pkg/util/eventagg/event_agg.go line 22 at r3 (raw file):

	Merge(other T)
	// Key returns the aggregation key, derived from T, that should be
	// used in aggregations.

Can you expand on what an aggregation key is? Is this like the "grouping key" in a GROUP BY operation? If yes, perhaps GroupingKey would be a clearer name.


pkg/util/eventagg/event_agg.go line 23 at r3 (raw file):

	// Key returns the aggregation key, derived from T, that should be
	// used in aggregations.
	Key() string

I think this implies that the aggregation key is always a string. Is that overly limiting? Should the Mergeable type be templated on the type of aggregation key which could potentially be comparable (i.e. anything that can be a map key).
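Something along these lines, perhaps (a sketch, not the PR's actual interface):

```
// Mergeable parameterized on a comparable grouping-key type, so the key
// is no longer forced to be a string.
type Mergeable[K comparable, T any] interface {
	// Merge folds other into the receiver's aggregate state.
	Merge(other T)
	// GroupingKey returns the key events are grouped by, as in GROUP BY.
	GroupingKey() K
}
```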


pkg/util/eventagg/map_reduce.go line 40 at r3 (raw file):

// NewMapReduceAggregator returns a new MapReduceAggregator[T].
func NewMapReduceAggregator[T Mergeable[T]](
	name string, flushConsumers ...mapReduceFlushConsumer[T],

You're tying together the aggregator and consumer at initialization time. The metrics package separates these two where initialization only defines the aggregation point and the consumers loop over the metrics registry when they need to consume them. I haven't thought through whether that is a feasible approach in this system.


pkg/util/eventagg/map_reduce.go line 43 at r3 (raw file):

) *MapReduceAggregator[T] {
	m := &MapReduceAggregator[T]{
		name:      name,

What are you imagining the name will be used for? I sometimes see such fields get added and then never really used. Rather than add it upfront, perhaps add it when we see a concrete need.


pkg/util/eventagg/map_reduce.go line 51 at r3 (raw file):

// PushEvent implements the aggregator interface.
func (m *MapReduceAggregator[T]) PushEvent(_ context.Context, e T) {

Nit: this feels more like an Add or Put or Record operation than a PushEvent operation.


pkg/util/eventagg/map_reduce.go line 61 at r3 (raw file):

		value.Merge(e)
	}
	m.mu.cache[k] = value

If T is a pointer or interface type, the aggregator will now be keeping a reference to e. That should be made clear in the doc comment. Alternatively, perhaps the zero value of T should be valid, and you should always create a new one if it doesn't exist and then merge with it.
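The alternative might look like this inside PushEvent (sketch; assumes the zero value of T is a valid merge target):

```
func (m *MapReduceAggregator[T]) PushEvent(_ context.Context, e T) {
	k := e.Key()
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.mu.cache[k]
	if !ok {
		// Start from a fresh zero value rather than storing e itself, so
		// the aggregator never retains a reference to the caller's event.
		var fresh T
		v = fresh
	}
	v.Merge(e)
	m.mu.cache[k] = v
}
```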


pkg/util/eventagg/map_reduce.go line 65 at r3 (raw file):

// Flush triggers a flush of the in-memory aggregate data, which resets the
// underlying cache for a new aggregation window.

Do you imagine external callers will be invoking Flush? How will they decide when to do so? Clearing the cached data is a design choice which makes this structure operate more like a buffer. Metrics have an alternate design choice where reading the metric doesn't clear existing state. I'm not sure what the right choice is here, just noting that there is a choice.

blathers-crl bot commented Apr 12, 2024

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@abarganier abarganier force-pushed the antenna-poc-sql-stats branch 11 times, most recently from a949994 to f322355 on April 19, 2024 20:15
@abarganier abarganier force-pushed the antenna-poc-sql-stats branch 2 times, most recently from b69905e to 0f5457f on April 26, 2024 19:53
@abarganier abarganier force-pushed the antenna-poc-sql-stats branch 5 times, most recently from d319214 to 66eb2f4 on May 3, 2024 20:01
@abarganier abarganier (Contributor, Author) left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/util/eventagg/flush_consumer.go line 30 at r10 (raw file):

Previously, abarganier (Alex Barganier) wrote…

Done.

Actually, I am finding the logspy interface to be very brittle... It's inconsistently causing leaked goroutines on test end despite canceling, and it pushes the test scope into the domain of pkg/util/log, which is not what we're aiming to test here. If it's alright with you, I'd like to swap back to using a knob here.

@abarganier abarganier (Contributor, Author) left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/util/eventagg/flush_consumer.go line 30 at r10 (raw file):

Previously, abarganier (Alex Barganier) wrote…

Actually, I am finding the logspy interface to be very brittle... It's inconsistently causing leaked goroutines on test end despite canceling, and it pushes the test scope into the domain of pkg/util/log, which is not what we're aiming to test here. If it's alright with you, I'd like to swap back to using a knob here.

Nevermind, this was due to the ordering of some log scope/leak test calls:

```
// Works
defer leaktest.AfterTest(t)()
defer log.ScopeWithoutShowLogs(t).Close(t)

// Makes the leak detector think we're leaking goroutines
defer log.ScopeWithoutShowLogs(t).Close(t)
defer leaktest.AfterTest(t)()
```

Remember kids: deferred calls are unwound like a stack 🥲

I still believe that using the logspy interface here turns this into more of an integration test instead of a unit test. Maybe that's better 🤷‍♂️

@dhartunian dhartunian (Collaborator) left a comment


In the future, keep stuff like the new logging channel in a separate commit since there's a bunch of codegen + test rewrites.

In the interest of getting this mergeable, I have a few asks:

  1. Can you take everything out of the conn_executor that's being wired up? I think it's too early to put this code in the critical path on master. That should be a separate PR. I think sqlstats/aggregate.go can stay and have tests, but should be wired up later.

  2. Can you pare this PR down to the first two commits (which I think we can merge now), and put the next 2 up in separate PRs? I think the logging stuff should stay separate from the eventagg package. I think the structured log consumer can wait until the use case with Jobs vets the design a bit more. I'm unsure if all the pieces there are necessary and need to sit down and walk through it together, but don't want to hold up the entire PR on that.

I think the Processor interfaces need a separate pass to see what's necessary. My instinct is that we might be better served by a single implementation of what you've separated out into pieces: a buffered router of log events split up by tenant. Right now it's tough to trace the execution path because there are many components involved, but one larger one might be easier to maintain and simpler to read and understand. This can also ease the challenge of having to name lots of similar components as well and make it easier to rearchitect later.

Reviewed 7 of 18 files at r3, 3 of 12 files at r4, 2 of 38 files at r12, 15 of 36 files at r15, 34 of 34 files at r22, 7 of 7 files at r23, 25 of 25 files at r24.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @abarganier, @petermattis, and @xinhaoz)


pkg/obs/eventagg/flush_consumer.go line 25 at r22 (raw file):

// LogWriteConsumer is an example flushConsumer, which provides an easy plug-and-play
// method of logging all the values Agg flushed by a MapReduceAggregator[Agg].
type LogWriteConsumer[K comparable, V any] struct{}

Add static type constraint to document that this is an impl
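e.g., a compile-time assertion along these lines (type parameters illustrative):

```
// Documents (and enforces) that LogWriteConsumer implements flushConsumer.
var _ flushConsumer[string, any] = (*LogWriteConsumer[string, any])(nil)
```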


pkg/obs/logstream/registration.go line 58 at r25 (raw file):

// making their own calls to RegisterProcessor.
func RegisterProcessor(
	ctx context.Context, stopper *stop.Stopper, eventType log.EventType, processor Processor,

One thought I have about this that we can explore once you split it out is that Processor can just be a func type, and newProcessorBuffer and newLogTypeEventRouter can all get inlined into here so all the code is together instead of in many separate files.


pkg/util/log/clog.go line 113 at r25 (raw file):

	allLoggers   loggerRegistry
	metrics      LogMetrics
	processor    StructuredLogProcessor

I wonder if StructuredLogProcessor can be a concrete type we export out of the obs pkg.


pkg/obs/eventagg/map_reduce.go line 42 at r22 (raw file):

) *MapReduceAggregator[E, K, V] {
	m := &MapReduceAggregator[E, K, V]{
		newFn:     newFn,

Is there a reason we need an explicit constructor? You can construct a zero value of a type param via new(V) in this scenario.
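i.e., something like (sketch):

```
// new(V) allocates a zero-valued V even when V is a type parameter, which
// could replace an explicit valueConstructor for simple cases.
func zeroOf[V any]() *V {
	return new(V)
}
```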


pkg/obs/eventagg/map_reduce.go line 81 at r22 (raw file):

	// TODO(abarganier): We should probably use a stopper async task here.
	// TODO(abarganier): Should we make whether this is done async configurable?
	go func() {

I don't think we should leave the structured concurrency till later. I'm fine with a follow-up PR since it will affect all your call sites as well, but we should do this now. Just pass in a stopper and push the responsibility upstream. This will inform the design.

@abarganier abarganier changed the title from "pkg/util/eventagg: aggregation & structured logging framework POC" to "pkg/util/eventagg: general aggregation framework for reduction of event cardinality" on May 13, 2024
@abarganier abarganier (Contributor, Author) left a comment


Thanks for the feedback. I've moved the structured logging commits over to #124058. I addressed most of your feedback in that PR.

Regarding the Processor interfaces, I'd like to expand upon that as a follow-up PR/task. I'd like to better understand our needs before we put effort into reworking such a simple interface.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/obs/eventagg/flush_consumer.go line 25 at r22 (raw file):

Previously, dhartunian (David Hartunian) wrote…

Add static type constraint to document that this is an impl

Done.


pkg/obs/logstream/registration.go line 58 at r25 (raw file):

Previously, dhartunian (David Hartunian) wrote…

One thought I have about this that we can explore once you split it out is that Processor can just be a func type, and newProcessorBuffer and newLogTypeEventRouter can all get inlined into here so all the code is together instead of in many separate files.

Done (see other PR)


pkg/util/log/clog.go line 113 at r25 (raw file):

Previously, dhartunian (David Hartunian) wrote…

I wonder if StructuredLogProcessor can be a concrete type we export out of the obs pkg.

It needs to be exported by pkg/util/log for the purposes of dependency injection.


pkg/obs/eventagg/map_reduce.go line 42 at r22 (raw file):

Previously, dhartunian (David Hartunian) wrote…

Is there a reason we need an explicit constructor? You can construct a zero value of a type param via new(V) in this scenario.

We need an explicit constructor because we need to construct the object properly before it's used. This means initializing the cache, assigning the stopper, newFn, consumers, flushTrigger, etc. Leaving that to users of the API would be burdensome.


pkg/obs/eventagg/map_reduce.go line 81 at r22 (raw file):

Previously, dhartunian (David Hartunian) wrote…

I don't think we should leave the structured concurrency till later. I'm fine with a follow-up PR since it will affect all your call sites as well, but we should do this now. Just pass in a stopper and push the responsibility upstream. This will inform the design.

Done.

@xinhaoz xinhaoz (Member) left a comment


This is looking great, all the detailed documentation really helped in understanding how it all fits together and the future applications.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @abarganier, @dhartunian, and @petermattis)


pkg/obs/eventagg/doc.go line 60 at r28 (raw file):

//		 fed a single element at a time using a Visitor() pattern. I'd like to find a way to have both here -
//		 the ability for consumers to be fed the raw data structure used in the previous aggregation, and the
//		 ability to somehow chain together consumers regardless of the type of that data structure.

This question is mostly to strengthen my own understanding of the needs of this package; please feel free to defer if I'm getting too into specifics, since this is just a POC to nail down general interfaces.
I'm guessing this wish comes from the current limitation where we can only produce 1 aggregated type per struct (although I could be misunderstanding something here). I can see that being desirable - I'm wondering if you're thinking of specific examples motivating this wish, so perhaps we have a concrete example to work around.

This description sounds like the desire to create an aggregation pipeline of sorts, where we'd want to insert consumers at different points in that pipeline - is that correct? Even in that case we can only consume intermediate aggregation types. Specifically, I'm wondering if we'll ever need to take a struct and have 2 consumers where each is expecting a grouping on a different set of keys. I suppose we could just create more structs if we need more aggregated types, but it might not be very pretty.


pkg/obs/eventagg/event_agg.go line 24 at r28 (raw file):

//
//   - K is the type of the derived key.
//   - Agg is the aggregate representation of this type.

nit: rename V -> Agg below.


pkg/obs/eventagg/map_reduce.go line 53 at r28 (raw file):

}

// Add implements the aggregator interface.

nit: doesn't seem like this comment is relevant anymore

The eventagg package is (currently) a proof of concept ("POC") that aims
to provide an easy-to-use library that standardizes the way in which we
aggregate Observability event data in CRDB. The goal is to eventually
emit that data as "exhaust" from CRDB, which downstream systems can consume
to build Observability features that do not rely on CRDB's own availability to
aid in debugging & investigations.

This commit contains the first bits of work to scaffold such a library.
It focuses on creating the core building block of the eventagg package,
the MapReduceAggregator (a common library used to perform map/reduce-like
aggregations).

It also provides an unused toy example usage, showing how
MapReduceAggregator could be used to implement a SQL Stats-like feature.

Since this feature is currently experimental, it's gated by the
`COCKROACH_ENABLE_STRUCTURED_EVENTS` environment variable, which is
disabled by default.

Release note: none
This patch introduces the FlushTrigger interface, which can be used by
the MapReduceAggregator to determine when it's time to flush the current
aggregation.

Along with the interface, an initial implementation is provided called
`WindowedFlush`. `WindowedFlush` aligns event aggregations to truncated
time intervals given a user-provided time window.

For example, if a window of 5 minutes was given, the `WindowedFlush`
would enforce the following window boundaries:

- [12:00:00, 12:05:00)
- [12:05:00, 12:10:00)
- [12:10:00, 12:15:00)
- etc.

This is a first-pass implementation of the flush mechanism used in
the eventagg package. As needs evolve, the interface and/or
implementation is subject to change. For the purposes of prototyping,
though, this meets our needs.

Release note: none
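A minimal sketch of the truncation logic described above (field names and construction are assumptions; only shouldFlush appears in the reviewed tests):

```
package eventagg // illustrative

import "time"

// WindowedFlush aligns flushes to wall-clock windows of a fixed size by
// truncating the current time to the window. Sketch only; the real
// implementation in this PR may differ.
type WindowedFlush struct {
	window       time.Duration
	curWindowEnd time.Time
	now          func() time.Time // injectable clock for tests
}

func NewWindowedFlush(window time.Duration, now func() time.Time) *WindowedFlush {
	w := &WindowedFlush{window: window, now: now}
	// With window = 5m and now = 12:03:17, the first boundary is 12:05:00.
	w.curWindowEnd = now().Truncate(window).Add(window)
	return w
}

func (w *WindowedFlush) shouldFlush() bool {
	t := w.now()
	if !t.Before(w.curWindowEnd) {
		// We've crossed into a new window: advance the boundary and flush.
		w.curWindowEnd = t.Truncate(w.window).Add(w.window)
		return true
	}
	return false
}
```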
@abarganier abarganier (Contributor, Author) left a comment


Thanks for the review! Addressed your comments.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/obs/eventagg/doc.go line 60 at r28 (raw file):

Previously, xinhaoz (Xin Hao Zhang) wrote…

This question is mostly to strengthen my own understanding of the needs of this package; please feel free to defer if I'm getting too into specifics, since this is just a POC to nail down general interfaces.
I'm guessing this wish comes from the current limitation where we can only produce 1 aggregated type per struct (although I could be misunderstanding something here). I can see that being desirable - I'm wondering if you're thinking of specific examples motivating this wish, so perhaps we have a concrete example to work around.

This description sounds like the desire to create an aggregation pipeline of sorts, where we'd want to insert consumers at different points in that pipeline - is that correct? Even in that case we can only consume intermediate aggregation types. Specifically, I'm wondering if we'll ever need to take a struct and have 2 consumers where each is expecting a grouping on a different set of keys. I suppose we could just create more structs if we need more aggregated types, but it might not be very pretty.

As discussed during tech pod today, agreed re: the limitations you mentioned with the Mergeable interface.

I've gone ahead and refactored such that, instead, the MapReduceAggregator takes in functions to generate a key and to merge an event. This should allow us to have multiple different types of aggregations for a single event type.
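For example, two groupings over the same event type might look roughly like this (constructor argument order and names are assumptions):

```
// Sketch: with key and merge supplied as plain functions, one event type
// can feed multiple aggregations with different grouping keys.
byFingerprint := eventagg.NewMapReduceAggregator[StmtEvent, string, *stmtStats](
	stopper,
	func() *stmtStats { return &stmtStats{} },         // new aggregate value
	func(e StmtEvent) string { return e.Fingerprint }, // key fn
	func(e StmtEvent, agg *stmtStats) { agg.count++ }, // merge fn
)
byDatabase := eventagg.NewMapReduceAggregator[StmtEvent, string, *stmtStats](
	stopper,
	func() *stmtStats { return &stmtStats{} },
	func(e StmtEvent) string { return e.Database }, // different grouping key
	func(e StmtEvent, agg *stmtStats) { agg.count++ },
)
```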

Good idea, thank you for this feedback!


pkg/obs/eventagg/map_reduce.go line 53 at r28 (raw file):

Previously, xinhaoz (Xin Hao Zhang) wrote…

nit: doesn't seem like this comment is relevant anymore

Done.

@dhartunian dhartunian (Collaborator) left a comment


:lgtm: questions aren't blocking. can address in separate patch to tests.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @abarganier, @petermattis, and @xinhaoz)


pkg/obs/eventagg/flush_test.go line 40 at r31 (raw file):

	now = now.Add(1 * time.Minute)
	require.False(t, flush.shouldFlush())
}

can you make this test table-driven so that it more clearly steps through timestamps before, at, and after a window to test the flushes?


pkg/obs/eventagg/map_reduce_test.go line 162 at r31 (raw file):

		triggerTestFlush(ctx, mapReduce, trigger, dummyTestEvent)
		<-consumer.consumedCh()
		verifyFlush(expectedFlush, consumer.flushed)

do you need to verify the first flush to ensure that you just see 5s for all the counts?


pkg/obs/eventagg/map_reduce_test.go line 168 at r31 (raw file):

	// break. Only the goroutine handling that Flush should be impacted. Test to
	// ensure that other goroutines are able to push events into a new aggregation
	// window despite a hanging flush.

what should happen if flush hangs forever? or for longer than the flush window?

@abarganier abarganier (Contributor, Author) left a comment


TFTR

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @dhartunian, @petermattis, and @xinhaoz)


pkg/obs/eventagg/flush_test.go line 40 at r31 (raw file):

Previously, dhartunian (David Hartunian) wrote…

can you make this test table-driven so that it more clearly steps through timestamps before, at, and after a window to test the flushes?

In a follow up PR, sure.


pkg/obs/eventagg/map_reduce_test.go line 162 at r31 (raw file):

Previously, dhartunian (David Hartunian) wrote…

do you need to verify the first flush to ensure that you just see 5s for all the counts?

That's all tested as part of the datadriven test. This is focused on post-flush aggregations to ensure the cache is cleared properly.


pkg/obs/eventagg/map_reduce_test.go line 168 at r31 (raw file):

Previously, dhartunian (David Hartunian) wrote…

what should happen if flush hangs forever? or for longer than the flush window?

We can look into implementing a timeout mechanism in a follow up PR.

@abarganier abarganier (Contributor, Author) commented:

bors r=dhartunian

@craig craig bot merged commit 7807ee2 into cockroachdb:master May 21, 2024
22 checks passed
@kyle-a-wong kyle-a-wong self-requested a review May 31, 2024 16:23
craig bot pushed a commit that referenced this pull request Jun 5, 2024
124058: log,logstream: structured log emission & consumption mechanism r=abarganier a=abarganier

**Note: please only consider the final two commits. The first two commits are being reviewed separately in #119416.**

---

This PR introduces `log.Structured` to the log package API. It aims to
serve as a prototype for the log facility we will use for exporting
"exhaust" from CRDB in the form of JSON objects. The intention is that
this exhaust can be consumed externally and be sufficient enough to
build features around.

This iteration has some serious limitations, the main one being that it
is not redaction-aware. The `log.StructuredEvent` exists alongside it
for now. Both implementations are quite similar, so they should probably
be reconciled and/or combined, but this is left as a TODO to avoid
slowing down the prototyping process. For now, it's sufficient for
prototyping.

The patch also introduces a new logging channel explicitly for the new
`log.Structured` API, called `STRUCTURED_EVENTS`. The ability to segment these
types of logs from the rest of the logs is what motivates this separate
channel. The name works for now, but we should consider if there's a
better name available.

The PR also expands upon the new structured logging facilities, adding a
mechanism to consume emitted structured logs internally.

This is done primarily via the new pkg/obs/logstream package, which
handles buffering, routing, & processing of events logged via
log.Structured.

It can be used in conjunction with the eventagg package, and the
KVProcessor interface, to provide users of the eventagg package a way
to consume streams of events flushed from their aggregations. This
enables engineers to use the aggregated data flushed from their
aggregations to build features internal to CRDB. Features that are
powered by the same data that could be consumed externally via the
STRUCTURED_EVENTS log channel.

The provided log config can be updated to make use of this new channel.
For example:
```
sinks:
  file-groups:
    structured-events:
      channels: [STRUCTURED_EVENTS]
```

The changes aim to complete the eventagg pipeline/ecosystem, which now
allows engineers to use common facilities to define aggregations, log
the aggregated results, and consume the logged events internally as
input data.

Finally, it completes the toy StmtStats example by defining a processor
for the aggregated events that are logged.

The below diagram outlines the end-to-end architecture of the system:

![Screenshot 2024-04-26 at 3 59 21 PM](https://github.com/cockroachdb/cockroach/assets/8194877/497c87d8-b7e7-440e-a69d-505a2e760be7)

Release note: none


Epic: CRDB-35919

Co-authored-by: Alex Barganier <abarganier@cockroachlabs.com>