Store DynamoDB schemas and items as protobuf while preserving legacy …#342
Conversation
…JSON compatibility
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the internal storage mechanism for DynamoDB schemas and items, transitioning from JSON serialization to Protocol Buffers. The primary goal is to enhance performance and maintainability by using a more structured and efficient data format. Crucially, the implementation ensures full backward compatibility, allowing the system to read and gradually migrate existing JSON-encoded data without disruption. This change lays the groundwork for future optimizations in data handling.
Highlights
Changelog
Activity
Pull request overview
This PR introduces a protobuf-based on-disk encoding for DynamoDB adapter schemas and items while retaining backward compatibility with legacy JSON payloads already stored in the KV engine.
Changes:
- Add dynamodb_internal.proto (and generated Go bindings) defining internal DynamoDB schema/item payloads.
- Implement a storage codec that writes protobuf with a versioned prefix and falls back to JSON decoding when the prefix is absent.
- Update DynamoDB adapter read/write paths and tests to use the new codec and validate migration behavior.
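The prefix-dispatch pattern the codec uses can be sketched as follows. This is a minimal stdlib-only illustration, not the PR's actual code: the names `encodeStored`/`decodeStored`, the `0x00` sentinel byte, and the use of JSON as a stand-in for the binary body are all assumptions made here; the real codec in adapter/dynamodb_storage_codec.go writes deterministic protobuf after its prefix.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Assumed sentinel: a legacy value is a bare JSON document, so it always
// starts with '{' and can never collide with this prefix byte.
const storedValuePrefix = 0x00

type tableSchema struct {
	Name    string `json:"name"`
	HashKey string `json:"hash_key"`
}

// encodeStored writes the format prefix and a version byte, then the body
// (JSON here as a stand-in for deterministic protobuf).
func encodeStored(s *tableSchema) ([]byte, error) {
	body, err := json.Marshal(s)
	if err != nil {
		return nil, err
	}
	return append([]byte{storedValuePrefix, 1 /* format version */}, body...), nil
}

// decodeStored dispatches on the prefix: new-format values carry the
// sentinel, anything else is decoded as legacy JSON. The second result
// reports whether the payload was in the legacy format.
func decodeStored(raw []byte) (*tableSchema, bool, error) {
	var s tableSchema
	if len(raw) >= 2 && raw[0] == storedValuePrefix {
		if err := json.Unmarshal(raw[2:], &s); err != nil {
			return nil, false, err
		}
		return &s, false, nil
	}
	if err := json.Unmarshal(raw, &s); err != nil {
		return nil, false, err
	}
	return &s, true, nil
}

func main() {
	legacy := []byte(`{"name":"users","hash_key":"id"}`)
	s, wasLegacy, _ := decodeStored(legacy)
	fmt.Println(s.Name, wasLegacy)

	enc, _ := encodeStored(s)
	s2, wasLegacy2, _ := decodeStored(enc)
	fmt.Println(s2.Name, wasLegacy2)
}
```

Because the prefix is checked before any unmarshaling, legacy values stored by older builds keep decoding unchanged while every new write lands in the prefixed format.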
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| proto/dynamodb_internal.proto | Defines protobuf messages for table schema and DynamoDB AttributeValue/item representation. |
| proto/dynamodb_internal.pb.go | Generated Go bindings for the new internal protobuf schema. |
| proto/Makefile | Extends proto generation to include dynamodb_internal.proto. |
| adapter/dynamodb_storage_codec.go | Adds encoding/decoding helpers for storing schema/items as prefixed deterministic protobuf with JSON fallback. |
| adapter/dynamodb.go | Switches schema/item persistence and reads from JSON to the new codec (proto-first, JSON fallback). |
| adapter/dynamodb_streaming_test.go | Updates fixtures to store schema/items using the new encoding helpers. |
| adapter/dynamodb_storage_codec_test.go | Adds unit tests for proto round-trip and JSON fallback decoding. |
| adapter/dynamodb_storage_migration_test.go | Adds migration tests to ensure reads don’t rewrite legacy JSON while writes/migrations rewrite to protobuf. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Code Review
This pull request introduces a significant improvement by migrating DynamoDB schemas and items to protobuf encoding, while carefully preserving compatibility with legacy JSON data. The implementation is robust, leveraging new codec functions and a dedicated protobuf definition. The inclusion of comprehensive tests for both new protobuf encoding and legacy JSON fallback demonstrates a thorough approach to ensuring data integrity and a smooth transition. The migration strategy, which involves reading legacy formats and writing new formats upon update, is well-executed and crucial for gradual data migration.
…nsistent round-trips
/gemini review
Pull request overview
This PR introduces a protobuf-based on-disk storage format for the DynamoDB adapter’s internal table schemas and items, while keeping backward compatibility by falling back to legacy JSON decoding.
Changes:
- Added a new protobuf schema (dynamodb_internal.proto) and generated Go bindings for storing DynamoDB table schemas and item attribute values.
- Introduced a storage codec with explicit value prefixes and deterministic protobuf marshaling, plus JSON fallback for legacy reads.
- Updated DynamoDB adapter read/write paths and fixtures/tests to use the new codec and to validate migration behavior.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| proto/dynamodb_internal.proto | Defines protobuf messages for table schema, items, and attribute values. |
| proto/dynamodb_internal.pb.go | Generated Go types for the new DynamoDB internal protobuf schema. |
| proto/Makefile | Adds generation of the new protobuf bindings. |
| adapter/dynamodb_storage_codec.go | Implements prefixed deterministic protobuf encoding + legacy JSON fallback decoding. |
| adapter/dynamodb.go | Switches schema/item persistence and reads to use the new codec. |
| adapter/dynamodb_streaming_test.go | Updates test fixture writer to store schema/items via the new codec. |
| adapter/dynamodb_storage_codec_test.go | Adds round-trip and legacy JSON fallback tests for the new codec. |
| adapter/dynamodb_storage_migration_test.go | Adds tests asserting legacy JSON data is preserved on read and rewritten to proto on update/migration. |
Code Review
This pull request introduces protobuf as the new storage format for DynamoDB schemas and items, replacing the previous JSON format for better efficiency. It cleverly maintains backward compatibility by checking for a protobuf-specific prefix when decoding data, falling back to JSON parsing if the prefix is absent. The migration strategy is well-designed, with schemas being migrated eagerly upon access and items being migrated lazily on write operations to avoid read-time overhead. The implementation is robust, featuring a new codec, corresponding protobuf definitions, and necessary updates across the adapter. The accompanying tests are comprehensive, covering encoding/decoding round-trips, legacy format fallbacks, and the migration logic itself. Overall, the changes are well-executed and appear to be correct and efficient.
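The eager-schema/lazy-item policy described in this review can be sketched with a toy in-memory KV store. Everything below is illustrative, not the PR's code: `kvStore`, `isLegacy`, `reencode`, and the key names are assumptions, and the "re-encode" is a stand-in for the real protobuf rewrite.

```go
package main

import "fmt"

type kvStore map[string][]byte

// Legacy values are bare JSON documents, so they start with '{'.
func isLegacy(v []byte) bool { return len(v) > 0 && v[0] == '{' }

// reencode is a stand-in for re-marshaling the value as prefixed protobuf.
func reencode(v []byte) []byte { return append([]byte{0x00, 1}, v...) }

// getSchema migrates eagerly: a legacy hit is rewritten before returning,
// so schemas converge to the new format on first access.
func getSchema(kv kvStore, key string) []byte {
	v := kv[key]
	if isLegacy(v) {
		v = reencode(v)
		kv[key] = v // rewrite on read
	}
	return v
}

// getItem migrates lazily: legacy bytes are returned as-is to keep reads
// cheap; putItem always writes the new format, so items converge on the
// next update instead.
func getItem(kv kvStore, key string) []byte { return kv[key] }

func putItem(kv kvStore, key string, v []byte) {
	if isLegacy(v) {
		v = reencode(v)
	}
	kv[key] = v
}

func main() {
	kv := kvStore{
		"schema/t": []byte(`{"name":"t"}`),
		"item/t/1": []byte(`{"pk":"1"}`),
	}
	getSchema(kv, "schema/t")
	getItem(kv, "item/t/1")
	fmt.Println("schema migrated:", !isLegacy(kv["schema/t"]))
	fmt.Println("item still legacy:", isLegacy(kv["item/t/1"]))
	putItem(kv, "item/t/1", kv["item/t/1"])
	fmt.Println("item migrated after write:", !isLegacy(kv["item/t/1"]))
}
```

The asymmetry is the point: schemas are few and hot, so rewriting them on read is cheap and finishes the migration quickly, while items can be numerous, so deferring their rewrite to the write path avoids read-time amplification.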
/gemini review
Code Review
This pull request introduces a significant and valuable change by migrating the storage format for DynamoDB schemas and items from JSON to Protobuf, while maintaining backward compatibility. This should improve storage efficiency and performance. The code also adopts several modern Go idioms, which is great for maintainability. However, I've found a few critical issues, including some that will cause compilation errors related to incorrect usage of sync.WaitGroup and a new Go 1.22 feature. There are also some minor issues with error handling consistency. Please address these points to ensure the stability and correctness of the changes.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
/gemini review
Code Review
This pull request introduces a significant optimization by storing DynamoDB schemas and items as protobuf instead of JSON, while maintaining backward compatibility for legacy data. The implementation uses a prefix to distinguish between formats and falls back to JSON when the prefix is not present. This is a solid strategy for a gradual migration. The changes are extensive, touching data access paths, adding new codec logic, and defining new protobuf messages. A large number of stylistic modernizations to use newer Go features have also been included, which improves code readability.
My review identified one critical bug related to incorrect loop iteration and a non-existent function call, and one opportunity for a performance improvement in the new transaction codec logic. Overall, this is a well-structured and important change.
func readTxnSizedBytes(r *bytes.Reader, n uint64) ([]byte, error) {
	if n == 0 {
		return nil, nil
	}
	var out []byte
	var chunkBuf [txnReadChunkSize]byte
	remaining := n
	for remaining >= txnReadChunkSize {
		if _, err := io.ReadFull(r, chunkBuf[:]); err != nil {
			return nil, errors.WithStack(err)
		}
		out = append(out, chunkBuf[:]...)
		remaining -= txnReadChunkSize
	}
	for remaining > 0 {
		b, err := r.ReadByte()
		if err != nil {
			return nil, errors.WithStack(err)
		}
		out = append(out, b)
		remaining--
	}
	return out, nil
}
The readTxnSizedBytes function can be made more efficient. The second loop reads the remaining bytes one by one, which can be slow for larger remainders. Using io.ReadFull for the remainder would be more performant.
func readTxnSizedBytes(r *bytes.Reader, n uint64) ([]byte, error) {
if n == 0 {
return nil, nil
}
var out []byte
var chunkBuf [txnReadChunkSize]byte
remaining := n
for remaining >= txnReadChunkSize {
if _, err := io.ReadFull(r, chunkBuf[:]); err != nil {
return nil, errors.WithStack(err)
}
out = append(out, chunkBuf[:]...)
remaining -= txnReadChunkSize
}
if remaining > 0 {
remBuf := make([]byte, remaining)
if _, err := io.ReadFull(r, remBuf); err != nil {
return nil, errors.WithStack(err)
}
out = append(out, remBuf...)
}
return out, nil
}
Pull request overview
This PR updates the DynamoDB adapter’s on-disk KV payload format to store table schemas and items as protobuf (with a prefix/version), while preserving backwards compatibility with legacy JSON payloads and adding targeted migration/compatibility coverage.
Changes:
- Add internal protobuf schema + generated Go types for DynamoDB table schemas/items, and wire proto generation into proto/Makefile.
- Introduce a DynamoDB storage codec that encodes to prefixed deterministic protobuf and decodes with legacy JSON fallback; update DynamoDB read/write paths and add migration/round-trip tests.
- Apply Go 1.25+ refactors across the codebase (e.g., any, range over ints, maps.Copy, slices.Sort/Contains, atomic.Uint64), plus a txn codec refactor.
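Deterministic marshaling matters here because protobuf map fields have no defined wire order: a naive encoder that ranges over a Go map can emit different bytes for the same logical value on every run, which breaks byte-level comparison of stored payloads. The real fix in google.golang.org/protobuf is proto.MarshalOptions{Deterministic: true}, which sorts map keys; the stdlib-only sketch below (with an assumed key=value;-style encoding, not real protobuf wire format) shows the key-sorting idea.

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

// encodeSorted emits map entries in sorted-key order so the same logical
// map always produces the same bytes, regardless of Go's randomized map
// iteration order. (The key=value; framing is illustrative only.)
func encodeSorted(m map[string]string) []byte {
	var out []byte
	for _, k := range slices.Sorted(maps.Keys(m)) {
		out = append(out, k...)
		out = append(out, '=')
		out = append(out, m[k]...)
		out = append(out, ';')
	}
	return out
}

func main() {
	m := map[string]string{"b": "2", "a": "1", "c": "3"}
	fmt.Println(string(encodeSorted(m)))
}
```

Stable bytes are what make tests like "legacy JSON is preserved on read" and "values are rewritten to proto on update" checkable by direct byte comparison against the KV engine.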
Reviewed changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| store/mvcc_store_concurrency_test.go | Uses newer range syntax in concurrency scan test. |
| store/mvcc_store.go | Switches callback/comparator params to any. |
| store/list_helpers.go | Changes sortable int64 encoding implementation. |
| proto/dynamodb_internal.proto | Adds protobuf definitions for DynamoDB internal storage payloads. |
| proto/dynamodb_internal.pb.go | Adds generated Go protobuf code for the new proto schema. |
| proto/Makefile | Generates Go protobuf code for dynamodb_internal.proto. |
| monitoring/dynamodb_test.go | Uses newer range syntax in metrics test. |
| kv/txn_codec.go | Refactors decode helpers for length-delimited txn fields. |
| kv/sharded_integration_test.go | Uses newer range syntax in raft leader wait loop. |
| kv/sharded_coordinator.go | Uses slices.Sort and newer range syntax for retries. |
| kv/shard_router_test.go | Uses newer range syntax for loops over ints/slices. |
| kv/latest_commit_ts.go | Simplifies concurrency limit calculation via min/max. |
| kv/hlc_test.go | Uses newer range syntax in monotonicity test. |
| kv/grpc_conn_cache_test.go | Uses newer range syntax in concurrent cache test. |
| kv/fsm.go | Changes raft FSM Apply return type to any. |
| distribution/engine_test.go | Updates loops and goroutine coordination style in tests. |
| distribution/engine.go | Moves timestamp counter to atomic.Uint64 and updates increment call. |
| cmd/server/demo.go | Updates retry loop syntax and uses strings.SplitSeq for parsing. |
| adapter/test_util.go | Uses newer range syntax in test utilities. |
| adapter/redis_multi_test.go | Updates []interface{} to []any and loop syntax in tests. |
| adapter/redis.go | Uses maps.Copy when merging key sets. |
| adapter/grpc_transcoder_prop_test.go | Updates test helper interface to use ...any. |
| adapter/grpc_test.go | Uses newer range syntax in integration tests. |
| adapter/dynamodb_types_test.go | Uses newer range syntax in JSON nesting builders. |
| adapter/dynamodb_test.go | Uses newer range syntax in concurrent transaction tests. |
| adapter/dynamodb_table_compat_test.go | Uses newer range syntax in concurrency loop. |
| adapter/dynamodb_streaming_test.go | Switches fixtures from JSON marshal to proto storage codec + loop syntax updates. |
| adapter/dynamodb_storage_migration_test.go | Adds tests ensuring legacy JSON items aren’t rewritten on read and are rewritten on update/migration. |
| adapter/dynamodb_storage_codec_test.go | Adds codec round-trip and legacy fallback tests. |
| adapter/dynamodb_storage_codec.go | Implements prefixed deterministic protobuf encode + legacy JSON decode fallback for items/schemas. |
| adapter/dynamodb_migration_test.go | Updates schema generation key encoding to use fmt.Appendf. |
| adapter/dynamodb_failure_test.go | Uses newer range syntax in test data generation. |
| adapter/dynamodb.go | Wires codec into table/item read/write paths; uses maps/slices helpers and minor refactors. |
| adapter/add_voter_join_test.go | Updates worker goroutine helper implementation style. |
buf := bytes.NewBuffer(dst[:0])
_ = binary.Write(buf, binary.BigEndian, seq^math.MinInt64)
}
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
…SortableInt64 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
store: replace binary.Write with binary.BigEndian.PutUint64 in encodeSortableInt64
/gemini review
Pull request overview
This PR introduces a new internal protobuf-based storage format for the DynamoDB adapter (schemas + items) while preserving compatibility with legacy JSON payloads already stored in the KV engine.
Changes:
- Added dynamodb_internal.proto + generated Go types, and updated proto build tooling.
- Implemented a DynamoDB storage codec that prefixes + encodes schemas/items as deterministic protobuf, with JSON fallback decoding for legacy data.
- Modernized various loops/types (e.g., any, integer range, maps.Copy, slices.Sort/Contains) and refactored some helpers.
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| store/mvcc_store_concurrency_test.go | Uses integer range for tighter test loops. |
| store/mvcc_store.go | Replaces interface{} with any in callbacks/comparators. |
| store/list_helpers.go | Refactors sortable int64 encoding implementation. |
| proto/dynamodb_internal.proto | Defines internal protobuf messages for DynamoDB schema/item storage. |
| proto/dynamodb_internal.pb.go | Generated Go protobuf types for dynamodb_internal.proto. |
| proto/Makefile | Adds protoc invocation to generate dynamodb_internal.pb.go. |
| monitoring/dynamodb_test.go | Uses integer range in metrics test setup. |
| kv/txn_codec.go | Refactors txn decoding to use shared field reader helpers. |
| kv/sharded_integration_test.go | Uses integer range in leader-wait loop. |
| kv/sharded_coordinator.go | Uses slices.Sort and integer range in retry loop. |
| kv/shard_router_test.go | Uses integer range for slice iteration and fixed loops. |
| kv/latest_commit_ts.go | Simplifies concurrency limit computation using min/max. |
| kv/hlc_test.go | Uses integer range in monotonicity test loop. |
| kv/grpc_conn_cache_test.go | Uses integer range in concurrent connection test loops. |
| kv/fsm.go | Updates raft FSM Apply signature to return any. |
| go.mod | Promotes smithy-go and prometheus client to direct dependencies. |
| distribution/engine_test.go | Uses integer range; adopts WaitGroup.Go style concurrency. |
| distribution/engine.go | Switches timestamp generator to atomic.Uint64. |
| cmd/server/demo.go | Uses integer range retries; uses strings.SplitSeq for map parsing. |
| adapter/test_util.go | Uses integer range for test node/port setup loops. |
| adapter/redis_multi_test.go | Updates expected types to []any and modernizes loops. |
| adapter/redis.go | Uses maps.Copy to merge keysets. |
| adapter/grpc_transcoder_prop_test.go | Uses any in test helper interface. |
| adapter/grpc_test.go | Uses integer range for generating test keys and goroutines. |
| adapter/dynamodb_types_test.go | Uses integer range when building nested JSON fixtures. |
| adapter/dynamodb_test.go | Uses integer range in concurrent transact tests. |
| adapter/dynamodb_table_compat_test.go | Uses integer range in concurrent update test. |
| adapter/dynamodb_streaming_test.go | Switches fixtures from JSON marshaling to new storage codec helpers. |
| adapter/dynamodb_storage_migration_test.go | Adds tests covering legacy JSON read behavior and rewrite-on-write migration. |
| adapter/dynamodb_storage_codec_test.go | Adds unit tests for proto round-trip + legacy JSON fallback decoding. |
| adapter/dynamodb_storage_codec.go | Implements prefixed deterministic-proto encoding + JSON fallback decoding. |
| adapter/dynamodb_migration_test.go | Updates legacy generation fixture encoding to fmt.Appendf. |
| adapter/dynamodb_failure_test.go | Uses integer range in item population loop. |
| adapter/dynamodb.go | Integrates new schema/item codec into table/item read/write paths and utilities. |
| adapter/add_voter_join_test.go | Switches to WaitGroup.Go style concurrency helper. |
buf := bytes.NewBuffer(dst[:0])
// binary.Write cannot fail here: the payload is a fixed-size int64 and the
// destination is an in-memory bytes.Buffer.
_ = binary.Write(buf, binary.BigEndian, seq^math.MinInt64)
}
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Code Review
This pull request introduces a new Protobuf-based storage codec for DynamoDB table schemas and items, replacing the previous JSON serialization. This change includes new encodeStoredDynamoTableSchema and encodeStoredDynamoItem functions for writing data, and corresponding decodeStoredDynamoTableSchema and decodeStoredDynamoItem functions that provide backward compatibility by falling back to JSON unmarshaling for legacy data. Additionally, the PR updates various parts of the codebase to leverage new Go 1.22 features, such as for range loops for iteration, maps.Copy for map operations, slices.Contains for slice checks, and strings.Cut for string manipulation, improving code conciseness and efficiency. Several instances of sync.WaitGroup were incorrectly used with a non-existent Go method, which needs to be corrected to either use wg.Add(1) with a standard goroutine or switch to errgroup.Group for concurrent execution. The readTxnSizedBytes function in the transaction codec can be optimized for performance and readability by pre-allocating the byte slice and using io.ReadFull instead of chunked appends.
I am having trouble creating individual review comments, so my feedback is listed below.
adapter/add_voter_join_test.go (299-303)
The Go method on serverWorkers is calling w.wg.Go(...). However, w.wg is of type sync.WaitGroup, which does not have a Go method. This will cause a compilation error. This looks like an accidental recursive call. Did you mean to use w.wg.Add(1) and a goroutine, or change w.wg to be an errgroup.Group?
w.wg.Add(1)
go func() {
defer w.wg.Done()
if err := run(); err != nil {
w.errCh <- err
}
}()
adapter/dynamodb.go (3184-3186)
The variable wg is a *sync.WaitGroup, which does not have a Go method. This will cause a compilation error. You might have intended to use an errgroup.Group here.
wg.Add(1)
go func() {
defer wg.Done()
d.gsiReadWorker(ctx, readTS, filter, jobs, results, cancel)
}()
distribution/engine_test.go (118-120)
The variable wg is declared as a sync.WaitGroup, which does not have a Go method. This will cause a compilation error. To fix this, you could change the type of wg to errgroup.Group and use its Go method.
wg.Add(1)
go func() {
defer wg.Done()
e.RecordAccess([]byte("b"))
}()
kv/txn_codec.go (233-257)
The readTxnSizedBytes function reads from the bytes.Reader in chunks and appends to the out slice. This can be inefficient due to multiple reallocations if n is large. Since readTxnField already validates that n fits into an int, you can simplify this by pre-allocating the slice and using io.ReadFull. This would be more performant and easier to read.
func readTxnSizedBytes(r *bytes.Reader, n uint64) ([]byte, error) {
if n == 0 {
return nil, nil
}
out := make([]byte, n)
if _, err := io.ReadFull(r, out); err != nil {
return nil, errors.WithStack(err)
}
return out, nil
}
return nil, errStoredDynamoMessageTooLarge
}

totalLen := int(prefixLen + bodyLen)
🚫 [golangci] reported by reviewdog 🐶
G115: integer overflow conversion uint64 -> int (gosec)
// n has already been validated by readTxnField to be <= math.MaxInt,
// so this conversion is safe.
out := make([]byte, int(n))
🚫 [golangci] reported by reviewdog 🐶
G115: integer overflow conversion uint64 -> int (gosec)
… allocation Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
store: replace bytes.NewBuffer+binary.Write with binary.BigEndian.PutUint64 in encodeSortableInt64
dst[i] = byte(sortable)
sortable >>= 8
}
binary.BigEndian.PutUint64(dst, uint64(seq^math.MinInt64))
🚫 [golangci] reported by reviewdog 🐶
G115: integer overflow conversion int64 -> uint64 (gosec)
// Guard against excessively large fields before attempting to read them.
if n > uint64(math.MaxInt) {
return nil, errors.Newf("%s: field size %d overflows int", truncatedMessage, n)
🚫 [golangci] reported by reviewdog 🐶
error returned from external package is unwrapped: sig: func github.com/cockroachdb/errors.Newf(format string, args ...interface{}) error (wrapcheck)
…JSON compatibility