
DuckDB support #11

Merged
ZmeiGorynych merged 6 commits into main from egor/dev-1180-duckdb
Apr 6, 2026

Conversation

@ZmeiGorynych
Member

@ZmeiGorynych ZmeiGorynych commented Apr 4, 2026

Summary by CodeRabbit

  • New Features

    • Improved DuckDB compatibility: local-file-style connections and more robust schema/primary-key detection for reliable queries and ingestion.
  • Tests

    • Added DuckDB integration tests covering queries, time-series behavior, and ingestion/rollups; added unit tests for schema/PK fallbacks. Tests skip if DuckDB isn’t installed.
  • Chores

    • Added optional DuckDB runtime and dev extras.
  • Documentation

    • Promoted DuckDB to Tier 1, added install guidance and test/run instructions.

@ZmeiGorynych
Member Author

@CodeRabbit review

@coderabbitai
Contributor

coderabbitai bot commented Apr 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.
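The setting named above lives in CodeRabbit's repository configuration file; a fragment might look roughly like this (the file path and the threshold value of 20 are assumptions, only the key path comes from the message above):

```yaml
# .coderabbit.yaml (path assumed)
reviews:
  auto_review:
    # Pause automatic reviews after this many reviewed commits (value illustrative)
    auto_pause_after_reviewed_commits: 20
```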

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 363cbefa-d7f3-4a77-b9ae-668cb696f220

📥 Commits

Reviewing files that changed from the base of the PR and between fc4b010 and 13b203f.

📒 Files selected for processing (5)
  • docs/configuration/datasources.md
  • docs/development.md
  • docs/getting-started.md
  • docs/index.md
  • tests/test_ingestion.py
✅ Files skipped from review due to trivial changes (3)
  • docs/index.md
  • docs/getting-started.md
  • docs/development.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/test_ingestion.py

📝 Walkthrough

Walkthrough

Adds DuckDB support: optional dependencies/extras and docs, treats DuckDB as a file-based connection URL, adds INFORMATION_SCHEMA-based introspection fallbacks for columns and primary keys, and introduces in-process DuckDB integration tests plus unit tests for the fallback helpers.

Changes

| Cohort / File(s) | Summary |
|------------------|---------|
| **Documentation & Test Guidance**<br>`CLAUDE.md`, `docs/configuration/datasources.md`, `docs/development.md`, `docs/getting-started.md`, `docs/index.md` | Promote DuckDB to first-class support, document the `motley-slayer[duckdb]` extra, add a pytest command for in-process DuckDB integration tests, and update test guidance and datasource table entries. |
| **Dependency Configuration**<br>`pyproject.toml` | Add optional dependencies `duckdb >=0.9` and `duckdb-engine >=0.13`; add a `duckdb` extra and include it in `all`; add both to the dev group. |
| **Connection String Handling**<br>`slayer/core/models.py` | Make `DatasourceConfig.get_connection_string()` treat `duckdb` like `sqlite` for local-file URLs when `connection_string` is not set (emit `duckdb:///{database}`). |
| **Schema Introspection & Ingestion**<br>`slayer/engine/ingestion.py` | Add INFORMATION_SCHEMA-based fallbacks and type mapping for columns and PKs, introduce `_safe_get_columns()`/`_safe_get_pk_constraint()` wrappers that fall back when the Inspector fails or PKs are missing, thread `sa_engine` into rollup SQL generation, and adapt type conversion to accept `DataType` from the fallback. |
| **Integration & Unit Tests**<br>`tests/test_integration_duckdb.py`, `tests/test_ingestion.py` | Add in-process DuckDB integration tests (fixtures that seed tables; query and ingestion/rollup assertions; conditional skip if `duckdb` is missing) and unit tests for the ingestion fallback helpers validating returned structures, parameterized SQL, and injection safety. |
| **Repository Guidance**<br>`CLAUDE.md` | Note that DuckDB integration tests live in `tests/test_integration_duckdb.py` and run in-process (no Docker). |
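A `pyproject.toml` fragment matching the dependency changes above might look roughly like this (a Poetry-style layout is assumed from the dev-group mention; the repository's actual file may differ):

```toml
[tool.poetry.dependencies]
# Optional DuckDB runtime pieces (version bounds from the summary above)
duckdb = { version = ">=0.9", optional = true }
duckdb-engine = { version = ">=0.13", optional = true }

[tool.poetry.extras]
duckdb = ["duckdb", "duckdb-engine"]
all = ["duckdb", "duckdb-engine"]

[tool.poetry.group.dev.dependencies]
duckdb = ">=0.9"
duckdb-engine = ">=0.13"
```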

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Ingest as Ingestion
    participant Inspector as SQLAlchemy Inspector
    participant InfoSchema as InfoSchema Reader
    participant DB as DuckDB

    Client->>Ingest: request introspect or run query
    Ingest->>Inspector: inspector.get_columns / get_pk_constraint
    Inspector->>DB: metadata queries
    alt Inspector returns complete metadata
        DB-->>Inspector: column + pk metadata
        Inspector-->>Ingest: metadata objects
    else Inspector errors or missing PKs
        Ingest->>InfoSchema: SELECT from information_schema.columns / constraints
        InfoSchema->>DB: information_schema query
        DB-->>InfoSchema: raw rows
        InfoSchema-->>Ingest: rows with type strings + pk info
        Ingest->>Ingest: map types via _INFO_SCHEMA_TYPE_MAP
    end
    Ingest-->>Client: normalized columns and PKs (for SQL/gen/ingest)
```
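The try-then-fall-back flow in the diagram can be sketched as follows. This is a dependency-free illustration, not the PR's code: the real `_safe_get_columns()` wraps a SQLAlchemy Inspector and an Engine, while here both paths are plain callables.

```python
import logging
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)

def safe_get_columns(
    get_columns: Callable[..., List[dict]],
    fallback: Callable[..., List[dict]],
    table_name: str,
    schema: Optional[str] = None,
) -> List[dict]:
    """Try the primary introspection path; fall back on any failure."""
    try:
        return get_columns(table_name, schema=schema)
    except Exception as exc:
        # Log before falling back so unexpected failures stay visible
        # (the review below suggests exactly this over a silent except).
        logger.debug("get_columns failed for %s.%s: %s; using fallback",
                     schema, table_name, exc)
        return fallback(table_name, schema=schema)

def broken(table_name, schema=None):
    # Stand-in for an Inspector that cannot handle this dialect.
    raise RuntimeError("Inspector does not support this dialect")

def info_schema(table_name, schema=None):
    # Stand-in for the INFORMATION_SCHEMA reader.
    return [{"name": "id", "type": "INTEGER"}]

print(safe_get_columns(broken, info_schema, "orders"))
```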

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I nibbled at schemas where inspectors tripped,
DuckDB files, in-process — no Docker script,
Fallbacks fetched columns, PKs found in the grass,
Tests seeded and rolled up, joins stitched to pass,
A rabbit hops by, pleased with this tidy bit.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 46.51%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title 'DuckDB support' directly and clearly summarizes the main objective of the changeset, which adds comprehensive DuckDB integration throughout the codebase. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai
Contributor

coderabbitai bot commented Apr 4, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (5)
tests/test_integration_duckdb.py (3)

1-9: Consider removing unused tempfile import after refactoring.

Once the tempfile.mkdtemp() calls are replaced with tmp_path, the tempfile import on line 3 can be removed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_integration_duckdb.py` around lines 1 - 9, Remove the now-unused
tempfile import: after replacing tempfile.mkdtemp() calls with pytest's tmp_path
fixture, delete the import statement "import tempfile" (and any references to
tempfile) so only needed imports remain; look for the top-level "import
tempfile" and references to "tempfile.mkdtemp()" in the
tests/test_integration_duckdb.py code and remove the import.

403-404: Same cleanup issue with tempfile.mkdtemp() in ingestion tests.

These test methods create temporary directories that won't be cleaned up. Consider using a fixture or tmp_path_factory for shared temporary directories.

♻️ Suggested approach using tmp_path_factory

Create a helper fixture or pass tmp_path to the test methods:

```python
# Option 1: Use tmp_path in test methods
def test_rollup_query_group_by_customer(self, duckdb_ingest_env, tmp_path) -> None:
    models, ds = duckdb_ingest_env
    storage = YAMLStorage(base_dir=str(tmp_path))
    # ...
```

Also applies to: 426-427

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_integration_duckdb.py` around lines 403 - 404, Replace the
non-cleaned temporary directories created with tempfile.mkdtemp() in the
ingestion tests (e.g., in test_rollup_query_group_by_customer and the other test
at the same file) by using pytest's tmp_path or a tmp_path_factory fixture:
update the test signatures to accept tmp_path (or create a module-level fixture
that returns a shared tmp_path from tmp_path_factory) and instantiate
YAMLStorage with base_dir=str(tmp_path) so pytest will handle cleanup
automatically; locate usages by searching for tempfile.mkdtemp() in
tests/test_integration_duckdb.py and replace them accordingly.

59-60: Use tmp_path fixture instead of tempfile.mkdtemp() for automatic cleanup.

tempfile.mkdtemp() creates a directory that is not automatically cleaned up after the test, potentially leaving orphaned test directories. The tmp_path fixture (already used for db_path) is automatically managed by pytest.

♻️ Proposed fix
```diff
-    # Set up SLayer storage
-    tmpdir = tempfile.mkdtemp()
-    storage = YAMLStorage(base_dir=tmpdir)
+    # Set up SLayer storage
+    storage = YAMLStorage(base_dir=str(tmp_path / "slayer_data"))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_integration_duckdb.py` around lines 59 - 60, Replace the manual
temp dir creation with pytest's tmp_path fixture: stop calling
tempfile.mkdtemp() to create tmpdir and instead use the existing tmp_path (or
create a subdir from it) as the base_dir passed to YAMLStorage (the line
creating storage = YAMLStorage(base_dir=tmpdir) and the tmpdir variable). Update
references so YAMLStorage(base_dir=...) uses tmp_path (or tmp_path / "yaml") to
ensure automatic cleanup.
slayer/engine/ingestion.py (1)

245-248: Bare except Exception may silently hide unexpected errors.

Consider logging the exception before falling back, to aid debugging when the fallback is invoked unexpectedly.

📝 Suggested improvement with logging
```diff
     try:
         return inspector.get_columns(table_name, schema=schema)
-    except Exception:
+    except Exception as e:
+        logger.debug(f"Inspector.get_columns failed for {table_name}, using fallback: {e}")
         return _get_columns_fallback(sa_engine, table_name, schema)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@slayer/engine/ingestion.py` around lines 245 - 248, The try/except around
inspector.get_columns(table_name, schema=schema) silently swallows errors;
change it to catch Exception as e, log the exception (e.g., logger.exception or
logging.getLogger(__name__).exception) including table_name and schema for
context, then call _get_columns_fallback(sa_engine, table_name, schema); ensure
a module logger exists or import logging and create one before using it.
slayer/core/models.py (1)

75-76: Missing validation for database in file-based connection strings.

When type is "sqlite" or "duckdb" and database is None, this produces duckdb:///None or sqlite:///None, which may cause confusing errors downstream. Consider adding validation or defaulting to an in-memory database.

🛡️ Suggested defensive check
```diff
         if self.type in ("sqlite", "duckdb"):
+            if not self.database:
+                # Default to in-memory for missing database
+                return f"{self.type}:///:memory:"
             return f"{self.type}:///{self.database}"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@slayer/core/models.py` around lines 75 - 76, The code returns
"sqlite:///None" or "duckdb:///None" when self.database is None; update the
branch that handles self.type in ("sqlite", "duckdb") to validate self.database
and avoid embedding "None" — either raise a clear ValueError when self.database
is None/empty or default to an in-memory DB (e.g., use ":memory:" or an
appropriate in-memory literal) before returning
f"{self.type}:///{self.database}"; locate and change the block that checks
self.type and uses self.database to construct the URL.
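The defensive behavior this comment asks for can be sketched with a simplified stand-in for `DatasourceConfig` (the field names follow the PR diff; the dataclass shape and the `ValueError` branch are assumptions, not the project's actual model):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class DatasourceConfig:
    """Simplified stand-in for the real model; fields follow the PR diff."""
    type: str
    database: Optional[str] = None
    connection_string: Optional[str] = None

    def get_connection_string(self) -> str:
        # An explicit connection string always wins.
        if self.connection_string:
            return self.connection_string
        if self.type in ("sqlite", "duckdb"):
            # Default to in-memory instead of emitting "duckdb:///None".
            return f"{self.type}:///{self.database or ':memory:'}"
        raise ValueError(f"connection_string is required for type {self.type!r}")

print(DatasourceConfig(type="duckdb", database="analytics.db").get_connection_string())
print(DatasourceConfig(type="duckdb").get_connection_string())
```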
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@slayer/engine/ingestion.py`:
- Around line 223-234: The SQL built for the primary-key fallback uses f-strings
(sql and schema_filter) causing SQL injection; change to a parameterized query
by replacing the f-string interpolation with SQL that uses bound parameters
(e.g., :table_name and :schema) and pass a params dict to
conn.execute(sa.text(...), params). Remove direct insertion of schema into
schema_filter and instead conditionally append an "AND tc.table_schema =
:schema" clause only when schema is provided, and supply {"table_name":
table_name, "schema": schema} (or omit schema from params when None) to the
execute call so the query in ingestion.py uses sa.text with bound parameters
rather than string formatting.
- Around line 193-198: The query currently builds SQL by interpolating
table_name and schema into information_schema (variables table_name, schema,
schema_filter, sql), which risks SQL injection; instead stop composing raw SQL
and use SQLAlchemy's safe APIs—either call Inspector.get_columns(table_name,
schema=schema) to fetch column metadata, or if you must build SQL use the
SQLAlchemy identifier-quoting helpers (e.g.,
sqlalchemy.sql.elements.Identifier/quoted_name or the dialect-specific
quote_ident functions) to safely quote table/schema identifiers and never use
bound parameters for identifiers; update the code that constructs sql to use one
of these safe approaches and remove direct f-string interpolation of
table_name/schema.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cc85243c-15c8-4cf1-bc5a-c255ceec6b78

📥 Commits

Reviewing files that changed from the base of the PR and between 147f9b5 and ac72cd9.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • CLAUDE.md
  • pyproject.toml
  • slayer/core/models.py
  • slayer/engine/ingestion.py
  • tests/test_integration_duckdb.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
slayer/engine/ingestion.py (1)

235-252: ⚠️ Potential issue | 🟠 Major

Tighten and re-parameterize the PK fallback query.

This regresses to direct interpolation of table_name/schema, and the join only keys on constraint_name + schema. On backends where PK names repeat across tables, fallback can pull unrelated columns into constrained_columns.

🔒 Safer and more accurate query
 def _get_pk_constraint_fallback(
     sa_engine: sa.Engine,
     table_name: str,
     schema: Optional[str],
 ) -> Dict:
     """Get PK constraint via INFORMATION_SCHEMA when Inspector.get_pk_constraint() fails."""
-    schema_filter = f"AND tc.table_schema = '{schema}'" if schema else ""
-    sql = (
-        f"SELECT kcu.column_name "
-        f"FROM information_schema.table_constraints tc "
-        f"JOIN information_schema.key_column_usage kcu "
-        f"  ON tc.constraint_name = kcu.constraint_name "
-        f"  AND tc.table_schema = kcu.table_schema "
-        f"WHERE tc.table_name = '{table_name}' "
-        f"  AND tc.constraint_type = 'PRIMARY KEY' {schema_filter}"
-    )
+    if schema:
+        sql = (
+            "SELECT kcu.column_name "
+            "FROM information_schema.table_constraints tc "
+            "JOIN information_schema.key_column_usage kcu "
+            "  ON tc.constraint_name = kcu.constraint_name "
+            "  AND tc.table_schema = kcu.table_schema "
+            "  AND tc.table_name = kcu.table_name "
+            "WHERE tc.table_name = :table_name "
+            "  AND tc.constraint_type = 'PRIMARY KEY' "
+            "  AND tc.table_schema = :schema "
+            "ORDER BY kcu.ordinal_position"
+        )
+        params = {"table_name": table_name, "schema": schema}
+    else:
+        sql = (
+            "SELECT kcu.column_name "
+            "FROM information_schema.table_constraints tc "
+            "JOIN information_schema.key_column_usage kcu "
+            "  ON tc.constraint_name = kcu.constraint_name "
+            "  AND tc.table_schema = kcu.table_schema "
+            "  AND tc.table_name = kcu.table_name "
+            "WHERE tc.table_name = :table_name "
+            "  AND tc.constraint_type = 'PRIMARY KEY' "
+            "ORDER BY kcu.ordinal_position"
+        )
+        params = {"table_name": table_name}
     with sa_engine.connect() as conn:
-        rows = conn.execute(sa.text(sql)).fetchall()
+        rows = conn.execute(sa.text(sql), params).fetchall()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@slayer/engine/ingestion.py` around lines 235 - 252, The fallback in
_get_pk_constraint_fallback builds SQL by interpolating table_name and schema
and joins only on constraint_name+schema, which can return PK columns from other
tables; change the SQL to use bound parameters for table_name and schema (use
sa.text() with :table_name and :schema) and tighten the JOIN/WHERE to include
kcu.table_name (and kcu.table_schema) so the query filters by the exact table;
then execute via conn.execute(sql, {"table_name": table_name, "schema": schema})
and construct the returned dict with the correct constrained_columns from the
fetched rows.
🧹 Nitpick comments (1)
slayer/engine/ingestion.py (1)

266-266: Use keyword arguments in the new helper calls.

The added _get_*_fallback() and _safe_get_*() invocations all pass 3-4 positional arguments, which makes the inspector/engine/table/schema ordering easy to swap.

♻️ Apply the same pattern to the other new call sites
-        return _get_columns_fallback(sa_engine, table_name, schema)
+        return _get_columns_fallback(
+            sa_engine=sa_engine,
+            table_name=table_name,
+            schema=schema,
+        )
@@
-    source_cols = _safe_get_columns(inspector, sa_engine, source_table, schema)
+    source_cols = _safe_get_columns(
+        inspector=inspector,
+        sa_engine=sa_engine,
+        table_name=source_table,
+        schema=schema,
+    )

As per coding guidelines, "Use keyword arguments for functions with more than 1 parameter".

Also applies to: 281-283, 302-302, 338-338, 412-413, 425-426

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@slayer/engine/ingestion.py` at line 266, The call to _get_columns_fallback
(and the other new _get_*_fallback/_safe_get_* calls) currently passes
positional args (sa_engine, table_name, schema) which risks ordering bugs;
update each call site (including _get_columns_fallback and the calls around
lines 281-283, 302, 338, 412-413, 425-426) to use explicit keyword arguments
matching the helper signatures (e.g., inspector=..., engine=..., table=...,
schema=...) so the inspector/engine/table/schema mapping is unambiguous.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@slayer/engine/ingestion.py`:
- Around line 187-219: There is a duplicated _get_columns_fallback function
header and a malformed block that causes a syntax error; keep a single
definition of _get_columns_fallback and consolidate the body so only one with
sa_engine.connect() block executes the query using sa.text(sql) and the params
dict (i.e., remove the stray closing parenthesis and the second with
sa_engine.connect() that calls execute without params), ensuring variables sql,
params, conn, and rows are used exactly once in the single function body.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e7a129a6-b120-485d-be99-4b0b9d33b111

📥 Commits

Reviewing files that changed from the base of the PR and between ac72cd9 and 037a6ec.

📒 Files selected for processing (1)
  • slayer/engine/ingestion.py

Contributor

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
slayer/engine/ingestion.py (1)

187-214: ⚠️ Potential issue | 🔴 Critical

Critical: Undefined params variable and SQL injection vulnerability remain unfixed.

This function has two severe issues:

  1. Undefined params (line 201): The params variable is referenced but never defined, causing a NameError at runtime. This is the pipeline failure flagged by ruff (F821).

  2. SQL injection vulnerability (lines 193-198): The SQL is still constructed using f-string interpolation with table_name and schema values directly embedded. This was flagged in the previous review but was not properly addressed—only _get_pk_constraint_fallback was fixed.

🔒 Proposed fix using parameterized queries (matching _get_pk_constraint_fallback pattern)
 def _get_columns_fallback(
     sa_engine: sa.Engine,
     table_name: str,
     schema: Optional[str],
 ) -> List[Dict]:
     """Get columns via INFORMATION_SCHEMA when Inspector.get_columns() fails."""
-    schema_filter = f"AND table_schema = '{schema}'" if schema else ""
-    sql = (
-        f"SELECT column_name, data_type "
-        f"FROM information_schema.columns "
-        f"WHERE table_name = '{table_name}' {schema_filter} "
-        f"ORDER BY ordinal_position"
-    )
+    if schema:
+        sql = (
+            "SELECT column_name, data_type "
+            "FROM information_schema.columns "
+            "WHERE table_name = :table_name AND table_schema = :schema "
+            "ORDER BY ordinal_position"
+        )
+        params = {"table_name": table_name, "schema": schema}
+    else:
+        sql = (
+            "SELECT column_name, data_type "
+            "FROM information_schema.columns "
+            "WHERE table_name = :table_name "
+            "ORDER BY ordinal_position"
+        )
+        params = {"table_name": table_name}
     with sa_engine.connect() as conn:
         rows = conn.execute(sa.text(sql), params).fetchall()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@slayer/engine/ingestion.py` around lines 187 - 214, The _get_columns_fallback
function currently references an undefined params and builds SQL via f-strings,
leaving an SQL injection risk; change the SQL to use parameter placeholders
(e.g. :table_name and optional :schema) instead of directly interpolating
table_name/schema, build a params dict (e.g. params = {"table_name": table_name}
and add "schema" only when provided), conditionally include "AND table_schema =
:schema" in the query string when schema is set, and call
conn.execute(sa.text(your_sql), params) so params is defined and the query is
parameterized; keep the rest of the logic in _get_columns_fallback (base_type
mapping, result construction) unchanged.
🧹 Nitpick comments (1)
tests/test_ingestion.py (1)

16-22: Consider extracting the duplicated _setup_engine helper.

The _setup_engine method is identical in both test classes. You could extract it to module level or a shared base class.

♻️ Optional refactor to reduce duplication
+def _setup_mock_engine(rows):
+    """Create a mock SQLAlchemy engine with stubbed connection/execute."""
+    engine = MagicMock(spec=sa.Engine)
+    conn = MagicMock()
+    engine.connect.return_value.__enter__ = MagicMock(return_value=conn)
+    engine.connect.return_value.__exit__ = MagicMock(return_value=False)
+    conn.execute.return_value.fetchall.return_value = rows
+    return engine, conn
+
+
 class TestGetColumnsFallback:
     """Tests for _get_columns_fallback parameterized queries."""

-    def _setup_engine(self, rows):
-        engine = MagicMock(spec=sa.Engine)
-        conn = MagicMock()
-        engine.connect.return_value.__enter__ = MagicMock(return_value=conn)
-        engine.connect.return_value.__exit__ = MagicMock(return_value=False)
-        conn.execute.return_value.fetchall.return_value = rows
-        return engine, conn
-
     def test_without_schema(self):
-        engine, conn = self._setup_engine([("id", "INTEGER"), ("name", "VARCHAR")])
+        engine, conn = _setup_mock_engine([("id", "INTEGER"), ("name", "VARCHAR")])

Also applies to: 74-80

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_ingestion.py` around lines 16 - 22, Extract the duplicated
_setup_engine helper out of the two test classes and place a single shared
implementation (either as a module-level function named _setup_engine or as a
method on a new TestCase base class) so both test classes call that one helper;
update both tests to use the shared _setup_engine and remove the duplicate
copies, keeping the implementation identical (MagicMock(spec=sa.Engine), conn
mocks, engine.connect.return_value.__enter__/__exit__, and
conn.execute.return_value.fetchall.return_value = rows) and run tests to ensure
mocks still behave as before.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bba7031e-53e2-41d6-b7f8-65e81f78e5be

📥 Commits

Reviewing files that changed from the base of the PR and between 037a6ec and 4e8d38a.

📒 Files selected for processing (2)
  • slayer/engine/ingestion.py
  • tests/test_ingestion.py

Collaborator

@AivanF AivanF left a comment


Looks good, but some files in docs/ need an update

@AivanF AivanF self-requested a review April 6, 2026 08:07
@ZmeiGorynych ZmeiGorynych merged commit 96eecb8 into main Apr 6, 2026
3 checks passed
@coderabbitai coderabbitai bot mentioned this pull request Apr 7, 2026
ZmeiGorynych added a commit that referenced this pull request Apr 16, 2026
Three major correctness fixes plus one paired test fix from the CodeRabbit
review on PR #31, all in slayer/sql/generator.py around the
filtered first/last and weighted_avg machinery:

* Parameterized formula filters now wrap every row-level reference
  in CASE WHEN, not just {value}. Previously `weighted_avg` on a
  filtered measure produced SUM(CASE WHEN filter THEN value END * weight)
  / SUM(weight) — numerator filtered, denominator summing all weights.
  Now both numerator and denominator share the same filter mask, so
  the weighted_avg correctly normalizes over the filtered subset.
* filtered_rn_map is keyed by measure.alias (unique per enriched
  measure) instead of "{source_measure_name|name}:{aggregation}". Two
  filtered metrics with the same source/agg but different filters or
  time columns no longer clobber each other in the rank-column map.
* _build_last_ranked_from() now applies enriched.resolved_joins inside
  the ranked subquery. Previously cross-model filters on filtered
  first/last (e.g. customers.status = 'active') would reference a
  table not present in the subquery's FROM. The outer string-level
  join injection in _generate_base is now suppressed when the from
  clause is a ranked subquery to avoid duplicate joins.
* Drive-by: the unfiltered _last_rn / _first_rn columns are no longer
  emitted when only filtered first/last measures are present (caught
  by tightening the previously-vacuous test assertion in
  test_filtered_last_generates_dedicated_rn).

Also tightens tests/test_sql_generator.py:1352 (CodeRabbit #11) which
was always-True, and adds three regression tests:
test_filtered_weighted_avg_filters_both_terms,
test_two_filtered_lasts_same_source_different_filters_dont_collide,
test_filtered_last_with_cross_model_filter_carries_join.

Includes docs/dbt/slayer_vs_dbt.md (carried across the previous
poetry-lock churn).

Full pytest: 657 passed. ruff clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
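The weighted_avg fix in the first bullet above can be checked numerically. A toy example (numbers invented for illustration) of why the denominator must share the filter mask:

```python
# Toy rows: (value, weight, passes_filter)
rows = [(10.0, 1.0, True), (100.0, 3.0, False), (20.0, 1.0, True)]

# Buggy form: only the numerator is masked, so the denominator still
# sums weights of rows the filter excluded.
buggy = sum(v * w for v, w, keep in rows if keep) / sum(w for _, w, _ in rows)

# Fixed form: both terms wrapped in the same CASE WHEN mask.
fixed = (sum(v * w for v, w, keep in rows if keep)
         / sum(w for _, w, keep in rows if keep))

print(buggy)  # 6.0, diluted by the excluded row's weight
print(fixed)  # 15.0, the weighted average over the filtered subset only
```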
