start zookeeper and kafka server
- zookeeper-server-start.sh \\$KAFKA_HOME/config/zookeeper.properties
- kafka-server-start.sh \\$KAFKA_HOME/config/server.properties

create topic and list zookeeper topics
- kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic covid_v1
- kafka-topics.sh --list --zookeeper localhost:2181
- kafka-topics.sh --describe --zookeeper localhost:2181 --topic covid_v1

start console consumer
- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic covid_v1 --from-beginning

In [None]:
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

from pykafka import KafkaClient
import json

import twitter_credentials

In [None]:
class TwitterAuthenticator:
    """Builds the OAuth handler used to authenticate the tweet stream.

    All four secrets are read from the ``twitter_credentials`` module.
    """

    def auth(self):
        """Return an ``OAuthHandler`` with consumer keys and access token set."""
        handler = OAuthHandler(
            twitter_credentials.CONSUMER_KEY,
            twitter_credentials.CONSUMER_SECRET,
        )
        handler.set_access_token(
            twitter_credentials.ACCESS_TOKEN,
            twitter_credentials.ACCESS_TOKEN_SECRET,
        )
        return handler
        

# NOTE
### An event listener is a procedure or function in a computer program that waits for an event to occur. ... The listener is programmed to react to an input or signal by calling the event's handler. The term event listener is often specific to Java and JavaScript.

In [None]:
# class that inherits from Tweepy's StreamListener class
# StreamListener provides methods that we can override, like on_data

# USE LISTENER TO SEND MESSAGE TO KAFKA BROKER 

class TwitterListener(StreamListener):
    """Stream listener that forwards each raw tweet to Kafka and to a file.

    Every payload received in ``on_data`` is produced to the Kafka topic
    through the module-level ``producer`` and appended to ``tweets_file``.

    NOTE: ``producer`` is looked up as a global at call time; it must be
    bound (see the ``__main__`` section) before the stream starts.
    """

    def __init__(self, tweets_file):
        """Store the output path.

        Args:
            tweets_file: path of the file that received tweets are appended to.
        """
        # Run the base-class initializer so any state tweepy sets up on the
        # listener exists; the original skipped this call.
        super().__init__()
        self.tweets_file = tweets_file

    def on_data(self, data):
        """Send one raw stream payload to Kafka and append it to the file.

        Args:
            data: the raw payload handed over by tweepy (written as text).

        Returns:
            Always ``True``, also after a handled error, so a single bad
            payload does not end the stream.
        """
        try:
            # Kafka messages are bytes, so encode the payload first.
            producer.produce(str(data).encode('utf-8'))
            print(type(data))

            # 'a' appends; utf-8 is explicit so emoji / non-ASCII tweets do not
            # depend on the platform's default encoding.
            with open(self.tweets_file, 'a', encoding='utf-8') as tf:
                tf.write(data)
        except Exception as e:
            # Exception (not BaseException) so KeyboardInterrupt / SystemExit
            # still propagate and the stream can be stopped by the user.
            print("Error on_data %s" % str(e))
        return True

    def on_error(self, status):
        """Handle an error status reported by the stream.

        Args:
            status: the status code passed in by tweepy.

        Returns:
            ``False`` when the status is 420 (rate limit), which tweepy
            treats as a request to stop; otherwise prints the status and
            returns ``None``.
        """
        if status == 420:
            return False
        print(status)


In [None]:
class TwitterStreamer:
    """Opens an authenticated tweet stream filtered by tracked terms."""

    def __init__(self):
        # Helper that builds the OAuth handler when a stream is started.
        self.twitter_auth = TwitterAuthenticator()

    def stream_tweets(self, tweets_file, hash_tags):
        """Start streaming tweets that match ``hash_tags``.

        Args:
            tweets_file: path passed on to ``TwitterListener``.
            hash_tags: terms passed as ``track`` to the stream filter.
        """
        tweet_listener = TwitterListener(tweets_file)
        credentials = self.twitter_auth.auth()
        Stream(credentials, tweet_listener).filter(track=hash_tags)

In [None]:
if __name__ == "__main__":

    # Connect to the local Kafka broker and pick the target topic.
    client = KafkaClient(hosts="localhost:9092")
    topic = client.topics['covid_v1']

    # Terms to track and the file that tweets are appended to.
    hash_tags = ["covid19", "corona"]
    tweets_file = "tweets_trial1.json"

    # `producer` is bound at module level on purpose: TwitterListener.on_data
    # looks it up as a global. The `with` block stops the producer when the
    # stream ends or is interrupted, instead of leaving it running.
    with topic.get_sync_producer() as producer:
        # define an instance of the TwitterStreamer class
        streamer = TwitterStreamer()

        # stream tweets matching hash_tags, writing them to tweets_file
        streamer.stream_tweets(tweets_file, hash_tags)