In [1]:
# Standard library
import json
import logging

# Twitter Streaming API
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream

# Kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# API Key & Token Info
import const

In [2]:
class Producer:
    """Thin wrapper around ``KafkaProducer`` that publishes one message per call.

    ``send_data`` sends asynchronously and then flushes, so each call waits
    until the buffered record has been sent (success/failure is reported via
    the callbacks below).
    """

    def __init__(self, bootstrap_servers):
        """Create the underlying Kafka producer.

        Args:
            bootstrap_servers: broker address(es) passed straight to
                ``KafkaProducer`` (e.g. ``['localhost:9092']``).

        Raises:
            KafkaError: if the producer cannot be created. The error is printed
                and re-raised; an instance without a producer would be unusable.
        """
        try:
            self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                          max_block_ms=10000,  # max time send() may block, in ms
                                          retries=0,           # do not resend failed records
                                          acks=1)              # wait for the partition leader only
        except KafkaError as exc:
            print(f"kafka producer - Exception during connecting to broker - {exc}")
            # __init__ must return None: the former `return False` raised a
            # TypeError that masked the real cause, so propagate the Kafka error.
            raise

    def stop(self):
        """Close the underlying Kafka producer."""
        self.producer.close()

    def send_data(self, topic, data):
        """Publish ``data`` to ``topic`` and block until it has been sent.

        Args:
            topic: Kafka topic name.
            data: message value; must already be ``bytes`` because no
                ``value_serializer`` is configured on the producer.
        """
        # send() is asynchronous; attach the result handlers to its future.
        future = self.producer.send(topic, data)
        future.add_callback(self.on_send_success)
        future.add_errback(self.on_send_error)

        # Block until all buffered (async) messages are sent.
        self.producer.flush()

    def on_send_success(self, record_metadata):
        """Callback: print where the record was written (topic/partition/offset)."""
        print("**********Send Success***********")
        print("record_metadata.topic: ", record_metadata.topic)
        print("record_metadata.partition: ", record_metadata.partition)
        print("record_metadata.offset: ", record_metadata.offset)

    def on_send_error(self, excp):
        """Errback: report a failed send, including the exception traceback.

        Uses the ``logging`` module; previously this referenced an undefined
        ``log`` name and raised ``NameError`` instead of reporting the error.
        """
        print("**********Send Error Occur**********")
        logging.getLogger(__name__).error("I am an errback", exc_info=excp)

In [3]:
# Read the Twitter API credentials from the local `const` module
CONSUMER_KEY, CONSUMER_SECRET = const.CONSUMER_KEY, const.CONSUMER_SECRET
ACCESS_TOKEN, ACCESS_SECRET = const.ACCESS_TOKEN, const.ACCESS_SECRET

In [4]:
class TweetsListener(Stream):
    """tweepy ``Stream`` that forwards the text of each incoming tweet to Kafka.

    Args:
        producer: object exposing ``send_data(topic, data)`` (the ``Producer``
            wrapper from this notebook).
        topic_name: Kafka topic that receives the UTF-8 encoded tweet texts.
        **kwargs: forwarded unchanged to ``tweepy.Stream`` (credentials, ...).
    """

    def __init__(self, producer, topic_name, **kwargs):
        super().__init__(**kwargs)
        self.producer = producer
        self.topic_name = topic_name

    def on_data(self, data):
        """Parse one raw stream message and publish its tweet text.

        Messages without a ``text`` field are skipped rather than reported as
        errors (presumably stream notices such as deletes/limits, not tweets).
        For truncated tweets the full text is taken from ``extended_tweet``.

        Returns:
            True, to keep the stream running.
        """
        try:
            msg = json.loads(data)

            if 'text' not in msg:
                return True

            if msg.get('truncated'):
                text = msg['extended_tweet']['full_text']
            else:
                text = msg['text']

            self.producer.send_data(self.topic_name, text.encode('utf-8'))

        except Exception as e:
            # Catch Exception (not BaseException) so KeyboardInterrupt and
            # SystemExit still stop the stream instead of being swallowed.
            print(f"error: {str(e)}")

        return True

    def on_error(self, status):
        """Print an error status reported by the stream; returns True."""
        print(status)

        return True

    def on_request_error(self, status_code):
        """Route HTTP error responses from the stream to ``on_error``.

        NOTE(review): tweepy 4.x ``Stream`` reports non-200 responses through
        ``on_request_error`` and never calls ``on_error``; this hook keeps the
        existing ``on_error`` handler reachable — confirm against the installed
        tweepy version.
        """
        self.on_error(status_code)

In [5]:
# Kafka settings: target topic and broker address(es), then the producer itself
topic_name = 'tweets'                   # topic the tweet texts are published to
bootstrap_servers = ['localhost:9092']  # kafka broker address(es)
kafka_producer = Producer(bootstrap_servers)

In [6]:
# Stream sample Korean-language tweets into the Kafka topic.
# The call blocks until the kernel is interrupted; the `finally` clause then
# closes the Kafka producer (re-run the producer cell before running this again).
twitter_stream = TweetsListener(consumer_key=CONSUMER_KEY,
                                consumer_secret=CONSUMER_SECRET,
                                access_token=ACCESS_TOKEN,
                                access_token_secret=ACCESS_SECRET,
                                producer=kafka_producer,
                                topic_name=topic_name)

try:
    twitter_stream.sample(languages=["ko"])
    # Alternative: twitter_stream.filter(track=[""])
finally:
    # Runs on KeyboardInterrupt too, so the producer connection is not leaked.
    kafka_producer.stop()

**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477927
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477928
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477929
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477930
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477931
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477932
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477933
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition

**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477993
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477994
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477995
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477996
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477997
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477998
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220477999
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition

**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478057
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478058
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478059
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478060
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478061
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478062
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478063
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition

**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478123
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478124
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478125
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478126
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478127
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478128
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478129
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition

**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478189
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478190
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478191
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478192
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478193
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478194
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition:  0
record_metadata.offset:  220478195
**********Send Success***********
record_metadata.topic:  tweets
record_metadata.partition

KeyboardInterrupt: 