Add experimental SOCKS5 support for S3
Technically, I found a way to have proxy-side host name resolution, but it requires rewriting quite a bit of the AWS client machinery. Leaving it as is for now.
ivanyu committed Mar 28, 2024
1 parent 2e0a299 commit 87b89b1
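For context on the commit message: one way to route the AWS SDK v2 S3 client through a SOCKS5 proxy is to give the Apache HTTP client a custom ConnectionSocketFactory that opens every connection via java.net.Proxy. The sketch below only illustrates the idea and is not necessarily the exact wiring in this commit; the proxy host and port are placeholders, and it assumes the software.amazon.awssdk:apache-client module that the build.gradle change below adds. Because the Apache client resolves the target host before connecting the socket, DNS lookups stay on the client side, which is the limitation the commit message refers to.

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;

import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.protocol.HttpContext;

import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

/** Opens all sockets through a SOCKS5 proxy; host names are still resolved by the client. */
final class Socks5ConnectionSocketFactory extends PlainConnectionSocketFactory {
    private final InetSocketAddress proxyAddress;

    Socks5ConnectionSocketFactory(final String proxyHost, final int proxyPort) {
        this.proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
    }

    @Override
    public Socket createSocket(final HttpContext context) {
        // The superclass later connects this socket to an already-resolved remote address,
        // so host name resolution happens client-side; proxy-side resolution would need an
        // unresolved address, i.e. deeper changes in the AWS client machinery.
        // SOCKS5 username/password authentication would go through java.net.Authenticator.
        return new Socket(new Proxy(Proxy.Type.SOCKS, proxyAddress));
    }
}

class S3Socks5ClientSketch {
    public static void main(final String[] args) {
        // "socks5.example" and 1080 are placeholder values.
        final S3Client s3 = S3Client.builder()
            .region(Region.US_EAST_1)
            .httpClientBuilder(ApacheHttpClient.builder()
                .socketFactory(new Socks5ConnectionSocketFactory("socks5.example", 1080)))
            .build();
        s3.listBuckets().buckets().forEach(b -> System.out.println(b.name()));
    }
}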
Showing 11 changed files with 525 additions and 61 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -196,11 +196,11 @@ The cache is able to asynchronously prefetch next chunks, up to the specified nu

⚠️ This is an experimental feature subject to future changes.

| Object storage       | Supported        | Host name resolution |
|----------------------|:----------------:|:--------------------:|
| AWS S3               | ❌ (in progress) |                      |
| Azure Blob Storage   | ✅               | Proxy-side           |
| Google Cloud Storage | ✅               | Proxy-side           |
| Object storage       | Supported | Host name resolution |
|----------------------|:---------:|:--------------------:|
| AWS S3               | ✅        | Client-side          |
| Azure Blob Storage   | ✅        | Proxy-side           |
| Google Cloud Storage | ✅        | Proxy-side           |

## License

@@ -0,0 +1,40 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

class S3MinioSingleBrokerDirectTest extends S3MinioSingleBrokerTest {
static final String BUCKET = "test-bucket-direct";

@BeforeAll
static void createBucket() {
s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
}

@BeforeAll
static void startKafka() throws Exception {
setupKafka(kafka -> rsmPluginBasicSetup(kafka)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET));
}

@Override
protected String bucket() {
return BUCKET;
}
}
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

class S3MinioSingleBrokerSocks5Test extends S3MinioSingleBrokerTest {
static final String BUCKET = "test-bucket-socks5";

@BeforeAll
static void createBucket() {
s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
}

@BeforeAll
static void startKafka() throws Exception {
setupKafka(kafka -> rsmPluginBasicSetup(kafka)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_HOST", SOCKS5_NETWORK_ALIAS)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_PORT", Integer.toString(SOCKS5_PORT))
.withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_USERNAME", SOCKS5_USER)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_PASSWORD", SOCKS5_PASSWORD));
}

@Override
protected String bucket() {
return BUCKET;
}
}
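The KAFKA_RSM_CONFIG_STORAGE_PROXY_* variables above reach the plugin as rsm.config.storage.proxy.* broker properties (assuming the test image follows the usual KAFKA_* env-var convention), and the S3 backend sees the same keys without the prefix. Below is a minimal sketch of configuring the backend directly with equivalent settings; all values are placeholders, and the configure(Map) entry point is assumed from the storage backend API exercised by the unit tests further down.

import java.util.Map;

import io.aiven.kafka.tieredstorage.storage.s3.S3Storage;

class S3Socks5ConfigSketch {
    public static void main(final String[] args) {
        final S3Storage storage = new S3Storage();
        // Placeholder values; the "proxy.*" keys mirror the SOCKS5 test configuration below.
        storage.configure(Map.of(
            "s3.bucket.name", "test-bucket-socks5",
            "s3.region", "us-east-1",
            "s3.endpoint.url", "http://minio:9000",
            "s3.path.style.access.enabled", true,
            "aws.access.key.id", "minioadmin",
            "aws.secret.access.key", "minioadmin",
            "proxy.host", "socks5-proxy",
            "proxy.port", 1080,
            "proxy.username", "user",
            "proxy.password", "password"
        ));
    }
}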
@@ -28,7 +28,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -38,45 +38,28 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

public class S3MinioSingleBrokerTest extends SingleBrokerTest {

abstract class S3MinioSingleBrokerTest extends SingleBrokerTest {
static final int MINIO_PORT = 9000;
static final String MINIO_NETWORK_ALIAS = "minio";

static final GenericContainer<?> MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio"))
.withCommand("server", "/data", "--console-address", ":9090")
.withExposedPorts(MINIO_PORT)
.withNetwork(NETWORK)
.withNetworkAliases("minio");
.withNetworkAliases(MINIO_NETWORK_ALIAS);

static final String ACCESS_KEY_ID = "minioadmin";
static final String SECRET_ACCESS_KEY = "minioadmin";
static final String REGION = "us-east-1";
static final String BUCKET = "test-bucket";

static final String MINIO_SERVER_URL = String.format("http://%s:%s", MINIO_NETWORK_ALIAS, MINIO_PORT);

static S3Client s3Client;

@BeforeAll
static void init() throws Exception {
static void init() {
MINIO.start();

final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT);

createBucket(minioServerUrl);

initializeS3Client();

setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", minioServerUrl)
.dependsOn(MINIO));
}

private static void initializeS3Client() {
final Integer mappedPort = MINIO.getFirstMappedPort();
Testcontainers.exposeHostPorts(mappedPort);
s3Client = S3Client.builder()
@@ -93,21 +76,6 @@ private static void initializeS3Client() {
.forEach(bucket -> LOG.info("S3 bucket: {}", bucket.name()));
}

private static void createBucket(final String minioServerUrl) {
final String cmd =
"/usr/bin/mc config host add local "
+ minioServerUrl + " " + ACCESS_KEY_ID + " " + SECRET_ACCESS_KEY + " --api s3v4 &&"
+ "/usr/bin/mc mb local/test-bucket;\n";

final GenericContainer<?> mcContainer = new GenericContainer<>("minio/mc")
.withNetwork(NETWORK)
.withStartupCheckStrategy(new OneShotStartupCheckStrategy())
.withCreateContainerCmdModifier(containerCommand -> containerCommand
.withTty(true)
.withEntrypoint("/bin/sh", "-c", cmd));
mcContainer.start();
}

@AfterAll
static void cleanup() {
stopKafka();
@@ -117,16 +85,33 @@ static void cleanup() {
cleanupStorage();
}

static KafkaContainer rsmPluginBasicSetup(final KafkaContainer container) {
container
.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", MINIO_SERVER_URL)
.dependsOn(MINIO);
return container;
}

protected abstract String bucket();

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());
final var request = ListObjectsV2Request.builder().bucket(BUCKET).prefix(prefix).build();
final var request = ListObjectsV2Request.builder().bucket(bucket()).prefix(prefix).build();
return s3Client.listObjectsV2(request).keyCount() == 0;
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(BUCKET).build();
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(bucket()).build();
final List<S3Object> s3Objects = new ArrayList<>();
ListObjectsV2Response result;
while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
@@ -78,7 +78,7 @@ void worksWithUnauthenticatedProxy() throws StorageBackendException, IOException
protected abstract Map<String, Object> storageConfigForUnauthenticatedProxy();

@Test
void doesNotWorkWithoutProxy() {
protected void doesNotWorkWithoutProxy() {
// This test accompanies the other ones by ensuring that _without_ a proxy
// we cannot even resolve the host name of the server, which is internal to the Docker network.

1 change: 1 addition & 0 deletions storage/s3/build.gradle
@@ -27,6 +27,7 @@ dependencies {
dep.exclude group: "org.slf4j"
}
implementation ("software.amazon.awssdk:s3:$awsSdkVersion") {excludeFromAWSDeps(it)}
compileOnly ("software.amazon.awssdk:apache-client:$awsSdkVersion") {excludeFromAWSDeps(it)}
runtimeOnly ("software.amazon.awssdk:sts:$awsSdkVersion") {excludeFromAWSDeps(it)}

implementation project(':commons')
@@ -0,0 +1,158 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.storage.s3;

import java.lang.reflect.Method;
import java.util.Map;

import io.aiven.kafka.tieredstorage.storage.BaseSocks5Test;
import io.aiven.kafka.tieredstorage.storage.TestUtils;

import com.github.dockerjava.api.model.ContainerNetwork;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInfo;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class S3Socks5Test extends BaseSocks5Test<S3Storage> {
static final Network NETWORK = Network.newNetwork();

@Container
private static final LocalStackContainer LOCALSTACK = S3TestContainer.container()
.withNetwork(NETWORK);

@Container
static final GenericContainer<?> PROXY_AUTHENTICATED = proxyContainer(true).withNetwork(NETWORK);
@Container
static final GenericContainer<?> PROXY_UNAUTHENTICATED = proxyContainer(false).withNetwork(NETWORK);

private static S3Client s3Client;
private String bucketName;

@BeforeAll
static void setUpClass() {
final var clientBuilder = S3Client.builder();
clientBuilder.region(Region.of(LOCALSTACK.getRegion()))
.endpointOverride(LOCALSTACK.getEndpointOverride(LocalStackContainer.Service.S3))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
LOCALSTACK.getAccessKey(),
LOCALSTACK.getSecretKey()
)
)
)
.build();
s3Client = clientBuilder.build();
}

@BeforeEach
void setUp(final TestInfo testInfo) {
bucketName = TestUtils.testNameToBucketName(testInfo);
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
}

static String internalLocalstackEndpoint() {
try {
final String networkName = LOCALSTACK.getDockerClient()
.inspectNetworkCmd().withNetworkId(NETWORK.getId()).exec().getName();
final ContainerNetwork containerNetwork = LOCALSTACK.getContainerInfo()
.getNetworkSettings().getNetworks().get(networkName);
final String ipAddress = containerNetwork.getIpAddress();
final Method getServicePortField = LocalStackContainer.class
.getDeclaredMethod("getServicePort", LocalStackContainer.EnabledService.class);
getServicePortField.setAccessible(true);
final int port = (int) getServicePortField.invoke(LOCALSTACK, LocalStackContainer.Service.S3);
return String.format("http://%s:%d", ipAddress, port);
} catch (final ReflectiveOperationException e) {
throw new RuntimeException(e);
}

}

@Override
protected S3Storage createStorageBackend() {
return new S3Storage();
}

@Override
protected Map<String, Object> storageConfigForAuthenticatedProxy() {
final var proxy = PROXY_AUTHENTICATED;
return Map.of(
"s3.bucket.name", bucketName,
"s3.region", LOCALSTACK.getRegion(),
"s3.endpoint.url", internalLocalstackEndpoint(),
"aws.access.key.id", LOCALSTACK.getAccessKey(),
"aws.secret.access.key", LOCALSTACK.getSecretKey(),
"s3.path.style.access.enabled", true,
"proxy.host", proxy.getHost(),
"proxy.port", proxy.getMappedPort(SOCKS5_PORT),
"proxy.username", SOCKS5_USER,
"proxy.password", SOCKS5_PASSWORD
);
}

@Override
protected Map<String, Object> storageConfigForUnauthenticatedProxy() {
final var proxy = PROXY_UNAUTHENTICATED;
return Map.of(
"s3.bucket.name", bucketName,
"s3.region", LOCALSTACK.getRegion(),
"s3.endpoint.url", internalLocalstackEndpoint(),
"aws.access.key.id", LOCALSTACK.getAccessKey(),
"aws.secret.access.key", LOCALSTACK.getSecretKey(),
"s3.path.style.access.enabled", true,
"proxy.host", proxy.getHost(),
"proxy.port", proxy.getMappedPort(SOCKS5_PORT)
);
}

@Override
protected Map<String, Object> storageConfigForNoProxy() {
return Map.of(
"s3.bucket.name", bucketName,
"s3.region", LOCALSTACK.getRegion(),
"s3.endpoint.url", internalLocalstackEndpoint(),
"aws.access.key.id", LOCALSTACK.getAccessKey(),
"aws.secret.access.key", LOCALSTACK.getSecretKey(),
"s3.path.style.access.enabled", true
);
}

@Disabled("Not applicable for S3")
@Override
protected void doesNotWorkWithoutProxy() {
// Unfortunately, S3 does the client-side hostname resolution,
// so the trick with using the hostname visible only in Docker (i.e. to the proxy containers) won't work.
}

@Override
protected Iterable<String> possibleRootCauseMessagesWhenNoProxy() {
return null;
}
}
