ChristopherHaydenTodd/ctodd-python-lib-kafka

Christopher H. Todd's Python Library For Interacting With Kafka

The ctodd-python-lib-kafka project is responsible for interacting with Apache Kafka. This includes producing and consuming records from topics, working with the Avro (.avro) format, and other tasks involved in creating event-driven applications with Python.

Dependencies

Python Packages

  • confluent-kafka==0.11.6
  • simplejson==3.16.0

Libraries

This library is used to interact with Kafka Admin functionality. This includes getting the admin client object, which returns details about the state of Kafka.

Functions:

def get_kafka_admin_client(kafka_brokers):
    """
    Purpose:
        Get a Kafka Admin Client Object. Allows for polling information about Kafka
        configuration and creating objects in Kafka
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kafka
            brokers
    Return:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    """

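As a rough illustration of what a helper like this might do, the sketch below joins the broker list into the comma-separated `bootstrap.servers` setting that confluent-kafka expects and passes it to `confluent_kafka.admin.AdminClient`. The names `build_admin_config` and `make_admin_client` are hypothetical and are not part of this library; the actual implementation may differ.

```python
def build_admin_config(kafka_brokers):
    """Build a confluent-kafka style config dict from a list of host:port strings."""
    if not kafka_brokers:
        raise ValueError("at least one broker is required")
    # confluent-kafka takes the brokers as a single comma-separated string
    return {"bootstrap.servers": ",".join(kafka_brokers)}


def make_admin_client(kafka_brokers):
    """Create an AdminClient (hypothetical helper, not this library's function)."""
    # Imported lazily so build_admin_config can be used without confluent-kafka installed
    from confluent_kafka.admin import AdminClient

    return AdminClient(build_admin_config(kafka_brokers))
```

Calling `make_admin_client(["localhost:9092"])` requires confluent-kafka to be installed; the client connects to the broker lazily when it is first used.
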
This library is used to aid in creating kafka consumers.

Functions:

def get_kafka_consumer(
    kafka_brokers,
    consumer_group="default",
    timeout=6000,
    offset_start="latest",
    get_stats=True
):
    """
    Purpose:
        Get a Kafka Consumer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kafka brokers
        consumer_group (String): Consumer group to consume as. default is "default"
        timeout (Int): Timeout in ms if no messages are found (during poll). Default
            is 6000
        offset_start (String): Where to start consuming with respect to the consumer
            group/topic offset. Default is "latest", which ignores any messages in the
            topic before the consumer begins consuming
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
    """

def consume_topic(kafka_consumer, kafka_topics):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
        kafka_topics (List of Strings): List of Kafka Topics to Consume.
    Yields:
        msg (Kafka Message Obj): Message Obj returned from the topic
    """

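The subscribe-and-poll pattern behind a generator such as `consume_topic` can be sketched as follows. This is a minimal sketch, not the library's implementation: `iter_messages`, `poll_timeout_s`, and `max_empty_polls` are hypothetical names, and only the `subscribe()`, `poll()`, and `Message.error()` calls come from confluent-kafka. Note that `Consumer.poll()` takes its timeout in seconds, while the `timeout` argument documented above is in milliseconds.

```python
def iter_messages(kafka_consumer, kafka_topics, poll_timeout_s=1.0, max_empty_polls=3):
    """Yield messages until poll() comes back empty max_empty_polls times in a row.

    kafka_consumer is expected to behave like confluent_kafka.Consumer
    (subscribe(), poll()); the stopping rule is an assumption of this sketch.
    """
    kafka_consumer.subscribe(kafka_topics)
    empty_polls = 0
    while empty_polls < max_empty_polls:
        msg = kafka_consumer.poll(poll_timeout_s)  # None when nothing arrived in time
        if msg is None:
            empty_polls += 1
            continue
        empty_polls = 0
        if msg.error():
            # This sketch skips error events; real code should inspect them
            continue
        yield msg
```

A caller would typically loop over the generator and decode each payload, for example `for msg in iter_messages(consumer, ["test-env-topic"]): print(msg.value())`.
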
File for holding custom exception types that will be raised by the kafka_helpers libraries.

Classes:

class TopicNotFound(Exception):
    """
    Purpose:
        TopicNotFound is raised when attempting to consume a topic that
        does not exist
    """

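One way such an exception could be used is to check the requested topics against the known topics before consuming. The sketch below redefines `TopicNotFound` locally so it runs on its own; `ensure_topics_exist` is a hypothetical helper, and where the library actually raises the exception is not stated here.

```python
class TopicNotFound(Exception):
    """Raised when a requested topic does not exist (local copy for this sketch)."""


def ensure_topics_exist(available_topics, kafka_topics):
    """Raise TopicNotFound if any requested topic is missing from available_topics.

    available_topics can be any container of topic names, such as the dict
    of topics keyed by name that a topic-listing helper returns.
    """
    missing = [topic for topic in kafka_topics if topic not in available_topics]
    if missing:
        raise TopicNotFound(f"Topic(s) not found: {', '.join(missing)}")
```

Failing early like this gives a clear error message instead of a consumer that silently waits on a topic that will never receive messages.
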
This library is used to interact with Kafka in ways not specifically related to consuming or producing messages.

Functions:

N/A

This library is used to aid in creating kafka producers.

Functions:

def get_kafka_producer(kafka_brokers, get_stats=True):
    """
    Purpose:
        Get a Kafka Producer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kafka brokers
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
    """

def produce_message(kafka_producer, kafka_topic, msg):
    """
    Purpose:
        Produce a message to a Kafka Topic
    Args:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
        kafka_topic (String): Kafka Topic to Produce message to.
        msg (String): Message to produce to Kafka
    Returns:
        N/A
    """

def produce_results_callback(err, msg):
    """
    Purpose:
        Optional per-message delivery callback (triggered by poll() or
        flush()) when a message has been successfully delivered or
        permanently failed delivery (after retries).
    Args:
        err (String): Error Message
        msg (Object): Kafka Callback Message Object
    Return:
        N/A
    """

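The produce-with-callback flow described above might look roughly like the following. This is a sketch under assumptions: `format_delivery_result`, `delivery_report`, and `send_message` are hypothetical names, while `produce(..., callback=...)`, `poll()`, `Message.topic()`, and `Message.partition()` are confluent-kafka calls. The callback receives `err` as `None` on success.

```python
def format_delivery_result(err, msg):
    """Turn a delivery callback's arguments into a readable status line."""
    if err is not None:
        return f"Delivery failed: {err}"
    return f"Delivered to {msg.topic()} [partition {msg.partition()}]"


def delivery_report(err, msg):
    """Per-message delivery callback, invoked from poll() or flush()."""
    print(format_delivery_result(err, msg))


def send_message(kafka_producer, kafka_topic, msg, callback=delivery_report):
    """Queue msg for delivery and serve callbacks for earlier messages."""
    kafka_producer.produce(kafka_topic, value=msg.encode("utf-8"), callback=callback)
    kafka_producer.poll(0)  # non-blocking; triggers callbacks for completed deliveries
```

Because `produce()` only queues the message, a script should call `kafka_producer.flush()` before exiting so that outstanding messages are delivered and their callbacks run.
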
This library is used to interact with kafka topics. This includes getting a list of the topics, finding details about a topic, creating topics, and more.

Functions:

def get_topics(kafka_admin_client, return_system_topics=False):
    """
    Purpose:
        Get a List of Kafka Topics.
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    Return:
        kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
            Kafka metadata object that has basic topic information
    """

def create_kafka_topic(
    kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
    """
    Purpose:
        Create a Kafka Topic
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
        topic_name (String): Name of the topic to create
        topic_replication (Int): Replication factor for the new topic
        topic_partitions (Int): Number of partitions to divide the topic into
    Return:
        N/A
    """

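For orientation, here is a hedged sketch of how listing and creating topics can be done with confluent-kafka's admin API. `filter_topics` and `create_topic_sketch` are hypothetical names, and treating topics that start with `__` (such as `__consumer_offsets`) as system topics is an assumption of this sketch, not documented behavior of `return_system_topics`. `NewTopic` and `AdminClient.create_topics()` are real confluent-kafka calls.

```python
def filter_topics(all_topics, return_system_topics=False):
    """Return a copy of the topic dict, dropping internal topics unless asked to keep them."""
    if return_system_topics:
        return dict(all_topics)
    # Assumption: internal topics are the ones whose names start with "__"
    return {
        name: meta for name, meta in all_topics.items() if not name.startswith("__")
    }


def create_topic_sketch(
    kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
    """Create one topic and block until the broker confirms or rejects it."""
    from confluent_kafka.admin import NewTopic  # lazy import; needs confluent-kafka

    new_topic = NewTopic(
        topic_name,
        num_partitions=topic_partitions,
        replication_factor=topic_replication,
    )
    # create_topics() returns a dict mapping each topic name to a future
    futures = kafka_admin_client.create_topics([new_topic])
    futures[topic_name].result()  # raises if topic creation failed
```

The topic dict passed to `filter_topics` could come from `kafka_admin_client.list_topics(timeout=10).topics`, which maps topic names to metadata objects.
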
Example Scripts

Example executable Python scripts/modules for testing and interacting with the library. These show example use cases for the libraries and can be used as templates for developing with the libraries or for one-off development tasks.

    Purpose:
        Consume from a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Consumer Object
        - Poll Topic
        - Parse Message
        - Print Message

    example script call:
        python3 consume_from_kafka_topic.py --topic="test-env-topic" \
            --broker="0.0.0.0:9092" --consumer-group="test-env-consumer"

    Purpose:
        Produce to a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Producer Object
        - Prompt for Input
        - Parse Input
        - Produce Input to Kafka

    example script call:
        python3 produce_to_kafka_topic.py --topic="test-env-topic" \
            --broker="localhost:9092"

    Purpose:
        Create a Kafka Topic. Takes in replication and partition information

    Steps:
        - Connect to Kafka
        - Create Kafka Admin Client
        - Create Topic In Kafka

    function call:
        ---
    example script call:
        python3 create_kafka_topic.py --topic-name="test-env-topic" \
            --topic-replication=3 --topic-partitions=4 \
            --broker="localhost:9092"
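
The command-line handling for a script like the consumer example could be sketched with `argparse` as below. The flag names are taken from the example calls above; `parse_cli_args`, the defaults, and the choice to let `--broker` be repeated are assumptions of this sketch rather than the scripts' actual behavior.

```python
import argparse


def parse_cli_args(argv=None):
    """Parse the flags used in the consumer example call."""
    parser = argparse.ArgumentParser(description="Consume from a Kafka topic")
    parser.add_argument("--topic", required=True, help="Kafka topic to consume")
    parser.add_argument(
        "--broker",
        action="append",  # assumption: repeat the flag to pass several brokers
        required=True,
        help="Kafka broker as host:port",
    )
    parser.add_argument(
        "--consumer-group", default="default", help="Consumer group to consume as"
    )
    return parser.parse_args(argv)
```

The parsed values map directly onto the library arguments: `args.broker` is already a list of host:port strings suitable for `kafka_brokers`, and `args.consumer_group` matches `consumer_group`.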

Notes

  • Relies on f-string notation, which requires Python 3.6 or later. A refactor to remove f-strings could allow for development with Python 3.0.x through 3.5.x

TODO

  • Unittest framework in place, but lacking tests