Sync BMS#286
Conversation
Added Deployment related changes
fix: duplicate key query
Ingestion Revamp Internal Testing
This reverts commit e8dce9e.
Testing New GRPC Version
Added the json tags to admin ratelimiter
Revert "Added the json tags to admin ratelimiter"
* feat(Embedding Platform): Hierarchical Discovery (Entity->Model->Variant) * feat(Embedding Platform): Enhance API interactions and UI components for variant onboarding and management
* feat(Embedding Platform): Hierarchical Discovery (Entity->Model->Variant) * feat(Embedding Platform): Enhance API interactions and UI components for variant onboarding and management * refactor(Embedding Platform): Remove unused onboarding components and streamline API configurations and improve UI/UX
Fix Register Variant
update PR template
fix(interaction-store): make metadata update synchronous
* Added caller level metric in distributed cache, db * feat(online-feature-store): add caller_id and db (redis/scylla) to feature.retrieve.db.caller.requests metric Co-authored-by: Cursor <cursoragent@cursor.com> * Added confid as metric tag * changed inc factor --------- Co-authored-by: Cursor <cursoragent@cursor.com>
* Added otd path changes for Delta Model * Fix invalid caching checks
Feat/embedding fixes
|
Caution Review failedThe pull request is closed. WalkthroughAdds a new Interaction Store service (gRPC + HTTP) with PSDB serialization, Scylla persistence, consumers, compression, metrics, and tests; integrates Interaction Store into the Go SDK and CLI; updates CI to target Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client (SDK/HTTP/gRPC)
participant GRPC as Interaction Store gRPC Server
participant Persist as ClickPersistHandler
participant Scylla as ScyllaDB
rect rgba(135,206,250,0.5)
Client->>GRPC: PersistClickData RPC (userId, events)
GRPC->>Persist: Convert & dispatch Persist(userId, events)
Persist->>Scylla: RetrieveInteractions(table, userId, columns)
Scylla-->>Persist: existing PSDB blobs
Persist->>Persist: merge/trim events, build PSDB blocks
Persist->>Scylla: UpdateInteractions(table, userId, updated columns)
Scylla-->>Persist: ack
Persist-->>GRPC: PersistDataResponse
GRPC-->>Client: RPC response
end
sequenceDiagram
participant Scheduler as Cron Scheduler
participant Job as VariantScaleUpJob
participant Etcd as SkyeManager (ETCD)
participant Prism as PrismV2
participant Airflow as Airflow
participant Scylla as ScyllaDB
rect rgba(144,238,144,0.5)
Scheduler->>Job: trigger Run()
Job->>Etcd: GetVariantConfig(entity, model, variant)
Etcd-->>Job: variant config
Job->>Prism: UpdateStepParameters(...)
Prism-->>Job: ack
Job->>Airflow: TriggerDAG(dagRunID)
Airflow-->>Job: airflow response (dag_run_id)
Job->>Scylla: check collection status / create tables
Scylla-->>Job: collection status / ack
Job-->>Scheduler: mark task COMPLETED/IN_PROGRESS/FAILED
end
Comment |
There was a problem hiding this comment.
Actionable comments posted: 11
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
🤖 Fix all issues with AI agents
In `@horizon/internal/externalcall/airflow_client.go`:
- Around line 125-134: The TriggerDAG handling currently returns an
AirflowResponse with Status="error" and a nil error on non-200 responses; change
it to return a non-nil error instead (consistent with ListDAGRuns and
GetDAGRun). In the TriggerDAG function, when resp.StatusCode != http.StatusOK,
replace the returned AirflowResponse with a nil response and return
fmt.Errorf("Airflow DAG trigger failed: HTTP %d: %s", resp.StatusCode,
string(body)) (and keep the log.Error call), so callers like TriggerDAG,
variant_onboarding_job.go and variant_scaleup_job.go will receive a proper error
rather than having to inspect AirflowResponse.Status.
In `@horizon/internal/middleware/middleware.go`:
- Line 160: The call to m.CheckScreenPermission has been commented out,
disabling role/screen authorization; restore enforcement by reintroducing the
call to m.CheckScreenPermission(c, claims) inside the middleware where
authentication is validated (or wrap it behind a clearly named, auditable
feature flag like EnableScreenPermissionEnforcement with an owner), and ensure
you handle its error/denial path by aborting the request with the correct HTTP
response (e.g., 403) and logging the denial; do not leave the check commented
out or silent—add explicit logging and a fail-closed response when
CheckScreenPermission returns an error or denies access.
In `@horizon/internal/repositories/scylla/skye_scylla.go`:
- Around line 78-111: CreateEmbeddingTable currently interpolates table and
column identifiers into CQL via fmt.Sprintf (using createEmbeddingTableQueryBase
and session.Query().Exec()) without validating them, allowing CQL injection; fix
by validating and sanitizing any identifier coming from user input (the
tableName passed to CreateEmbeddingTable and any column names derived from
variantsList) before using them in queries: enforce a strict whitelist regex
(e.g. ^[a-zA-Z_][a-zA-Z0-9_]{0,63}$), check length limits, and reject or return
an error for invalid identifiers, then build the variant column names only from
validated values (the variantColumnName derived in the loop) and only use
validated identifiers in tableExists and the fmt.Sprintf that produces the final
query; do not attempt to bind identifiers as query parameters—validate/escape
first and then use the validated strings when calling
s.session.Query(...).Exec().
In `@horizon/internal/repositories/sql/embedding/job_locks/repository.go`:
- Around line 13-16: The current AcquireLockNowait closes the DB connection
before returning (defer conn.Close()), so GET_LOCK() is released immediately and
ReleaseLock() runs on a different connection — make the lock connection live for
the caller by returning a lock handle tied to that connection instead of closing
it. Change the JobLocksRepository API: replace AcquireLockNowait(ctx, jobKey
string) error with AcquireLockNowait(ctx, jobKey string) (JobLock, error) (or
similar), add a JobLock struct that holds the dedicated *sql.Conn and jobKey and
exposes methods Release() error (which calls RELEASE_LOCK on the same
connection) and Close() error (to close the connection), update implementations
to remove the premature conn.Close() in AcquireLockNowait, implement
JobLock.Release to run RELEASE_LOCK on the stored conn, and adjust callers/tests
to call Release()/Close() on the returned JobLock rather than using the old
ReleaseLock(ctx, jobKey) signature.
In `@horizon/internal/skye/handler/skye.go`:
- Around line 470-474: The validation currently checks the wrong condition: in
skye.go where you inspect models.Models[variantPayload.Model].ModelType and
variantPayload.OTDTrainingDataPath, change the inequality check to an equality
check so the OTD path is required when ModelType IS DELTA (i.e., replace the !=
comparison with == for enums.ModelType(enums.DELTA)); update the error path
branch that returns fmt.Errorf("otd_training_data_path is required for DELTA
model type") accordingly to enforce the requirement only for DELTA variants.
In `@horizon/internal/skye/jobs/variant_scaleup_job.go`:
- Around line 70-143: The Run method of VariantScaleUpJob currently calls
triggerPrismAndAirflow while the task is still PENDING, which can cause
duplicate external DAG runs if the later UpdateStatus to IN_PROGRESS fails;
change the ordering in VariantScaleUpJob.Run to first call
j.taskRepo.UpdateStatus(pendingTask.TaskID, "IN_PROGRESS") before calling
j.triggerPrismAndAirflow(pendingTask), and if triggerPrismAndAirflow returns an
error, roll back the task state by calling
j.taskRepo.UpdateStatus(pendingTask.TaskID, "PENDING") (or an appropriate
failure state) and log the rollback; ensure error paths around
triggerPrismAndAirflow and the rollback are properly handled to avoid leaving
the task in an inconsistent state.
In `@interaction-store/internal/consumer/click/consumer.go`:
- Around line 27-35: Remove the unsynchronized nil pre-check and always call
syncOnce.Do to initialize the singleton: in newClickConsumer() eliminate the "if
clickConsumer == nil" block and invoke syncOnce.Do(func() { clickConsumer =
&ClickConsumer{ handler: persist.InitClickPersistHandler(), } })
unconditionally, then return clickConsumer; this ensures the initialization is
properly synchronized for the clickConsumer/ClickConsumer singleton using
syncOnce.
In `@interaction-store/internal/data/scylla/scylla.go`:
- Around line 52-59: The retrieve functions currently swallow errors—update
executeRetrieveInteractions to return (map[string]interface{}, error) instead of
nil on failure and ensure it returns a non-nil error when the DB call fails;
then change Scylla.RetrieveInteractions to capture the (response, err) from
executeRetrieveInteractions and return (response, err) instead of (response,
nil). Apply the same pattern to executeRetrieveMetadata and
Scylla.RetrieveMetadata so all DB retrievals propagate errors upstream rather
than returning nil success values.
In `@interaction-store/internal/handler/persist/click.go`:
- Around line 166-183: In mergeAndTrimEvents ensure you sort existing events
before using existing[0] as the largest timestamp: move the sort.Slice call so
existing is sorted in descending ClickedAt first, then compute largestTimestamp
from existing[0] and perform the 24-week diff check (clearing existing if
stale), then append newEvent and re-trim to maxClickEventsPerWeek; reference
function mergeAndTrimEvents and the use of existing[0], sort.Slice,
utils.TimestampDiffInWeeks, and maxClickEventsPerWeek when making the change.
In `@interaction-store/pkg/profiling/profiling.go`:
- Around line 12-27: Remove the unsynchronized `initialized` variable and any
reads/writes to it; rely solely on the existing `once sync.Once` to guarantee
single initialization. In `Init()` (and only there) keep the
`checkProfilingEnabled()` guard, then call `once.Do(func(){
initializeProfiling() })` and remove the `if initialized { ... }` branch and the
write to `initialized` (and its declaration). Ensure references are to `Init()`,
`once`, `initializeProfiling()`, and `checkProfilingEnabled()` so the file no
longer has the redundant `initialized` flag.
In `@interaction-store/scylla-init.cql`:
- Around line 1-7: The CREATE KEYSPACE statement defines
interaction_store_timeseries but the subsequent USE statement references
interaction_store; update one of them so both match. Locate the CREATE KEYSPACE
block (the CREATE KEYSPACE IF NOT EXISTS interaction_store_timeseries ...
statement) and the USE interaction_store statement and change the keyspace
identifier so both use the intended name (either interaction_store_timeseries or
interaction_store) consistently.
🟠 Major comments (46)
horizon/pkg/etcd/v1.go-432-437 (1)
432-437:⚠️ Potential issue | 🟠 MajorType mismatch bug:
int64value set intomap[string]intwill panic.
strconv.ParseIntreturnsint64, but when the map's value type isreflect.Int, usingreflect.ValueOf(floatVal)creates anint64value.SetMapIndexrequires exact type match and will panic at runtime formap[string]int.🐛 Proposed fix to convert to the correct type
case reflect.Int: floatVal, err := strconv.ParseInt(data, 10, 64) if err != nil { return err } - val.SetMapIndex(reflect.ValueOf(mapKey), reflect.ValueOf(floatVal)) + val.SetMapIndex(reflect.ValueOf(mapKey), reflect.ValueOf(int(floatVal)))interaction-store/pkg/scylla/options_util.go-41-49 (1)
41-49:⚠️ Potential issue | 🟠 MajorDocumentation claims defaults that aren't applied.
The docstring lists defaults like "(default: 600ms)" for optional variables, but the code doesn't actually apply these defaults—it only sets values when the env var is explicitly set. Either remove the default claims from documentation or apply actual defaults in code.
🐛 Option A: Remove misleading default claims
// Optional environment variables: -// - <envPrefix>_TIMEOUT_IN_MS (default: 600ms) -// - <envPrefix>_CONNECT_TIMEOUT_IN_MS (default: 600ms) -// - <envPrefix>_NUM_CONNS (default: 1) -// - <envPrefix>_MAX_PREPARED_STATEMENTS (default: 1000) -// - <envPrefix>_MAX_ROUTING_KEY_INFO (default: 1000) -// - <envPrefix>_PAGE_SIZE (default: 5000) -// - <envPrefix>_MAX_WAIT_SCHEMA_AGREEMENT (default: 60s) -// - <envPrefix>_RECONNECT_INTERVAL (default: 60s) -// - <envPrefix>_WRITE_COALESCE_WAIT_TIME (default: 200us) +// - <envPrefix>_TIMEOUT_IN_MS +// - <envPrefix>_CONNECT_TIMEOUT_IN_MS +// - <envPrefix>_NUM_CONNS +// - <envPrefix>_MAX_PREPARED_STATEMENTS +// - <envPrefix>_MAX_ROUTING_KEY_INFO +// - <envPrefix>_PAGE_SIZE +// - <envPrefix>_MAX_WAIT_SCHEMA_AGREEMENT (in seconds) +// - <envPrefix>_RECONNECT_INTERVAL (in seconds) +// - <envPrefix>_WRITE_COALESCE_WAIT_TIME (in microseconds)interaction-store/pkg/scylla/options_util.go-88-96 (1)
88-96:⚠️ Potential issue | 🟠 MajorInconsistent time unit suffixes may cause misconfiguration.
The docstring states: "In case of type duration, env name is env prefix + scylla config name + '_IN_MS'", but:
_MAX_WAIT_SCHEMA_AGREEMENTparses as seconds (no_IN_Ssuffix)_RECONNECT_INTERVALparses as seconds (no_IN_Ssuffix)_WRITE_COALESCE_WAIT_TIMEparses as microseconds (no_IN_USsuffix)Users expecting milliseconds (per the doc) will get values off by 1000x or 1,000,000x.
🐛 Suggested fix: Add unit suffixes for clarity
- maxWaitSchemaAgreementSuffix = "_MAX_WAIT_SCHEMA_AGREEMENT" - reconnectIntervalSuffix = "_RECONNECT_INTERVAL" - writeCoalesceWaitTimeSuffix = "_WRITE_COALESCE_WAIT_TIME" + maxWaitSchemaAgreementSuffix = "_MAX_WAIT_SCHEMA_AGREEMENT_IN_S" + reconnectIntervalSuffix = "_RECONNECT_INTERVAL_IN_S" + writeCoalesceWaitTimeSuffix = "_WRITE_COALESCE_WAIT_TIME_IN_US"And update the documentation at lines 47-49 to reflect the correct units.
horizon/internal/repositories/sql/embedding/variant_requests/repository.go-23-41 (1)
23-41:⚠️ Potential issue | 🟠 MajorGuard
db_nametype assertion to avoid startup panics.
meta["db_name"].(string)will panic if the key is missing or not a string; return a clear error instead.Proposed fix
meta, err := connection.GetMeta() if err != nil { return nil, err } - dbName := meta["db_name"].(string) + dbNameVal, ok := meta["db_name"] + if !ok { + return nil, errors.New("db_name missing from meta") + } + dbName, ok := dbNameVal.(string) + if !ok || dbName == "" { + return nil, errors.New("db_name must be a non-empty string") + }horizon/internal/repositories/sql/embedding/job_frequency_requests/repository.go-32-40 (1)
32-40:⚠️ Potential issue | 🟠 MajorUse an ok-check for the
db_nametype assertion to prevent panics.The unchecked type assertion
meta["db_name"].(string)will panic if the key is missing or the value is not a string. SinceGetMeta()returnsmap[string]interface{}with no validation of its contents, add a guard:meta, err := connection.GetMeta() if err != nil { return nil, err } - dbName := meta["db_name"].(string) + dbName, ok := meta["db_name"].(string) + if !ok || dbName == "" { + return nil, errors.New("db_name missing from connection metadata") + }online-feature-store/go.mod-16-16 (1)
16-16:⚠️ Potential issue | 🟠 MajorReview breaking changes introduced by redis/go-redis upgrade from v9.5.5 to v9.17.2.
This version jump spans multiple breaking changes requiring code updates:
- RediSearch dialect (v9.8.0+): Default changed from DIALECT 1 to DIALECT 2. Explicitly set dialect in
FTSearchWithArgs/FTAggregateWithArgsif relying on original behavior.- RedisGears/Triggers & Functions removed (v9.8.0+): Remove any usage of dropped Gears-related APIs.
- Connection buffer sizes (v9.12+): Defaults changed from 4096 to 32KiB. Set
ReadBufferSize/WriteBufferSizeexplicitly if memory or latency constraints require adjustment.- Error handling (v9.17.0+): Typed errors now recommended; replace string-based error checks with
errors.Is/errors.As.- RESP3 protocol: If using Redis Stack with older servers, consider pinning protocol to RESP2 via
Options{Protocol: 2}.The upgrade includes critical bug fixes for connection pool race conditions and timeout handling (v9.17.2 release notes). Run tests to confirm compatibility with the codebase.
horizon/internal/repositories/sql/embedding/job_locks/repository.go-29-41 (1)
29-41:⚠️ Potential issue | 🟠 MajorGuard metadata and session type assertions to avoid panics.
meta["db_name"].(string)andsession.(*gorm.DB)can panic if metadata is missing or the connection isn’t a *gorm.DB. Return a descriptive error instead.🔧 Proposed fix
- session, err := connection.GetConn() + session, err := connection.GetConn() if err != nil { return nil, err } meta, err := connection.GetMeta() if err != nil { return nil, err } - dbName := meta["db_name"].(string) + gormDB, ok := session.(*gorm.DB) + if !ok { + return nil, fmt.Errorf("expected *gorm.DB, got %T", session) + } + rawDBName, ok := meta["db_name"] + if !ok { + return nil, errors.New("db_name missing from metadata") + } + dbName, ok := rawDBName.(string) + if !ok || dbName == "" { + return nil, fmt.Errorf("db_name must be a non-empty string, got %T", rawDBName) + } return &jobLocksRepo{ - db: session.(*gorm.DB), + db: gormDB, dbName: dbName, }, nilhorizon/internal/repositories/sql/embedding/job_locks/repository.go-45-54 (1)
45-54:⚠️ Potential issue | 🟠 MajorFix error handling in EnsureRow to catch only duplicate-key errors.
Current logic silently ignores any Create error if the follow-up query succeeds—not just duplicate-key errors. This masks non-duplicate failures (e.g., connection loss, permission errors).
The proposed fix using
gorm.ErrDuplicatedKeywon't work in this codebase;TranslateError: trueis not enabled ingorm.Opencalls (seehorizon/pkg/infra/sql.go). Without it, GORM passes through native MySQL errors.Check for MySQL error code 1062 (duplicate entry) instead:
🔧 Corrected approach
Option 1: Check MySQL error code 1062 directly
lock := SkyeJobLock{JobKey: jobKey} if err := r.db.Create(&lock).Error; err != nil { - // If record exists, ensure by querying - var existing SkyeJobLock - if err2 := r.db.Where("job_key = ?", jobKey).First(&existing).Error; err2 != nil { - return fmt.Errorf("ensure row create failed: %w", err) - } + var mysqlErr *mysql.MySQLError + if !errors.As(err, &mysqlErr) || mysqlErr.Number != 1062 { + return fmt.Errorf("create job lock row failed: %w", err) + } + // If record exists, ensure by querying + var existing SkyeJobLock + if err2 := r.db.Where("job_key = ?", jobKey).First(&existing).Error; err2 != nil { + return fmt.Errorf("ensure row after duplicate failed: %w", err2) + } }(Requires
gorm.io/driver/mysqlimport for*mysql.MySQLError.)Option 2 (Preferred): Enable error translation in DB config
SetTranslateError: truewhen opening the DB, then use the portablegorm.ErrDuplicatedKeycheck.horizon/internal/skye/etcd/enums/filter_condition.go-3-9 (1)
3-9:⚠️ Potential issue | 🟠 MajorFix constant type mismatch.
Line 8-9 declares constants asModelTypeeven though the file definesFilterCondition. This breaks type safety and API intent.🐛 Suggested fix
-// ModelType represents the different types of models that can be used in the system. +// FilterCondition represents supported filter operators. type FilterCondition string -// Constants representing various model types. +// Constants representing filter conditions. const ( - EQUALS ModelType = "EQUALS" // Indicates a model that resets the state. - NOT_EQUALS ModelType = "NOT_EQUALS" // Indicates a model that applies changes incrementally. + EQUALS FilterCondition = "EQUALS" + NOT_EQUALS FilterCondition = "NOT_EQUALS" )interaction-store/internal/consumer/click.go-1-11 (1)
1-11:⚠️ Potential issue | 🟠 MajorDon’t silently succeed in a no-op click processor.
Line 9-10 returns nil without processing, which can acknowledge/drop events with no signal. Return an explicit error (or log + error) until implemented.🐛 Suggested safeguard
import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "errors" ) func ProcessClickEvents(record []*kafka.Message, c *kafka.Consumer) error { - return nil + return errors.New("ProcessClickEvents not implemented for !meesho build") }interaction-store/go.mod-51-52 (1)
51-52:⚠️ Potential issue | 🟠 MajorUpgrade
quic-goto v0.57.0 or later to patch two HIGH-severity advisories.
github.com/quic-go/quic-go v0.54.0is affected by GO-2025-4017 (GHSA-47m2-4cr7-mhcw) and GO-2025-4233 (GHSA-g754-hx8w-x2g6). Upgrade to v0.57.0+ and regenerate the module graph.interaction-store/cmd/server/main.go-34-36 (1)
34-36:⚠️ Potential issue | 🟠 MajorHTTP server error is silently ignored and port is hardcoded.
The goroutine discards errors from
ListenAndServe, which could mask binding failures or other startup issues. Additionally, the port is hardcoded to8080while the gRPC server uses the configurableAppPort.🔧 Proposed fix
go func() { - http2.ListenAndServe(":8080", nil) + if err := http2.ListenAndServe(":8080", nil); err != nil && err != http2.ErrServerClosed { + log.Error().Err(err).Msg("HTTP server failed") + } }()Also consider making the port configurable or documenting why it differs from
AppPort.interaction-store/internal/config/app_configs_builder.go-30-47 (1)
30-47:⚠️ Potential issue | 🟠 MajorAdd missing MQ environment variable bindings to
bindEnvVars().The
Configsstruct defines 18 MQ-related fields (MqApiAuthToken,MqBatchSize,MqCacheRefreshInitial,MqCacheRefreshSchedule,MqConsumerEnabled,MqConsumerVersion,MqEnabled,MqEnv,MqFlushIntervalMillis,MqKeepAliveDuration,MqMaxIdleConnections,MqOpenCircuitDuration,MqProducerEnabled,MqQueueSizeLogDelaySeconds,MqServerApiUrl,MqServerConnectionTimeouts,ClickConsumerMqId,OrderConsumerMqId), butbindEnvVars()does not bind any of them. Without these bindings, viper cannot populate these fields from environment variables. Additionally, allviper.BindEnv()calls should have their returned errors checked.interaction-store/internal/server/mux/server.go-46-48 (1)
46-48:⚠️ Potential issue | 🟠 MajorRemove the
cmux.HTTP2()matcher to avoid routing non-gRPC HTTP/2 traffic to the gRPC server.The
grpcListenermatches connections withcmux.HTTP2(), which will route all HTTP/2 traffic (including non-gRPC HTTP/2 requests) to the gRPC server. This causes unnecessary errors for HTTP/2 clients that don't use the gRPC protocol.The fix is to match gRPC connections by their protocol-specific header instead:
grpcListener := s.mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"), cmux.Any())This routes only HTTP/2 connections with the
application/grpccontent-type to the gRPC server, leaving other HTTP/2 traffic to be rejected gracefully rather than causing gRPC protocol errors.interaction-store/internal/data/scylla/database.go-3-14 (1)
3-14:⚠️ Potential issue | 🟠 MajorValidate/whitelist identifiers used in query templates.
These templates interpolate keyspace/table/column strings; if any originate from request input, this becomes a CQL injection vector. Ensure table/column names are derived from a strict allowlist or validated identifiers before formatting.
interaction-store/pkg/metric/metric.go-31-35 (1)
31-35:⚠️ Potential issue | 🟠 MajorData race on
initializedfast‑path.
initializedis read/written without synchronization; concurrentInit()calls will race even withonce.Do. Remove the fast‑path or guard it withatomic.✅ Safer pattern
- if initialized { - log.Debug().Msgf("Metrics already initialized!") - return - } once.Do(func() { ... - initialized = true })interaction-store/internal/utils/utils.go-71-117 (1)
71-117: 🛠️ Refactor suggestion | 🟠 MajorReturn errors instead of panicking on malformed vector data.
The
FP32Vector,FP64Vector,Int32Vector, andInt64Vectormethods panic when byte slice lengths don't match expected multiples (4 or 8 bytes). These are called withDefaultValuesInBytesfrom configuration metadata, which could be corrupted or incorrectly formatted. A malformed vector causes a service crash. Return an error type instead of panicking, or document that callers must validate input length before calling these methods.interaction-store/pkg/metric/metric.go-23-41 (1)
23-41:⚠️ Potential issue | 🟠 MajorDefault sampling rate 0.0 silently drops all metrics instead of enabling full sampling.
The comment on line 22 states "by default full sampling," but
samplingRate = 0.0means 0% of metrics are sent—the opposite of intended behavior. According to DataDog's DogStatsD specification, sampling rate 1 means send 100% of the time, while 0 means send nothing. WhenAPP_METRIC_SAMPLING_RATEis unset,viper.GetFloat64()returns 0, causing all metrics to be silently dropped.Change the default to
1.0and add validation to clamp values to(0, 1]:🛠 Suggested fix
- // by default full sampling - samplingRate = 0.0 + // by default full sampling + samplingRate = 1.0- samplingRate = viper.GetFloat64("APP_METRIC_SAMPLING_RATE") + sr := viper.GetFloat64("APP_METRIC_SAMPLING_RATE") + if sr <= 0 || sr > 1 { + log.Warn().Msg("APP_METRIC_SAMPLING_RATE out of range; defaulting to 1.0") + sr = 1.0 + } + samplingRate = srhorizon/pkg/infra/scylla.go-57-65 (1)
57-65:⚠️ Potential issue | 🟠 MajorSkye config empty currently disables all Scylla init.
The early return on empty
SkyeScyllaActiveConfigIdsstops initialization even whenScyllaActiveConfIdsis present. That’s a regression for non‑Skye deployments.🛠️ Suggested fix
- skyeActiveConfIdsStr := config.SkyeScyllaActiveConfigIds - if skyeActiveConfIdsStr == "" { - return - } + skyeActiveConfIdsStr := config.SkyeScyllaActiveConfigIds + var skyeActiveIds []string + if skyeActiveConfIdsStr != "" { + skyeActiveIds = strings.Split(skyeActiveConfIdsStr, ",") + } @@ - skyeActiveIds := strings.Split(skyeActiveConfIdsStr, ",") ScyllaConnections := make(map[int]ConnectionFacade, len(activeIds)+len(skyeActiveIds)) for _, configIdStr := range append(activeIds, skyeActiveIds...) {horizon/pkg/scheduler/init.go-59-74 (1)
59-74:⚠️ Potential issue | 🟠 MajorSame issues apply to
InitVariantScaleUpScheduler.This function has the same concerns: no idempotency guard and hardcoded log message.
🔒 Proposed fix
+var initVariantScaleUpOnce sync.Once + func InitVariantScaleUpScheduler(config configs.Configs) { + initVariantScaleUpOnce.Do(func() { + if config.VariantScaleUpCronExpression == "" { + log.Warn().Msg("VariantScaleUpCronExpression not found in config, skipping scheduler") + return + } c := cron.New(cron.WithSeconds()) _, err := c.AddFunc(config.VariantScaleUpCronExpression, func() { job := skyeJobs.InitVariantScaleUpJob(config) job.Run() }) if err != nil { log.Fatal().Err(err).Msg("Failed to schedule variant scale up job") return } c.Start() - log.Info().Msg("Variant Scale Up scheduler started (runs every 5 minutes)") + log.Info().Str("cron", config.VariantScaleUpCronExpression).Msg("Variant Scale Up scheduler started") + }) }horizon/pkg/scheduler/init.go-42-57 (1)
42-57:⚠️ Potential issue | 🟠 MajorMissing idempotency guard - multiple calls will create duplicate schedulers.
Unlike
Init()which usesinitSchedulerOnce,InitVariantOnboardingSchedulerhas no guard against multiple invocations. Each call creates a new cron instance, potentially causing duplicate job executions.Additionally, the log message hardcodes "runs every 5 minutes" but the actual schedule is determined by
config.VariantOnboardingCronExpression.🔒 Proposed fix with sync.Once and dynamic logging
+var initVariantOnboardingOnce sync.Once + func InitVariantOnboardingScheduler(config configs.Configs) { + initVariantOnboardingOnce.Do(func() { + if config.VariantOnboardingCronExpression == "" { + log.Warn().Msg("VariantOnboardingCronExpression not found in config, skipping scheduler") + return + } c := cron.New(cron.WithSeconds()) _, err := c.AddFunc(config.VariantOnboardingCronExpression, func() { job := skyeJobs.InitVariantOnboardingJob(config) job.Run() }) if err != nil { log.Fatal().Err(err).Msg("Failed to schedule variant onboarding job") return } c.Start() - log.Info().Msg("Variant onboarding scheduler started (runs every 5 minutes)") + log.Info().Str("cron", config.VariantOnboardingCronExpression).Msg("Variant onboarding scheduler started") + }) }go-sdk/pkg/interaction-store/init.go-34-39 (1)
34-39:⚠️ Potential issue | 🟠 MajorData race:
GetInstancereads registry without holding the mutex.
InitClientcorrectly acquires the mutex before modifying the registry, butGetInstancereads from the registry without any synchronization. IfInitClientis called concurrently withGetInstance, this could result in a data race.🐛 Proposed fix
func GetInstance(version int) Client { + mut.Lock() + defer mut.Unlock() if registry[version] == nil { log.Panic().Msgf("Client for version %d not initialised", version) } return registry[version] }interaction-store/internal/compression/zstd_test.go-17-43 (1)
17-43:⚠️ Potential issue | 🟠 MajorBug: Incorrect indexing causes overlapping writes and data corruption.
The function has inconsistent indexing: it uses
i*4to calculate byte positions while writing 8 bytes per iteration. This causes each iteration to overlap with the previous one by 4 bytes, corrupting the generated data.Additionally, the comment on line 18 mentions "float32 values (4 bytes each)" but the code actually generates float64 values (8 bytes each).
🐛 Proposed fix for indexing and comment
func populateNormFP64Bytes(num int) []byte { - // Create a byte slice large enough to hold `num` float32 values (4 bytes each) + // Create a byte slice large enough to hold `num` float64 values (8 bytes each) bytes := make([]byte, num*8) // Seed the random number generator r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < num; i++ { - // Generate a random float32 number + // Generate a random float64 number f := r.NormFloat64() - // Get the binary representation of the float32 number as a uint32 + // Get the binary representation of the float64 number as a uint64 bits := math.Float64bits(f) - // Use bit shifting to split the uint32 into 4 bytes - bytes[i*4] = byte(bits >> 56) // Most significant byte - bytes[i*4+1] = byte(bits >> 48) // Next byte - bytes[i*4+2] = byte(bits >> 40) // Next byte - bytes[i*4+3] = byte(bits >> 32) // Least significant byte - bytes[i*4+4] = byte(bits >> 24) // Least significant byte - bytes[i*4+5] = byte(bits >> 16) // Least significant byte - bytes[i*4+6] = byte(bits >> 8) // Least significant byte - bytes[i*4+7] = byte(bits) // Least significant byte + // Use bit shifting to split the uint64 into 8 bytes + bytes[i*8] = byte(bits >> 56) // Most significant byte + bytes[i*8+1] = byte(bits >> 48) + bytes[i*8+2] = byte(bits >> 40) + bytes[i*8+3] = byte(bits >> 32) + bytes[i*8+4] = byte(bits >> 24) + bytes[i*8+5] = byte(bits >> 16) + bytes[i*8+6] = byte(bits >> 8) + bytes[i*8+7] = byte(bits) // Least significant byte } return bytes }interaction-store/internal/data/block/psdb.go-280-295 (1)
280-295:⚠️ Potential issue | 🟠 MajorGuard against string length overflow in
serializeStringVector.
String lengths are stored asuint16; longer values will silently wrap and corrupt data.🛡️ Add a max‑length check
func serializeStringVector(values []string, p *PermanentStorageDataBlock) error { + const maxUint16 = int(^uint16(0)) if len(values) != int(p.DataLength) { return fmt.Errorf("mismatch in number of elements (%d) and defined data length (%d)", len(values), p.DataLength) } for _, v := range values { + if len(v) > maxUint16 { + return fmt.Errorf("string length %d exceeds uint16 max %d", len(v), maxUint16) + } strLen := uint16(len(v)) lenBytes := make([]byte, 2)horizon/internal/skye/jobs/variant_onboarding_job.go-399-406 (1)
399-406:⚠️ Potential issue | 🟠 MajorPrevent failure/panic when payload is empty or
otd_pathis missing.
json.Unmarshalon an empty payload errors, and the direct type assertion onotd_pathcan panic. Guard both to keep the job resilient.🛡️ Safer payload parsing and `otd_path` extraction
var payload map[string]interface{} - if err := json.Unmarshal([]byte(task.Payload), &payload); err != nil { - return fmt.Errorf("failed to unmarshal payload: %w", err) - } + if task.Payload == "" { + payload = make(map[string]interface{}) + } else if err := json.Unmarshal([]byte(task.Payload), &payload); err != nil { + return fmt.Errorf("failed to unmarshal payload: %w", err) + } trainingDataPath := model.TrainingDataPath if model.ModelType == enums.DELTA { - trainingDataPath = payload["otd_path"].(string) + otdPath, ok := payload["otd_path"].(string) + if !ok || otdPath == "" { + return fmt.Errorf("otd_path missing or not a string for DELTA model") + } + trainingDataPath = otdPath }interaction-store/internal/data/block/psdb.go-78-143 (1)
78-143:⚠️ Potential issue | 🟠 MajorValidate element count fits layout‑1 header (9‑bit length).
DataLengthis encoded into 9 bits; lengths > 511 will wrap and corrupt the header. Guard the event count before assigningDataLength.🛡️ Add a layout‑1 length guard
func (p *PermanentStorageDataBlock) serializeClickEvents() error { clickEvents, ok := p.Data.([]model.ClickEvent) if !ok { return fmt.Errorf("unexpected data type for click events: got %T, want []model.ClickEvent", p.Data) } + const maxLayout1DataLength = (1 << 9) - 1 + if len(clickEvents) > maxLayout1DataLength { + return fmt.Errorf("data length %d exceeds layout‑1 max %d", len(clickEvents), maxLayout1DataLength) + } p.DataLength = uint16(len(clickEvents)) @@ func (p *PermanentStorageDataBlock) serializeOrderEvents() error { orderEvents, ok := p.Data.([]model.FlattenedOrderEvent) if !ok { return fmt.Errorf("unexpected data type for order events: got %T, want []model.FlattenedOrderEvent", p.Data) } + const maxLayout1DataLength = (1 << 9) - 1 + if len(orderEvents) > maxLayout1DataLength { + return fmt.Errorf("data length %d exceeds layout‑1 max %d", len(orderEvents), maxLayout1DataLength) + } p.DataLength = uint16(len(orderEvents))interaction-store/internal/consumer/click/consumer_test.go-14-17 (1)
14-17:⚠️ Potential issue | 🟠 MajorPropagate TestMain exit code to avoid false‑green tests.
Withoutos.Exit(m.Run()), failures in this package won’t fail CI.✅ Fix TestMain exit handling
import ( "errors" + "os" "testing" @@ func TestMain(m *testing.M) { zerolog.SetGlobalLevel(zerolog.ErrorLevel) - m.Run() + os.Exit(m.Run()) }interaction-store/internal/compression/zstd.go-21-72 (1)
21-72:⚠️ Potential issue | 🟠 MajorUse
sync.Onceto eliminate data races in singleton initialization.The double-checked locking pattern creates a data race: the first check of
encoderanddecoder(lines 22, 52) happens outside mutex protection, while the writes (lines 37, 68) occur inside the mutex. This violates Go's memory model—operations outside the critical section are not synchronized with writes inside it, and go test -race would flag this. Replace the manual mutex pattern withsync.Once, the idiomatic, race-free approach for lazy initialization.interaction-store/internal/server/http/handlers.go-98-221 (1)
98-221:⚠️ Potential issue | 🟠 MajorEnsure consistent JSON field naming between requests and responses.
Requests are decoded via
ShouldBindJSON(expects snake_case), while responses are serialized viaprotojson.Marshal(defaults to camelCase based on protobuf metadata). This asymmetry breaks clients expecting consistent field naming. Useprotojson.UnmarshalOptionsfor requests andprotojson.MarshalOptions{UseProtoNames: true}for responses to ensure both directions consistently use snake_case field names.🛠️ Suggested fix (apply similarly to all handlers)
import ( "context" + "io" "net/http" @@ func handlePersistClickInteractions(c *gin.Context) { ctx := createContext(c) var request timeseries.PersistClickDataRequest - if err := c.ShouldBindJSON(&request); err != nil { + body, err := io.ReadAll(c.Request.Body) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body: " + err.Error()}) + return + } + if err := (protojson.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(body, &request); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body: " + err.Error()}) return } @@ - jsonBytes, err := protojson.Marshal(result) + jsonBytes, err := (protojson.MarshalOptions{UseProtoNames: true}).Marshal(result)horizon/internal/repositories/scylla/skye_scylla.go-47-63 (1)
47-63:⚠️ Potential issue | 🟠 MajorUnsafe type assertions may cause runtime panics.
Similar to the SQL repository, lines 52 and 61 perform unsafe type assertions without checking. If
meta["keyspace"]is missing or the connection type is unexpected, this will panic.🛡️ Proposed fix with safe type assertions
func NewSkyeRepository(connection *infra.ScyllaClusterConnection) (SkyeStore, error) { meta, err := connection.GetMeta() if err != nil { return nil, fmt.Errorf("failed to get connection meta: %w", err) } - keySpace := meta["keyspace"].(string) + keySpaceVal, ok := meta["keyspace"] + if !ok { + return nil, fmt.Errorf("keyspace not found in connection meta") + } + keySpace, ok := keySpaceVal.(string) + if !ok { + return nil, fmt.Errorf("keyspace is not a string") + } session, err := connection.GetConn() if err != nil { return nil, fmt.Errorf("failed to get connection: %w", err) } + gocqlSession, ok := session.(*gocql.Session) + if !ok { + return nil, fmt.Errorf("connection is not a *gocql.Session") + } return &SkyeScylla{ keySpace: keySpace, - session: session.(*gocql.Session), + session: gocqlSession, }, nil }horizon/internal/repositories/sql/embedding/variant_onboarding_tasks/repository.go-42-47 (1)
42-47:⚠️ Potential issue | 🟠 MajorUnsafe type assertions may cause runtime panics.
The type assertions on lines 42 and 45 lack safety checks. If
meta["db_name"]is missing/nil or if the session is not a*gorm.DB, this will panic at runtime.🛡️ Proposed fix with safe type assertions
- dbName := meta["db_name"].(string) + dbNameVal, ok := meta["db_name"] + if !ok { + return nil, errors.New("db_name not found in connection meta") + } + dbName, ok := dbNameVal.(string) + if !ok { + return nil, errors.New("db_name is not a string") + } - return &variantOnboardingTaskRepo{ - db: session.(*gorm.DB), - dbName: dbName, - }, nil + db, ok := session.(*gorm.DB) + if !ok { + return nil, errors.New("session is not a *gorm.DB") + } + return &variantOnboardingTaskRepo{ + db: db, + dbName: dbName, + }, nilgo-sdk/pkg/interaction-store/grpc.go-52-57 (1)
52-57:⚠️ Potential issue | 🟠 MajorInsecure TLS configuration: certificate verification disabled and MinVersion missing.
The TLS config uses
InsecureSkipVerify: true, which disables certificate verification and makes the connection vulnerable to man-in-the-middle attacks. Additionally,MinVersionis not set, defaulting to older TLS versions.🔒 Proposed fix for secure TLS configuration
} else { - creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + creds := credentials.NewTLS(&tls.Config{ + MinVersion: tls.VersionTLS12, + // Remove InsecureSkipVerify for production; use proper CA certs + // InsecureSkipVerify: true, + }) gConn, err = grpc.NewClient(config.Host+":"+config.Port,If certificate verification must be disabled for testing/development, consider making it configurable:
+ tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} + if config.InsecureSkipVerify { // add field to Config + tlsConfig.InsecureSkipVerify = true + } + creds := credentials.NewTLS(tlsConfig)interaction-store/internal/consumer/order/consumer.go-80-131 (1)
80-131:⚠️ Potential issue | 🟠 MajorUnbounded concurrency may cause resource exhaustion.
There's no limit on concurrent goroutines spawned. If
userEventscontains many users, this could spawn an excessive number of goroutines, leading to resource exhaustion or overwhelming the downstream persistence layer.🛡️ Proposed fix: limit concurrency with errgroup.SetLimit
+const maxConcurrentPersists = 10 // adjust based on downstream capacity + func (c *OrderConsumer) persistUserEvents(userEvents map[string][]model.FlattenedOrderEvent) error { if len(userEvents) == 0 { return nil } var ( mu sync.Mutex errors []error ) g := new(errgroup.Group) + g.SetLimit(maxConcurrentPersists) for id, userToEvents := range userEvents {interaction-store/internal/handler/retrieve/order.go-17-35 (1)
17-35:⚠️ Potential issue | 🟠 MajorRemove the racy double‑check around
sync.Once.
The unsynchronized read oforderRetrieveHandlercan race;sync.Oncealone is sufficient.✅ Suggested fix
func InitOrderRetrieveHandler() *OrderRetrieveHandler { - if orderRetrieveHandler == nil { - orderOnce.Do(func() { - orderRetrieveHandler = &OrderRetrieveHandler{ - scyllaDb: scylla.NewDatabase(), - } - }) - } + orderOnce.Do(func() { + orderRetrieveHandler = &OrderRetrieveHandler{ + scyllaDb: scylla.NewDatabase(), + } + }) return orderRetrieveHandler }interaction-store/internal/handler/persist/order.go-17-35 (1)
17-35:⚠️ Potential issue | 🟠 MajorRemove the racy double‑check around
sync.Once.
The unsynchronized read oforderPersistHandlercan race;sync.Oncealone is sufficient.✅ Suggested fix
func InitOrderPersistHandler() *OrderPersistHandler { - if orderPersistHandler == nil { - orderOnce.Do(func() { - orderPersistHandler = &OrderPersistHandler{ - scyllaDb: scylla.NewDatabase(), - } - }) - } + orderOnce.Do(func() { + orderPersistHandler = &OrderPersistHandler{ + scyllaDb: scylla.NewDatabase(), + } + }) return orderPersistHandler }interaction-store/internal/handler/retrieve/order.go-55-62 (1)
55-62:⚠️ Potential issue | 🟠 MajorGuard the type assertion from
RetrieveEventData.
A bad or unexpected deserialized payload will panic here.✅ Suggested fix
for _, ddb := range weekToDeserializedBlocks { events, err := ddb.RetrieveEventData(userId) if err != nil { return nil, err } - allEvents = append(allEvents, events.([]model.FlattenedOrderEvent)...) + typedEvents, ok := events.([]model.FlattenedOrderEvent) + if !ok { + return nil, fmt.Errorf("unexpected event type in PSDB for user %s", userId) + } + allEvents = append(allEvents, typedEvents...) }horizon/internal/skye/etcd/models.go-5-97 (1)
5-97:⚠️ Potential issue | 🟠 MajorAdd JSON tags for fields that rely on snake_case keys.
Several fields (e.g.,DefaultInMemoryCachingTTLSeconds,DefaultDistributedCachingTTLSeconds,StoreId,Config,BackupConfig,TestConfig,Storage,Data) lack tags; if etcd values are snake_case (as elsewhere), those won’t unmarshal correctly.✅ Suggested fix (apply similarly across other missing fields)
type Skye struct { - Entity map[string]Models - Storage Storage - DefaultInMemoryCachingTTLSeconds int - DefaultDistributedCachingTTLSeconds int + Entity map[string]Models `json:"entity"` + Storage Storage `json:"storage"` + DefaultInMemoryCachingTTLSeconds int `json:"default_in_memory_caching_ttl_seconds"` + DefaultDistributedCachingTTLSeconds int `json:"default_distributed_caching_ttl_seconds"` }horizon/internal/externalcall/airflow_client.go-15-89 (1)
15-89:⚠️ Potential issue | 🟠 MajorClarify
TriggerDAGparameter (dagID vs dagRunID).
The parameter is used as thedag_idin the URL path and to build a uniquedag_run_idin the payload. The current namedagRunIDis misleading and inconsistent with the parameter's actual usage.Suggested fix
type AirflowClient interface { - TriggerDAG(dagRunID string) (*AirflowResponse, error) + TriggerDAG(dagID string) (*AirflowResponse, error) ListDAGRuns(dagID string) (*AirflowDAGRunsResponse, error) GetDAGRun(dagID, dagRunID string) (*AirflowDAGRun, error) } -func (a *airflowClientImpl) TriggerDAG(dagRunID string) (*AirflowResponse, error) { - url := fmt.Sprintf("%s/api/v1/dags/%s/dagRuns", a.BaseURL, dagRunID) +func (a *airflowClientImpl) TriggerDAG(dagID string) (*AirflowResponse, error) { + url := fmt.Sprintf("%s/api/v1/dags/%s/dagRuns", a.BaseURL, dagID) ... - DagRunID: dagRunID + "_" + timestamp, + DagRunID: dagID + "_" + timestamp, }go-sdk/pkg/interaction-store/adapter.go-100-175 (1)
100-175:⚠️ Potential issue | 🟠 MajorGuard against nil elements in proto responses to avoid panics.
response.Dataslices andresponse.Datamap values can legally contain nil pointers. The current code dereferences without checks, which would panic and crash the client. Add nil guards while building slices/maps.🛠️ Suggested nil-safe conversions
func (a *Adapter) ConvertFromRetrieveClickDataResponse(response *pb.RetrieveClickDataResponse) *RetrieveClickDataResponse { if response == nil || len(response.Data) == 0 { return &RetrieveClickDataResponse{Data: []ClickEvent{}} } - events := make([]ClickEvent, len(response.Data)) - for i, e := range response.Data { - events[i] = ClickEvent{ + events := make([]ClickEvent, 0, len(response.Data)) + for _, e := range response.Data { + if e == nil { + continue + } + events = append(events, ClickEvent{ CatalogId: e.CatalogId, ProductId: e.ProductId, Timestamp: e.Timestamp, Metadata: e.Metadata, - } + }) } return &RetrieveClickDataResponse{Data: events} } func (a *Adapter) ConvertFromRetrieveOrderDataResponse(response *pb.RetrieveOrderDataResponse) *RetrieveOrderDataResponse { if response == nil || len(response.Data) == 0 { return &RetrieveOrderDataResponse{Data: []OrderEvent{}} } - events := make([]OrderEvent, len(response.Data)) - for i, e := range response.Data { - events[i] = OrderEvent{ + events := make([]OrderEvent, 0, len(response.Data)) + for _, e := range response.Data { + if e == nil { + continue + } + events = append(events, OrderEvent{ CatalogId: e.CatalogId, ProductId: e.ProductId, SubOrderNum: e.SubOrderNum, Timestamp: e.Timestamp, Metadata: e.Metadata, - } + }) } return &RetrieveOrderDataResponse{Data: events} } func (a *Adapter) ConvertFromRetrieveInteractionsResponse(response *pb.RetrieveInteractionsResponse) *RetrieveInteractionsResponse { if response == nil || len(response.Data) == 0 { return &RetrieveInteractionsResponse{Data: make(map[string]InteractionData)} } - data := make(map[string]InteractionData) + data := make(map[string]InteractionData, len(response.Data)) for key, val := range response.Data { - clickEvents := make([]ClickEvent, len(val.ClickEvents)) - for i, e := range val.ClickEvents { - clickEvents[i] = ClickEvent{ + if val == nil { + data[key] = InteractionData{} + continue + } + clickEvents := make([]ClickEvent, 0, len(val.ClickEvents)) + for _, e := range val.ClickEvents { + if e == nil { + continue + } + clickEvents = append(clickEvents, ClickEvent{ CatalogId: e.CatalogId, ProductId: e.ProductId, Timestamp: e.Timestamp, Metadata: e.Metadata, - } + }) } - orderEvents := make([]OrderEvent, len(val.OrderEvents)) - for i, e := range val.OrderEvents { - orderEvents[i] = OrderEvent{ + orderEvents := make([]OrderEvent, 0, len(val.OrderEvents)) + for _, e := range val.OrderEvents { + if e == nil { + continue + } + orderEvents = append(orderEvents, OrderEvent{ CatalogId: e.CatalogId, ProductId: e.ProductId, SubOrderNum: e.SubOrderNum, Timestamp: e.Timestamp, Metadata: e.Metadata, - } + }) } data[key] = InteractionData{ ClickEvents: clickEvents, OrderEvents: orderEvents, } } return &RetrieveInteractionsResponse{Data: data} }horizon/internal/skye/jobs/variant_scaleup_job.go-328-353 (1)
328-353:⚠️ Potential issue | 🟠 MajorGuard against divide-by-zero when indexed counts are zero.
percentageDiffdivides byoriginalStatus.IndexedVectorCount, which can be zero even whenPointsCountis non-zero, leading to a panic. Add a guard before division.🛠️ Suggested guard
func (j *VariantScaleUpJob) isScaleUpComplete(scaleUpStatus, originalStatus *ScaleUpCollectionStatusPayload) bool { - if scaleUpStatus.PointsCount == 0 || originalStatus.PointsCount == 0 { + if scaleUpStatus.PointsCount == 0 || originalStatus.PointsCount == 0 || originalStatus.IndexedVectorCount == 0 { return false }go-sdk/pkg/interaction-store/client.go-188-192 (1)
188-192:⚠️ Potential issue | 🟠 MajorAdd validation for zero/negative deadlines in the
withTimeoutmethod.The current implementation is vulnerable to zero or negative
DeadLinevalues causing immediate context cancellation and all calls to fail. This is particularly problematic because:
- Unlike
helix-client(in the same codebase),interaction-storehas no validation ofDeadLineduring initialization- The
validateConfigfunction only checksHost,Port, andCallerId, notDeadLine- A misconfigured or unset deadline will silently cause all operations to timeout immediately
Implement the guard as suggested:
🛠️ Suggested fix
func (c *ClientV1) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) { - timeout := time.Duration(c.grpcClient.DeadLine) * time.Millisecond - return context.WithTimeout(ctx, timeout) + if c.grpcClient.DeadLine <= 0 { + return context.WithCancel(ctx) + } + timeout := time.Duration(c.grpcClient.DeadLine) * time.Millisecond + return context.WithTimeout(ctx, timeout) }go-sdk/pkg/interaction-store/client.go-180-186 (1)
180-186:⚠️ Potential issue | 🟠 MajorUse
metadata.AppendToOutgoingContextto preserve existing gRPC metadata.
metadata.NewOutgoingContextreplaces any existing outgoing metadata on the context, which will drop tracing/auth headers set by upstream callers or middleware. Usemetadata.AppendToOutgoingContextinstead to merge metadata while preserving existing values.🛠️ Suggested fix to append metadata
func (c *ClientV1) withAuthMetadata(ctx context.Context) context.Context { - md := metadata.New(map[string]string{ - headerCallerID: c.callerId, - }) - return metadata.NewOutgoingContext(ctx, md) + return metadata.AppendToOutgoingContext(ctx, headerCallerID, c.callerId) }horizon/internal/skye/etcd/etcd.go-140-159 (1)
140-159:⚠️ Potential issue | 🟠 MajorRace condition risk in store ID calculation.
storeId := len(stores) + 1is not atomic. Concurrent calls toRegisterStorecould calculate the same ID, leading to overwrites.Consider using a sequence/counter in etcd or implementing locking.
horizon/internal/skye/etcd/etcd.go-326-330 (1)
326-330:⚠️ Potential issue | 🟠 MajorUnchecked error from final
CreateNodecall.Line 329 ignores the error when setting the
enabledproperty. A failure here would leave the variant in an inconsistent state.🐛 Proposed fix
if err := s.instance.CreateNodes(paths); err != nil { return fmt.Errorf("failed to create variant properties: %w", err) } - s.instance.CreateNode(fmt.Sprintf("%s/enabled", variantPath), true) + if err := s.instance.CreateNode(fmt.Sprintf("%s/enabled", variantPath), true); err != nil { + return fmt.Errorf("failed to enable variant: %w", err) + } return nilhorizon/internal/skye/etcd/etcd.go-291-294 (1)
291-294:⚠️ Potential issue | 🟠 MajorUnchecked error from
CreateNodecall.Line 293 calls
s.instance.CreateNode()but ignores the return value. If this fails, the variant will be partially created without thevector-db-typeproperty.🐛 Proposed fix
// Create variant node variantPath := fmt.Sprintf("/config/%s/entity/%s/models/%s/variants/%s", s.appName, entity, model, variant) - s.instance.CreateNode(fmt.Sprintf("%s/vector-db-type", variantPath), vectorDbType) + if err := s.instance.CreateNode(fmt.Sprintf("%s/vector-db-type", variantPath), vectorDbType); err != nil { + return fmt.Errorf("failed to create vector-db-type node: %w", err) + }horizon/internal/skye/etcd/etcd.go-296-299 (1)
296-299:⚠️ Potential issue | 🟠 MajorUnchecked type assertions could cause panics.
Lines 298-299 use
.(*Skye)type assertions without theokidiom. IfGetConfigInstance()returns an unexpected type, the application will panic.🛡️ Proposed defensive fix
- paths[fmt.Sprintf("%s/embedding-store-read-version", variantPath)] = s.instance.GetConfigInstance().(*Skye).Entity[entity].Models[model].EmbeddingStoreVersion - paths[fmt.Sprintf("%s/embedding-store-write-version", variantPath)] = s.instance.GetConfigInstance().(*Skye).Entity[entity].Models[model].EmbeddingStoreVersion + skyeConfig, ok := s.instance.GetConfigInstance().(*Skye) + if !ok { + return fmt.Errorf("failed to cast etcd config instance to Skye type") + } + embeddingVersion := skyeConfig.Entity[entity].Models[model].EmbeddingStoreVersion + paths[fmt.Sprintf("%s/embedding-store-read-version", variantPath)] = embeddingVersion + paths[fmt.Sprintf("%s/embedding-store-write-version", variantPath)] = embeddingVersion
| if resp.StatusCode != http.StatusOK { | ||
| log.Error(). | ||
| Int("status_code", resp.StatusCode). | ||
| Str("response_body", string(body)). | ||
| Msg("Airflow DAG trigger failed") | ||
| return &AirflowResponse{ | ||
| Status: "error", | ||
| Error: fmt.Sprintf("HTTP %d: %s", resp.StatusCode, string(body)), | ||
| }, nil | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the airflow_client.go file and explore its structure
find . -name "airflow_client.go" -type fRepository: Meesho/BharatMLStack
Length of output: 113
🏁 Script executed:
#!/bin/bash
# Read the airflow_client.go file to understand the methods
wc -l ./horizon/internal/externalcall/airflow_client.go
cat -n ./horizon/internal/externalcall/airflow_client.goRepository: Meesho/BharatMLStack
Length of output: 7674
🏁 Script executed:
#!/bin/bash
# Find TriggerDAG call sites
rg -n "TriggerDAG\s*\(" -A 3Repository: Meesho/BharatMLStack
Length of output: 1990
🏁 Script executed:
#!/bin/bash
# Check variant_onboarding_job.go around line 426
echo "=== variant_onboarding_job.go ==="
sed -n '420,440p' ./horizon/internal/skye/jobs/variant_onboarding_job.go
echo -e "\n=== variant_scaleup_job.go ==="
sed -n '407,425p' ./horizon/internal/skye/jobs/variant_scaleup_job.go
echo -e "\n=== prism_client.go ==="
sed -n '287,310p' ./horizon/internal/externalcall/prism_client.goRepository: Meesho/BharatMLStack
Length of output: 2239
🏁 Script executed:
#!/bin/bash
# Check more context in variant_scaleup_job.go
sed -n '412,435p' ./horizon/internal/skye/jobs/variant_scaleup_job.goRepository: Meesho/BharatMLStack
Length of output: 893
Return an error on non‑200 HTTP status instead of nil error with "status=error" payload.
TriggerDAG returns nil error with AirflowResponse{Status: "error", ...} on HTTP failures, while ListDAGRuns and GetDAGRun both return proper errors. This inconsistency creates silent failures:
variant_onboarding_job.go:426andvariant_scaleup_job.go:412check onlyif err != nil, skip error handling, and proceed to store the failed response in payloads as if the operation succeeded.prism_client.go:292works around this by checkingairflowResponse.Status == "error", but this defensive coding shouldn't be necessary.
Apply the suggested fix to return fmt.Errorf(...) on non-200 status, consistent with other methods.
🤖 Prompt for AI Agents
In `@horizon/internal/externalcall/airflow_client.go` around lines 125 - 134, The
TriggerDAG handling currently returns an AirflowResponse with Status="error" and
a nil error on non-200 responses; change it to return a non-nil error instead
(consistent with ListDAGRuns and GetDAGRun). In the TriggerDAG function, when
resp.StatusCode != http.StatusOK, replace the returned AirflowResponse with a
nil response and return fmt.Errorf("Airflow DAG trigger failed: HTTP %d: %s",
resp.StatusCode, string(body)) (and keep the log.Error call), so callers like
TriggerDAG, variant_onboarding_job.go and variant_scaleup_job.go will receive a
proper error rather than having to inspect AirflowResponse.Status.
| } | ||
|
|
||
| m.CheckScreenPermission(c, claims) | ||
| // m.CheckScreenPermission(c, claims) |
There was a problem hiding this comment.
Do not bypass screen-permission enforcement.
Line 160 comments out m.CheckScreenPermission, which removes role/screen authorization for all authenticated requests. This is a security bypass. Please re-enable the check (or gate it behind an explicit, auditable feature flag with clear ownership).
✅ Suggested fix (restore permission enforcement)
- // m.CheckScreenPermission(c, claims)
+ m.CheckScreenPermission(c, claims)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // m.CheckScreenPermission(c, claims) | |
| m.CheckScreenPermission(c, claims) |
🤖 Prompt for AI Agents
In `@horizon/internal/middleware/middleware.go` at line 160, The call to
m.CheckScreenPermission has been commented out, disabling role/screen
authorization; restore enforcement by reintroducing the call to
m.CheckScreenPermission(c, claims) inside the middleware where authentication is
validated (or wrap it behind a clearly named, auditable feature flag like
EnableScreenPermissionEnforcement with an owner), and ensure you handle its
error/denial path by aborting the request with the correct HTTP response (e.g.,
403) and logging the denial; do not leave the check commented out or silent—add
explicit logging and a fail-closed response when CheckScreenPermission returns
an error or denies access.
| func (s *SkyeScylla) CreateEmbeddingTable(tableName string, defaultTimeToLive int, variantsList []string) error { | ||
| // Check if table already exists | ||
| exists, err := s.tableExists(tableName) | ||
| if err != nil { | ||
| log.Error().Err(err).Str("keyspace", s.keySpace).Str("table", tableName). | ||
| Msg("Failed to check if embedding table exists") | ||
| return fmt.Errorf("failed to check if table exists: %w", err) | ||
| } | ||
| if exists { | ||
| log.Error().Str("keyspace", s.keySpace).Str("table", tableName). | ||
| Msg("Embedding table already exists, skipping creation") | ||
| return nil | ||
| } | ||
| // Build variant column definitions | ||
| var variantColumns strings.Builder | ||
| for _, variant := range variantsList { | ||
| // Convert variant name to snake_case and append _to_be_indexed | ||
| variantColumnName := strings.ToLower(strings.ReplaceAll(variant, "-", "_")) + "_to_be_indexed" | ||
| variantColumns.WriteString(fmt.Sprintf(",\n\t\t%s boolean", variantColumnName)) | ||
| } | ||
|
|
||
| query := fmt.Sprintf(createEmbeddingTableQueryBase, s.keySpace, tableName, variantColumns.String(), defaultTimeToLive) | ||
|
|
||
| err = s.session.Query(query).Exec() | ||
| if err != nil { | ||
| log.Error().Err(err).Str("keyspace", s.keySpace).Str("table", tableName). | ||
| Msg("Failed to create embedding table") | ||
| return fmt.Errorf("failed to create embedding table %s: %w", tableName, err) | ||
| } | ||
|
|
||
| log.Info().Str("keyspace", s.keySpace).Str("table", tableName). | ||
| Msg("Successfully created embedding table") | ||
| return nil | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Search for callers of CreateEmbeddingTable and related methods
rg -n 'CreateEmbeddingTable|AddEmbeddingColumn|CreateAggregatorTable|AddAggregatorColumn' --type=go -B2 -A5Repository: Meesho/BharatMLStack
Length of output: 6198
🏁 Script executed:
# Find the definition of createEmbeddingTableQueryBase
rg -n 'createEmbeddingTableQueryBase' --type=go -B2 -A2Repository: Meesho/BharatMLStack
Length of output: 892
🏁 Script executed:
# Look at the full skye_scylla.go file to understand context
wc -l horizon/internal/repositories/scylla/skye_scylla.goRepository: Meesho/BharatMLStack
Length of output: 119
🏁 Script executed:
# Get more context around the handler calls
sed -n '160,210p' horizon/internal/skye/handler/skye.goRepository: Meesho/BharatMLStack
Length of output: 2405
🏁 Script executed:
# Check for validation of payload
rg -n 'payload\.' horizon/internal/skye/handler/skye.go | head -20Repository: Meesho/BharatMLStack
Length of output: 2004
🏁 Script executed:
# Look for the payload struct definition
rg -n 'type.*Payload|struct' horizon/internal/skye/handler/skye.go | head -20Repository: Meesho/BharatMLStack
Length of output: 204
🏁 Script executed:
# Search for any validation or sanitization functions
rg -n 'Validate|Sanitize|validate|sanitize' horizon/internal/skye/handler/skye.goRepository: Meesho/BharatMLStack
Length of output: 112
🏁 Script executed:
# Check what VariantsList is and how it's populated
rg -n 'VariantsList' horizon/internal/skye/handler/skye.go -B3 -A3Repository: Meesho/BharatMLStack
Length of output: 1960
🏁 Script executed:
# Find StoreRequestPayload struct definition
rg -n 'type StoreRequestPayload' --type=go -A10Repository: Meesho/BharatMLStack
Length of output: 941
🏁 Script executed:
# Look for any table name validation
rg -n 'EmbeddingsTable|AggregatorTable' --type=go -B2 -A2 | grep -i 'valid\|check\|match\|regex\|sanitize'Repository: Meesho/BharatMLStack
Length of output: 153
🏁 Script executed:
# Get full context around AddAggregatorColumn and AddEmbeddingColumn calls
rg -n 'AddAggregatorColumn|AddEmbeddingColumn' --type=go -B5 -A5Repository: Meesho/BharatMLStack
Length of output: 4499
🏁 Script executed:
# Check if there's any filtering API validation
rg -n 'type.*Filter|ColumnName' --type=go | head -30Repository: Meesho/BharatMLStack
Length of output: 2976
CQL injection vulnerability: table and column names are not validated.
Table names and column names are interpolated directly into CQL queries via fmt.Sprintf without validation. EmbeddingsTable and AggregatorTable come from user input in StoreRequestPayload (models.go:75-80), and ColumnName comes from user-provided filters, with no upstream validation. This allows CQL injection attacks.
Note: Variant names are safe because they are validated against a pre-defined allowed list (handler/skye.go:477-479).
🤖 Prompt for AI Agents
In `@horizon/internal/repositories/scylla/skye_scylla.go` around lines 78 - 111,
CreateEmbeddingTable currently interpolates table and column identifiers into
CQL via fmt.Sprintf (using createEmbeddingTableQueryBase and
session.Query().Exec()) without validating them, allowing CQL injection; fix by
validating and sanitizing any identifier coming from user input (the tableName
passed to CreateEmbeddingTable and any column names derived from variantsList)
before using them in queries: enforce a strict whitelist regex (e.g.
^[a-zA-Z_][a-zA-Z0-9_]{0,63}$), check length limits, and reject or return an
error for invalid identifiers, then build the variant column names only from
validated values (the variantColumnName derived in the loop) and only use
validated identifiers in tableExists and the fmt.Sprintf that produces the final
query; do not attempt to bind identifiers as query parameters—validate/escape
first and then use the validated strings when calling
s.session.Query(...).Exec().
| type JobLocksRepository interface { | ||
| EnsureRow(jobKey string) error | ||
| AcquireLockNowait(ctx context.Context, jobKey string) error | ||
| ReleaseLock(ctx context.Context, jobKey string) error |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
MySQL GET_LOCK connection-scoped automatically released on connection close
💡 Result:
Yes. A named lock acquired with GET_LOCK() is session/connection-scoped and is released automatically when that session terminates (connection closes), whether the termination is normal or abnormal. It is also released explicitly with RELEASE_LOCK(). Locks from GET_LOCK() are not released by COMMIT or ROLLBACK. [1]
Implication: if you use a connection pool, the lock can remain held until that pooled connection is actually closed (or you call RELEASE_LOCK()), because returning a connection to the pool usually does not terminate the MySQL session. [1]
Source: MySQL Reference Manual, “Locking Functions” (GET_LOCK()). [1]
🏁 Script executed:
cat -n horizon/internal/repositories/sql/embedding/job_locks/repository.go | head -130Repository: Meesho/BharatMLStack
Length of output: 4054
Lock is released immediately; AcquireLockNowait closes the connection before returning.
GET_LOCK() is connection-scoped and automatically released when the connection closes. The implementation acquires a dedicated connection, executes GET_LOCK() on it, then immediately closes the connection with defer conn.Close() on line 70—before the function returns and before the caller can perform any work. The lock is therefore released instantly. Additionally, ReleaseLock() attempts to release the lock from a different connection (line 96), which cannot succeed since the original session has already terminated. This makes the entire locking mechanism ineffective.
🔧 Suggested fix (return a lock handle tied to the same connection)
type JobLocksRepository interface {
EnsureRow(jobKey string) error
- AcquireLockNowait(ctx context.Context, jobKey string) error
- ReleaseLock(ctx context.Context, jobKey string) error
+ AcquireLockNowait(ctx context.Context, jobKey string) (JobLockHandle, error)
}
+
+type JobLockHandle interface {
+ Release(ctx context.Context) error
+}
+type jobLockHandle struct {
+ conn *sql.Conn
+ lockName string
+}
+
+func (h *jobLockHandle) Release(ctx context.Context) error {
+ var res sql.NullInt64
+ if err := h.conn.QueryRowContext(ctx, "SELECT RELEASE_LOCK(?)", h.lockName).Scan(&res); err != nil {
+ _ = h.conn.Close()
+ return fmt.Errorf("release_lock query failed: %w", err)
+ }
+ if !res.Valid || res.Int64 != 1 {
+ _ = h.conn.Close()
+ return fmt.Errorf("unable to release lock for %s", h.lockName)
+ }
+ return h.conn.Close()
+}-func (r *jobLocksRepo) AcquireLockNowait(ctx context.Context, jobKey string) error {
+func (r *jobLocksRepo) AcquireLockNowait(ctx context.Context, jobKey string) (JobLockHandle, error) {
// Get underlying *sql.DB from gorm DB
sqlDB, err := r.db.DB()
if err != nil {
- return fmt.Errorf("get sql.DB: %w", err)
+ return nil, fmt.Errorf("get sql.DB: %w", err)
}
// Acquire a dedicated connection so the session-level lock is tied to it
conn, err := sqlDB.Conn(ctx)
if err != nil {
- return fmt.Errorf("get dedicated conn: %w", err)
+ return nil, fmt.Errorf("get dedicated conn: %w", err)
}
- defer conn.Close()
// Use a deterministic lock name. Prefix with DB name to avoid cross-db collisions
lockName := fmt.Sprintf("%s:%s", r.dbName, jobKey)
// GET_LOCK returns 1 (success), 0 (timeout), or NULL (error). We use timeout 0 to fail-fast.
var res sql.NullInt64
if err := conn.QueryRowContext(ctx, "SELECT GET_LOCK(?, 0)", lockName).Scan(&res); err != nil {
- return fmt.Errorf("get_lock query failed: %w", err)
+ _ = conn.Close()
+ return nil, fmt.Errorf("get_lock query failed: %w", err)
}
if !res.Valid || res.Int64 != 1 {
- // failed to acquire lock immediately
- return fmt.Errorf("unable to get lock for %s", jobKey)
+ _ = conn.Close()
+ return nil, fmt.Errorf("unable to get lock for %s", jobKey)
}
- return nil
+ return &jobLockHandle{conn: conn, lockName: lockName}, nil
}🤖 Prompt for AI Agents
In `@horizon/internal/repositories/sql/embedding/job_locks/repository.go` around
lines 13 - 16, The current AcquireLockNowait closes the DB connection before
returning (defer conn.Close()), so GET_LOCK() is released immediately and
ReleaseLock() runs on a different connection — make the lock connection live for
the caller by returning a lock handle tied to that connection instead of closing
it. Change the JobLocksRepository API: replace AcquireLockNowait(ctx, jobKey
string) error with AcquireLockNowait(ctx, jobKey string) (JobLock, error) (or
similar), add a JobLock struct that holds the dedicated *sql.Conn and jobKey and
exposes methods Release() error (which calls RELEASE_LOCK on the same
connection) and Close() error (to close the connection), update implementations
to remove the premature conn.Close() in AcquireLockNowait, implement
JobLock.Release to run RELEASE_LOCK on the stored conn, and adjust callers/tests
to call Release()/Close() on the returned JobLock rather than using the old
ReleaseLock(ctx, jobKey) signature.
| } | ||
|
|
||
| if models.Models[variantPayload.Model].ModelType != enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" { | ||
| return fmt.Errorf("otd_training_data_path is required for DELTA model type") | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check the ModelType enum definition
echo "=== ModelType enum definition ==="
find . -name "*.go" -type f | xargs rg -n "type ModelType|DELTA" | head -50
echo ""
echo "=== OTD training path validation context ==="
rg -n "OTDTrainingDataPath|otd_training_data_path" -B 5 -A 5 --type goRepository: Meesho/BharatMLStack
Length of output: 4403
🏁 Script executed:
#!/bin/bash
# Look for RESET model type definition and any related comments
echo "=== RESET model type definition ==="
rg -n "RESET" horizon/internal/skye/etcd/enums/ -A 2
echo ""
echo "=== Context around the validation logic ==="
rg -n "Check model_type|RESET.*DELTA" -B 2 -A 2 --type go
echo ""
echo "=== Any documentation or comments about OTD ==="
rg -n "OTD|object.*tracking|incremental.*change" -i -B 2 -A 2 --type go | head -80Repository: Meesho/BharatMLStack
Length of output: 4900
Fix inverted condition: should check if ModelType IS DELTA, not if it's NOT DELTA.
The validation logic is backwards. Line 472 checks ModelType != enums.ModelType(enums.DELTA), but the variant onboarding job (variant_onboarding_job.go:404-406) clearly shows OTD path is used only when ModelType == DELTA.
The condition should be:
if models.Models[variantPayload.Model].ModelType == enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" {
return fmt.Errorf("otd_training_data_path is required for DELTA model type")
}🤖 Prompt for AI Agents
In `@horizon/internal/skye/handler/skye.go` around lines 470 - 474, The validation
currently checks the wrong condition: in skye.go where you inspect
models.Models[variantPayload.Model].ModelType and
variantPayload.OTDTrainingDataPath, change the inequality check to an equality
check so the OTD path is required when ModelType IS DELTA (i.e., replace the !=
comparison with == for enums.ModelType(enums.DELTA)); update the error path
branch that returns fmt.Errorf("otd_training_data_path is required for DELTA
model type") accordingly to enforce the requirement only for DELTA variants.
| func newClickConsumer() Consumer { | ||
| if clickConsumer == nil { | ||
| syncOnce.Do(func() { | ||
| clickConsumer = &ClickConsumer{ | ||
| handler: persist.InitClickPersistHandler(), | ||
| } | ||
| }) | ||
| } | ||
| return clickConsumer |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -name "consumer.go" -path "*/click/*" | head -5Repository: Meesho/BharatMLStack
Length of output: 119
🏁 Script executed:
cat -n ./interaction-store/internal/consumer/click/consumer.goRepository: Meesho/BharatMLStack
Length of output: 4480
🏁 Script executed:
cd ./interaction-store && git log --oneline -n 5 -- internal/consumer/click/consumer.goRepository: Meesho/BharatMLStack
Length of output: 96
🏁 Script executed:
# Check if there are other similar patterns in the codebase
rg -n "sync\.Once" --type go -B 2 -A 6 | head -100Repository: Meesho/BharatMLStack
Length of output: 7267
🏁 Script executed:
# Search for the specific anti-pattern: if variable == nil { syncOnce.Do(...) }
rg -n "if.*==.*nil.*{" --type go -A 3 | grep -A 3 "syncOnce\|once\.Do" | head -60Repository: Meesho/BharatMLStack
Length of output: 155
🏁 Script executed:
# Search for the specific anti-pattern with a simpler regex
rg "if.*== nil" --type go -A 3 | grep -B 1 -A 2 "syncOnce\|once\.Do" | head -80Repository: Meesho/BharatMLStack
Length of output: 4365
🏁 Script executed:
# Check how many files have this anti-pattern
rg "if.*== nil" --type go -A 1 | grep -B 1 "syncOnce\|once\.Do" | grep "^[^-].*\.go" | wc -lRepository: Meesho/BharatMLStack
Length of output: 66
🏁 Script executed:
# Let's check the order consumer to see if it has the same pattern
cat -n ./interaction-store/internal/consumer/order/consumer.go | head -50Repository: Meesho/BharatMLStack
Length of output: 1702
🌐 Web query:
Go sync.Once anti-pattern pre-check nil race condition best practice
💡 Result:
Why the nil “pre-check” around sync.Once is an anti-pattern
This pattern is common but wrong:
func Get() *T {
if v == nil { // unsynchronized read
once.Do(init) // init writes v
}
return v // unsynchronized read
}The if v == nil (and the return v) are ordinary reads that are not synchronized with the write to v performed during init, so they can constitute a data race (a concurrent read and write not ordered by a happens-before relationship). The Go memory model explicitly requires you to serialize concurrent access to shared data (via channels, mutexes, atomics, etc.). [1]
What sync.Once does guarantee (and how to use it correctly)
sync.Once provides the ordering you want: the return from f “synchronizes before” the return from any call of once.Do(f). [2] That means: if every goroutine calls once.Do(init) before reading the initialized data, all goroutines will see the initialized state.
So the idiomatic, race-free accessor is simply:
func Get() *T {
once.Do(func() { v = newT() })
return v
}Don’t add a separate “fast path” nil check unless you also make that check safe (e.g., with atomics) and you’ve proven it’s worth the complexity—Once already has an optimized fast path internally.
Best-practice alternatives (often cleaner)
If your goal is “compute once and return the value”, prefer the newer helpers:
sync.OnceValue(func() T) func() Tsync.OnceValues(func() (T1, T2)) func() (T1, T2)sync.OnceFunc(func()) func()
They encapsulate the “store + once” pattern and avoid a separate shared nil variable entirely. [2]
Sources
[1] Go Memory Model (data races / “serialize such access”) (tip.golang.org)
[2] sync package docs (Once / Do synchronizes-before guarantee; OnceValue/OnceFunc) (pkg.go.dev)
Citations:
Remove the unsynchronized pre-check around sync.Once.
The if clickConsumer == nil read outside sync.Once can race with the write inside Do(). The Go memory model requires synchronized access to shared data; sync.Once provides this ordering only when called uniformly. The idiomatic pattern is to always call Do() without the pre-check—Once already has an optimized fast path internally.
🔧 Safer singleton init
func newClickConsumer() Consumer {
- if clickConsumer == nil {
- syncOnce.Do(func() {
- clickConsumer = &ClickConsumer{
- handler: persist.InitClickPersistHandler(),
- }
- })
- }
+ syncOnce.Do(func() {
+ clickConsumer = &ClickConsumer{
+ handler: persist.InitClickPersistHandler(),
+ }
+ })
return clickConsumer
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func newClickConsumer() Consumer { | |
| if clickConsumer == nil { | |
| syncOnce.Do(func() { | |
| clickConsumer = &ClickConsumer{ | |
| handler: persist.InitClickPersistHandler(), | |
| } | |
| }) | |
| } | |
| return clickConsumer | |
| func newClickConsumer() Consumer { | |
| syncOnce.Do(func() { | |
| clickConsumer = &ClickConsumer{ | |
| handler: persist.InitClickPersistHandler(), | |
| } | |
| }) | |
| return clickConsumer | |
| } |
🤖 Prompt for AI Agents
In `@interaction-store/internal/consumer/click/consumer.go` around lines 27 - 35,
Remove the unsynchronized nil pre-check and always call syncOnce.Do to
initialize the singleton: in newClickConsumer() eliminate the "if clickConsumer
== nil" block and invoke syncOnce.Do(func() { clickConsumer = &ClickConsumer{
handler: persist.InitClickPersistHandler(), } }) unconditionally, then return
clickConsumer; this ensures the initialization is properly synchronized for the
clickConsumer/ClickConsumer singleton using syncOnce.
| func (s *Scylla) RetrieveInteractions(tableName string, userId string, columns []string) (map[string]interface{}, error) { | ||
| t1 := time.Now() | ||
| metric.Incr("scylla_db_interactions_retrieve_count", []string{metric.TagAsString("table_name", tableName)}) | ||
| preparedQuery := createRetrieveInteractionsPreparedQuery(s.session, s.keyspace, tableName, columns) | ||
| response := executeRetrieveInteractions(preparedQuery, userId) | ||
| metric.Timing("scylla_db_interactions_retrieve_latency", time.Since(t1), []string{metric.TagAsString("table_name", tableName)}) | ||
| return response, nil | ||
| } |
There was a problem hiding this comment.
Errors from retrieve operations are silently swallowed.
executeRetrieveInteractions returns nil on error (line 101), and RetrieveInteractions returns (response, nil) regardless (line 58). This silently swallows database errors, making debugging difficult and potentially returning invalid data.
🐛 Proposed fix: propagate errors properly
-func (s *Scylla) RetrieveInteractions(tableName string, userId string, columns []string) (map[string]interface{}, error) {
+func (s *Scylla) RetrieveInteractions(tableName string, userId string, columns []string) (map[string]interface{}, error) {
t1 := time.Now()
metric.Incr("scylla_db_interactions_retrieve_count", []string{metric.TagAsString("table_name", tableName)})
preparedQuery := createRetrieveInteractionsPreparedQuery(s.session, s.keyspace, tableName, columns)
- response := executeRetrieveInteractions(preparedQuery, userId)
+ response, err := executeRetrieveInteractions(preparedQuery, userId)
+ if err != nil {
+ return nil, err
+ }
metric.Timing("scylla_db_interactions_retrieve_latency", time.Since(t1), []string{metric.TagAsString("table_name", tableName)})
return response, nil
}
-func executeRetrieveInteractions(preparedQuery *gocql.Query, userId string) map[string]interface{} {
+func executeRetrieveInteractions(preparedQuery *gocql.Query, userId string) (map[string]interface{}, error) {
preparedQuery.Bind(userId).Consistency(gocql.One)
res, err := preparedQuery.Iter().SliceMap()
if err != nil {
log.Error().Msgf("error executing cql query %v: %v", preparedQuery, err)
- return nil
+ return nil, err
}
if len(res) == 0 {
- return make(map[string]interface{})
+ return make(map[string]interface{}), nil
}
- return res[0]
+ return res[0], nil
}Apply the same fix to RetrieveMetadata and executeRetrieveMetadata.
Also applies to: 96-107
🤖 Prompt for AI Agents
In `@interaction-store/internal/data/scylla/scylla.go` around lines 52 - 59, The
retrieve functions currently swallow errors—update executeRetrieveInteractions
to return (map[string]interface{}, error) instead of nil on failure and ensure
it returns a non-nil error when the DB call fails; then change
Scylla.RetrieveInteractions to capture the (response, err) from
executeRetrieveInteractions and return (response, err) instead of (response,
nil). Apply the same pattern to executeRetrieveMetadata and
Scylla.RetrieveMetadata so all DB retrievals propagate errors upstream rather
than returning nil success values.
| func (p *ClickPersistHandler) mergeAndTrimEvents(existing []model.ClickEvent, newEvent model.ClickEvent) []model.ClickEvent { | ||
| if len(existing) > 0 { | ||
| largestTimestamp := existing[0].ClickEventData.Payload.ClickedAt | ||
| if utils.TimestampDiffInWeeks(newEvent.ClickEventData.Payload.ClickedAt, largestTimestamp) >= 24 { | ||
| existing = existing[:0] | ||
| } | ||
| } | ||
|
|
||
| existing = append(existing, newEvent) | ||
| sort.Slice(existing, func(i, j int) bool { | ||
| return existing[i].ClickEventData.Payload.ClickedAt > existing[j].ClickEventData.Payload.ClickedAt | ||
| }) | ||
|
|
||
| if len(existing) > maxClickEventsPerWeek { | ||
| existing = existing[:maxClickEventsPerWeek] | ||
| } | ||
| return existing | ||
| } |
There was a problem hiding this comment.
Bug: Timestamp comparison uses unsorted data.
The logic at line 168 checks existing[0] for the largest timestamp, but the sort happens after this check (lines 175-177). If existing is not already sorted in descending order, this comparison may incorrectly clear valid data or retain stale data.
🐛 Proposed fix: move sort before timestamp check
func (p *ClickPersistHandler) mergeAndTrimEvents(existing []model.ClickEvent, newEvent model.ClickEvent) []model.ClickEvent {
+ existing = append(existing, newEvent)
+ sort.Slice(existing, func(i, j int) bool {
+ return existing[i].ClickEventData.Payload.ClickedAt > existing[j].ClickEventData.Payload.ClickedAt
+ })
+
if len(existing) > 0 {
largestTimestamp := existing[0].ClickEventData.Payload.ClickedAt
- if utils.TimestampDiffInWeeks(newEvent.ClickEventData.Payload.ClickedAt, largestTimestamp) >= 24 {
+ oldestTimestamp := existing[len(existing)-1].ClickEventData.Payload.ClickedAt
+ if utils.TimestampDiffInWeeks(existing[0].ClickEventData.Payload.ClickedAt, oldestTimestamp) >= 24 {
+ // Remove entries older than 24 weeks from newest
existing = existing[:0]
}
}
- existing = append(existing, newEvent)
- sort.Slice(existing, func(i, j int) bool {
- return existing[i].ClickEventData.Payload.ClickedAt > existing[j].ClickEventData.Payload.ClickedAt
- })
-
if len(existing) > maxClickEventsPerWeek {
existing = existing[:maxClickEventsPerWeek]
}
return existing
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (p *ClickPersistHandler) mergeAndTrimEvents(existing []model.ClickEvent, newEvent model.ClickEvent) []model.ClickEvent { | |
| if len(existing) > 0 { | |
| largestTimestamp := existing[0].ClickEventData.Payload.ClickedAt | |
| if utils.TimestampDiffInWeeks(newEvent.ClickEventData.Payload.ClickedAt, largestTimestamp) >= 24 { | |
| existing = existing[:0] | |
| } | |
| } | |
| existing = append(existing, newEvent) | |
| sort.Slice(existing, func(i, j int) bool { | |
| return existing[i].ClickEventData.Payload.ClickedAt > existing[j].ClickEventData.Payload.ClickedAt | |
| }) | |
| if len(existing) > maxClickEventsPerWeek { | |
| existing = existing[:maxClickEventsPerWeek] | |
| } | |
| return existing | |
| } | |
| func (p *ClickPersistHandler) mergeAndTrimEvents(existing []model.ClickEvent, newEvent model.ClickEvent) []model.ClickEvent { | |
| existing = append(existing, newEvent) | |
| sort.Slice(existing, func(i, j int) bool { | |
| return existing[i].ClickEventData.Payload.ClickedAt > existing[j].ClickEventData.Payload.ClickedAt | |
| }) | |
| if len(existing) > 0 { | |
| largestTimestamp := existing[0].ClickEventData.Payload.ClickedAt | |
| oldestTimestamp := existing[len(existing)-1].ClickEventData.Payload.ClickedAt | |
| if utils.TimestampDiffInWeeks(existing[0].ClickEventData.Payload.ClickedAt, oldestTimestamp) >= 24 { | |
| // Remove entries older than 24 weeks from newest | |
| existing = existing[:0] | |
| } | |
| } | |
| if len(existing) > maxClickEventsPerWeek { | |
| existing = existing[:maxClickEventsPerWeek] | |
| } | |
| return existing | |
| } |
🤖 Prompt for AI Agents
In `@interaction-store/internal/handler/persist/click.go` around lines 166 - 183,
In mergeAndTrimEvents ensure you sort existing events before using existing[0]
as the largest timestamp: move the sort.Slice call so existing is sorted in
descending ClickedAt first, then compute largestTimestamp from existing[0] and
perform the 24-week diff check (clearing existing if stale), then append
newEvent and re-trim to maxClickEventsPerWeek; reference function
mergeAndTrimEvents and the use of existing[0], sort.Slice,
utils.TimestampDiffInWeeks, and maxClickEventsPerWeek when making the change.
| var ( | ||
| once sync.Once | ||
| initialized = false | ||
| ) | ||
|
|
||
| func Init() { | ||
| if !checkProfilingEnabled() { | ||
| return | ||
| } | ||
| if initialized { | ||
| log.Debug().Msg("Profiling environment already initialized!") | ||
| return | ||
| } | ||
| once.Do(func() { | ||
| initializeProfiling() | ||
| }) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n interaction-store/pkg/profiling/profiling.goRepository: Meesho/BharatMLStack
Length of output: 1535
Eliminate the unsynchronized initialized flag to avoid data races.
initialized is read without synchronization at line 21 and written at line 45, creating a data race between concurrent Init() calls. The sync.Once alone is sufficient to ensure single initialization; the initialized flag is redundant and unsafe.
Suggested fix
var (
once sync.Once
- initialized = false
)
func Init() {
if !checkProfilingEnabled() {
return
}
- if initialized {
- log.Debug().Msg("Profiling environment already initialized!")
- return
- }
- once.Do(func() {
- initializeProfiling()
- })
+ once.Do(initializeProfiling)
}🤖 Prompt for AI Agents
In `@interaction-store/pkg/profiling/profiling.go` around lines 12 - 27, Remove
the unsynchronized `initialized` variable and any reads/writes to it; rely
solely on the existing `once sync.Once` to guarantee single initialization. In
`Init()` (and only there) keep the `checkProfilingEnabled()` guard, then call
`once.Do(func(){ initializeProfiling() })` and remove the `if initialized { ...
}` branch and the write to `initialized` (and its declaration). Ensure
references are to `Init()`, `once`, `initializeProfiling()`, and
`checkProfilingEnabled()` so the file no longer has the redundant `initialized`
flag.
| CREATE KEYSPACE IF NOT EXISTS interaction_store_timeseries | ||
| WITH replication = { | ||
| 'class': 'NetworkTopologyStrategy', | ||
| 'replication_factor': 2 | ||
| }; | ||
|
|
||
| USE interaction_store; |
There was a problem hiding this comment.
Critical: Keyspace name mismatch between creation and usage.
Line 1 creates keyspace interaction_store_timeseries, but line 7 switches to interaction_store. This script will fail if interaction_store doesn't already exist, or will create tables in an unintended keyspace.
🐛 Proposed fix: Align keyspace names
If the intended keyspace is interaction_store_timeseries:
-USE interaction_store;
+USE interaction_store_timeseries;Or if the intended keyspace is interaction_store:
-CREATE KEYSPACE IF NOT EXISTS interaction_store_timeseries
+CREATE KEYSPACE IF NOT EXISTS interaction_store
WITH replication = {
'class': 'NetworkTopologyStrategy',
'replication_factor': 2
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| CREATE KEYSPACE IF NOT EXISTS interaction_store_timeseries | |
| WITH replication = { | |
| 'class': 'NetworkTopologyStrategy', | |
| 'replication_factor': 2 | |
| }; | |
| USE interaction_store; | |
| CREATE KEYSPACE IF NOT EXISTS interaction_store_timeseries | |
| WITH replication = { | |
| 'class': 'NetworkTopologyStrategy', | |
| 'replication_factor': 2 | |
| }; | |
| USE interaction_store_timeseries; |
🤖 Prompt for AI Agents
In `@interaction-store/scylla-init.cql` around lines 1 - 7, The CREATE KEYSPACE
statement defines interaction_store_timeseries but the subsequent USE statement
references interaction_store; update one of them so both match. Locate the
CREATE KEYSPACE block (the CREATE KEYSPACE IF NOT EXISTS
interaction_store_timeseries ... statement) and the USE interaction_store
statement and change the keyspace identifier so both use the intended name
(either interaction_store_timeseries or interaction_store) consistently.
|
Is this PR a bug fix? A bug fix corrects existing broken functionality. Examples of bug fixes: fixing crashes, correcting logic errors, fixing broken UI. Examples of NOT bug fixes: new features, refactoring, cleanup, docs, tests, performance improvements. Respond with JSON only: Important:
|
|
Tip For best results, initiate chat on the files or code changes.
{
"confidence": 0.1,
"reasoning": "This PR introduces extensive new functionality (entire interaction-store module with ~8000+ lines, new Skye integration in horizon), performs repository synchronization, updates CI/CD workflows (master→main branch changes), and adds new features rather than fixing broken functionality. While some enhancements like improved cache expiry handling exist, they represent improvements to behavior rather than corrections of bugs. The overwhelming nature of this PR is new feature development and infrastructure changes, not bug fixes."
} |
🔁 Pull Request Template – BharatMLStack
📌 Summary
📂 Modules Affected
horizon(Real-time systems / networking)online-feature-store(Feature serving infra)trufflebox-ui(Admin panel / UI)infra(Docker, CI/CD, GCP/AWS setup)docs(Documentation updates)___________✅ Type of Change
___________📊 Benchmark / Metrics (if applicable)
Summary by CodeRabbit
New Features
Chores