Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15167: Tiered Storage Test Harness Framework #14116

Merged
merged 9 commits into from Aug 20, 2023
Merged
9 changes: 9 additions & 0 deletions build.gradle
Expand Up @@ -1694,6 +1694,10 @@ project(':storage:api') {
javadoc {
include "**/org/apache/kafka/server/log/remote/storage/*"
}

checkstyle {
configProperties = checkstyleConfigProperties("import-control-storage.xml")
}
}

project(':storage') {
Expand All @@ -1718,6 +1722,7 @@ project(':storage') {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.hamcrest
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.bcpkix
Expand Down Expand Up @@ -1796,6 +1801,10 @@ project(':storage') {
javadoc {
enabled = false
}

checkstyle {
configProperties = checkstyleConfigProperties("import-control-storage.xml")
}
}

project(':tools:tools-api') {
Expand Down
132 changes: 132 additions & 0 deletions checkstyle/import-control-storage.xml
@@ -0,0 +1,132 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<import-control pkg="org.apache.kafka">

<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->

<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />

<!-- no one depends on the server -->
<disallow pkg="kafka" />

<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />


<subpackage name="server">
<allow pkg="org.apache.kafka.common" />

<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.server.record" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
<subpackage name="storage">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>
</subpackage>

<subpackage name="storage.internals">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.common" />
<allow pkg="com.github.benmanes.caffeine.cache" />
</subpackage>

<!-- START OF TIERED STORAGE INTEGRATION TEST IMPORT DEPENDENCIES -->
<subpackage name="tiered.storage">
<allow pkg="scala" />

<allow pkg="org.apache.kafka.tiered.storage" />
<allow pkg="org.apache.kafka.tiered.storage.actions" />
<allow pkg="org.apache.kafka.tiered.storage.specs" />
<allow pkg="org.apache.kafka.tiered.storage.utils" />

<allow pkg="kafka.api" />
<allow pkg="kafka.log" />
<allow pkg="kafka.server" />
<allow pkg="kafka.utils" />

<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.replica" />
<allow pkg="org.apache.kafka.common.network" />

<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.clients.producer" />

<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="org.apache.kafka.storage.internals.log" />

<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.server.log.remote" />
<allow pkg="org.apache.kafka.server.log.remote.storage" />

<allow pkg="org.apache.kafka.test" />
<subpackage name="actions">
</subpackage>

<subpackage name="specs">
</subpackage>

<subpackage name="utils">
</subpackage>

<subpackage name="integration">
</subpackage>
</subpackage>
<!-- END OF TIERED STORAGE INTEGRATION TEST IMPORT DEPENDENCIES -->

</import-control>
29 changes: 0 additions & 29 deletions checkstyle/import-control.xml
Expand Up @@ -247,35 +247,6 @@

<!-- This is required to make AlterConfigPolicyTest work. -->
<allow pkg="org.apache.kafka.server.policy" />

<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.server.record" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
<subpackage name="storage">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

</subpackage>
</subpackage>

<subpackage name="storage.internals">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.common" />
<allow pkg="com.github.benmanes.caffeine.cache" />
</subpackage>

<subpackage name="shell">
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Expand Up @@ -19,6 +19,7 @@ package kafka.server

import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
kamalcph marked this conversation as resolved.
Show resolved Hide resolved
import kafka.metrics.LinuxIoMetricsCollector
import kafka.network.SocketServer
import kafka.security.CredentialProvider
Expand Down Expand Up @@ -79,6 +80,7 @@ trait KafkaBroker extends Logging {
def kafkaScheduler: Scheduler
def kafkaYammerMetrics: KafkaYammerMetrics
def logManager: LogManager
def remoteLogManagerOpt: Option[RemoteLogManager]
def metrics: Metrics
def quotaManagers: QuotaFactory.QuotaManagers
def replicaManager: ReplicaManager
Expand Down
Expand Up @@ -258,13 +258,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}

def killBroker(index: Int): Unit = {
if(alive(index)) {
if (alive(index)) {
_brokers(index).shutdown()
_brokers(index).awaitShutdown()
alive(index) = false
}
}

def startBroker(index: Int): Unit = {
if (!alive(index)) {
_brokers(index).startup()
alive(index) = true
}
}

/**
* Restart any dead brokers
*/
Expand Down
Expand Up @@ -26,9 +26,9 @@ public interface LocalTieredStorageTraverser {

/**
* Called when a new topic-partition stored on the remote storage is discovered.
* @param topicPartition The new topic-partition discovered.
* @param topicIdPartition The new topic-partition discovered.
*/
void visitTopicIdPartition(TopicIdPartition topicPartition);
void visitTopicIdPartition(TopicIdPartition topicIdPartition);

/**
* Called when a new segment is discovered for a given topic-partition.
Expand Down
11 changes: 11 additions & 0 deletions storage/src/test/java/org/apache/kafka/tiered/storage/README.md
@@ -0,0 +1,11 @@
# The Test Flow

Step 1: For every test, setup is done via TieredStorageTestHarness which extends IntegrationTestHarness and sets up a cluster with TS enabled on it.
kamalcph marked this conversation as resolved.
Show resolved Hide resolved

Step 2: The test is written as a specification consisting of sequential actions and assertions. The spec for the complete test is written down first which creates "actions" to be executed.

Step 3: Once we have the test spec in-place (which includes assertion actions), we execute the test which will execute each action sequentially.

Step 4: The test execution stops when any of the action throws an exception (or an assertion error).

Step 5: Clean-up for the test is performed on test exit
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage;

import java.io.PrintStream;

public interface TieredStorageTestAction {

default void execute(TieredStorageTestContext context) throws Exception {
try {
doExecute(context);
context.succeed(this);
} catch (Exception e) {
context.fail(this);
throw e;
}
}

void doExecute(TieredStorageTestContext context) throws Exception;

void describe(PrintStream output);
}