From f45ed4df4530f99ce42ccd14081cccbc3ffad5ed Mon Sep 17 00:00:00 2001
From: Nikolaus Schuetz
Date: Sun, 28 Jun 2026 18:56:36 -0700
Subject: [PATCH] fix: Use LONGBLOB for SQL registry proto columns on MySQL

The SQL registry stores each Feast object as a serialized protobuf in a
binary column. On MySQL/MariaDB, SQLAlchemy's LargeBinary maps to BLOB,
which caps at 64 KB. A single FeatureView proto routinely exceeds that,
so MySQL silently truncates the write and the registry later fails to
load with a protobuf DecodeError (e.g. `feast serve` failing to start).
PostgreSQL and SQLite were never affected.

Introduce a dialect-aware `ProtoBytes` type that emits LONGBLOB on MySQL
and MariaDB while keeping LargeBinary's default mapping on every other
dialect, and apply it to all binary proto/metadata columns. The variants
are chained (not variadic) so the expression also works on SQLAlchemy
1.4.x, which Feast still supports.

`metadata.create_all` only creates missing tables, so existing MySQL
registries are not migrated automatically. Add a best-effort startup
warning that names any columns still typed BLOB and points operators at
the documented ALTER TABLE migration, and document the migration (with
metadata-lock / online-schema-change guidance) in the SQL registry
reference.

Tests assert the compiled DDL emits LONGBLOB on MySQL and MariaDB and
BLOB on SQLite, and cover the startup-warning paths.

Signed-off-by: Nikolaus Schuetz --- .claude/rules/feast-components.md | 1 + .cursor/rules/feast-components.mdc | 1 + docs/reference/registries/sql.md | 79 ++++++- sdk/python/feast/infra/registry/sql.py | 155 ++++++++++++-- .../registration/test_universal_registry.py | 107 ++++++++++ .../unit/infra/registry/test_sql_registry.py | 198 +++++++++++++++++- skills/feast-architecture/SKILL.md | 5 + skills/feast-testing/SKILL.md | 22 ++ 8 files changed, 548 insertions(+), 20 deletions(-) diff --git a/.claude/rules/feast-components.md b/.claude/rules/feast-components.md index 5c03cb0bd3d..02b1c6f4dd4 100644 --- a/.claude/rules/feast-components.md +++ b/.claude/rules/feast-components.md @@ -24,6 +24,7 @@ For testing patterns and debugging, also read `skills/feast-testing/SKILL.md`. - **Unit tests**: add or update tests in `sdk/python/tests/unit/infra//` - **Integration tests**: run `make test-python-integration-local`; add a universal test case in `sdk/python/tests/integration/` if the change affects retrieval or materialization behavior +- **SQL registry binary columns**: in `infra/registry/sql.py`, a new column that stores a serialized proto or blob metadata must use `ProtoBytes`, not `LargeBinary` directly — `LargeBinary` maps to MySQL `BLOB` (64 KB cap) and silently truncates large protos - **Protos**: if you add a field to a proto message, recompile with `make protos` and update serialization helpers in `proto_registry_utils.py` - **Both SDKs**: if the change affects online serving, check whether the Go server (`go/`) also needs updating - **Skills/Rules**: if the change introduces new patterns, interfaces, or conventions that agents should follow, update the relevant section in `skills/feast-architecture/SKILL.md` (and `skills/feast-testing/SKILL.md` if testing patterns changed) diff --git a/.cursor/rules/feast-components.mdc b/.cursor/rules/feast-components.mdc index a474f00fc47..f015619020d 100644 --- a/.cursor/rules/feast-components.mdc +++ 
b/.cursor/rules/feast-components.mdc @@ -20,6 +20,7 @@ For testing patterns and debugging, also read `skills/feast-testing/SKILL.md`. - **Unit tests**: add or update tests in `sdk/python/tests/unit/infra//` - **Integration tests**: run `make test-python-integration-local`; add a universal test case in `sdk/python/tests/integration/` if the change affects retrieval or materialization behavior +- **SQL registry binary columns**: in `infra/registry/sql.py`, a new column that stores a serialized proto or blob metadata must use `ProtoBytes`, not `LargeBinary` directly — `LargeBinary` maps to MySQL `BLOB` (64 KB cap) and silently truncates large protos - **Protos**: if you add a field to a proto message, recompile with `make protos` and update serialization helpers in `proto_registry_utils.py` - **Both SDKs**: if the change affects online serving, check whether the Go server (`go/`) also needs updating - **Skills/Rules**: if the change introduces new patterns, interfaces, or conventions that agents should follow, update the relevant section in `skills/feast-architecture/SKILL.md` (and `skills/feast-testing/SKILL.md` if testing patterns changed) diff --git a/docs/reference/registries/sql.md b/docs/reference/registries/sql.md index ef9993c8753..8b4ff19ab11 100644 --- a/docs/reference/registries/sql.md +++ b/docs/reference/registries/sql.md @@ -83,7 +83,84 @@ If you are running Feast in Kubernetes, set the `image.repository` and There are some things to note about how the SQL registry works: - Once instantiated, the Registry ensures the tables needed to store data exist, and creates them if they do not. - Upon tearing down the feast project, the registry ensures that the tables are dropped from the database. -- The schema for how data is laid out in tables can be found . It is intentionally simple, storing the serialized protobuf versions of each Feast object keyed by its name. 
+- The schema for how data is laid out in tables can be found in the table definitions in [`sdk/python/feast/infra/registry/sql.py`](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/registry/sql.py). It is intentionally simple, storing the serialized protobuf versions of each Feast object keyed by its name. + +## MySQL: serialized-proto columns use `LONGBLOB` + +The registry stores each Feast object as a serialized protobuf in a binary +column. On MySQL these columns are created as `LONGBLOB` (up to 4 GB). Earlier +versions created them as `BLOB`, which caps at 64 KB — a single `FeatureView` +proto routinely exceeds that, so MySQL would silently truncate the write and the +registry would later fail to load with a protobuf `DecodeError` (for example, +`feast serve` failing to start). Other dialects (PostgreSQL, SQLite) were never +affected. + +New deployments get the correct schema automatically — the registry creates its +tables as `LONGBLOB` on first use. When an existing MySQL/MariaDB registry still +has `BLOB` columns, the registry logs an error at startup listing the affected +columns (it does not refuse to start — a registry whose protos all fit in 64 KB +is unaffected). **Existing deployments are not migrated automatically**: the +registry only creates tables that do not already exist, and it has no +schema-migration step, so previously created `BLOB` columns remain `BLOB`. To +upgrade an existing MySQL registry, alter each serialized-proto column to +`LONGBLOB`, for example: + +> ⚠️ **Run the migration carefully on a live registry.** A `BLOB`→`LONGBLOB` +> change is a column *data-type* change, which MySQL InnoDB performs with +> `ALGORITHM=COPY` — a full table rebuild under a metadata lock that blocks +> readers and writers for the duration (potentially minutes on a large table +> such as `feature_view_version_history`). 
`ALGORITHM=INPLACE` is **not** +> generally supported for this change and is rejected with +> `ER_ALTER_OPERATION_NOT_SUPPORTED_REASON` on most builds — do not rely on it. +> +> **Before running any `ALTER TABLE`:** +> +> 1. **Stop all `feast apply` and materialization jobs.** This is required, not +> optional — a write of a `>64 KB` proto to a not-yet-widened `BLOB` column +> truncates silently with no error, and concurrent writes also extend the +> `ALTER`'s lock duration. +> 2. Confirm there are no active writers (e.g. `SHOW PROCESSLIST`). +> 3. Verify you have a backup of the registry database. +> +> Then, to minimize the lock window: +> +> - On large tables, or on managed MySQL (AWS RDS, Aurora) without shell access, +> use an online schema-change tool — +> [`pt-online-schema-change`](https://docs.percona.com/percona-toolkit/pt-online-schema-change.html) +> (Percona Toolkit) or [`gh-ost`](https://github.com/github/gh-ost) — which +> rebuild the table without a long-held lock. For small tables a plain +> `ALTER TABLE` in the maintenance window is fine. +> - Apply one table at a time so a failure is easy to isolate and re-run. +> - Resume jobs only after all `ALTER TABLE` statements complete successfully. +> - Rollback is safe (revert `MODIFY ... BLOB`) **only** while no stored proto +> exceeds 64 KB; otherwise a revert re-introduces truncation. 
+
+```sql
+ALTER TABLE projects MODIFY project_proto LONGBLOB NOT NULL;
+ALTER TABLE entities MODIFY entity_proto LONGBLOB NOT NULL;
+ALTER TABLE data_sources MODIFY data_source_proto LONGBLOB NOT NULL;
+ALTER TABLE feature_views MODIFY materialized_intervals LONGBLOB,
+    MODIFY feature_view_proto LONGBLOB NOT NULL,
+    MODIFY user_metadata LONGBLOB;
+ALTER TABLE stream_feature_views MODIFY feature_view_proto LONGBLOB NOT NULL,
+    MODIFY user_metadata LONGBLOB;
+ALTER TABLE on_demand_feature_views MODIFY feature_view_proto LONGBLOB NOT NULL,
+    MODIFY user_metadata LONGBLOB;
+ALTER TABLE label_views MODIFY feature_view_proto LONGBLOB NOT NULL,
+    MODIFY user_metadata LONGBLOB;
+ALTER TABLE feature_services MODIFY feature_service_proto LONGBLOB NOT NULL;
+ALTER TABLE saved_datasets MODIFY saved_dataset_proto LONGBLOB NOT NULL;
+ALTER TABLE validation_references MODIFY validation_reference_proto LONGBLOB NOT NULL;
+ALTER TABLE managed_infra MODIFY infra_proto LONGBLOB NOT NULL;
+ALTER TABLE permissions MODIFY permission_proto LONGBLOB NOT NULL;
+-- LARGE TABLE: one row per versioned apply — likely the slowest ALTER. Use
+-- pt-online-schema-change or gh-ost if this registry has significant history.
+ALTER TABLE feature_view_version_history MODIFY feature_view_proto LONGBLOB NOT NULL;
+```
+
+Any object whose proto already exceeded 64 KB before the upgrade may have been
+stored truncated; re-run `feast apply` for those objects after altering the
+columns so the full proto is rewritten.
 
 ## Example Usage: Concurrent materialization
 The SQL Registry should be used when materializing feature views concurrently to ensure correctness of data in the registry. This can be achieved by simply running feast materialize or feature_store.materialize multiple times using a correctly configured feature_store.yaml. This will make each materialization process talk to the registry database concurrently, and ensure the metadata updates are serialized.
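
Reviewer note (not part of the patch): the documented `ALTER TABLE` statements above follow a regular pattern, so they could be rendered from a table/column mapping instead of being maintained by hand. The sketch below is a minimal illustration under stated assumptions: `build_longblob_migration` and `PROTO_COLUMNS` are hypothetical names that do not exist in Feast, and the mapping covers only three of the documented tables. It produces SQL text only and does not connect to a database.

```python
# Hypothetical helper, NOT part of Feast: renders the documented
# BLOB -> LONGBLOB migration statements from a table/column mapping.
from typing import Dict, List, Tuple

# (column name, nullable) pairs copied from the documented ALTER TABLE
# statements. Only three tables are listed here to keep the sketch short.
PROTO_COLUMNS: Dict[str, List[Tuple[str, bool]]] = {
    "projects": [("project_proto", False)],
    "entities": [("entity_proto", False)],
    "feature_views": [
        ("materialized_intervals", True),
        ("feature_view_proto", False),
        ("user_metadata", True),
    ],
}


def build_longblob_migration(
    columns: Dict[str, List[Tuple[str, bool]]],
) -> List[str]:
    """Return one ALTER TABLE statement per table, in mapping order."""
    statements = []
    for table, cols in columns.items():
        # Nullable columns get a bare LONGBLOB; the rest keep NOT NULL.
        clauses = [
            f"MODIFY {name} LONGBLOB" + ("" if nullable else " NOT NULL")
            for name, nullable in cols
        ]
        statements.append(f"ALTER TABLE {table} " + ", ".join(clauses) + ";")
    return statements


if __name__ == "__main__":
    # Print the statements so an operator can review them before running.
    for statement in build_longblob_migration(PROTO_COLUMNS):
        print(statement)
```

Emitting one statement per table matches the documented advice to apply one table at a time, so a failed `ALTER` is easy to isolate and re-run.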
diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 531ab496776..f3fa840b665 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -17,13 +17,16 @@ String, Table, Text, + bindparam, create_engine, delete, func, insert, select, + text, update, ) +from sqlalchemy.dialects import mysql from sqlalchemy.engine import Engine from sqlalchemy.exc import IntegrityError @@ -87,6 +90,29 @@ metadata = MetaData() +# Serialized protos (and their accompanying metadata blobs) can grow well past +# 64 KB — a single FeatureView proto routinely does. On MySQL, SQLAlchemy's +# LargeBinary maps to BLOB, which silently truncates anything over 64 KB and +# later surfaces as a protobuf DecodeError when the registry is read back +# (e.g. `feast serve` failing to load). Use LONGBLOB (up to 4 GB) on MySQL while +# keeping LargeBinary's default mapping on every other dialect. +# +# "mysql" and "mariadb" are registered separately because SQLAlchemy 2.x reports +# dialect.name == "mariadb" for MariaDB connections, which would otherwise miss +# the variant and fall back to BLOB. The variants are chained (rather than passed +# as variadic dialect names to a single with_variant call) so the expression also +# works on SQLAlchemy 1.4.x, which Feast still supports and which only accepts a +# single dialect name per with_variant call. +# +# NOTE for contributors: any new binary column that stores a serialized proto or +# blob metadata must use ProtoBytes, not LargeBinary directly, or the 64 KB +# MySQL/MariaDB truncation bug reappears silently. 
+ProtoBytes = ( + LargeBinary() + .with_variant(mysql.LONGBLOB(), "mysql") + .with_variant(mysql.LONGBLOB(), "mariadb") +) + projects = Table( "projects", @@ -94,7 +120,7 @@ Column("project_id", String(255), primary_key=True), Column("project_name", String(255), nullable=False), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("project_proto", LargeBinary, nullable=False), + Column("project_proto", ProtoBytes, nullable=False), ) Index("idx_projects_project_id", projects.c.project_id) @@ -105,7 +131,7 @@ Column("entity_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("entity_proto", LargeBinary, nullable=False), + Column("entity_proto", ProtoBytes, nullable=False), ) Index("idx_entities_project_id", entities.c.project_id) @@ -116,7 +142,7 @@ Column("data_source_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("data_source_proto", LargeBinary, nullable=False), + Column("data_source_proto", ProtoBytes, nullable=False), ) Index("idx_data_sources_project_id", data_sources.c.project_id) @@ -127,9 +153,9 @@ Column("feature_view_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("materialized_intervals", LargeBinary, nullable=True), - Column("feature_view_proto", LargeBinary, nullable=False), - Column("user_metadata", LargeBinary, nullable=True), + Column("materialized_intervals", ProtoBytes, nullable=True), + Column("feature_view_proto", ProtoBytes, nullable=False), + Column("user_metadata", ProtoBytes, nullable=True), ) Index("idx_feature_views_project_id", feature_views.c.project_id) @@ -140,8 +166,8 @@ Column("feature_view_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), 
Column("last_updated_timestamp", BigInteger, nullable=False), - Column("feature_view_proto", LargeBinary, nullable=False), - Column("user_metadata", LargeBinary, nullable=True), + Column("feature_view_proto", ProtoBytes, nullable=False), + Column("user_metadata", ProtoBytes, nullable=True), ) Index("idx_stream_feature_views_project_id", stream_feature_views.c.project_id) @@ -152,8 +178,8 @@ Column("feature_view_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("feature_view_proto", LargeBinary, nullable=False), - Column("user_metadata", LargeBinary, nullable=True), + Column("feature_view_proto", ProtoBytes, nullable=False), + Column("user_metadata", ProtoBytes, nullable=True), ) Index("idx_on_demand_feature_views_project_id", on_demand_feature_views.c.project_id) @@ -164,8 +190,8 @@ Column("feature_view_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("feature_view_proto", LargeBinary, nullable=False), - Column("user_metadata", LargeBinary, nullable=True), + Column("feature_view_proto", ProtoBytes, nullable=False), + Column("user_metadata", ProtoBytes, nullable=True), ) Index("idx_label_views_project_id", label_views.c.project_id) @@ -176,7 +202,7 @@ Column("feature_service_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("feature_service_proto", LargeBinary, nullable=False), + Column("feature_service_proto", ProtoBytes, nullable=False), ) Index("idx_feature_services_project_id", feature_services.c.project_id) @@ -187,7 +213,7 @@ Column("saved_dataset_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - 
Column("saved_dataset_proto", LargeBinary, nullable=False), + Column("saved_dataset_proto", ProtoBytes, nullable=False), ) Index("idx_saved_datasets_project_id", saved_datasets.c.project_id) @@ -198,7 +224,7 @@ Column("validation_reference_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("validation_reference_proto", LargeBinary, nullable=False), + Column("validation_reference_proto", ProtoBytes, nullable=False), ) Index("idx_validation_references_project_id", validation_references.c.project_id) @@ -208,7 +234,7 @@ Column("infra_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("infra_proto", LargeBinary, nullable=False), + Column("infra_proto", ProtoBytes, nullable=False), ) Index("idx_managed_infra_project_id", managed_infra.c.project_id) @@ -219,7 +245,7 @@ Column("permission_name", String(255), primary_key=True), Column("project_id", String(255), primary_key=True), Column("last_updated_timestamp", BigInteger, nullable=False), - Column("permission_proto", LargeBinary, nullable=False), + Column("permission_proto", ProtoBytes, nullable=False), ) Index("idx_permissions_project_id", permissions.c.project_id) @@ -231,7 +257,7 @@ Column("project_id", String(255), primary_key=True), Column("version_number", Integer, primary_key=True), Column("feature_view_type", String(50), nullable=False), - Column("feature_view_proto", LargeBinary, nullable=False), + Column("feature_view_proto", ProtoBytes, nullable=False), Column("created_timestamp", BigInteger, nullable=False), Column("description", Text, nullable=True), Column("version_id", String(36), nullable=False), @@ -305,6 +331,11 @@ def __init__( else: self.read_engine = self.write_engine metadata.create_all(self.write_engine) + self._warn_if_narrow_blob_columns(self.write_engine) + if 
self.read_engine is not self.write_engine: + # A read replica can be on a different schema version (e.g. mid + # blue-green switchover), so check it independently. + self._warn_if_narrow_blob_columns(self.read_engine) self.thread_pool_executor_worker_count = ( registry_config.thread_pool_executor_worker_count ) @@ -324,6 +355,94 @@ def __init__( if not self.purge_feast_metadata: self._maybe_init_project_metadata(project) + @staticmethod + def _warn_if_narrow_blob_columns( + engine: Engine, registry_metadata: MetaData = metadata + ) -> None: + """Log an error when a MySQL/MariaDB registry still has narrow BLOB columns. + + ``metadata.create_all`` only creates missing tables; it never widens + columns on tables that already exist. A registry created before the + LONGBLOB fix keeps its 64 KB ``BLOB`` proto columns, which silently + truncate large protos and later fail to deserialize. There is no + automatic migration, so surface the stale schema and point operators at + the documented ``ALTER TABLE`` migration. Logged at ERROR (not WARNING) + so monitoring pipelines that filter below ERROR still catch it. + + This only *reports* the problem; it deliberately does not refuse to + start, since a registry whose protos all fit in 64 KB is unaffected and + an upgrade should not break it. Operators run the documented migration. + + ``registry_metadata`` defaults to this module's ``metadata`` (the source + of truth for registry tables) and is a parameter only to keep the + dependency explicit and unit-testable. + + Runs once per ``SqlRegistry`` construction on MySQL/MariaDB only, via a + single ``information_schema`` query scoped to the registry's own + serialized-proto (``ProtoBytes``) columns. + It is a no-op when the engine URL specifies no default database + (``DATABASE()`` returns NULL, so the scoped query matches nothing). + Best-effort: any failure here must never block registry startup. 
+ """ + if engine.dialect.name not in ("mysql", "mariadb"): + return + try: + registry_tables = { + table.name for table in registry_metadata.tables.values() + } + # Only serialized-proto columns (those typed ProtoBytes) are at risk. + # Scope the query to exactly those table+column names so an unrelated + # BLOB column in a shared schema — or a future non-proto BLOB column + # on a registry table — can't trigger a false-positive warning. + # Identity check works because every proto column reuses the single + # shared ProtoBytes instance (SQLAlchemy stores the type as-is). A + # column typed as plain LargeBinary would not match here; the + # name-suffix vs ProtoBytes drift check in test_sql_registry.py + # guards against that regression. + proto_columns = { + column.name + for table in registry_metadata.tables.values() + for column in table.columns + if column.type is ProtoBytes + } + if not proto_columns: + return + query = text( + "SELECT TABLE_NAME, COLUMN_NAME " + "FROM information_schema.COLUMNS " + "WHERE TABLE_SCHEMA = DATABASE() AND DATA_TYPE = 'blob' " + "AND TABLE_NAME IN :table_names " + "AND COLUMN_NAME IN :column_names" + ).bindparams( + bindparam("table_names", expanding=True), + bindparam("column_names", expanding=True), + ) + with engine.connect() as conn: + rows = conn.execute( + query, + { + "table_names": list(registry_tables), + "column_names": list(proto_columns), + }, + ).fetchall() + stale = [f"{table_name}.{column_name}" for table_name, column_name in rows] + if stale: + # NOTE: keep this doc path in sync with the actual file location. + logger.error( + "SQL registry has %d column(s) still typed BLOB (64 KB cap) " + "on this %s database: %s. Large protos (e.g. a FeatureView) " + "will be silently truncated and fail to deserialize. " + "create_all() does not migrate existing columns; run the " + "ALTER TABLE ... MODIFY ... 
LONGBLOB migration documented at " + "docs/reference/registries/sql.md to fix this.", + len(stale), + engine.dialect.name, + ", ".join(sorted(stale)), + ) + except Exception as e: + # Diagnostics must never break registry startup. + logger.debug("Could not check registry BLOB column widths: %s", e) + def _sync_feast_metadata_to_projects_table(self): feast_metadata_projects: dict = {} projects_set: set = [] diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 735d714a1f3..4dcb5a8ba45 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -659,6 +659,113 @@ def test_apply_feature_view_success(test_registry: BaseRegistry): test_registry.teardown() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + [ + pytest.param( + lazy_fixture("mysql_registry"), + marks=pytest.mark.xdist_group(name="mysql_registry"), + ), + pytest.param( + lazy_fixture("mysql_registry_async"), + marks=pytest.mark.xdist_group(name="mysql_registry"), + ), + ], +) +def test_apply_feature_view_large_proto_roundtrip_mysql(test_registry: BaseRegistry): + """Regression for the MySQL BLOB 64 KB truncation bug. + + The SQL registry stores each object as a serialized proto in a binary + column. With those columns typed BLOB (64 KB cap) on MySQL/MariaDB, a + FeatureView proto larger than 64 KB is silently truncated on write and + later fails to deserialize. The LONGBLOB fix must let a >64 KB proto + round-trip through a live MySQL connection intact. + + Reads use allow_cache=False (the default) so the assertion exercises the + DB column, not the in-memory cache populated at apply time. 
+ """ + batch_source = FileSource( + file_format=ParquetFormat(), + path="file://feast/*", + timestamp_field="ts_col", + created_timestamp_column="timestamp", + ) + entity = Entity(name="large_proto_entity", join_keys=["test"]) + # A ~200 KB tag value pushes the serialized FeatureView proto well past the + # 64 KB BLOB cap, so a truncating column would corrupt it. + large_value = "x" * (200 * 1024) + fv = FeatureView( + name="large_proto_feature_view", + schema=[Field(name="test", dtype=Int64)], + entities=[entity], + tags={"big": large_value}, + source=batch_source, + ttl=timedelta(minutes=5), + ) + project = "project" + + test_registry.apply_feature_view(fv, project) + + # Pass allow_cache=False explicitly so this keeps reading the DB column even + # if the CachingRegistry default ever changes. + read_back = test_registry.get_feature_view( + "large_proto_feature_view", project, allow_cache=False + ) + # Equality proves the >64 KB proto round-tripped without truncation. + assert read_back.tags["big"] == large_value + + test_registry.delete_feature_view("large_proto_feature_view", project) + test_registry.teardown() + + +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + [ + pytest.param( + lazy_fixture("mysql_registry"), + marks=pytest.mark.xdist_group(name="mysql_registry"), + ), + ], +) +def test_warn_if_narrow_blob_columns_detects_real_blob(test_registry, caplog): + """Exercise the startup BLOB diagnostic against a real MySQL schema. + + The unit tests only mock the engine, so the live information_schema query + (DATA_TYPE = 'blob' filter + table-name scoping) is otherwise unverified. + Narrow a real proto column back to BLOB and assert the diagnostic fires; + then widen it to LONGBLOB and assert it stays silent (confirming the filter + does not also match LONGBLOB, i.e. no false positive on a migrated registry). 
+ """ + from sqlalchemy import text + + engine = test_registry.write_engine + + try: + # Narrow a real proto column back to BLOB NOT NULL to mirror a pre-fix + # (legacy) registry schema. + with engine.begin() as conn: + conn.execute(text("ALTER TABLE entities MODIFY entity_proto BLOB NOT NULL")) + with caplog.at_level(logging.ERROR): + SqlRegistry._warn_if_narrow_blob_columns(engine) + assert "still typed BLOB" in caplog.text + assert "entities.entity_proto" in caplog.text + + # Widen back to LONGBLOB and confirm the DATA_TYPE='blob' filter does not + # also match LONGBLOB (no false positive on a migrated registry). + caplog.clear() + with engine.begin() as conn: + conn.execute( + text("ALTER TABLE entities MODIFY entity_proto LONGBLOB NOT NULL") + ) + with caplog.at_level(logging.ERROR): + SqlRegistry._warn_if_narrow_blob_columns(engine) + assert "still typed BLOB" not in caplog.text + finally: + test_registry.teardown() + + @pytest.mark.integration @pytest.mark.parametrize( "test_registry", diff --git a/sdk/python/tests/unit/infra/registry/test_sql_registry.py b/sdk/python/tests/unit/infra/registry/test_sql_registry.py index 1a3ec92a4a6..044dbfda36a 100644 --- a/sdk/python/tests/unit/infra/registry/test_sql_registry.py +++ b/sdk/python/tests/unit/infra/registry/test_sql_registry.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import logging import sys import tempfile import types from datetime import timedelta +from unittest.mock import MagicMock import dill import pytest @@ -26,7 +28,12 @@ from feast.errors import ConflictingFeatureViewNames from feast.feature_view import FeatureView from feast.infra.offline_stores.file_source import FileSource -from feast.infra.registry.sql import SqlRegistry, SqlRegistryConfig, feature_views +from feast.infra.registry.sql import ( + ProtoBytes, + SqlRegistry, + SqlRegistryConfig, + feature_views, +) from feast.protos.feast.core.Transformation_pb2 import ( FeatureTransformationV2, UserDefinedFunctionV2, @@ -58,6 +65,195 @@ def shared_sqlite_db_path(): yield path +def test_proto_columns_use_longblob_on_mysql(): + """On MySQL and MariaDB, serialized-proto columns must compile to LONGBLOB. + + MySQL maps SQLAlchemy's LargeBinary to BLOB, which caps at 64 KB and + silently truncates larger protos (e.g. a FeatureView proto), later + surfacing as a protobuf DecodeError. The dialect-aware ProtoBytes type + must emit LONGBLOB so large protos round-trip intact. MariaDB is checked + separately because SQLAlchemy 2.x reports dialect.name == "mariadb", which + would miss a "mysql"-only variant. + + NOTE: this asserts only the compiled DDL type. The live >64 KB proto + round-trip through a real MySQL/MariaDB connection is covered by + test_apply_feature_view_large_proto_roundtrip_mysql in + tests/integration/registration/test_universal_registry.py. + """ + from sqlalchemy.dialects import mysql, postgresql, sqlite + from sqlalchemy.schema import CreateTable + + # The MariaDB dialect lives at a SQLAlchemy-version-specific path; import it + # defensively so the rest of this test still runs on SQLAlchemy 1.4.x (a + # version the production code and dependency floor, >=1.4.19, support). The + # always-present mysql dialect keeps the LONGBLOB assertion unconditional. 
+ try: + from sqlalchemy.dialects.mysql.mariadb import MariaDBDialect + + mariadb_dialect = MariaDBDialect() + except ImportError: # pragma: no cover - only on SQLAlchemy builds w/o MariaDB + mariadb_dialect = None + + # ProtoBytes columns share the same MetaData as every registry table, so + # reach it through an already-imported table instead of a second import. + registry_metadata = feature_views.metadata + + # Every column whose name ends in these suffixes stores a (potentially + # large) serialized proto or its companion metadata blob. + binary_suffixes = ("_proto", "_intervals", "user_metadata") + + checked = 0 + for table in registry_metadata.tables.values(): + binary_columns = [ + c.name for c in table.columns if c.name.endswith(binary_suffixes) + ] + # Guard against drift between this name-suffix heuristic and the + # production type predicate (`column.type is ProtoBytes`) used by the + # startup diagnostic: every suffix-matched column must be typed + # ProtoBytes, and no ProtoBytes column may be missed by the suffix list. + # Catches a proto column accidentally typed as plain LargeBinary. + proto_typed_columns = [c.name for c in table.columns if c.type is ProtoBytes] + assert set(binary_columns) == set(proto_typed_columns), ( + f"{table.name}: name-suffix vs ProtoBytes-typed columns disagree — " + f"suffix={sorted(binary_columns)} typed={sorted(proto_typed_columns)}" + ) + if not binary_columns: + continue + + mysql_ddl = str(CreateTable(table).compile(dialect=mysql.dialect())) + sqlite_ddl = str(CreateTable(table).compile(dialect=sqlite.dialect())) + postgres_ddl = str(CreateTable(table).compile(dialect=postgresql.dialect())) + mariadb_ddl = ( + str(CreateTable(table).compile(dialect=mariadb_dialect)) + if mariadb_dialect is not None + else None + ) + + for col_name in binary_columns: + # MySQL and MariaDB must use LONGBLOB for the column (not bare BLOB). 
+ assert f"{col_name} LONGBLOB" in mysql_ddl, ( + f"{table.name}.{col_name} should compile to LONGBLOB on MySQL, " + f"got:\n{mysql_ddl}" + ) + if mariadb_ddl is not None: + assert f"{col_name} LONGBLOB" in mariadb_ddl, ( + f"{table.name}.{col_name} should compile to LONGBLOB on " + f"MariaDB, got:\n{mariadb_ddl}" + ) + # Other dialects keep LargeBinary's default mapping: BLOB on SQLite, + # BYTEA on PostgreSQL. + assert f"{col_name} BLOB" in sqlite_ddl, ( + f"{table.name}.{col_name} should compile to BLOB on SQLite, " + f"got:\n{sqlite_ddl}" + ) + assert f"{col_name} BYTEA" in postgres_ddl, ( + f"{table.name}.{col_name} should compile to BYTEA on PostgreSQL, " + f"got:\n{postgres_ddl}" + ) + checked += 1 + + assert checked > 0, "expected at least one serialized-proto column to check" + + +def _mock_mysql_engine(dialect_name, stale_rows): + """Build a MagicMock Engine that returns ``stale_rows`` from the BLOB query.""" + engine = MagicMock() + engine.dialect.name = dialect_name + conn = MagicMock() + engine.connect.return_value.__enter__.return_value = conn + conn.execute.return_value.fetchall.return_value = stale_rows + return engine + + +@pytest.mark.parametrize("dialect_name", ["mysql", "mariadb"]) +def test_warn_if_narrow_blob_columns_errors_on_stale(caplog, dialect_name): + """On MySQL/MariaDB, stale BLOB registry columns log an error at startup.""" + engine = _mock_mysql_engine( + dialect_name, + [("feature_views", "feature_view_proto"), ("entities", "entity_proto")], + ) + + with caplog.at_level(logging.ERROR): + SqlRegistry._warn_if_narrow_blob_columns(engine) + + assert "still typed BLOB" in caplog.text + assert "feature_views.feature_view_proto" in caplog.text + assert "entities.entity_proto" in caplog.text + # Reported at ERROR so monitoring that filters below ERROR still catches it. 
+    assert any(r.levelno == logging.ERROR for r in caplog.records)
+
+
+@pytest.mark.parametrize("dialect_name", ["mysql", "mariadb"])
+def test_warn_if_narrow_blob_columns_silent_when_migrated(caplog, dialect_name):
+    """A fully-migrated MySQL/MariaDB registry (no BLOB columns) stays silent."""
+    engine = _mock_mysql_engine(dialect_name, [])
+
+    with caplog.at_level(logging.ERROR):
+        SqlRegistry._warn_if_narrow_blob_columns(engine)
+
+    assert "still typed BLOB" not in caplog.text
+
+
+def test_warn_if_narrow_blob_columns_does_not_refuse_to_start(caplog):
+    """Stale columns are reported but must never raise — an upgrade can't break
+    a registry whose protos all fit in 64 KB."""
+    engine = _mock_mysql_engine("mysql", [("feature_views", "feature_view_proto")])
+
+    # Must not raise.
+    SqlRegistry._warn_if_narrow_blob_columns(engine)
+
+
+def test_warn_if_narrow_blob_columns_skips_non_mysql():
+    """Non-MySQL/MariaDB dialects skip the information_schema query entirely."""
+    engine = MagicMock()
+    engine.dialect.name = "sqlite"
+
+    SqlRegistry._warn_if_narrow_blob_columns(engine)
+
+    engine.connect.assert_not_called()
+
+
+def test_init_runs_blob_check_on_read_and_write_engines(tmp_path, monkeypatch):
+    """When read_path differs from path, the BLOB check runs on both engines."""
+    checked_engines = []
+    monkeypatch.setattr(
+        SqlRegistry,
+        "_warn_if_narrow_blob_columns",
+        staticmethod(lambda engine, *args, **kwargs: checked_engines.append(engine)),
+    )
+    # Same sqlite file for both engines so create_all (write) and the cache
+    # refresh (read) both see the registry tables, while remaining two distinct
+    # Engine objects.
+    db_url = f"sqlite:///{tmp_path / 'registry.db'}"
+    config = SqlRegistryConfig(
+        registry_type="sql",
+        path=db_url,
+        read_path=db_url,
+        purge_feast_metadata=False,
+    )
+
+    registry = SqlRegistry(config, "test_project", None)
+    try:
+        assert registry.read_engine is not registry.write_engine
+        assert checked_engines == [registry.write_engine, registry.read_engine]
+    finally:
+        registry.teardown()
+
+
+def test_warn_if_narrow_blob_columns_swallows_query_errors(caplog):
+    """A failing diagnostic query must never propagate out of registry init."""
+    engine = MagicMock()
+    engine.dialect.name = "mysql"
+    engine.connect.return_value.__enter__.side_effect = Exception("connection refused")
+
+    with caplog.at_level(logging.ERROR):
+        # Must not raise even though the query path blows up.
+        SqlRegistry._warn_if_narrow_blob_columns(engine)
+
+    # The failure is demoted to a debug note, never an error/exception.
+    assert "still typed BLOB" not in caplog.text
+
+
 def test_sql_registry(sqlite_registry):
     """
     Test the SQL registry
diff --git a/skills/feast-architecture/SKILL.md b/skills/feast-architecture/SKILL.md
index a2132983d61..5a6049d237b 100644
--- a/skills/feast-architecture/SKILL.md
+++ b/skills/feast-architecture/SKILL.md
@@ -87,6 +87,11 @@ registry.apply_feature_view(feature_view, project)
 # → registry_store.update_registry_proto(proto)
 ```
 
+**How the SQL backend works:**
+- Per-object tables, each storing that object's serialized proto in a binary column.
+- **Binary proto columns must use `ProtoBytes`, not `LargeBinary` directly** (defined at the top of `infra/registry/sql.py`). `ProtoBytes` emits `LONGBLOB` on MySQL and MariaDB and falls back to `LargeBinary`'s default on every other dialect (`BLOB` on SQLite, `BYTEA` on PostgreSQL). Plain `LargeBinary` maps to MySQL `BLOB` (64 KB cap), which silently truncates large protos (e.g. a `FeatureView`) and later fails to deserialize.
Any new serialized-proto/blob-metadata column reintroduces that bug if it uses `LargeBinary`.
+- All tables are declared on the module-level `metadata` object in `sql.py` (the source of truth). `metadata.create_all` only creates missing tables — it never widens existing columns, so schema changes to existing registries require a manual migration (see `docs/reference/registries/sql.md`). On MySQL/MariaDB, `SqlRegistry._warn_if_narrow_blob_columns` logs an error at startup for any registry proto column still typed as the narrow `BLOB` — a new column is covered automatically as long as it's typed `ProtoBytes` (the diagnostic selects columns by `column.type is ProtoBytes`); a column typed as plain `LargeBinary` would be silently missed.
+
 **Supporting files:**
 - `infra/registry/base_registry.py` — abstract interface
 - `infra/registry/proto_registry_utils.py` — proto serialization helpers
diff --git a/skills/feast-testing/SKILL.md b/skills/feast-testing/SKILL.md
index 5d778a9a33e..2704ff3feb2 100644
--- a/skills/feast-testing/SKILL.md
+++ b/skills/feast-testing/SKILL.md
@@ -129,6 +129,28 @@ def test_apply_feature_view(tmp_path):
     assert stored.name == "driver_id"
 ```
 
+### Pattern: SQL registry — dialect DDL and startup-check tests
+
+See `sdk/python/tests/unit/infra/registry/test_sql_registry.py`.
+
+- **Assert dialect-specific DDL without a live DB**: compile a table for a target dialect and check the emitted column type. Useful for column-type rules like `ProtoBytes` → `LONGBLOB` on MySQL/MariaDB.
+  ```python
+  from sqlalchemy.dialects import mysql
+  from sqlalchemy.schema import CreateTable
+  ddl = str(CreateTable(feature_views).compile(dialect=mysql.dialect()))
+  assert "feature_view_proto LONGBLOB" in ddl
+  ```
+- **Unit-test a startup diagnostic with a mock engine**: stub `engine.dialect.name` and the `engine.connect()` context manager so no DB is needed.
+ ```python + engine = MagicMock() + engine.dialect.name = "mysql" + engine.connect.return_value.__enter__.return_value.execute.return_value.fetchall.return_value = [ + ("feature_views", "feature_view_proto"), + ] + SqlRegistry._warn_if_narrow_blob_columns(engine) + ``` +- **Live round-trip behavior** (e.g. a >64 KB proto surviving the column) belongs in an integration test against the `mysql_registry` fixture in `tests/integration/registration/test_universal_registry.py` (DDL-compile tests can't catch runtime truncation). + ### Pattern: testing FeatureStore end-to-end (unit level) ```python