diff --git a/.prow/scripts/test-end-to-end-batch.sh b/.prow/scripts/test-end-to-end-batch.sh index 5ff947f20f5..c729c4a4a83 100755 --- a/.prow/scripts/test-end-to-end-batch.sh +++ b/.prow/scripts/test-end-to-end-batch.sh @@ -4,7 +4,7 @@ set -e set -o pipefail if ! cat /etc/*release | grep -q stretch; then - echo ${BASH_SOURCE} only supports Debian stretch. + echo ${BASH_SOURCE} only supports Debian stretch. echo Please change your operating system to use this script. exit 1 fi @@ -16,7 +16,7 @@ This script will run end-to-end tests for Feast Core and Batch Serving. 2. Install Redis as the job store for Feast Batch Serving. 4. Install Postgres for persisting Feast metadata. 5. Install Kafka and Zookeeper as the Source in Feast. -6. Install Python 3.7.4, Feast Python SDK and run end-to-end tests from +6. Install Python 3.7.4, Feast Python SDK and run end-to-end tests from tests/e2e via pytest. " @@ -185,6 +185,8 @@ feast: jobs: staging-location: gs://feast-templocation-kf-feast/staging-location store-type: REDIS + bigquery-initial-retry-delay-secs: 1 + bigquery-total-timeout-secs: 900 store-options: host: $REMOTE_HOST port: 6379 diff --git a/serving/src/main/java/feast/serving/FeastProperties.java b/serving/src/main/java/feast/serving/FeastProperties.java index e511835b0aa..505d7d03301 100644 --- a/serving/src/main/java/feast/serving/FeastProperties.java +++ b/serving/src/main/java/feast/serving/FeastProperties.java @@ -113,6 +113,8 @@ public void setRedisPoolMaxIdle(int redisPoolMaxIdle) { public static class JobProperties { private String stagingLocation; + private int bigqueryInitialRetryDelaySecs; + private int bigqueryTotalTimeoutSecs; private String storeType; private Map storeOptions; @@ -120,6 +122,14 @@ public String getStagingLocation() { return this.stagingLocation; } + public int getBigqueryInitialRetryDelaySecs() { + return bigqueryInitialRetryDelaySecs; + } + + public int getBigqueryTotalTimeoutSecs() { + return bigqueryTotalTimeoutSecs; + } + public 
String getStoreType() { return this.storeType; } @@ -132,6 +142,14 @@ public void setStagingLocation(String stagingLocation) { this.stagingLocation = stagingLocation; } + public void setBigqueryInitialRetryDelaySecs(int bigqueryInitialRetryDelaySecs) { + this.bigqueryInitialRetryDelaySecs = bigqueryInitialRetryDelaySecs; + } + + public void setBigqueryTotalTimeoutSecs(int bigqueryTotalTimeoutSecs) { + this.bigqueryTotalTimeoutSecs = bigqueryTotalTimeoutSecs; + } + public void setStoreType(String storeType) { this.storeType = storeType; } diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index 9380ded4c4e..ea6dbc6ef71 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -132,6 +132,8 @@ public ServingService servingService( specService, jobService, jobStagingLocation, + feastProperties.getJobs().getBigqueryInitialRetryDelaySecs(), + feastProperties.getJobs().getBigqueryTotalTimeoutSecs(), storage); break; case CASSANDRA: diff --git a/serving/src/main/java/feast/serving/service/BigQueryServingService.java b/serving/src/main/java/feast/serving/service/BigQueryServingService.java index 53d071b57d1..0743245d164 100644 --- a/serving/src/main/java/feast/serving/service/BigQueryServingService.java +++ b/serving/src/main/java/feast/serving/service/BigQueryServingService.java @@ -20,6 +20,7 @@ import static feast.serving.store.bigquery.QueryTemplater.generateFullTableName; import static feast.serving.util.Metrics.requestLatency; +import com.google.cloud.RetryOption; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Field; @@ -57,12 +58,12 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; -import org.joda.time.Duration; import 
org.slf4j.Logger; +import org.threeten.bp.Duration; public class BigQueryServingService implements ServingService { - public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.standardDays(1).getMillis(); + public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.ofDays(1).toMillis(); private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryServingService.class); private final BigQuery bigquery; @@ -71,6 +72,8 @@ public class BigQueryServingService implements ServingService { private final CachedSpecService specService; private final JobService jobService; private final String jobStagingLocation; + private final int initialRetryDelaySecs; + private final int totalTimeoutSecs; private final Storage storage; public BigQueryServingService( @@ -80,6 +83,8 @@ public BigQueryServingService( CachedSpecService specService, JobService jobService, String jobStagingLocation, + int initialRetryDelaySecs, + int totalTimeoutSecs, Storage storage) { this.bigquery = bigquery; this.projectId = projectId; @@ -87,6 +92,8 @@ public BigQueryServingService( this.specService = specService; this.jobService = jobService; this.jobStagingLocation = jobStagingLocation; + this.initialRetryDelaySecs = initialRetryDelaySecs; + this.totalTimeoutSecs = totalTimeoutSecs; this.storage = storage; } @@ -156,6 +163,8 @@ public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeat .setEntityTableColumnNames(entityNames) .setFeatureSetInfos(featureSetInfos) .setJobStagingLocation(jobStagingLocation) + .setInitialRetryDelaySecs(initialRetryDelaySecs) + .setTotalTimeoutSecs(totalTimeoutSecs) .build()) .start(); @@ -199,7 +208,7 @@ private Table loadEntities(DatasetSource datasetSource) { loadJobConfiguration = loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build(); Job job = bigquery.create(JobInfo.of(loadJobConfiguration)); - job.waitFor(); + waitForJob(job); TableInfo expiry = bigquery @@ -239,7 +248,7 @@ private TableId 
generateUUIDs(Table loadedEntityTable) { .setDestinationTable(TableId.of(projectId, datasetId, createTempTableName())) .build(); Job queryJob = bigquery.create(JobInfo.of(queryJobConfig)); - queryJob.waitFor(); + Job completedJob = waitForJob(queryJob); TableInfo expiry = bigquery .getTable(queryJobConfig.getDestinationTable()) @@ -247,7 +256,7 @@ private TableId generateUUIDs(Table loadedEntityTable) { .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS) .build(); bigquery.update(expiry); - queryJobConfig = queryJob.getConfiguration(); + queryJobConfig = completedJob.getConfiguration(); return queryJobConfig.getDestinationTable(); } catch (InterruptedException | BigQueryException e) { throw Status.INTERNAL @@ -257,6 +266,22 @@ private TableId generateUUIDs(Table loadedEntityTable) { } } + private Job waitForJob(Job queryJob) throws InterruptedException { + Job completedJob = queryJob.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)), + RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs))); + if (completedJob == null) { + throw Status.INTERNAL + .withDescription("Job no longer exists") + .asRuntimeException(); + } else if (completedJob.getStatus().getError() != null) { + throw Status.INTERNAL + .withDescription("Job failed: " + completedJob.getStatus().getError()) + .asRuntimeException(); + } + return completedJob; + } + public static String createTempTableName() { return "_" + UUID.randomUUID().toString().replace("-", ""); } diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java index d437294dfc3..62d7abe7c43 100644 --- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java +++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java @@ -21,6 +21,7 @@ import static 
feast.serving.store.bigquery.QueryTemplater.createTimestampLimitQuery; import com.google.auto.value.AutoValue; +import com.google.cloud.RetryOption; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.DatasetId; @@ -51,6 +52,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.threeten.bp.Duration; @AutoValue public abstract class BatchRetrievalQueryRunnable implements Runnable { @@ -75,6 +77,10 @@ public abstract class BatchRetrievalQueryRunnable implements Runnable { public abstract String jobStagingLocation(); + public abstract int initialRetryDelaySecs(); + + public abstract int totalTimeoutSecs(); + public abstract Storage storage(); public static Builder builder() { @@ -101,6 +107,10 @@ public abstract static class Builder { public abstract Builder setJobStagingLocation(String jobStagingLocation); + public abstract Builder setInitialRetryDelaySecs(int initialRetryDelaySecs); + + public abstract Builder setTotalTimeoutSecs(int totalTimeoutSecs); + public abstract Builder setStorage(Storage storage); public abstract BatchRetrievalQueryRunnable build(); @@ -126,7 +136,7 @@ public void run() { ExtractJobConfiguration.of( queryConfig.getDestinationTable(), exportTableDestinationUri, "Avro"); Job extractJob = bigquery().create(JobInfo.of(extractConfig)); - extractJob.waitFor(); + waitForJob(extractJob); } catch (BigQueryException | InterruptedException | IOException e) { jobService() .upsert( @@ -174,7 +184,6 @@ private List parseOutputFileURIs() { Job runBatchQuery(List featureSetQueries) throws BigQueryException, InterruptedException, IOException { - Job queryJob; ExecutorService executorService = Executors.newFixedThreadPool(featureSetQueries.size()); ExecutorCompletionService executorCompletionService = new ExecutorCompletionService<>(executorService); @@ -225,8 +234,8 @@ Job runBatchQuery(List 
featureSetQueries) QueryJobConfiguration.newBuilder(joinQuery) .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName())) .build(); - queryJob = bigquery().create(JobInfo.of(queryJobConfig)); - queryJob.waitFor(); + Job queryJob = bigquery().create(JobInfo.of(queryJobConfig)); + Job completedQueryJob = waitForJob(queryJob); TableInfo expiry = bigquery() @@ -236,7 +245,7 @@ Job runBatchQuery(List featureSetQueries) .build(); bigquery().update(expiry); - return queryJob; + return completedQueryJob; } private List generateQueries(FieldValueList timestampLimits) { @@ -270,7 +279,7 @@ private FieldValueList getTimestampLimits(String entityTableName) { .build(); try { Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery)); - TableResult getTimestampLimitsQueryResult = job.waitFor().getQueryResults(); + TableResult getTimestampLimitsQueryResult = waitForJob(job).getQueryResults(); TableInfo expiry = bigquery() .getTable(getTimestampLimitsQuery.getDestinationTable()) @@ -293,4 +302,21 @@ private FieldValueList getTimestampLimits(String entityTableName) { .asRuntimeException(); } } + + private Job waitForJob(Job queryJob) throws InterruptedException { + Job completedJob = queryJob.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())), + RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs()))); + if (completedJob == null) { + throw Status.INTERNAL + .withDescription("Job no longer exists") + .asRuntimeException(); + } else if (completedJob.getStatus().getError() != null) { + throw Status.INTERNAL + .withDescription("Job failed: " + completedJob.getStatus().getError()) + .asRuntimeException(); + } + return completedJob; + } + } diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 2daa83fbfb2..072787492f0 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -26,13 +26,18 @@ feast: 
redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16} jobs: - # job-staging-location specifies the URI to store intermediate files for batch serving. + # staging-location specifies the URI to store intermediate files for batch serving. # Feast Serving client is expected to have read access to this staging location # to download the batch features. # # For example: gs://mybucket/myprefix # Please omit the trailing slash in the URI. staging-location: ${FEAST_JOB_STAGING_LOCATION:} + # + # Retry options for BigQuery jobs: + bigquery-initial-retry-delay-secs: ${FEAST_BIGQUERY_INITIAL_RETRY_DELAY_SECS:1} + bigquery-total-timeout-secs: ${FEAST_BIGQUERY_TOTAL_TIMEOUT_SECS:21600} + # # Type of store to store job metadata. This only needs to be set if the # serving store type is Bigquery. store-type: ${FEAST_JOB_STORE_TYPE:}