diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 45e7ddd3e71..00b468786cf 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -44,9 +44,9 @@ jobs: make install-go-ci-dependencies COMPILE_GO=true python setup.py develop CGO_LDFLAGS_ALLOW=".*" COMPILE_GO=True python setup.py build_ext --inplace - - name: Test Milvus tests + - name: Test EG tests if: matrix.os == 'ubuntu-latest' - run: python -m pytest -n 1 --color=yes sdk/python/tests/expediagroup/test_eg_milvus_online_store.py + run: python -m pytest -n 1 --color=yes sdk/python/tests/expediagroup/ - name: Test Python if: matrix.os == 'ubuntu-latest' run: make test-python-unit @@ -133,4 +133,4 @@ jobs: - uses: actions/upload-artifact@v4 with: name: java-coverage-report - path: ${{ github.workspace }}/docs/coverage/java/target/site/jacoco-aggregate/ + path: ${{ github.workspace }}/docs/coverage/java/target/site/jacoco-aggregate/ \ No newline at end of file diff --git a/Makefile b/Makefile index 7f7f82ad6e3..43e404ef961 100644 --- a/Makefile +++ b/Makefile @@ -563,4 +563,4 @@ build-helm-docs: # Note: requires node and yarn to be installed build-ui: - cd $(ROOT_DIR)/sdk/python/feast/ui && yarn upgrade @feast-dev/feast-ui --latest && yarn install && npm run build --omit=dev + cd $(ROOT_DIR)/sdk/python/feast/ui && yarn upgrade @feast-dev/feast-ui --latest && yarn install && npm run build --omit=dev \ No newline at end of file diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 17f64ba0346..a0a879c95a0 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -715,7 +715,10 @@ def update( project = config.project for table in tables_to_keep: - self._create_table(config, project, table) + if self._table_exists(config, project, table): + self._alter_table(config, project, table) + else: + self._create_table(config, project, table) for table in tables_to_delete: self._drop_table(config, project, table) @@ -899,6 +902,51 @@ def _create_table( ) session.execute(create_cql) + def _resolve_table_names( + self, config: RepoConfig, project: str, table: FeatureView + ) -> Tuple[str, str]: + """ + Returns (fqtable, plain_table_name) for a given FeatureView, + where fqtable is '"keyspace"."table"' and plain_table_name + is the lower-cased unquoted table identifier. + """ + fqtable = CassandraOnlineStore._fq_table_name( + self._keyspace, + project, + table, + config.online_store.table_name_format_version, + ) + # extract bare identifier: split off keyspace, strip quotes, lower-case + quoted = fqtable.split(".", 1)[1] + plain_table_name = quoted.strip('"') + return fqtable, plain_table_name + + def _table_exists( + self, config: RepoConfig, project: str, table: FeatureView + ) -> bool: + self._get_session(config) + _, plain_table_name = self._resolve_table_names(config, project, table) + ks_meta = self._cluster.metadata.keyspaces[self._keyspace] + return plain_table_name in ks_meta.tables + + def _alter_table(self, config: RepoConfig, project: str, table: FeatureView): + session = self._get_session(config) + fqtable, plain_table_name = self._resolve_table_names(config, project, table) + + ks_meta = self._cluster.metadata.keyspaces[self._keyspace] + existing_cols = set(ks_meta.tables[plain_table_name].columns.keys()) + + desired_cols = {f.name for f in table.features} + new_cols = desired_cols - existing_cols + if new_cols: + cql_type = "BLOB" # Default type for features + col_defs = ", ".join(f"{col} {cql_type}" for col in new_cols) + alter_cql = f"ALTER TABLE {fqtable} ADD ({col_defs})" + session.execute(alter_cql) + logger.info( + f"Added columns [{', '.join(sorted(new_cols))}] to table: {fqtable}" + ) + def _build_sorted_table_cql( self, project: str, table: SortedFeatureView, fqtable: str ) -> str: diff --git a/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py b/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py index ee5361a600d..d846c50987d 100644 --- a/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py +++ b/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py @@ -17,7 +17,7 @@ def __init__(self, project_name: str): self.es_port = 9200 self.elasticsearch_container = ElasticSearchContainer( image="docker.elastic.co/elasticsearch/elasticsearch:8.8.2", - port_to_expose=self.es_port, + port=self.es_port, ) def create_online_store(self): diff --git a/sdk/python/tests/expediagroup/test_cassandra_online_store.py b/sdk/python/tests/expediagroup/test_cassandra_online_store.py index 452362c39e9..32470117538 100644 --- a/sdk/python/tests/expediagroup/test_cassandra_online_store.py +++ b/sdk/python/tests/expediagroup/test_cassandra_online_store.py @@ -92,6 +92,25 @@ def repo_config(cassandra_online_store_config, setup_keyspace) -> RepoConfig: ) +@pytest.fixture(scope="session") +def long_name_repo_config(cassandra_online_store_config, setup_keyspace) -> RepoConfig: + return RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=CassandraOnlineStoreConfig( + hosts=cassandra_online_store_config["hosts"], + port=cassandra_online_store_config["port"], + keyspace=setup_keyspace, + table_name_format_version=cassandra_online_store_config.get( + "table_name_format_version", 2 + ), + ), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=ENTITY_KEY_SERIALIZATION_VERSION, + ) + + @pytest.fixture(scope="session") def online_store(repo_config) -> CassandraOnlineStore: store = CassandraOnlineStore() @@ -199,8 +218,8 @@ def test_create_table_from_sorted_feature_view( expected_columns = { "entity_key": "text", - "feature1": "bigint", - "feature2": "list", + "feature1": "blob", + "feature2": "blob", "sort_key1": "bigint", "sort_key2": "text", "event_ts": "timestamp", @@ -741,3 +760,197 @@ def _create_n_test_sample_features_all_datatypes(self, n=10): None, ) ] + + +def test_update_alters_existing_table_adds_new_column( + cassandra_session, repo_config, online_store +): + session, keyspace = cassandra_session + + fv1 = SortedFeatureView( + name="fv_alter_test", + entities=[Entity(name="id")], + source=FileSource(name="src", path="x.parquet", timestamp_field="ts"), + schema=[ + Field(name="sort_key", dtype=Int32), + Field(name="f1", dtype=String), + ], + sort_keys=[ + SortKey( + name="sort_key", + value_type=ValueType.INT32, + default_sort_order=SortOrder.Enum.ASC, + ) + ], + ) + + online_store._create_table(repo_config, repo_config.project, fv1) + + online_store_table = ( + online_store._fq_table_name( + keyspace, + repo_config.project, + fv1, + repo_config.online_store.table_name_format_version, + ) + .split(".", 1)[1] + .strip('"') + ) + + cols = session.execute( + textwrap.dedent(f""" + SELECT column_name + FROM system_schema.columns + WHERE keyspace_name='{keyspace}' AND table_name='{online_store_table}'; + """) + ) + names = {r.column_name for r in cols} + assert "f1" in names and "f2" not in names + + fv2 = SortedFeatureView( + name="fv_alter_test", + entities=[Entity(name="id")], + source=FileSource(name="src", path="x.parquet", timestamp_field="ts"), + schema=[ + Field(name="sort_key", dtype=Int32), + Field(name="f1", dtype=String), + Field(name="f2", dtype=String), + ], + sort_keys=fv1.sort_keys, + ) + + online_store.update( + config=repo_config, + tables_to_delete=[], + tables_to_keep=[fv2], + entities_to_delete=[], + entities_to_keep=[], + partial=False, + ) + + cols = session.execute( + textwrap.dedent(f""" + SELECT column_name + FROM system_schema.columns + WHERE keyspace_name='{keyspace}' AND table_name='{online_store_table}'; + """) + ) + names = {r.column_name for r in cols} + assert {"f1", "f2", "sort_key", "entity_key", "event_ts", "created_ts"}.issubset( + names + ) + + +def test_update_noop_when_schema_unchanged( + cassandra_session, repo_config, online_store +): + session, keyspace = cassandra_session + + fv = SortedFeatureView( + name="fv_noop_test", + entities=[Entity(name="id")], + source=FileSource(name="src", path="x.parquet", timestamp_field="ts"), + schema=[ + Field(name="sort_key", dtype=Int32), + Field(name="f1", dtype=String), + ], + sort_keys=[ + SortKey( + name="sort_key", + value_type=ValueType.INT32, + default_sort_order=SortOrder.Enum.ASC, + ) + ], + ) + + online_store.update( + config=repo_config, + tables_to_delete=[], + tables_to_keep=[fv], + entities_to_delete=[], + entities_to_keep=[], + partial=False, + ) + + online_store_table = ( + online_store._fq_table_name( + keyspace, + repo_config.project, + fv, + repo_config.online_store.table_name_format_version, + ) + .split(".", 1)[1] + .strip('"') + ) + + before = { + r.column_name + for r in session.execute( + f"SELECT column_name FROM system_schema.columns " + f"WHERE keyspace_name='{keyspace}' AND table_name='{online_store_table}';" + ) + } + + # run update again with identical fv + online_store.update( + config=repo_config, + tables_to_delete=[], + tables_to_keep=[fv], + entities_to_delete=[], + entities_to_keep=[], + partial=False, + ) + + after = { + r.column_name + for r in session.execute( + f"SELECT column_name FROM system_schema.columns " + f"WHERE keyspace_name='{keyspace}' AND table_name='{online_store_table}';" + ) + } + + assert before == after + + +def test_resolve_table_names_v2_preserves_case( + cassandra_session, long_name_repo_config, online_store +): + session, keyspace = cassandra_session + # build a feature view name so long that V2 hashing will trigger mixed-case + fv_name = "VeryLongFeatureViewName_" + "X" * 60 + sfv = SortedFeatureView( + name=fv_name, + entities=[Entity(name="id", join_keys=["id"])], + source=FileSource(name="src", path="path", timestamp_field="ts"), + schema=[ + Field(name="ts", dtype=UnixTimestamp), + ], + sort_keys=[ + SortKey( + name="ts", + value_type=ValueType.UNIX_TIMESTAMP, + default_sort_order=SortOrder.Enum.ASC, + ) + ], + ) + + # compute the fully‐qualified name + fqtable = online_store._fq_table_name( + keyspace, + long_name_repo_config.project, + sfv, + long_name_repo_config.online_store.table_name_format_version, + ) + + quoted = fqtable.split(".", 1)[1] + expected_plain = quoted.strip('"') + + _, actual_plain = online_store._resolve_table_names( + long_name_repo_config, long_name_repo_config.project, sfv + ) + + assert actual_plain == expected_plain, ( + f"resolve_table_names lowercased the identifier:\n" + f" expected: {expected_plain}\n" + f" got: {actual_plain}" + ) diff --git a/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py b/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py index 9fe7b54780c..def52e263d2 100644 --- a/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py +++ b/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py @@ -98,7 +98,7 @@ def setup_method(self, repo_config): @pytest.mark.parametrize("index_params", index_param_list) def test_elasticsearch_update_add_index(self, repo_config, caplog, index_params): - dimensions = 16 + dimensions = "16" vector_type = Float32 vector_tags = { "is_primary": "False",