Add prometheus metrics to jobservice (#102)
* add prometheus metrics to jobservice

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* fix metric labelling

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* fix metric labelling

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* improve description

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
khorshuheng committed Dec 16, 2021
1 parent 6e40e27 commit e342b35
Showing 5 changed files with 56 additions and 2 deletions.
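Taken together, the change defines three labelled prometheus_client Counters, increments them inside the gRPC handlers, and serves them over HTTP from a daemon thread next to the control loop. The following is a condensed, self-contained sketch of that pattern, assuming only the prometheus_client package: the metric name and the default port 8080 mirror the diff below, while the demo label values and the sleep are illustrative only.

import threading
import time

from prometheus_client import Counter, start_http_server

# One of the three counters added by this commit (see python/feast_spark/metrics.py).
job_submission_count = Counter(
    "feast_job_submission_count",
    "request to submit feast job",
    ["job_type", "project", "table"],
)


def start_prometheus_serving(port: int = 8080) -> None:
    """Expose /metrics on the given port (default mirrors JOB_SERVICE_PROMETHEUS_METRIC_PORT)."""
    start_http_server(port)


if __name__ == "__main__":
    # The job service starts the metric server on a daemon thread so it does
    # not block the gRPC server or the control loop.
    threading.Thread(target=start_prometheus_serving, daemon=True, args=[8080]).start()

    # Illustrative only: in the real service the gRPC handlers increment the counters,
    # e.g. StartOfflineToOnlineIngestionJob labels the job_type as "batch_ingestion".
    job_submission_count.labels("batch_ingestion", "demo_project", "demo_table").inc()
    time.sleep(60)  # keep the process alive long enough to scrape http://localhost:8080/metrics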
2 changes: 1 addition & 1 deletion infra/charts/feast-spark/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: Feast Extension for running Ingestion on Spark
name: feast-spark
version: 0.2.16
version: 0.2.17
2 changes: 1 addition & 1 deletion (feast-jobservice Chart.yaml)
@@ -1,4 +1,4 @@
apiVersion: v1
description: Feast Job Service manage ingestion jobs.
name: feast-jobservice
version: 0.2.16
version: 0.2.17
3 changes: 3 additions & 0 deletions python/feast_spark/constants.py
@@ -22,6 +22,9 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Pause in seconds between starting new jobs in Control Loop
JOB_SERVICE_PAUSE_BETWEEN_JOBS: str = "5"

#: Port on which the Prometheus metric server will run
JOB_SERVICE_PROMETHEUS_METRIC_PORT: int = 8080

#: Default timeout when running batch ingestion
BATCH_INGESTION_PRODUCTION_TIMEOUT: str = "120"

36 changes: 36 additions & 0 deletions python/feast_spark/job_service.py
@@ -10,6 +10,7 @@
import grpc
from google.api_core.exceptions import FailedPrecondition
from google.protobuf.timestamp_pb2 import Timestamp
from prometheus_client import start_http_server

from feast import Client as FeastClient
from feast import FeatureTable
@@ -38,6 +39,11 @@
UnscheduleOfflineToOnlineIngestionJobResponse,
)
from feast_spark.constants import ConfigOptions as opt
from feast_spark.metrics import (
job_schedule_count,
job_submission_count,
job_whitelist_failure_count,
)
from feast_spark.pyspark.abc import (
BatchIngestionJob,
RetrievalJob,
@@ -152,12 +158,22 @@ def StartOfflineToOnlineIngestionJob(
):
"""Start job to ingest data from offline store into online store"""

job_submission_count.labels(
"batch_ingestion", request.project, request.table_name
).inc()

if not self.is_whitelisted(request.project):
job_whitelist_failure_count.labels(
request.project, request.table_name
).inc()
raise ValueError(
f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
)

if not self.is_feature_table_whitelisted(request.project, request.table_name):
job_whitelist_failure_count.labels(
request.project, request.table_name
).inc()
raise ValueError(
f"Project {request.project}:{request.table_name} is not whitelisted. Please contact your Feast administrator to whitelist it."
)
@@ -187,6 +203,8 @@ def ScheduleOfflineToOnlineIngestionJob(
self, request: ScheduleOfflineToOnlineIngestionJobRequest, context
):
"""Schedule job to ingest data from offline store into online store periodically"""

job_schedule_count.labels(request.project, request.table_name).inc()
feature_table = self.client.feature_store.get_feature_table(
request.table_name, request.project
)
@@ -214,6 +232,8 @@ def UnscheduleOfflineToOnlineIngestionJob(
def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""

job_submission_count.labels("historical_retrieval", request.project, "").inc()

if not self.is_whitelisted(request.project):
raise ValueError(
f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
@@ -246,6 +266,10 @@ def StartStreamToOnlineIngestionJob(
):
"""Start job to ingest data from stream into online store"""

job_submission_count.labels(
"streaming", request.project, request.table_name
).inc()

if not self.is_whitelisted(request.project):
raise ValueError(
f"Project {request.project} is not whitelisted. Please contact your Feast administrator to whitelist it."
@@ -321,6 +345,11 @@ def GetJob(self, request, context):
return GetJobResponse(job=_job_to_proto(job))


def start_prometheus_serving(port: int = 8080) -> None:
"""Initialize Prometheus metric server"""
start_http_server(port)


def start_control_loop() -> None:
"""Starts control loop that continuously ensures that correct jobs are being run.
@@ -368,6 +397,13 @@ def start_job_service() -> None:
thread = threading.Thread(target=start_control_loop, daemon=True)
thread.start()

metricServerThread = threading.Thread(
target=start_prometheus_serving,
daemon=True,
args=[client.config.getint(opt.JOB_SERVICE_PROMETHEUS_METRIC_PORT)],
)
metricServerThread.start()

server = grpc.server(ThreadPoolExecutor(), interceptors=(LoggingInterceptor(),))
JobService_pb2_grpc.add_JobServiceServicer_to_server(
JobServiceServicer(client), server
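A quick way to check this wiring once start_job_service() is running is to scrape the metric endpoint and look for the counter names. This is a hypothetical smoke test, not part of the commit; the host and port are assumptions (localhost, default JOB_SERVICE_PROMETHEUS_METRIC_PORT of 8080).

from urllib.request import urlopen

# Scrape the Prometheus text exposition endpoint served by the daemon thread.
body = urlopen("http://localhost:8080/metrics", timeout=5).read().decode("utf-8")

for name in (
    "feast_job_submission_count",
    "feast_job_schedule_count",
    "feast_job_whitelist_failure_count",
):
    assert name in body, f"expected metric {name} to be exported"
print("all jobservice metrics are exported")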
15 changes: 15 additions & 0 deletions python/feast_spark/metrics.py
@@ -0,0 +1,15 @@
from prometheus_client import Counter

job_whitelist_failure_count = Counter(
"feast_job_whitelist_failure_count",
"request failures due to feature table not being whitelisted",
["project", "table"],
)
job_submission_count = Counter(
"feast_job_submission_count",
"request to submit feast job",
["job_type", "project", "table"],
)
job_schedule_count = Counter(
"feast_job_schedule_count", "request to schedule feast job", ["project", "table"]
)
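For reference, incrementing these counters with .labels(...).inc() creates one time series per distinct label combination. A small sketch follows, assuming the new module is importable; the label values are made up, and note that prometheus_client renders a Counter named feast_job_submission_count as feast_job_submission_count_total in the text exposition format.

from prometheus_client import generate_latest

from feast_spark.metrics import job_submission_count, job_whitelist_failure_count

# Illustrative label values only.
job_submission_count.labels("streaming", "my_project", "driver_features").inc()
job_whitelist_failure_count.labels("my_project", "driver_features").inc()

# Dump the default registry in Prometheus text format.
print(generate_latest().decode("utf-8"))
# Expected to include a sample such as:
# feast_job_submission_count_total{job_type="streaming",project="my_project",table="driver_features"} 1.0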
