diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto
index 22f1c257fb7..05048c31cf4 100644
--- a/protos/feast/core/DataSource.proto
+++ b/protos/feast/core/DataSource.proto
@@ -71,6 +71,8 @@ message DataSource {
 
-  // This is an internal field that is represents the python class for the data source object a proto object represents.
+  // This is an internal field that represents the python class for the data source object a proto object represents.
   // This should be set by feast, and not by users.
+  // The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for
+  // first party sources as well.
   string data_source_class_type = 17;
 
   // Defines options for DataSource that sources features from a file
diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py
index 3794b797cc5..0d49ce22492 100644
--- a/sdk/python/feast/data_source.py
+++ b/sdk/python/feast/data_source.py
@@ -134,6 +134,20 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
         return kinesis_options_proto
 
 
+# Maps each first-party SourceType to the fully qualified path of the DataSource
+# class that DataSource.from_proto loads for it.
+_DATA_SOURCE_OPTIONS = {
+    DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.file_source.FileSource",
+    DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery_source.BigQuerySource",
+    DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift_source.RedshiftSource",
+    DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
+    DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
+    DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
+    DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestDataSource",
+    DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
+}
+
+
 class DataSource(ABC):
     """
     DataSource that can be used to source features.
@@ -210,48 +224,22 @@ def from_proto(data_source: DataSourceProto) -> Any:
         Raises:
             ValueError: The type of DataSource could not be identified.
         """
-        if data_source.data_source_class_type:
-            cls = get_data_source_class_from_type(data_source.data_source_class_type)
-            return cls.from_proto(data_source)
-
-        if data_source.request_data_options and data_source.request_data_options.schema:
-            data_source_obj = RequestDataSource.from_proto(data_source)
-        elif data_source.file_options.file_format and data_source.file_options.file_url:
-            from feast.infra.offline_stores.file_source import FileSource
-
-            data_source_obj = FileSource.from_proto(data_source)
-        elif (
-            data_source.bigquery_options.table_ref or data_source.bigquery_options.query
-        ):
-            from feast.infra.offline_stores.bigquery_source import BigQuerySource
-
-            data_source_obj = BigQuerySource.from_proto(data_source)
-        elif data_source.redshift_options.table or data_source.redshift_options.query:
-            from feast.infra.offline_stores.redshift_source import RedshiftSource
-
-            data_source_obj = RedshiftSource.from_proto(data_source)
-
-        elif data_source.snowflake_options.table or data_source.snowflake_options.query:
-            from feast.infra.offline_stores.snowflake_source import SnowflakeSource
-
-            data_source_obj = SnowflakeSource.from_proto(data_source)
-
-        elif (
-            data_source.kafka_options.bootstrap_servers
-            and data_source.kafka_options.topic
-            and data_source.kafka_options.message_format
-        ):
-            data_source_obj = KafkaSource.from_proto(data_source)
-        elif (
-            data_source.kinesis_options.record_format
-            and data_source.kinesis_options.region
-            and data_source.kinesis_options.stream_name
-        ):
-            data_source_obj = KinesisSource.from_proto(data_source)
+        data_source_type = data_source.type
+        if data_source_type == DataSourceProto.SourceType.CUSTOM_SOURCE:
+            # Custom sources have no entry in _DATA_SOURCE_OPTIONS, so the
+            # class path has to be carried by the proto itself.
+            class_path = data_source.data_source_class_type
+            if not class_path:
+                raise ValueError(
+                    "Custom data sources must set data_source_class_type."
+                )
+        elif data_source_type in _DATA_SOURCE_OPTIONS:
+            class_path = _DATA_SOURCE_OPTIONS[data_source_type]
         else:
             raise ValueError("Could not identify the source type being added.")
 
-        return data_source_obj
+        cls = get_data_source_class_from_type(class_path)
+        return cls.from_proto(data_source)
 
     @abstractmethod
     def to_proto(self) -> DataSourceProto:
diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py
index cb1261d8c93..0bf73fcd24c 100644
--- a/sdk/python/feast/registry.py
+++ b/sdk/python/feast/registry.py
@@ -320,6 +320,11 @@ def apply_data_source(
                 del registry.data_sources[idx]
         data_source_proto = data_source.to_proto()
         data_source_proto.project = project
+        # Record the fully qualified class name on the stored proto; custom data
+        # sources are rebuilt from this field by DataSource.from_proto.
+        data_source_proto.data_source_class_type = (
+            f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
+        )
         registry.data_sources.append(data_source_proto)
         if commit:
             self.commit()