In [4]:
import json
import time
import tweepy

import numpy as np

from confluent_kafka import Consumer, KafkaException, KafkaError, Producer
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

from utilities import *
from tweet_producer import *

In [5]:
# Load API secrets from local JSON files so they are not hard-coded in the notebook.
# The Twitter file must provide the keys read by twitter_setup():
# "KEY", "SECRET_KEY", "ACCESS_TOKEN" and "ACCESS_TOKEN_SECRET".
with open('TWITTER_CREDENTIALS.JSON') as file:
    twitter_credentials = json.load(file)
# The Kafka file must provide "SERVERS", "USERNAME" and "PASSWORD",
# which are read when the `conf` dictionary is built.
with open('KARAFKA_CREDENTIALS.JSON') as file:
    karafka_credentials = json.load(file)

# Client settings for a remote, SASL/SCRAM-authenticated Kafka cluster, filled
# in from the values loaded into karafka_credentials.
# NOTE(review): no visible cell passes `conf` to a client (the streaming cells
# use `conf_local`); presumably it is meant for a Consumer, given 'group.id' —
# confirm whether it is still needed.
conf = {
            'bootstrap.servers': karafka_credentials["SERVERS"],
            'session.timeout.ms': 6000,
            # Group id is derived from the account user name.
            'group.id': "%s-consumer" % karafka_credentials["USERNAME"],
            'default.topic.config': {'auto.offset.reset': 'smallest'},
            'security.protocol': 'SASL_PLAINTEXT',
            'sasl.mechanisms': 'SCRAM-SHA-256',
            'sasl.username': karafka_credentials["USERNAME"],
            'sasl.password': karafka_credentials["PASSWORD"],
            'api.version.fallback.ms': 0,
            'broker.version.fallback': '0.10.0'
        
        }

# Client settings for an unauthenticated broker on localhost:9092. This is the
# configuration the streaming cells in this notebook hand to the Kafka producer.
# NOTE(review): the queue.buffering.* and batch.num.messages values look like
# producer-side batching knobs tuned for small, frequent batches — confirm the
# intent against the librdkafka configuration reference.
conf_local = {
            'bootstrap.servers': 'localhost:9092',
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'smallest'}, 
            'queue.buffering.max.messages': 1000000, 
            'queue.buffering.max.ms' : 10,  
            'batch.num.messages': 10,   
            }

def kafka_setup(configuration_dict):
    """Build a Kafka producer from a settings mapping.

    Args:
        configuration_dict: Mapping of client settings; it is unpacked into
            keyword arguments for ``Producer``.

    Returns:
        The newly created ``Producer`` instance.
    """
    kafka_producer = Producer(**configuration_dict)
    return kafka_producer

def twitter_setup():
    """Create the OAuth handler used to authenticate against Twitter.

    Reads the "KEY", "SECRET_KEY", "ACCESS_TOKEN" and "ACCESS_TOKEN_SECRET"
    entries of the module-level ``twitter_credentials`` mapping.

    Returns:
        An ``OAuthHandler`` with its access token already set.
    """
    consumer_key = twitter_credentials["KEY"]
    consumer_secret = twitter_credentials["SECRET_KEY"]
    handler = OAuthHandler(consumer_key, consumer_secret)

    handler.set_access_token(
        twitter_credentials["ACCESS_TOKEN"],
        twitter_credentials["ACCESS_TOKEN_SECRET"],
    )
    return handler

In [None]:
# Stream the public Twitter sample through a Kafka-producing listener.
# NOTE(review): this cell has no execution count (In [None]). TweetKafkaProducer
# resolves to whatever is in scope when the cell runs — presumably the one
# star-imported from tweet_producer, since this notebook's own definition
# appears in a later cell. Verify that its constructor accepts a configuration
# dict before running.
auth = twitter_setup()
tweet_producer = TweetKafkaProducer(conf_local)
stream = Stream(auth, tweet_producer)
# NOTE(review): the sample() calls in later cells ended with KeyboardInterrupt,
# so this call appears to run until interrupted — confirm.
stream.sample(stall_warnings=True)

In [15]:
# Module-level producer built from the local broker settings. It is a separate
# object from the per-listener producer that
# TweetKafkaProducer.producer_setup() creates.
producer = kafka_setup(conf_local)
# Kafka topic that TweetKafkaProducer.on_data publishes tweet text to.
TOPIC_NAME = "tweets"        

class TweetKafkaProducer(StreamListener):
    """Stream listener that publishes sanitised tweet text to a Kafka topic.

    Each incoming stream message that carries a "text" field is wrapped in a
    ``Tweet``, sanitised, UTF-8 encoded and produced to ``TOPIC_NAME``.
    Delivery results are reported through ``delivery_report``.
    """

    def __init__(self, configuration_dict=None):
        """Create the listener, optionally configuring its Kafka producer.

        Args:
            configuration_dict: Optional mapping of producer settings. When
                given, it is forwarded to ``producer_setup`` so the listener
                is ready to use immediately; when omitted, the caller must
                invoke ``producer_setup`` before streaming starts.
        """
        # Let the base listener initialise its own state.
        super().__init__()
        # None (rather than a numeric sentinel) marks "not configured yet".
        self._producer = None
        if configuration_dict is not None:
            self.producer_setup(configuration_dict)

    def producer_setup(self, configuration_dict):
        """Create (or replace) this listener's Kafka producer.

        Args:
            configuration_dict: Mapping of client settings, unpacked into
                keyword arguments for ``Producer``.
        """
        self._producer = Producer(**configuration_dict)

    def on_data(self, data):
        """Handle one raw stream message.

        Args:
            data: JSON text of a single stream message.

        Returns:
            True, so that streaming continues.

        Raises:
            RuntimeError: If no producer has been configured yet.
        """
        if self._producer is None:
            raise RuntimeError(
                "Kafka producer is not configured; call producer_setup() first"
            )

        tweet_json = json.loads(data)
        # Messages without a "text" field are skipped.
        if "text" in tweet_json:
            tweet = Tweet(tweet_json["text"])
            tweet.sanitise()
            self._producer.produce(
                TOPIC_NAME,
                tweet.get_tweet().encode('utf-8'),
                callback=delivery_report,
            )
            # poll() triggers pending delivery callbacks (see delivery_report).
            self._producer.poll(0)

        return True

    def on_error(self, status):
        """Report a stream error and flush any messages still queued.

        Args:
            status: Error status reported by the stream.
        """
        print(status)
        # Flush this listener's own producer, not an unrelated module-level
        # one, so the messages queued by on_data are actually delivered.
        if self._producer is not None:
            self._producer.flush()
        
def delivery_report(err, msg):
    """Delivery callback invoked once per produced message.

    Runs when the producer's poll() or flush() is called. Successful
    deliveries are silent; failures are printed.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message the report refers to (unused here).
    """
    if err is None:
        return
    print('Message delivery failed: {}'.format(err))

In [16]:
# Stream the public Twitter sample into Kafka: authenticate, build the
# listener, attach a producer configured for the local broker, then start
# the stream.
auth = twitter_setup()
tweet_producer = TweetKafkaProducer()
tweet_producer.producer_setup(conf_local)
stream = Stream(auth, tweet_producer)
# The recorded run of this cell ended with KeyboardInterrupt, i.e. it was
# stopped manually.
stream.sample(stall_warnings=True)

KeyboardInterrupt: 

In [11]:
# Shared state used by StdOutListener: the running keyword counts and the
# timer/counter that decides when a snapshot is written.
# NOTE(review): Keywords and Keeper are not defined in this notebook —
# presumably they come from the star import of `utilities`; confirm.
adj_dict = Keywords()
time_keeper = Keeper()

class StdOutListener(StreamListener):
    """Stream listener that accumulates keyword counts from tweets.

    Every message carrying a "text" field is processed into keywords, which
    are added to the module-level ``adj_dict``; ``time_keeper`` counts the
    processed tweets. Once more than ``SNAPSHOT_INTERVAL_SECONDS`` have
    elapsed since the keeper's reference time, the counts are printed, saved
    to a ``.npy`` file and reset.
    """

    # Seconds that must elapse before the accumulated counts are written out.
    SNAPSHOT_INTERVAL_SECONDS = 600

    def on_data(self, data):
        """Handle one raw stream message.

        Args:
            data: JSON text of a single stream message.

        Returns:
            True on every path, so the stream keeps running whether or not
            the message contained tweet text.
        """
        tweet_json = json.loads(data)
        # Messages without a "text" field are skipped.
        if "text" in tweet_json:
            tweet = Tweet(tweet_json["text"])
            tweet.process_tweet()
            adj_dict.update_keyword_count(tweet.get_keywords())
            time_keeper.increment_counter()

            elapsed = time.time() - time_keeper.get_time()
            if elapsed > self.SNAPSHOT_INTERVAL_SECONDS:
                self._save_snapshot()

        return True

    def _save_snapshot(self):
        """Print and persist the current keyword counts, then reset the state."""
        time_keeper.reset_time()
        keyword_counts = adj_dict.get_dict()
        tweet_count = time_keeper.get_counter()
        print(sum(keyword_counts.values()), tweet_count)
        # File name encodes the save time (epoch seconds) and the number of
        # tweets counted since the previous snapshot.
        # NOTE(review): if get_dict() returns a plain dict, np.save stores it
        # as a pickled object array, so reading it back needs
        # np.load(..., allow_pickle=True) — confirm.
        file_name = "tweet_dict" + str(int(time.time())) + "_" + str(tweet_count) + ".npy"
        np.save(file_name, keyword_counts)
        time_keeper.reset_counter()
        adj_dict.reset_dict()

    def on_error(self, status):
        """Print the error status reported by the stream.

        Args:
            status: Error status reported by the stream.
        """
        print(status)

In [12]:
# Run the keyword-counting listener against the public Twitter sample stream.
auth = twitter_setup()
l = StdOutListener()
stream = Stream(auth, l)
# Alternative kept for reference: follow only tweets matching a tracked keyword.
#stream.filter(track=['gold', ], stall_warnings=True)
# The recorded run printed two snapshot lines and was then stopped with
# KeyboardInterrupt.
stream.sample(stall_warnings=True)

1502 16148
1350 15949


KeyboardInterrupt: 