
[RFC] support accumulator in cpgpass #1845

Closed
bbrehm wants to merge 1 commit into master from bbrehm/passWithAccumulator

Conversation

@bbrehm
Contributor

@bbrehm bbrehm commented Mar 11, 2026

This is how I would implement joernio/joern#5877

Then, the pass can simply put in whatever kind of accumulator it needs.

Having an extra null-pointer to carry around for other passes should not be too large a burden. This PR is designed for minimal changes / backwards compat. A nicer design would be to subclass DiffGraphBuilder... but that will need a lot of code shuffling, especially in order to not break compound passes (btw, we should probably move the abstract baseclasses from the closed-source codescience to here).
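To make the intent concrete, here is a minimal sketch of the usage side in Java (the names `createAccumulator`, `runOnPart`, and `runOnFinalAccumulator` are hypothetical placeholders loosely following this discussion, not the PR's actual API):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a pass whose accumulator collects type names across parts.
// None of these names are guaranteed to match the PR; they only illustrate the shape.
class TypeCollectingPass {
    // one fresh accumulator per parallel segment
    Set<String> createAccumulator() { return new HashSet<>(); }

    // absorb one part into an accumulator
    void runOnPart(Set<String> acc, List<String> typesInPart) { acc.addAll(typesInPart); }

    // merge two segment accumulators (associative, so parallel reduction is legal)
    Set<String> merge(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }

    // runs exactly once, on the fully merged accumulator
    void runOnFinalAccumulator(Set<String> acc) {
        System.out.println("distinct types seen: " + acc.size());
    }
}
```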

@max-leuthaeuser
Contributor

Ok, that uses the BiConsumer API nicely.

But two things I do not like here:

  1. all the Nulls (esp. type-wise at API level)
  2. the API of ForkJoinParallelCpgPass suddenly concerned with accumulator stuff.

@SuperUserDone

SuperUserDone commented Mar 12, 2026

Just my 2c since I am also looking into this for: https://github.com/ShiftLeftSecurity/codescience/issues/8761

While I like the design of this PR because of its usage of built-in APIs, I would still lean towards the other implementation. I also like the runOnFinalAccumulator step, although that might not be as relevant to my use case.

The primary use case for accumulators here is merging typemaps and typesets, so mergers are mostly non-trivial.

With the other approach having thread-local accumulators, the merge step would, assuming a best-case merging/reduction algorithm, need $\log_2(t)$ steps (where $t$ is the number of threads in the pool).

In this approach, unless a thread-local handle is returned from createAccumulator, it would, again assuming a best-case merging algorithm, need $\log_2(k)$ steps (where $k$ is the number of files). In almost all invocations I would argue that $t < k$. In the case that $t > k$, the stream API would use $t' = k$.

However, it is also worth noting that the JVM's parallel collect() does not necessarily create one accumulator per element; it offers no guarantees about how many accumulators it creates.

As I pointed out, it would be possible to return a thread-local var from the createAccumulator function, but I feel that leaks implementation details to the usage side, leading to a lot of code duplication across frontends that likely want to do something similar, particularly given that typemap/typeset merging is likely to be a common pattern.

On the DiffGraphBuilder note: while I agree that extending from it might create cleaner usages, it is already an established pattern to extend from the CpgPass classes for pass logic, and I think DiffGraphBuilder should not be concerned with domain/frontend-specific use cases.

@max-leuthaeuser
Contributor

max-leuthaeuser commented Mar 12, 2026

And to complete this: #1846
Now someone needs to make a decision. :-)

FYI @ml86

@bbrehm
Contributor Author

bbrehm commented Mar 12, 2026

However, it is also worth noting that the JVM's parallel collect() does not necessarily create one accumulator per element, but without any guarantees.

Small primer on collect, i.e. imo the best Java 8 stdlib facility for map-reduce-style parallelism:

This is supposed to model associative non-commutative reductions, done in parallel.

The stream is split into N segments of consecutive elements, where N is the desired parallelism level. These segments are distributed over threads. For each segment, an accumulator is created. Then, the accumulator absorbs / "consumes" elements from the stream, from left to right, until it hits the end of the segment.

Once a thread hits the end of its segment, it checks whether the next / previous segment is finished (and not locked); if possible, it then merges accumulators.

If a thread goes idle (end of segment reached, cannot merge with adjacent segments because they are not finished) it looks for more work: It takes a segment that is currently accumulating, and steals work from it, i.e. cuts its current "todo tail" in half.

So, to a first approximation, the number of accumulators is roughly the number of threads; but races / timing differences between threads can lead to more accumulators. This is good! Suppose file 1 takes a very long time to process; ideally, a new accumulator starting at file 2 would be started.

Alas, it is not exactly that way in the java stream implementation: The stream only allows work-stealing every k=20 (I think?) elements. This is a trade-off: Allowing work-stealing costs one uncontested atomic exchange, and this is too expensive for super cheap work elements. If I was a better open-source citizen I would make a PR to openjdk to make that configurable... but afaiu this has not bitten us so far (I have been looking for this issue! When writing the current stream-based ForkJoinCpgPass class, I did consider writing my own, via atomics, that allows interruption at every element, in order to get better parallelism for extremely divergent per-element runtimes).
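The three roles described above (create a per-segment accumulator, absorb elements left to right, merge adjacent segments) map directly onto the three arguments of the three-argument collect overload. A minimal illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class CollectPrimer {
    public static void main(String[] args) {
        List<Integer> out = IntStream.range(0, 1_000)
            .parallel()
            .boxed()
            // supplier:    fresh accumulator per segment
            // accumulator: absorbs one element, left to right within a segment
            // combiner:    merges two adjacent segment accumulators; since only
            //              neighbouring segments are combined, encounter order
            //              is preserved even though execution is parallel
            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

        // deterministic, in encounter order, despite parallel execution
        System.out.println(out.get(0) + ".." + out.get(999)); // prints 0..999
        System.out.println(out.size());                       // prints 1000
    }
}
```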

The thread-local variant permits non-deterministic reordering of accumulator merges. This would be illegal for non-commutative accumulators (but I think your accumulators are commutative?).

Diffgraph accumulation is not exactly commutative: If we used something like your thread-local for diffgraph construction, the order in which nodes are added would depend on race-conditions. This would make debugging much harder (run the same thing again, suddenly node ids change).

PS. This is the textbook explanation. A big thing to consider is the "stream overhead" for accumulator merging: If merging two accumulators of size Nleft and Nright costs O(min(Nleft, Nright)), then the accumulator merging costs worst-case O(N log N), where N is the size of the final accumulator. This is why the diffgraph merging uses a Deque and merges the smaller into the larger accumulator, either from the front or from the back, depending on whether the smaller accumulator was the left or the right one.

If you naively implement merge(left, right) = left ++= right, then you can get O(N^2) costs with worst-case scheduling! Parallel flatmap must use a deque and cannot use an arraybuffer.
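A sketch of that deque trick (illustrative only, not the actual diffgraph code): the merge always moves elements from the smaller accumulator, from the side that faces the larger one, so each merge costs O(min(Nleft, Nright)) while preserving left-then-right element order:

```java
import java.util.ArrayDeque;

public class DequeMerge {
    // Illustrative accumulator: order-preserving, cheap-side merging via a deque.
    static final class Acc {
        final ArrayDeque<Integer> items = new ArrayDeque<>();

        void add(int x) { items.addLast(x); }

        // Merge this (the left accumulator) with 'right'. Always move the
        // smaller deque's elements: appending right onto left, or prepending
        // left onto right, both yield the same left-then-right order.
        Acc merge(Acc right) {
            if (items.size() >= right.items.size()) {
                // right is smaller: append it to the back of left
                while (!right.items.isEmpty()) items.addLast(right.items.pollFirst());
                return this;
            } else {
                // left is smaller: prepend it to the front of right
                while (!items.isEmpty()) right.items.addFirst(items.pollLast());
                return right;
            }
        }
    }

    public static void main(String[] args) {
        Acc left = new Acc();  left.add(1); left.add(2);
        Acc right = new Acc(); right.add(3); right.add(4); right.add(5); right.add(6);
        // left is smaller, so only its two elements move: O(2) work
        Acc merged = left.merge(right);
        System.out.println(merged.items); // prints [1, 2, 3, 4, 5, 6]
    }
}
```

With a plain ArrayBuffer/ArrayList there is no cheap prepend, so `merge(left, right) = left ++= right` is forced to pay O(Nright) regardless of which side is smaller, which is exactly the worst-case O(N^2) schedule described above.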

@SuperUserDone

SuperUserDone commented Mar 12, 2026

You're right, I stand corrected. If we assume accumulators are cheap to construct and that file processing costs dominate the merge overhead anyway, my merger cost concern is largely moot. I'd say this is better than the thread-local approach.

@bbrehm
Contributor Author

bbrehm commented Mar 12, 2026

But two things I do not like here:

  1. all the Nulls (esp. type-wise at API level)
  2. the API of ForkJoinParallelCpgPass suddenly concerned with accumulator stuff.

The ForkJoinParallelCpgPass works exactly as before and needs to know nothing about accumulators, just like CpgPass needs to know nothing about parallelism; you would simply derive from ForkJoinParallelCpgPassWithAccumulator instead of ForkJoinParallelCpgPass.

This change should be fully backwards compatible, even on the ABI level (ie you should be able to compile joern without updating, then put the updated codepropertygraph into the classpath, and presto). If I'm wrong on that, please tell me (either mistake on my part, or something new to learn for me about jvm binary compat).

All the nulls were just because I needed a bogus type <: AnyRef. Would you be happier if I instead put a placeholder object in there?
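A sketch of the layering being described, in Java for brevity (class and method names mirror the discussion but are hypothetical, not the PR's actual signatures): the accumulator-aware class is the base, and the classic pass is the degenerate case with a null accumulator:

```java
// Hypothetical sketch of the backwards-compatible layering discussed above.
abstract class ForkJoinParallelCpgPassWithAccumulator<Part, Acc> {
    abstract Acc createAccumulator();
    abstract void runOnPart(Acc acc, Part part);
    void runOnFinalAccumulator(Acc acc) {} // optional final reduce step
}

// Existing passes keep their old surface: no accumulator anywhere in sight.
abstract class ForkJoinParallelCpgPass<Part>
        extends ForkJoinParallelCpgPassWithAccumulator<Part, Void> {
    // the "extra null-pointer to carry around" from the PR description
    @Override final Void createAccumulator() { return null; }
    @Override final void runOnPart(Void acc, Part part) { runOnPart(part); }
    abstract void runOnPart(Part part);
}
```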

@max-leuthaeuser
Contributor

max-leuthaeuser commented Mar 12, 2026

The ForkJoinParallelCpgPass works exactly as before and needs to know nothing about accumulator

It inherits from ForkJoinParallelCpgPassWithAccumulator and has to override some of its methods. I have no strong opinion on that, but to me that seems like the wrong direction for specialization. But it's ok I guess, if that saves us from duplicated code.

@maltek
Contributor

maltek commented Mar 12, 2026

All the nulls were just because I needed a bogus type <: AnyRef. Would you be happier if I instead put a placeholder object in there?

Could we drop the AnyRef bound in order to use Unit / ()? Or does this break for some JVM interop reason?

There's also java.lang.Void - but that's just a less obvious way of saying all values must be null.

@ml86
Contributor

ml86 commented Mar 12, 2026

Since @max-leuthaeuser now also provided a PR which fully uses the stream collect facilities and does not use explicit thread locals, I think we are decided on the general approach.
Both versions keep the pre-existing API constant, but I do not want to achieve that by duplicating the base logic like in @max-leuthaeuser's PR. I also think that having the implementation with the accumulator as a base class is the logical thing to have, and that the pre-existing API just maps to that with null/Void as accumulator.

@max-leuthaeuser Please go ahead and adjust your PR in that way and then we will take yours to merge.

@max-leuthaeuser
Contributor

Thanks for all the input guys!
I will create a new PR merging all the stuff we discussed incl. tests.

@max-leuthaeuser max-leuthaeuser deleted the bbrehm/passWithAccumulator branch March 12, 2026 16:50