Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
Walkthrough
Adds a Delta Lake I/O adapter and its wiring: a new Delta Lake I/O module, plus StandaloneCluster routing for it.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User as User/CLI
    participant Cluster as StandaloneCluster
    participant DLAdapter as from_deltalake()
    participant DLAddr as DeltaLakeAddress
    participant Polars as Polars
    participant CrateDB as CrateDB
    User->>Cluster: load_table(source_url, target_url)
    Cluster->>DLAdapter: from_deltalake(source_url, target_url)
    DLAdapter->>DLAddr: DeltaLakeAddress.from_url(source_url)
    DLAddr->>DLAddr: parse URL & collect storage options
    DLAddr->>Polars: scan_delta(location, version, storage_options)
    Polars-->>DLAdapter: LazyFrame / DataFrame
    DLAdapter->>CrateDB: write chunks to CrateDB
    CrateDB-->>DLAdapter: ack
    DLAdapter-->>Cluster: result True/False
    Cluster-->>User: operation result
```

```mermaid
sequenceDiagram
    participant User as User/CLI
    participant Cluster as StandaloneCluster
    participant DLAdapter as to_deltalake()
    participant DLAddr as DeltaLakeAddress
    participant CrateDB as CrateDB
    participant Polars as Polars
    participant DeltaStorage as Delta Storage
    User->>Cluster: save_table(source_url, target_url)
    Cluster->>DLAdapter: to_deltalake(source_url, target_url)
    DLAdapter->>DLAddr: DeltaLakeAddress.from_url(target_url)
    DLAddr->>DLAddr: parse URL & collect storage options
    DLAdapter->>CrateDB: read_cratedb(streaming, chunk_size)
    CrateDB-->>DLAdapter: DataFrame chunks
    loop per chunk
        DLAdapter->>Polars: write_delta(chunk, mode, storage_options)
        Polars->>DeltaStorage: write (overwrite/append)
        DeltaStorage-->>Polars: ack
    end
    DLAdapter-->>Cluster: result True/False
    Cluster-->>User: operation result
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 3 passed
2612f2b to
7600ec0
Actionable comments posted: 1
♻️ Duplicate comments (1)
cratedb_toolkit/io/deltalake.py (1)
74-87: ⚠️ Potential issue | 🟠 Major

`collect_properties` drops documented LakeFS options. On Line 74–Line 87, only `aws_`/`azure_`/`google_`/`delta_`-prefixed keys are retained. The documented LakeFS example (`endpoint`, `access_key_id`, `secret_access_key`) won't pass through `storage_options`, which can break LakeFS access.

🛠️ Proposed fix

```diff
-        prefixes = ["aws_", "azure_", "google_", "delta_"]
+        prefixes = ["aws_", "azure_", "google_", "delta_", "endpoint", "access_key_id", "secret_access_key"]
@@
-            if name.lower().startswith(prefix) and value is not None:
-                opts[name.upper()] = value
+            if name.lower().startswith(prefix) and value is not None:
+                key = name.upper() if name.lower().startswith(("aws_", "azure_", "google_", "delta_")) else name
+                opts[key] = value
                 break
```

```bash
#!/bin/bash
# Verify mismatch between documented LakeFS params and current filter logic.
set -euo pipefail

echo "== collect_properties implementation =="
rg -n -C2 'prefixes = \[|opts\[name\.upper\(\)\]' cratedb_toolkit/io/deltalake.py
echo
echo "== LakeFS docs example params =="
rg -n 'lakefs\+deltalake://|endpoint=|access_key_id=|secret_access_key=' doc/io/deltalake/index.md
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cratedb_toolkit/io/deltalake.py` around lines 74 - 87, collect_properties currently only returns query params that start with prefixes = ["aws_", "azure_", "google_", "delta_"], which drops documented LakeFS options like endpoint, access_key_id, and secret_access_key; update collect_properties (and its prefixes check) to also accept those LakeFS option names (case-insensitive) by adding a small whitelist (e.g. {"endpoint", "access_key_id", "secret_access_key"}) and when iterating query_params include a param if it matches any prefix OR if its lowercased name is in that whitelist, preserving the existing behavior of uppercasing keys in opts and skipping None values (refer to collect_properties, prefixes and url.query_params to locate the code).
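To illustrate the whitelist approach the review proposes, here is a hedged, self-contained sketch. `collect_properties` here is a simplified stand-in for the real function in `cratedb_toolkit/io/deltalake.py`, and the exact prefixes and LakeFS key names follow the review text:

```python
# Illustrative sketch only; the real implementation may differ in how it
# receives query parameters and iterates prefixes.
PREFIXES = ("aws_", "azure_", "google_", "delta_")
LAKEFS_KEYS = {"endpoint", "access_key_id", "secret_access_key"}


def collect_properties(query_params: dict) -> dict:
    """Filter URL query parameters down to storage options."""
    opts = {}
    for name, value in query_params.items():
        if value is None:
            continue
        lowered = name.lower()
        if lowered.startswith(PREFIXES):
            # Cloud-provider options are conventionally uppercased.
            opts[name.upper()] = value
        elif lowered in LAKEFS_KEYS:
            # LakeFS options pass through under their given name.
            opts[name] = value
    return opts
```

With this shape, a parameter is retained either because it carries a known cloud-provider prefix or because its lowercased name sits in the LakeFS whitelist, which preserves the existing uppercasing behavior while no longer dropping `endpoint`, `access_key_id`, and `secret_access_key`.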
🧹 Nitpick comments (1)
doc/io/deltalake/index.md (1)
61-63: Prefer env vars over inline secrets in URL examples. The examples on Line 61 and Line 118 place secret-bearing parameters in the command URL. Even with placeholders, this pattern encourages leaking credentials via shell history/process lists. Consider adding an alternative example using environment variables for auth values.
Also applies to: 106-107, 118-119
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@doc/io/deltalake/index.md` around lines 61 - 63, Replace the inline secret-bearing query parameters in the example URL strings (e.g. the "s3+deltalake://bucket1/demo/taxi-tiny?AWS_ACCESS_KEY_ID=<your_access_key_id>&AWS_SECRET_ACCESS_KEY=..." and similar examples) with a short alternative that demonstrates using environment variables (e.g., export or --env usage) for AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT and AWS_REGION, and update the doc text to show both options so the primary example does not encourage embedding credentials in the URL.
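As a sketch of what such an alternative docs example might look like, assuming the underlying object-store layer honors the standard AWS environment variables (its usual behavior); the `ctk load table` invocation and the `s3+deltalake://` URL mirror the documented examples and are shown commented, for illustration only:

```shell
# Export credentials instead of embedding them in the URL, so they stay out
# of shell history and process listings. Placeholders as in the docs.
export AWS_ACCESS_KEY_ID="<your_access_key_id>"
export AWS_SECRET_ACCESS_KEY="<your_secret_access_key>"
export AWS_ENDPOINT="<your_endpoint>"
export AWS_REGION="<your_region>"

# The URL now carries no secrets (illustrative invocation):
# ctk load table "s3+deltalake://bucket1/demo/taxi-tiny"
```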
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 742831ff-cb6c-497c-bf8e-89eff8f80cb0
📒 Files selected for processing (7)
- CHANGES.md
- cratedb_toolkit/cluster/core.py
- cratedb_toolkit/io/deltalake.py
- doc/io/deltalake/index.md
- doc/io/index.md
- pyproject.toml
- tests/io/test_deltalake.py
🚧 Files skipped from review as they are similar to previous changes (2)
- CHANGES.md
- doc/io/index.md
```python
            args=["load", "table", source_url],
            catch_exceptions=False,
        )
    assert exc_info.match("LogSegment end version 0 not the same as the specified end version 99")
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Check deltalake version constraints in pyproject.toml and assertion method in test file
rg -n 'polars\[deltalake\]|deltalake' pyproject.toml tests/io/test_deltalake.py || true
echo "---"
# Get context around line 95 in the test file
sed -n '90,100p' tests/io/test_deltalake.py || true
```

Repository: crate/cratedb-toolkit
Length of output: 1698
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Get full context around line 95 including pytest exception handling
sed -n '75,100p' tests/io/test_deltalake.py
```

Repository: crate/cratedb-toolkit
Length of output: 984
🌐 Web query:
deltalake python library error message "LogSegment end version" changelog releases
💡 Result:
The error text:
LogSegment end version X not the same as the specified end version Y
comes from delta-kernel-rs’s LogSegment::try_new check, which asserts that the “effective” last commit/checkpoint version found during _delta_log listing equals the end_version you requested. If log listing returns a different last version (often due to missing/hidden commit JSONs, non-contiguous versions, eventual consistency, or a stale/partial listing), it raises this error. [1]
Where to look in release notes / changelogs
This message is not typically called out verbatim in deltalake (Python) release notes; it’s emitted by the Rust kernel component. So the relevant “changelog trail” is usually: deltalake (PyPI) version → delta-rs release → delta-kernel version bumps.
- `deltalake` (Python) release history (dates/versions to correlate with your environment). [2]
- The `delta-rs` consolidated `CHANGELOG.md` includes multiple "update kernel" entries (meaning delta-kernel behavior and errors like this may shift across these releases), e.g.:
  - `rust-v0.27.0` includes "upgrade to delta_kernel 0.12.x". [3]
  - `rust-v0.28.1` includes "update kernel to 0.15.1". [4]
  - `rust-v0.30.0` includes "update delta-kernel to 0.19". [5]
If you tell me your exact deltalake version and the operation you were calling (often CDF / table-changes / time travel), I can map it to the corresponding delta-rs/kernel dependency set and point you at the most relevant release block.
Sources: [1] [2] [3] [4] [5]
Use a stable substring for exception message assertion instead of full pattern.
The assertion currently matches the entire error message using `exc_info.match()`, which can break when the deltalake/delta-kernel stack updates, since the underlying error text is generated by delta-kernel-rs and may change across versions. The dependency `polars[deltalake]<1.39` is not pinned, leaving room for version drift.
Prefer asserting on a stable substring:
🛠️ Suggested fix

```diff
-    assert exc_info.match("LogSegment end version 0 not the same as the specified end version 99")
+    assert "specified end version 99" in str(exc_info.value)
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
    assert "specified end version 99" in str(exc_info.value)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/io/test_deltalake.py` at line 95, The assertion in
tests/io/test_deltalake.py currently uses exc_info.match() with the full error
text which is brittle; change it to assert on a stable substring from the
exception (e.g. assert "not the same as the specified end version" in
str(exc_info.value) or assert "LogSegment" in str(exc_info.value)) instead of
matching the entire message, replacing the exc_info.match("LogSegment end
version 0 not the same as the specified end version 99") call with a substring
containment assertion to make the test resilient to upstream message changes.
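The substring pattern can be exercised in isolation. The following is an illustrative sketch only: a stand-in callable and a plain `RuntimeError` replace the real CLI invocation and the delta-kernel error type, while the message text mimics the one quoted in the review:

```python
import pytest


def load_with_bad_version():
    # Stand-in for the CLI call that requests a Delta table version
    # that does not exist; the message mimics the delta-kernel text.
    raise RuntimeError(
        "LogSegment end version 0 not the same as the specified end version 99"
    )


def test_version_mismatch_message():
    with pytest.raises(RuntimeError) as exc_info:
        load_with_bad_version()
    # Substring containment survives upstream rewording of the message
    # better than matching the full text.
    assert "specified end version 99" in str(exc_info.value)
```

Note that `exc_info.match()` treats its argument as a regular expression searched against the message, so a full-text pattern is doubly fragile: any change in wording or any unescaped regex metacharacter breaks it, whereas a short `in` check on `str(exc_info.value)` does not.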
About
Import from and export to Delta Lake tables, for interoperability purposes.
Documentation
https://cratedb-toolkit--664.org.readthedocs.build/io/deltalake/
References
Review
Please don't hesitate to review retrospectively when applicable.
Any feedback is much appreciated and well received.