|
73 | 73 | import java.util.Map; |
74 | 74 | import java.util.Objects; |
75 | 75 | import java.util.OptionalInt; |
76 | | -import java.util.concurrent.atomic.AtomicBoolean; |
77 | 76 | import java.util.concurrent.atomic.AtomicInteger; |
78 | 77 | import java.util.concurrent.atomic.AtomicLong; |
79 | 78 | import java.util.regex.Pattern; |
|
105 | 104 | public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase { |
106 | 105 |
|
107 | 106 | private S3Service service; |
108 | | - private AtomicBoolean shouldErrorOnDns; |
| 107 | + private volatile boolean shouldErrorOnDns; |
109 | 108 | private RecordingMeterRegistry recordingMeterRegistry; |
110 | 109 |
|
111 | 110 | @Before |
112 | 111 | public void setUp() throws Exception { |
113 | | - shouldErrorOnDns = new AtomicBoolean(false); |
| 112 | + shouldErrorOnDns = false; |
114 | 113 | service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY, Mockito.mock(ResourceWatcherService.class)) { |
115 | 114 | @Override |
116 | 115 | protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettings) { |
117 | 116 | final AmazonS3ClientBuilder builder = super.buildClientBuilder(clientSettings); |
118 | 117 | final DnsResolver defaultDnsResolver = builder.getClientConfiguration().getDnsResolver(); |
119 | 118 | builder.getClientConfiguration().setDnsResolver(host -> { |
120 | | - if (shouldErrorOnDns.get() && randomBoolean() && randomBoolean()) { |
| 119 | + if (shouldErrorOnDns && randomBoolean() && randomBoolean()) { |
121 | 120 | throw new UnknownHostException(host); |
122 | 121 | } |
123 | 122 | return defaultDnsResolver.resolve(host); |
@@ -653,7 +652,7 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException { |
653 | 652 |
|
654 | 653 | final byte[] bytes = randomBlobContent(512); |
655 | 654 |
|
656 | | - shouldErrorOnDns.set(true); |
| 655 | + shouldErrorOnDns = true; |
657 | 656 | final AtomicInteger failures = new AtomicInteger(); |
658 | 657 | @SuppressForbidden(reason = "use a http server") |
659 | 658 | class FlakyReadHandler implements HttpHandler { |
@@ -776,7 +775,7 @@ public void testSuppressedDeletionErrorsAreCapped() { |
776 | 775 | int maxBulkDeleteSize = randomIntBetween(1, 10); |
777 | 776 | final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize); |
778 | 777 | httpServer.createContext("/", exchange -> { |
779 | | - if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) { |
| 778 | + if (isMultiDeleteRequest(exchange)) { |
780 | 779 | exchange.sendResponseHeaders( |
781 | 780 | randomFrom( |
782 | 781 | HttpStatus.SC_INTERNAL_SERVER_ERROR, |
@@ -810,7 +809,7 @@ public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException |
810 | 809 |
|
811 | 810 | final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>"); |
812 | 811 | httpServer.createContext("/", exchange -> { |
813 | | - if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) { |
| 812 | + if (isMultiDeleteRequest(exchange)) { |
814 | 813 | final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); |
815 | 814 | final var matcher = pattern.matcher(requestBody); |
816 | 815 | final StringBuilder deletes = new StringBuilder(); |
@@ -962,6 +961,10 @@ private Map<String, Object> metricAttributes(String action) { |
962 | 961 | return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action); |
963 | 962 | } |
964 | 963 |
|
| 964 | + private static boolean isMultiDeleteRequest(HttpExchange exchange) { |
| 965 | + return new S3HttpHandler("bucket").parseRequest(exchange).isMultiObjectDeleteRequest(); |
| 966 | + } |
| 967 | + |
965 | 968 | /** |
966 | 969 | * Asserts that an InputStream is fully consumed, or aborted, when it is closed |
967 | 970 | */ |
|
0 commit comments