diff --git a/.prow/scripts/test-end-to-end-batch.sh b/.prow/scripts/test-end-to-end-batch.sh index ac312391d55..4d1b1d2ecd7 100755 --- a/.prow/scripts/test-end-to-end-batch.sh +++ b/.prow/scripts/test-end-to-end-batch.sh @@ -235,6 +235,15 @@ set +e pytest bq-batch-retrieval.py --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml TEST_EXIT_CODE=$? +if [[ ${TEST_EXIT_CODE} != 0 ]]; then + echo "[DEBUG] Printing logs" + ls -ltrh /var/log/feast* + cat /var/log/feast-serving-warehouse.log /var/log/feast-core.log + + echo "[DEBUG] Printing Python packages list" + pip list +fi + cd ${ORIGINAL_DIR} exit ${TEST_EXIT_CODE} diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index cc65968ca22..b9d7fa90882 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -66,6 +66,7 @@ sleep 20 tail -n10 /var/log/kafka.log kafkacat -b localhost:9092 -L +if [[ ${SKIP_BUILD_JARS} != "true" ]]; then echo " ============================================================ Building jars for Feast @@ -81,6 +82,9 @@ mvn --quiet --batch-mode --define skipTests=true clean package ls -lh core/target/*jar ls -lh serving/target/*jar +else + echo "[DEBUG] Skipping building jars" +fi echo " ============================================================ @@ -225,5 +229,14 @@ set +e pytest basic-ingest-redis-serving.py --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml TEST_EXIT_CODE=$? 
+if [[ ${TEST_EXIT_CODE} != 0 ]]; then + echo "[DEBUG] Printing logs" + ls -ltrh /var/log/feast* + cat /var/log/feast-serving-online.log /var/log/feast-core.log + + echo "[DEBUG] Printing Python packages list" + pip list +fi + cd ${ORIGINAL_DIR} exit ${TEST_EXIT_CODE} diff --git a/CHANGELOG.md b/CHANGELOG.md index 401cabb90d9..7758ae3fb97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## [v0.4.6](https://github.com/gojek/feast/tree/v0.4.6) (2020-02-26) + +[Full Changelog](https://github.com/gojek/feast/compare/v0.4.5...v0.4.6) + +**Merged pull requests:** +- Rename metric name for request latency in feast serving [\#488](https://github.com/gojek/feast/pull/488) ([davidheryanto](https://github.com/davidheryanto)) +- Allow use of secure gRPC in Feast Python client [\#459](https://github.com/gojek/feast/pull/459) ([Yanson](https://github.com/Yanson)) +- Extend WriteMetricsTransform in Ingestion to write feature value stats to StatsD [\#486](https://github.com/gojek/feast/pull/486) ([davidheryanto](https://github.com/davidheryanto)) +- Remove transaction from Ingestion [\#480](https://github.com/gojek/feast/pull/480) ([imjuanleonard](https://github.com/imjuanleonard)) +- Fix fastavro version used in Feast to avoid Timestamp delta error [\#490](https://github.com/gojek/feast/pull/490) ([davidheryanto](https://github.com/davidheryanto)) +- Fail Spotless formatting check before tests execute [\#487](https://github.com/gojek/feast/pull/487) ([ches](https://github.com/ches)) +- Reduce refresh rate of specification refresh in Serving to 10 seconds [\#481](https://github.com/gojek/feast/pull/481) ([woop](https://github.com/woop)) + ## [v0.4.5](https://github.com/gojek/feast/tree/v0.4.5) (2020-02-14) [Full Changelog](https://github.com/gojek/feast/compare/v0.4.4...v0.4.5) diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index d80d6547186..56c6f9de5f2 
100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -37,7 +37,6 @@ import feast.ingestion.options.ImportOptions; import feast.ingestion.options.OptionCompressor; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; diff --git a/datatypes/java/README.md b/datatypes/java/README.md index d5124eabb46..28b693786c8 100644 --- a/datatypes/java/README.md +++ b/datatypes/java/README.md @@ -16,7 +16,7 @@ Dependency Coordinates dev.feast datatypes-java - 0.4.5-SNAPSHOT + 0.4.6-SNAPSHOT ``` diff --git a/examples/basic/basic.ipynb b/examples/basic/basic.ipynb index 94fc82f2ce9..b9893011d97 100644 --- a/examples/basic/basic.ipynb +++ b/examples/basic/basic.ipynb @@ -203,7 +203,7 @@ "outputs": [], "source": [ "days = [datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=utc) \\\n", - " - timedelta(day) for day in range(31)]\n", + " - timedelta(day) for day in range(3)][::-1]\n", "\n", "customers = [1001, 1002, 1003, 1004, 1005]" ] diff --git a/infra/charts/feast/Chart.yaml b/infra/charts/feast/Chart.yaml index e4ca21aa62f..35a02a7f894 100644 --- a/infra/charts/feast/Chart.yaml +++ b/infra/charts/feast/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: A Helm chart to install Feast on kubernetes name: feast -version: 0.4.5 +version: 0.4.6 diff --git a/infra/charts/feast/README.md b/infra/charts/feast/README.md index ca526ad0b9f..a49f0132303 100644 --- a/infra/charts/feast/README.md +++ b/infra/charts/feast/README.md @@ -85,7 +85,7 @@ The following table lists the configurable parameters of the Feast chart and the | `feast-core.prometheus-statsd-exporter.*` | Refer to this [link](charts/feast-core/charts/prometheus-statsd-exporter/values.yaml | | `feast-core.replicaCount` | No of pods to create | `1` | `feast-core.image.repository` | Repository for Feast Core Docker 
image | `gcr.io/kf-feast/feast-core` -| `feast-core.image.tag` | Tag for Feast Core Docker image | `0.4.5` +| `feast-core.image.tag` | Tag for Feast Core Docker image | `0.4.6` | `feast-core.image.pullPolicy` | Image pull policy for Feast Core Docker image | `IfNotPresent` | `feast-core.prometheus.enabled` | Add annotations to enable Prometheus scraping | `false` | `feast-core.application.yaml` | Configuration for Feast Core application | Refer to this [link](charts/feast-core/values.yaml) @@ -126,7 +126,7 @@ The following table lists the configurable parameters of the Feast chart and the | `feast-serving-online.core.enabled` | Flag for Feast Serving to use Feast Core in the same Helm release | `true` | `feast-serving-online.replicaCount` | No of pods to create | `1` | `feast-serving-online.image.repository` | Repository for Feast Serving Docker image | `gcr.io/kf-feast/feast-serving` -| `feast-serving-online.image.tag` | Tag for Feast Serving Docker image | `0.4.5` +| `feast-serving-online.image.tag` | Tag for Feast Serving Docker image | `0.4.6` | `feast-serving-online.image.pullPolicy` | Image pull policy for Feast Serving Docker image | `IfNotPresent` | `feast-serving-online.prometheus.enabled` | Add annotations to enable Prometheus scraping | `true` | `feast-serving-online.application.yaml` | Application configuration for Feast Serving | Refer to this [link](charts/feast-serving/values.yaml) @@ -168,7 +168,7 @@ The following table lists the configurable parameters of the Feast chart and the | `feast-serving-batch.core.enabled` | Flag for Feast Serving to use Feast Core in the same Helm release | `true` | `feast-serving-batch.replicaCount` | No of pods to create | `1` | `feast-serving-batch.image.repository` | Repository for Feast Serving Docker image | `gcr.io/kf-feast/feast-serving` -| `feast-serving-batch.image.tag` | Tag for Feast Serving Docker image | `0.4.5` +| `feast-serving-batch.image.tag` | Tag for Feast Serving Docker image | `0.4.6` | 
`feast-serving-batch.image.pullPolicy` | Image pull policy for Feast Serving Docker image | `IfNotPresent` | `feast-serving-batch.prometheus.enabled` | Add annotations to enable Prometheus scraping | `true` | `feast-serving-batch.application.yaml` | Application configuration for Feast Serving | Refer to this [link](charts/feast-serving/values.yaml) diff --git a/infra/charts/feast/charts/feast-core/Chart.yaml b/infra/charts/feast/charts/feast-core/Chart.yaml index 28b3297bba9..0f437d2b6c0 100644 --- a/infra/charts/feast/charts/feast-core/Chart.yaml +++ b/infra/charts/feast/charts/feast-core/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: A Helm chart for core component of Feast name: feast-core -version: 0.4.5 +version: 0.4.6 diff --git a/infra/charts/feast/charts/feast-serving/Chart.yaml b/infra/charts/feast/charts/feast-serving/Chart.yaml index c610474c3e5..d84d3377df2 100644 --- a/infra/charts/feast/charts/feast-serving/Chart.yaml +++ b/infra/charts/feast/charts/feast-serving/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: A Helm chart for serving component of Feast name: feast-serving -version: 0.4.5 +version: 0.4.6 diff --git a/infra/charts/feast/requirements.yaml b/infra/charts/feast/requirements.yaml index b30635dcdb9..3ed12f08871 100644 --- a/infra/charts/feast/requirements.yaml +++ b/infra/charts/feast/requirements.yaml @@ -1,12 +1,12 @@ dependencies: - name: feast-core - version: 0.4.5 + version: 0.4.6 condition: feast-core.enabled - name: feast-serving alias: feast-serving-batch - version: 0.4.5 + version: 0.4.6 condition: feast-serving-batch.enabled - name: feast-serving alias: feast-serving-online - version: 0.4.5 + version: 0.4.6 condition: feast-serving-online.enabled \ No newline at end of file diff --git a/infra/docker-compose/docker-compose.yml b/infra/docker-compose/docker-compose.yml index 27d82efc3ca..87d56cbe925 100644 --- a/infra/docker-compose/docker-compose.yml +++ b/infra/docker-compose/docker-compose.yml @@ -106,4 +106,6 @@ 
services: ZOOKEEPER_CLIENT_PORT: 2181 db: - image: postgres:12-alpine \ No newline at end of file + image: postgres:12-alpine + ports: + - "5432:5432" \ No newline at end of file diff --git a/ingestion/pom.xml b/ingestion/pom.xml index c829674a64d..001da1a1453 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -248,5 +248,12 @@ 2.8.1 + + + org.apache.commons + commons-math3 + 3.6.1 + + diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index 6afdd80dd72..c1bdcd5fd17 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -26,6 +26,7 @@ /** Options passed to Beam to influence the job's execution environment */ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { + @Required @Description( "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." @@ -83,4 +84,13 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, int getStatsdPort(); void setStatsdPort(int StatsdPort); + + @Description( + "Fixed window size in seconds (default 30) to apply before aggregation of numerical value of features" + + "and writing the aggregated value to StatsD. 
Refer to feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFn" + + "for details on the metric names and types.") + @Default.Integer(30) + int getWindowSizeInSecForFeatureValueMetric(); + + void setWindowSizeInSecForFeatureValueMetric(int seconds); } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java new file mode 100644 index 00000000000..8574d2414c3 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java @@ -0,0 +1,311 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.transform.metrics; + +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_VERSION_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY; + +import com.google.auto.value.AutoValue; +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import java.util.ArrayList; +import java.util.DoubleSummaryStatistics; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.commons.math3.stat.descriptive.rank.Percentile; +import org.slf4j.Logger; + +/** + * WriteFeatureValueMetricsDoFn accepts key value of FeatureSetRef(str) to FeatureRow(List) and + * writes a histogram of the numerical values of each feature to StatsD. + * + *

The histogram of the numerical values is represented as the following in StatsD: + * + *

+ * + *

StatsD timing/histogram metric type is not used since it does not support negative values. + */ +@AutoValue +public abstract class WriteFeatureValueMetricsDoFn + extends DoFn>, Void> { + + abstract String getStoreName(); + + abstract String getStatsdHost(); + + abstract int getStatsdPort(); + + static Builder newBuilder() { + return new AutoValue_WriteFeatureValueMetricsDoFn.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setStoreName(String storeName); + + abstract Builder setStatsdHost(String statsdHost); + + abstract Builder setStatsdPort(int statsdPort); + + abstract WriteFeatureValueMetricsDoFn build(); + } + + private static final Logger log = + org.slf4j.LoggerFactory.getLogger(WriteFeatureValueMetricsDoFn.class); + private StatsDClient statsDClient; + public static String GAUGE_NAME_FEATURE_VALUE_MIN = "feature_value_min"; + public static String GAUGE_NAME_FEATURE_VALUE_MAX = "feature_value_max"; + public static String GAUGE_NAME_FEATURE_VALUE_MEAN = "feature_value_mean"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50 = "feature_value_percentile_50"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90 = "feature_value_percentile_90"; + public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95 = "feature_value_percentile_95"; + + @Setup + public void setup() { + // Note that exception may be thrown during StatsD client instantiation but no exception + // will be thrown when sending metrics (mimicking the UDP protocol behaviour). 
+ // https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation + // https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support + try { + statsDClient = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort()); + } catch (Exception e) { + log.error("StatsD client cannot be started: " + e.getMessage()); + } + } + + @Teardown + public void tearDown() { + if (statsDClient != null) { + statsDClient.close(); + } + } + + @ProcessElement + public void processElement( + ProcessContext context, + @Element KV> featureSetRefToFeatureRows) { + if (statsDClient == null) { + return; + } + + String featureSetRef = featureSetRefToFeatureRows.getKey(); + if (featureSetRef == null) { + return; + } + String[] colonSplits = featureSetRef.split(":"); + if (colonSplits.length != 2) { + log.error( + "Skip writing feature value metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String[] slashSplits = colonSplits[0].split("/"); + if (slashSplits.length != 2) { + log.error( + "Skip writing feature value metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String projectName = slashSplits[0]; + String featureSetName = slashSplits[1]; + String version = colonSplits[1]; + + Map featureNameToStats = new HashMap<>(); + Map> featureNameToValues = new HashMap<>(); + for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) { + for (Field field : featureRow.getFieldsList()) { + updateStats(featureNameToStats, featureNameToValues, field); + } + } + + for (Entry entry : featureNameToStats.entrySet()) { + String featureName = entry.getKey(); + DoubleSummaryStatistics stats = entry.getValue(); + String[] tags = { + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName, + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + 
FEATURE_SET_VERSION_TAG_KEY + ":" + version, + FEATURE_TAG_KEY + ":" + featureName, + INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName() + }; + + // stats can return non finite values when there is no element + // or there is an element that is not a number. Metric should only be sent for finite values. + if (Double.isFinite(stats.getMin())) { + if (stats.getMin() < 0) { + // StatsD gauge will assign a delta instead of the actual value, if there is a sign in + // the value. E.g. if the value is negative, a delta will be assigned. For this reason, + // the gauge value is set to zero beforehand. + // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#gauges + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, stats.getMin(), tags); + } + if (Double.isFinite(stats.getMax())) { + if (stats.getMax() < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, stats.getMax(), tags); + } + if (Double.isFinite(stats.getAverage())) { + if (stats.getAverage() < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, stats.getAverage(), tags); + } + + // For percentile calculation, Percentile class from commons-math3 from Apache is used. + // Percentile requires double[], hence the conversion below. 
+ if (!featureNameToValues.containsKey(featureName)) { + continue; + } + List valueList = featureNameToValues.get(featureName); + if (valueList == null || valueList.size() < 1) { + continue; + } + double[] values = new double[valueList.size()]; + for (int i = 0; i < values.length; i++) { + values[i] = valueList.get(i); + } + + double p50 = new Percentile().evaluate(values, 50); + if (p50 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, p50, tags); + + double p90 = new Percentile().evaluate(values, 90); + if (p90 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, p90, tags); + + double p95 = new Percentile().evaluate(values, 95); + if (p95 < 0) { + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, 0, tags); + } + statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, p95, tags); + } + } + + // Update stats and values array for the feature represented by the field. + // If the field contains non-numerical or non-boolean value, the stats and values array + // won't get updated because we are only concerned with numerical value in metrics data. + // For boolean value, true and false are treated as numerical value of 1 or 0 respectively. 
+ private void updateStats( + Map featureNameToStats, + Map> featureNameToValues, + Field field) { + if (featureNameToStats == null || featureNameToValues == null || field == null) { + return; + } + + String featureName = field.getName(); + if (!featureNameToStats.containsKey(featureName)) { + featureNameToStats.put(featureName, new DoubleSummaryStatistics()); + } + if (!featureNameToValues.containsKey(featureName)) { + featureNameToValues.put(featureName, new ArrayList<>()); + } + + Value value = field.getValue(); + DoubleSummaryStatistics stats = featureNameToStats.get(featureName); + List values = featureNameToValues.get(featureName); + + switch (value.getValCase()) { + case INT32_VAL: + stats.accept(value.getInt32Val()); + values.add(((double) value.getInt32Val())); + break; + case INT64_VAL: + stats.accept(value.getInt64Val()); + values.add((double) value.getInt64Val()); + break; + case DOUBLE_VAL: + stats.accept(value.getDoubleVal()); + values.add(value.getDoubleVal()); + break; + case FLOAT_VAL: + stats.accept(value.getFloatVal()); + values.add((double) value.getFloatVal()); + break; + case BOOL_VAL: + stats.accept(value.getBoolVal() ? 1 : 0); + values.add(value.getBoolVal() ? 1d : 0d); + break; + case INT32_LIST_VAL: + for (Integer val : value.getInt32ListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case INT64_LIST_VAL: + for (Long val : value.getInt64ListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case DOUBLE_LIST_VAL: + for (Double val : value.getDoubleListVal().getValList()) { + stats.accept(val); + values.add(val); + } + break; + case FLOAT_LIST_VAL: + for (Float val : value.getFloatListVal().getValList()) { + stats.accept(val); + values.add(((double) val)); + } + break; + case BOOL_LIST_VAL: + for (Boolean val : value.getBoolListVal().getValList()) { + stats.accept(val ? 1 : 0); + values.add(val ? 
1d : 0d); + } + break; + case BYTES_VAL: + case BYTES_LIST_VAL: + case STRING_VAL: + case STRING_LIST_VAL: + case VAL_NOT_SET: + default: + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index 43f314aa861..10322ac812f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -21,11 +21,16 @@ import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; @AutoValue public abstract class WriteMetricsTransform extends PTransform { @@ -79,6 +84,42 @@ public PDone expand(PCollectionTuple input) { .setStoreName(getStoreName()) .build())); + // 1. Apply a fixed window + // 2. Group feature row by feature set reference + // 3. Calculate min, max, mean, percentiles of numerical values of features in the window + // and + // 4. Send the aggregate value to StatsD metric collector. + // + // NOTE: window is applied here so the metric collector will not be overwhelmed with + // metrics data. And for metric data, only statistic of the values are usually required + // vs the actual values. 
+ input + .get(getSuccessTag()) + .apply( + "FixedWindow", + Window.into( + FixedWindows.of( + Duration.standardSeconds( + options.getWindowSizeInSecForFeatureValueMetric())))) + .apply( + "ConvertTo_FeatureSetRefToFeatureRow", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c, @Element FeatureRow featureRow) { + c.output(KV.of(featureRow.getFeatureSet(), featureRow)); + } + })) + .apply("GroupByFeatureSetRef", GroupByKey.create()) + .apply( + "WriteFeatureValueMetrics", + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .build())); + return PDone.in(input.getPipeline()); case "none": default: diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index db2d1acd6d8..2cd1ee94ecc 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -31,13 +31,13 @@ public abstract class WriteRowMetricsDoFn extends DoFn { private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class); - private final String METRIC_PREFIX = "feast_ingestion"; - private final String STORE_TAG_KEY = "feast_store"; - private final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name"; - private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; - private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; - private final String FEATURE_TAG_KEY = "feast_feature_name"; - private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; + public static final String METRIC_PREFIX = "feast_ingestion"; + public static final String STORE_TAG_KEY = "feast_store"; + public static final String FEATURE_SET_PROJECT_TAG_KEY = 
"feast_project_name"; + public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; + public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; + public static final String FEATURE_TAG_KEY = "feast_feature_name"; + public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; public abstract String getStoreName(); diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index 8c142b66c93..8541baaffc3 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -238,7 +238,6 @@ private void executeBatch() throws Exception { new Retriable() { @Override public void execute() { - pipeline.multi(); mutations.forEach( mutation -> { writeRecord(mutation); @@ -246,7 +245,6 @@ public void execute() { pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis()); } }); - pipeline.exec(); pipeline.sync(); mutations.clear(); } diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 58ecae8f045..1148fa40422 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -32,14 +32,12 @@ import feast.core.StoreProto.Store.Subscription; import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.options.OptionByteConverter; import feast.storage.RedisProto.RedisKey; import feast.test.TestUtil; import feast.test.TestUtil.LocalKafka; import feast.test.TestUtil.LocalRedis; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.ValueType.Enum; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -51,7 +49,6 @@ 
import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.joda.time.Duration; import org.junit.AfterClass; @@ -166,11 +163,13 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .build(); ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); - BZip2Compressor compressor = new BZip2Compressor<>(option -> { - JsonFormat.Printer printer = - JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); - return printer.print(option).getBytes(); - }); + BZip2Compressor compressor = + new BZip2Compressor<>( + option -> { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + return printer.print(option).getBytes(); + }); options.setFeatureSetJson(compressor.compress(spec)); options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setProject(""); diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java new file mode 100644 index 00000000000..7f487601101 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java @@ -0,0 +1,315 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FeatureRowProto.FeatureRow.Builder; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.BoolList; +import feast.types.ValueProto.BytesList; +import feast.types.ValueProto.DoubleList; +import feast.types.ValueProto.FloatList; +import feast.types.ValueProto.Int32List; +import feast.types.ValueProto.Int64List; +import feast.types.ValueProto.StringList; +import feast.types.ValueProto.Value; +import java.io.BufferedReader; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.junit.Rule; +import org.junit.Test; + +public class WriteFeatureValueMetricsDoFnTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static final int STATSD_SERVER_PORT = 17254; + private final DummyStatsDServer 
statsDServer = new DummyStatsDServer(STATSD_SERVER_PORT); + + @Test + public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedException { + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + pipelineOptions.setJobName("job"); + + Map> input = + readTestInput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input"); + List expectedLines = + readTestOutput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output"); + + pipeline + .apply(Create.of(input)) + .apply( + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost("localhost") + .setStatsdPort(STATSD_SERVER_PORT) + .setStoreName("store") + .build())); + pipeline.run(pipelineOptions).waitUntilFinish(); + // Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration + // based on empirical testing. + Thread.sleep(3000); + + List actualLines = statsDServer.messagesReceived(); + for (String expected : expectedLines) { + boolean matched = false; + for (String actual : actualLines) { + if (actual.equals(expected)) { + matched = true; + break; + } + } + if (!matched) { + System.out.println("Print actual metrics output for debugging:"); + for (String line : actualLines) { + System.out.println(line); + } + fail(String.format("Expected StatsD metric not found:\n%s", expected)); + } + } + } + + // Test utility method to read expected StatsD metrics output from a text file. + @SuppressWarnings("SameParameterValue") + private List readTestOutput(String path) throws IOException { + URL url = Thread.currentThread().getContextClassLoader().getResource(path); + if (url == null) { + throw new IllegalArgumentException( + "cannot read test data, path contains null url. 
Path: " + path); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) { + String line = reader.readLine(); + while (line != null) { + if (line.trim().length() > 1) { + lines.add(line); + } + line = reader.readLine(); + } + } + return lines; + } + + // Test utility method to create test feature row data from a text file. + @SuppressWarnings("SameParameterValue") + private Map> readTestInput(String path) throws IOException { + Map> data = new HashMap<>(); + URL url = Thread.currentThread().getContextClassLoader().getResource(path); + if (url == null) { + throw new IllegalArgumentException( + "cannot read test data, path contains null url. Path: " + path); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) { + String line = reader.readLine(); + while (line != null) { + lines.add(line); + line = reader.readLine(); + } + } + List colNames = new ArrayList<>(); + for (String line : lines) { + if (line.trim().length() < 1) { + continue; + } + String[] splits = line.split(","); + colNames.addAll(Arrays.asList(splits)); + + if (line.startsWith("featuresetref")) { + // Header line + colNames.addAll(Arrays.asList(splits).subList(1, splits.length)); + continue; + } + + Builder featureRowBuilder = FeatureRow.newBuilder(); + for (int i = 0; i < splits.length; i++) { + String colVal = splits[i].trim(); + if (i == 0) { + featureRowBuilder.setFeatureSet(colVal); + continue; + } + String colName = colNames.get(i); + Field.Builder fieldBuilder = Field.newBuilder().setName(colName); + if (!colVal.isEmpty()) { + switch (colName) { + case "int32": + fieldBuilder.setValue(Value.newBuilder().setInt32Val((Integer.parseInt(colVal)))); + break; + case "int64": + fieldBuilder.setValue(Value.newBuilder().setInt64Val((Long.parseLong(colVal)))); + break; + case "double": + fieldBuilder.setValue(Value.newBuilder().setDoubleVal((Double.parseDouble(colVal)))); + 
break; + case "float": + fieldBuilder.setValue(Value.newBuilder().setFloatVal((Float.parseFloat(colVal)))); + break; + case "bool": + fieldBuilder.setValue(Value.newBuilder().setBoolVal((Boolean.parseBoolean(colVal)))); + break; + case "int32list": + List int32List = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + int32List.add(Integer.parseInt(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setInt32ListVal(Int32List.newBuilder().addAllVal(int32List))); + break; + case "int64list": + List int64list = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + int64list.add(Long.parseLong(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setInt64ListVal(Int64List.newBuilder().addAllVal(int64list))); + break; + case "doublelist": + List doubleList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + doubleList.add(Double.parseDouble(val)); + } + fieldBuilder.setValue( + Value.newBuilder() + .setDoubleListVal(DoubleList.newBuilder().addAllVal(doubleList))); + break; + case "floatlist": + List floatList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + floatList.add(Float.parseFloat(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setFloatListVal(FloatList.newBuilder().addAllVal(floatList))); + break; + case "boollist": + List boolList = new ArrayList<>(); + for (String val : colVal.split("\\|")) { + boolList.add(Boolean.parseBoolean(val)); + } + fieldBuilder.setValue( + Value.newBuilder().setBoolListVal(BoolList.newBuilder().addAllVal(boolList))); + break; + case "bytes": + fieldBuilder.setValue( + Value.newBuilder().setBytesVal(ByteString.copyFromUtf8("Dummy"))); + break; + case "byteslist": + fieldBuilder.setValue( + Value.newBuilder().setBytesListVal(BytesList.getDefaultInstance())); + break; + case "string": + fieldBuilder.setValue(Value.newBuilder().setStringVal("Dummy")); + break; + case "stringlist": + fieldBuilder.setValue( + 
Value.newBuilder().setStringListVal(StringList.getDefaultInstance())); + break; + } + } + featureRowBuilder.addFields(fieldBuilder); + } + + if (!data.containsKey(featureRowBuilder.getFeatureSet())) { + data.put(featureRowBuilder.getFeatureSet(), new ArrayList<>()); + } + List featureRowsByFeatureSetRef = data.get(featureRowBuilder.getFeatureSet()); + featureRowsByFeatureSetRef.add(featureRowBuilder.build()); + } + + // Convert List to Iterable to match the function signature in + // WriteFeatureValueMetricsDoFn + Map> dataWithIterable = new HashMap<>(); + for (Entry> entrySet : data.entrySet()) { + String key = entrySet.getKey(); + Iterable value = entrySet.getValue(); + dataWithIterable.put(key, value); + } + return dataWithIterable; + } + + // Modified version of + // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java + @SuppressWarnings("CatchMayIgnoreException") + private static final class DummyStatsDServer { + + private final List messagesReceived = new ArrayList(); + private final DatagramSocket server; + + public DummyStatsDServer(int port) { + try { + server = new DatagramSocket(port); + } catch (SocketException e) { + throw new IllegalStateException(e); + } + new Thread( + () -> { + try { + while (true) { + final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535); + server.receive(packet); + messagesReceived.add( + new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n"); + Thread.sleep(50); + } + + } catch (Exception e) { + } + }) + .start(); + } + + public void stop() { + server.close(); + } + + public void waitForMessage() { + while (messagesReceived.isEmpty()) { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + } + } + } + + public List messagesReceived() { + List out = new ArrayList<>(); + for (String msg : messagesReceived) { + String[] lines = msg.split("\n"); + out.addAll(Arrays.asList(lines)); + } + return out; + } + } +} diff 
--git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README new file mode 100644 index 00000000000..3c8759d1702 --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README @@ -0,0 +1,9 @@ +WriteFeatureValueMetricsDoFnTest.input file contains data that can be read by test utility +into map of FeatureSetRef -> [FeatureRow]. In the first row, the cell value corresponds to the +field name in the FeatureRow. This should not be changed as the test utility derives the value +type from this name. Empty value in the cell is a value that is not set. For list type, the values +of different element is separated by the '|' character. + +WriteFeatureValueMetricsDoFnTest.output file contains lines of expected StatsD metrics that should +be sent when WriteFeatureValueMetricsDoFn runs. It can be checked against the actual outputted +StatsD metrics to test for correctness. 
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input new file mode 100644 index 00000000000..d2985711cee --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input @@ -0,0 +1,4 @@ +featuresetref,int32,int64,double,float,bool,int32list,int64list,doublelist,floatlist,boollist,bytes,byteslist,string,stringlist +project/featureset:1,1,5,8,5,true,1|4|3,5|1|12,5|7|3,-2.0,true|false,,,, +project/featureset:1,5,-10,8,10.0,true,1|12|5,,,-1.0|-3.0,false|true,,,, +project/featureset:1,6,-4,8,0.0,true,2,2|5,,,true|false,,,, \ No newline at end of file diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output new file mode 100644 index 00000000000..63bc7bbfa4e --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output @@ -0,0 +1,66 @@ +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store 
+feast_ingestion.feature_value_percentile_90:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_min:-10|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:-3|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:-4|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store 
+feast_ingestion.feature_value_mean:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store 
+feast_ingestion.feature_value_mean:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:3|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store 
+feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:3|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + 
+feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_min:-3|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store 
+feast_ingestion.feature_value_percentile_90:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_mean:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_50:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3ba6a592cfa..b8310aca91d 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,7 @@ - 0.4.5-SNAPSHOT + 0.4.6-SNAPSHOT https://github.com/gojek/feast UTF-8 @@ -372,6 +372,16 @@ + + + + spotless-check + process-test-classes + + check + + + org.apache.maven.plugins diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto index 24293c6faa6..c170cd5d502 100644 --- a/protos/feast/types/FeatureRow.proto +++ b/protos/feast/types/FeatureRow.proto @@ -36,7 +36,7 @@ message FeatureRow { google.protobuf.Timestamp event_timestamp = 3; // Complete reference to the featureSet this featureRow belongs to, in the form of - // featureSetName:version. 
This value will be used by the feast ingestion job to filter + // /:. This value will be used by the feast ingestion job to filter // rows, and write the values to the correct tables. string feature_set = 6; -} \ No newline at end of file +} diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index a68f0fe2bc5..4147cc4af28 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -21,7 +21,6 @@ from collections import OrderedDict from math import ceil from typing import Dict, List, Tuple, Union, Optional -from typing import List from urllib.parse import urlparse import fastavro @@ -29,6 +28,7 @@ import pandas as pd import pyarrow as pa import pyarrow.parquet as pq + from feast.core.CoreService_pb2 import ( GetFeastCoreVersionRequest, ListFeatureSetsResponse, @@ -48,11 +48,11 @@ from feast.core.FeatureSet_pb2 import FeatureSetStatus from feast.feature_set import FeatureSet, Entity from feast.job import Job -from feast.serving.ServingService_pb2 import FeatureReference from feast.loaders.abstract_producer import get_producer from feast.loaders.file import export_source_to_staging_location from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT from feast.loaders.ingest import get_feature_row_chunks +from feast.serving.ServingService_pb2 import FeatureReference from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse from feast.serving.ServingService_pb2 import ( GetOnlineFeaturesRequest, @@ -69,9 +69,11 @@ GRPC_CONNECTION_TIMEOUT_DEFAULT = 3 # type: int GRPC_CONNECTION_TIMEOUT_APPLY = 600 # type: int -FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str -FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str -FEAST_PROJECT_ENV_KEY = "FEAST_PROJECT" # type: str +FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" +FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" +FEAST_PROJECT_ENV_KEY = "FEAST_PROJECT" +FEAST_CORE_SECURE_ENV_KEY = "FEAST_CORE_SECURE" +FEAST_SERVING_SECURE_ENV_KEY = "FEAST_SERVING_SECURE" 
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300 CPU_COUNT = os.cpu_count() # type: int @@ -82,7 +84,8 @@ class Client: """ def __init__( - self, core_url: str = None, serving_url: str = None, project: str = None + self, core_url: str = None, serving_url: str = None, project: str = None, + core_secure: bool = None, serving_secure: bool = None ): """ The Feast Client should be initialized with at least one service url @@ -91,10 +94,14 @@ def __init__( core_url: Feast Core URL. Used to manage features serving_url: Feast Serving URL. Used to retrieve features project: Sets the active project. This field is optional. - """ - self._core_url = core_url - self._serving_url = serving_url - self._project = project + core_secure: Use client-side SSL/TLS for Core gRPC API + serving_secure: Use client-side SSL/TLS for Serving gRPC API + """ + self._core_url: str = core_url + self._serving_url: str = serving_url + self._project: str = project + self._core_secure: bool = core_secure + self._serving_secure: bool = serving_secure self.__core_channel: grpc.Channel = None self.__serving_channel: grpc.Channel = None self._core_service_stub: CoreServiceStub = None @@ -149,6 +156,52 @@ def serving_url(self, value: str): """ self._serving_url = value + @property + def core_secure(self) -> bool: + """ + Retrieve Feast Core client-side SSL/TLS setting + + Returns: + Whether client-side SSL/TLS is enabled + """ + + if self._core_secure is not None: + return self._core_secure + return os.getenv(FEAST_CORE_SECURE_ENV_KEY, "").lower() is "true" + + @core_secure.setter + def core_secure(self, value: bool): + """ + Set the Feast Core client-side SSL/TLS setting + + Args: + value: True to enable client-side SSL/TLS + """ + self._core_secure = value + + @property + def serving_secure(self) -> bool: + """ + Retrieve Feast Serving client-side SSL/TLS setting + + Returns: + Whether client-side SSL/TLS is enabled + """ + + if self._serving_secure is not None: + return self._serving_secure + return 
os.getenv(FEAST_SERVING_SECURE_ENV_KEY, "").lower() is "true" + + @serving_secure.setter + def serving_secure(self, value: bool): + """ + Set the Feast Serving client-side SSL/TLS setting + + Args: + value: True to enable client-side SSL/TLS + """ + self._serving_secure = value + def version(self): """ Returns version information from Feast Core and Feast Serving @@ -185,7 +238,10 @@ def _connect_core(self, skip_if_connected: bool = True): raise ValueError("Please set Feast Core URL.") if self.__core_channel is None: - self.__core_channel = grpc.insecure_channel(self.core_url) + if self.core_secure or self.core_url.endswith(":443"): + self.__core_channel = grpc.secure_channel(self.core_url, grpc.ssl_channel_credentials()) + else: + self.__core_channel = grpc.insecure_channel(self.core_url) try: grpc.channel_ready_future(self.__core_channel).result( @@ -214,7 +270,10 @@ def _connect_serving(self, skip_if_connected=True): raise ValueError("Please set Feast Serving URL.") if self.__serving_channel is None: - self.__serving_channel = grpc.insecure_channel(self.serving_url) + if self.serving_secure or self.serving_url.endswith(":443"): + self.__serving_channel = grpc.secure_channel(self.serving_url, grpc.ssl_channel_credentials()) + else: + self.__serving_channel = grpc.insecure_channel(self.serving_url) try: grpc.channel_ready_future(self.__serving_channel).result( diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index f3df60a02ec..b0ad509510e 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -11,6 +11,7 @@ mock==2.0.0 pandas==0.* protobuf==3.* pytest +pytest-lazy-fixture==0.6.3 pytest-mock pytest-timeout PyYAML==5.1.2 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index d0b37ad9419..3fc77540c02 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -37,7 +37,9 @@ "pandavro==1.5.*", "protobuf>=3.10", "PyYAML==5.1.*", - "fastavro==0.*", + # fastavro 0.22.10 and newer will throw this error for 
e2e batch test: + # TypeError: Timestamp subtraction must have the same timezones or no timezones + "fastavro==0.22.9", "kafka-python==1.*", "tabulate==0.8.*", "toml==0.10.*", diff --git a/sdk/python/tests/data/localhost.crt b/sdk/python/tests/data/localhost.crt new file mode 100644 index 00000000000..1f471506aab --- /dev/null +++ b/sdk/python/tests/data/localhost.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIJAKzukpnyuwsVMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV +BAMMCWxvY2FsaG9zdDAgFw0yMDAyMTcxMTE4NDNaGA8zMDE5MDYyMDExMTg0M1ow +FDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEAqoanhiy4EUZjPA/m8IWk50OyTjKAnqZvEW5glqmTHP6lQbfyWQnzj3Ny +c++4Xn901FO2v07h+7lE3BScjgCX6klsLOHRnWcLX8lQygR6zzO+Oey1yXuCebBA +yhrsqgTDC/8zoCxe0W3t0vqvE4AJs3tJHq5Y1ba/X9OiKKsDZuMSSsbdd4qVEL6y +BD8PRNLT/iiD84Kq58GZtOI3fJls8E/bYbvksugcPI3kmlU4Plg3VrVplMl3DcMz +7BbvQP6jmVqdPtUT7+lL0C5CsNqbdDOIwg09+Gwus+A/g8PerBBd+ZCmdvSa9LYJ +OmlJszgZPIL9AagXLfuGQvNN2Y6WowIDAQABozowODAUBgNVHREEDTALgglsb2Nh +bGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3 +DQEBCwUAA4IBAQAuF1/VeQL73Y1FKrBX4bAb/Rdh2+Dadpi+w1pgEOi3P4udmQ+y +Xn9GwwLRQmHRLjyCT5KT8lNHdldPdlBamqPGGku449aCAjA/YHVHhcHaXl0MtPGq +BfKhHYSsvI2sIymlzZIvvIaf04yuJ1g+L0j8Px4Ecor9YwcKDZmpnIXLgdUtUrIQ +5Omrb4jImX6q8jp6Bjplb4H3o4TqKoa74NLOWUiH5/Rix3Lo8MRoEVbX2GhKk+8n +0eD3AuyrI1i+ce7zY8qGJKKFHGLDWPA/+006ZIS4j/Hr2FWo07CPFQ4/3gdJ8Erw +SzgO9vvIhQrBJn2CIH4+P5Cb1ktdobNWW9XK +-----END CERTIFICATE----- diff --git a/sdk/python/tests/data/localhost.key b/sdk/python/tests/data/localhost.key new file mode 100644 index 00000000000..dbd9cda062c --- /dev/null +++ b/sdk/python/tests/data/localhost.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCqhqeGLLgRRmM8 +D+bwhaTnQ7JOMoCepm8RbmCWqZMc/qVBt/JZCfOPc3Jz77hef3TUU7a/TuH7uUTc +FJyOAJfqSWws4dGdZwtfyVDKBHrPM7457LXJe4J5sEDKGuyqBMML/zOgLF7Rbe3S ++q8TgAmze0kerljVtr9f06IoqwNm4xJKxt13ipUQvrIEPw9E0tP+KIPzgqrnwZm0 
+4jd8mWzwT9thu+Sy6Bw8jeSaVTg+WDdWtWmUyXcNwzPsFu9A/qOZWp0+1RPv6UvQ +LkKw2pt0M4jCDT34bC6z4D+Dw96sEF35kKZ29Jr0tgk6aUmzOBk8gv0BqBct+4ZC +803ZjpajAgMBAAECggEADE4FHphxe8WheX8IQgjSumFXJ29bepc14oMdcyGvXOM/ +F3vnf+dI7Ov+sUD2A9OcoYmc4TcW9WwL/Pl7xn9iduRvatmsn3gFCRdkvf8OwY7R +Riq/f1drNc6zDiJdO3N2g5IZrpAlE2WkSJoQMg8GJC5cO1uHS3yRWJ/Tzq1wZGcW +Dot9hAFgN0qNdP0xFkOsPM5ptC3DjLqsZWboJhIM19hgsIYaWQWHvcYlCcWTVhkj +FYzvLj5GrzAgyE89RpdXus670q5E2R2Rlnja21TfcxK0UOdIrKghZ0jxZMsXEwdB +8V7kIzL5kh//RhT/dIt0mHNMSdLFFx3yMTb2wTzpWQKBgQDRiCRslDSjiNSFySkn +6IivAwJtV2gLSxV05D9u9lrrlskHogrZUJkpVF1VzSnwv/ASaCZX4AGTtNPaz+vy +yDviwfjADsuum8jkzoxKCHnR1HVMyX+vm/g+pE20PMskTUuDE4zROtrqo9Ky0afv +94mJrf93Q815rsbEM5osugaeBQKBgQDQWAPTKy1wcG7edwfu3EaLYHPZ8pW9MldP +FvCLTMwSDkSzU+wA4BGE/5Tuu0WHSAfUc5C1LnMQXKBQXun+YCaBR6GZjUAmntz3 +poBIOYaxe651zqzCmo4ip1h5wIfPvynsyGmhsbpDSNhvXFgH2mF3XSY1nduKSRHu +389cHk3ahwKBgA4gAWSYcRv9I2aJcw7PrDcwGr/IPqlUPHQO1v/h96seFRtAnz6b +IlgY6dnY5NTn+4UiJEOUREbyz71Weu949CCLNvurg6uXsOlLy0VKYPv2OJoek08B +UrDWXq6h0of19fs2HC4Wq59Zv+ByJcIVi94OLsSZe4aSc6/SUrhlKgEJAoGBAIvR +5Y88NNx2uBEYdPx6W+WBr34e7Rrxw+JSFNCHk5SyeqyWr5XOyjMliv/EMl8dmhOc +Ewtkxte+MeB+Mi8CvBSay/rO7rR8fPK+jOzrnldSF7z8HLjlHGppQFlFOl/TfQFp +ZmqbadNp+caShImQp0SCAPiOnh1p+F0FWpYJyFnVAoGAKhSRP0iUmd+tId94px2m +G248BhcM9/0r+Y3yRX1eBx5eBzlzPUPcW1MSbhiZ1DIyLZ/MyObl98A1oNBGun11 +H/7Mq0E8BcJoXmt/6Z+2NhREBV9tDNuINyS/coYBV7H50pnSqyPpREPxNmu3Ukbm +u7ggLRfH+DexDysbpbCZ9l4= +-----END PRIVATE KEY----- diff --git a/sdk/python/tests/data/localhost.pem b/sdk/python/tests/data/localhost.pem new file mode 100644 index 00000000000..1f471506aab --- /dev/null +++ b/sdk/python/tests/data/localhost.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIJAKzukpnyuwsVMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV +BAMMCWxvY2FsaG9zdDAgFw0yMDAyMTcxMTE4NDNaGA8zMDE5MDYyMDExMTg0M1ow +FDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEAqoanhiy4EUZjPA/m8IWk50OyTjKAnqZvEW5glqmTHP6lQbfyWQnzj3Ny 
+c++4Xn901FO2v07h+7lE3BScjgCX6klsLOHRnWcLX8lQygR6zzO+Oey1yXuCebBA +yhrsqgTDC/8zoCxe0W3t0vqvE4AJs3tJHq5Y1ba/X9OiKKsDZuMSSsbdd4qVEL6y +BD8PRNLT/iiD84Kq58GZtOI3fJls8E/bYbvksugcPI3kmlU4Plg3VrVplMl3DcMz +7BbvQP6jmVqdPtUT7+lL0C5CsNqbdDOIwg09+Gwus+A/g8PerBBd+ZCmdvSa9LYJ +OmlJszgZPIL9AagXLfuGQvNN2Y6WowIDAQABozowODAUBgNVHREEDTALgglsb2Nh +bGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3 +DQEBCwUAA4IBAQAuF1/VeQL73Y1FKrBX4bAb/Rdh2+Dadpi+w1pgEOi3P4udmQ+y +Xn9GwwLRQmHRLjyCT5KT8lNHdldPdlBamqPGGku449aCAjA/YHVHhcHaXl0MtPGq +BfKhHYSsvI2sIymlzZIvvIaf04yuJ1g+L0j8Px4Ecor9YwcKDZmpnIXLgdUtUrIQ +5Omrb4jImX6q8jp6Bjplb4H3o4TqKoa74NLOWUiH5/Rix3Lo8MRoEVbX2GhKk+8n +0eD3AuyrI1i+ce7zY8qGJKKFHGLDWPA/+006ZIS4j/Hr2FWo07CPFQ4/3gdJ8Erw +SzgO9vvIhQrBJn2CIH4+P5Cb1ktdobNWW9XK +-----END CERTIFICATE----- diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 123cbe47fd6..2724fff52e3 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -11,9 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import pkgutil from datetime import datetime import tempfile +from unittest import mock + import grpc import pandas as pd from google.protobuf.duration_pb2 import Duration @@ -63,10 +66,38 @@ CORE_URL = "core.feast.example.com" SERVING_URL = "serving.example.com" +_PRIVATE_KEY_RESOURCE_PATH = 'data/localhost.key' +_CERTIFICATE_CHAIN_RESOURCE_PATH = 'data/localhost.pem' +_ROOT_CERTIFICATE_RESOURCE_PATH = 'data/localhost.crt' class TestClient: - @pytest.fixture(scope="function") + + @pytest.fixture + def secure_mock_client(self, mocker): + client = Client(core_url=CORE_URL, serving_url=SERVING_URL, core_secure=True, serving_secure=True) + mocker.patch.object(client, "_connect_core") + mocker.patch.object(client, "_connect_serving") + client._core_url = CORE_URL + client._serving_url = SERVING_URL + return client + + @pytest.fixture + def mock_client(self, mocker): + client = Client(core_url=CORE_URL, serving_url=SERVING_URL) + mocker.patch.object(client, "_connect_core") + mocker.patch.object(client, "_connect_serving") + client._core_url = CORE_URL + client._serving_url = SERVING_URL + return client + + @pytest.fixture + def server_credentials(self): + private_key = pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH) + certificate_chain = pkgutil.get_data(__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) + return grpc.ssl_server_credentials(((private_key, certificate_chain),)) + + @pytest.fixture def core_server(self): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Core.add_CoreServiceServicer_to_server(CoreServicer(), server) @@ -75,7 +106,7 @@ def core_server(self): yield server server.stop(0) - @pytest.fixture(scope="function") + @pytest.fixture def serving_server(self): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Serving.add_ServingServiceServicer_to_server(ServingServicer(), server) @@ -85,48 +116,73 @@ def serving_server(self): server.stop(0) @pytest.fixture - def mock_client(self, mocker): - client = 
Client(core_url=CORE_URL, serving_url=SERVING_URL) - mocker.patch.object(client, "_connect_core") - mocker.patch.object(client, "_connect_serving") - client._core_url = CORE_URL - client._serving_url = SERVING_URL - return client + def secure_core_server(self, server_credentials): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + Core.add_CoreServiceServicer_to_server(CoreServicer(), server) + server.add_secure_port("[::]:50053", server_credentials) + server.start() + yield server + server.stop(0) + + @pytest.fixture + def secure_serving_server(self, server_credentials): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + Serving.add_ServingServiceServicer_to_server(ServingServicer(), server) + + server.add_secure_port("[::]:50054", server_credentials) + server.start() + yield server + server.stop(0) + + @pytest.fixture + def secure_client(self, secure_core_server, secure_serving_server): + root_certificate_credentials = pkgutil.get_data(__name__, _ROOT_CERTIFICATE_RESOURCE_PATH) + # this is needed to establish a secure connection using self-signed certificates, for the purpose of the test + ssl_channel_credentials = grpc.ssl_channel_credentials(root_certificates=root_certificate_credentials) + with mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value=ssl_channel_credentials)): + yield Client(core_url="localhost:50053", serving_url="localhost:50054", core_secure=True, + serving_secure=True) @pytest.fixture def client(self, core_server, serving_server): return Client(core_url="localhost:50051", serving_url="localhost:50052") - def test_version(self, mock_client, mocker): - mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel("")) - mock_client._serving_service_stub = Serving.ServingServiceStub( + @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) + def test_version(self, mocked_client, mocker): + 
mocked_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel("")) + mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) mocker.patch.object( - mock_client._core_service_stub, + mocked_client._core_service_stub, "GetFeastCoreVersion", return_value=GetFeastCoreVersionResponse(version="0.3.2"), ) mocker.patch.object( - mock_client._serving_service_stub, + mocked_client._serving_service_stub, "GetFeastServingInfo", return_value=GetFeastServingInfoResponse(version="0.3.2"), ) - status = mock_client.version() + status = mocked_client.version() assert ( - status["core"]["url"] == CORE_URL - and status["core"]["version"] == "0.3.2" - and status["serving"]["url"] == SERVING_URL - and status["serving"]["version"] == "0.3.2" + status["core"]["url"] == CORE_URL + and status["core"]["version"] == "0.3.2" + and status["serving"]["url"] == SERVING_URL + and status["serving"]["version"] == "0.3.2" ) - def test_get_online_features(self, mock_client, mocker): + @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) + def test_get_online_features(self, mocked_client, mocker): ROW_COUNT = 300 - mock_client._serving_service_stub = Serving.ServingServiceStub( + mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) @@ -148,12 +204,12 @@ def test_get_online_features(self, mock_client, mocker): ) mocker.patch.object( - mock_client._serving_service_stub, + mocked_client._serving_service_stub, "GetOnlineFeatures", return_value=response, ) - response = mock_client.get_online_features( + response = mocked_client.get_online_features( entity_rows=entity_rows, feature_refs=[ "my_project/feature_1:1", @@ -169,17 +225,20 @@ def test_get_online_features(self, mock_client, mocker): ) # type: GetOnlineFeaturesResponse assert ( - response.field_values[0].fields["my_project/feature_1:1"].int64_val == 1 - and 
response.field_values[0].fields["my_project/feature_9:1"].int64_val == 9 + response.field_values[0].fields["my_project/feature_1:1"].int64_val == 1 + and response.field_values[0].fields["my_project/feature_9:1"].int64_val == 9 ) - def test_get_feature_set(self, mock_client, mocker): - mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel("")) + @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) + def test_get_feature_set(self, mocked_client, mocker): + mocked_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel("")) from google.protobuf.duration_pb2 import Duration mocker.patch.object( - mock_client._core_service_stub, + mocked_client._core_service_stub, "GetFeatureSet", return_value=GetFeatureSetResponse( feature_set=FeatureSetProto( @@ -214,29 +273,32 @@ def test_get_feature_set(self, mock_client, mocker): ) ), ) - mock_client.set_project("my_project") - feature_set = mock_client.get_feature_set("my_feature_set", version=2) + mocked_client.set_project("my_project") + feature_set = mocked_client.get_feature_set("my_feature_set", version=2) assert ( - feature_set.name == "my_feature_set" - and feature_set.version == 2 - and feature_set.fields["my_feature_1"].name == "my_feature_1" - and feature_set.fields["my_feature_1"].dtype == ValueType.FLOAT - and feature_set.fields["my_entity_1"].name == "my_entity_1" - and feature_set.fields["my_entity_1"].dtype == ValueType.INT64 - and len(feature_set.features) == 2 - and len(feature_set.entities) == 1 + feature_set.name == "my_feature_set" + and feature_set.version == 2 + and feature_set.fields["my_feature_1"].name == "my_feature_1" + and feature_set.fields["my_feature_1"].dtype == ValueType.FLOAT + and feature_set.fields["my_entity_1"].name == "my_entity_1" + and feature_set.fields["my_entity_1"].dtype == ValueType.INT64 + and len(feature_set.features) == 2 + and len(feature_set.entities) == 1 ) - def 
test_get_batch_features(self, mock_client, mocker): + @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) + def test_get_batch_features(self, mocked_client, mocker): - mock_client._serving_service_stub = Serving.ServingServiceStub( + mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) - mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel("")) + mocked_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel("")) mocker.patch.object( - mock_client._core_service_stub, + mocked_client._core_service_stub, "GetFeatureSet", return_value=GetFeatureSetResponse( feature_set=FeatureSetProto( @@ -283,7 +345,7 @@ def test_get_batch_features(self, mock_client, mocker): to_avro(file_path_or_buffer=final_results, df=expected_dataframe) mocker.patch.object( - mock_client._serving_service_stub, + mocked_client._serving_service_stub, "GetBatchFeatures", return_value=GetBatchFeaturesResponse( job=BatchFeaturesJob( @@ -297,7 +359,7 @@ def test_get_batch_features(self, mock_client, mocker): ) mocker.patch.object( - mock_client._serving_service_stub, + mocked_client._serving_service_stub, "GetJob", return_value=GetJobResponse( job=BatchFeaturesJob( @@ -311,7 +373,7 @@ def test_get_batch_features(self, mock_client, mocker): ) mocker.patch.object( - mock_client._serving_service_stub, + mocked_client._serving_service_stub, "GetFeastServingInfo", return_value=GetFeastServingInfoResponse( job_staging_location=f"file://{tempfile.mkdtemp()}/", @@ -319,8 +381,8 @@ def test_get_batch_features(self, mock_client, mocker): ), ) - mock_client.set_project("project1") - response = mock_client.get_batch_features( + mocked_client.set_project("project1") + response = mocked_client.get_batch_features( entity_rows=pd.DataFrame( { "datetime": [ @@ -348,9 +410,12 @@ def test_get_batch_features(self, mock_client, mocker): ] ) - def 
test_apply_feature_set_success(self, client): + @pytest.mark.parametrize("test_client", [pytest.lazy_fixture("client"), + pytest.lazy_fixture("secure_client") + ]) + def test_apply_feature_set_success(self, test_client): - client.set_project("project1") + test_client.set_project("project1") # Create Feature Sets fs1 = FeatureSet("my-feature-set-1") @@ -364,23 +429,24 @@ def test_apply_feature_set_success(self, client): fs2.add(Entity(name="fs2-my-entity-1", dtype=ValueType.INT64)) # Register Feature Set with Core - client.apply(fs1) - client.apply(fs2) + test_client.apply(fs1) + test_client.apply(fs2) - feature_sets = client.list_feature_sets() + feature_sets = test_client.list_feature_sets() # List Feature Sets assert ( - len(feature_sets) == 2 - and feature_sets[0].name == "my-feature-set-1" - and feature_sets[0].features[0].name == "fs1-my-feature-1" - and feature_sets[0].features[0].dtype == ValueType.INT64 - and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST + len(feature_sets) == 2 + and feature_sets[0].name == "my-feature-set-1" + and feature_sets[0].features[0].name == "fs1-my-feature-1" + and feature_sets[0].features[0].dtype == ValueType.INT64 + and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST ) - @pytest.mark.parametrize("dataframe", [dataframes.GOOD]) - def test_feature_set_ingest_success(self, dataframe, client, mocker): - client.set_project("project1") + @pytest.mark.parametrize("dataframe,test_client", [(dataframes.GOOD, pytest.lazy_fixture("client")), + (dataframes.GOOD, pytest.lazy_fixture("secure_client"))]) + def test_feature_set_ingest_success(self, dataframe, test_client, mocker): + test_client.set_project("project1") driver_fs = FeatureSet( "driver-feature-set", source=KafkaSource(brokers="kafka:9092", topic="test") ) @@ -390,12 +456,12 @@ def test_feature_set_ingest_success(self, dataframe, client, mocker): driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64)) # Register with Feast core - 
client.apply(driver_fs) + test_client.apply(driver_fs) driver_fs = driver_fs.to_proto() driver_fs.meta.status = FeatureSetStatusProto.STATUS_READY mocker.patch.object( - client._core_service_stub, + test_client._core_service_stub, "GetFeatureSet", return_value=GetFeatureSetResponse(feature_set=driver_fs), ) @@ -403,14 +469,16 @@ def test_feature_set_ingest_success(self, dataframe, client, mocker): # Need to create a mock producer with patch("feast.client.get_producer") as mocked_queue: # Ingest data into Feast - client.ingest("driver-feature-set", dataframe) + test_client.ingest("driver-feature-set", dataframe) - @pytest.mark.parametrize("dataframe,exception", [(dataframes.GOOD, TimeoutError)]) + @pytest.mark.parametrize("dataframe,exception,test_client", + [(dataframes.GOOD, TimeoutError, pytest.lazy_fixture("client")), + (dataframes.GOOD, TimeoutError, pytest.lazy_fixture("secure_client"))]) def test_feature_set_ingest_fail_if_pending( - self, dataframe, exception, client, mocker + self, dataframe, exception, test_client, mocker ): with pytest.raises(exception): - client.set_project("project1") + test_client.set_project("project1") driver_fs = FeatureSet( "driver-feature-set", source=KafkaSource(brokers="kafka:9092", topic="test"), @@ -421,12 +489,12 @@ def test_feature_set_ingest_fail_if_pending( driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64)) # Register with Feast core - client.apply(driver_fs) + test_client.apply(driver_fs) driver_fs = driver_fs.to_proto() driver_fs.meta.status = FeatureSetStatusProto.STATUS_PENDING mocker.patch.object( - client._core_service_stub, + test_client._core_service_stub, "GetFeatureSet", return_value=GetFeatureSetResponse(feature_set=driver_fs), ) @@ -434,18 +502,22 @@ def test_feature_set_ingest_fail_if_pending( # Need to create a mock producer with patch("feast.client.get_producer") as mocked_queue: # Ingest data into Feast - client.ingest("driver-feature-set", dataframe, timeout=1) + 
test_client.ingest("driver-feature-set", dataframe, timeout=1) @pytest.mark.parametrize( - "dataframe,exception", + "dataframe,exception,test_client", [ - (dataframes.BAD_NO_DATETIME, Exception), - (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception), - (dataframes.BAD_NO_ENTITY, Exception), - (dataframes.NO_FEATURES, Exception), + (dataframes.BAD_NO_DATETIME, Exception, pytest.lazy_fixture("client")), + (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, pytest.lazy_fixture("client")), + (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("client")), + (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("client")), + (dataframes.BAD_NO_DATETIME, Exception, pytest.lazy_fixture("secure_client")), + (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, pytest.lazy_fixture("secure_client")), + (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("secure_client")), + (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("secure_client")), ], ) - def test_feature_set_ingest_failure(self, client, dataframe, exception): + def test_feature_set_ingest_failure(self, test_client, dataframe, exception): with pytest.raises(exception): # Create feature set driver_fs = FeatureSet("driver-feature-set") @@ -454,15 +526,16 @@ def test_feature_set_ingest_failure(self, client, dataframe, exception): driver_fs.infer_fields_from_df(dataframe) # Register with Feast core - client.apply(driver_fs) + test_client.apply(driver_fs) # Ingest data into Feast - client.ingest(driver_fs, dataframe=dataframe) + test_client.ingest(driver_fs, dataframe=dataframe) - @pytest.mark.parametrize("dataframe", [dataframes.ALL_TYPES]) - def test_feature_set_types_success(self, client, dataframe, mocker): + @pytest.mark.parametrize("dataframe,test_client", [(dataframes.ALL_TYPES, pytest.lazy_fixture("client")), + (dataframes.ALL_TYPES, pytest.lazy_fixture("secure_client"))]) + def test_feature_set_types_success(self, test_client, dataframe, mocker): - client.set_project("project1") + 
test_client.set_project("project1") all_types_fs = FeatureSet( name="all_types", @@ -489,10 +562,10 @@ def test_feature_set_types_success(self, client, dataframe, mocker): ) # Register with Feast core - client.apply(all_types_fs) + test_client.apply(all_types_fs) mocker.patch.object( - client._core_service_stub, + test_client._core_service_stub, "GetFeatureSet", return_value=GetFeatureSetResponse(feature_set=all_types_fs.to_proto()), ) @@ -500,4 +573,38 @@ def test_feature_set_types_success(self, client, dataframe, mocker): # Need to create a mock producer with patch("feast.client.get_producer") as mocked_queue: # Ingest data into Feast - client.ingest(all_types_fs, dataframe) + test_client.ingest(all_types_fs, dataframe) + + @patch("grpc.channel_ready_future") + def test_secure_channel_creation_with_secure_client(self, _mocked_obj): + client = Client(core_url="localhost:50051", serving_url="localhost:50052", serving_secure=True, + core_secure=True) + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials: + client._connect_serving() + _grpc_mock.assert_called_with(client.serving_url, _mocked_credentials.return_value) + + @mock.patch("grpc.channel_ready_future") + def test_secure_channel_creation_with_secure_serving_url(self, _mocked_obj, ): + client = Client(core_url="localhost:50051", serving_url="localhost:443") + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials: + client._connect_serving() + _grpc_mock.assert_called_with(client.serving_url, _mocked_credentials.return_value) + + @patch("grpc.channel_ready_future") + def test_secure_channel_creation_with_secure_core_client(self, _mocked_obj): + client = Client(core_url="localhost:50053", serving_url="localhost:50054", serving_secure=True, + core_secure=True) + with mock.patch("grpc.secure_channel") as _grpc_mock, 
\ + mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials: + client._connect_core() + _grpc_mock.assert_called_with(client.core_url, _mocked_credentials.return_value) + + @patch("grpc.channel_ready_future") + def test_secure_channel_creation_with_secure_core_url(self, _mocked_obj): + client = Client(core_url="localhost:443", serving_url="localhost:50054") + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials: + client._connect_core() + _grpc_mock.assert_called_with(client.core_url, _mocked_credentials.return_value) \ No newline at end of file diff --git a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java b/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java index 3c91c2765aa..0b3a2938b8e 100644 --- a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java @@ -35,7 +35,7 @@ public class SpecServiceConfig { private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class); private String feastCoreHost; private int feastCorePort; - private static final int CACHE_REFRESH_RATE_MINUTES = 1; + private static final int CACHE_REFRESH_RATE_SECONDS = 10; @Autowired public SpecServiceConfig(FeastProperties feastProperties) { @@ -51,9 +51,9 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( // reload all specs including new ones periodically scheduledExecutorService.scheduleAtFixedRate( cachedSpecStorage::scheduledPopulateCache, - CACHE_REFRESH_RATE_MINUTES, - CACHE_REFRESH_RATE_MINUTES, - TimeUnit.MINUTES); + CACHE_REFRESH_RATE_SECONDS, + CACHE_REFRESH_RATE_SECONDS, + TimeUnit.SECONDS); return scheduledExecutorService; } diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java 
b/serving/src/main/java/feast/serving/service/RedisServingService.java index 48fc485214d..24c69b9f796 100644 --- a/serving/src/main/java/feast/serving/service/RedisServingService.java +++ b/serving/src/main/java/feast/serving/service/RedisServingService.java @@ -313,7 +313,7 @@ private List sendMultiGet(List keys) { } finally { requestLatency .labels("sendMultiGet") - .observe((System.currentTimeMillis() - startTime) / 1000); + .observe((System.currentTimeMillis() - startTime) / 1000d); } } } diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index 1184f6da95a..35119589b27 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -195,11 +195,7 @@ private Map getFeatureToFeatureSetMapping( HashMap mapping = new HashMap<>(); featureSets.values().stream() - .collect( - groupingBy( - featureSet -> - Pair.of( - featureSet.getProject(), featureSet.getName()))) + .collect(groupingBy(featureSet -> Pair.of(featureSet.getProject(), featureSet.getName()))) .forEach( (group, groupedFeatureSets) -> { groupedFeatureSets = diff --git a/serving/src/main/java/feast/serving/util/Metrics.java b/serving/src/main/java/feast/serving/util/Metrics.java index 99f6353e742..05546ec384b 100644 --- a/serving/src/main/java/feast/serving/util/Metrics.java +++ b/serving/src/main/java/feast/serving/util/Metrics.java @@ -24,9 +24,9 @@ public class Metrics { public static final Histogram requestLatency = Histogram.build() .buckets(0.001, 0.002, 0.004, 0.006, 0.008, 0.01, 0.015, 0.02, 0.025, 0.03, 0.035, 0.05) - .name("request_latency_ms") + .name("request_latency_seconds") .subsystem("feast_serving") - .help("Request latency in seconds.") + .help("Request latency in seconds") .labelNames("method") .register();