From eff05b1ad577ecedc32135f3b8c40d355db9e129 Mon Sep 17 00:00:00 2001 From: Ches Martin Date: Thu, 12 Dec 2019 02:38:33 +0700 Subject: [PATCH 1/2] Fail formatting check before tests execute By default, the spotless Maven plugin binds its check goal to the verify phase (late in the lifecycle, after integration tests). Because we currently only run `mvn test` for CI, it doesn't proceed as far as verify so missed formatting is not caught by CI. This binds the check to an earlier phase, in between test-compile and test, so that it will fail before `mvn test` but not disrupt your dev workflow of compiling main and test sources as you work. This strikes a good compromise on failing fast for code standards without being _too_ nagging. For the complete lifecycle reference, see: https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html --- pom.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pom.xml b/pom.xml index fb03c3045b7..0e5ce843e2f 100644 --- a/pom.xml +++ b/pom.xml @@ -350,6 +350,16 @@ + + + + spotless-check + process-test-classes + + check + + + org.apache.maven.plugins From 67e43d2027787869e88074f5763e63aa5b40c355 Mon Sep 17 00:00:00 2001 From: Ches Martin Date: Sat, 7 Mar 2020 17:46:53 +0700 Subject: [PATCH 2/2] Apply Spotless formatting Hopefully for last time as a bulk operation, after 636354092c. --- .../core/config/FeatureStreamConfig.java | 1 - .../feast/core/dao/FeatureSetRepository.java | 6 +- .../java/feast/core/grpc/CoreServiceImpl.java | 5 +- .../core/job/dataflow/DataflowJobManager.java | 1 - .../core/job/direct/DirectJobRegistry.java | 1 - .../job/direct/DirectRunnerJobManager.java | 4 +- .../main/java/feast/core/log/AuditLogger.java | 1 - .../java/feast/core/model/FeatureSet.java | 25 +-- .../core/service/JobCoordinatorService.java | 1 - .../java/feast/core/service/SpecService.java | 16 +- .../java/feast/core/util/PackageUtil.java | 6 +- .../java/feast/core/util/TypeConversion.java | 1 - .../feast/core/service/SpecServiceTest.java | 30 ++- .../feast/core/validators/MatchersTest.java | 1 - .../redis/FeatureRowToRedisMutationDoFn.java | 11 +- .../java/feast/ingestion/ImportJobTest.java | 87 ++++---- .../FeatureRowToRedisMutationDoFnTest.java | 211 +++++++++++------- .../bigquery/BatchRetrievalQueryRunnable.java | 3 +- 18 files changed, 221 insertions(+), 190 deletions(-) diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 6d9a30f9e93..4f444b59e83 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -16,7 +16,6 @@ */ package feast.core.config; -import com.google.common.base.Strings; import feast.core.SourceProto.KafkaSourceConfig; import feast.core.SourceProto.SourceType; import feast.core.config.FeastProperties.StreamProperties; diff --git a/core/src/main/java/feast/core/dao/FeatureSetRepository.java b/core/src/main/java/feast/core/dao/FeatureSetRepository.java index ca4d6b9d1cb..fd996b331c2 100644 --- a/core/src/main/java/feast/core/dao/FeatureSetRepository.java +++ b/core/src/main/java/feast/core/dao/FeatureSetRepository.java @@ -36,11 +36,11 @@ public interface FeatureSetRepository extends JpaRepository List findByName(String name); // find all versions of featureSets with names matching the regex - @Query(nativeQuery = true, value = "SELECT * FROM feature_sets " - + "WHERE name LIKE ?1 ORDER BY name ASC, version ASC") + @Query( + nativeQuery = true, + value = "SELECT * FROM feature_sets " + "WHERE name LIKE ?1 ORDER BY name ASC, version ASC") List findByNameWithWildcardOrderByNameAscVersionAsc(String name); // find all feature sets and order by name and version List findAllByOrderByNameAscVersionAsc(); - } diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 6387fd806b4..5398a82e782 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -49,11 +49,8 @@ import lombok.extern.slf4j.Slf4j; import org.lognet.springboot.grpc.GRpcService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.transaction.annotation.Transactional; -/** - * Implementation of the feast core GRPC service. - */ +/** Implementation of the feast core GRPC service. */ @Slf4j @GRpcService(interceptors = {MonitoringInterceptor.class}) public class CoreServiceImpl extends CoreServiceImplBase { diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index f19cf1a6569..fefb1145f53 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -20,7 +20,6 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.Job; -import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat.Printer; diff --git a/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java b/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java index 8f6c87053ff..94b3a8fd571 100644 --- a/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java +++ b/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java @@ -16,7 +16,6 @@ */ package feast.core.job.direct; -import com.google.common.base.Strings; import java.io.IOException; import java.util.HashMap; import java.util.Map; diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 85b8a95dd56..cf4c6213eed 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -16,7 +16,6 @@ */ package feast.core.job.direct; -import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat.Printer; @@ -148,8 +147,7 @@ public void abortJob(String extId) { try { job.abort(); } catch (IOException e) { - throw new RuntimeException( - String.format("Unable to abort DirectRunner job %s", extId), e); + throw new RuntimeException(String.format("Unable to abort DirectRunner job %s", extId), e); } jobs.remove(extId); } diff --git a/core/src/main/java/feast/core/log/AuditLogger.java b/core/src/main/java/feast/core/log/AuditLogger.java index 2c60307805c..275aa74edfa 100644 --- a/core/src/main/java/feast/core/log/AuditLogger.java +++ b/core/src/main/java/feast/core/log/AuditLogger.java @@ -16,7 +16,6 @@ */ package feast.core.log; -import com.google.common.base.Strings; import java.util.Date; import java.util.Map; import java.util.TreeMap; diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index 8ba7162d2f2..755ef687e32 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -155,50 +155,49 @@ public FeatureSetSpec toProto() throws InvalidProtocolBufferException { * @return boolean denoting if the source or schema have changed. */ public boolean equalTo(FeatureSet other) { - if(!name.equals(other.getName())){ + if (!name.equals(other.getName())) { return false; } - if (!source.equalTo(other.getSource())){ + if (!source.equalTo(other.getSource())) { return false; } - if (maxAgeSeconds != other.maxAgeSeconds){ + if (maxAgeSeconds != other.maxAgeSeconds) { return false; } // Create a map of all fields in this feature set Map fields = new HashMap<>(); - for (Field e : entities){ + for (Field e : entities) { fields.putIfAbsent(e.getName(), e); } - for (Field f : features){ + for (Field f : features) { fields.putIfAbsent(f.getName(), f); } // Ensure map size is consistent with existing fields - if (fields.size() != other.features.size() + other.entities.size()) - { + if (fields.size() != other.features.size() + other.entities.size()) { return false; } // Ensure the other entities and fields exist in the field map - for (Field e : other.entities){ - if(!fields.containsKey(e.getName())){ + for (Field e : other.entities) { + if (!fields.containsKey(e.getName())) { return false; } - if (!e.equals(fields.get(e.getName()))){ + if (!e.equals(fields.get(e.getName()))) { return false; } } - for (Field f : features){ - if(!fields.containsKey(f.getName())){ + for (Field f : features) { + if (!fields.containsKey(f.getName())) { return false; } - if (!f.equals(fields.get(f.getName()))){ + if (!f.equals(fields.get(f.getName()))) { return false; } } diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index b5c9fc6c1bb..7f0d4cb79ca 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -16,7 +16,6 @@ */ package feast.core.service; -import com.google.common.base.Strings; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.SourceProto; import feast.core.StoreProto; diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 99862fd5dc0..ffa1aa08e81 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -110,8 +110,9 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) if (featureSet == null) { throw io.grpc.Status.NOT_FOUND - .withDescription(String.format("Feature set with name \"%s\" could not be found.", - request.getName())) + .withDescription( + String.format( + "Feature set with name \"%s\" could not be found.", request.getName())) .asRuntimeException(); } } else { @@ -121,13 +122,14 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) if (featureSet == null) { throw io.grpc.Status.NOT_FOUND - .withDescription(String.format("Feature set with name \"%s\" and version \"%s\" could " - + "not be found.", request.getName(), request.getVersion())) + .withDescription( + String.format( + "Feature set with name \"%s\" and version \"%s\" could " + "not be found.", + request.getName(), request.getVersion())) .asRuntimeException(); } } - // Only a single item in list, return successfully return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build(); } @@ -154,7 +156,9 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil if (name.equals("")) { featureSets = featureSetRepository.findAllByOrderByNameAscVersionAsc(); } else { - featureSets = featureSetRepository.findByNameWithWildcardOrderByNameAscVersionAsc(name.replace('*', '%')); + featureSets = + featureSetRepository.findByNameWithWildcardOrderByNameAscVersionAsc( + name.replace('*', '%')); featureSets = featureSets.stream() .filter(getVersionFilter(filter.getFeatureSetVersion())) diff --git a/core/src/main/java/feast/core/util/PackageUtil.java b/core/src/main/java/feast/core/util/PackageUtil.java index 20b2310644b..99c5d73ba78 100644 --- a/core/src/main/java/feast/core/util/PackageUtil.java +++ b/core/src/main/java/feast/core/util/PackageUtil.java @@ -44,9 +44,9 @@ public class PackageUtil { * points to the resource location. Note that the extraction process can take several minutes to * complete. * - *

One use case of this function is to detect the class path of resources to stage when - * using Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be - * handled by default in Apache Beam. + *

One use case of this function is to detect the class path of resources to stage when using + * Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be handled by + * default in Apache Beam. * *

    * 
diff --git a/core/src/main/java/feast/core/util/TypeConversion.java b/core/src/main/java/feast/core/util/TypeConversion.java
index a7dd2b0d2a3..5fe69819476 100644
--- a/core/src/main/java/feast/core/util/TypeConversion.java
+++ b/core/src/main/java/feast/core/util/TypeConversion.java
@@ -16,7 +16,6 @@
  */
 package feast.core.util;
 
-import com.google.common.base.Strings;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import java.lang.reflect.Type;
diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java
index a11adf022b9..6d112676350 100644
--- a/core/src/test/java/feast/core/service/SpecServiceTest.java
+++ b/core/src/test/java/feast/core/service/SpecServiceTest.java
@@ -68,14 +68,11 @@
 
 public class SpecServiceTest {
 
-  @Mock
-  private FeatureSetRepository featureSetRepository;
+  @Mock private FeatureSetRepository featureSetRepository;
 
-  @Mock
-  private StoreRepository storeRepository;
+  @Mock private StoreRepository storeRepository;
 
-  @Rule
-  public final ExpectedException expectedException = ExpectedException.none();
+  @Rule public final ExpectedException expectedException = ExpectedException.none();
 
   private SpecService specService;
   private List featureSets;
@@ -102,11 +99,12 @@ public void setUp() {
     Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
     Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
     Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
-    FeatureSet featureSet3v1 = new FeatureSet(
-        "f3", 1, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource);
+    FeatureSet featureSet3v1 =
+        new FeatureSet(
+            "f3", 1, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource);
 
-    featureSets = Arrays
-        .asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1);
+    featureSets =
+        Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1);
     when(featureSetRepository.findAll()).thenReturn(featureSets);
     when(featureSetRepository.findAllByOrderByNameAscVersionAsc()).thenReturn(featureSets);
     when(featureSetRepository.findByName("f1")).thenReturn(featureSets.subList(0, 3));
@@ -347,7 +345,6 @@ public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists()
     assertThat(applyFeatureSetResponse.getFeatureSet(), equalTo(expected));
   }
 
-
   @Test
   public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
       throws InvalidProtocolBufferException {
@@ -355,20 +352,21 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
     Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
     Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
     Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
-    FeatureSetProto.FeatureSetSpec incomingFeatureSet = (new FeatureSet(
-        "f3", 5, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource)).toProto();
+    FeatureSetProto.FeatureSetSpec incomingFeatureSet =
+        (new FeatureSet(
+                "f3", 5, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource))
+            .toProto();
 
     FeatureSetSpec expected = incomingFeatureSet;
     ApplyFeatureSetResponse applyFeatureSetResponse =
         specService.applyFeatureSet(incomingFeatureSet);
     assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.NO_CHANGE));
     assertThat(applyFeatureSetResponse.getFeatureSet().getMaxAge(), equalTo(expected.getMaxAge()));
-    assertThat(applyFeatureSetResponse.getFeatureSet().getEntities(0),
-        equalTo(expected.getEntities(0)));
+    assertThat(
+        applyFeatureSetResponse.getFeatureSet().getEntities(0), equalTo(expected.getEntities(0)));
     assertThat(applyFeatureSetResponse.getFeatureSet().getName(), equalTo(expected.getName()));
   }
 
-
   @Test
   public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException {
     when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0)));
diff --git a/core/src/test/java/feast/core/validators/MatchersTest.java b/core/src/test/java/feast/core/validators/MatchersTest.java
index 13c9e006a44..3bf09dd474f 100644
--- a/core/src/test/java/feast/core/validators/MatchersTest.java
+++ b/core/src/test/java/feast/core/validators/MatchersTest.java
@@ -19,7 +19,6 @@
 import static feast.core.validators.Matchers.checkLowerSnakeCase;
 import static feast.core.validators.Matchers.checkUpperSnakeCase;
 
-import com.google.common.base.Strings;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
diff --git a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java
index c453c5c9206..a7471443617 100644
--- a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java
+++ b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java
@@ -24,11 +24,9 @@
 import feast.store.serving.redis.RedisCustomIO.RedisMutation;
 import feast.types.FeatureRowProto.FeatureRow;
 import feast.types.FieldProto.Field;
-import feast.types.ValueProto.Value;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.slf4j.Logger;
@@ -55,12 +53,9 @@ private RedisKey getKey(FeatureRow featureRow) {
     Builder redisKeyBuilder = RedisKey.newBuilder().setFeatureSet(featureRow.getFeatureSet());
     for (Field field : featureRow.getFieldsList()) {
       if (entityNames.contains(field.getName())) {
-        entityFields.putIfAbsent(field.getName(),
-            Field.newBuilder()
-                .setName(field.getName())
-                .setValue(field.getValue())
-                .build()
-        );
+        entityFields.putIfAbsent(
+            field.getName(),
+            Field.newBuilder().setName(field.getName()).setValue(field.getValue()).build());
       }
     }
     for (String entityName : entityNames) {
diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
index bd034341ec9..4a09bee82ff 100644
--- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
+++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -170,12 +170,14 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
     Map expected = new HashMap<>();
 
     LOGGER.info("Generating test data ...");
-    IntStream.range(0, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE).forEach(i -> {
-      FeatureRow randomRow = TestUtil.createRandomFeatureRow(spec);
-      RedisKey redisKey = TestUtil.createRedisKey(spec, randomRow);
-      input.add(randomRow);
-      expected.put(redisKey, randomRow);
-    });
+    IntStream.range(0, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE)
+        .forEach(
+            i -> {
+              FeatureRow randomRow = TestUtil.createRandomFeatureRow(spec);
+              RedisKey redisKey = TestUtil.createRedisKey(spec, randomRow);
+              input.add(randomRow);
+              expected.put(redisKey, randomRow);
+            });
 
     LOGGER.info("Starting Import Job with the following options: {}", options.toString());
     PipelineResult pipelineResult = ImportJob.runPipeline(options);
@@ -183,43 +185,50 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
     Assert.assertEquals(pipelineResult.getState(), State.RUNNING);
 
     LOGGER.info("Publishing {} Feature Row messages to Kafka ...", input.size());
-    TestUtil.publishFeatureRowsToKafka(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, input,
-        ByteArraySerializer.class, KAFKA_PUBLISH_TIMEOUT_SEC);
-    TestUtil.waitUntilAllElementsAreWrittenToStore(pipelineResult,
+    TestUtil.publishFeatureRowsToKafka(
+        KAFKA_BOOTSTRAP_SERVERS,
+        KAFKA_TOPIC,
+        input,
+        ByteArraySerializer.class,
+        KAFKA_PUBLISH_TIMEOUT_SEC);
+    TestUtil.waitUntilAllElementsAreWrittenToStore(
+        pipelineResult,
         Duration.standardSeconds(IMPORT_JOB_MAX_RUN_DURATION_SEC),
         Duration.standardSeconds(IMPORT_JOB_CHECK_INTERVAL_DURATION_SEC));
 
     LOGGER.info("Validating the actual values written to Redis ...");
     Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
-    expected.forEach((key, expectedValue) -> {
-
-      // Ensure ingested key exists.
-      byte[] actualByteValue = jedis.get(key.toByteArray());
-      if (actualByteValue == null) {
-        LOGGER.error("Key not found in Redis: " + key);
-        LOGGER.info("Redis INFO:");
-        LOGGER.info(jedis.info());
-        String randomKey = jedis.randomKey();
-        if (randomKey != null) {
-          LOGGER.info("Sample random key, value (for debugging purpose):");
-          LOGGER.info("Key: " + randomKey);
-          LOGGER.info("Value: " + jedis.get(randomKey));
-        }
-        Assert.fail("Missing key in Redis.");
-      }
-
-      // Ensure value is a valid serialized FeatureRow object.
-      FeatureRow actualValue = null;
-      try {
-        actualValue = FeatureRow.parseFrom(actualByteValue);
-      } catch (InvalidProtocolBufferException e) {
-        Assert.fail(String
-            .format("Actual Redis value cannot be parsed as FeatureRow, key: %s, value :%s",
-                key, new String(actualByteValue, StandardCharsets.UTF_8)));
-      }
-
-      // Ensure the retrieved FeatureRow is equal to the ingested FeatureRow.
-      Assert.assertEquals(expectedValue, actualValue);
-    });
+    expected.forEach(
+        (key, expectedValue) -> {
+
+          // Ensure ingested key exists.
+          byte[] actualByteValue = jedis.get(key.toByteArray());
+          if (actualByteValue == null) {
+            LOGGER.error("Key not found in Redis: " + key);
+            LOGGER.info("Redis INFO:");
+            LOGGER.info(jedis.info());
+            String randomKey = jedis.randomKey();
+            if (randomKey != null) {
+              LOGGER.info("Sample random key, value (for debugging purpose):");
+              LOGGER.info("Key: " + randomKey);
+              LOGGER.info("Value: " + jedis.get(randomKey));
+            }
+            Assert.fail("Missing key in Redis.");
+          }
+
+          // Ensure value is a valid serialized FeatureRow object.
+          FeatureRow actualValue = null;
+          try {
+            actualValue = FeatureRow.parseFrom(actualByteValue);
+          } catch (InvalidProtocolBufferException e) {
+            Assert.fail(
+                String.format(
+                    "Actual Redis value cannot be parsed as FeatureRow, key: %s, value :%s",
+                    key, new String(actualByteValue, StandardCharsets.UTF_8)));
+          }
+
+          // Ensure the retrieved FeatureRow is equal to the ingested FeatureRow.
+          Assert.assertEquals(expectedValue, actualValue);
+        });
   }
 }
diff --git a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java
index 6e0db2dd49c..f74f7667850 100644
--- a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java
+++ b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java
@@ -1,3 +1,19 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package feast.store.serving.redis;
 
 import static org.junit.Assert.*;
@@ -6,11 +22,8 @@
 import feast.core.FeatureSetProto.EntitySpec;
 import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.core.FeatureSetProto.FeatureSpec;
-import feast.ingestion.transform.ValidateFeatureRows;
 import feast.storage.RedisProto.RedisKey;
-import feast.store.serving.redis.RedisCustomIO.Method;
 import feast.store.serving.redis.RedisCustomIO.RedisMutation;
-import feast.test.TestUtil;
 import feast.types.FeatureRowProto.FeatureRow;
 import feast.types.FieldProto.Field;
 import feast.types.ValueProto.Value;
@@ -31,64 +44,77 @@
 
 public class FeatureRowToRedisMutationDoFnTest {
 
-  @Rule
-  public transient TestPipeline p = TestPipeline.create();
-
-  private FeatureSetSpec fs = FeatureSetSpec.newBuilder()
-      .setName("feature_set")
-      .setVersion(1)
-      .addEntities(
-          EntitySpec.newBuilder()
-              .setName("entity_id_primary")
-              .setValueType(Enum.INT32)
-              .build())
-      .addEntities(
-          EntitySpec.newBuilder()
-              .setName("entity_id_secondary")
-              .setValueType(Enum.STRING)
-              .build())
-      .addFeatures(
-          FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build())
-      .addFeatures(
-          FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build())
-      .build();
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  private FeatureSetSpec fs =
+      FeatureSetSpec.newBuilder()
+          .setName("feature_set")
+          .setVersion(1)
+          .addEntities(
+              EntitySpec.newBuilder().setName("entity_id_primary").setValueType(Enum.INT32).build())
+          .addEntities(
+              EntitySpec.newBuilder()
+                  .setName("entity_id_secondary")
+                  .setValueType(Enum.STRING)
+                  .build())
+          .addFeatures(
+              FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build())
+          .addFeatures(
+              FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build())
+          .build();
 
   @Test
   public void shouldConvertRowWithDuplicateEntitiesToValidKey() {
     Map featureSetSpecs = new HashMap<>();
     featureSetSpecs.put("feature_set", fs);
 
-    FeatureRow offendingRow = FeatureRow.newBuilder()
-        .setFeatureSet("feature_set")
-        .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
-        .addFields(Field.newBuilder().setName("entity_id_primary")
-            .setValue(Value.newBuilder().setInt32Val(1)))
-        .addFields(Field.newBuilder().setName("entity_id_primary")
-            .setValue(Value.newBuilder().setInt32Val(2)))
-        .addFields(Field.newBuilder().setName("entity_id_secondary")
-            .setValue(Value.newBuilder().setStringVal("a")))
-        .build();
-
-    PCollection output = p
-        .apply(Create.of(Collections.singletonList(offendingRow)))
-        .setCoder(ProtoCoder.of(FeatureRow.class))
-        .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpecs)));
-
-    RedisKey expectedKey = RedisKey.newBuilder()
-        .setFeatureSet("feature_set")
-        .addEntities(Field.newBuilder().setName("entity_id_primary")
-            .setValue(Value.newBuilder().setInt32Val(1)))
-        .addEntities(Field.newBuilder().setName("entity_id_secondary")
-            .setValue(Value.newBuilder().setStringVal("a")))
-        .build();
-
-    PAssert.that(output).satisfies((SerializableFunction, Void>) input -> {
-      input.forEach(rm -> {
-        assert(Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
-        assert(Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
-      });
-      return null;
-    });
+    FeatureRow offendingRow =
+        FeatureRow.newBuilder()
+            .setFeatureSet("feature_set")
+            .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .build();
+
+    PCollection output =
+        p.apply(Create.of(Collections.singletonList(offendingRow)))
+            .setCoder(ProtoCoder.of(FeatureRow.class))
+            .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpecs)));
+
+    RedisKey expectedKey =
+        RedisKey.newBuilder()
+            .setFeatureSet("feature_set")
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .build();
+
+    PAssert.that(output)
+        .satisfies(
+            (SerializableFunction, Void>)
+                input -> {
+                  input.forEach(
+                      rm -> {
+                        assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
+                        assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
+                      });
+                  return null;
+                });
     p.run();
   }
 
@@ -97,36 +123,49 @@ public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() {
     Map featureSetSpecs = new HashMap<>();
     featureSetSpecs.put("feature_set", fs);
 
-    FeatureRow offendingRow = FeatureRow.newBuilder()
-        .setFeatureSet("feature_set")
-        .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
-        .addFields(Field.newBuilder().setName("entity_id_secondary")
-            .setValue(Value.newBuilder().setStringVal("a")))
-        .addFields(Field.newBuilder().setName("entity_id_primary")
-            .setValue(Value.newBuilder().setInt32Val(1)))
-        .build();
-
-    PCollection output = p
-        .apply(Create.of(Collections.singletonList(offendingRow)))
-        .setCoder(ProtoCoder.of(FeatureRow.class))
-        .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpecs)));
-
-    RedisKey expectedKey = RedisKey.newBuilder()
-        .setFeatureSet("feature_set")
-        .addEntities(Field.newBuilder().setName("entity_id_primary")
-            .setValue(Value.newBuilder().setInt32Val(1)))
-        .addEntities(Field.newBuilder().setName("entity_id_secondary")
-            .setValue(Value.newBuilder().setStringVal("a")))
-        .build();
-
-    PAssert.that(output).satisfies((SerializableFunction, Void>) input -> {
-      input.forEach(rm -> {
-        assert(Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
-        assert(Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
-      });
-      return null;
-    });
+    FeatureRow offendingRow =
+        FeatureRow.newBuilder()
+            .setFeatureSet("feature_set")
+            .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .build();
+
+    PCollection output =
+        p.apply(Create.of(Collections.singletonList(offendingRow)))
+            .setCoder(ProtoCoder.of(FeatureRow.class))
+            .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpecs)));
+
+    RedisKey expectedKey =
+        RedisKey.newBuilder()
+            .setFeatureSet("feature_set")
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .build();
+
+    PAssert.that(output)
+        .satisfies(
+            (SerializableFunction, Void>)
+                input -> {
+                  input.forEach(
+                      rm -> {
+                        assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
+                        assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
+                      });
+                  return null;
+                });
     p.run();
   }
-
-}
\ No newline at end of file
+}
diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
index 47587e1d0ef..43ac75d0b21 100644
--- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
+++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
@@ -231,8 +231,7 @@ Job runBatchQuery(List featureSetQueries)
         bigquery()
             .getTable(queryJobConfig.getDestinationTable())
             .toBuilder()
-            .setExpirationTime(
-                System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
             .build();
     bigquery().update(expiry);