# Ingesting realtime tweets using Apache Kafka, Tweepy and Python

### Purpose:
- main data source for the lambda architecture pipeline
- polls the Twitter search API through Tweepy at a fixed interval to simulate a stream of new events
- Kafka Producer sends the tweets as records to the Kafka Broker

### Contents: 
- [Twitter setup](#1)
- [Defining the Kafka producer](#2)
- [Producing and sending records to the Kafka Broker](#3)
- [Deployment](#4)

### Required libraries

In [1]:
import tweepy
import time
from kafka import KafkaProducer

<a id="1"></a>
### Twitter setup
- getting the API object using authorization information
- you can find more details on how to get the authorization here:
https://developer.twitter.com/en/docs/basics/authentication/overview

In [2]:
# twitter setup
import os

# Credentials are read from environment variables instead of being hard-coded.
# The variable names below are illustrative, not a Tweepy convention.
ACCESS_TOKEN = os.environ['TWITTER_ACCESS_TOKEN']
ACCESS_SECRET = os.environ['TWITTER_ACCESS_SECRET']
CONSUMER_KEY = os.environ['TWITTER_CONSUMER_KEY']
CONSUMER_SECRET = os.environ['TWITTER_CONSUMER_SECRET']
# Setup tweepy to authenticate with Twitter credentials:
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
# Creating the API object by passing in auth information
api = tweepy.API(auth)


A helper function that shifts a tweet's creation time from GMT to the local system time (GMT+1):

In [3]:
from datetime import datetime, timedelta

def normalize_timestamp(created_at):
    # the parameter is not called `time`, to avoid shadowing the imported time module
    mytime = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # tweets are timestamped in GMT; shift to the local GMT+1 timezone
    return mytime.strftime("%Y-%m-%d %H:%M:%S")
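
The helper above adds a bare one-hour offset to a naive timestamp. A minimal sketch of the same conversion with explicit timezone objects from the standard library follows. The function name `to_local_timestamp` and the `offset_hours` parameter are hypothetical and not part of the original notebook.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"

def to_local_timestamp(created_at, offset_hours=1):
    """Convert a GMT/UTC timestamp string to a fixed-offset local timestamp string."""
    # mark the parsed (naive) timestamp as UTC explicitly
    utc_time = datetime.strptime(created_at, FMT).replace(tzinfo=timezone.utc)
    # fixed offset only; this does not follow daylight saving time changes
    local_tz = timezone(timedelta(hours=offset_hours))
    return utc_time.astimezone(local_tz).strftime(FMT)

print(to_local_timestamp("2019-03-25 00:08:11"))
# prints: 2019-03-25 01:08:11
```

If daylight saving time matters, `zoneinfo.ZoneInfo` (Python 3.9+) could replace the fixed offset.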

<a id="2"></a>
### Defining the Kafka producer
- specify the Kafka Broker
- specify the topic name
- optional: specify partitioning strategy

In [4]:
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'tweets-kafka-test'
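
The optional partitioning strategy mentioned above can be sketched as a custom partitioner. kafka-python's `KafkaProducer` accepts a `partitioner` callable that is invoked as `partitioner(key_bytes, all_partitions, available_partitions)`. The function name `partition_by_user` and the use of CRC32 are assumptions made for illustration; the original notebook relies on the default partitioner.

```python
import zlib

def partition_by_user(key_bytes, all_partitions, available_partitions):
    """Map a record key to a partition so that one user's tweets stay together."""
    if key_bytes is None:
        # no key: fall back to the first available partition (a simplistic choice)
        return available_partitions[0] if available_partitions else all_partitions[0]
    # deterministic hash of the key, reduced to the number of partitions
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]

# Hypothetical wiring (requires a running broker, so it is left commented out):
# producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                          partitioner=partition_by_user)
# producer.send(topic_name, key=b'1096497182', value=b'record bytes')

partitions = [0, 1, 2]
print(partition_by_user(b"1096497182", partitions, partitions))
```

In many cases passing `key=` to `send` is enough on its own, since the default partitioner already hashes the key to pick a partition.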

<a id="3"></a>
### Producing and sending records to the Kafka Broker
- querying the Twitter API Object
- extracting relevant information from the response
- formatting and sending the data to proper topic on the Kafka Broker
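- each resulting record has the following semicolon-separated fields:
    - id (the author's user id)
    - created_at (shifted to local time)
    - followers_count (of the author)
    - location (from the author's profile, may be empty)
    - favorite_count
    - retweet_count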

In [5]:
def get_twitter_data():
    # query the search endpoint for tweets matching any of the keywords
    res = api.search("Apple OR iphone OR iPhone")
    for n, tweet in enumerate(res):
        # collect the fields that make up one record
        fields = [
            tweet.user.id_str,
            normalize_timestamp(str(tweet.created_at)),
            tweet.user.followers_count,
            tweet.user.location,
            tweet.favorite_count,
            tweet.retweet_count,
        ]
        # every field is followed by ';', including the last one
        record = ''.join(str(f) + ';' for f in fields)
        producer.send(topic_name, record.encode())
        # print only the first record of each batch as a progress indicator
        if n == 0:
            print(record)

In [6]:
get_twitter_data()

1096497182;2019-03-25 01:08:11;161;;0;0;
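
Because `location` is free text, a value containing `;` would break the semicolon-delimited record shown above. One possible guard, sketched here with the standard `csv` module, is to quote fields when needed. `format_record` is a hypothetical helper, not part of the original notebook, and it omits the trailing `;` of the original format.

```python
import csv
import io

def format_record(fields):
    """Join fields with ';', quoting any field that contains the delimiter."""
    buffer = io.StringIO()
    # an empty lineterminator keeps the record on one line without a trailing newline
    writer = csv.writer(buffer, delimiter=';', lineterminator='')
    writer.writerow(fields)
    return buffer.getvalue()

print(format_record(["2457401078", "2019-03-25 01:08:54", 2480, "Paris; France", 0, 0]))
# prints: 2457401078;2019-03-25 01:08:54;2480;"Paris; France";0;0
```

A consumer can then split the record with `csv.reader(..., delimiter=';')` and recover the location intact.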


<a id="4"></a>
### Deployment 
- run the task in a loop, waiting a fixed interval between runs

In [7]:
def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval is the number of seconds to wait between runs (int or float)
        time.sleep(interval)


In [8]:
periodic_work(60 * 0.1)  # poll every 6 seconds; at this pace the API rate limit is eventually hit (see the error below)

1096497182;2019-03-25 01:08:11;161;;0;0;
1096497182;2019-03-25 01:08:11;161;;0;0;
1096497182;2019-03-25 01:08:11;161;;0;0;
810077802267426816;2019-03-25 01:08:34;428;;0;0;
810077802267426816;2019-03-25 01:08:34;428;;0;0;
810077802267426816;2019-03-25 01:08:34;428;;0;0;
2457401078;2019-03-25 01:08:54;2480;City Boy College;0;0;
2457401078;2019-03-25 01:08:54;2480;City Boy College;0;0;
2457401078;2019-03-25 01:08:54;2480;City Boy College;0;0;
1506364140;2019-03-25 01:09:14;8;;0;17292;
1506364140;2019-03-25 01:09:14;8;;0;17293;
1506364140;2019-03-25 01:09:14;8;;0;17293;
2302591195;2019-03-25 01:09:35;472;;0;307;
2302591195;2019-03-25 01:09:35;472;;0;307;
2302591195;2019-03-25 01:09:35;472;;0;307;
2302591195;2019-03-25 01:09:35;472;;0;307;
30093231;2019-03-25 01:09:58;6734;lalaland;0;2341;
30093231;2019-03-25 01:09:58;6734;lalaland;0;2341;
30093231;2019-03-25 01:09:58;6734;lalaland;0;2341;
984479643590971392;2019-03-25 01:10:20;40;;0;0;
984479643590971392;2019-03-25 01:10:20;40;;0;0;
984479

RateLimitError: [{'message': 'Rate limit exceeded', 'code': 88}]
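
The `RateLimitError` above stops the loop for good. A minimal sketch of a loop that waits out a cool-down period and then resumes is shown below. The names `run_periodically`, `retry_on`, `backoff`, and `max_runs` are hypothetical. The 900-second default assumes a 15-minute rate-limit window, and in this notebook `retry_on` would be Tweepy's rate-limit exception class.

```python
import time

def run_periodically(task, interval, max_runs=None, retry_on=(Exception,),
                     backoff=900, sleep=time.sleep):
    """Call task repeatedly; after a retry_on error, wait backoff seconds instead of interval."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            task()
            delay = interval
        except retry_on:
            # assumed cool-down: wait for the rate-limit window to reset
            delay = backoff
        runs += 1
        sleep(delay)
    return runs

# Demo with a fake task that fails on its second call; delays are recorded, not slept.
calls, delays = [], []

def flaky_task():
    calls.append(1)
    if len(calls) == 2:
        raise RuntimeError("Rate limit exceeded")

run_periodically(flaky_task, 60, max_runs=3, retry_on=(RuntimeError,), sleep=delays.append)
print(delays)
# prints: [60, 900, 60]
```

Tweepy can also handle this itself: constructing the client as `tweepy.API(auth, wait_on_rate_limit=True)` makes it sleep until the limit resets instead of raising.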