Skip to content

Enhancement: Automated model checks and Updated client latency metrics#347

Merged
shubhamk-meesho merged 7 commits intomainfrom
develop
Feb 25, 2026
Merged

Enhancement: Automated model checks and Updated client latency metrics#347
shubhamk-meesho merged 7 commits intomainfrom
develop

Conversation

@shubhamk-meesho
Copy link
Contributor

@shubhamk-meesho shubhamk-meesho commented Feb 25, 2026

Summary by CodeRabbit

  • New Features

    • Enhanced model upload validation with Python code checks and stricter warmup/config validations.
    • Cloud storage file discovery now returns all matching files instead of a single match.
    • Faster, parallelized interaction retrieval with centralized limits and week-based time-range handling; clearer time-range error responses.
  • Bug Fixes

    • Improved and more informative error messages across feature retrieval and sequence lookups.
    • Reduced allocation overhead in compression decoding.
  • Tests

    • Extensive unit and integration tests added for validations, retrieval logic, and compression.

#339)

* Added automated model checks to the model upload workflow, including validation for logger and print statements in Python backend models, and ensuring warmup configuration is present in config.pbtxt.

* Resolved static check lint error

* Added unit tests for the new added code changes,

* resolved coderabbit and other review comments

* removed extra comments

* removed dublicate code

---------

Co-authored-by: pavan-adari-meesho <pavan.adari@meesho.com>
@coderabbitai
Copy link

coderabbitai bot commented Feb 25, 2026

Walkthrough

Adds a GCS listing API and implements extensive Predator model-upload validations (config/warmup and Python logger/print checks) with supporting model types and constants, plus widespread interaction-store refactors (week/column constants, parallel fetch/deserialise, cache-key changes), new tests, and smaller service tweaks.

Changes

Cohort / File(s) Summary
GCS Client
horizon/internal/externalcall/gcs_client.go
Added ListFilesWithSuffix(bucket, folderPath, suffix string) ([]string, error) to interface and implementation to return all matching objects under a folder.
Predator: models & constants
horizon/internal/predator/handler/model.go, horizon/internal/predator/handler/predator_constants.go
Added internal types (fileViolationInfo, funcScopeEntry) and several new non-exported constants used by upload validation and Python checks.
Predator: upload validation
horizon/internal/predator/handler/predator_upload.go
Implemented full-upload validations: config.pbtxt/warmup checks, Python backend detection, scanning for logger/print statements with violation aggregation and new error paths.
Predator: tests
horizon/internal/predator/handler/predator_upload_test.go
Added comprehensive unit tests and a mock GCS client covering ensemble/python detection, warmup checks, logger/print scanning, and violation-summary formatting.
Skye variant RT partitioning
horizon/internal/skye/handler/skye.go
On variant approval, collect used RT partitions and assign a random available RT partition (1..256) when registering variants in ETCD.
Go SDK gRPC client
go-sdk/pkg/interaction-store/client.go
Changed client constructor call to pass the gRPC connection object directly (use conn instead of conn.Conn).
ZSTD compression pool & tests
interaction-store/internal/compression/zstd.go, .../zstd_test.go
Added a decode buffer pool and pooling logic to Reduce allocations in Decode; added ZSTD encoder/decoder tests including unsupported-type checks.
Interaction-store constants
interaction-store/internal/constants/constants.go
Introduced rolling-window and retrieval constants (WeekColumnPrefix, TotalWeeks, MaxWeekIndex, WeeksPerBucket, MaxRetrieveLimit, MaxOrder/ClickEventsPerWeek).
Scylla cache-key & query changes
interaction-store/internal/data/scylla/scylla.go, .../scylla_test.go
Removed xxhash-based unordered keys in favor of deterministic string keys built from keyspace/table/columns; updated query execution scanning logic and added tests for cache key generation.
Handler: persist (click/order)
interaction-store/internal/handler/persist/click.go, .../order.go, .../click_test.go, .../order_test.go
Replaced hard-coded week constants with new constants package; changed column naming to use WeekColumnPrefix; adjusted bucket iteration order and tests to use centralized constants.
Handler: retrieve (click/order) refactor
interaction-store/internal/handler/retrieve/click.go, .../order.go, .../retrieve.go, plus .../click_test.go,.../order_test.go,.../retrieve_test.go
Rewrote retrieval flow to compute ordered weeks, map tables->fields, fetch data in parallel, deserialize in parallel, and merge/filter with limit; added validateTimeRange limit checks and new error variables; extensive unit test updates/additions.
Handler: top-level interaction & proto conversion
interaction-store/internal/handler/interaction.go
Added timing instrumentation and parallel retrieval for Click+Order using errgroup; refactored proto conversion to use backing slices and pointer arrays.
Utils & time tests
interaction-store/internal/utils/utils_test.go
Added time/week utility tests (WeekFromTimestampMs, TimestampDiffInWeeks).
Online Feature Store: errors/logging
online-feature-store/internal/config/etcd.go, .../feature/features.go, .../feature/retrieve.go, .../retrieve_test.go
Enriched error messages with entity/FG context, adjusted a few log levels and control-flow to return distinct errors when sequence == -1; updated tests to match message changes.
HTTP server middleware
online-feature-store/internal/server/http/server.go
Removed Gin Logger middleware from server initialization.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Upload Handler
    participant GCS as GCS Client
    participant Parser as Config Parser
    participant Scanner as Python Scanner
    participant Validator as Validator

    Client->>Validator: validateModelConfiguration(config)
    Validator->>Validator: isEnsembleModel?
    
    alt Non-Ensemble Model
        Validator->>GCS: ReadObject(config.pbtxt)
        GCS-->>Validator: config content
        Validator->>Parser: parse config.pbtxt
        Parser-->>Validator: parsed config
        Validator->>Validator: validate warmup block
    else Ensemble Model
        Validator->>Validator: skip warmup check
    end
    
    Validator->>Validator: isPythonBackendModel?
    
    alt Python Backend
        Validator->>GCS: ListFilesWithSuffix(bucket, folder, ".py")
        GCS-->>Validator: list of Python files
        loop For each Python file
            Validator->>GCS: ReadObject(python file)
            GCS-->>Validator: file content
            Validator->>Scanner: hasPythonLoggerOrPrintStatements(file)
            Scanner-->>Validator: violations (if any)
        end
        Validator->>Validator: buildBalancedViolationSummary()
        Validator-->>Client: validation error if violations found
    else Non-Python Backend
        Validator-->>Client: validation success
    end
Loading

Possibly related PRs

  • Sync internal develop to public develop #346 — touches overlapping files and edits (interaction-store constants, scylla cache-key/query logic, retrieve/persist handlers, compression pool, and the go-sdk client constructor), suggesting a close code-level relation.

Suggested reviewers

  • ayushVerma-meesho
🚥 Pre-merge checks | ✅ 1 | ❌ 1

❌ Failed checks (1 inconclusive)

Check name Status Explanation Resolution
Krd Checker ❓ Inconclusive PR description body not available; KRD validation requirement unclear without full PR context and template specifications. Provide actual PR description body from GitHub PR #347 or clarify if KRD validation is required for this repository.
✅ Passed checks (1 passed)
Check name Status Explanation
Dynamic Configuration Validation ✅ Passed No changes detected to dynamic configuration files matching pattern application-dyn-.yml or application-dyn-.yaml. All PR modifications are Go source code files only.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (5)
horizon/internal/predator/handler/predator_upload.go (4)

686-686: loggerPattern only catches info and debug levels.

The regex logger\.(info|debug)\s*\( will miss logger.warning(...), logger.error(...), logger.critical(...), and logger.log(...) calls. If the intent is to block all logging in production hot paths, those levels should be included. If only info/debug are intentionally targeted (since warning/error are considered acceptable), consider adding a comment to make this explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/predator/handler/predator_upload.go` at line 686, The
current regex assigned to loggerPattern
(regexp.MustCompile(`(?i)logger\.(info|debug)\s*\(`)) only matches info and
debug calls; update loggerPattern to include all desired levels (e.g., warning,
warn, error, critical, fatal, log) or narrow explicitly and add a clarifying
comment. Modify the regexp in loggerPattern (or replace it with a named
constant) to include the additional level names you want to block (for example
add warning|warn|error|critical|fatal|log) or, if only info/debug should be
matched, add a comment above loggerPattern explaining that warnings/errors are
intentionally excluded.

702-711: Tab/space mixing could mistrack indentation scope.

lineIndent (line 702) counts both tabs and spaces equally as single characters, but a tab typically represents 4 or 8 spaces in Python. While Python 3 forbids mixing tabs and spaces for indentation, if a file happens to use pure tabs, the relative indent comparison still holds correctly. However, if you receive a file that inconsistently mixes them (e.g., from a Python 2 codebase or a poorly configured editor), scope tracking could break. This is a low-probability edge case — just calling it out.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/predator/handler/predator_upload.go` around lines 702 - 711,
The indentation counter can be thrown off by mixed tabs and spaces; before
computing lineIndent and comparing entries in functionStack, normalize tabs to a
fixed number of spaces (e.g., 4 or 8) so that lineIndent reflects visual
indentation consistently; update the code around lineIndent calculation (used
with funcDefPattern, strippedLine, funcScopeEntry, and functionStack) to replace
tabs with the chosen number of spaces (or otherwise expand tabs) and then
compute lineIndent from the normalized string.

682-726: Triple-quoted strings and regex compilation placement.

Two observations on hasPythonLoggerOrPrintStatements:

  1. Triple-quoted strings not handled by stripInlineComment: A # inside a multi-line """...""" or '''...''' block will be treated as a comment boundary, which could cause false negatives (stripping real code) or mask a print() on the same line. This is an inherent limitation of line-by-line analysis. Worth documenting as a known limitation.

  2. Regex patterns compiled per invocation (lines 685-687): These are compiled fresh on every call. Since validateNoLoggerOrPrintStatements calls this in a loop over all .py files, consider promoting the patterns to package-level var declarations using regexp.MustCompile to avoid repeated compilation.

♻️ Promote regexes to package level
+var (
+	funcDefPattern = regexp.MustCompile(`^def\s+(\w+)\s*\(`)
+	loggerPattern  = regexp.MustCompile(`(?i)logger\.(info|debug)\s*\(`)
+	printPattern   = regexp.MustCompile(`\bprint\s*\(`)
+)
+
 func hasPythonLoggerOrPrintStatements(content []byte) (found bool, details []string) {
 	lines := strings.Split(string(content), "\n")
-
-	funcDefPattern := regexp.MustCompile(`^def\s+(\w+)\s*\(`)
-	loggerPattern := regexp.MustCompile(`(?i)logger\.(info|debug)\s*\(`)
-	printPattern := regexp.MustCompile(`\bprint\s*\(`)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/predator/handler/predator_upload.go` around lines 682 - 726,
The function hasPythonLoggerOrPrintStatements currently recompiles regexes on
each call and treats '#' inside triple-quoted strings as inline comments; move
loggerPattern and printPattern (and funcDefPattern if desired) to package-level
vars using regexp.MustCompile to avoid repeated compilation (update
hasPythonLoggerOrPrintStatements to reference the package-level names), and add
a brief comment in the file near stripInlineComment or atop
hasPythonLoggerOrPrintStatements documenting the known limitation that
multi-line triple-quoted strings ("""...""" or '''...''') are not parsed and '#'
inside them may be misinterpreted by the line-by-line stripInlineComment logic;
ensure validateNoLoggerOrPrintStatements continues to call
hasPythonLoggerOrPrintStatements unchanged.

556-577: Redundant config.pbtxt read and parse.

validateNoLoggerOrPrintStatements reads and parses config.pbtxt again (lines 563-572), even though validateModelConfiguration (called earlier in validateSourceModel at line 159) already performs the same read and parse. This doubles the GCS read for every full upload.

Consider passing the already-parsed ModelConfig (or at least the backend string) to avoid the duplicate I/O.

♻️ Sketch: pass parsed config downstream
-func (p *Predator) validateSourceModel(gcsPath string, isPartial bool) error {
+func (p *Predator) validateSourceModel(gcsPath string, isPartial bool) error {
 	srcBucket, srcPath := extractGCSPath(gcsPath)
 	if srcBucket == "" || srcPath == "" {
 		return fmt.Errorf("invalid GCS path format: %s", gcsPath)
 	}
 
-	if err := p.validateModelConfiguration(gcsPath); err != nil {
+	modelConfig, err := p.validateModelConfiguration(gcsPath)
+	if err != nil {
 		return fmt.Errorf("config.pbtxt validation failed: %w", err)
 	}
 
 	if !isPartial {
 		if err := p.validateCompleteModelStructure(srcBucket, srcPath); err != nil {
 			return fmt.Errorf("complete model structure validation failed: %w", err)
 		}
 
-		if err := p.validateNoLoggerOrPrintStatements(gcsPath); err != nil {
+		if err := p.validateNoLoggerOrPrintStatements(gcsPath, modelConfig); err != nil {
 			return fmt.Errorf("logger/print statement validation failed: %w", err)
 		}
 	}
 
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/predator/handler/predator_upload.go` around lines 556 - 577,
The validateNoLoggerOrPrintStatements function is re-reading and parsing
config.pbtxt; change its signature to accept the already-parsed ModelConfig (or
at minimum the backend string) instead of gcsPath-only, update callers such as
validateSourceModel (which calls validateModelConfiguration) to pass the parsed
ModelConfig returned there, and inside validateNoLoggerOrPrintStatements use the
passed ModelConfig with isPythonBackendModel rather than calling
GcsClient.ReadFile/ prototext.Unmarshal; adjust any other call sites to the new
signature and remove the duplicate ReadFile/unmarshal logic from
validateNoLoggerOrPrintStatements.
horizon/internal/externalcall/gcs_client.go (1)

775-806: Inconsistent suffix matching with FindFileWithSuffix.

FindFileWithSuffix (line 758) matches the suffix against path.Base(attrs.Name) (basename only), while ListFilesWithSuffix (line 795) matches against attrs.Name (full object key). For typical suffixes like .py this is functionally equivalent, but the inconsistency could cause subtle bugs with suffixes that might appear in path components (e.g., a folder named data.py/). Consider aligning the matching strategy.

♻️ Suggested alignment
-		if strings.HasSuffix(attrs.Name, suffix) {
+		if strings.HasSuffix(path.Base(attrs.Name), suffix) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/externalcall/gcs_client.go` around lines 775 - 806,
ListFilesWithSuffix currently checks suffix against the full object key
(attrs.Name) while FindFileWithSuffix uses path.Base(attrs.Name), causing
inconsistent behavior; update ListFilesWithSuffix to use the basename check
(strings.HasSuffix(path.Base(attrs.Name), suffix)) so both functions match the
suffix only against the file name, and add the "path" import if it's not already
present.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@horizon/internal/externalcall/gcs_client.go`:
- Around line 775-806: ListFilesWithSuffix currently checks suffix against the
full object key (attrs.Name) while FindFileWithSuffix uses
path.Base(attrs.Name), causing inconsistent behavior; update ListFilesWithSuffix
to use the basename check (strings.HasSuffix(path.Base(attrs.Name), suffix)) so
both functions match the suffix only against the file name, and add the "path"
import if it's not already present.

In `@horizon/internal/predator/handler/predator_upload.go`:
- Line 686: The current regex assigned to loggerPattern
(regexp.MustCompile(`(?i)logger\.(info|debug)\s*\(`)) only matches info and
debug calls; update loggerPattern to include all desired levels (e.g., warning,
warn, error, critical, fatal, log) or narrow explicitly and add a clarifying
comment. Modify the regexp in loggerPattern (or replace it with a named
constant) to include the additional level names you want to block (for example
add warning|warn|error|critical|fatal|log) or, if only info/debug should be
matched, add a comment above loggerPattern explaining that warnings/errors are
intentionally excluded.
- Around line 702-711: The indentation counter can be thrown off by mixed tabs
and spaces; before computing lineIndent and comparing entries in functionStack,
normalize tabs to a fixed number of spaces (e.g., 4 or 8) so that lineIndent
reflects visual indentation consistently; update the code around lineIndent
calculation (used with funcDefPattern, strippedLine, funcScopeEntry, and
functionStack) to replace tabs with the chosen number of spaces (or otherwise
expand tabs) and then compute lineIndent from the normalized string.
- Around line 682-726: The function hasPythonLoggerOrPrintStatements currently
recompiles regexes on each call and treats '#' inside triple-quoted strings as
inline comments; move loggerPattern and printPattern (and funcDefPattern if
desired) to package-level vars using regexp.MustCompile to avoid repeated
compilation (update hasPythonLoggerOrPrintStatements to reference the
package-level names), and add a brief comment in the file near
stripInlineComment or atop hasPythonLoggerOrPrintStatements documenting the
known limitation that multi-line triple-quoted strings ("""...""" or '''...''')
are not parsed and '#' inside them may be misinterpreted by the line-by-line
stripInlineComment logic; ensure validateNoLoggerOrPrintStatements continues to
call hasPythonLoggerOrPrintStatements unchanged.
- Around line 556-577: The validateNoLoggerOrPrintStatements function is
re-reading and parsing config.pbtxt; change its signature to accept the
already-parsed ModelConfig (or at minimum the backend string) instead of
gcsPath-only, update callers such as validateSourceModel (which calls
validateModelConfiguration) to pass the parsed ModelConfig returned there, and
inside validateNoLoggerOrPrintStatements use the passed ModelConfig with
isPythonBackendModel rather than calling GcsClient.ReadFile/
prototext.Unmarshal; adjust any other call sites to the new signature and remove
the duplicate ReadFile/unmarshal logic from validateNoLoggerOrPrintStatements.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd106a1 and a59c278.

📒 Files selected for processing (5)
  • horizon/internal/externalcall/gcs_client.go
  • horizon/internal/predator/handler/model.go
  • horizon/internal/predator/handler/predator_constants.go
  • horizon/internal/predator/handler/predator_upload.go
  • horizon/internal/predator/handler/predator_upload_test.go

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
online-feature-store/internal/handler/feature/retrieve_test.go (1)

648-652: ⚠️ Potential issue | 🟡 Minor

Fix the error message prefix for GetFeatureGroup failures in preProcessForKeys.

The production code at line 671 wraps a GetFeatureGroup error with the message prefix "failed to get active schema for feature group...", but this is semantically incorrect. The GetFeatureGroup operation is distinct from GetActiveFeatureSchema (which appears at line 719 and correctly uses the same prefix). Line 723 demonstrates the correct pattern by wrapping GetFeatureGroup errors with "failed to get feature group...". Update line 671 to use "failed to get feature group..." instead to maintain semantic accuracy and consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@online-feature-store/internal/handler/feature/retrieve_test.go` around lines
648 - 652, The test expects the error returned from preProcessForKeys when
config.GetFeatureGroup fails to be prefixed with "failed to get feature
group..."; update the production call in preProcessForKeys that wraps errors
from GetFeatureGroup to use the prefix "failed to get feature group ..." instead
of "failed to get active schema for feature group..."; ensure the other call
that wraps GetActiveFeatureSchema still uses "failed to get active schema for
feature group..." so semantics remain correct (refer to functions/methods
preProcessForKeys, GetFeatureGroup, and GetActiveFeatureSchema).
horizon/internal/skye/handler/skye.go (2)

605-620: ⚠️ Potential issue | 🟠 Major

Do not silently drop unknown filters while building criteria.

At Line [606], missing filter keys are ignored. This can approve a variant with fewer criteria than requested. Fail fast when any requested filter is absent.

🔧 Suggested fix (strict filter validation)
-			criteria := []skyeEtcd.Criteria{}
+			criteria := make([]skyeEtcd.Criteria, 0, len(payload.FilterConfiguration.Criteria))
 			for _, filter := range payload.FilterConfiguration.Criteria {
-				if _, ok := filtersFromEtcd[filter.ColumnName]; ok {
-					filterValue := filtersFromEtcd[filter.ColumnName].FilterValue
-					defaultValue := filtersFromEtcd[filter.ColumnName].DefaultValue
-					if filter.Condition == enums.FilterCondition(enums.NOT_EQUALS) {
-						filterValue = filtersFromEtcd[filter.ColumnName].DefaultValue
-						defaultValue = filtersFromEtcd[filter.ColumnName].FilterValue
-
-					}
-					criteria = append(criteria, skyeEtcd.Criteria{
-						ColumnName:   filter.ColumnName,
-						FilterValue:  filterValue,
-						DefaultValue: defaultValue,
-					})
-				}
+				filterCfg, ok := filtersFromEtcd[filter.ColumnName]
+				if !ok {
+					return fmt.Errorf("filter '%s' does not exist for entity '%s'", filter.ColumnName, payload.Entity)
+				}
+				filterValue := filterCfg.FilterValue
+				defaultValue := filterCfg.DefaultValue
+				if filter.Condition == enums.FilterCondition(enums.NOT_EQUALS) {
+					filterValue = filterCfg.DefaultValue
+					defaultValue = filterCfg.FilterValue
+				}
+				criteria = append(criteria, skyeEtcd.Criteria{
+					ColumnName:   filter.ColumnName,
+					FilterValue:  filterValue,
+					DefaultValue: defaultValue,
+				})
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/skye/handler/skye.go` around lines 605 - 620, The loop that
builds criteria in skye.go currently ignores unknown filters and silently omits
them; update the logic in the block that iterates over
payload.FilterConfiguration.Criteria (referencing filtersFromEtcd and the
criteria slice of skyeEtcd.Criteria) to perform strict validation: if a
filter.ColumnName is not present in filtersFromEtcd, immediately return an error
(or propagate a failure) indicating the missing filter instead of skipping it;
otherwise continue building and appending the skyeEtcd.Criteria entries
(including the existing NOT_EQUALS swap logic) so the caller fails fast when any
requested filter is absent.

569-630: ⚠️ Potential issue | 🔴 Critical

Make RT partition allocation atomic to prevent duplicate assignments.

Line [570] through Line [630] does a read → choose → write flow without atomic reservation. Two concurrent approvals can pick the same partition and register conflicting assignments.

🔧 Suggested fix (allocate/release via ETCD atomic claim)
-			rtPartitions := make(map[int]bool)
-			entities, err := s.EtcdConfig.GetEntities()
-			if err != nil {
-				return fmt.Errorf("failed to get entities from etcd: %w", err)
-			}
-			for _, models := range entities {
-				for _, modelConfig := range models.Models {
-					for _, variant := range modelConfig.Variants {
-						if variant.RTPartition > 0 {
-							rtPartitions[variant.RTPartition] = true
-						}
-					}
-				}
-			}
-			availablePartitions := []int{}
-			for i := 1; i <= 256; i++ {
-				if _, exists := rtPartitions[i]; !exists {
-					availablePartitions = append(availablePartitions, i)
-				}
-			}
-			if len(availablePartitions) == 0 {
-				return fmt.Errorf("no RT partitions available (all 1-256 are used)")
-			}
-			seed := time.Now().UnixNano()
-			rnd := int(seed % int64(len(availablePartitions)))
-			if rnd < 0 {
-				rnd = -rnd
-			}
+			rtPartition, err := s.EtcdConfig.AllocateRTPartition(1, 256)
+			if err != nil {
+				return fmt.Errorf("failed to allocate RT partition: %w", err)
+			}

 			if err := s.EtcdConfig.RegisterVariant(payload.Entity, payload.Model, payload.Variant,
 				approval.AdminVectorDBConfig, payload.VectorDBType, criteria,
 				payload.Type, approval.AdminCachingConfiguration.DistributedCachingEnabled, approval.AdminCachingConfiguration.DistributedCacheTTLSeconds,
 				approval.AdminCachingConfiguration.InMemoryCachingEnabled, approval.AdminCachingConfiguration.InMemoryCacheTTLSeconds,
 				approval.AdminCachingConfiguration.EmbeddingRetrievalInMemoryConfig.Enabled, approval.AdminCachingConfiguration.EmbeddingRetrievalInMemoryConfig.TTL,
 				approval.AdminCachingConfiguration.EmbeddingRetrievalDistributedConfig.Enabled, approval.AdminCachingConfiguration.EmbeddingRetrievalDistributedConfig.TTL,
 				approval.AdminCachingConfiguration.DotProductInMemoryConfig.Enabled, approval.AdminCachingConfiguration.DotProductInMemoryConfig.TTL,
 				approval.AdminCachingConfiguration.DotProductDistributedConfig.Enabled, approval.AdminCachingConfiguration.DotProductDistributedConfig.TTL,
-				availablePartitions[rnd], approval.AdminRateLimiter); err != nil {
+				rtPartition, approval.AdminRateLimiter); err != nil {
+				_ = s.EtcdConfig.ReleaseRTPartition(rtPartition)
 				log.Error().Err(err).Msgf("Failed to register variant in ETCD for variant: %s", payload.Variant)
 				return fmt.Errorf("failed to register variant in ETCD: %w", err)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@horizon/internal/skye/handler/skye.go` around lines 569 - 630, The code in
the RT partition selection (building rtPartitions, computing availablePartitions
and choosing availablePartitions[rnd]) is racy because it does a
read→choose→write; change it to perform an atomic claim against etcd instead of
relying on a later RegisterVariant write: implement an
EtcdConfig.ClaimRTPartition(partition int, entity, model, variant string) (or
extend RegisterVariant to perform the claim) that uses an etcd transaction
(Compare on absence/expected value then Put) to reserve the partition, and
update this flow to: compute availablePartitions, pick a candidate (random or
shuffle), attempt ClaimRTPartition in a short retry loop (failover to next
candidate on txn conflict), and only call s.EtcdConfig.RegisterVariant after a
successful claim (or let RegisterVariant perform the claim and return conflict
error which you then retry); ensure you release the claim on error path if
necessary and surface clear conflict errors for retry logic.
interaction-store/internal/compression/zstd.go (1)

32-52: ⚠️ Potential issue | 🟠 Major

Data race: unsynchronized reads in double-checked locking.

The first if encoder != nil / if decoder != nil reads a package-level pointer without holding mut. Under Go's memory model this is a data race — a concurrent goroutine writing inside the lock races with a concurrent reader on the fast path. The sync.Mutex added here does not fix it; it only protects the inner check.

Use sync.Once instead, which is the standard Go idiom for exactly this pattern:

🛡️ Proposed fix using `sync.Once`
 var (
-    encoder *ZStdEncoder
-    decoder *ZStdDecoder
-    mut     sync.Mutex
+    encoder     *ZStdEncoder
+    encoderOnce sync.Once
+    encoderErr  error
+
+    decoder     *ZStdDecoder
+    decoderOnce sync.Once
+    decoderErr  error

     // decodeBufPool ...
     decodeBufPool = sync.Pool{ ... }
 )

 func NewZStdEncoder() (*ZStdEncoder, error) {
-    if encoder != nil {
-        return encoder, nil
-    }
-    mut.Lock()
-    defer mut.Unlock()
-    if encoder != nil {
-        return encoder, nil
-    }
-    enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
-    if err != nil {
-        return nil, err
-    }
-    encoder = &ZStdEncoder{encoder: enc}
-    return encoder, nil
+    encoderOnce.Do(func() {
+        enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+        if err != nil {
+            encoderErr = err
+            return
+        }
+        encoder = &ZStdEncoder{encoder: enc}
+    })
+    return encoder, encoderErr
 }

Apply the same pattern to NewZStdDecoder.

Also applies to: 62-83

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/compression/zstd.go` around lines 32 - 52, The
fast-path unsynchronized reads of the package-level pointers in NewZStdEncoder
(and likewise NewZStdDecoder) cause a data race; replace the double-checked
locking with sync.Once: introduce package-level sync.Once variables (e.g.,
encoderOnce, decoderOnce) and move the zstd.NewWriter/zstd.NewReader
initialization into a closure passed to encoderOnce.Do / decoderOnce.Do that
sets the package-level encoder/decoder and captures any init error in a
package-level err var; remove the initial unsynchronized if checks and mutex
usage, and have NewZStdEncoder/NewZStdDecoder call the appropriate once.Do then
return the initialized pointer and error variables (ensuring the init error is
returned if set).
online-feature-store/internal/handler/feature/retrieve.go (1)

493-517: ⚠️ Potential issue | 🟠 Major

Pre-existing data race on missingDataKeys in the non-Redis code path.

missingDataKeys (line 508) is appended from multiple goroutines launched at line 496 without synchronization. In the non-Redis branch (lines 463–489), concurrent goroutines call append() on the same slice, which is a data race. Use a mutex to protect the slice or collect results into a channel instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@online-feature-store/internal/handler/feature/retrieve.go` around lines 493 -
517, The goroutines launched in the loop that call store.RetrieveV2(...) append
to the shared slice missingDataKeys without synchronization, causing a data
race; protect access by serializing writes (e.g., add a mutex and lock/unlock
around missingDataKeys = append(...)) or send missing keys back on a channel and
collect them in the parent goroutine; update the closure inside the loop that
uses pkMap, keys[i], keyIdx and writes to missingDataKeys and fgDataChan
accordingly (ensure the goroutine captures copies of loop variables) so that all
mutations to missingDataKeys are performed under the mutex (or moved to the
collector) while retaining current behavior for sending FGData to fgDataChan and
calling wg.Done().
interaction-store/internal/handler/retrieve/order.go (1)

259-272: ⚠️ Potential issue | 🟡 Minor

Remove buildDeserialisedPermanentStorageDataBlocks and its unit tests—it's dead code replaced by inline deserialization in deserializeWeeks.

The Retrieve method calls deserializeWeeks (line 166), which directly inlines blocks.DeserializePSDB without using this method. No production code references this method. Remove the method definition (lines 259–272) and the five dedicated unit tests (TestOrderRetrieveHandler_buildDeserialisedPermanentStorageDataBlocks_* in order_test.go).

Note: The same dead code pattern exists in ClickRetrieveHandler (click.go:265 and related tests).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/handler/retrieve/order.go` around lines 259 - 272,
Remove the dead helper buildDeserialisedPermanentStorageDataBlocks from
OrderRetrieveHandler and its unit tests: the code path is no longer used because
Deserialize logic is in deserializeWeeks (called by Retrieve), so delete the
function buildDeserialisedPermanentStorageDataBlocks and remove the five tests
named TestOrderRetrieveHandler_buildDeserialisedPermanentStorageDataBlocks_* in
order_test.go; also mirror the same cleanup for the ClickRetrieveHandler (remove
click.go's equivalent function and its related tests) to keep handlers
consistent and eliminate unused code.
🧹 Nitpick comments (10)
interaction-store/internal/utils/utils_test.go (2)

10-16: Strengthen this case with an exact expected week assertion.

Line 12 uses a fixed date and the comment already states ISO week 3, but the current 1..53 bounds won’t catch incorrect week calculations.

Suggested test tightening
 func TestWeekFromTimestampMs(t *testing.T) {
 	// Jan 15, 2024 is in ISO week 3
 	ts := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC).UnixMilli()
 	week := WeekFromTimestampMs(ts)
-	assert.GreaterOrEqual(t, week, 1)
-	assert.LessOrEqual(t, week, 53)
+	assert.Equal(t, 3, week)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/utils/utils_test.go` around lines 10 - 16, The
test TestWeekFromTimestampMs is too loose—replace the range assertions with an
exact equality check against the known ISO week number for Jan 15, 2024; call
WeekFromTimestampMs(ts) and assert that the returned week equals 3 (use the test
helper's equality assertion, e.g. assert.Equal with expected 3 and actual week)
so the test fails on incorrect week calculations.

49-54: Make the 25-week expectation exact to catch off-by-one regressions.

Given fixed UTC boundaries (Line 50 and Line 51), this should be deterministically 25 weeks; >= 24 is too permissive.

Suggested assertion update
 func TestTimestampDiffInWeeks_25Weeks(t *testing.T) {
 	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
 	end := time.Date(2024, 6, 24, 0, 0, 0, 0, time.UTC).UnixMilli() // ~25 weeks
 	diff := TimestampDiffInWeeks(start, end)
-	assert.GreaterOrEqual(t, diff, 24)
+	assert.Equal(t, 25, diff)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/utils/utils_test.go` around lines 49 - 54, The
test TestTimestampDiffInWeeks_25Weeks is too permissive (assert.GreaterOrEqual
>= 24); change it to assert the exact expected value by asserting diff == 25 so
it catches off-by-one regressions — update the assertion in
TestTimestampDiffInWeeks_25Weeks to use assert.Equal(t, 25, diff) (referencing
the TimestampDiffInWeeks call) to enforce the deterministic 25-week result.
online-feature-store/internal/handler/feature/retrieve_test.go (1)

563-569: Remove commented-out mock code.

The commented-out GetFeatureGroup mock (Lines 565–568) is dead code. If it was intentionally removed because string types skip this call, the comment on Line 564 already documents that. The commented block adds noise.

🧹 Proposed cleanup
 		setupMock: func(m *config.MockConfigManager) {
-			// Mock GetFeatureGroup for vector check only (string type skips default value)
-			//m.On("GetFeatureGroup", "user", "user_info").Return(&config.FeatureGroup{
-			//	Id:            1,
-			//	ActiveVersion: "1",
-			//}, nil).Once()
+			// String type skips default value fetch; no mocks required.
 		},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@online-feature-store/internal/handler/feature/retrieve_test.go` around lines
563 - 569, Remove the dead commented-out mock block inside the setupMock
function: delete the commented lines that call m.On("GetFeatureGroup", "user",
"user_info").Return(&config.FeatureGroup{...}, nil).Once() and its surrounding
comment markers so only the existing comment about string types skipping the
call remains; this cleans up dead code in the setupMock closure that references
config.MockConfigManager and GetFeatureGroup.
interaction-store/internal/handler/interaction.go (1)

126-159: Redundant inner wantClick/wantOrder checks inside if wantClick && wantOrder block.

Lines 130 and 140 guard goroutine launches with if wantClick and if wantOrder respectively, but both are trivially true because the enclosing block already requires both to be true (line 126). Remove the inner checks to reduce noise.

♻️ Proposed cleanup
 if wantClick && wantOrder {
     var clickProto []*timeseries.ClickEvent
     var orderProto []*timeseries.OrderEvent
     g := new(errgroup.Group)
-    if wantClick {
-        g.Go(func() error {
-            events, err := h.clickRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
-            if err != nil {
-                return err
-            }
-            clickProto = h.convertClickEventsToProto(events)
-            return nil
-        })
-    }
-    if wantOrder {
-        g.Go(func() error {
-            events, err := h.orderRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
-            if err != nil {
-                return err
-            }
-            orderProto = h.convertOrderEventsToProto(events)
-            return nil
-        })
-    }
+    g.Go(func() error {
+        events, err := h.clickRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
+        if err != nil {
+            return err
+        }
+        clickProto = h.convertClickEventsToProto(events)
+        return nil
+    })
+    g.Go(func() error {
+        events, err := h.orderRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
+        if err != nil {
+            return err
+        }
+        orderProto = h.convertOrderEventsToProto(events)
+        return nil
+    })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/handler/interaction.go` around lines 126 - 159,
Inside the block gated by if wantClick && wantOrder, remove the redundant inner
if wantClick and if wantOrder checks and instead directly start the two
goroutines on the errgroup (g.Go) that call
h.clickRetrieveHandler.Retrieve(...)/h.orderRetrieveHandler.Retrieve(...),
assign clickProto via h.convertClickEventsToProto(...) and orderProto via
h.convertOrderEventsToProto(...), keeping the existing error returns and the
g.Wait() handling intact; this cleans up noise without changing behavior.
interaction-store/internal/data/scylla/scylla_test.go (1)

9-57: Add a collision regression test for composite key encoding.

The suite does not yet catch ambiguous key collisions when keyspace/table/column tokens include separators (e.g., _ and ,). Add one negative/regression case so future key-format changes remain safe.

Suggested additional test cases
+func TestQueryCacheKey_NoCollisionAcrossDifferentInputs(t *testing.T) {
+	k1 := getRetrieveQueryCacheKey("a_b", "c", []string{"d"})
+	k2 := getRetrieveQueryCacheKey("a", "b_c", []string{"d"})
+	assert.NotEqual(t, k1, k2)
+}
+
+func TestUpdateQueryCacheKey_NoCollisionAcrossDifferentInputs(t *testing.T) {
+	k1 := getUpdateQueryCacheKey("a_b", "c", []string{"d"})
+	k2 := getUpdateQueryCacheKey("a", "b_c", []string{"d"})
+	assert.NotEqual(t, k1, k2)
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/data/scylla/scylla_test.go` around lines 9 - 57,
Add regression tests that ensure composite key encoding doesn't collide when
tokens contain separators: create two distinct inputs that would collide under
naive joining (e.g., columns or table values containing underscores/commas such
as columns {"a_b"} vs {"a","b_c"} or table "t1_a" vs table "t1" with column
"a"), call getRetrieveQueryCacheKey and getUpdateQueryCacheKey for both inputs
and assert the returned keys are not equal; add tests named like
TestGetRetrieveQueryCacheKey_NoCollision_WithSeparators and
TestGetUpdateQueryCacheKey_NoCollision_WithSeparators referencing
getRetrieveQueryCacheKey and getUpdateQueryCacheKey and using assert.NotEqual to
prevent regressions.
interaction-store/internal/compression/zstd_test.go (1)

111-115: Missing Type(99) coverage — inconsistent with encoder test.

TestGetEncoder_Unsupported also exercises Type(99) to confirm the default/fallthrough case; TestGetDecoder_Unsupported doesn't.

♻️ Proposed addition
 func TestGetDecoder_Unsupported(t *testing.T) {
     dec, err := GetDecoder(TypeNone)
     assert.Error(t, err)
     assert.Nil(t, dec)
     assert.Contains(t, err.Error(), "unsupported compression type")
+
+    dec, err = GetDecoder(Type(99))
+    assert.Error(t, err)
+    assert.Nil(t, dec)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/compression/zstd_test.go` around lines 111 - 115,
Update the TestGetDecoder_Unsupported test to also exercise the
fallthrough/default case by calling GetDecoder(Type(99)) in addition to
GetDecoder(TypeNone); assert that the returned decoder is nil, err is non-nil,
and err.Error() contains "unsupported compression type" (mirroring
TestGetEncoder_Unsupported). Locate TestGetDecoder_Unsupported and add the
second call and assertions against GetDecoder(Type(99)) so both unsupported
paths are covered.
interaction-store/internal/compression/zstd.go (1)

86-112: Simplify redundant type assertions — pool invariant is always *[]byte.

Since decodeBufPool.New exclusively returns *[]byte, the if p, ok := bufPtr.(*[]byte); ok checks never fail. The two-value form hides pool-buffer leaks if the invariant were ever violated, and adds noise. Use a direct assertion to make the invariant explicit and panic loudly on violation:

♻️ Proposed simplification
 func (d *ZStdDecoder) Decode(cdata []byte) (data []byte, err error) {
-    bufPtr := decodeBufPool.Get()
-    var buf []byte
-    if p, ok := bufPtr.(*[]byte); ok {
-        buf = *p
-    }
-    decoded, err := d.decoder.DecodeAll(cdata, buf[:0])
+    p := decodeBufPool.Get().(*[]byte)
+    buf := *p
+    decoded, err := d.decoder.DecodeAll(cdata, buf[:0])
     if err != nil {
-        if p, ok := bufPtr.(*[]byte); ok {
-            *p = (*p)[:0]
-            decodeBufPool.Put(bufPtr)
-        }
+        *p = decoded[:0]
+        decodeBufPool.Put(p)
         return nil, err
     }
     result := make([]byte, len(decoded))
     copy(result, decoded)
-    if p, ok := bufPtr.(*[]byte); ok {
-        if cap(decoded) <= decodeBufPoolCap {
-            *p = (*p)[:0]
-            decodeBufPool.Put(bufPtr)
-        } else {
-            ptr := new([]byte)
-            *ptr = decoded[:0]
-            decodeBufPool.Put(ptr)
-        }
+    if cap(decoded) <= decodeBufPoolCap {
+        *p = decoded[:0]
+        decodeBufPool.Put(p)
+    } else {
+        *p = decoded[:0]
+        decodeBufPool.Put(p)
     }
     return result, nil
 }

Note: the else branch can now also simply reuse p (updated to point at the grown buffer) rather than allocating a new ptr, which eliminates one extra new([]byte) allocation per decode cycle once the pool holds large buffers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/compression/zstd.go` around lines 86 - 112,
Replace the repeated two-value type assertions on bufPtr with a direct
single-value assertion (e.g., p := bufPtr.(*[]byte)) so the code enforces the
pool invariant and panics if violated; update the error-path and success-path to
use that p directly (set *p = (*p)[:0] before decodeBufPool.Put(p) on error),
and in the grown-buffer branch reuse p by assigning *p = decoded[:0] and putting
p back into decodeBufPool instead of allocating a new ptr; keep the make/copy
into result and the call to d.decoder.DecodeAll unchanged and retain the
capacity check against decodeBufPoolCap.
interaction-store/internal/handler/retrieve/order.go (1)

241-248: break assumes events are sorted descending within each week — fragile contract.

Line 247 uses break when ts < startTimestampMs, assuming that all subsequent events in the same week will also be below startTimestampMs. This relies on events being stored in strictly descending timestamp order. While the persist path enforces this via sort.Slice, any data corruption or alternative write path would silently drop valid events. Using continue instead of break would be safer, at a minor performance cost.

Suggested change
 		if ts < startTimestampMs {
-			break
+			continue
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/handler/retrieve/order.go` around lines 241 - 248,
In the loop inside the retrieve handler that iterates events (the for i := range
events block in order.go), don't assume per-week events are strictly sorted:
replace the current break when ts < startTimestampMs with a continue so a single
out-of-order older event doesn't stop processing the rest; update the loop in
the method that processes events (the events iteration in the retrieve/order.go
handler) to use continue and add a short comment noting we no longer rely on
descending order.
interaction-store/internal/handler/persist/click.go (1)

38-267: Significant code duplication between ClickPersistHandler and OrderPersistHandler.

The structure of click.go and order.go is nearly identical: partitionEventsByBucket, getColumnsForBucket, deserializeExistingData, mergeEvents, persistEvents, updateMetadata, buildPermanentStorageDataBlock, etc. Consider extracting a generic persist handler (e.g., using generics or a shared base with type-specific callbacks) to reduce duplication. Not blocking for this PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/handler/persist/click.go` around lines 38 - 267,
Duplicate persist logic exists between ClickPersistHandler and
OrderPersistHandler; extract a shared/generic persist framework and have
ClickPersistHandler implement only type-specific hooks. Create a generic
PersistHandler (or base with generics) that implements partitionEventsByBucket,
getColumnsForBucket, deserializeExistingData, mergeEvents, persistEvents,
updateMetadata flow and accepts callbacks/interfaces for type-specific
operations: GetTableName/GetMetadataTableName, InteractionType/enum,
BuildPermanentStorageDataBlock, RetrieveExistingEvents (e.g.,
getExistingClickEvents), and any model-specific typing; then refactor
ClickPersistHandler to delegate to the generic handler and only implement those
callbacks (keep function names like partitionEventsByBucket, persistToBucket,
getColumnsForBucket, deserializeExistingData, mergeEvents, persistEvents,
updateMetadata, buildPermanentStorageDataBlock, getTableName,
getMetadataTableName, getExistingClickEvents as the hooks).
interaction-store/internal/handler/retrieve/retrieve_test.go (1)

68-76: Add a 24 weeks + 1ms boundary test.

This suite has the exact-boundary case, but not the first-invalid case immediately above it. Adding it will prevent regressions in range precision logic.

🧪 Suggested test addition
 func TestValidateTimeRange_ValidExactly24Weeks(t *testing.T) {
 	// Exactly 24 weeks apart, both in the past
 	start := time.Now().Add(-25 * 7 * 24 * time.Hour).UnixMilli()
 	end := time.Now().Add(-1 * 7 * 24 * time.Hour).UnixMilli()

 	err := validateTimeRange(start, end, 1)

 	assert.NoError(t, err)
 }
+
+func TestValidateTimeRange_Exceeds24WeeksByOneMillisecond(t *testing.T) {
+	end := time.Now().Add(-1 * time.Hour).UnixMilli()
+	start := end - (24 * 7 * 24 * 60 * 60 * 1000) - 1
+
+	err := validateTimeRange(start, end, 1)
+
+	assert.ErrorIs(t, err, ErrTimeRangeExceeded)
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@interaction-store/internal/handler/retrieve/retrieve_test.go` around lines 68
- 76, Add a new unit test that mirrors TestValidateTimeRange_ValidExactly24Weeks
but asserts invalid when the range exceeds 24 weeks by 1ms: create a test (e.g.,
TestValidateTimeRange_InvalidJustOver24Weeks) that sets start and end timestamps
so end - start == 24 weeks + 1 millisecond (both in the past), call
validateTimeRange(start, end, 1) and assert that it returns an error; reference
the existing TestValidateTimeRange_ValidExactly24Weeks and validateTimeRange to
place and structure the new test consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@interaction-store/internal/compression/zstd_test.go`:
- Around line 103-105: NewZStdEncoder's error is ignored which can leave enc nil
and cause a nil-pointer panic when calling enc.Encode; change the test to
capture the error (enc, err := NewZStdEncoder()), check err and fail the test
clearly (e.g., t.Fatalf or require.NoError) before using enc, so Encode is only
called on a valid encoder instance.

In `@interaction-store/internal/data/scylla/scylla.go`:
- Around line 182-207: The cache key functions getRetrieveQueryCacheKey and
getUpdateQueryCacheKey build ambiguous keys using raw separators; replace them
with an unambiguous, length-prefixed encoding for each component: write length +
":" + value for keyspace, tableName, each column (iterate columns and encode
each as len:col), and then append the action suffix ("retrieve"/"update") also
length-prefixed; update the strings.Builder Grow estimate accordingly (sum of
lengths of values + digits for lengths + separators) and remove reliance on
cacheKeySep so different inputs cannot collide.
- Around line 52-57: The retrieve path currently swallows errors: change
executeRetrieveInteractions and executeRetrieveMetadata to return
(map[string]interface{}, error) instead of silently logging and returning empty
maps; update RetrieveInteractions to call the new signature, capture and return
any error from executeRetrieveInteractions (and propagate iterator.Close and
iterator.Err errors instead of logging them), and only record the metric after
calling the execute function; ensure createRetrieveInteractionsPreparedQuery/use
of s.session and s.keyspace remain unchanged while updating callers to handle
and propagate the returned errors.

In `@interaction-store/internal/handler/retrieve/retrieve.go`:
- Around line 29-30: The current check uses utils.TimestampDiffInWeeks which
floors partial weeks and lets ranges like "24 weeks + 1ms" pass; change the
validation to compare the raw millisecond difference instead of the floored week
count: compute diffMs := endTimestampMs - startTimestampMs and fail with
ErrTimeRangeExceeded if diffMs > constants.TotalWeeks * 7 * 24 * 60 * 60 * 1000
(or introduce a constants.TotalWeeksMs), keeping references to startTimestampMs,
endTimestampMs, constants.TotalWeeks and ErrTimeRangeExceeded so the intent is
clear.

In `@online-feature-store/internal/handler/feature/features.go`:
- Line 181: Add a bounds check to prevent indexing past stringLengths: before
calling blocks.HelperVectorFeatureStringToConcatenatedString with
stringLengths[sequence], ensure sequence >= 0 && sequence < len(stringLengths)
(the existing sequence == -1 check handles negatives). If the check fails,
return an error (or appropriate empty result) and log the condition so callers
don’t hit a runtime panic; apply the same fix to the GetFeature function in
retrieve.go where it indexes stringLengths at the same pattern. Ensure you
reference the sequence variable and stringLengths slice and update the error
path consistently where decodedValue is produced.

---

Outside diff comments:
In `@horizon/internal/skye/handler/skye.go`:
- Around line 605-620: The loop that builds criteria in skye.go currently
ignores unknown filters and silently omits them; update the logic in the block
that iterates over payload.FilterConfiguration.Criteria (referencing
filtersFromEtcd and the criteria slice of skyeEtcd.Criteria) to perform strict
validation: if a filter.ColumnName is not present in filtersFromEtcd,
immediately return an error (or propagate a failure) indicating the missing
filter instead of skipping it; otherwise continue building and appending the
skyeEtcd.Criteria entries (including the existing NOT_EQUALS swap logic) so the
caller fails fast when any requested filter is absent.
- Around line 569-630: The code in the RT partition selection (building
rtPartitions, computing availablePartitions and choosing
availablePartitions[rnd]) is racy because it does a read→choose→write; change it
to perform an atomic claim against etcd instead of relying on a later
RegisterVariant write: implement an EtcdConfig.ClaimRTPartition(partition int,
entity, model, variant string) (or extend RegisterVariant to perform the claim)
that uses an etcd transaction (Compare on absence/expected value then Put) to
reserve the partition, and update this flow to: compute availablePartitions,
pick a candidate (random or shuffle), attempt ClaimRTPartition in a short retry
loop (failover to next candidate on txn conflict), and only call
s.EtcdConfig.RegisterVariant after a successful claim (or let RegisterVariant
perform the claim and return conflict error which you then retry); ensure you
release the claim on error path if necessary and surface clear conflict errors
for retry logic.

In `@interaction-store/internal/compression/zstd.go`:
- Around line 32-52: The fast-path unsynchronized reads of the package-level
pointers in NewZStdEncoder (and likewise NewZStdDecoder) cause a data race;
replace the double-checked locking with sync.Once: introduce package-level
sync.Once variables (e.g., encoderOnce, decoderOnce) and move the
zstd.NewWriter/zstd.NewReader initialization into a closure passed to
encoderOnce.Do / decoderOnce.Do that sets the package-level encoder/decoder and
captures any init error in a package-level err var; remove the initial
unsynchronized if checks and mutex usage, and have NewZStdEncoder/NewZStdDecoder
call the appropriate once.Do then return the initialized pointer and error
variables (ensuring the init error is returned if set).

In `@interaction-store/internal/handler/retrieve/order.go`:
- Around line 259-272: Remove the dead helper
buildDeserialisedPermanentStorageDataBlocks from OrderRetrieveHandler and its
unit tests: the code path is no longer used because Deserialize logic is in
deserializeWeeks (called by Retrieve), so delete the function
buildDeserialisedPermanentStorageDataBlocks and remove the five tests named
TestOrderRetrieveHandler_buildDeserialisedPermanentStorageDataBlocks_* in
order_test.go; also mirror the same cleanup for the ClickRetrieveHandler (remove
click.go's equivalent function and its related tests) to keep handlers
consistent and eliminate unused code.

In `@online-feature-store/internal/handler/feature/retrieve_test.go`:
- Around line 648-652: The test expects the error returned from
preProcessForKeys when config.GetFeatureGroup fails to be prefixed with "failed
to get feature group..."; update the production call in preProcessForKeys that
wraps errors from GetFeatureGroup to use the prefix "failed to get feature group
..." instead of "failed to get active schema for feature group..."; ensure the
other call that wraps GetActiveFeatureSchema still uses "failed to get active
schema for feature group..." so semantics remain correct (refer to
functions/methods preProcessForKeys, GetFeatureGroup, and
GetActiveFeatureSchema).

In `@online-feature-store/internal/handler/feature/retrieve.go`:
- Around line 493-517: The goroutines launched in the loop that call
store.RetrieveV2(...) append to the shared slice missingDataKeys without
synchronization, causing a data race; protect access by serializing writes
(e.g., add a mutex and lock/unlock around missingDataKeys = append(...)) or send
missing keys back on a channel and collect them in the parent goroutine; update
the closure inside the loop that uses pkMap, keys[i], keyIdx and writes to
missingDataKeys and fgDataChan accordingly (ensure the goroutine captures copies
of loop variables) so that all mutations to missingDataKeys are performed under
the mutex (or moved to the collector) while retaining current behavior for
sending FGData to fgDataChan and calling wg.Done().

---

Nitpick comments:
In `@interaction-store/internal/compression/zstd_test.go`:
- Around line 111-115: Update the TestGetDecoder_Unsupported test to also
exercise the fallthrough/default case by calling GetDecoder(Type(99)) in
addition to GetDecoder(TypeNone); assert that the returned decoder is nil, err
is non-nil, and err.Error() contains "unsupported compression type" (mirroring
TestGetEncoder_Unsupported). Locate TestGetDecoder_Unsupported and add the
second call and assertions against GetDecoder(Type(99)) so both unsupported
paths are covered.

In `@interaction-store/internal/compression/zstd.go`:
- Around line 86-112: Replace the repeated two-value type assertions on bufPtr
with a direct single-value assertion (e.g., p := bufPtr.(*[]byte)) so the code
enforces the pool invariant and panics if violated; update the error-path and
success-path to use that p directly (set *p = (*p)[:0] before
decodeBufPool.Put(p) on error), and in the grown-buffer branch reuse p by
assigning *p = decoded[:0] and putting p back into decodeBufPool instead of
allocating a new ptr; keep the make/copy into result and the call to
d.decoder.DecodeAll unchanged and retain the capacity check against
decodeBufPoolCap.

In `@interaction-store/internal/data/scylla/scylla_test.go`:
- Around line 9-57: Add regression tests that ensure composite key encoding
doesn't collide when tokens contain separators: create two distinct inputs that
would collide under naive joining (e.g., columns or table values containing
underscores/commas such as columns {"a_b"} vs {"a","b_c"} or table "t1_a" vs
table "t1" with column "a"), call getRetrieveQueryCacheKey and
getUpdateQueryCacheKey for both inputs and assert the returned keys are not
equal; add tests named like
TestGetRetrieveQueryCacheKey_NoCollision_WithSeparators and
TestGetUpdateQueryCacheKey_NoCollision_WithSeparators referencing
getRetrieveQueryCacheKey and getUpdateQueryCacheKey and using assert.NotEqual to
prevent regressions.

In `@interaction-store/internal/handler/interaction.go`:
- Around line 126-159: Inside the block gated by if wantClick && wantOrder,
remove the redundant inner if wantClick and if wantOrder checks and instead
directly start the two goroutines on the errgroup (g.Go) that call
h.clickRetrieveHandler.Retrieve(...)/h.orderRetrieveHandler.Retrieve(...),
assign clickProto via h.convertClickEventsToProto(...) and orderProto via
h.convertOrderEventsToProto(...), keeping the existing error returns and the
g.Wait() handling intact; this cleans up noise without changing behavior.

In `@interaction-store/internal/handler/persist/click.go`:
- Around line 38-267: Duplicate persist logic exists between ClickPersistHandler
and OrderPersistHandler; extract a shared/generic persist framework and have
ClickPersistHandler implement only type-specific hooks. Create a generic
PersistHandler (or base with generics) that implements partitionEventsByBucket,
getColumnsForBucket, deserializeExistingData, mergeEvents, persistEvents,
updateMetadata flow and accepts callbacks/interfaces for type-specific
operations: GetTableName/GetMetadataTableName, InteractionType/enum,
BuildPermanentStorageDataBlock, RetrieveExistingEvents (e.g.,
getExistingClickEvents), and any model-specific typing; then refactor
ClickPersistHandler to delegate to the generic handler and only implement those
callbacks (keep function names like partitionEventsByBucket, persistToBucket,
getColumnsForBucket, deserializeExistingData, mergeEvents, persistEvents,
updateMetadata, buildPermanentStorageDataBlock, getTableName,
getMetadataTableName, getExistingClickEvents as the hooks).

In `@interaction-store/internal/handler/retrieve/order.go`:
- Around line 241-248: In the loop inside the retrieve handler that iterates
events (the for i := range events block in order.go), don't assume per-week
events are strictly sorted: replace the current break when ts < startTimestampMs
with a continue so a single out-of-order older event doesn't stop processing the
rest; update the loop in the method that processes events (the events iteration
in the retrieve/order.go handler) to use continue and add a short comment noting
we no longer rely on descending order.

In `@interaction-store/internal/handler/retrieve/retrieve_test.go`:
- Around line 68-76: Add a new unit test that mirrors
TestValidateTimeRange_ValidExactly24Weeks but asserts invalid when the range
exceeds 24 weeks by 1ms: create a test (e.g.,
TestValidateTimeRange_InvalidJustOver24Weeks) that sets start and end timestamps
so end - start == 24 weeks + 1 millisecond (both in the past), call
validateTimeRange(start, end, 1) and assert that it returns an error; reference
the existing TestValidateTimeRange_ValidExactly24Weeks and validateTimeRange to
place and structure the new test consistently.

In `@interaction-store/internal/utils/utils_test.go`:
- Around line 10-16: The test TestWeekFromTimestampMs is too loose—replace the
range assertions with an exact equality check against the known ISO week number
for Jan 15, 2024; call WeekFromTimestampMs(ts) and assert that the returned week
equals 3 (use the test helper's equality assertion, e.g. assert.Equal with
expected 3 and actual week) so the test fails on incorrect week calculations.
- Around line 49-54: The test TestTimestampDiffInWeeks_25Weeks is too permissive
(assert.GreaterOrEqual >= 24); change it to assert the exact expected value by
asserting diff == 25 so it catches off-by-one regressions — update the assertion
in TestTimestampDiffInWeeks_25Weeks to use assert.Equal(t, 25, diff)
(referencing the TimestampDiffInWeeks call) to enforce the deterministic 25-week
result.

In `@online-feature-store/internal/handler/feature/retrieve_test.go`:
- Around line 563-569: Remove the dead commented-out mock block inside the
setupMock function: delete the commented lines that call m.On("GetFeatureGroup",
"user", "user_info").Return(&config.FeatureGroup{...}, nil).Once() and its
surrounding comment markers so only the existing comment about string types
skipping the call remains; this cleans up dead code in the setupMock closure
that references config.MockConfigManager and GetFeatureGroup.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a59c278 and b2a88fb.

📒 Files selected for processing (24)
  • go-sdk/pkg/interaction-store/client.go
  • horizon/internal/skye/handler/skye.go
  • interaction-store/internal/compression/zstd.go
  • interaction-store/internal/compression/zstd_test.go
  • interaction-store/internal/constants/constants.go
  • interaction-store/internal/data/scylla/scylla.go
  • interaction-store/internal/data/scylla/scylla_test.go
  • interaction-store/internal/handler/interaction.go
  • interaction-store/internal/handler/persist/click.go
  • interaction-store/internal/handler/persist/click_test.go
  • interaction-store/internal/handler/persist/order.go
  • interaction-store/internal/handler/persist/order_test.go
  • interaction-store/internal/handler/retrieve/click.go
  • interaction-store/internal/handler/retrieve/click_test.go
  • interaction-store/internal/handler/retrieve/order.go
  • interaction-store/internal/handler/retrieve/order_test.go
  • interaction-store/internal/handler/retrieve/retrieve.go
  • interaction-store/internal/handler/retrieve/retrieve_test.go
  • interaction-store/internal/utils/utils_test.go
  • online-feature-store/internal/config/etcd.go
  • online-feature-store/internal/handler/feature/features.go
  • online-feature-store/internal/handler/feature/retrieve.go
  • online-feature-store/internal/handler/feature/retrieve_test.go
  • online-feature-store/internal/server/http/server.go
💤 Files with no reviewable changes (1)
  • online-feature-store/internal/server/http/server.go

@shubhamk-meesho shubhamk-meesho merged commit 01e8846 into main Feb 25, 2026
32 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants