Skip to content

Commit

Permalink
whitelist job request based on configuration (#123)
Browse files Browse the repository at this point in the history
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
  • Loading branch information
khorshuheng and khorshuheng committed Mar 3, 2022
1 parent 623367c commit 6de9a95
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
22 changes: 21 additions & 1 deletion python/feast_spark/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,16 @@ def is_feature_table_whitelisted(self, project: str, feature_table: str):
def is_job_type_whitelisted(self, job_type: SparkJobType):
if not self._whitelisted_job_types:
return True
return job_type in self._whitelisted_job_types
return job_type.name in self._whitelisted_job_types

def StartOfflineToOnlineIngestionJob(
self, request: StartOfflineToOnlineIngestionJobRequest, context
):
"""Start job to ingest data from offline store into online store"""
if not self.is_job_type_whitelisted(SparkJobType.BATCH_INGESTION):
raise ValueError(
"This job service is not configured to accept batch ingestion"
)

job_submission_count.labels(
"batch_ingestion", request.project, request.table_name
Expand Down Expand Up @@ -218,6 +222,10 @@ def ScheduleOfflineToOnlineIngestionJob(
self, request: ScheduleOfflineToOnlineIngestionJobRequest, context
):
"""Schedule job to ingest data from offline store into online store periodically"""
if not self.is_job_type_whitelisted(SparkJobType.SCHEDULED_BATCH_INGESTION):
raise ValueError(
"This job service is not configured to schedule batch ingestion"
)

job_schedule_count.labels(request.project, request.table_name).inc()
feature_table = self.client.feature_store.get_feature_table(
Expand All @@ -236,6 +244,10 @@ def ScheduleOfflineToOnlineIngestionJob(
def UnscheduleOfflineToOnlineIngestionJob(
self, request: UnscheduleOfflineToOnlineIngestionJobRequest, context
):
if not self.is_job_type_whitelisted(SparkJobType.SCHEDULED_BATCH_INGESTION):
raise ValueError(
"This job service is not configured to unschedule ingestion job"
)
feature_table = self.client.feature_store.get_feature_table(
request.table_name, request.project
)
Expand All @@ -246,6 +258,10 @@ def UnscheduleOfflineToOnlineIngestionJob(

def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""
if not self.is_job_type_whitelisted(SparkJobType.HISTORICAL_RETRIEVAL):
raise ValueError(
"This job service is not configured to accept historical retrieval job"
)

job_submission_count.labels("historical_retrieval", request.project, "").inc()

Expand Down Expand Up @@ -280,6 +296,10 @@ def StartStreamToOnlineIngestionJob(
self, request: StartStreamToOnlineIngestionJobRequest, context
):
"""Start job to ingest data from stream into online store"""
if not self.is_job_type_whitelisted(SparkJobType.STREAM_INGESTION):
raise ValueError(
"This job service is not configured to start streaming job"
)

job_submission_count.labels(
"streaming", request.project, request.table_name
Expand Down
12 changes: 8 additions & 4 deletions python/tests/test_jobservice_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from feast import Client
from feast_spark import Client as JobClient
from feast_spark.job_service import JobServiceServicer
from feast_spark.pyspark.abc import SparkJobType


def test_feature_table_default_whitelist():
Expand All @@ -27,13 +28,16 @@ def test_job_type_default_whitelist():
feast_client = Client()
job_client = JobClient(feast_client)
job_servicer = JobServiceServicer(job_client)
assert job_servicer.is_job_type_whitelisted("STREAM_INGESTION")
assert job_servicer.is_job_type_whitelisted(SparkJobType.STREAM_INGESTION)


def test_job_type_whitelist():
feast_client = Client(whitelisted_job_types="STREAM_INGESTION,BATCH_INGESTION")
job_client = JobClient(feast_client)
job_servicer = JobServiceServicer(job_client)
assert job_servicer.is_job_type_whitelisted("STREAM_INGESTION")
assert job_servicer.is_job_type_whitelisted("BATCH_INGESTION")
assert not job_servicer.is_job_type_whitelisted("HISTORICAL_RETRIEVAL")
assert job_servicer.is_job_type_whitelisted(SparkJobType.STREAM_INGESTION)
assert job_servicer.is_job_type_whitelisted(SparkJobType.BATCH_INGESTION)
assert not job_servicer.is_job_type_whitelisted(SparkJobType.HISTORICAL_RETRIEVAL)
assert not job_servicer.is_job_type_whitelisted(
SparkJobType.SCHEDULED_BATCH_INGESTION
)

0 comments on commit 6de9a95

Please sign in to comment.