Skip to content

Commit

Permalink
add config setting for whitelisting feast job types (#110)
Browse files Browse the repository at this point in the history
* add config setting for whitelisting feast job types

* add config setting for whitelisting feast job types

Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

Co-authored-by: KeshavSharma <keshav.sharma@gojek.com>
  • Loading branch information
keshav2211 and keshav2211 committed Feb 21, 2022
1 parent 642d2c9 commit 363f240
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/feast_spark/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Log path of EMR cluster
EMR_LOG_LOCATION: Optional[str] = None

#: Comma-separated list of whitelisted Feast job types; when unset, all job types are allowed
WHITELISTED_JOB_TYPES: Optional[str] = None

#: Whitelisted Feast projects
WHITELISTED_PROJECTS: Optional[str] = None

Expand Down
13 changes: 13 additions & 0 deletions python/feast_spark/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
RetrievalJob,
SparkJob,
SparkJobStatus,
SparkJobType,
StreamIngestionJob,
)
from feast_spark.pyspark.launcher import (
Expand Down Expand Up @@ -142,6 +143,13 @@ def _whitelisted_project_feature_table_pairs(
return whitelisted_feature_tables
return None

@property
def _whitelisted_job_types(self) -> Optional[List[str]]:
    """Job types allowed by the ``WHITELISTED_JOB_TYPES`` config option.

    Returns:
        The comma-separated config value split into a list of job type
        names, each with surrounding whitespace removed, or ``None`` when
        the option is not set.
    """
    config = self.client.config
    if not config.exists(opt.WHITELISTED_JOB_TYPES):
        return None
    raw_job_types = config.get(opt.WHITELISTED_JOB_TYPES)
    # Strip each entry so that a value written as "A, B" still matches "B";
    # without this the second entry would be " B" and never match.
    return [job_type.strip() for job_type in raw_job_types.split(",")]

def is_whitelisted(self, project: str):
# Whitelisted projects not specified, allow all projects
if not self._whitelisted_projects:
Expand All @@ -153,6 +161,11 @@ def is_feature_table_whitelisted(self, project: str, feature_table: str):
return True
return (project, feature_table) in self._whitelisted_project_feature_table_pairs

def is_job_type_whitelisted(self, job_type: "SparkJobType"):
    """Return whether jobs of the given type are allowed.

    Args:
        job_type: The job type to check. Either a ``SparkJobType`` member or
            the job type's name as a string (e.g. ``"STREAM_INGESTION"``).

    Returns:
        ``True`` when no job-type whitelist is configured, or when the job
        type (or, for an enum member, its name) is in the whitelist;
        ``False`` otherwise.
    """
    allowed_job_types = self._whitelisted_job_types
    # Whitelisted job types not specified, allow all job types
    if not allowed_job_types:
        return True
    if job_type in allowed_job_types:
        return True
    # The whitelist holds plain strings parsed from config, so an enum
    # member that does not compare equal to a string is matched by its name.
    return getattr(job_type, "name", None) in allowed_job_types

def StartOfflineToOnlineIngestionJob(
self, request: StartOfflineToOnlineIngestionJobRequest, context
):
Expand Down
16 changes: 16 additions & 0 deletions python/tests/test_jobservice_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,19 @@ def test_feature_table_whitelist():
job_servicer = JobServiceServicer(job_client)
assert not job_servicer.is_feature_table_whitelisted("project2", "table1")
assert job_servicer.is_feature_table_whitelisted("project1", "table1")


def test_job_type_default_whitelist():
    """With no job-type whitelist configured, a job type is accepted."""
    servicer = JobServiceServicer(JobClient(Client()))
    stream_allowed = servicer.is_job_type_whitelisted("STREAM_INGESTION")
    assert stream_allowed


def test_job_type_whitelist():
    """Only job types named in the configured whitelist are accepted."""
    servicer = JobServiceServicer(
        JobClient(Client(whitelisted_job_types="STREAM_INGESTION,BATCH_INGESTION"))
    )
    # Listed job types pass, in the same order the whitelist names them.
    for listed in ("STREAM_INGESTION", "BATCH_INGESTION"):
        assert servicer.is_job_type_whitelisted(listed)
    # A job type missing from the whitelist is rejected.
    assert not servicer.is_job_type_whitelisted("HISTORICAL_RETRIEVAL")

0 comments on commit 363f240

Please sign in to comment.