Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion go/internal/analyzer/graph_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ type Snapshot struct {

// Snapshot returns the current state as a sorted, dangling-edge-free
// Snapshot with surfaced dedup/drop counts.
//
// After this call returns, the builder's internal dedup maps are cleared
// (set to nil). This releases ~280 MB of reference pressure at ~/projects/
// scale where the downstream enrich pipeline holds the returned Snapshot
// slices for the lifetime of the function — coexisting with the dedup
// maps was the largest in-memory duplication in the pipeline. Snapshot
// is therefore single-shot: subsequent calls to Snapshot or Add on the
// same builder are not supported.
func (b *GraphBuilder) Snapshot() Snapshot {
b.mu.Lock()
defer b.mu.Unlock()
Expand All @@ -109,11 +117,17 @@ func (b *GraphBuilder) Snapshot() Snapshot {
}
sort.Slice(edges, func(i, j int) bool { return edges[i].ID < edges[j].ID })

return Snapshot{
snap := Snapshot{
Nodes: nodes,
Edges: edges,
DedupedNodes: b.dedupedNodes,
DedupedEdges: b.dedupedEdges,
DroppedEdges: dropped,
}
// Release dedup maps so Go GC can collect them while downstream
// enrich stages run. The maps held references to every node and
// edge already projected into the returned slices.
b.nodes = nil
b.edges = nil
return snap
}
15 changes: 15 additions & 0 deletions go/internal/analyzer/graph_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ import (
"github.com/randomcodespace/codeiq/go/internal/model"
)

func TestSnapshotReleasesDedupMaps(t *testing.T) {
gb := NewGraphBuilder()
gb.Add(&detector.Result{
Nodes: []*model.CodeNode{model.NewCodeNode("x", model.NodeClass, "X")},
Edges: []*model.CodeEdge{{ID: "e:x:x", SourceID: "x", TargetID: "x", Kind: model.EdgeContains}},
})
_ = gb.Snapshot()
if gb.nodes != nil {
t.Errorf("Snapshot must nil GraphBuilder.nodes to allow GC; got len=%d", len(gb.nodes))
}
if gb.edges != nil {
t.Errorf("Snapshot must nil GraphBuilder.edges to allow GC; got len=%d", len(gb.edges))
}
}

func TestGraphBuilderDeduplicatesByID(t *testing.T) {
gb := NewGraphBuilder()
n1 := model.NewCodeNode("a", model.NodeClass, "A")
Expand Down
90 changes: 71 additions & 19 deletions go/internal/graph/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,60 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"time"

kuzu "github.com/kuzudb/go-kuzu"
)

// DefaultBufferPoolBytes caps Kuzu's buffer pool to 2 GiB by default.
// kuzu.DefaultSystemConfig() allocates 80% of system RAM (~12 GiB on a 15
// GiB host) before any Go-side enrich work runs, leaving insufficient
// headroom for the in-memory enricher pipeline. 2 GiB is enough for
// real-world graphs at ~/projects/-scale (~430k nodes / ~300k edges) while
// keeping the host OOM bar well below ceiling.
const DefaultBufferPoolBytes uint64 = 2 << 30

// defaultMaxThreads returns the per-query thread cap for Kuzu — bounded so
// COPY FROM's working set scales with parallelism in a controlled way.
// min(4, GOMAXPROCS): keeps headroom even on small hosts; 4 is enough to
// saturate IO+CPU for our COPY shape.
func defaultMaxThreads() uint64 {
n := runtime.GOMAXPROCS(0)
if n > 4 {
n = 4
}
if n < 1 {
n = 1
}
return uint64(n)
}

// OpenOptions tunes how Open and OpenReadOnly wire the underlying Kuzu
// SystemConfig. Zero-valued fields fall back to safe defaults documented
// alongside each field.
type OpenOptions struct {
// BufferPoolBytes caps Kuzu's buffer pool in bytes. Zero -> DefaultBufferPoolBytes.
BufferPoolBytes uint64
// MaxThreads caps Kuzu's per-query parallelism. Zero -> defaultMaxThreads().
MaxThreads uint64
// ReadOnly opens the database in read-only mode.
ReadOnly bool
// QueryTimeout, if > 0, sets the per-query wall-clock timeout.
QueryTimeout time.Duration
}

func (o OpenOptions) resolved() OpenOptions {
if o.BufferPoolBytes == 0 {
o.BufferPoolBytes = DefaultBufferPoolBytes
}
if o.MaxThreads == 0 {
o.MaxThreads = defaultMaxThreads()
}
return o
}

// Store is the embedded Kuzu graph store facade. It owns one Kuzu database
// and a single long-lived connection. The zero value is not usable — call
// Open or OpenReadOnly to construct.
Expand All @@ -32,14 +80,26 @@ type Store struct {
readOnly bool
}

// Open creates or opens a Kuzu database at the given directory path. Kuzu
// itself creates the directory if it does not exist; we ensure the parent
// exists so a fresh `.codeiq/graph/codeiq.kuzu/` works on first run.
// Open creates or opens a Kuzu database with safe default OpenOptions
// (capped BufferPoolBytes + MaxThreads). For tuning, see OpenWithOptions.
func Open(path string) (*Store, error) {
return OpenWithOptions(path, OpenOptions{})
}

// OpenWithOptions creates or opens a Kuzu database, applying any non-zero
// fields of opts. Zero-valued fields fall back to safe defaults — see
// OpenOptions and DefaultBufferPoolBytes.
func OpenWithOptions(path string, opts OpenOptions) (*Store, error) {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return nil, fmt.Errorf("graph: mkdir parent: %w", err)
}
opts = opts.resolved()
sys := kuzu.DefaultSystemConfig()
sys.BufferPoolSize = opts.BufferPoolBytes
sys.MaxNumThreads = opts.MaxThreads
if opts.ReadOnly {
sys.ReadOnly = true
}
db, err := kuzu.OpenDatabase(path, sys)
if err != nil {
return nil, fmt.Errorf("graph: open db: %w", err)
Expand All @@ -49,7 +109,10 @@ func Open(path string) (*Store, error) {
db.Close()
return nil, fmt.Errorf("graph: open conn: %w", err)
}
return &Store{db: db, conn: conn, path: path}, nil
if opts.QueryTimeout > 0 {
conn.SetTimeout(uint64(opts.QueryTimeout / time.Millisecond))
}
return &Store{db: db, conn: conn, path: path, readOnly: opts.ReadOnly}, nil
}

// OpenReadOnly opens an existing Kuzu store in read-only mode and sets a
Expand All @@ -65,21 +128,10 @@ func Open(path string) (*Store, error) {
// queryTimeout <= 0 disables the per-query timeout. Kuzu interprets the
// timeout in milliseconds; we accept a Go duration for ergonomics.
func OpenReadOnly(path string, queryTimeout time.Duration) (*Store, error) {
sys := kuzu.DefaultSystemConfig()
sys.ReadOnly = true
db, err := kuzu.OpenDatabase(path, sys)
if err != nil {
return nil, fmt.Errorf("graph: open read-only %q: %w", path, err)
}
conn, err := kuzu.OpenConnection(db)
if err != nil {
db.Close()
return nil, fmt.Errorf("graph: open ro conn: %w", err)
}
if queryTimeout > 0 {
conn.SetTimeout(uint64(queryTimeout / time.Millisecond))
}
return &Store{db: db, conn: conn, path: path, readOnly: true}, nil
return OpenWithOptions(path, OpenOptions{
ReadOnly: true,
QueryTimeout: queryTimeout,
})
}

// IsReadOnly reports whether the store rejects mutating Cypher.
Expand Down
26 changes: 23 additions & 3 deletions go/internal/intelligence/extractor/enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package extractor
import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"

"github.com/randomcodespace/codeiq/go/internal/model"
"github.com/randomcodespace/codeiq/go/internal/parser"
)

// Enricher orchestrates per-language extractors over a node list. Mirrors
Expand Down Expand Up @@ -93,11 +95,18 @@ func (en *Enricher) Enrich(nodes []*model.CodeNode, edges *[]*model.CodeEdge, ro

// Run per-file work concurrently; collect into indexed slots so the
// final concat order matches `paths` (sorted) — deterministic output.
// Cap concurrent goroutines at 2*GOMAXPROCS so the simultaneously-live
// tree-sitter Trees + file content strings stay bounded. Polyglot
// targets like airflow (~7k Python files) previously spawned one
// goroutine per file, driving peak RSS into OOM territory.
out := make([][]*model.CodeEdge, len(tasks))
sem := make(chan struct{}, 2*runtime.GOMAXPROCS(0))
var wg sync.WaitGroup
for i, t := range tasks {
wg.Add(1)
sem <- struct{}{}
go func(i int, t task) {
defer func() { <-sem }()
defer wg.Done()
full := filepath.Join(root, t.path)
raw, err := os.ReadFile(full)
Expand All @@ -114,12 +123,23 @@ func (en *Enricher) Enrich(nodes []*model.CodeNode, edges *[]*model.CodeEdge, ro
Content: content,
Registry: registry,
}
// Parse once per file; reuse the tree across every node in this
// file via ExtractFromTree. Eliminates the per-node re-parse that
// pprof on airflow flagged as 91% of total allocations.
tree, _ := parser.ParseByName(t.ext.Language(), raw)
if tree != nil {
defer tree.Close()
}
results := t.ext.ExtractFromTree(ctx, tree, t.ns)
var localEdges []*model.CodeEdge
for _, n := range t.ns {
r := t.ext.Extract(ctx, n)
for j, r := range results {
if j >= len(t.ns) {
break
}
n := t.ns[j]
localEdges = append(localEdges, r.CallEdges...)
localEdges = append(localEdges, r.SymbolReferences...)
if len(r.TypeHints) > 0 {
if len(r.TypeHints) > 0 && n != nil {
if n.Properties == nil {
n.Properties = map[string]any{}
}
Expand Down
107 changes: 103 additions & 4 deletions go/internal/intelligence/extractor/enricher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ package extractor
import (
"os"
"path/filepath"
"runtime"
"sort"
"sync/atomic"
"testing"
"time"

"github.com/randomcodespace/codeiq/go/internal/model"
"github.com/randomcodespace/codeiq/go/internal/parser"
)

// fakeExtractor is a test-only LanguageExtractor that records each call so we
// can assert the orchestrator's read-once contract and per-language dispatch.
type fakeExtractor struct {
lang string
calls int32 // atomic counter of Extract() invocations
calls int32 // counts per-node visits (across both Extract and ExtractFromTree)
filesSeen []string
emitEdge bool
emitHint bool
Expand All @@ -25,9 +28,9 @@ type fakeExtractor struct {

func (f *fakeExtractor) Language() string { return f.lang }

func (f *fakeExtractor) Extract(ctx Context, node *model.CodeNode) Result {
atomic.AddInt32(&f.calls, 1)
f.filesSeen = append(f.filesSeen, ctx.FilePath)
// resultFor synthesises a Result for one node — shared between Extract and
// ExtractFromTree so behaviour is identical regardless of call path.
func (f *fakeExtractor) resultFor(node *model.CodeNode) Result {
r := EmptyResult()
if f.emitEdge {
r.CallEdges = []*model.CodeEdge{{
Expand All @@ -47,6 +50,26 @@ func (f *fakeExtractor) Extract(ctx Context, node *model.CodeNode) Result {
return r
}

func (f *fakeExtractor) Extract(ctx Context, node *model.CodeNode) Result {
atomic.AddInt32(&f.calls, 1)
f.filesSeen = append(f.filesSeen, ctx.FilePath)
return f.resultFor(node)
}

func (f *fakeExtractor) ExtractFromTree(ctx Context, _ *parser.Tree, nodes []*model.CodeNode) []Result {
atomic.AddInt32(&f.calls, int32(len(nodes)))
f.filesSeen = append(f.filesSeen, ctx.FilePath)
results := make([]Result, len(nodes))
for i, n := range nodes {
if n == nil {
results[i] = EmptyResult()
continue
}
results[i] = f.resultFor(n)
}
return results
}

func TestEnricher_DispatchesPerLanguageAndAppendsEdges(t *testing.T) {
dir := t.TempDir()
javaPath := "src/Foo.java"
Expand Down Expand Up @@ -156,6 +179,82 @@ func TestEnricher_SkipsFilteredFiles(t *testing.T) {
}
}

// concurrencyTrackingExtractor records the maximum number of goroutines
// observed inside ExtractFromTree at the same time, so we can assert that
// the orchestrator bounds the fan-out.
type concurrencyTrackingExtractor struct {
lang string
inFlight atomic.Int32
maxSeen atomic.Int32
hold time.Duration
}

func (c *concurrencyTrackingExtractor) Language() string { return c.lang }

func (c *concurrencyTrackingExtractor) Extract(ctx Context, node *model.CodeNode) Result {
// Unused for this test; orchestrator hits ExtractFromTree.
return EmptyResult()
}

func (c *concurrencyTrackingExtractor) ExtractFromTree(_ Context, _ *parser.Tree, nodes []*model.CodeNode) []Result {
cur := c.inFlight.Add(1)
defer c.inFlight.Add(-1)
for {
old := c.maxSeen.Load()
if cur <= old || c.maxSeen.CompareAndSwap(old, cur) {
break
}
}
time.Sleep(c.hold)
results := make([]Result, len(nodes))
for i := range results {
results[i] = EmptyResult()
}
return results
}

func TestEnricher_BoundedConcurrency(t *testing.T) {
// Generate enough files to overwhelm the goroutine pool if it were
// unbounded — 4 * cap files at minimum.
cap := 2 * runtime.GOMAXPROCS(0)
nFiles := 4 * cap
dir := t.TempDir()
nodes := make([]*model.CodeNode, 0, nFiles)
for i := 0; i < nFiles; i++ {
// Deterministic distinct file paths so the orchestrator schedules
// one task per file.
rel := filepath.Join("src", "f", "F"+itoa(i)+".java")
writeFile(t, filepath.Join(dir, rel), "class F"+itoa(i)+" {}")
n := model.NewCodeNode("n:"+itoa(i), model.NodeClass, "F"+itoa(i))
n.FilePath = rel
nodes = append(nodes, n)
}
ext := &concurrencyTrackingExtractor{lang: "java", hold: 25 * time.Millisecond}
en := NewEnricher(ext)
var edges []*model.CodeEdge
en.Enrich(nodes, &edges, dir)
peak := ext.maxSeen.Load()
if peak == 0 {
t.Fatal("peak in-flight was 0 — orchestrator never invoked the extractor")
}
if int(peak) > cap {
t.Fatalf("peak concurrent ExtractFromTree calls = %d, want <= %d (2*GOMAXPROCS)", peak, cap)
}
}

func itoa(i int) string {
const digits = "0123456789"
if i == 0 {
return "0"
}
out := make([]byte, 0, 8)
for i > 0 {
out = append([]byte{digits[i%10]}, out...)
i /= 10
}
return string(out)
}

func TestEnricher_NoExtractorsIsNoop(t *testing.T) {
en := NewEnricher()
n := model.NewCodeNode("n:1", model.NodeClass, "Foo")
Expand Down
Loading
Loading