To run the code below, you need the following Python packages installed:

- tweepy
- kafka-python

Install them with a package manager of your choice; we used pip and Anaconda.

To run this code, you also need Twitter developer access; see https://apps.twitter.com. After creating an app, put its consumer key, consumer secret, access token, and access token secret in a JSON file named "credentials.json" in the notebook's working directory. The JSON structure we use is:

```json
{
    "Credentials": {
        "ACCESS_TOKEN": "",
        "ACCESS_TOKEN_SECRET": "",
        "CONSUMER_KEY": "",
        "CONSUMER_SECRET": ""
    }
}
```
Now you can run the code in this notebook.

First we need to import your credentials for the Tweepy authentication:

In [25]:
import json
with open('credentials.json') as f:
    data = json.load(f)
    cred = data["Credentials"]
    access_token = cred['ACCESS_TOKEN']
    access_token_secret = cred['ACCESS_TOKEN_SECRET']
    consumer_key = cred['CONSUMER_KEY']
    consumer_secret = cred['CONSUMER_SECRET']
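
If a field in "credentials.json" is left empty, the failure only shows up later as an authentication error from Twitter. A minimal sketch of an earlier check is shown below. The helper name `load_credentials` and the constant `REQUIRED_KEYS` are hypothetical and not part of the notebook; the demonstration writes a throwaway file with placeholder values instead of reading real keys.

```python
import json
import os
import tempfile

# The four fields of the "Credentials" object described above.
REQUIRED_KEYS = ("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET")


def load_credentials(path):
    """Return the "Credentials" mapping from path, or raise ValueError.

    Hypothetical helper: it only checks that the four expected fields
    are present and non-empty, not that Twitter accepts them.
    """
    with open(path) as f:
        data = json.load(f)
    cred = data.get("Credentials", {})
    missing = [k for k in REQUIRED_KEYS if not cred.get(k)]
    if missing:
        raise ValueError("Missing or empty credential fields: " + ", ".join(missing))
    return cred


# Demonstration with a temporary file and placeholder values (not real keys).
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "credentials.json")
    with open(sample_path, "w") as f:
        json.dump({"Credentials": {k: "placeholder" for k in REQUIRED_KEYS}}, f)
    cred = load_credentials(sample_path)
    print(sorted(cred))
```

In the notebook itself, the call would be `load_credentials('credentials.json')`, with the four values read from the returned dictionary as in the cell above.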


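Now we can start the Kafka broker. This requires a running ZooKeeper instance and a Kafka server. We started them with the following bash commands (both run in the foreground, so each needs its own terminal, and ZooKeeper must be up before the Kafka server starts):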

```bash
    $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties 
```
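
Before moving on, it can help to confirm that both processes are listening. The sketch below uses only the standard library and assumes the ports used elsewhere in this notebook (2181 for ZooKeeper, 9092 for Kafka); `is_port_open` is a hypothetical helper name. A successful TCP connection only shows that something is listening on the port, not that it is a healthy broker.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Ports as used in this notebook: 2181 (ZooKeeper) and 9092 (Kafka).
for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
    state = "reachable" if is_port_open("localhost", port) else "not reachable"
    print(f"{name} on localhost:{port} is {state}")
```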

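Create a topic called "tweets". The command below uses the `--zookeeper` option, which was removed in Kafka 3.0; on Kafka 2.2 and later, `--bootstrap-server localhost:9092` takes its place.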
```bash
    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tweets
```

Now we define the Kafka configuration, the connection procedure, and the publish function.

In [26]:
import uuid
from kafka import KafkaConsumer, KafkaProducer

# Kafka configuration
server = 'localhost:9092'
topic_name = "tweets"


def publish_message(producer_instance, topic_name, key, value):
    """Encode key and value as UTF-8 and send them to the given topic."""
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # Block until the buffered message has actually been sent.
        producer_instance.flush()
        #print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


def connect_kafka_producer(server):
    """Return a KafkaProducer for the given server, or None if connecting fails."""
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[server], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    return _producer


kafka_producer = connect_kafka_producer(server)
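
The publish function can be exercised without a running broker by passing it an object that records calls instead of sending them. The following is a sketch under stated assumptions: `RecordingProducer` is a hypothetical stand-in that implements only the `send()` and `flush()` calls used here, the payload is made up, and `publish_message` is repeated without its try/except so the block runs on its own.

```python
import json
import uuid


class RecordingProducer:
    """Hypothetical stand-in exposing only the send() and flush() calls used here."""

    def __init__(self):
        self.sent = []
        self.flush_count = 0

    def send(self, topic, key=None, value=None):
        # Record the call instead of contacting a broker.
        self.sent.append((topic, key, value))

    def flush(self):
        self.flush_count += 1


def publish_message(producer_instance, topic_name, key, value):
    # Same encoding steps as the notebook's publish function, minus the try/except.
    key_bytes = bytes(key, encoding="utf-8")
    value_bytes = bytes(value, encoding="utf-8")
    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()


fake = RecordingProducer()
payload = {"id": 1, "text": "hello"}  # made-up payload, not a real tweet
publish_message(fake, "tweets", str(uuid.uuid4()), json.dumps(payload))

# The recorded key and value are bytes; json.loads reverses the value encoding.
topic, key_bytes, value_bytes = fake.sent[0]
print(topic, key_bytes.decode("utf-8"), json.loads(value_bytes))
```

Because both key and value travel as UTF-8 bytes, a consumer has to decode the key and parse the value as JSON to get the original data back.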

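Now run the producer. This producer connects to Twitter through Tweepy and posts the tweets as messages to Kafka. The code targets the Tweepy 3.x API: `StreamListener` and the `wait_on_rate_limit_notify` argument were removed in Tweepy 4.0.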

In [28]:
import sys
from tweepy import OAuthHandler, API, Stream
from tweepy.streaming import StreamListener

# Credentials loaded from credentials.json above
ACCESS_TOKEN = access_token
ACCESS_TOKEN_SECRET = access_token_secret
CONSUMER_KEY = consumer_key
CONSUMER_SECRET = consumer_secret

auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = API(auth, wait_on_rate_limit=True,
          wait_on_rate_limit_notify=True)

class Listener(StreamListener):
    def __init__(self, output_file=sys.stdout):
        self.output_file = output_file
        super().__init__()

    def on_status(self, status):
        # Publish each tweet as JSON under a random UUID key.
        publish_message(kafka_producer, topic_name, str(uuid.uuid4()), json.dumps(status._json))
        #print(json.dumps(status._json), file=self.output_file)

    def on_error(self, status_code):
        print(status_code)
        # Returning False disconnects the stream.
        return False

# To write the raw Tweepy output to this file for debugging, uncomment the print statement in on_status.
output = open('stream_output.json', 'w+')
listener = Listener(output_file=output)
stream = Stream(auth=api.auth, listener=listener)

try:
    print('Start streaming.')
    stream.sample(languages=['en'])
except KeyboardInterrupt:
    print("Stopped.")
finally:
    print('Done.')
    stream.disconnect()
    output.close()

Start streaming.
Stopped.
Done.


In [29]:
# For debugging purposes only. The streaming cell above blocks the notebook, so this cell prints nothing until you stop the producer.
consumer = KafkaConsumer(topic_name,
                         auto_offset_reset='earliest',
                         bootstrap_servers=[server],
                         api_version=(0, 10),
                         value_deserializer=json.loads,
                         consumer_timeout_ms=1000)
for msg in consumer:
    print(msg.key.decode("utf-8"), msg.value)

2b0e71fb-1367-4343-86b5-5499c1273bbb {'created_at': 'Wed Oct 23 10:44:37 +0000 2019', 'id': 1186956587316973569, 'id_str': '1186956587316973569', 'text': "RT @w_terrence: WOW WOW! OBAMA &amp; SOROS \n\nObama was at a Democrat Fundraiser at Alex Soros's Home.\n\nAlex Soros is the Son of George Soros.…", 'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', 'truncated': False, 'in_reply_to_status_id': None, 'in_reply_to_status_id_str': None, 'in_reply_to_user_id': None, 'in_reply_to_user_id_str': None, 'in_reply_to_screen_name': None, 'user': {'id': 924276777173831680, 'id_str': '924276777173831680', 'name': 'Prakash Patel', 'screen_name': 'Paka1953', 'location': None, 'url': None, 'description': None, 'translator_type': 'none', 'protected': False, 'verified': False, 'followers_count': 50, 'friends_count': 113, 'listed_count': 0, 'favourites_count': 14252, 'statuses_count': 13044, 'created_at': 'Sat Oct 28 14:08:50 +0000 2017', 'utc_offset': None