# Ingesting realtime tweets using Apache Kafka, Tweepy and Python

### Purpose:
- main data source for the lambda architecture pipeline
- polls the Twitter search API at a fixed interval to simulate new events arriving every minute
- a Kafka producer sends the tweets as records to the Kafka broker

### Contents: 
- [Twitter setup](#1)
- [Defining the Kafka producer](#2)
- [Producing and sending records to the Kafka Broker](#3)
- [Deployment](#4)

### Required libraries

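In [1]:
import time

import tweepy  # written against tweepy 3.x: API.search was renamed API.search_tweets in tweepy 4.0
from kafka import KafkaProducer  # provided by the kafka-python package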

<a id="1"></a>
### Twitter setup
- create the API object from your authorization credentials
- more details on how to obtain the credentials: https://developer.twitter.com/en/docs/basics/authentication/overview

In [18]:
# Twitter credentials: fill these in before running, and keep them out of version control
ACCESS_TOKEN = ''
ACCESS_SECRET = ''
CONSUMER_KEY = ''
CONSUMER_SECRET = ''

In [20]:
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

api = tweepy.API(auth)
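The credential cell hard-codes secrets in the notebook, where they are easy to leak by sharing or committing it. A minimal alternative sketch reads them from environment variables instead; the variable names and the `load_twitter_credentials` helper are hypothetical, so adapt them to your own setup:

```python
import os

# Hypothetical environment variable names; rename them to match your setup
CREDENTIAL_VARS = {
    "ACCESS_TOKEN": "TWITTER_ACCESS_TOKEN",
    "ACCESS_SECRET": "TWITTER_ACCESS_SECRET",
    "CONSUMER_KEY": "TWITTER_CONSUMER_KEY",
    "CONSUMER_SECRET": "TWITTER_CONSUMER_SECRET",
}


def load_twitter_credentials(env=os.environ):
    """Return the four Twitter credentials, failing fast if any is missing or empty."""
    missing = [var for var in CREDENTIAL_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError("missing Twitter credentials: " + ", ".join(missing))
    return {name: env[var] for name, var in CREDENTIAL_VARS.items()}
```

The returned dict can then feed the same calls as before, e.g. `tweepy.OAuthHandler(creds['CONSUMER_KEY'], creds['CONSUMER_SECRET'])` after `creds = load_twitter_credentials()`.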


A helper function that shifts a tweet's creation time from UTC to the local time zone of our system (UTC+1 here):

In [21]:
from datetime import datetime, timedelta

def normalize_timestamp(timestamp):
    # parameter renamed from `time` so it no longer shadows the imported time module
    mytime = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)  # tweets are timestamped in UTC (GMT), while this machine is at UTC+1
    return mytime.strftime("%Y-%m-%d %H:%M:%S")
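The helper hard-codes the one-hour shift and works on naive datetimes. A sketch of a slightly more explicit variant, using a hypothetical `normalize_timestamp_tz` function with a hypothetical `utc_offset_hours` parameter, tags the parsed time as UTC and converts it with `datetime.astimezone`. Like the original, it assumes a fixed offset, so it does not follow daylight-saving time changes:

```python
from datetime import datetime, timedelta, timezone


def normalize_timestamp_tz(timestamp, utc_offset_hours=1):
    """Convert a naive 'YYYY-MM-DD HH:MM:SS' UTC timestamp to a fixed UTC offset."""
    utc_time = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    local_tz = timezone(timedelta(hours=utc_offset_hours))  # fixed offset, no DST rules
    return utc_time.astimezone(local_tz).strftime("%Y-%m-%d %H:%M:%S")


print(normalize_timestamp_tz("2021-04-12 23:30:00"))  # prints 2021-04-13 00:30:00
```

In `get_twitter_data` it could replace the original helper as `normalize_timestamp_tz(str(i.created_at), utc_offset_hours=1)`.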

<a id="2"></a>
### Defining the Kafka producer
- specify the Kafka broker address
- specify the topic name
- optional: specify a partitioning strategy

In [22]:
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'tweets-lambda1'
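No partitioning strategy is set here, so the producer uses its default. With kafka-python's default partitioner, passing a `key` to `producer.send` already keeps all records with the same key on one partition. For custom routing, kafka-python's `KafkaProducer` also accepts a `partitioner` callable, which it calls with the serialized key, the list of all partition IDs and the list of currently available partition IDs (check the documentation of your client version). The hypothetical `user_id_partitioner` below is a sketch of such a callable; it hashes with CRC32 rather than the default's murmur2, so its key-to-partition mapping differs from the default one:

```python
import random
import zlib


def user_id_partitioner(key_bytes, all_partitions, available_partitions):
    """Hypothetical partitioner: route all records with the same key to one partition."""
    if key_bytes is None:
        # Unkeyed record: pick any partition that currently has a leader (fall back to all)
        return random.choice(available_partitions or all_partitions)
    # Stable hash of the key; zlib.crc32 returns an unsigned 32-bit int in Python 3
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]


# Usage sketch with kafka-python (needs a running broker, so left commented out;
# `user_id` is a hypothetical variable holding the tweet author's ID string):
# producer = KafkaProducer(bootstrap_servers='localhost:9092', partitioner=user_id_partitioner)
# producer.send(topic_name, value=bytes(record, encoding='ascii'), key=user_id.encode('ascii'))
```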

<a id="3"></a>
### Producing and sending records to the Kafka Broker
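- query the Twitter API object
- extract the relevant information from the response
- format the data and send it to the proper topic on the Kafka broker
- each resulting record has the following `;`-separated fields:
    - id (the ID of the user who posted the tweet)
    - created_at (converted to local time)
    - followers_count
    - location (the user's profile location, written as a bytes literal such as `b'New Jersey, USA'`)
    - favorite_count
    - retweet_count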
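On the consuming side of the pipeline, a record in this format can be turned back into typed fields. A minimal sketch, using a hypothetical `parse_record` helper and assuming no location contains a `;` (the format has no escaping, so such a location would shift the fields):

```python
import ast

FIELD_NAMES = ["id", "created_at", "followers_count", "location", "favorite_count", "retweet_count"]


def parse_record(record):
    """Split one ';'-terminated record into a dict of typed fields."""
    parts = record.split(";")
    if parts[-1] == "":
        parts = parts[:-1]  # drop the empty string produced by the trailing ';'
    if len(parts) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(parts)}: {record!r}")
    fields = dict(zip(FIELD_NAMES, parts))
    # The location is a Python bytes literal such as b'New Jersey, USA';
    # literal_eval safely turns it back into bytes, which are then decoded as UTF-8
    fields["location"] = ast.literal_eval(fields["location"]).decode("utf-8")
    for name in ("followers_count", "favorite_count", "retweet_count"):
        fields[name] = int(fields[name])
    return fields


# Backslashes are doubled so the string holds the literal \x escapes that travel in the record
print(parse_record("759239742521237505;2021-04-12 17:39:32;1224;b'Puerto Iguaz\\xc3\\xba, Argentina';0;0;"))
```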

In [23]:
def get_twitter_data():
    # query the search API (tweepy 3.x) for tweets about Apple or the iPhone
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        fields = [
            i.user.id_str,                           # ID of the user who posted the tweet
            normalize_timestamp(str(i.created_at)),  # creation time, shifted to local time
            str(i.user.followers_count),
            str(i.user.location.encode('UTF-8')),    # bytes literal such as b'New Jersey, USA', keeps the record ASCII-only
            str(i.favorite_count),
            str(i.retweet_count),
        ]
        record = ';'.join(fields) + ';'              # keep the trailing ';' of the original format
        print(record)
        producer.send(topic_name, bytes(record, encoding='ascii'))

In [24]:
get_twitter_data()

434797562;2021-04-12 17:39:33;120;b'';0;20;
737292332;2021-04-12 17:39:33;4325;b'';0;234;
1358045924472156161;2021-04-12 17:39:33;63;b'\xf0\x9f\x93\xb8@akino_coto \xe6\xa7\x98\xe3\x82\x88\xe3\x82\x8a';0;0;
959897406;2021-04-12 17:39:32;726;b'New Jersey, USA';0;0;
759239742521237505;2021-04-12 17:39:32;1224;b'Puerto Iguaz\xc3\xba, Argentina';0;0;
867718136;2021-04-12 17:39:32;0;b'';0;0;
304216034;2021-04-12 17:39:32;174;b'New Orleans, LA';0;555;
46292283;2021-04-12 17:39:32;1107;b'INDIANAPOLIS --- ATL';0;0;
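Each call to `api.search` returns a page of recent matching tweets, so consecutive polls can return tweets that were already produced; the Deployment run in this notebook sends the same records again with only `retweet_count` changed. A minimal sketch of one way to avoid this, assuming tweepy 3.x, where `API.search` accepts a `since_id` argument: remember the highest tweet ID seen and ask only for newer tweets. `make_poller` is a hypothetical helper, and the search function is passed in so the logic can be checked offline with a fake:

```python
from types import SimpleNamespace


def make_poller(search, query):
    """Return a poll() function that only fetches tweets newer than the newest one seen so far."""
    state = {"since_id": None}

    def poll():
        kwargs = {"q": query}
        if state["since_id"] is not None:
            kwargs["since_id"] = state["since_id"]  # ask only for tweets with a larger ID
        results = list(search(**kwargs))
        if results:
            state["since_id"] = max(status.id for status in results)
        return results

    return poll


# Offline check with a fake search function standing in for api.search
fake_tweets = [SimpleNamespace(id=101), SimpleNamespace(id=102)]


def fake_search(q, since_id=None):
    return [t for t in fake_tweets if since_id is None or t.id > since_id]


poll = make_poller(fake_search, "Apple OR iphone OR iPhone")
first = [t.id for t in poll()]   # both fake tweets are new
second = [t.id for t in poll()]  # nothing newer than 102 yet
fake_tweets.append(SimpleNamespace(id=103))
third = [t.id for t in poll()]   # only the newly added tweet
```

With the real API object, `poll = make_poller(api.search, "Apple OR iphone OR iPhone")` would be created once, and `get_twitter_data` would iterate over `poll()` instead of calling `api.search` directly.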

<a id="4"></a>
### Deployment 
- repeat the task at a fixed interval (e.g. every couple of minutes), sleeping in between

In [25]:
def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval is the number of seconds to wait between runs (int or float)
        time.sleep(interval)


In [26]:
periodic_work(60 * 0.1)  # every 6 seconds for this demo; e.g. 60 * 2 would poll every couple of minutes

434797562;2021-04-12 17:39:33;120;b'';0;20;
737292332;2021-04-12 17:39:33;4325;b'';0;234;
1358045924472156161;2021-04-12 17:39:33;63;b'\xf0\x9f\x93\xb8@akino_coto \xe6\xa7\x98\xe3\x82\x88\xe3\x82\x8a';0;0;
959897406;2021-04-12 17:39:32;726;b'New Jersey, USA';0;0;
759239742521237505;2021-04-12 17:39:32;1224;b'Puerto Iguaz\xc3\xba, Argentina';0;0;
867718136;2021-04-12 17:39:32;0;b'';0;0;
304216034;2021-04-12 17:39:32;174;b'New Orleans, LA';0;556;
46292283;2021-04-12 17:39:32;1107;b'INDIANAPOLIS --- ATL';0;0;

KeyboardInterrupt: 
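`periodic_work` sleeps for `interval` seconds after each run, so the effective period is `interval` plus however long the fetch takes, and any exception raised by a run (for example a network error) ends the loop. A sketch of a fixed-rate variant, with hypothetical `max_runs` and `on_error` parameters added so it can stop on its own and be tested:

```python
import time


def periodic_work_fixed_rate(task, interval, max_runs=None, on_error=print):
    """Call task() every `interval` seconds, measured from the start of each run."""
    runs = 0
    next_start = time.monotonic()
    while True:
        try:
            task()
        except Exception as exc:  # log and keep polling; Ctrl+C (KeyboardInterrupt) still stops the loop
            on_error(exc)
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return runs
        # Next start is one interval after this one; if the run overran, start right away
        next_start = max(next_start + interval, time.monotonic())
        time.sleep(max(0.0, next_start - time.monotonic()))


# Offline check: a task that fails on its second call is still run a third time
calls = []
errors = []


def flaky_task():
    calls.append(len(calls) + 1)
    if len(calls) == 2:
        raise RuntimeError("simulated API error")


runs = periodic_work_fixed_rate(flaky_task, 0.01, max_runs=3, on_error=errors.append)
```

In the notebook, `periodic_work_fixed_rate(get_twitter_data, 60 * 2)` would poll every two minutes until interrupted, since `max_runs` defaults to `None`.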