Skip to content

Commit

Permalink
KAFKA-14594: Move LogDirsCommand to tools module (#13122)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <mickael.maison@gmail.com>
  • Loading branch information
clolov committed May 4, 2023
1 parent d46c3f2 commit dc7819d
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 211 deletions.
2 changes: 1 addition & 1 deletion bin/kafka-log-dirs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LogDirsCommand "$@"
2 changes: 1 addition & 1 deletion bin/windows/kafka-log-dirs.bat
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

"%~dp0kafka-run-class.bat" kafka.admin.LogDirsCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LogDirsCommand %*
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,37 @@ synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs(
@Override
synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,
DescribeLogDirsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Map<Integer, Map<String, LogDirDescription>> unwrappedResults = new HashMap<>();

for (Integer broker : brokers) {
unwrappedResults.putIfAbsent(broker, new HashMap<>());
}

for (Map.Entry<String, TopicMetadata> entry : allTopics.entrySet()) {
String topicName = entry.getKey();
TopicMetadata topicMetadata = entry.getValue();
// For tests, we make the assumption that there will always be only 1 entry.
List<String> partitionLogDirs = topicMetadata.partitionLogDirs;
List<TopicPartitionInfo> topicPartitionInfos = topicMetadata.partitions;
for (TopicPartitionInfo topicPartitionInfo : topicPartitionInfos) {
List<Node> nodes = topicPartitionInfo.replicas();
for (Node node : nodes) {
Map<String, LogDirDescription> logDirDescriptionMap = unwrappedResults.get(node.id());
LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>()));
logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false));
}
}
}

Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> results = new HashMap<>();

for (Map.Entry<Integer, Map<String, LogDirDescription>> entry : unwrappedResults.entrySet()) {
KafkaFutureImpl<Map<String, LogDirDescription>> kafkaFuture = new KafkaFutureImpl<>();
kafkaFuture.complete(entry.getValue());
results.put(entry.getKey(), kafkaFuture);
}

return new DescribeLogDirsResult(results);
}

@Override
Expand Down
132 changes: 0 additions & 132 deletions core/src/main/scala/kafka/admin/LogDirsCommand.scala

This file was deleted.

76 changes: 0 additions & 76 deletions core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala

This file was deleted.

0 comments on commit dc7819d

Please sign in to comment.