# Kafka - Python - MongoDB 
## Twitter API Sentiment Analysis 

# Producer Section

In this notebook, we initialize the Kafka client after starting __ZooKeeper__ and the __Kafka server__ in a terminal. We then connect the Twitter stream to a producer that sends the text of each tweet to a Kafka topic.

__Step 1__: Open a command prompt and change the directory to the Kafka folder. Start ZooKeeper first, using the command below:

#### For Windows: 
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
<br/>
<br/>


__Step 2__: Open a second command prompt and change the directory to the Kafka folder again. Start the Kafka server using the command:
#### For Windows: 
.\bin\windows\kafka-server-start.bat .\config\server.properties
<br/>
<br/>


#### Creating a Kafka Topic

__Step 1__: Open a new command prompt in the location C:\kafka\bin\windows

__Step 2__: Run the following command:

#### For Windows: 
kafka-topics.bat --create --topic caraccidents --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
kafka-topics.bat --create --topic accidents --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
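
The same command can also be assembled from Python, which helps when several topics share the same settings. This is only a minimal sketch: `build_create_topic_command` is a hypothetical helper (not part of Kafka or pykafka) that reuses the flags shown above and simply prints the resulting command line.

```python
def build_create_topic_command(topic, bootstrap_server="localhost:9092",
                               partitions=1, replication_factor=1):
    """Return the kafka-topics.bat arguments for creating `topic` (hypothetical helper)."""
    return [
        "kafka-topics.bat",
        "--create",
        "--topic", topic,
        "--bootstrap-server", bootstrap_server,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
    ]


# Print one command line per topic used in this notebook
for name in ("caraccidents", "accidents"):
    print(" ".join(build_create_topic_command(name)))
```

The printed lines match the two commands above and can be pasted into the command prompt opened in Step 1.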


In [1]:
# Necessary imports
import tweepy
from pykafka import KafkaClient
import json

In [2]:
# Helper that creates a Kafka client connected to the local broker
def get_kafka_client():
    return KafkaClient(hosts='127.0.0.1:9092')

# To fetch streaming tweets with Tweepy, define a class that subclasses
# tweepy.StreamingClient. This class overrides on_data, which runs every time
# raw data arrives from the stream; here it forwards the tweet text to Kafka.
class StdOutListener(tweepy.StreamingClient):
    def on_data(self, raw_data):
        message = json.loads(raw_data)  # parse the raw JSON payload
        if 'data' not in message:  # skip payloads that carry no tweet (e.g. errors)
            return
        text = message['data']['text']  # keep only the tweet text
        client = get_kafka_client()  # connect to Kafka with the helper above
        topic = client.topics['accidents']  # look up the target topic
        with topic.get_sync_producer() as producer:  # producer is stopped on exit
            producer.produce(text.encode('utf-8'))  # send the tweet text as bytes

    def on_request_error(self, status_code):
        # Called by Tweepy when the stream request returns a non-200 HTTP status
        print(status_code)
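
The `message['data']['text']` lookup in the listener above assumes every payload carries a tweet. As a minimal sketch of a more defensive version, the hypothetical helper below (`extract_tweet_text` is not part of Tweepy) returns `None` when the payload has no tweet text. The sample payloads are made up for illustration and only mimic the shape the listener expects.

```python
import json


def extract_tweet_text(raw_data):
    """Return the tweet text from a raw stream payload, or None if it has none."""
    message = json.loads(raw_data)  # json.loads accepts both str and bytes
    tweet = message.get("data")
    if not isinstance(tweet, dict):
        return None
    return tweet.get("text")


# Hypothetical payloads: one with a tweet, one without
sample_tweet = b'{"data": {"id": "1", "text": "Heavy rain today"}}'
sample_error = '{"errors": [{"title": "example"}]}'

print(extract_tweet_text(sample_tweet))  # Heavy rain today
print(extract_tweet_text(sample_error))  # None
```

Keeping the parsing in a separate function like this also makes it possible to test the extraction logic without a live stream or a running broker.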
        

In [6]:
# List all rules currently attached to the stream (`stream` is created in the last cell)
stream.get_rules()

Response(data=[StreamRule(value='Tesla lang:en', tag=None, id='1605103016729886725'), StreamRule(value='world cup lang:en', tag=None, id='1605529790794158080'), StreamRule(value='iphone lang:en', tag=None, id='1605553282268659712'), StreamRule(value='tesla lang:en -is:retweet', tag=None, id='1605564358699917313'), StreamRule(value='weather lang:en -is:retweet', tag=None, id='1607664092176687104')], includes={}, errors=[], meta={'sent': '2022-12-27T10:58:35.911Z', 'result_count': 5})

In [6]:
# Delete a rule that is no longer needed, identified by its rule id
stream.delete_rules('1607697870748106757')

Response(data=None, includes={}, errors=[], meta={'sent': '2022-12-27T12:06:12.220Z', 'summary': {'deleted': 1, 'not_deleted': 0}})

In [4]:
# List the rules again to confirm the deletion
stream.get_rules()

Response(data=None, includes={}, errors=[], meta={'sent': '2022-12-27T12:14:26.726Z', 'result_count': 0})
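
The outputs above show that `data` holds a list of rules when rules exist and `None` when there are none. A minimal sketch of collecting every rule id while handling both cases is given below. `FakeRule` and `FakeResponse` are stand-ins that only mimic the printed shape; they are not Tweepy classes, and `collect_rule_ids` is a hypothetical helper.

```python
from collections import namedtuple

# Stand-ins shaped like the printed output above (not Tweepy's real classes)
FakeRule = namedtuple("FakeRule", ["value", "tag", "id"])
FakeResponse = namedtuple("FakeResponse", ["data"])


def collect_rule_ids(response):
    """Return the ids of all rules in a get_rules()-style response."""
    # response.data is None when no rules are attached, so fall back to []
    return [rule.id for rule in (response.data or [])]


with_rules = FakeResponse(data=[
    FakeRule("Tesla lang:en", None, "1605103016729886725"),
    FakeRule("weather lang:en -is:retweet", None, "1607664092176687104"),
])
no_rules = FakeResponse(data=None)

print(collect_rule_ids(with_rules))
print(collect_rule_ids(no_rules))
```

With a live stream, the resulting list could then be handed to `stream.delete_rules(...)`, which is expected to accept a list of ids as well as a single id; skip the call when the list is empty.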

In [None]:
# Placeholder: paste your own Twitter API bearer token here and keep real credentials out of shared notebooks
bearer_token = 'YOUR_BEARER_TOKEN'

# To instantiate the custom streamer, pass the bearer token to the constructor.
stream = StdOutListener(bearer_token)

# Next, attach rules that define which tweets to receive
# (add_rules also accepts a list of rules, but this example sets only one).
# Rule syntax reference: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule#adding-removing

stream.add_rules(tweepy.StreamRule("weather lang:en"))

# Finally, calling filter() opens the stream and runs on_data on every incoming payload.
stream.filter()
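
The rule strings used in this notebook follow one pattern: keywords, an optional `lang:` operator, and an optional `-is:retweet` exclusion. As a minimal sketch, the hypothetical helper below (`build_rule` is not part of Tweepy) assembles such a string so that stray spaces and typos in operators are easier to avoid.

```python
def build_rule(keywords, lang=None, exclude_retweets=False):
    """Compose a filtered-stream rule string (hypothetical helper)."""
    parts = [keywords.strip()]
    if lang:
        parts.append(f"lang:{lang}")  # restrict matches to one language
    if exclude_retweets:
        parts.append("-is:retweet")  # drop retweets from the stream
    return " ".join(parts)


print(build_rule("weather", lang="en"))
print(build_rule("tesla", lang="en", exclude_retweets=True))
```

The returned string can be passed to `tweepy.StreamRule(...)` in the same way as the literal rule above.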