Add ingestion job health metrics retrieval (#115)
* Add ingestion job health metrics retrieval

Signed-off-by: Terence Lim <terencelimxp@gmail.com>

* Fix linting

Signed-off-by: Terence Lim <terencelimxp@gmail.com>

* Address PR comments

Signed-off-by: Terence Lim <terencelimxp@gmail.com>

* More fixes

Signed-off-by: Terence Lim <terencelimxp@gmail.com>
terryyylim committed Feb 23, 2022
1 parent fa3a417 commit 10d0e7d
Showing 7 changed files with 107 additions and 6 deletions.
15 changes: 14 additions & 1 deletion protos/feast_spark/api/JobService.proto
@@ -49,6 +49,9 @@ service JobService {

// Get details of a single job
rpc GetJob (GetJobRequest) returns (GetJobResponse);

// Get ingestion health metrics for a set of Feature Tables
rpc GetHealthMetrics (GetHealthMetricsRequest) returns (GetHealthMetricsResponse);
}


@@ -240,4 +243,14 @@ message CancelJobRequest{
string job_id = 1;
}

-message CancelJobResponse {}
+message CancelJobResponse {}

message GetHealthMetricsRequest {
string project = 1;
repeated string table_names = 2;
}

message GetHealthMetricsResponse {
repeated string passed = 1;
repeated string failed = 2;
}
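
For context, a minimal sketch of calling the new RPC through the generated Python stub (the channel address, project, and table name are placeholders, not values from this commit):

import grpc

from feast_spark.api.JobService_pb2 import GetHealthMetricsRequest
from feast_spark.api.JobService_pb2_grpc import JobServiceStub

# Placeholder address; point this at your job service deployment.
channel = grpc.insecure_channel("localhost:6568")
stub = JobServiceStub(channel)

# Ask which Feature Tables have healthy ingestion jobs.
response = stub.GetHealthMetrics(
    GetHealthMetricsRequest(project="default", table_names=["driver_stats"])
)
print("passed:", list(response.passed), "failed:", list(response.failed))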
31 changes: 29 additions & 2 deletions python/feast_spark/client.py
@@ -3,9 +3,10 @@
import uuid
from datetime import datetime
from itertools import groupby
-from typing import List, Optional, Union, cast
+from typing import Dict, List, Optional, Union, cast

import pandas as pd
import redis
from croniter import croniter

import feast
@@ -19,6 +20,7 @@
table_reference_from_string,
)
from feast_spark.api.JobService_pb2 import (
GetHealthMetricsRequest,
GetHistoricalFeaturesRequest,
GetJobRequest,
ListJobsRequest,
@@ -31,6 +33,7 @@
from feast_spark.constants import ConfigOptions as opt
from feast_spark.pyspark.abc import RetrievalJob, SparkJob
from feast_spark.pyspark.launcher import (
get_health_metrics,
get_job_by_id,
list_jobs,
schedule_offline_to_online_ingestion,
@@ -59,6 +62,14 @@ def __init__(self, feast_client: feast.Client):
self._feast = feast_client
self._job_service_stub: Optional[JobServiceStub] = None

if self.config.exists(opt.SPARK_METRICS_REDIS_HOST) and self.config.exists(
opt.SPARK_METRICS_REDIS_PORT
):
# Create the metrics Redis client only when both host and port are
# configured; otherwise no metrics client is set up.
self._metrics_redis = redis.Redis(
host=self.config.get(opt.SPARK_METRICS_REDIS_HOST),
port=self.config.get(opt.SPARK_METRICS_REDIS_PORT),
)

@property
def config(self) -> Config:
return self._feast._config
@@ -75,6 +86,10 @@ def feature_store(self) -> feast.Client:
def _use_job_service(self) -> bool:
return self.config.exists(opt.JOB_SERVICE_URL)

@property
def metrics_redis(self) -> redis.Redis:
return self._metrics_redis

@property
def _job_service(self):
"""
@@ -222,7 +237,7 @@ def get_historical_features(
)

def get_historical_features_df(
-    self, feature_refs: List[str], entity_source: Union[FileSource, BigQuerySource],
+    self, feature_refs: List[str], entity_source: Union[FileSource, BigQuerySource]
):
"""
Launch a historical feature retrieval job.
@@ -442,3 +457,15 @@ def get_job_by_id(self, job_id: str) -> SparkJob:
return get_remote_job_from_proto(
self._job_service, self._feast._extra_grpc_params, response.job
)

def get_health_metrics(
self, project: str, table_names: List[str],
) -> Dict[str, List[str]]:
if not self._use_job_service:
return get_health_metrics(self, project, table_names)
else:
request = GetHealthMetricsRequest(
project=cast(str, project), table_names=table_names,
)
response = self._job_service.GetHealthMetrics(request)
return {"passed": response.passed, "failed": response.failed}
6 changes: 6 additions & 0 deletions python/feast_spark/constants.py
@@ -105,6 +105,12 @@ class ConfigOptions(metaclass=ConfigMeta):
# SparkApplication resource template for Historical Retrieval Jobs
SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH: Optional[str] = ""

#: Host of the Redis instance that stores Spark ingestion job health metrics
SPARK_METRICS_REDIS_HOST: Optional[str] = None

#: Port of the Redis instance that stores Spark ingestion job health metrics
SPARK_METRICS_REDIS_PORT: Optional[str] = None

#: File format of historical retrieval features
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"

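Assuming these options follow the usual ConfigOptions convention of lowercased config keys, wiring a client to the metrics Redis instance might look roughly like this (host and port values are placeholders):

import feast

# Placeholder values; point these at the Redis instance that the Spark
# ingestion jobs publish their metrics to.
feast_client = feast.Client(
    spark_metrics_redis_host="10.0.0.5",
    spark_metrics_redis_port="6379",
)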
11 changes: 11 additions & 0 deletions python/feast_spark/job_service.py
@@ -20,6 +20,7 @@
from feast_spark.api import JobService_pb2_grpc
from feast_spark.api.JobService_pb2 import (
CancelJobResponse,
GetHealthMetricsResponse,
GetHistoricalFeaturesRequest,
GetHistoricalFeaturesResponse,
GetJobResponse,
@@ -53,6 +54,7 @@
StreamIngestionJob,
)
from feast_spark.pyspark.launcher import (
get_health_metrics,
get_job_by_id,
get_stream_to_online_ingestion_params,
list_jobs,
@@ -357,6 +359,15 @@ def GetJob(self, request, context):
job = get_job_by_id(request.job_id, client=self.client)
return GetJobResponse(job=_job_to_proto(job))

def GetHealthMetrics(self, request, context):
"""Return ingestion jobs health metrics"""
metrics = get_health_metrics(
project=request.project, table_names=request.table_names, client=self.client
)
return GetHealthMetricsResponse(
passed=metrics["passed"], failed=metrics["failed"]
)


def start_prometheus_serving(port: int = 8080) -> None:
"""Initialize Prometheus metric server"""
48 changes: 45 additions & 3 deletions python/feast_spark/pyspark/launcher.py
@@ -1,7 +1,8 @@
import json
import os
import tempfile
from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
from urllib.parse import urlparse, urlunparse

from feast.config import Config
@@ -293,8 +294,8 @@ def create_bq_view_of_joined_features_and_entities(
source: BigQuerySource, entity_source: BigQuerySource, entity_names: List[str]
) -> BigQuerySource:
"""
-    Creates BQ view that joins tables from `source` and `entity_source` with join key derived from `entity_names`.
-    Returns BigQuerySource with reference to created view. The BQ view will be created in the same BQ dataset as `entity_source`.
+    Creates BQ view that joins tables from `source` and `entity_source` with join key derived from `entity_names`.
+    Returns BigQuerySource with reference to created view. The BQ view will be created in the same BQ dataset as `entity_source`.
"""
from google.cloud import bigquery

@@ -484,6 +485,47 @@ def get_job_by_id(job_id: str, client: "Client") -> SparkJob:
return launcher.get_job_by_id(job_id)


def get_health_metrics(
client: "Client", project: str, table_names: List[str],
) -> Dict[str, List[str]]:
all_redis_keys = [f"{project}:{table}" for table in table_names]
metrics = client.metrics_redis.mget(all_redis_keys)

passed_feature_tables = []
failed_feature_tables = []

for metric, name in zip(metrics, table_names):
feature_table = client.feature_store.get_feature_table(
project=project, name=name
)
max_age = feature_table.max_age
# Only perform ingestion health checks for Feature tables with max_age
if not max_age:
passed_feature_tables.append(name)
continue

# mget returns None for keys missing from Redis; treat missing metrics as failed
if not metric:
failed_feature_tables.append(name)
continue

# Both timestamps below are compared as epoch seconds
last_ingestion_time = json.loads(metric)["last_consumed_kafka_timestamp"][
"value"
]
valid_ingestion_time = datetime.timestamp(
datetime.now() - timedelta(seconds=max_age)
)

# Fail the table if its latest ingestion timestamp is older than now - max_age
if valid_ingestion_time > last_ingestion_time:
failed_feature_tables.append(name)
else:
passed_feature_tables.append(name)

return {"passed": passed_feature_tables, "failed": failed_feature_tables}


def stage_dataframe(df, event_timestamp_column: str, config: Config) -> FileSource:
"""
Helper function to upload a pandas dataframe in parquet format to a temporary location (under
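The check above assumes each ingestion job publishes a JSON metric to Redis under the key `<project>:<table>`, shaped like the payload `get_health_metrics` parses. A sketch of seeding such a metric by hand, e.g. for local testing (host, port, and key are placeholders):

import json
import time

import redis

r = redis.Redis(host="localhost", port=6379)

# Mirror the structure get_health_metrics() reads:
#   {"last_consumed_kafka_timestamp": {"value": <epoch seconds>}}
metric = {"last_consumed_kafka_timestamp": {"value": time.time()}}
r.set("default:driver_stats", json.dumps(metric))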
1 change: 1 addition & 0 deletions python/requirements-ci.txt
@@ -23,3 +23,4 @@ pytest-mock==1.10.4
PyYAML==5.3.1
great-expectations==0.13.2
adlfs==0.5.9
redis==4.1.*
1 change: 1 addition & 0 deletions python/setup.py
@@ -52,6 +52,7 @@
"grpcio-tools==1.31.0",
"mypy-protobuf==2.5",
"croniter==1.*",
"redis==4.1.*",
]

# README file from Feast repo root directory
