Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental SOCKS5 support for S3 #524

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ The cache is able to asynchronously prefetch next chunks, up to the specified nu

⚠️ This is an experimental feature subject for future changes.

| Object storage | Supported | Host name resolution |
|----------------------|:---------------:|:--------------------:|
| AWS S3 | ❌ (in progress) | |
| Azure Blob Storage | | Proxy-side |
| Google Cloud Storage | | Proxy-side |
| Object storage | Supported | Host name resolution |
|----------------------|:---------:|:--------------------:|
| AWS S3               | ✅        | Client-side          |
| Azure Blob Storage   | ✅        | Proxy-side           |
| Google Cloud Storage | ✅        | Proxy-side           |

## License

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

/**
 * End-to-end single-broker test variant that talks to the MinIO-backed S3
 * endpoint directly, i.e. without any SOCKS5 proxy in between.
 */
class S3MinioSingleBrokerDirectTest extends S3MinioSingleBrokerTest {
    static final String BUCKET = "test-bucket-direct";

    @BeforeAll
    static void createBucket() {
        final CreateBucketRequest request = CreateBucketRequest.builder()
            .bucket(BUCKET)
            .build();
        s3Client.createBucket(request);
    }

    @BeforeAll
    static void startKafka() throws Exception {
        // Only the bucket name is added on top of the shared RSM plugin setup.
        setupKafka(kafka -> rsmPluginBasicSetup(kafka)
            .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET));
    }

    @Override
    protected String bucket() {
        return BUCKET;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

/**
 * End-to-end single-broker test variant that routes S3 traffic through an
 * authenticated SOCKS5 proxy instead of connecting to MinIO directly.
 */
class S3MinioSingleBrokerSocks5Test extends S3MinioSingleBrokerTest {
    static final String BUCKET = "test-bucket-socks5";

    @BeforeAll
    static void createBucket() {
        final CreateBucketRequest request = CreateBucketRequest.builder()
            .bucket(BUCKET)
            .build();
        s3Client.createBucket(request);
    }

    @BeforeAll
    static void startKafka() throws Exception {
        // On top of the shared RSM plugin setup, point the storage backend at
        // the SOCKS5 proxy, including username/password authentication.
        setupKafka(kafka -> rsmPluginBasicSetup(kafka)
            .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
            .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_HOST", SOCKS5_NETWORK_ALIAS)
            .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_PORT", Integer.toString(SOCKS5_PORT))
            .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_USERNAME", SOCKS5_USER)
            .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_PASSWORD", SOCKS5_PASSWORD));
    }

    @Override
    protected String bucket() {
        return BUCKET;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
Expand All @@ -38,45 +38,28 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

public class S3MinioSingleBrokerTest extends SingleBrokerTest {

abstract class S3MinioSingleBrokerTest extends SingleBrokerTest {
static final int MINIO_PORT = 9000;
static final String MINIO_NETWORK_ALIAS = "minio";

static final GenericContainer<?> MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio"))
.withCommand("server", "/data", "--console-address", ":9090")
.withExposedPorts(MINIO_PORT)
.withNetwork(NETWORK)
.withNetworkAliases("minio");
.withNetworkAliases(MINIO_NETWORK_ALIAS);

static final String ACCESS_KEY_ID = "minioadmin";
static final String SECRET_ACCESS_KEY = "minioadmin";
static final String REGION = "us-east-1";
static final String BUCKET = "test-bucket";

static final String MINIO_SERVER_URL = String.format("http://%s:%s", MINIO_NETWORK_ALIAS, MINIO_PORT);

static S3Client s3Client;

@BeforeAll
static void init() throws Exception {
static void init() {
MINIO.start();

final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT);

createBucket(minioServerUrl);

initializeS3Client();

setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", minioServerUrl)
.dependsOn(MINIO));
}

private static void initializeS3Client() {
final Integer mappedPort = MINIO.getFirstMappedPort();
Testcontainers.exposeHostPorts(mappedPort);
s3Client = S3Client.builder()
Expand All @@ -93,21 +76,6 @@ private static void initializeS3Client() {
.forEach(bucket -> LOG.info("S3 bucket: {}", bucket.name()));
}

private static void createBucket(final String minioServerUrl) {
final String cmd =
"/usr/bin/mc config host add local "
+ minioServerUrl + " " + ACCESS_KEY_ID + " " + SECRET_ACCESS_KEY + " --api s3v4 &&"
+ "/usr/bin/mc mb local/test-bucket;\n";

final GenericContainer<?> mcContainer = new GenericContainer<>("minio/mc")
.withNetwork(NETWORK)
.withStartupCheckStrategy(new OneShotStartupCheckStrategy())
.withCreateContainerCmdModifier(containerCommand -> containerCommand
.withTty(true)
.withEntrypoint("/bin/sh", "-c", cmd));
mcContainer.start();
}

@AfterAll
static void cleanup() {
stopKafka();
Expand All @@ -117,16 +85,33 @@ static void cleanup() {
cleanupStorage();
}

/**
 * Applies the RSM (RemoteStorageManager) S3 plugin configuration shared by all
 * test variants to the given Kafka container: backend class, plugin class path,
 * region, path-style access, credentials and the MinIO endpoint.
 * The bucket name is deliberately left to the concrete subclass.
 */
static KafkaContainer rsmPluginBasicSetup(final KafkaContainer container) {
    return container
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
            "io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
        .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
            "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*")
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", MINIO_SERVER_URL)
        .dependsOn(MINIO);
}

/** Name of the S3 bucket the concrete test class stores its data in. */
protected abstract String bucket();

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
    // Remote objects for a topic are keyed under "<topicName>-<topicId>...".
    final String topicKeyPrefix = String.format("%s-%s", topicName, topicId.toString());
    final ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
        .bucket(bucket())
        .prefix(topicKeyPrefix)
        .build();
    final int matchingKeys = s3Client.listObjectsV2(listRequest).keyCount();
    return matchingKeys == 0;
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(BUCKET).build();
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(bucket()).build();
final List<S3Object> s3Objects = new ArrayList<>();
ListObjectsV2Response result;
while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void worksWithUnauthenticatedProxy() throws StorageBackendException, IOException
protected abstract Map<String, Object> storageConfigForUnauthenticatedProxy();

@Test
void doesNotWorkWithoutProxy() {
protected void doesNotWorkWithoutProxy() {
// This test accompanies the other ones by ensuring that _without_ a proxy
// we cannot even resolve the host name of the server, which is internal to the Docker network.

Expand Down
1 change: 1 addition & 0 deletions storage/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
dep.exclude group: "org.slf4j"
}
implementation ("software.amazon.awssdk:s3:$awsSdkVersion") {excludeFromAWSDeps(it)}
compileOnly ("software.amazon.awssdk:apache-client:$awsSdkVersion") {excludeFromAWSDeps(it)}
runtimeOnly ("software.amazon.awssdk:sts:$awsSdkVersion") {excludeFromAWSDeps(it)}

implementation project(':commons')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.storage.s3;

import java.lang.reflect.Method;
import java.util.Map;

import io.aiven.kafka.tieredstorage.storage.BaseSocks5Test;
import io.aiven.kafka.tieredstorage.storage.TestUtils;

import com.github.dockerjava.api.model.ContainerNetwork;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInfo;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class S3Socks5Test extends BaseSocks5Test<S3Storage> {
    static final Network NETWORK = Network.newNetwork();

    @Container
    private static final LocalStackContainer LOCALSTACK = S3TestContainer.container()
        .withNetwork(NETWORK);

    @Container
    static final GenericContainer<?> PROXY_AUTHENTICATED = proxyContainer(true).withNetwork(NETWORK);
    @Container
    static final GenericContainer<?> PROXY_UNAUTHENTICATED = proxyContainer(false).withNetwork(NETWORK);

    private static S3Client s3Client;
    private String bucketName;

    @BeforeAll
    static void setUpClass() {
        // Build the client exactly once. The previous version terminated the
        // builder chain with build(), discarded that client, and then called
        // clientBuilder.build() a second time.
        s3Client = S3Client.builder()
            .region(Region.of(LOCALSTACK.getRegion()))
            .endpointOverride(LOCALSTACK.getEndpointOverride(LocalStackContainer.Service.S3))
            .credentialsProvider(
                StaticCredentialsProvider.create(
                    AwsBasicCredentials.create(
                        LOCALSTACK.getAccessKey(),
                        LOCALSTACK.getSecretKey()
                    )
                )
            )
            .build();
    }

    /** Creates a fresh bucket per test so tests do not observe each other's objects. */
    @BeforeEach
    void setUp(final TestInfo testInfo) {
        bucketName = TestUtils.testNameToBucketName(testInfo);
        s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
    }

    /**
     * Returns the LocalStack S3 endpoint as addressable from inside the Docker
     * network (container IP + internal service port), i.e. reachable only through
     * the proxy containers, not from the host.
     *
     * <p>Reflection is used because {@code LocalStackContainer.getServicePort}
     * is not publicly accessible — NOTE(review): revisit if the Testcontainers
     * API ever exposes it.
     */
    static String internalLocalstackEndpoint() {
        try {
            final String networkName = LOCALSTACK.getDockerClient()
                .inspectNetworkCmd().withNetworkId(NETWORK.getId()).exec().getName();
            final ContainerNetwork containerNetwork = LOCALSTACK.getContainerInfo()
                .getNetworkSettings().getNetworks().get(networkName);
            final String ipAddress = containerNetwork.getIpAddress();
            // Renamed from getServicePortField: this local holds a Method, not a Field.
            final Method getServicePort = LocalStackContainer.class
                .getDeclaredMethod("getServicePort", LocalStackContainer.EnabledService.class);
            getServicePort.setAccessible(true);
            final int port = (int) getServicePort.invoke(LOCALSTACK, LocalStackContainer.Service.S3);
            return String.format("http://%s:%d", ipAddress, port);
        } catch (final ReflectiveOperationException e) {
            // Preserve the cause and add context for faster diagnosis.
            throw new RuntimeException("Failed to resolve internal LocalStack S3 endpoint", e);
        }
    }

    @Override
    protected S3Storage createStorageBackend() {
        return new S3Storage();
    }

    /** Config routing S3 traffic through the username/password-authenticated proxy. */
    @Override
    protected Map<String, Object> storageConfigForAuthenticatedProxy() {
        final var proxy = PROXY_AUTHENTICATED;
        return Map.of(
            "s3.bucket.name", bucketName,
            "s3.region", LOCALSTACK.getRegion(),
            "s3.endpoint.url", internalLocalstackEndpoint(),
            "aws.access.key.id", LOCALSTACK.getAccessKey(),
            "aws.secret.access.key", LOCALSTACK.getSecretKey(),
            "s3.path.style.access.enabled", true,
            "proxy.host", proxy.getHost(),
            "proxy.port", proxy.getMappedPort(SOCKS5_PORT),
            "proxy.username", SOCKS5_USER,
            "proxy.password", SOCKS5_PASSWORD
        );
    }

    /** Config routing S3 traffic through the proxy that requires no credentials. */
    @Override
    protected Map<String, Object> storageConfigForUnauthenticatedProxy() {
        final var proxy = PROXY_UNAUTHENTICATED;
        return Map.of(
            "s3.bucket.name", bucketName,
            "s3.region", LOCALSTACK.getRegion(),
            "s3.endpoint.url", internalLocalstackEndpoint(),
            "aws.access.key.id", LOCALSTACK.getAccessKey(),
            "aws.secret.access.key", LOCALSTACK.getSecretKey(),
            "s3.path.style.access.enabled", true,
            "proxy.host", proxy.getHost(),
            "proxy.port", proxy.getMappedPort(SOCKS5_PORT)
        );
    }

    /** Config with no proxy settings at all; used by the (disabled) no-proxy test. */
    @Override
    protected Map<String, Object> storageConfigForNoProxy() {
        return Map.of(
            "s3.bucket.name", bucketName,
            "s3.region", LOCALSTACK.getRegion(),
            "s3.endpoint.url", internalLocalstackEndpoint(),
            "aws.access.key.id", LOCALSTACK.getAccessKey(),
            "aws.secret.access.key", LOCALSTACK.getSecretKey(),
            "s3.path.style.access.enabled", true
        );
    }

    @Disabled("Not applicable for S3")
    @Override
    protected void doesNotWorkWithoutProxy() {
        // Unfortunately, S3 does the client-side hostname resolution,
        // so the trick with using the hostname visible only in Docker (i.e. to the proxy containers) won't work.
    }

    @Override
    protected Iterable<String> possibleRootCauseMessagesWhenNoProxy() {
        // NOTE(review): returning null is normally an anti-pattern, but this value
        // is only consumed by doesNotWorkWithoutProxy, which is @Disabled above,
        // so it is never dereferenced. Consider List.of() if the test is re-enabled.
        return null;
    }
}
Loading
Loading