Skip to content

Commit b17bf1c

Browse files
authored
fix(s3): Wrap eventing poll loop in try/catch and emit error metric (spinnaker#281)
1 parent 68da42a commit b17bf1c

File tree

3 files changed

+28
-13
lines changed

3 files changed

+28
-13
lines changed

front50-s3/src/main/java/com/netflix/spinnaker/front50/config/S3Config.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.fasterxml.jackson.databind.ObjectMapper;
1616
import com.netflix.appinfo.ApplicationInfoManager;
1717
import com.netflix.awsobjectmapper.AmazonObjectMapperConfigurer;
18+
import com.netflix.spectator.api.Registry;
1819
import com.netflix.spinnaker.clouddriver.aws.bastion.BastionConfig;
1920
import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider;
2021
import com.netflix.spinnaker.front50.model.EventingS3ObjectKeyLoader;
@@ -120,13 +121,15 @@ public TemporarySQSQueue temporaryQueueSupport(Optional<ApplicationInfoManager>
120121
public ObjectKeyLoader eventingS3ObjectKeyLoader(ObjectMapper objectMapper,
121122
S3Properties s3Properties,
122123
S3StorageService s3StorageService,
123-
TemporarySQSQueue temporaryQueueSupport) {
124+
TemporarySQSQueue temporaryQueueSupport,
125+
Registry registry) {
124126
return new EventingS3ObjectKeyLoader(
125127
Executors.newFixedThreadPool(1),
126128
objectMapper,
127129
s3Properties,
128130
temporaryQueueSupport,
129131
s3StorageService,
132+
registry,
130133
true
131134
);
132135
}

front50-s3/src/main/java/com/netflix/spinnaker/front50/model/EventingS3ObjectKeyLoader.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.cache.LoadingCache;
2525
import com.google.common.util.concurrent.ListenableFuture;
2626
import com.google.common.util.concurrent.ListenableFutureTask;
27+
import com.netflix.spectator.api.Registry;
2728
import com.netflix.spinnaker.front50.config.S3Properties;
2829
import com.netflix.spinnaker.front50.model.events.S3Event;
2930
import com.netflix.spinnaker.front50.model.events.S3EventWrapper;
@@ -62,8 +63,9 @@ public class EventingS3ObjectKeyLoader implements ObjectKeyLoader, Runnable {
6263
private static final Executor executor = Executors.newFixedThreadPool(5);
6364

6465
private final ObjectMapper objectMapper;
65-
private final S3StorageService s3StorageService;
6666
private final TemporarySQSQueue temporarySQSQueue;
67+
private final S3StorageService s3StorageService;
68+
private final Registry registry;
6769

6870
private final Cache<KeyWithObjectType, Long> objectKeysByLastModifiedCache;
6971
private final LoadingCache<ObjectType, Map<String, Long>> objectKeysByObjectTypeCache;
@@ -77,10 +79,12 @@ public EventingS3ObjectKeyLoader(ExecutorService executionService,
7779
S3Properties s3Properties,
7880
TemporarySQSQueue temporarySQSQueue,
7981
S3StorageService s3StorageService,
82+
Registry registry,
8083
boolean scheduleImmediately) {
8184
this.objectMapper = objectMapper;
8285
this.temporarySQSQueue = temporarySQSQueue;
8386
this.s3StorageService = s3StorageService;
87+
this.registry = registry;
8488

8589
this.objectKeysByLastModifiedCache = CacheBuilder
8690
.newBuilder()
@@ -173,19 +177,24 @@ public Map<String, Long> listObjectKeys(ObjectType objectType) {
173177
@Override
174178
public void run() {
175179
while (pollForMessages) {
176-
List<Message> messages = temporarySQSQueue.fetchMessages();
177-
178-
if (messages.isEmpty()) {
179-
continue;
180-
}
180+
try {
181+
List<Message> messages = temporarySQSQueue.fetchMessages();
181182

182-
messages.forEach(message -> {
183-
S3Event s3Event = unmarshall(objectMapper, message.getBody());
184-
if (s3Event != null) {
185-
tick(s3Event);
183+
if (messages.isEmpty()) {
184+
continue;
186185
}
187-
temporarySQSQueue.markMessageAsHandled(message.getReceiptHandle());
188-
});
186+
187+
messages.forEach(message -> {
188+
S3Event s3Event = unmarshall(objectMapper, message.getBody());
189+
if (s3Event != null) {
190+
tick(s3Event);
191+
}
192+
temporarySQSQueue.markMessageAsHandled(message.getReceiptHandle());
193+
});
194+
} catch (Exception e) {
195+
log.error("Failed to poll for messages", e);
196+
registry.counter("s3.eventing.pollErrors").increment();
197+
}
189198
}
190199
}
191200

front50-s3/src/test/groovy/com/netflix/spinnaker/front50/model/EventingS3ObjectKeyLoaderSpec.groovy

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.netflix.spinnaker.front50.model
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper
20+
import com.netflix.spectator.api.Registry
2021
import com.netflix.spinnaker.front50.config.S3Properties
2122
import com.netflix.spinnaker.front50.model.events.S3Event
2223
import org.springframework.scheduling.TaskScheduler
@@ -34,6 +35,7 @@ class EventingS3ObjectKeyLoaderSpec extends Specification {
3435
)
3536
def temporarySQSQueue = Mock(TemporarySQSQueue)
3637
def s3StorageService = Mock(S3StorageService)
38+
def registry = Mock(Registry)
3739

3840
@Subject
3941
def objectKeyLoader = new EventingS3ObjectKeyLoader(
@@ -42,6 +44,7 @@ class EventingS3ObjectKeyLoaderSpec extends Specification {
4244
s3Properties,
4345
temporarySQSQueue,
4446
s3StorageService,
47+
registry,
4548
false
4649
)
4750

0 commit comments

Comments
 (0)