From 37923de7cf0d23cafaa9effc6fc485b399ba7652 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 21:46:39 -0700 Subject: [PATCH 1/7] Add FeastDataFrame Signed-off-by: HaoXuAI --- sdk/python/feast/__init__.py | 3 + sdk/python/feast/dataframe.py | 83 ++++++++++++++++++++++ sdk/python/tests/unit/test_dataframe.py | 94 +++++++++++++++++++++++++ 3 files changed, 180 insertions(+) create mode 100644 sdk/python/feast/dataframe.py create mode 100644 sdk/python/tests/unit/test_dataframe.py diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index a2c59360908..6b6154ed270 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -11,6 +11,7 @@ from .batch_feature_view import BatchFeatureView from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource +from .dataframe import DataFrameEngine, FeastDataFrame from .entity import Entity from .feature import Feature from .feature_service import FeatureService @@ -32,9 +33,11 @@ __all__ = [ "BatchFeatureView", + "DataFrameEngine", "Entity", "KafkaSource", "KinesisSource", + "FeastDataFrame", "Feature", "Field", "FeatureService", diff --git a/sdk/python/feast/dataframe.py b/sdk/python/feast/dataframe.py new file mode 100644 index 00000000000..0e673530486 --- /dev/null +++ b/sdk/python/feast/dataframe.py @@ -0,0 +1,83 @@ +"""FeastDataFrame: A lightweight container for DataFrame-like objects in Feast.""" + +from enum import Enum +from typing import Any, Dict, Optional, Union + +import pandas as pd +import pyarrow as pa + + +class DataFrameEngine(str, Enum): + """Supported DataFrame engines.""" + + PANDAS = "pandas" + SPARK = "spark" + DASK = "dask" + RAY = "ray" + ARROW = "arrow" + POLARS = "polars" + UNKNOWN = "unknown" + + +class FeastDataFrame: + """ + A lightweight container for DataFrame-like objects in Feast. + + This class wraps any DataFrame implementation and provides metadata + about the engine type for proper routing in Feast's processing pipeline. + """ + + def __init__( + self, + data: Any, + engine: Optional[DataFrameEngine] = None, + metadata: Optional[Dict[str, Any]] = None, + ): + """ + Initialize a FeastDataFrame. + + Args: + data: The wrapped DataFrame object (pandas, Spark, Dask, etc.) + engine: Explicitly specify the engine type (auto-detected if None) + metadata: Additional metadata (schema hints, etc.) + """ + self.data = data + self.metadata = metadata or {} + self._engine = engine or self._detect_engine() + + def _detect_engine(self) -> DataFrameEngine: + """Auto-detect the DataFrame engine based on type.""" + if isinstance(self.data, pd.DataFrame): + return DataFrameEngine.PANDAS + elif isinstance(self.data, pa.Table): + return DataFrameEngine.ARROW + + # For optional dependencies, check module name to avoid import errors + module = type(self.data).__module__ + if "pyspark" in module: + return DataFrameEngine.SPARK + elif "dask" in module: + return DataFrameEngine.DASK + elif "ray" in module: + return DataFrameEngine.RAY + elif "polars" in module: + return DataFrameEngine.POLARS + else: + return DataFrameEngine.UNKNOWN + + @property + def engine(self) -> DataFrameEngine: + """Get the detected or specified engine type.""" + return self._engine + + def __repr__(self): + return f"FeastDataFrame(engine={self.engine}, type={type(self.data).__name__})" + + @property + def is_lazy(self) -> bool: + """Check if the underlying DataFrame is lazy (Spark, Dask, Ray).""" + return self.engine in [ + DataFrameEngine.SPARK, + DataFrameEngine.DASK, + DataFrameEngine.RAY, + ] \ No newline at end of file diff --git a/sdk/python/tests/unit/test_dataframe.py b/sdk/python/tests/unit/test_dataframe.py new file mode 100644 index 00000000000..eef3d288193 --- /dev/null +++ b/sdk/python/tests/unit/test_dataframe.py @@ -0,0 +1,94 @@ +"""Unit tests for FeastDataFrame.""" + +import pandas as pd +import pyarrow as pa +import pytest + +from feast.dataframe import DataFrameEngine, FeastDataFrame + + +class TestFeastDataFrame: + """Test suite for FeastDataFrame functionality.""" + + def test_pandas_detection(self): + """Test auto-detection of pandas DataFrame.""" + df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + feast_df = FeastDataFrame(df) + + assert feast_df.engine == DataFrameEngine.PANDAS + assert not feast_df.is_lazy + assert isinstance(feast_df.data, pd.DataFrame) + + def test_arrow_detection(self): + """Test auto-detection of Arrow Table.""" + table = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]}) + feast_df = FeastDataFrame(table) + + assert feast_df.engine == DataFrameEngine.ARROW + assert not feast_df.is_lazy + assert isinstance(feast_df.data, pa.Table) + + def test_explicit_engine(self): + """Test explicit engine specification.""" + data = {"mock": "data"} + feast_df = FeastDataFrame(data, engine=DataFrameEngine.SPARK) + + assert feast_df.engine == DataFrameEngine.SPARK + assert feast_df.is_lazy + + def test_unknown_engine(self): + """Test handling of unknown DataFrame types.""" + data = {"some": "dict"} + feast_df = FeastDataFrame(data) + + assert feast_df.engine == DataFrameEngine.UNKNOWN + + def test_metadata(self): + """Test metadata handling.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + metadata = {"features": ["a"], "source": "test"} + feast_df = FeastDataFrame(df, metadata=metadata) + + assert feast_df.metadata == metadata + assert feast_df.metadata["features"] == ["a"] + + def test_repr(self): + """Test string representation.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(df) + + repr_str = repr(feast_df) + assert "FeastDataFrame" in repr_str + assert "engine=pandas" in repr_str + assert "DataFrame" in repr_str + + @pytest.mark.parametrize( + "engine,expected_lazy", + [ + (DataFrameEngine.PANDAS, False), + (DataFrameEngine.ARROW, False), + (DataFrameEngine.POLARS, False), + (DataFrameEngine.SPARK, True), + (DataFrameEngine.DASK, True), + (DataFrameEngine.RAY, True), + (DataFrameEngine.UNKNOWN, False), + ], + ) + def test_is_lazy_property(self, engine, expected_lazy): + """Test is_lazy property for different engines.""" + feast_df = FeastDataFrame({"mock": "data"}, engine=engine) + assert feast_df.is_lazy == expected_lazy + + def test_polars_detection(self): + """Test detection of polars DataFrame (using mock).""" + # Mock polars DataFrame + class MockPolarsDF: + def __init__(self): + self.__module__ = "polars.dataframe.frame" + self.__class__.__name__ = "DataFrame" + + polars_df = MockPolarsDF() + feast_df = FeastDataFrame(polars_df) + + assert feast_df.engine == DataFrameEngine.POLARS + assert not feast_df.is_lazy From e965911b21b131501fd62ec82cccb6bbaeb74cca Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 21:48:13 -0700 Subject: [PATCH 2/7] linting Signed-off-by: HaoXuAI --- sdk/python/feast/dataframe.py | 6 +++--- sdk/python/tests/unit/test_dataframe.py | 17 +++++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/dataframe.py b/sdk/python/feast/dataframe.py index 0e673530486..007faecb01d 100644 --- a/sdk/python/feast/dataframe.py +++ b/sdk/python/feast/dataframe.py @@ -1,7 +1,7 @@ """FeastDataFrame: A lightweight container for DataFrame-like objects in Feast.""" from enum import Enum -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional import pandas as pd import pyarrow as pa @@ -51,7 +51,7 @@ def _detect_engine(self) -> DataFrameEngine: return DataFrameEngine.PANDAS elif isinstance(self.data, pa.Table): return DataFrameEngine.ARROW - + # For optional dependencies, check module name to avoid import errors module = type(self.data).__module__ if "pyspark" in module: @@ -80,4 +80,4 @@ def is_lazy(self) -> bool: DataFrameEngine.SPARK, DataFrameEngine.DASK, DataFrameEngine.RAY, - ] \ No newline at end of file + ] diff --git a/sdk/python/tests/unit/test_dataframe.py b/sdk/python/tests/unit/test_dataframe.py index eef3d288193..66d5e09585a 100644 --- a/sdk/python/tests/unit/test_dataframe.py +++ b/sdk/python/tests/unit/test_dataframe.py @@ -14,7 +14,7 @@ def test_pandas_detection(self): """Test auto-detection of pandas DataFrame.""" df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) feast_df = FeastDataFrame(df) - + assert feast_df.engine == DataFrameEngine.PANDAS assert not feast_df.is_lazy assert isinstance(feast_df.data, pd.DataFrame) @@ -23,7 +23,7 @@ def test_arrow_detection(self): """Test auto-detection of Arrow Table.""" table = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]}) feast_df = FeastDataFrame(table) - + assert feast_df.engine == DataFrameEngine.ARROW assert not feast_df.is_lazy assert isinstance(feast_df.data, pa.Table) @@ -32,7 +32,7 @@ def test_explicit_engine(self): """Test explicit engine specification.""" data = {"mock": "data"} feast_df = FeastDataFrame(data, engine=DataFrameEngine.SPARK) - + assert feast_df.engine == DataFrameEngine.SPARK assert feast_df.is_lazy @@ -40,7 +40,7 @@ def test_unknown_engine(self): """Test handling of unknown DataFrame types.""" data = {"some": "dict"} feast_df = FeastDataFrame(data) - + assert feast_df.engine == DataFrameEngine.UNKNOWN def test_metadata(self): @@ -48,7 +48,7 @@ def test_metadata(self): df = pd.DataFrame({"a": [1, 2, 3]}) metadata = {"features": ["a"], "source": "test"} feast_df = FeastDataFrame(df, metadata=metadata) - + assert feast_df.metadata == metadata assert feast_df.metadata["features"] == ["a"] @@ -56,7 +56,7 @@ def test_repr(self): """Test string representation.""" df = pd.DataFrame({"a": [1, 2, 3]}) feast_df = FeastDataFrame(df) - + repr_str = repr(feast_df) assert "FeastDataFrame" in repr_str assert "engine=pandas" in repr_str @@ -81,14 +81,15 @@ def test_is_lazy_property(self, engine, expected_lazy): def test_polars_detection(self): """Test detection of polars DataFrame (using mock).""" + # Mock polars DataFrame class MockPolarsDF: def __init__(self): self.__module__ = "polars.dataframe.frame" self.__class__.__name__ = "DataFrame" - + polars_df = MockPolarsDF() feast_df = FeastDataFrame(polars_df) - + assert feast_df.engine == DataFrameEngine.POLARS assert not feast_df.is_lazy From bc7f7f428aa79deb8f423575d11994434b3c0f18 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 21:52:08 -0700 Subject: [PATCH 3/7] linting Signed-off-by: HaoXuAI --- sdk/python/feast/online_response.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 967c507c6a0..8f359e581b6 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Any, Dict, List, Union +from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeAlias import pandas as pd import pyarrow as pa @@ -25,9 +25,9 @@ if TYPE_CHECKING: import torch - TorchTensor = torch.Tensor + TorchTensor: TypeAlias = torch.Tensor else: - TorchTensor = Any + TorchTensor: TypeAlias = Any TIMESTAMP_POSTFIX: str = "__ts" From e786ef16fd77e5769ae59114cf18a1a3f9afa083 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 21:56:35 -0700 Subject: [PATCH 4/7] linting Signed-off-by: HaoXuAI --- sdk/python/feast/online_response.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 8f359e581b6..2491a28badc 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeAlias +from typing import TYPE_CHECKING, Any, Dict, List, TypeAlias, Union import pandas as pd import pyarrow as pa From bc825a156ed987d0f0c91492c6da3b5c62a200ce Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 22:41:29 -0700 Subject: [PATCH 5/7] update init Signed-off-by: HaoXuAI --- sdk/python/feast/dataframe.py | 15 ++++- sdk/python/tests/unit/test_dataframe.py | 76 ++++++++++++++++++------- 2 files changed, 69 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/dataframe.py b/sdk/python/feast/dataframe.py index 007faecb01d..5fcef7290fd 100644 --- a/sdk/python/feast/dataframe.py +++ b/sdk/python/feast/dataframe.py @@ -43,7 +43,20 @@ def __init__( """ self.data = data self.metadata = metadata or {} - self._engine = engine or self._detect_engine() + + # Detect the actual engine from the data + detected_engine = self._detect_engine() + + if engine is not None: + # Validate that the provided engine matches the detected engine + if engine != detected_engine: + raise ValueError( + f"Provided engine '{engine}' does not match detected engine '{detected_engine}' " + f"for data type {type(data).__name__}" + ) + self._engine = engine + else: + self._engine = detected_engine def _detect_engine(self) -> DataFrameEngine: """Auto-detect the DataFrame engine based on type.""" diff --git a/sdk/python/tests/unit/test_dataframe.py b/sdk/python/tests/unit/test_dataframe.py index 66d5e09585a..2665cd04dd6 100644 --- a/sdk/python/tests/unit/test_dataframe.py +++ b/sdk/python/tests/unit/test_dataframe.py @@ -29,12 +29,12 @@ def test_arrow_detection(self): assert isinstance(feast_df.data, pa.Table) def test_explicit_engine(self): - """Test explicit engine specification.""" + """Test explicit engine specification with unknown data.""" data = {"mock": "data"} - feast_df = FeastDataFrame(data, engine=DataFrameEngine.SPARK) + feast_df = FeastDataFrame(data, engine=DataFrameEngine.UNKNOWN) - assert feast_df.engine == DataFrameEngine.SPARK - assert feast_df.is_lazy + assert feast_df.engine == DataFrameEngine.UNKNOWN + assert not feast_df.is_lazy def test_unknown_engine(self): """Test handling of unknown DataFrame types.""" @@ -62,34 +62,68 @@ def test_repr(self): assert "engine=pandas" in repr_str assert "DataFrame" in repr_str - @pytest.mark.parametrize( - "engine,expected_lazy", - [ - (DataFrameEngine.PANDAS, False), - (DataFrameEngine.ARROW, False), - (DataFrameEngine.POLARS, False), - (DataFrameEngine.SPARK, True), - (DataFrameEngine.DASK, True), - (DataFrameEngine.RAY, True), - (DataFrameEngine.UNKNOWN, False), - ], - ) - def test_is_lazy_property(self, engine, expected_lazy): + def test_is_lazy_property(self): """Test is_lazy property for different engines.""" - feast_df = FeastDataFrame({"mock": "data"}, engine=engine) - assert feast_df.is_lazy == expected_lazy + # Test with pandas DataFrame (not lazy) + df = pd.DataFrame({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(df) + assert not feast_df.is_lazy + + # Test with Arrow table (not lazy) + table = pa.table({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(table) + assert not feast_df.is_lazy + + # Test with unknown data type (not lazy) + unknown_data = {"mock": "data"} + feast_df = FeastDataFrame(unknown_data) + assert not feast_df.is_lazy + + # Test explicit lazy engines (using unknown data to avoid type validation) + for lazy_engine in [DataFrameEngine.SPARK, DataFrameEngine.DASK, DataFrameEngine.RAY]: + feast_df = FeastDataFrame(unknown_data, engine=DataFrameEngine.UNKNOWN) + feast_df._engine = lazy_engine # Override for testing + assert feast_df.is_lazy def test_polars_detection(self): """Test detection of polars DataFrame (using mock).""" # Mock polars DataFrame class MockPolarsDF: + __module__ = "polars.dataframe.frame" + def __init__(self): - self.__module__ = "polars.dataframe.frame" - self.__class__.__name__ = "DataFrame" + pass polars_df = MockPolarsDF() feast_df = FeastDataFrame(polars_df) assert feast_df.engine == DataFrameEngine.POLARS assert not feast_df.is_lazy + + def test_engine_validation_valid(self): + """Test that providing a correct engine passes validation.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + feast_df = FeastDataFrame(df, engine=DataFrameEngine.PANDAS) + + assert feast_df.engine == DataFrameEngine.PANDAS + assert isinstance(feast_df.data, pd.DataFrame) + + def test_engine_validation_invalid(self): + """Test that providing an incorrect engine raises ValueError.""" + df = pd.DataFrame({"a": [1, 2, 3]}) + + with pytest.raises(ValueError, match="Provided engine 'spark' does not match detected engine 'pandas'"): + FeastDataFrame(df, engine=DataFrameEngine.SPARK) + + def test_engine_validation_arrow(self): + """Test engine validation with Arrow table.""" + table = pa.table({"a": [1, 2, 3]}) + + # Valid case + feast_df = FeastDataFrame(table, engine=DataFrameEngine.ARROW) + assert feast_df.engine == DataFrameEngine.ARROW + + # Invalid case + with pytest.raises(ValueError, match="Provided engine 'pandas' does not match detected engine 'arrow'"): + FeastDataFrame(table, engine=DataFrameEngine.PANDAS) From 4f972fc305aadb842454ec6ef616eb142c25551a Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 22:44:15 -0700 Subject: [PATCH 6/7] linting Signed-off-by: HaoXuAI --- sdk/python/feast/dataframe.py | 4 ++-- sdk/python/tests/unit/test_dataframe.py | 32 ++++++++++++++++--------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/dataframe.py b/sdk/python/feast/dataframe.py index 5fcef7290fd..8fd6a7d665b 100644 --- a/sdk/python/feast/dataframe.py +++ b/sdk/python/feast/dataframe.py @@ -43,10 +43,10 @@ def __init__( """ self.data = data self.metadata = metadata or {} - + # Detect the actual engine from the data detected_engine = self._detect_engine() - + if engine is not None: # Validate that the provided engine matches the detected engine if engine != detected_engine: diff --git a/sdk/python/tests/unit/test_dataframe.py b/sdk/python/tests/unit/test_dataframe.py index 2665cd04dd6..959df98c899 100644 --- a/sdk/python/tests/unit/test_dataframe.py +++ b/sdk/python/tests/unit/test_dataframe.py @@ -68,19 +68,23 @@ def test_is_lazy_property(self): df = pd.DataFrame({"a": [1, 2, 3]}) feast_df = FeastDataFrame(df) assert not feast_df.is_lazy - + # Test with Arrow table (not lazy) table = pa.table({"a": [1, 2, 3]}) feast_df = FeastDataFrame(table) assert not feast_df.is_lazy - + # Test with unknown data type (not lazy) unknown_data = {"mock": "data"} feast_df = FeastDataFrame(unknown_data) assert not feast_df.is_lazy - + # Test explicit lazy engines (using unknown data to avoid type validation) - for lazy_engine in [DataFrameEngine.SPARK, DataFrameEngine.DASK, DataFrameEngine.RAY]: + for lazy_engine in [ + DataFrameEngine.SPARK, + DataFrameEngine.DASK, + DataFrameEngine.RAY, + ]: feast_df = FeastDataFrame(unknown_data, engine=DataFrameEngine.UNKNOWN) feast_df._engine = lazy_engine # Override for testing assert feast_df.is_lazy @@ -91,7 +95,7 @@ def test_polars_detection(self): # Mock polars DataFrame class MockPolarsDF: __module__ = "polars.dataframe.frame" - + def __init__(self): pass @@ -105,25 +109,31 @@ def test_engine_validation_valid(self): """Test that providing a correct engine passes validation.""" df = pd.DataFrame({"a": [1, 2, 3]}) feast_df = FeastDataFrame(df, engine=DataFrameEngine.PANDAS) - + assert feast_df.engine == DataFrameEngine.PANDAS assert isinstance(feast_df.data, pd.DataFrame) def test_engine_validation_invalid(self): """Test that providing an incorrect engine raises ValueError.""" df = pd.DataFrame({"a": [1, 2, 3]}) - - with pytest.raises(ValueError, match="Provided engine 'spark' does not match detected engine 'pandas'"): + + with pytest.raises( + ValueError, + match="Provided engine 'spark' does not match detected engine 'pandas'", + ): FeastDataFrame(df, engine=DataFrameEngine.SPARK) def test_engine_validation_arrow(self): """Test engine validation with Arrow table.""" table = pa.table({"a": [1, 2, 3]}) - + # Valid case feast_df = FeastDataFrame(table, engine=DataFrameEngine.ARROW) assert feast_df.engine == DataFrameEngine.ARROW - + # Invalid case - with pytest.raises(ValueError, match="Provided engine 'pandas' does not match detected engine 'arrow'"): + with pytest.raises( + ValueError, + match="Provided engine 'pandas' does not match detected engine 'arrow'", + ): FeastDataFrame(table, engine=DataFrameEngine.PANDAS) From 64ac8a92c7529927ab5ee119c8c07b671105b7c5 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 12 Sep 2025 23:10:19 -0700 Subject: [PATCH 7/7] fix testing Signed-off-by: HaoXuAI --- sdk/python/feast/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/dataframe.py b/sdk/python/feast/dataframe.py index 8fd6a7d665b..0a54a11c232 100644 --- a/sdk/python/feast/dataframe.py +++ b/sdk/python/feast/dataframe.py @@ -51,7 +51,7 @@ def __init__( # Validate that the provided engine matches the detected engine if engine != detected_engine: raise ValueError( - f"Provided engine '{engine}' does not match detected engine '{detected_engine}' " + f"Provided engine '{engine.value}' does not match detected engine '{detected_engine.value}' " f"for data type {type(data).__name__}" ) self._engine = engine @@ -84,7 +84,7 @@ def engine(self) -> DataFrameEngine: return self._engine def __repr__(self): - return f"FeastDataFrame(engine={self.engine}, type={type(self.data).__name__})" + return f"FeastDataFrame(engine={self.engine.value}, type={type(self.data).__name__})" @property def is_lazy(self) -> bool: