Skip to content

Commit

Permalink
KAFKA-14591: Move DeleteRecordsCommand to tools (apache#13278)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
  • Loading branch information
nizhikov authored and Cerchie committed Jul 25, 2023
1 parent 53f2c19 commit 7302324
Show file tree
Hide file tree
Showing 21 changed files with 408 additions and 168 deletions.
2 changes: 1 addition & 1 deletion bin/kafka-delete-records.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.DeleteRecordsCommand "$@"
2 changes: 1 addition & 1 deletion bin/windows/kafka-delete-records.bat
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

"%~dp0kafka-run-class.bat" kafka.admin.DeleteRecordsCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.DeleteRecordsCommand %*
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package kafka.admin

import java.util.Random

import kafka.utils.Logging
import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException}
import org.apache.kafka.server.common.AdminOperationException

import collection.{Map, mutable, _}

Expand Down
137 changes: 0 additions & 137 deletions core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.admin
import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
import kafka.common.AdminCommandFailedException
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
Expand All @@ -31,6 +30,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}

import scala.jdk.CollectionConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.admin
import java.util
import java.util.Optional
import java.util.concurrent.ExecutionException
import kafka.common.AdminCommandFailedException
import kafka.server.DynamicConfig
import kafka.utils.{CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
Expand All @@ -30,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica}
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kafka.admin
import java.util
import java.util.{Collections, Optional, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
Expand All @@ -33,6 +32,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExistsException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.server.util.TopicFilter.IncludeList
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.controller
import com.yammer.metrics.core.Timer

import java.util.concurrent.TimeUnit
import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.cluster.Broker
Expand All @@ -45,7 +44,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.ProducerIdsBlock
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.zookeeper.KeeperException
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kafka.server

import java.util
import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.admin.AdminUtils
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
Expand Down Expand Up @@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig

import scala.collection.{Map, mutable, _}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.zk

import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
Expand All @@ -26,6 +26,7 @@ import kafka.utils.Implicits._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.NodeExistsException

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package kafka.admin

import kafka.common.AdminCommandFailedException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package kafka.admin

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import kafka.common.AdminCommandFailedException
import kafka.server.IntegrationTestUtils.createTopic
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
Expand All @@ -29,6 +27,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package kafka.admin
import java.util.concurrent.ExecutionException
import java.util.{Arrays, Collections}
import kafka.admin.ReassignPartitionsCommand._
import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Config, MockAdminClient, PartitionReassignment}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidReplicationFactorException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, TopicPartitionReplica}
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package kafka.admin

import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions, TopicService}
import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils, CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartitionInfo
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package kafka.server

import kafka.admin.AdminOperationException
import kafka.utils.CoreUtils._
import kafka.server.QuorumTestHarness
import org.apache.kafka.common.config._
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test

Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand All @@ -15,9 +15,14 @@
* limitations under the License.
*/

package kafka.admin
package org.apache.kafka.server.common;

class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) {
def this(error: Throwable) = this(error.getMessage, error)
def this(msg: String) = this(msg, null)
}
public class AdminCommandFailedException extends RuntimeException {
public AdminCommandFailedException(String message) {
super(message);
}

public AdminCommandFailedException(String message, Throwable cause) {
super(message, cause);
}
}

0 comments on commit 7302324

Please sign in to comment.