Deprecate whitelist for non streaming jobs (#143)
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed May 12, 2022
1 parent 899574a commit 2871ba6
Showing 8 changed files with 4 additions and 298 deletions.
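For orientation: the only whitelist logic that survives in job_service.py is the project-level is_whitelisted check kept on JobServiceServicer (see the job_service.py diff below). Below is a minimal standalone sketch of that retained logic, with the config plumbing reduced to plain function arguments and made-up project names; the real property reads the comma-separated value through the job service's configuration.

from typing import List, Optional


def parse_whitelisted_projects(raw: Optional[str]) -> Optional[List[str]]:
    # Mirrors the retained _whitelisted_projects property: a comma-separated
    # config value becomes a list of project names; an unset value means
    # "no whitelist configured".
    if raw:
        return raw.split(",")
    return None


def is_whitelisted(project: str, whitelisted_projects: Optional[List[str]]) -> bool:
    # Mirrors the retained is_whitelisted method: with no whitelist configured,
    # every project is allowed.
    if not whitelisted_projects:
        return True
    return project in whitelisted_projects


allowed = parse_whitelisted_projects("team_a,team_b")
assert is_whitelisted("team_a", allowed)
assert not is_whitelisted("team_c", allowed)
assert is_whitelisted("team_c", parse_whitelisted_projects(None))
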
2 changes: 1 addition & 1 deletion infra/charts/feast-spark/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: Feast Extension for running Ingestion on Spark
name: feast-spark
-version: 0.2.24
+version: 0.2.29

This file was deleted.

@@ -40,7 +40,7 @@ spec:
        {{- toYaml . | nindent 8 }}
      {{- end }}

-      {{- if or .Values.secrets .Values.sparkOperator.enabled .Values.configMaps .Values.whitelist.enabled }}
+      {{- if or .Values.secrets .Values.sparkOperator.enabled .Values.configMaps }}
      volumes:
      {{- end }}
      {{- range $secret := .Values.secrets }}
@@ -58,18 +58,13 @@ spec:
        configMap:
          name: {{ template "feast-jobservice.fullname" . }}-spark-template
      {{- end }}
-      {{- if .Values.whitelist.enabled }}
-      - name: {{ template "feast-jobservice.fullname" . }}-whitelist
-        configMap:
-          name: {{ template "feast-jobservice.fullname" . }}-whitelist
-      {{- end }}

      containers:
      - name: {{ .Chart.Name }}
        image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
        imagePullPolicy: {{ .Values.image.pullPolicy }}

-        {{- if or .Values.secrets .Values.sparkOperator.enabled .Values.configMaps .Values.whitelist.enabled }}
+        {{- if or .Values.secrets .Values.sparkOperator.enabled .Values.configMaps }}
        volumeMounts:
        {{- end }}
        {{- range $secret := .Values.secrets }}
@@ -86,10 +81,6 @@ spec:
        - name: {{ template "feast-jobservice.fullname" . }}-spark-template
          mountPath: "/etc/configs"
        {{- end }}
-        {{- if .Values.whitelist.enabled }}
-        - name: {{ template "feast-jobservice.fullname" . }}-whitelist
-          mountPath: "/etc/whitelist"
-        {{- end }}

        env:
        {{- if .Values.sparkOperator.enabled }}
@@ -102,10 +93,6 @@ spec:
        - name: FEAST_SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH
          value: /etc/configs/historicalJobTemplate.yaml
        {{- end }}
-        {{- if .Values.whitelist.enabled }}
-        - name: FEAST_WHITELISTED_FEATURE_TABLES_PATH
-          value: /etc/whitelist/whitelist.txt
-        {{- end }}
        {{- range $key, $value := .Values.envOverrides }}
        - name: {{ printf "%s" $key | replace "." "_" | upper | quote }}
          {{- if eq (kindOf $value) "map" }}
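The chart wiring removed above mounted the whitelist ConfigMap at /etc/whitelist/whitelist.txt and pointed the job service at it via the FEAST_WHITELISTED_FEATURE_TABLES_PATH environment variable. For anyone still relying on the deprecated behaviour, here is a standalone sketch of how such a file was consumed, following the parsing in the removed _whitelisted_project_feature_table_pairs property shown further down; the feature table names and the temporary file are purely illustrative.

import tempfile
from typing import List, Optional, Tuple


def load_feature_table_whitelist(path: Optional[str]) -> Optional[List[Tuple[str, str]]]:
    # Follows the removed property: with no configured path there is no
    # feature-table whitelist; otherwise every "<project>:<feature table>"
    # line becomes a (project, feature_table) pair.
    if not path:
        return None
    with open(path, "r") as whitelist:
        return [
            (line.strip().split(":")[0], line.strip().split(":")[-1])
            for line in whitelist.readlines()
        ]


# A temporary file stands in for the ConfigMap mounted at /etc/whitelist/whitelist.txt.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write("default:driver_hourly_stats\nfraud:transaction_features\n")

print(load_feature_table_whitelist(f.name))
# [('default', 'driver_hourly_stats'), ('fraud', 'transaction_features')]
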

This file was deleted.

This file was deleted.

49 changes: 0 additions & 49 deletions infra/charts/feast-spark/charts/feast-jobservice/values.yaml
@@ -81,48 +81,6 @@ service:
    # service.grpc.nodePort -- Port number that each cluster node will listen to
    nodePort:

-ingress:
-  grpc:
-    # ingress.grpc.enabled -- Flag to create an ingress resource for the service
-    enabled: false
-    # ingress.grpc.class -- Which ingress controller to use
-    class: nginx
-    # ingress.grpc.hosts -- List of hostnames to match when routing requests
-    hosts: []
-    # ingress.grpc.annotations -- Extra annotations for the ingress
-    annotations: {}
-    https:
-      # ingress.grpc.https.enabled -- Flag to enable HTTPS
-      enabled: true
-      # ingress.grpc.https.secretNames -- Map of hostname to TLS secret name
-      secretNames: {}
-    # ingress.grpc.whitelist -- Allowed client IP source ranges
-    whitelist: ""
-    auth:
-      # ingress.grpc.auth.enabled -- Flag to enable auth
-      enabled: false
-  http:
-    # ingress.http.enabled -- Flag to create an ingress resource for the service
-    enabled: false
-    # ingress.http.class -- Which ingress controller to use
-    class: nginx
-    # ingress.http.hosts -- List of hostnames to match when routing requests
-    hosts: []
-    # ingress.http.annotations -- Extra annotations for the ingress
-    annotations: {}
-    https:
-      # ingress.http.https.enabled -- Flag to enable HTTPS
-      enabled: true
-      # ingress.http.https.secretNames -- Map of hostname to TLS secret name
-      secretNames: {}
-    # ingress.http.whitelist -- Allowed client IP source ranges
-    whitelist: ""
-    auth:
-      # ingress.http.auth.enabled -- Flag to enable auth
-      enabled: false
-      # ingress.http.auth.authUrl -- URL to an existing authentication service
-      authUrl: http://auth-server.auth-ns.svc.cluster.local/auth
-
# resources -- CPU/memory [resource requests/limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container)
resources: {}

@@ -140,10 +98,3 @@ secrets: []

# configMaps -- Arbitrary config maps to be mounted on the job service pod, on /etc/configs/<config name>
configMaps: []
-
-# whitelist -- If enabled, only <project>:<feature table> in the whitelist can be ingested
-whitelist:
-  # whitelist.enabled -- Flag to create and mount whitelist as configmap
-  enabled: false
-  # whitelist.featureTables -- Whitelisted feature tables, in the form of <project>:<feature table>
-  featureTables: []
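The deleted whitelist.featureTables values fed the per-feature-table membership check that batch ingestion used to perform. A small sketch of that removed check, decoupled from the servicer class and using placeholder project and table names:

from typing import List, Optional, Tuple


def is_feature_table_whitelisted(
    project: str,
    feature_table: str,
    whitelist: Optional[List[Tuple[str, str]]],
) -> bool:
    # Mirrors the removed is_feature_table_whitelisted method: an absent or
    # empty whitelist allows everything; otherwise the exact
    # (project, feature_table) pair must be listed.
    if not whitelist:
        return True
    return (project, feature_table) in whitelist


pairs = [("default", "driver_hourly_stats"), ("fraud", "transaction_features")]
assert is_feature_table_whitelisted("default", "driver_hourly_stats", pairs)
assert not is_feature_table_whitelisted("default", "customer_profile", pairs)
assert is_feature_table_whitelisted("anything", "anything", None)
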
98 changes: 1 addition & 97 deletions python/feast_spark/job_service.py
@@ -41,17 +41,12 @@
)
from feast_spark.constants import ConfigOptions as opt
from feast_spark.lock_manager import JobOperation, JobOperationLock
-from feast_spark.metrics import (
-    job_schedule_count,
-    job_submission_count,
-    job_whitelist_failure_count,
-)
+from feast_spark.metrics import job_schedule_count, job_submission_count
from feast_spark.pyspark.abc import (
    BatchIngestionJob,
    RetrievalJob,
    SparkJob,
    SparkJobStatus,
-    SparkJobType,
    StreamIngestionJob,
)
from feast_spark.pyspark.launcher import (
@@ -114,7 +109,6 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer):
    def __init__(self, client: Client):
        self.client = client
-        self._whitelisted_project_feature_table_pairs_cached: List[Tuple[str, str]] = []

    @property
    def _whitelisted_projects(self) -> Optional[List[str]]:
@@ -123,81 +117,20 @@ def _whitelisted_projects(self) -> Optional[List[str]]:
            return whitelisted_projects.split(",")
        return None

-    @property
-    def _whitelisted_project_feature_table_pairs(
-        self,
-    ) -> Optional[List[Tuple[str, str]]]:
-        if self._whitelisted_project_feature_table_pairs_cached:
-            return self._whitelisted_project_feature_table_pairs_cached
-
-        if self.client.config.exists(opt.WHITELISTED_FEATURE_TABLES_PATH):
-            _whitelisted_feature_tables = self.client.config.get(
-                opt.WHITELISTED_FEATURE_TABLES_PATH
-            )
-            with open(str(_whitelisted_feature_tables), "r") as whitelist:
-                whitelist.seek(0)
-                whitelisted_feature_tables = [
-                    (line.strip().split(":")[0], line.strip().split(":")[-1])
-                    for line in whitelist.readlines()
-                ]
-            self._whitelisted_project_feature_table_pairs_cached = (
-                whitelisted_feature_tables
-            )
-            return whitelisted_feature_tables
-        return None
-
-    @property
-    def _whitelisted_job_types(self) -> Optional[List[str]]:
-        if self.client.config.exists(opt.WHITELISTED_JOB_TYPES):
-            whitelisted_job_types = self.client.config.get(opt.WHITELISTED_JOB_TYPES)
-            return whitelisted_job_types.split(",")
-        return None
-
    def is_whitelisted(self, project: str):
        # Whitelisted projects not specified, allow all projects
        if not self._whitelisted_projects:
            return True
        return project in self._whitelisted_projects

-    def is_feature_table_whitelisted(self, project: str, feature_table: str):
-        if not self._whitelisted_project_feature_table_pairs:
-            return True
-        return (project, feature_table) in self._whitelisted_project_feature_table_pairs
-
-    def is_job_type_whitelisted(self, job_type: SparkJobType):
-        if not self._whitelisted_job_types:
-            return True
-        return job_type.name in self._whitelisted_job_types
-
    def StartOfflineToOnlineIngestionJob(
        self, request: StartOfflineToOnlineIngestionJobRequest, context
    ):
        """Start job to ingest data from offline store into online store"""
-        if not self.is_job_type_whitelisted(SparkJobType.BATCH_INGESTION):
-            raise ValueError(
-                "This job service is not configured to accept batch ingestion"
-            )
-
        job_submission_count.labels(
            "batch_ingestion", request.project, request.table_name
        ).inc()

-        if not self.is_whitelisted(request.project):
-            job_whitelist_failure_count.labels(
-                request.project, request.table_name
-            ).inc()
-            raise ValueError(
-                f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
-            )
-
-        if not self.is_feature_table_whitelisted(request.project, request.table_name):
-            job_whitelist_failure_count.labels(
-                request.project, request.table_name
-            ).inc()
-            raise ValueError(
-                f"Project {request.project}:{request.table_name} is not whitelisted. Please contact your Feast administrator to whitelist it."
-            )
-
        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
        )
@@ -223,11 +156,6 @@ def ScheduleOfflineToOnlineIngestionJob(
        self, request: ScheduleOfflineToOnlineIngestionJobRequest, context
    ):
        """Schedule job to ingest data from offline store into online store periodically"""
-        if not self.is_job_type_whitelisted(SparkJobType.SCHEDULED_BATCH_INGESTION):
-            raise ValueError(
-                "This job service is not configured to schedule batch ingestion"
-            )
-
        job_schedule_count.labels(request.project, request.table_name).inc()
        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
@@ -245,10 +173,6 @@ def ScheduleOfflineToOnlineIngestionJob(
    def UnscheduleOfflineToOnlineIngestionJob(
        self, request: UnscheduleOfflineToOnlineIngestionJobRequest, context
    ):
-        if not self.is_job_type_whitelisted(SparkJobType.SCHEDULED_BATCH_INGESTION):
-            raise ValueError(
-                "This job service is not configured to unschedule ingestion job"
-            )
        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
        )
@@ -259,18 +183,8 @@ def UnscheduleOfflineToOnlineIngestionJob(

    def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
        """Produce a training dataset, return a job id that will provide a file reference"""
-        if not self.is_job_type_whitelisted(SparkJobType.HISTORICAL_RETRIEVAL):
-            raise ValueError(
-                "This job service is not configured to accept historical retrieval job"
-            )
-
        job_submission_count.labels("historical_retrieval", request.project, "").inc()

-        if not self.is_whitelisted(request.project):
-            raise ValueError(
-                f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
-            )
-
        job = start_historical_feature_retrieval_job(
            client=self.client,
            project=request.project,
@@ -297,11 +211,6 @@ def StartStreamToOnlineIngestionJob(
        self, request: StartStreamToOnlineIngestionJobRequest, context
    ):
        """Start job to ingest data from stream into online store"""
-        if not self.is_job_type_whitelisted(SparkJobType.STREAM_INGESTION):
-            raise ValueError(
-                "This job service is not configured to start streaming job"
-            )
-
        job_submission_count.labels(
            "streaming", request.project, request.table_name
        ).inc()
@@ -356,11 +265,6 @@ def StartStreamToOnlineIngestionJob(
    def ListJobs(self, request, context):
        """List all types of jobs"""

-        if not self.is_whitelisted(request.project):
-            raise ValueError(
-                f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
-            )
-
        jobs = list_jobs(
            include_terminated=request.include_terminated,
            project=request.project,
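Taken together, the Python changes remove the is_job_type_whitelisted gate from every RPC shown above and drop the project and feature-table checks from the non-streaming paths (batch ingestion, historical retrieval and ListJobs). For reference, a standalone sketch of the removed job-type gate follows; the SparkJobType members mirror the names used in the deleted checks, while the real enum is defined in feast_spark.pyspark.abc and is only approximated here.

from enum import Enum, auto
from typing import List, Optional


class SparkJobType(Enum):
    # Only the members referenced by the deleted checks; the real enum lives in
    # feast_spark.pyspark.abc and is merely approximated here.
    BATCH_INGESTION = auto()
    SCHEDULED_BATCH_INGESTION = auto()
    HISTORICAL_RETRIEVAL = auto()
    STREAM_INGESTION = auto()


def is_job_type_whitelisted(
    job_type: SparkJobType, whitelisted_job_types: Optional[List[str]]
) -> bool:
    # Mirrors the removed helper: an unset list allows every job type;
    # otherwise the job type's name must appear in the configured list.
    if not whitelisted_job_types:
        return True
    return job_type.name in whitelisted_job_types


# Before this commit, a job service configured to accept only streaming jobs
# rejected a batch ingestion request with a ValueError; after it, the gate is
# gone for non-streaming RPCs and the request proceeds.
assert not is_job_type_whitelisted(SparkJobType.BATCH_INGESTION, ["STREAM_INGESTION"])
assert is_job_type_whitelisted(SparkJobType.BATCH_INGESTION, None)
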
