ability to whitelist on feature table level (#101)
Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

linting

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

fix parameter

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

fix linting

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

add options to mount config maps

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
khorshuheng committed Dec 13, 2021
1 parent 7825d58 commit 6e40e27
Showing 11 changed files with 127 additions and 8 deletions.
2 changes: 1 addition & 1 deletion infra/charts/feast-spark/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: Feast Extension for running Ingestion on Spark
name: feast-spark
-version: 0.2.15
+version: 0.2.16
4 changes: 2 additions & 2 deletions infra/charts/feast-spark/README.md
@@ -1,6 +1,6 @@
# feast-spark

-Feast Extension for running Ingestion on Spark 0.2.15
+Feast Extension for running Ingestion on Spark 0.2.16

## Installation

@@ -10,7 +10,7 @@ https://docs.feast.dev/v/master/getting-started/deploying-feast/kubernetes

| Repository | Name | Version |
|------------|------|---------|
-| | feast-jobservice | 0.2.15 |
+| | feast-jobservice | 0.2.16 |
| | prometheus-statsd-exporter | 0.1.2 |

## Values
@@ -1,4 +1,4 @@
apiVersion: v1
description: Feast Job Service manage ingestion jobs.
name: feast-jobservice
-version: 0.2.15
+version: 0.2.16
9 changes: 8 additions & 1 deletion infra/charts/feast-spark/charts/feast-jobservice/README.md
@@ -1,13 +1,14 @@
# feast-jobservice

-![Version: 0.2.15](https://img.shields.io/badge/Version-0.2.15-informational?style=flat-square)
+![Version: 0.2.16](https://img.shields.io/badge/Version-0.2.16-informational?style=flat-square)

Feast Job Service manage ingestion jobs.

## Values

| Key | Type | Default | Description |
|-----|------|---------|-------------|
+| configMaps | list | `[]` | Arbitrary config maps to be mounted on the job service pod, on /etc/configs/<config name> |
| envOverrides | object | `{}` | Extra environment variables to set |
| gcpProjectId | string | `""` | Project ID to use when using Google Cloud services such as BigQuery, Cloud Storage and Dataflow |
| gcpServiceAccount.enabled | bool | `false` | Flag to use [service account](https://cloud.google.com/iam/docs/creating-managing-service-account-keys) JSON key |
@@ -58,8 +59,14 @@ Feast Job Service manage ingestion jobs.
| service.http.port | int | `80` | Service port for HTTP requests |
| service.http.targetPort | int | `8080` | Container port serving HTTP requests and Prometheus metrics |
| service.type | string | `"ClusterIP"` | Kubernetes service type |
+| sparkOperator.batchJobTemplate | object | `{}` | |
| sparkOperator.enabled | bool | `false` | Flag to create and mount custom job template on the jobservice deployment as configmap |
+| sparkOperator.historicalJobTemplate | object | `{}` | |
| sparkOperator.jobTemplate | object | `{}` | Content of the job template, in yaml format |
+| sparkOperator.streamJobTemplate | object | `{}` | |
+| whitelist | object | `{"enabled":false,"featureTables":[]}` | If enabled, only <project>:<feature table> in the whitelist can be ingested |
+| whitelist.enabled | bool | `false` | Flag to create and mount whitelist as configmap |
+| whitelist.featureTables | list | `[]` | Whitelisted feature tables, in the form of <project>:<feature table> |

----------------------------------------------
Autogenerated from chart metadata using [helm-docs v1.5.0](https://github.com/norwoodj/helm-docs/releases/v1.5.0)
@@ -40,37 +40,56 @@ spec:
 {{- toYaml . | nindent 8 }}
 {{- end }}

-{{- if or .Values.secrets .Values.sparkOperator.enabled }}
+{{- if or .Values.secrets .Values.sparkOperator.enabled .Values.configMaps .Values.whitelist.enabled }}
 volumes:
 {{- end }}
 {{- range $secret := .Values.secrets }}
 - name: {{ $secret }}
 secret:
 secretName: {{ $secret }}
 {{- end }}
+{{- range $configMap := .Values.configMaps }}
+- name: {{ $configMap }}
+configMap:
+name: {{ $configMap }}
+{{- end }}
 {{- if .Values.sparkOperator.enabled }}
 - name: {{ template "feast-jobservice.fullname" . }}-spark-template
 configMap:
 name: {{ template "feast-jobservice.fullname" . }}-spark-template
 {{- end }}
+{{- if .Values.whitelist.enabled }}
+- name: {{ template "feast-jobservice.fullname" . }}-whitelist
+configMap:
+name: {{ template "feast-jobservice.fullname" . }}-whitelist
+{{- end }}

 containers:
 - name: {{ .Chart.Name }}
 image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
 imagePullPolicy: {{ .Values.image.pullPolicy }}

-{{- if or .Values.secrets .Values.sparkOperator.enabled }}
+{{- if or .Values.secrets .Values.sparkOperator.enabled .Values.configMaps .Values.whitelist.enabled }}
 volumeMounts:
 {{- end }}
 {{- range $secret := .Values.secrets }}
 - name: {{ $secret }}
 mountPath: "/etc/secrets/{{ $secret }}"
 readOnly: true
 {{- end }}
+{{- range $configMap := .Values.configMaps }}
+- name: {{ $configMap }}
+mountPath: "/etc/configs/{{ $configMap }}"
+readOnly: true
+{{- end }}
 {{- if .Values.sparkOperator.enabled }}
 - name: {{ template "feast-jobservice.fullname" . }}-spark-template
 mountPath: "/etc/configs"
 {{- end }}
+{{- if .Values.whitelist.enabled }}
+- name: {{ template "feast-jobservice.fullname" . }}-whitelist
+mountPath: "/etc/whitelist"
+{{- end }}

env:
{{- if .Values.sparkOperator.enabled }}
@@ -83,6 +102,10 @@ spec:
 - name: FEAST_SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH
 value: /etc/configs/historicalJobTemplate.yaml
 {{- end }}
+{{- if .Values.whitelist.enabled }}
+- name: FEAST_WHITELISTED_FEATURE_TABLES_PATH
+value: /etc/whitelist/whitelist.txt
+{{- end }}
 {{- range $key, $value := .Values.envOverrides }}
 - name: {{ printf "%s" $key | replace "." "_" | upper | quote }}
 {{- if eq (kindOf $value) "map" }}
@@ -0,0 +1,18 @@
+{{- if .Values.whitelist.enabled }}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: {{ template "feast-jobservice.fullname" . }}-whitelist
+  namespace: {{ .Release.Namespace }}
+  labels:
+    app: {{ template "feast-jobservice.name" . }}
+    component: jobservice
+    chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }}
+    release: {{ .Release.Name }}
+    heritage: {{ .Release.Service }}
+data:
+  whitelist.txt: |
+  {{- range $featureTable := .Values.whitelist.featureTables }}
+    {{ $featureTable }}
+  {{- end }}
+{{- end }}
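
The ConfigMap above writes each entry of `whitelist.featureTables` to `whitelist.txt`, which the deployment mounts under `/etc/whitelist`. A rough Python sketch of the file content one would expect for a given values list, and of how the per-line splitting in `job_service.py` would read it back. The `feature_tables` list is illustrative, and the exact whitespace Helm emits is an assumption here:

```python
# Illustrative values for whitelist.featureTables (not taken from the commit).
feature_tables = ["project1:table1", "project2:table9"]

# Assumed rendering: one "<project>:<feature table>" entry per line.
whitelist_txt = "".join(f"{entry}\n" for entry in feature_tables)

# Read the content back with the same per-line expression used in
# job_service.py: strip the line, then take the first and last
# colon-separated fields.
pairs = [
    (line.strip().split(":")[0], line.strip().split(":")[-1])
    for line in whitelist_txt.splitlines()
]
print(pairs)  # [('project1', 'table1'), ('project2', 'table9')]
```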
10 changes: 10 additions & 0 deletions infra/charts/feast-spark/charts/feast-jobservice/values.yaml
@@ -137,3 +137,13 @@ podLabels: {}

 # secrets -- Arbitrary secrets to mount on the job service pod, on /etc/secrets/<secret name>
 secrets: []
+
+# configMaps -- Arbitrary config maps to be mounted on the job service pod, on /etc/configs/<config name>
+configMaps: []
+
+# whitelist -- If enabled, only <project>:<feature table> in the whitelist can be ingested
+whitelist:
+  # whitelist.enabled -- Flag to create and mount whitelist as configmap
+  enabled: false
+  # whitelist.featureTables -- Whitelisted feature tables, in the form of <project>:<feature table>
+  featureTables: []
2 changes: 1 addition & 1 deletion infra/charts/feast-spark/requirements.yaml
@@ -1,6 +1,6 @@
 dependencies:
 - name: feast-jobservice
-  version: 0.2.15
+  version: 0.2.16
   condition: feast-jobservice.enabled
 - name: prometheus-statsd-exporter
   version: 0.1.2
4 changes: 4 additions & 0 deletions python/feast_spark/constants.py
@@ -169,6 +169,10 @@ class ConfigOptions(metaclass=ConfigMeta):
     #: Whitelisted Feast projects
     WHITELISTED_PROJECTS: Optional[str] = None

+    #: File path to a whitelist containing all the feature tables allowed for ingestion.
+    #: Each line in the file should be in the format of <project>:<feature table>
+    WHITELISTED_FEATURE_TABLES_PATH: Optional[str] = None
+
     #: If set - streaming ingestion job will be consuming incoming rows not continuously,
     #: but periodically with configured interval (in seconds).
     #: That may help to control amount of write requests to storage
34 changes: 34 additions & 0 deletions python/feast_spark/job_service.py
@@ -104,6 +104,7 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
 class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer):
     def __init__(self, client: Client):
         self.client = client
+        self._whitelisted_project_feature_table_pairs_cached: List[Tuple[str, str]] = []

     @property
     def _whitelisted_projects(self) -> Optional[List[str]]:
@@ -112,12 +113,40 @@ def _whitelisted_projects(self) -> Optional[List[str]]:
             return whitelisted_projects.split(",")
         return None

+    @property
+    def _whitelisted_project_feature_table_pairs(
+        self,
+    ) -> Optional[List[Tuple[str, str]]]:
+        if self._whitelisted_project_feature_table_pairs_cached:
+            return self._whitelisted_project_feature_table_pairs_cached
+
+        if self.client.config.exists(opt.WHITELISTED_FEATURE_TABLES_PATH):
+            _whitelisted_feature_tables = self.client.config.get(
+                opt.WHITELISTED_FEATURE_TABLES_PATH
+            )
+            with open(str(_whitelisted_feature_tables), "r") as whitelist:
+                whitelist.seek(0)
+                whitelisted_feature_tables = [
+                    (line.strip().split(":")[0], line.strip().split(":")[-1])
+                    for line in whitelist.readlines()
+                ]
+                self._whitelisted_project_feature_table_pairs_cached = (
+                    whitelisted_feature_tables
+                )
+                return whitelisted_feature_tables
+        return None
+
     def is_whitelisted(self, project: str):
         # Whitelisted projects not specified, allow all projects
         if not self._whitelisted_projects:
             return True
         return project in self._whitelisted_projects

+    def is_feature_table_whitelisted(self, project: str, feature_table: str):
+        if not self._whitelisted_project_feature_table_pairs:
+            return True
+        return (project, feature_table) in self._whitelisted_project_feature_table_pairs
+
     def StartOfflineToOnlineIngestionJob(
         self, request: StartOfflineToOnlineIngestionJobRequest, context
     ):
@@ -128,6 +157,11 @@ def StartOfflineToOnlineIngestionJob(
                 f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
             )

+        if not self.is_feature_table_whitelisted(request.project, request.table_name):
+            raise ValueError(
+                f"Project {request.project}:{request.table_name} is not whitelisted. Please contact your Feast administrator to whitelist it."
+            )
+
         feature_table = self.client.feature_store.get_feature_table(
             request.table_name, request.project
         )
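
The list comprehension in `_whitelisted_project_feature_table_pairs` maps every line of the file to a pair, so a blank line yields `("", "")` and a line without a colon yields the same string twice. A minimal sketch of a stricter parser, assuming one would want to skip blank lines and reject malformed entries. `parse_whitelist` is a hypothetical helper and not part of this commit:

```python
from typing import Iterable, List, Tuple


def parse_whitelist(lines: Iterable[str]) -> List[Tuple[str, str]]:
    """Hypothetical helper: parse '<project>:<feature table>' lines into pairs."""
    pairs: List[Tuple[str, str]] = []
    for raw in lines:
        entry = raw.strip()
        if not entry:
            # Skip blank lines, e.g. a trailing newline at the end of the file.
            continue
        # Split on the first colon only (assumption: project names contain no colon).
        project, sep, table = entry.partition(":")
        if not sep or not project or not table:
            raise ValueError(f"Malformed whitelist entry: {entry!r}")
        pairs.append((project, table))
    return pairs


pairs = parse_whitelist(["project1:table1\n", "\n", "project1:table2"])
print(("project1", "table1") in pairs)  # True
print(("project2", "table1") in pairs)  # False
```

Unlike the code in the commit, this sketch raises on an entry such as `project1` with no table part, which may or may not be the desired behaviour for a job service that should keep running with a partially valid file.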
23 changes: 23 additions & 0 deletions python/tests/test_jobservice_servicer.py
@@ -0,0 +1,23 @@
+import tempfile
+
+from feast import Client
+from feast_spark import Client as JobClient
+from feast_spark.job_service import JobServiceServicer
+
+
+def test_feature_table_default_whitelist():
+    feast_client = Client()
+    job_client = JobClient(feast_client)
+    job_servicer = JobServiceServicer(job_client)
+    assert job_servicer.is_feature_table_whitelisted("some project", "some table")
+
+
+def test_feature_table_whitelist():
+    with tempfile.NamedTemporaryFile() as tmp:
+        tmp.writelines([b"project1:table1\n", b"project1:table2"])
+        tmp.seek(0)
+        feast_client = Client(whitelisted_feature_tables_path=tmp.name)
+        job_client = JobClient(feast_client)
+        job_servicer = JobServiceServicer(job_client)
+        assert not job_servicer.is_feature_table_whitelisted("project2", "table1")
+        assert job_servicer.is_feature_table_whitelisted("project1", "table1")
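
The two tests above exercise the default (no whitelist configured, everything allowed) and the file-backed case. The same semantics can be tried without a Feast installation. The sketch below uses stand-in functions, `load_pairs` and a module-level `is_feature_table_whitelisted`, which are assumptions made for illustration and only mirror the logic in `job_service.py`. Like the test above, it reopens a `NamedTemporaryFile` by name, which may not work on Windows:

```python
import tempfile
from typing import List, Optional, Tuple


def load_pairs(path: Optional[str]) -> Optional[List[Tuple[str, str]]]:
    """Stand-in for the servicer property: None when no whitelist path is set."""
    if path is None:
        return None
    with open(path, "r") as whitelist:
        return [
            (line.strip().split(":")[0], line.strip().split(":")[-1])
            for line in whitelist.readlines()
        ]


def is_feature_table_whitelisted(
    pairs: Optional[List[Tuple[str, str]]], project: str, feature_table: str
) -> bool:
    # No whitelist (or an empty one) allows every feature table.
    if not pairs:
        return True
    return (project, feature_table) in pairs


with tempfile.NamedTemporaryFile() as tmp:
    tmp.writelines([b"project1:table1\n", b"project1:table2"])
    tmp.flush()  # make sure the bytes are on disk before reopening by name
    pairs = load_pairs(tmp.name)

print(is_feature_table_whitelisted(None, "some project", "some table"))  # True
print(is_feature_table_whitelisted(pairs, "project1", "table1"))  # True
print(is_feature_table_whitelisted(pairs, "project2", "table1"))  # False
```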
