In [2]:
%%html
<style>
table {float:left}
</style>

# Scripts for Twitter Streaming Application

Description: This notebook was used to create or modify the scripts for this application.

---
# Storm Scripts

### Storm Application Files
|File Name|Location|Description|
|:--|:-|:--|
|tweets.py|./EX2Tweetwordcount/src/spouts/|Spout to collect tweets|
|parse.py|./EX2Tweetwordcount/src/bolts/|Bolt to parse tweet and clean words|
|wordcount.py|./EX2Tweetwordcount/src/bolts/|Bolt to count words and update Postgres|

### tweets.py (spout)

In [2]:
%%writefile EX2Tweetwordcount/src/spouts/tweets.py
from __future__ import absolute_import, print_function, unicode_literals
#TEST THIS
import itertools, time
import tweepy, copy 
import Queue, threading

from streamparse.spout import Spout

################################################################################
# Twitter credentials
################################################################################
# Credentials are looked up by key through auth_get(). The spout in this file
# asks for "consumer_key", "consumer_secret", "access_token" and
# "access_token_secret". The dict is empty as written, so each of those
# lookups returns None until the values are filled in.
# NOTE(review): avoid committing real secrets into this generated file.
twitter_credentials = {
}

def auth_get(auth_key):
    """Look up a Twitter credential by name.

    Returns the value stored in ``twitter_credentials`` under ``auth_key``,
    or None when that key is absent.
    """
    return twitter_credentials.get(auth_key)

################################################################################
# Class to listen and act on the incoming tweets
################################################################################
class TweetStreamListener(tweepy.StreamListener):
    """Stream listener that pushes the text of each incoming tweet onto a queue.

    ``listener`` is the owning object; it must provide ``tweepy_api()`` (passed
    to the tweepy base class) and ``queue()`` (where tweet text is put).
    """

    def __init__(self, listener):
        self.listener = listener
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(TweetStreamListener, self).__init__(listener.tweepy_api())

    def on_status(self, status):
        """Queue the tweet text; drop the tweet if the queue stays full."""
        try:
            self.listener.queue().put(status.text, timeout = 0.01)
        except Queue.Full:
            # The consumer is behind. Losing one tweet is preferable to letting
            # Queue.Full escape from this callback into the streaming code.
            pass
        return True

    def on_error(self, status_code):
        return True # keep stream alive

    def on_limit(self, track):
        return True # keep stream alive

class Tweets(Spout):

    def initialize(self, stormconf, context):
        self._queue = Queue.Queue(maxsize = 100)

        consumer_key = auth_get("consumer_key") 
        consumer_secret = auth_get("consumer_secret") 
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)

        if auth_get("access_token") and auth_get("access_token_secret"):
            access_token = auth_get("access_token")
            access_token_secret = auth_get("access_token_secret")
            auth.set_access_token(access_token, access_token_secret)

        self._tweepy_api = tweepy.API(auth)

        # Create the listener for twitter stream
        listener = TweetStreamListener(self)

        # Create the stream and listen for english tweets
        stream = tweepy.Stream(auth, listener, timeout=None)
        stream.filter(languages=["en"], track=["a", "the", "i", "you", "u"], async=True)

    def queue(self):
        return self._queue

    def tweepy_api(self):
        return self._tweepy_api

    def next_tuple(self):
        try:
            tweet = self.queue().get(timeout = 0.1) 
            if tweet:
                self.queue().task_done()
                self.emit([tweet])
 
        except Queue.Empty:
            self.log("Empty queue exception")
            time.sleep(0.1) 

    def ack(self, tup_id):
        pass  # if a tuple is processed properly, do nothing

    def fail(self, tup_id):
        pass  # if a tuple fails to process, do nothing

Overwriting EX2Tweetwordcount/src/spouts/tweets.py


### parse.py (bolt)

In [157]:
%%writefile EX2Tweetwordcount/src/bolts/parse.py 
from __future__ import absolute_import, print_function, unicode_literals

import re
from streamparse.bolt import Bolt

################################################################################
# Function to check if the string contains only ascii chars
################################################################################
def ascii_string(s):
    """Return True when every character of ``s`` has a code point below 128.

    An empty string has no offending character, so it also returns True.
    """
    for ch in s:
        if ord(ch) >= 128:
            return False
    return True

class ParseTweet(Bolt):
    """Bolt that splits a tweet into cleaned, lower-cased words.

    Each surviving word is emitted as its own one-field tuple. Hash tags,
    user mentions, tokens starting with "RT" and URLs are dropped.
    """

    # Characters deleted from anywhere inside a (lower-cased) word.
    # The previous pattern wrote "...&*-_+..." inside the brackets, which the
    # regex engine reads as the range '*' through '_'. That range silently
    # covered , . / : ; < = > ? @ [ \ ] ^ as well. Every character is listed
    # explicitly here, with '-' escaped, so the set of removed characters is
    # the same as before but no longer depends on an accidental range.
    _JUNK_CHARS = re.compile(r"[0-9!#$%&'()*+,\-./:;<=>?@\[\\\]^_{|}~]")

    # A token starting with any of these is skipped entirely
    _SKIPPED_PREFIXES = ("#", "@", "RT", "http")

    def process(self, tup):
        """Clean the words of the tweet in ``tup.values[0]`` and emit them."""
        tweet = tup.values[0]  # extract the tweet

        valid_words = []
        for word in tweet.split():

            # Filter out the hash tags, user mentions, retweet tags and urls
            if word.startswith(self._SKIPPED_PREFIXES):
                continue

            # Strip leading and trailing punctuation, then normalise case
            aword = word.strip("\"?><,'.:;)").lower()

            # Remove digits and punctuation left inside the word
            aword = self._JUNK_CHARS.sub("", aword)

            # Keep the word only if something is left and the original
            # token was pure ascii
            if aword and ascii_string(word):
                valid_words.append([aword])

        if not valid_words:
            return

        # Emit all the words
        self.emit_many(valid_words)

        # tuple acknowledgement is handled automatically

Overwriting src/bolts/parse.py


### wordcount.py (bolt)

In [1]:
%%writefile EX2Tweetwordcount/src/bolts/wordcount.py
from __future__ import absolute_import, print_function, unicode_literals

from collections import Counter
from redis import StrictRedis
from streamparse.bolt import Bolt

import psycopg2
import time

class WordCounter(Bolt):
    """Bolt that counts words in memory and mirrors each count into Postgres.

    Database: tcount, table: Tweetwordcount (columns word, count).
    """

    def initialize(self, conf, ctx):
        """Set up the in-memory counter and one database connection."""
        self.counts = Counter()
        # Kept from the original code; nothing in this class uses it.
        self.redis = StrictRedis()
        # One connection for the lifetime of the bolt. Connecting inside
        # process() opened a new connection for every word and never closed it.
        self.conn = psycopg2.connect(database="tcount", user="postgres")

    def process(self, tup):
        """Increment the count of the word in ``tup.values[0]`` and store it.

        The first time a word is seen by this bolt instance a row is inserted;
        afterwards the existing row is updated with the in-memory count.
        Database errors roll the transaction back and are re-raised.
        """
        word = tup.values[0]

        # Increment the local count
        self.counts[word] += 1
        count = self.counts[word]

        # Values are passed as query parameters so the driver does the
        # quoting, instead of being formatted into the SQL text.
        if count == 1:
            # New word > INSERT INTO
            sql = "INSERT INTO Tweetwordcount (word,count) VALUES (%s, %s);"
            params = (word, count)
        else:
            # Update word count > UPDATE
            sql = "UPDATE Tweetwordcount SET count=%s WHERE word=%s;"
            params = (count, word)

        cur = self.conn.cursor()
        try:
            cur.execute(sql, params)
            self.conn.commit()
        except psycopg2.Error:
            # Leave the shared connection usable for the next tuple
            self.conn.rollback()
            raise
        finally:
            cur.close()

Overwriting EX2Tweetwordcount/src/bolts/wordcount.py


---
# Serving Scripts

### Serving Files (Query Postgres Database)
|File Name|Location|Description|
|:--|:--|:--|
|finalresults.py|./analysis/|Return the count of a word. If a word is not provided, it returns all word counts|
|histogram.py|./analysis/|Return all words with count between a given interval|
|top20.py|./analysis/|Return the top-20 words by count and create a bar-chart saved as 'plot.png'|

### finalresults.py

In [171]:
%%writefile analysis/finalresults.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import psycopg2
import time
import sys
   
def get_wordCount(word, show=25):
    """Print the count of one word, or a listing of word counts.

    word -- the word to look up; None prints a listing of the table instead.
    show -- maximum number of rows printed in the listing (alphabetical order).

    A word that has no row in Tweetwordcount is reported with a count of 0.
    """
    # Connect to database; the connection is closed even if a query fails
    conn = psycopg2.connect(database="tcount", user="postgres")
    try:
        cur = conn.cursor()
        if word is None:
            # Number of rows (distinct words) in the table
            cur.execute("SELECT count(*) FROM Tweetwordcount ;")
            total_words = cur.fetchall()[0][0]
            limit = min(show, total_words)
            cur.execute(
                "SELECT * FROM Tweetwordcount ORDER BY word ASC LIMIT %s;",
                (limit,))
        else:
            # The word comes from the command line, so it is passed as a
            # query parameter rather than formatted into the SQL text.
            cur.execute("SELECT * FROM Tweetwordcount WHERE word=%s;", (word,))
        result = cur.fetchall()
    finally:
        conn.close()

    # Print results
    if word is None:
        print('First %d word-counts (out of %d words):\n' % (limit, total_words))
        print("%12s  %s" % ('Word', 'Count'))
        for w, c in result:
            print("%12s  %d" % (w, c))
    else:
        if result:
            found_word, found_count = result[0][0], result[0][1]
        else:
            # No row for this word: report zero instead of failing on result[0]
            found_word, found_count = word, 0
        print("Number of occurences of '%s':  %d \t@ %s"
              % (found_word, found_count, time.ctime(time.time())))

# ===================================================================================
if __name__ == '__main__':
    # Usage:
    #   python finalresults.py hello   -> number of occurrences of one word
    #   python finalresults.py         -> word counts, sorted alphabetically,
    #                                     one per line (first 25 by default)
    #   python finalresults.py 2000    -> same listing, showing up to 2000 words
    cli_args = sys.argv[1:]

    numToShow, word = 25, None
    if cli_args:
        first_arg = cli_args[0]
        # An all-digit argument is a row limit; anything else is the word
        if first_arg.isdigit():
            numToShow = int(first_arg)
        else:
            word = first_arg

    get_wordCount(word, numToShow)

Overwriting analysis/finalresults.py


### histogram.py

In [20]:
%%writefile analysis/histogram.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import psycopg2
import time
import sys
   
def histogram(min_frq, max_frq, show):
    """Print the words whose count lies in [min_frq, max_frq].

    min_frq, max_frq -- inclusive bounds on the count column.
    show             -- maximum number of rows printed (ascending by count).
    """
    interval = (min_frq, max_frq)

    # Connect to database; the connection is closed even if a query fails
    conn = psycopg2.connect(database="tcount", user="postgres")
    try:
        cur = conn.cursor()

        # Get total words with count within interval
        cur.execute(
            "SELECT count(*) FROM Tweetwordcount "
            "WHERE count >= %s and count <= %s;",
            interval)
        total_words = cur.fetchall()[0][0]

        # Get words with count within interval
        cur.execute(
            "SELECT * FROM Tweetwordcount WHERE "
            "count >= %s and count <= %s ORDER BY count ASC LIMIT %s;",
            interval + (show,))
        result = cur.fetchall()
    finally:
        conn.close()

    # Print results
    print('Reporting %d words (out of %d words):\n'
          % (min(show, total_words), total_words))
    print("%12s  %s" % ('Word', 'Count'))
    for w, c in result:
        print("%12s  %d" % (w, c))

# ===================================================================================
if __name__ == '__main__':
    '''
    Get all the words with count between MIN and MAX value provided:
       python histogram.py 100 400
    
    The default is to show only the first 25 words. 
    To increase the number of words shown, send the number to show as a 3rd parameter:
       python histogram.py 100 400 50
    '''    
    
    numToShow = 25
    if len(sys.argv) < 3:
        print "Few inputs - MIN and MAX values for count interval are requiered"
    else:
        minVal = int(sys.argv[1])
        maxVal = int(sys.argv[2])
        if len(sys.argv) == 4:
            numToShow = int(sys.argv[3])
        if minVal > maxVal:
            print "min value provided is greater than max value. Values will be switched."
    
    histogram(minVal, maxVal, numToShow)

Overwriting analysis/histogram.py


### top20.py

In [98]:
%%writefile analysis/top20.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import psycopg2
import numpy as np

import matplotlib
matplotlib.use('Agg')

import matplotlib.pyplot as plt
    
def bar_plot(names, values):
    """Draw a bar chart of ``values`` labelled by ``names`` and save it as a PNG.

    names  -- tick labels, one per bar.
    values -- bar heights, in the same order as ``names``.

    The image is written to 'analysis/plot.png'.
    """
    n = np.arange(len(values))
    fig = plt.figure(figsize=(14,7))
    plt.title('Top-20 Words by Count\n', size=18)
    plt.ylabel("Occurrences (count)", size=14)
    # Centre each bar on its x position and put the label at that same
    # position, so labels line up with bars whatever the library's default
    # bar alignment is.
    plt.bar(n, values, color='#89C6DA', align='center')
    plt.xticks(n, names, rotation=0, size=12)

    plt.savefig('analysis/plot.png', format='png')
    # Release the figure once it has been written
    plt.close(fig)
    
def top20():
    """Print the 20 rows of Tweetwordcount with the highest count and plot them.

    The rows are printed with their rank, then passed to bar_plot().
    """
    # Connect to database; the connection is closed even if the query fails
    conn = psycopg2.connect(database="tcount", user="postgres")
    try:
        cur = conn.cursor()
        # Get top-20 (the earlier count(count) query was dropped: its result
        # was never used)
        cur.execute("SELECT * FROM Tweetwordcount ORDER BY count DESC LIMIT 20;")
        result = cur.fetchall()
    finally:
        conn.close()

    # Print results
    print('Top-20')
    print("%s %8s  %s" % ('ID', 'Word', 'Count'))
    words = []
    counts = []
    for rank, (w, c) in enumerate(result, 1):
        print("%2d  %8s  %d" % (rank, w, c))
        words.append(w)
        counts.append(c)

    bar_plot(words, counts)

# ===================================================================================
if __name__ == '__main__':
    # Usage:
    #   python top20.py   -> get top-20 words by count
    top20()

Overwriting analysis/top20.py
