This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Merge pull request #750 from Parsely/feature/topics_apis
Topics APIs
Emmett J. Butler committed Dec 18, 2017
2 parents 5122395 + fff9fe3 commit 8a78285
Showing 4 changed files with 349 additions and 4 deletions.
35 changes: 33 additions & 2 deletions pykafka/broker.py
@@ -30,7 +30,8 @@
SyncGroupRequest, SyncGroupResponse, HeartbeatRequest, HeartbeatResponse,
LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, ApiVersionsRequest,
ApiVersionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest,
DeleteTopicsResponse)
from .utils.compat import range, iteritems, get_bytes

log = logging.getLogger(__name__)
@@ -552,8 +553,38 @@ def describe_groups(self, group_ids):
future = self._req_handler.request(DescribeGroupsRequest(group_ids))
return future.get(DescribeGroupsResponse)

############################
# Create/Delete Topics API #
############################
@_check_handler
def create_topics(self, topic_reqs, timeout=0):
"""Create topics via the Topic Creation API
:param topic_reqs: The topic creation requests to issue
:type topic_reqs: Iterable of :class:`pykafka.protocol.CreateTopicRequest`
:param timeout: The time in ms to wait for a topic to be completely created.
Values <= 0 will trigger topic creation and return immediately.
:type timeout: int
"""
future = self._req_handler.request(CreateTopicsRequest(topic_reqs,
timeout=timeout))
return future.get(CreateTopicsResponse)

@_check_handler
def delete_topics(self, topics, timeout=0):
"""Delete topics viaa the Topic Deletion API
:param topics: The names of the topics to delete
:type topics: Iterable of str
:param timeout: The time in ms to wait for a topic to be completely deleted.
Values <= 0 will trigger topic deletion and return immediately.
:type timeout: int
"""
future = self._req_handler.request(DeleteTopicsRequest(topics, timeout=timeout))
return future.get(DeleteTopicsResponse)

@_check_handler
def fetch_api_versions(self):
"""Send an ApiVersionsRequest"""
"""Fetch supported API versions from this broker"""
future = self._req_handler.request(ApiVersionsRequest())
return future.get(ApiVersionsResponse.get_version_impl(self._api_versions))
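
For orientation, a minimal usage sketch of the two new broker methods; the broker address, topic name, and settings are illustrative, and the broker must run Kafka 0.10.0+:

# Sketch: create and then delete a topic via the new broker methods.
from pykafka import KafkaClient
from pykafka.protocol import CreateTopicRequest

client = KafkaClient(hosts="127.0.0.1:9092", broker_version="0.10.0")
broker = client.cluster.controller_broker  # admin requests go to the controller

# one partition, replication factor 1, no explicit replica assignment or configs
topic_req = CreateTopicRequest(b"example_topic", 1, 1, [], [])
broker.create_topics([topic_req], timeout=10000)  # wait up to 10s for creation

broker.delete_topics([b"example_topic"], timeout=10000)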
70 changes: 68 additions & 2 deletions pykafka/cli/kafka_tools.py
@@ -5,12 +5,13 @@
import datetime as dt
import sys
import time
from pkg_resources import parse_version

import tabulate

import pykafka
from pykafka.common import OffsetType
from pykafka.protocol import PartitionOffsetCommitRequest, CreateTopicRequest
from pykafka.utils.compat import PY3, iteritems

#
@@ -261,6 +262,24 @@ def reset_offsets(client, args):
)


def create_topic(client, args):
if parse_version(args.broker_version) < parse_version('0.10.0'):
raise ValueError("The topic creation API is not usable on brokers older than "
"0.10.0. Use --broker_version to specify the version")
topic_req = CreateTopicRequest(args.topic, args.num_partitions,
args.replication_factor,
args.replica_assignment or [],
args.config_entries or [])
client.cluster.controller_broker.create_topics([topic_req], args.timeout)


def delete_topic(client, args):
if parse_version(args.broker_version) < parse_version('0.10.0'):
raise ValueError("The topic deletoin API is not usable on brokers older than "
"0.10.0. Use --broker_version to specify the version")
client.cluster.controller_broker.delete_topics([args.topic], args.timeout)


def _encode_utf8(string):
"""Converts argument to UTF-8-encoded bytes.
@@ -286,6 +305,13 @@ def _add_limit(parser):
type=int, default=10)


def _add_timeout(parser):
parser.add_argument('-t', '--timeout',
help='Time in ms to wait for the operation to complete '
'(default: %(default)s)',
type=int, default=5000)


def _add_offset(parser):
"""Add offset to arg parser."""
parser.add_argument('offset',
@@ -320,6 +346,11 @@ def _get_arg_parser():
dest='host',
help='host:port of any Kafka broker. '
'[default: localhost:9092]')
output.add_argument('-o', '--broker_version',
required=False,
default='0.9.0',
dest='broker_version',
help="The version string of the broker with which to communicate")

subparsers = output.add_subparsers(help='Commands', dest='command')

@@ -383,14 +414,49 @@ def _get_arg_parser():
_add_consumer_group(parser)
_add_offset(parser)

parser = subparsers.add_parser(
'create_topic',
help='Create a topic'
)
parser.set_defaults(func=create_topic)
_add_topic(parser)
_add_timeout(parser)
parser.add_argument('-p', '--num_partitions',
help='Number of partitions to be created. -1 indicates unset. '
'(default: %(default)s)',
type=int, default=1)
parser.add_argument('-r', '--replication_factor',
help='Replication factor for the topic. -1 indicates unset. '
'(default: %(default)s)',
type=int, default=1)
parser.add_argument('-a', '--replica_assignment',
help='Replica assignment among Kafka brokers for the partitions '
'of this topic. If this is set, num_partitions and '
'replication_factor must be unset. Represent as a JSON '
'object with partition IDs as keys and lists of node IDs '
'as values',
type=_encode_utf8)
parser.add_argument('-c', '--config_entries',
help='Topic-level configuration for the topic to be set. '
'Represent as a JSON object.',
type=_encode_utf8)

parser = subparsers.add_parser(
'delete_topic',
help='Delete a topic'
)
parser.set_defaults(func=delete_topic)
_add_topic(parser)
_add_timeout(parser)

return output


def main():
parser = _get_arg_parser()
args = parser.parse_args()
if args.command:
client = pykafka.KafkaClient(hosts=args.host, broker_version=args.broker_version)
args.func(client, args)
else:
parser.print_help()
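
For orientation, a hypothetical invocation of the new create_topic subcommand through main(); it assumes _add_topic registers the topic name as a positional argument and that a broker is reachable at the default localhost:9092:

# Sketch: drive the CLI entry point programmatically. Flag names come from the
# parser definitions above; the positional topic argument is an assumption.
import sys
from pykafka.cli import kafka_tools

sys.argv = ["kafka_tools.py", "--broker_version", "0.10.0",
            "create_topic", "example_topic",
            "-p", "3", "-r", "1", "-t", "10000"]
kafka_tools.main()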
195 changes: 195 additions & 0 deletions pykafka/protocol.py
@@ -2123,6 +2123,201 @@ def __init__(self, buff):
self.groups[group.group_id] = group


_CreateTopicRequest = namedtuple(
'CreateTopicRequest',
['topic_name', 'num_partitions', 'replication_factor', 'replica_assignment',
'config_entries']
)


class CreateTopicRequest(_CreateTopicRequest):
def __new__(cls,
topic_name,
num_partitions,
replication_factor,
replica_assignment,
config_entries):
return super(CreateTopicRequest, cls).__new__(
cls, topic_name, num_partitions, replication_factor, replica_assignment,
config_entries)


class CreateTopicsRequest(Request):
"""A create topics request
Specification::
CreateTopics Request (Version: 0) => [create_topic_requests] timeout
create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries]
topic => STRING
num_partitions => INT32
replication_factor => INT16
replica_assignment => partition [replicas]
partition => INT32
replicas => INT32
config_entries => config_name config_value
config_name => STRING
config_value => NULLABLE_STRING
timeout => INT32
"""
API_KEY = 19

def __init__(self, topic_requests, timeout=0):
self.topic_requests = topic_requests
self.timeout = timeout

def __len__(self):
"""Length of the serialized message, in bytes"""
# header + len(topic_reqs)
size = self.HEADER_LEN + 4
for topic_req in self.topic_requests:
# len(topic_name) + topic_name
size += 2 + len(topic_req.topic_name)
# num_partitions + replication_factor + len(replica_assignment)
size += 4 + 2 + 4
for partition, replicas in topic_req.replica_assignment:
# partition + len(replicas) + replicas
size += 4 + 4 + 4 * len(replicas)
# len(config_entries)
size += 4
for config_name, config_value in topic_req.config_entries:
# len(config_name) + config_name + len(config_value) + config_value
size += 2 + len(config_name) + 2 + len(config_value)
# timeout
size += 4
return size

def get_bytes(self):
"""Create a new create topics request"""
output = bytearray(len(self))
self._write_header(output)
offset = self.HEADER_LEN
fmt = '!i'
struct.pack_into(fmt, output, offset, len(self.topic_requests))
offset += struct.calcsize(fmt)
for topic_req in self.topic_requests:
fmt = '!h%dsihi' % len(topic_req.topic_name)
struct.pack_into(fmt, output, offset, len(topic_req.topic_name),
topic_req.topic_name, topic_req.num_partitions,
topic_req.replication_factor,
len(topic_req.replica_assignment))
offset += struct.calcsize(fmt)
for partition, replicas in topic_req.replica_assignment:
fmt = '!ii'
struct.pack_into(fmt, output, offset, partition, len(replicas))
offset += struct.calcsize(fmt)
for replica in replicas:
fmt = '!i'
struct.pack_into(fmt, output, offset, replica)
offset += struct.calcsize(fmt)
fmt = '!i'
struct.pack_into(fmt, output, offset, len(topic_req.config_entries))
offset += struct.calcsize(fmt)
for config_name, config_value in topic_req.config_entries:
fmt = '!h%dsh%ds' % (len(config_name), len(config_value))
struct.pack_into(fmt, output, offset, len(config_name), config_name,
len(config_value), config_value)
offset += struct.calcsize(fmt)
fmt = '!i'
struct.pack_into(fmt, output, offset, self.timeout)
offset += struct.calcsize(fmt)
return output
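
To make the size arithmetic concrete, a sketch of a single-topic request with one config entry and no explicit replica assignment (assuming pykafka's fixed 21-byte request header; names and values are illustrative):

# 21 (header) + 4 (topic array length)
# + 2 + 13 (b"example_topic") + 4 + 2 + 4 (num_partitions, replication_factor,
#   replica_assignment length) + 4 (config array length)
# + 2 + 12 + 2 + 8 (b"retention.ms" -> b"86400000") + 4 (timeout) = 82 bytes
from pykafka.protocol import CreateTopicRequest, CreateTopicsRequest

topic_req = CreateTopicRequest(b"example_topic", 2, 1, [],
                               [(b"retention.ms", b"86400000")])
request = CreateTopicsRequest([topic_req], timeout=5000)
print(len(request))  # 82, per the breakdown above
wire_bytes = request.get_bytes()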


class CreateTopicsResponse(Response):
"""A create topics response
Specification::
CreateTopics Response (Version: 0) => [topic_errors]
topic_errors => topic error_code
topic => STRING
error_code => INT16
"""
API_KEY = 19

def __init__(self, buff):
"""Deserialize into a new Response
:param buff: Serialized message
:type buff: :class:`bytearray`
"""
fmt = '[Sh]'
response = struct_helpers.unpack_from(fmt, buff, 0)
for _, error_code in response:
if error_code != 0:
self.raise_error(error_code, response)
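
The response body is a bare array of (topic, error_code) pairs; a sketch decoding a hand-built body for one successfully created topic (buffer layout is an assumption matching the '[Sh]' format):

# Sketch: int32 array count, int16 name length, name bytes, int16 error code.
import struct
from pykafka.protocol import CreateTopicsResponse

body = struct.pack('!ih13sh', 1, 13, b"example_topic", 0)  # error_code 0 == no error
CreateTopicsResponse(bytearray(body))  # returns without raising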


class DeleteTopicsRequest(Request):
"""A delete topics request
Specification::
DeleteTopics Request (Version: 0) => [topics] timeout
topics => STRING
timeout => INT32
"""
API_KEY = 20

def __init__(self, topics, timeout=0):
self.topics = topics
self.timeout = timeout

def __len__(self):
"""Length of the serialized message, in bytes"""
# header + len(topics)
size = self.HEADER_LEN + 4
for topic in self.topics:
# len(topic) + topic
size += 2 + len(topic)
# timeout
size += 4
return size

def get_bytes(self):
"""Create a new delete topics request"""
output = bytearray(len(self))
self._write_header(output)
offset = self.HEADER_LEN
fmt = '!i'
struct.pack_into(fmt, output, offset, len(self.topics))
offset += struct.calcsize(fmt)
for topic in self.topics:
fmt = '!h%ds' % len(topic)
struct.pack_into(fmt, output, offset, len(topic), topic)
offset += struct.calcsize(fmt)
fmt = '!i'
struct.pack_into(fmt, output, offset, self.timeout)
offset += struct.calcsize(fmt)
return output
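
The delete request serializes the same way with a flat array of topic names; a small size sketch under the same 21-byte-header assumption:

# 21 (header) + 4 (array length) + 2 + 13 (b"example_topic") + 4 (timeout) = 44 bytes
from pykafka.protocol import DeleteTopicsRequest

request = DeleteTopicsRequest([b"example_topic"], timeout=5000)
print(len(request))  # 44
wire_bytes = request.get_bytes()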


class DeleteTopicsResponse(Response):
"""A delete topics response
Specification::
DeleteTopics Response (Version: 0) => [topic_error_codes]
topic_error_codes => topic error_code
topic => STRING
error_code => INT16
"""
API_KEY = 20

def __init__(self, buff):
"""Deserialize into a new Response
:param buff: Serialized message
:type buff: :class:`bytearray`
"""
fmt = '[Sh]'
response = struct_helpers.unpack_from(fmt, buff, 0)
for _, error_code in response:
if error_code != 0:
self.raise_error(error_code, response)


class ApiVersionsRequest(Request):
"""An api versions request
