Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-2983: Remove Scala consumers and related code #5230

Merged
merged 16 commits into from Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 0 additions & 17 deletions bin/kafka-simple-consumer-shell.sh

This file was deleted.

17 changes: 0 additions & 17 deletions bin/windows/kafka-simple-consumer-shell.bat

This file was deleted.

20 changes: 0 additions & 20 deletions checkstyle/import-control-core.xml
Expand Up @@ -38,26 +38,9 @@
<allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" />

<subpackage name="javaapi">
<subpackage name="consumer">
<allow pkg="kafka.consumer" />
</subpackage>

<subpackage name="message">
<allow pkg="kafka.message" />
</subpackage>

<subpackage name="producer">
<allow pkg="kafka.producer" />
</subpackage>
</subpackage>

<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="kafka.admin" />
<allow pkg="kafka.javaapi" />
<allow pkg="kafka.producer" />
<allow pkg="kafka.consumer" />
<allow pkg="joptsimple" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow class="javax.xml.datatype.Duration" />
Expand All @@ -71,9 +54,6 @@

<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="kafka.api" />
<allow pkg="kafka.javaapi" />
<allow pkg="kafka.message" />
</subpackage>

</import-control>
Expand Up @@ -19,7 +19,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
import org.apache.kafka.common.memory.MemoryPool;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,33 +89,6 @@ public boolean complete() {
}

public long readFrom(ScatteringByteChannel channel) throws IOException {
return readFromReadableChannel(channel);
}

@Override
public boolean requiredMemoryAmountKnown() {
return requestedBufferSize != -1;
}

@Override
public boolean memoryAllocated() {
return buffer != null;
}


@Override
public void close() throws IOException {
if (buffer != null && buffer != EMPTY_BUFFER) {
memoryPool.release(buffer);
buffer = null;
}
}

// Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
// See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
// This can go away after we get rid of BlockingChannel
@Deprecated
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
int read = 0;
if (size.hasRemaining()) {
int bytesRead = channel.read(size);
Expand Down Expand Up @@ -151,6 +123,25 @@ public long readFromReadableChannel(ReadableByteChannel channel) throws IOExcept
return read;
}

@Override
public boolean requiredMemoryAmountKnown() {
return requestedBufferSize != -1;
}

@Override
public boolean memoryAllocated() {
return buffer != null;
}


@Override
public void close() throws IOException {
if (buffer != null && buffer != EMPTY_BUFFER) {
memoryPool.release(buffer);
buffer = null;
}
}

public ByteBuffer payload() {
return this.buffer;
}
Expand Down
77 changes: 9 additions & 68 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Expand Up @@ -363,76 +363,17 @@ object AdminUtils extends Logging with AdminUtilities {

@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def deleteTopic(zkUtils: ZkUtils, topic: String) {
if (topicExists(zkUtils, topic)) {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e2: Throwable => throw new AdminOperationException(e2)
}
} else {
throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
if (topicExists(zkUtils, topic)) {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e2: Throwable => throw new AdminOperationException(e2)
}
} else {
throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
}

@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
zkUtils.getConsumersInGroup(group).nonEmpty
}

/**
* Delete the whole directory of the given consumer group if the group is inactive.
*
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupDirs(group)
zkUtils.deletePathRecursive(dir.consumerGroupDir)
true
}
else false
}

/**
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
* If the consumer group consumes no other topics, delete the whole consumer group directory.
*
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics == Seq(topic)) {
deleteConsumerGroupInZK(zkUtils, group)
}
else if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupTopicDirs(group, topic)
zkUtils.deletePathRecursive(dir.consumerOwnerDir)
zkUtils.deletePathRecursive(dir.consumerOffsetDir)
true
}
else false
}

/**
* Delete every inactive consumer group's information about the given topic in Zookeeper.
*
* @param zkUtils Zookeeper utilities
* @param topic Topic of the consumer group information we wish to delete
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String): Set[String] = {
val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
groups
}

def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
Expand Down