In [1]:
# Standard library
import json
import logging

# Twitter Streaming API
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream

# Kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# API Key & Token Info
import const

In [2]:
class Producer():
    """Small wrapper around ``KafkaProducer`` that JSON-encodes values.

    Every value passed to :meth:`send_data` is serialized with
    ``json.dumps`` and encoded as UTF-8 before being handed to Kafka.
    Delivery results are reported through :meth:`on_send_success` and
    :meth:`on_send_error`.
    """

    def __init__(self, bootstrap_servers):
        """Create the underlying ``KafkaProducer``.

        Args:
            bootstrap_servers: Broker address(es), forwarded unchanged to
                ``KafkaProducer(bootstrap_servers=...)``.

        Raises:
            KafkaError: If the producer cannot be created. The error is
                printed first and then re-raised so the caller never ends
                up with a half-initialized object.
        """
        try:
            self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                          max_block_ms=10000,
                                          retries=0,
                                          acks=1,
                                          value_serializer=lambda v: json.dumps(v).encode("utf-8"))
        except KafkaError as exc:
            print(f"kafka producer - Exception during connecting to broker - {exc}")
            # __init__ must return None: returning a value here would raise a
            # TypeError that hides the real connection problem, so propagate
            # the original exception instead.
            raise

    def stop(self):
        """Close the underlying Kafka producer."""
        self.producer.close()

    def send_data(self, topic, data):
        """Send one record to ``topic`` and wait until it has been flushed.

        Args:
            topic: Name of the Kafka topic to publish to.
            data: Value to publish; it goes through the JSON serializer
                configured in ``__init__``.
        """
        # send() is asynchronous: register the callbacks on the returned future.
        future = self.producer.send(topic, data)
        future.add_callback(self.on_send_success).add_errback(self.on_send_error)

        # Block until all pending async messages are sent.
        self.producer.flush()

    def on_send_success(self, record_metadata):
        """Print topic, partition and offset of a successfully sent record."""
        print("**********Send Success***********")
        print("record_metadata.topic: ", record_metadata.topic)
        print("record_metadata.partition: ", record_metadata.partition)
        print("record_metadata.offset: ", record_metadata.offset)

    def on_send_error(self, excp):
        """Report a failed send.

        Args:
            excp: The exception passed to the errback.
        """
        print("**********Send Error Occur**********")
        # Use a real logger; the previously referenced bare `log` name was
        # never defined, so the errback itself crashed with NameError.
        logging.getLogger(__name__).error("I am an errback", exc_info=excp)

In [3]:
# Read the credentials from const
CONSUMER_KEY, CONSUMER_SECRET = const.CONSUMER_KEY, const.CONSUMER_SECRET
ACCESS_TOKEN, ACCESS_SECRET = const.ACCESS_TOKEN, const.ACCESS_SECRET

In [4]:
class TweetsListener(Stream):
    """Stream subclass that forwards the text of each tweet to Kafka.

    Args:
        producer: Object exposing ``send_data(topic, data)``; each tweet
            text is passed to it.
        topic_name: Kafka topic the tweet texts are sent to.
        **kwargs: Forwarded unchanged to ``Stream.__init__``.
    """

    def __init__(self, producer, topic_name, **kwargs):
        super().__init__(**kwargs)
        self.producer = producer
        self.topic_name = topic_name

    @staticmethod
    def _extract_text(msg):
        """Return the tweet text from a decoded message, or None if absent.

        For messages flagged ``truncated`` the ``extended_tweet.full_text``
        value is preferred; if it is missing, the plain ``text`` is used.
        """
        if not isinstance(msg, dict) or "text" not in msg:
            # Not a tweet payload (no text field): nothing to forward.
            return None
        if msg.get("truncated"):
            extended = msg.get("extended_tweet") or {}
            return extended.get("full_text", msg["text"])
        return msg["text"]

    def on_data(self, data):
        """Decode one raw JSON message and publish its text to Kafka.

        Args:
            data: Raw JSON payload received from the stream.

        Returns:
            Always True, including when the message is skipped or an
            error was printed.
        """
        try:
            text = self._extract_text(json.loads(data))
            if text is None:
                return True

            print(text)

            self.producer.send_data(self.topic_name, text)

        except Exception as e:
            # Catch Exception, not BaseException, so KeyboardInterrupt and
            # SystemExit can still stop the stream.
            print(f"error: {str(e)}")

        return True

    def on_error(self, status):
        """Print the reported error status and return True."""
        print(status)

        return True

    def on_request_error(self, status_code):
        """Route HTTP error status codes to :meth:`on_error`.

        NOTE(review): tweepy 4.x ``Stream`` appears to report non-200
        responses through ``on_request_error`` rather than ``on_error``;
        confirm against the installed tweepy version.
        """
        return self.on_error(status_code)

In [5]:
# Kafka settings, then the producer built from them
topic_name = "tweets"                   # kafka topic name
bootstrap_servers = ["localhost:9092"]  # kafka broker ip
kafka_producer = Producer(bootstrap_servers=bootstrap_servers)

In [6]:
# Build the listener from the credentials and Kafka target, then start
# the sample stream with languages=["ko"] so tweets flow to the broker.
stream_options = dict(
    consumer_key=CONSUMER_KEY,
    consumer_secret=CONSUMER_SECRET,
    access_token=ACCESS_TOKEN,
    access_token_secret=ACCESS_SECRET,
    producer=kafka_producer,
    topic_name=topic_name,
)
twitter_stream = TweetsListener(**stream_options)

twitter_stream.sample(languages=["ko"])

@faruu0123 라고 아르단이 부릅니다 ^^♡♡♡
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  29530
RT @_only_moon: 일단 막던져서 논란을 만들고 대장동 이슈에서 빨리 벋어나자!!
-이재명-
😨
이재명 "의사도 숫자 제한, '총량제' 논쟁 만들어줘 감사" | 다음뉴스 https://t.co/81xq7Tc3OR
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  29531
RT @ITZYofficial: [ #아이돌리그 ] 
얘들아 인사해볼까? 예지의 리더 자리를 노리는 류진
https://t.co/hD1E1WZ0nI
https://t.co/Ow2bW6G7UH

#ITZY #MIDZY #CRAZYINLOVE
#ITZY…
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  29532
RT @H_H_H_H_H_H_H_7: 커피 타는 것까지는 K꼰대컬쳐로 용인이 될지 몰라도 9시 출근인데 8시에 커피를 타오라고 한게 진짜 문제잖아...
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  29533
귀엽다 생각하고 보면 다 옛날 글이네
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0


KeyboardInterrupt: 