# Ingesting realtime tweets using Apache Kafka, Tweepy and Python

### Purpose:
- main data source for the lambda architecture pipeline
- polls the Twitter search API (through Tweepy) to simulate new events arriving at regular intervals
- Kafka Producer sends the tweets as records to the Kafka Broker

### Contents: 
- [Twitter setup](#1)
- [Defining the Kafka producer](#2)
- [Producing and sending records to the Kafka Broker](#3)
- [Deployment](#4)

### Required libraries

In [1]:
import tweepy
import time
from kafka import KafkaProducer

<a id="1"></a>
### Twitter setup
- obtain the Tweepy API object using your authorization credentials
- more details on how to get the credentials are available here:
https://developer.twitter.com/en/docs/basics/authentication/overview

In [2]:
# Twitter setup: fill in your own credentials here and keep real keys out of shared notebooks
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"

In [3]:

# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth) 
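
Since the credentials are secrets, one option is to read them from the environment instead of typing them into the notebook. The following is a minimal sketch under that assumption; the environment variable names and the `load_twitter_credentials` helper are hypothetical and not part of the original notebook.

```python
import os

# Hypothetical environment variable names; choose whatever fits your setup.
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_twitter_credentials(env=os.environ):
    """Return the four credentials as a dict, or raise if any is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}


# Usage with Tweepy (requires the variables to be set):
# creds = load_twitter_credentials()
# auth = tweepy.OAuthHandler(creds["TWITTER_CONSUMER_KEY"],
#                            creds["TWITTER_CONSUMER_SECRET"])
# auth.set_access_token(creds["TWITTER_ACCESS_TOKEN"],
#                       creds["TWITTER_ACCESS_TOKEN_SECRET"])
```

Failing early with a clear message is usually easier to debug than an authentication error returned later by the API.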



A helper function that shifts a tweet's creation time from GMT to the local system time (GMT+1 in this setup)

In [4]:
from datetime import datetime, timedelta

def normalize_timestamp(timestamp):
    # Parse the tweet's creation time, which Twitter reports in GMT
    mytime = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    # Shift by one hour to match the local GMT+1 time zone
    mytime += timedelta(hours=1)
    return mytime.strftime("%Y-%m-%d %H:%M:%S")
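
The same conversion could also be expressed with time-zone-aware datetimes, which makes the GMT assumption explicit. This is only a sketch: the function name `normalize_timestamp_tz` and the `offset_hours` parameter are hypothetical, and a fixed offset does not follow daylight saving changes.

```python
from datetime import datetime, timedelta, timezone


def normalize_timestamp_tz(created_at, offset_hours=1):
    """Convert a 'YYYY-MM-DD HH:MM:SS' GMT string to a fixed-offset local time string."""
    # Mark the parsed time as UTC so the conversion below is unambiguous
    utc_time = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    local_tz = timezone(timedelta(hours=offset_hours))
    return utc_time.astimezone(local_tz).strftime("%Y-%m-%d %H:%M:%S")


print(normalize_timestamp_tz("2021-12-03 23:30:00"))  # 2021-12-04 00:30:00
```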

<a id="2"></a>
### Defining the Kafka producer
- specify the Kafka Broker
- specify the topic name
- optional: specify partitioning strategy

In [5]:
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'test'
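
For the optional partitioning strategy, one common approach is to send each record with a key: kafka-python's default partitioner hashes the key, so records with the same key land on the same partition. The sketch below assumes string keys and values; `encode_utf8` and `build_keyed_producer` are hypothetical helper names, and the broker address simply mirrors the one used above.

```python
def encode_utf8(text):
    """Serializer used for both keys and values: str -> bytes."""
    return text.encode("utf-8")


def build_keyed_producer(bootstrap_servers="localhost:9092"):
    """Create a producer that accepts plain strings as keys and values."""
    # Imported lazily so encode_utf8 can be used without a Kafka install
    from kafka import KafkaProducer
    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=encode_utf8,
        value_serializer=encode_utf8,
    )


# Usage (requires a running broker); the key here is an example user id:
# keyed_producer = build_keyed_producer()
# keyed_producer.send("test", key="12345", value="some record")
```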

<a id="3"></a>
### Producing and sending records to the Kafka Broker
- querying the Twitter API Object
- extracting relevant information from the response
- formatting and sending the data to the proper topic on the Kafka Broker
- the full record layout (its fields are commented out in the code below) has the following attributes:
    - id 
    - created_at
    - followers_count
    - location
    - favorite_count
    - retweet_count
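
The record layout listed above can be sketched as a small formatting helper. The attribute paths follow the commented-out lines in this notebook's code (`user.id_str` for the id, `coordinates` for the location); the `format_record` name and the stand-in tweet with made-up values are assumptions for illustration only.

```python
from types import SimpleNamespace


def format_record(tweet, sep=";"):
    """Join the listed tweet attributes into one delimited string."""
    fields = [
        tweet.user.id_str,
        tweet.created_at,
        tweet.user.followers_count,
        tweet.coordinates,
        tweet.favorite_count,
        tweet.retweet_count,
    ]
    return sep.join(str(field) for field in fields)


# Stand-in object with made-up values, shaped like a Tweepy status
fake_tweet = SimpleNamespace(
    user=SimpleNamespace(id_str="42", followers_count=10),
    created_at="2021-12-03 10:00:00",
    coordinates=None,
    favorite_count=0,
    retweet_count=3,
)
print(format_record(fake_tweet))  # 42;2021-12-03 10:00:00;10;None;0;3
```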

In [6]:
def get_twitter_data():
    # lang expects an ISO 639-1 language code such as 'en'
    res = api.search_tweets(q="#", lang="en", result_type="recent")
    for i in res:
        record = ''
         
        #record += str(i.user.id_str )
        #record += ';'
        #record += str(i.created_at)
        #record += ';'
        #record += str(i.user.followers_count)
        #record += ';'      
        #record += str(i.coordinates)
        #record += ';'
        #record += str(i.favorite_count)
        #record += ';'
        #record += str(i.retweet_count)
        #record += ';'
        #if i.coordinates !=None:
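        # Build a semicolon-separated record: created_at;coordinates;text
        record += str(i.created_at)
        record += ';'
        record += str(i.coordinates)
        record += ';'
        record += str(i.text)  # append the text instead of overwriting the record
        # Print only the tweet text to keep the console output readable
        print(i.text)
        print("-------------------------------")
        producer.send(topic_name, record.encode('utf-8'))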
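
Because `producer.send` is asynchronous and only queues the record, it can be useful to confirm delivery. The following is a minimal sketch, assuming a kafka-python style producer whose `send` returns a future with a `get` method; the `send_records` helper is hypothetical and not part of the original notebook.

```python
def send_records(producer, topic, records, timeout=10):
    """Send each record, wait for the broker to acknowledge, and return the count."""
    futures = [producer.send(topic, record.encode("utf-8")) for record in records]
    # Block until all buffered records have been handed to the broker
    producer.flush()
    for future in futures:
        # Raises an exception if the corresponding send failed
        future.get(timeout=timeout)
    return len(futures)


# Usage (requires a running broker):
# send_records(producer, topic_name, ["first record", "second record"])
```

Waiting on every future trades throughput for certainty, so this fits low-volume polling like the one here better than a high-rate stream.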

In [7]:
get_twitter_data()

RT @naser99799: but NASA and others hide this truth from the worlds and NASA the so-called protector of planet Earth from the dangers of th…
-------------------------------
@NASA_Marshall @NASAUniverse You might want to check thermal nuc invac perclution with emac math
-------------------------------
@SkyNews About as qualified as I am to apply for NASA
-------------------------------
RT @CNBC: NASA awards Blue Origin, Northrop Grumman and Nanoracks with contracts to build space stations https://t.co/0hXcYf5YJ4
-------------------------------
@KennedyMmari @Semkae @MustaphaBurhani @NASA Would prefer that for I certainly know you would use same inconclusive… https://t.co/SsEOwiVeOv
-------------------------------
RT @NASA360: Here's a bright, asymmetrical planetary nebula to liven up your day. This @NASAHubble image reveals a wealth of structure, inc…
-------------------------------
RT @dylan: Thank you @NASA for entrusting the @VoyagerSH @Nanoracks and @LockheedMartin team with this cr

<a id="4"></a>
### Deployment 
- run the task at a fixed interval (every 30 seconds in the call below), sleeping in between runs

In [8]:
def periodic_work(interval):
    while True:
        get_twitter_data()
        #interval should be an integer, the number of seconds to wait
        time.sleep(interval)
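
A single failed API call or broker hiccup would end the loop above with an exception. A slightly more defensive variant is sketched below; `run_periodically`, `max_runs`, and the injectable `sleep` argument are hypothetical additions, included mainly so the loop can be bounded and tested.

```python
import time


def run_periodically(task, interval, max_runs=None, sleep=time.sleep):
    """Call task() every `interval` seconds; stop after max_runs if given."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            task()
        except Exception as exc:
            # Keep the loop alive on errors; a kernel interrupt still stops it
            print(f"task failed, retrying in {interval}s: {exc}")
        runs += 1
        sleep(interval)
    return runs


# Usage: run_periodically(get_twitter_data, 30)
```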


In [None]:
periodic_work(30) 

RT @naser99799: but NASA and others hide this truth from the worlds and NASA the so-called protector of planet Earth from the dangers of th…
-------------------------------
@NASA_Marshall @NASAUniverse You might want to check thermal nuc invac perclution with emac math
-------------------------------
@SkyNews About as qualified as I am to apply for NASA
-------------------------------
RT @CNBC: NASA awards Blue Origin, Northrop Grumman and Nanoracks with contracts to build space stations https://t.co/0hXcYf5YJ4
-------------------------------
@KennedyMmari @Semkae @MustaphaBurhani @NASA Would prefer that for I certainly know you would use same inconclusive… https://t.co/SsEOwiVeOv
-------------------------------
RT @NASA360: Here's a bright, asymmetrical planetary nebula to liven up your day. This @NASAHubble image reveals a wealth of structure, inc…
-------------------------------
RT @dylan: Thank you @NASA for entrusting the @VoyagerSH @Nanoracks and @LockheedMartin team with this cr