In [1]:
#tweepy functions used to stream tweets
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

#Import credentials for the user
from twitter_credentials import KEY, SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET

# TextBlob is the library used to analyze tweet sentiment
from textblob import TextBlob

import json
import time

## Define streaming classes

In [6]:
# Twitter streamer
class TwitterStreamer():
    """
    Connect to the Twitter streaming API and run a keyword-filtered stream
    whose messages are handled by a ``Listener``.
    """
    def __init__(self):
        # Nothing to initialise; every object is built per stream_tweets call.
        pass

    def stream_tweets(self, fetched_tweets_filename, hash_tag_list, time_span, save_data):
        """
        Stream English-language tweets matching any keyword in ``hash_tag_list``.

        Parameters
        ----------
        fetched_tweets_filename : str
            File the listener appends tweets to when ``save_data`` is true.
        hash_tag_list : list of str
            Keywords handed to the stream's ``track`` filter.
        time_span : float
            Seconds after which the listener ends the stream.
        save_data : bool
            Whether the listener also writes tweets to ``fetched_tweets_filename``.
        """
        # Built first, as before: the listener's time window starts when it is constructed.
        tweet_listener = Listener(fetched_tweets_filename, time_span, save_data)

        # Authenticate with the credentials imported from twitter_credentials.
        oauth = OAuthHandler(KEY, SECRET)
        oauth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

        # Run the filtered stream; the listener ends it by returning False from on_data.
        Stream(oauth, tweet_listener).filter(track=hash_tag_list, languages=['en'])


# Twitter stream listener
class Listener(StreamListener):
    """
    Stream listener that scores each incoming tweet with TextBlob and publishes
    the result, as a JSON string, to a Kafka topic.

    Parameters
    ----------
    fetched_tweets_filename : str
        File that raw tweet payloads are appended to (one per line) when
        ``save_data`` is true.
    time_span : float
        Seconds, counted from construction, after which ``on_data`` asks tweepy
        to stop the stream.
    save_data : bool
        Whether to append each raw payload to ``fetched_tweets_filename``.
    producer : optional
        Object with a ``send(topic, key=..., value=...)`` method, e.g. a
        ``KafkaProducer``. When omitted, the module-level ``producer`` defined
        elsewhere in the notebook is looked up at send time (previous behavior).
    topic : str
        Kafka topic the processed tweets are sent to (default ``'tweets'``).
    """
    def __init__(self, fetched_tweets_filename, time_span, save_data,
                 producer=None, topic='tweets'):
        super().__init__()
        self.fetched_tweets_filename = fetched_tweets_filename
        self.time_span = time_span  # seconds the listener keeps streaming
        self.save_data = save_data  # whether raw tweets are also written to disk
        self.producer = producer    # None -> fall back to the global `producer`
        self.topic = topic
        self.started = time.time()

    def on_data(self, data):
        """
        Process one raw message from the stream.

        Parameters
        ----------
        data : str
            Raw JSON payload delivered by tweepy.

        Returns
        -------
        bool
            True to keep streaming; False once ``time_span`` seconds have passed
            since construction, which makes tweepy disconnect. The window is only
            checked when a message arrives, so a quiet stream may run longer.
        """
        # Timestamp before processing so processing time does not count against the window.
        starting = time.time()
        try:
            # Keep the raw string in `data`; parse into a separate name so it can still be saved.
            payload = json.loads(data)
            text = payload['text']
            user = payload['user']
            blob = TextBlob(text)

            tweet = json.dumps({
                "user": user['screen_name'],
                "followers": user['followers_count'],
                "text": text,
                "sentiment": blob.polarity,
                # Key spelling kept as-is: it is the wire format downstream consumers read.
                "subjetivity": blob.subjectivity,
            })

            if self.save_data:
                # Write the raw payload (a str, not the parsed dict), one tweet per line.
                with open(self.fetched_tweets_filename, 'a', encoding='utf-8') as tf:
                    tf.write(data.rstrip('\r\n') + '\n')

            # Send the processed tweet to Kafka; `tweet` is already a str.
            sink = self.producer if self.producer is not None else producer
            sink.send(self.topic, key=b'tweet', value=tweet)

        except Exception as e:
            # Best effort: report and keep streaming. Messages without 'text'
            # (presumably delete/limit notices) end up here.
            print("Error on_data %s" % str(e))

        # Checked on every message, including failed ones, so errors cannot
        # keep the stream open past the time window.
        return starting - self.started <= self.time_span

    def on_error(self, status):
        """
        Handle a non-200 HTTP status from the streaming endpoint.

        Prints the status code. Returns False for 420 (rate limited) so the
        stream disconnects; otherwise returns None and leaves handling to tweepy.
        """
        print(status)

        if status == 420:
            # Returning False from on_error disconnects the stream instead of
            # reconnecting while rate limited.
            return False

# Kafka producer

## Start the Kafka server and create the tweets topic

In [None]:
# Start ZooKeeper and then the Kafka broker in the background (-daemon), using the
# Kafka 2.8.0 distribution at ./kafka_2.13-2.8.0 relative to the working directory.
# NOTE(review): -daemon presumably returns before the services are ready; give the
# broker a few seconds before running the topic-creation cell.
# ZooKeeper Server
!./kafka_2.13-2.8.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.8.0/config/zookeeper.properties
# Kafka Server (broker, which connects to the ZooKeeper started above)
!./kafka_2.13-2.8.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.8.0/config/server.properties

In [None]:
#Create tweets topic in kafka server
!./kafka_2.13-2.8.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tweets
#Add a time limit for data to stay loaded in tweets topic
!./kafka_2.13-2.8.0/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic tweets --config retention.ms=1000

## Define the producer and begin streaming

In [7]:
from kafka import KafkaProducer

In [8]:
# Kafka producer using kafka-python's default bootstrap server settings.
# Message values are str and are UTF-8 encoded before sending; keys are sent as bytes.
producer = KafkaProducer(
    value_serializer=lambda message: message.encode('utf-8'),
)

In [12]:
# Stream tweets and send them to the Kafka topic.
# Keywords for Twitter's `track` filter ('ethereum' was previously misspelled 'etherum').
hash_tag_list = ['ethereum', 'bitcoin', 'dogecoin']
fetched_tweets_filename = "ETH_BIT_DOGE_tweets.txt"
STREAM_SECONDS = 300  # how long the listener keeps the stream open

twitter_streamer = TwitterStreamer()
try:
    twitter_streamer.stream_tweets(fetched_tweets_filename, hash_tag_list, STREAM_SECONDS, save_data=False)
finally:
    # KafkaProducer.send is asynchronous; flush so tweets still buffered are delivered.
    producer.flush()