<a href="https://colab.research.google.com/github/Linho1150/kafka-python-produce-consumer/blob/main/kafka-producer-consumer-example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install kafka-python

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[K     |████████████████████████████████| 246 kB 4.9 MB/s 
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [12]:
from datetime import datetime
import threading, time

from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic

# Kafka producer operations object
class Producer():
    '''Kafka producer wrapper that opens a new connection for every message.'''
    # Broker address used for every connection made by this class.
    bootstrap_servers='43.201.95.87:9092'

    def sendMessage(self,topic_name:str, message:str):
        '''Send one UTF-8 encoded message to a topic
            :param str topic_name: Name of the topic
            :param str message: a message you want to send
            :returns: the object returned by ``KafkaProducer.send`` for this message
        '''
        producer = KafkaProducer(bootstrap_servers=Producer.bootstrap_servers)
        try:
            return producer.send(topic_name, message.encode('utf-8'))
        finally:
            # Runs even when encode() or send() raises, so the connection is
            # never left open.
            producer.close()

# Kafka streaming producer operations object
class StreamingProducer():
    '''Kafka producer wrapper that always writes to the streaming input topic.'''
    # Broker address used for every connection made by this class.
    bootstrap_servers='43.201.95.87:9092'

    def sendMessage(self, message:str):
        '''Send one UTF-8 encoded message to the 'streams-plaintext-input' topic
            :param str message: a message you want to send
            :returns: the object returned by ``KafkaProducer.send`` for this message
        '''
        # Use this class's own broker setting rather than borrowing the one
        # declared on another class.
        producer = KafkaProducer(bootstrap_servers=StreamingProducer.bootstrap_servers)
        try:
            return producer.send('streams-plaintext-input', message.encode('utf-8'))
        finally:
            # Runs even when encode() or send() raises, so the connection is
            # never left open.
            producer.close()

# Kafka consumer object
class Consumer():
    '''Kafka consumer wrapper that reads a topic and prints what it receives.'''
    # Broker address used for every connection made by this class.
    bootstrap_servers = '43.201.95.87:9092'

    def getMessage(self, topic_name:str):
        '''Print and collect the messages read from a topic
            :param str topic_name: Name of the topic
            :returns: list of the message values that were read, in arrival order
        '''
        # NOTE(review): 'earliest' and the 1000 ms timeout are expected to make
        # the loop start from the oldest record and end once the topic goes
        # quiet - confirm against the kafka-python KafkaConsumer documentation.
        consumer = KafkaConsumer(bootstrap_servers=Consumer.bootstrap_servers,
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        received = []
        try:
            consumer.subscribe([topic_name])
            for message in consumer:
                print("Topic: ",message.topic,"Message: ",message.value)
                received.append(message.value)
        finally:
            # Close the connection even if subscribing or iterating raises.
            consumer.close()
        return received

# Kafka topic administration object
class KafkaTopicAdministrator:
    '''Create, list, delete and look up Kafka topics through one shared admin client.'''
    # Shared by all instances. It is built while the class body is executed,
    # i.e. as soon as this class definition runs.
    kafkaAdmin = KafkaAdminClient(bootstrap_servers='43.201.95.87:9092')

    def setKafkaTopic(self, topic_name: str): # Set topic
        '''Create a topic with one partition and a replication factor of one
            :param str topic_name: Name of the topic you want to create
            :returns: create result
            :raises Exception: if a topic with this name already exists
        '''
        if self.isKafkaTopic(topic_name):
            raise Exception("already exist")
        topic = NewTopic(name=topic_name,
                         num_partitions=1,
                         replication_factor=1)
        return KafkaTopicAdministrator.kafkaAdmin.create_topics([topic])

    def getKafkaTopicList(self) -> list:
        '''Get topic list
            :returns: topic List
        '''
        return KafkaTopicAdministrator.kafkaAdmin.list_topics()

    def deleteKafkaTopic(self, topic_name: str):
        '''Delete an existing topic
            :param str topic_name: Name of the topic you want to remove
            :returns: delete result, exactly as returned by the admin client
            :raises Exception: if no topic with this name exists
        '''
        if not self.isKafkaTopic(topic_name):
            raise Exception("no topic")
        return KafkaTopicAdministrator.kafkaAdmin.delete_topics([topic_name])

    def isKafkaTopic(self, topic_name: str) -> bool:
        '''Check if topic exists
            :param str topic_name: The topic name you want to check
            :returns: True if the name is in the current topic list, else False
        '''
        return topic_name in KafkaTopicAdministrator.kafkaAdmin.list_topics()


if __name__ == "__main__":
    # Kafka Topic Manager
    kafkaAdmin = KafkaTopicAdministrator() # create the Kafka admin object
    kafkaAdmin.setKafkaTopic("my-kafka-example") # create a Kafka topic
    kafkaAdmin.deleteKafkaTopic("my-kafka-example") # delete the Kafka topic
    print(kafkaAdmin.getKafkaTopicList()) # print the Kafka topic list

    # Kafka Producer & Consumer
    producer=Producer()
    producer.sendMessage("my-kafka","Kafka")
    consumer=Consumer()
    consumer.getMessage("my-kafka")

    # Kafka Streaming Producer
    # Each line typed by the user is forwarded to the streaming topic.
    # No producer is created here: StreamingProducer opens its own in sendMessage.
    streamingProducer=StreamingProducer()
    while True:
        inputTxt=input()
        # Typing "exit" ends the loop without sending it.
        if inputTxt == "exit":
            break
        streamingProducer.sendMessage(inputTxt)



['streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition', 'my-kafka', 'connect-test', 'streams-plaintext-input', '__consumer_offsets', 'streams-wordcount-output', 'streams-planintext-input', 'streams-planintext-output', 'streams-plaintext-output', 'streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog']




Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Hello'
Topic:  my-kafka Message:  b'Kafka'
Topic:  my-kafka Message:  b'Kafka'
Topic:  my-kafka Message:  b'Kafka'
Topic:  my-kafka Message:  b'Kafka'
Topic:  my-kafka Message:  b'Kafka'
Topic:  my-kafka Message:  b'Kafka'
Topic:  my-kafka Message:  b'Kafka'


ERROR:kafka.consumer.fetcher:Fetch to node 1 failed: Cancelled: <BrokerConnection node_id=1 host=43.201.95.87:9092 <connected> [IPv4 ('43.201.95.87', 9092)]>


exit
