Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up OnlineServingService code #605

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package feast.serving.service;

import static feast.serving.util.Metrics.requestCount;
import static feast.serving.util.Metrics.staleKeyCount;
import static feast.serving.util.RefUtil.generateFeatureStringRef;

import com.google.common.collect.Maps;
import com.google.protobuf.Duration;
import feast.serving.ServingAPIProto.*;
import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
import feast.serving.specs.CachedSpecService;
import feast.serving.util.Metrics;
import feast.serving.util.RefUtil;
import feast.storage.api.retrieval.FeatureSetRequest;
import feast.storage.api.retrieval.OnlineRetriever;
Expand Down Expand Up @@ -65,7 +62,7 @@ public GetFeastServingInfoResponse getFeastServingInfo(
/** {@inheritDoc} */
@Override
public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) {
try (Scope scope = tracer.buildSpan("Redis-getOnlineFeatures").startActive(true)) {
try (Scope scope = tracer.buildSpan("getOnlineFeatures").startActive(true)) {
GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder =
GetOnlineFeaturesResponse.newBuilder();
List<FeatureSetRequest> featureSetRequests =
Expand All @@ -85,13 +82,11 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ
List<FeatureRow> featureRowsForFs = featureRows.get(fsIdx);
FeatureSetRequest featureSetRequest = featureSetRequests.get(fsIdx);

String project = featureSetRequest.getSpec().getProject();

// In order to return values containing the same feature references provided by the user,
// we reuse the feature references in the request as the keys in the featureValuesMap
Map<String, FeatureReference> featureNames =
featureSetRequest.getFeatureReferences().stream()
.collect(
Collectors.toMap(
FeatureReference::getName, featureReference -> featureReference));
Map<String, FeatureReference> refsByName = featureSetRequest.getFeatureRefsByName();

// Each feature row returned (per feature set request) corresponds to a given entity row.
// For each feature row, update the featureValuesMap.
Expand All @@ -106,36 +101,23 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ
.parallelStream()
.forEach(
ref -> {
staleKeyCount
.labels(
featureSetRequest.getSpec().getProject(),
String.format("%s:%d", ref.getName(), ref.getVersion()))
.inc();
populateStaleKeyCountMetrics(project, ref);
featureValuesMap
.get(entityRow)
.put(RefUtil.generateFeatureStringRef(ref), Value.newBuilder().build());
});

} else {
featureSetRequest
.getFeatureReferences()
.parallelStream()
.forEach(
ref ->
requestCount
.labels(
featureSetRequest.getSpec().getProject(),
String.format("%s:%d", ref.getName(), ref.getVersion()))
.inc());
populateRequestCountMetrics(featureSetRequest);

// Else populate the featureValueMap at this entityRow with the values in the feature
// row.
featureRow.getFieldsList().stream()
.filter(field -> featureNames.containsKey(field.getName()))
.filter(field -> refsByName.containsKey(field.getName()))
.forEach(
field -> {
FeatureReference ref = featureNames.get(field.getName());
String id = generateFeatureStringRef(ref);
FeatureReference ref = refsByName.get(field.getName());
String id = RefUtil.generateFeatureStringRef(ref);
featureValuesMap.get(entityRow).put(id, field.getValue());
});
}
Expand All @@ -150,6 +132,24 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ
}
}

private void populateStaleKeyCountMetrics(String project, FeatureReference ref) {
Metrics.staleKeyCount
.labels(project, RefUtil.generateFeatureStringRefWithoutProject(ref))
.inc();
}

private void populateRequestCountMetrics(FeatureSetRequest featureSetRequest) {
String project = featureSetRequest.getSpec().getProject();
featureSetRequest
.getFeatureReferences()
.parallelStream()
.forEach(
ref ->
Metrics.requestCount
.labels(project, RefUtil.generateFeatureStringRefWithoutProject(ref))
.inc());
}

@Override
public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) {
throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
Expand All @@ -162,14 +162,15 @@ public GetJobResponse getJob(GetJobRequest getJobRequest) {

private boolean isStale(
FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) {
if (featureSetRequest.getSpec().getMaxAge().equals(Duration.getDefaultInstance())) {
Duration maxAge = featureSetRequest.getSpec().getMaxAge();
if (maxAge.equals(Duration.getDefaultInstance())) {
return false;
}
long givenTimestamp = entityRow.getEntityTimestamp().getSeconds();
if (givenTimestamp == 0) {
givenTimestamp = System.currentTimeMillis() / 1000;
}
long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds();
return timeDifference > featureSetRequest.getSpec().getMaxAge().getSeconds();
return timeDifference > maxAge.getSeconds();
}
}
8 changes: 8 additions & 0 deletions serving/src/main/java/feast/serving/util/RefUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ public static String generateFeatureStringRef(FeatureReference featureReference)
return ref;
}

public static String generateFeatureStringRefWithoutProject(FeatureReference featureReference) {
String ref = String.format("%s", featureReference.getName());
if (featureReference.getVersion() > 0) {
return ref + String.format(":%d", featureReference.getVersion());
}
return ref;
}

public static String generateFeatureSetStringRef(FeatureSetSpec featureSetSpec) {
String ref = String.format("%s/%s", featureSetSpec.getProject(), featureSetSpec.getName());
if (featureSetSpec.getVersion() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.serving.ServingAPIProto.FeatureReference;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AutoValue
public abstract class FeatureSetRequest {
Expand Down Expand Up @@ -50,4 +52,9 @@ public Builder addFeatureReference(FeatureReference featureReference) {

public abstract FeatureSetRequest build();
}

public Map<String, FeatureReference> getFeatureRefsByName() {
return getFeatureReferences().stream()
.collect(Collectors.toMap(FeatureReference::getName, featureReference -> featureReference));
}
}