# What topics are people tweeting about?

## Using Apache Spark for Twitter analysis

This notebook shows how to analyze data from the Twitter API with Apache Spark. The idea is to extract the hashtags used in tweets from the last week and see which topics are most popular. The analysis is personalized: the tweets fetched via the API are the last 40 tweets from each of the users I follow on Twitter.

### Steps performed

1. Authenticate to Twitter API using Tweepy
2. Get last 40 tweets of each of the users I follow (save to MongoDB for persistence)
3. Filter tweets with Apache Spark:
   * Omit tweets without hashtags
   * Omit tweets that are too old
4. Analyze tweets with Apache Spark:
   * Extract hashtags
   * Do a word count
   * Compute sample statistics
   * Output tags for use in 3rd party word cloud software

In [1]:
# Dependencies
import os
import ssl
import time
import tweepy
import requests
import datetime

from pprint import pprint
from dateutil import parser
from pymongo import MongoClient

# Fix for urllib SSL InsecurePlatformWarning
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()

In [2]:
# Connect to local MongoDB instance
client = MongoClient()
db = client['twitter']

In [6]:
# Twitter credentials (FILL IN)
TW_CONSUMER_KEY='...'
TW_CONSUMER_SECRET='...'
TW_ACCESS_TOKEN='...'
TW_ACCESS_TOKEN_SECRET='...'

In [7]:
# Authenticate to Twitter API
auth = tweepy.OAuthHandler(TW_CONSUMER_KEY, TW_CONSUMER_SECRET)
auth.set_access_token(TW_ACCESS_TOKEN, TW_ACCESS_TOKEN_SECRET)

api = tweepy.API(auth)

In [8]:
# Get details about own user
me = api.me()
friends = api.friends_ids(me.id)

In [9]:
# Inspect the list of user IDs
friends[:10]

[17681614,
 3257840322L,
 2887341,
 348783600,
 39229885,
 246582746,
 9316452,
 372502604,
 2401864586L,
 235511635]

In [22]:
# Initialize data structure
tweets = {}

In [23]:
# Fetch lists of up to 40 recent tweets for each of the user IDs
for user in friends:
    # Only query Twitter for data not already cached
    if db.tweets.find_one({'user_id': user}) is None:
        print('Get recent tweets for user {}...'.format(user))
        tweets[user] = []
        for page in tweepy.Cursor(api.user_timeline, id=user).pages(2):
            tweets[user].extend(page)
            print('  Got {} tweets so far...'.format(len(tweets[user])))
            # API is rate limited (5 sec sleep = 180 reqs in 15 min)
            time.sleep(5)
        # Save each tweet to database
        for tweet in tweets[user]:
            db.tweets.insert_one({'user_id': user, 'tweet': tweet._json})

Get recent tweets for user 3257840322...
  Got 20 tweets so far...
  Got 40 tweets so far...
Get recent tweets for user 2887341...
  Got 20 tweets so far...
  Got 40 tweets so far...
Get recent tweets for user 348783600...
  Got 20 tweets so far...
  Got 40 tweets so far...
Get recent tweets for user 39229885...
  Got 20 tweets so far...
  Got 40 tweets so far...
...
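The 5-second sleep in the loop comes from spreading the rate limit evenly over its window: 180 requests per 15-minute (900 s) window allows one request every 900 / 180 = 5 seconds. A minimal sketch of that calculation follows; the helper names `min_request_interval` and `total_fetch_time` are hypothetical, and the 180/900 figures are simply the ones quoted in the code comment above.

```python
def min_request_interval(max_requests, window_seconds):
    """Seconds to wait between calls so max_requests is never exceeded within the window."""
    if max_requests <= 0:
        raise ValueError("max_requests must be positive")
    return window_seconds / float(max_requests)


def total_fetch_time(num_users, pages_per_user, interval_seconds):
    """Lower bound on wall-clock time when every page request is followed by one sleep."""
    return num_users * pages_per_user * interval_seconds


interval = min_request_interval(180, 15 * 60)
print(interval)  # 5.0
```

For a hypothetical 300 followed accounts at 2 pages each, `total_fetch_time(300, 2, 5.0)` gives 3000 seconds, i.e. at least 50 minutes of sleeping alone, which is why the loop caches every user's tweets in MongoDB.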

In [8]:
# Check Twitter API rate limit status: recursively walk the nested response
# and print every endpoint that has used some of its calls
def walk(node):
    for key, item in node.items():
        if isinstance(item, dict):
            if 'remaining' not in item:
                # Intermediate grouping level: descend further
                walk(item)
            elif item['remaining'] < item['limit']:
                print('{} remaining of {} for {}'.format(item['remaining'], item['limit'], key))

ratelimits = api.rate_limit_status()
walk(ratelimits)

14 remaining of 15 for /account/verify_credentials
179 remaining of 180 for /users/show/:id
14 remaining of 15 for /friends/ids
178 remaining of 180 for /application/rate_limit_status
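The `walk` function prints as it recurses. A variant that collects the partially used endpoints into a list is easier to sort and to test. This sketch assumes, as `walk` already does, that the response is nested dicts in which each endpoint is a dict with `remaining` and `limit` keys; `used_limits` is a hypothetical name and `sample` is made-up data in that shape, not a real API response.

```python
def used_limits(node):
    """Return (endpoint, remaining, limit) for every endpoint with calls already used."""
    found = []
    for key, item in node.items():
        if not isinstance(item, dict):
            continue
        if 'remaining' in item:
            if item['remaining'] < item['limit']:
                found.append((key, item['remaining'], item['limit']))
        else:
            # Grouping level without limits of its own: recurse into it
            found.extend(used_limits(item))
    return found


# Hypothetical data shaped like the nested rate limit response
sample = {
    'resources': {
        'friends': {'/friends/ids': {'remaining': 14, 'limit': 15}},
        'users': {'/users/show/:id': {'remaining': 180, 'limit': 180}},
    }
}
for endpoint, remaining, limit in sorted(used_limits(sample)):
    print('{} remaining of {} for {}'.format(remaining, limit, endpoint))
```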


In [27]:
# Show example of tweet object structure
example_user = next(iter(tweets))
pprint(tweets[example_user][0]._json)

{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Sun Jul 12 19:29:09 +0000 2015',
 u'entities': {u'hashtags': [{u'indices': [75, 83], u'text': u'TurnAMC'},
                             {u'indices': [139, 140], u'text': u'RenewTURN'}],
               u'symbols': [],
               u'urls': [],
               u'user_mentions': [{u'id': 2262113536L,
                                   u'id_str': u'2262113536',
                                   u'indices': [3, 17],
                                   u'name': u'J.E. Matzer',
                                   u'screen_name': u'JeMatzerACTOR'},
                                  {u'id': 148807607,
                                   u'id_str': u'148807607',
                                   u'indices': [19, 26],
                                   u'name': u'AMC ',
                                   u'screen_name': u'AMC_TV'},
                                  {u'id': 1855584931,
                                   u'id_str': u'
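The structure above shows that hashtags live under `entities['hashtags']`, each with a `text` and an `indices` field. The Spark cells below extract and lowercase those `text` values; here is the same logic as a plain function. `extract_hashtags` is a hypothetical helper, and `example` is trimmed by hand from the output above to the fields it needs.

```python
def extract_hashtags(tweet):
    """Lowercased hashtag texts from a tweet's JSON, in the order the API lists them."""
    hashtags = tweet.get('entities', {}).get('hashtags', [])
    return [tag['text'].lower() for tag in hashtags]


example = {
    'created_at': 'Sun Jul 12 19:29:09 +0000 2015',
    'entities': {
        'hashtags': [
            {'indices': [75, 83], 'text': 'TurnAMC'},
            {'indices': [139, 140], 'text': 'RenewTURN'},
        ],
        'symbols': [],
        'urls': [],
    },
}
print(extract_hashtags(example))  # ['turnamc', 'renewturn']
```

Lowercasing matters because `TurnAMC` and `turnamc` should count as the same topic.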

In [3]:
# Check how many tweets we got 
db.tweets.count()

24271

In [4]:
# Filter to only get tweets from the last 7 days
DAYS_LIMIT=7
limit = datetime.datetime.now() - datetime.timedelta(days=DAYS_LIMIT)
limit_unixtime = time.mktime(limit.timetuple())

In [5]:
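# Extract tweets from MongoDB and load them into Spark for analysis
# (sc is the SparkContext provided by the PySpark notebook environment)
allTweets = [doc['tweet'] for doc in db.tweets.find()]

# Distribute the tweets across 8 partitions
allTweetsRDD = sc.parallelize(allTweets, 8)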

In [6]:
# Filter tweets to get rid of those that either have no hashtags or are too old.
# created_at is in UTC (+0000), so convert it with calendar.timegm; time.mktime
# would interpret the UTC wall-clock time as local time and shift the result.
import calendar

tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)
filteredTweetsRDD = tweetsWithTagsRDD.filter(lambda t: calendar.timegm(parser.parse(t['created_at']).utctimetuple()) > limit_unixtime)

# Alternative syntax:
#filteredTweetsRDD = (allTweetsRDD
#                     .filter(lambda t: len(t['entities']['hashtags']) > 0)
#                     .filter(lambda t: calendar.timegm(parser.parse(t['created_at']).utctimetuple()) > limit_unixtime)
#                    )
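Because `created_at` strings carry an explicit UTC offset (`+0000`), comparing them with a cutoff is least error-prone with timezone-aware datetimes, which never mix local and UTC time. Below is a pure-Python sketch of both filters as one predicate, written for Python 3. The format string is an assumption based on the `created_at` value in the example tweet, `%a`/`%b` assume an English (C) locale, and `parse_created_at`, `keep_tweet` and `cutoff_days_ago` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

# Assumed format of created_at values such as 'Sun Jul 12 19:29:09 +0000 2015'
TWITTER_TIME_FORMAT = '%a %b %d %H:%M:%S %z %Y'


def parse_created_at(value):
    """Parse a created_at string into a timezone-aware datetime."""
    return datetime.strptime(value, TWITTER_TIME_FORMAT)


def cutoff_days_ago(days, now=None):
    """Aware datetime `days` before `now` (defaults to the current UTC time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - timedelta(days=days)


def keep_tweet(tweet, cutoff):
    """True if the tweet has at least one hashtag and was created after cutoff."""
    if not tweet['entities']['hashtags']:
        return False
    return parse_created_at(tweet['created_at']) > cutoff


cutoff = cutoff_days_ago(7, now=datetime(2015, 7, 13, tzinfo=timezone.utc))
recent = {'created_at': 'Sun Jul 12 19:29:09 +0000 2015',
          'entities': {'hashtags': [{'text': 'TurnAMC'}]}}
old = {'created_at': 'Mon Jun 01 08:00:00 +0000 2015',
       'entities': {'hashtags': [{'text': 'bigdata'}]}}
untagged = {'created_at': 'Sun Jul 12 19:29:09 +0000 2015',
            'entities': {'hashtags': []}}
print([keep_tweet(t, cutoff) for t in (recent, old, untagged)])  # [True, False, False]
```

In Spark the predicate would be used as `allTweetsRDD.filter(lambda t: keep_tweet(t, cutoff))`. Checking for hashtags first also skips the date parsing for most tweets.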

In [7]:
# Get some stats (precompute the counts since they are referenced several times below)
tweetCount = allTweetsRDD.count()
withTagsCount = tweetsWithTagsRDD.count()
filteredCount = filteredTweetsRDD.count()
print('Our filtered set contains {} tweets from the last {} days.'.format(filteredCount, DAYS_LIMIT))
print('Of a total of {} tweets, {} had no hashtags. Of those that had hashtags, {} were too old.'.format(
        tweetCount, tweetCount - withTagsCount, withTagsCount - filteredCount))

Our filtered set contains 2456 tweets from the last 7 days.
Of a total of 24271 tweets, 17150 had no hashtags. Of those that had hashtags, 4665 were too old.
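The printed counts can be cross-checked against each other: subtracting the tweets without hashtags from the total gives the tagged tweets, and subtracting the too-old ones from that should reproduce the filtered count. The numbers below are copied from the output above.

```python
total_tweets = 24271
without_tags = 17150
too_old = 4665

with_tags = total_tweets - without_tags  # tweets with at least one hashtag
filtered = with_tags - too_old           # tagged tweets from the last 7 days
share = round(100.0 * filtered / total_tweets, 1)
print(with_tags, filtered, share)  # 7121 2456 10.1
```

So only about a tenth of the fetched tweets survive both filters, which is worth keeping in mind when reading the hashtag counts.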


In [8]:
# Count the number of occurrences of each hashtag
# by first extracting the hashtag and lowercasing it,
# then doing a standard word count with map and reduceByKey
countsRDD = (filteredTweetsRDD
             .flatMap(lambda tweet: [hashtag['text'].lower() for hashtag in tweet['entities']['hashtags']])
             .map(lambda tag: (tag, 1))
             .reduceByKey(lambda a, b: a + b)
            )

# Get the most used hashtags (order countsRDD descending by count)
countsRDD.takeOrdered(20, key=lambda kv: -kv[1])

[(u'bigdata', 114),
 (u'openstack', 92),
 (u'gophercon', 71),
 (u'machinelearning', 68),
 (u'sdn', 66),
 (u'datascience', 58),
 (u'docker', 56),
 (u'dtm', 46),
 (u'audisport', 44),
 (u'dtmzandvoort', 42),
 (u'hpc', 40),
 (u'welcomechallenges', 38),
 (u'devops', 37),
 (u'analytics', 36),
 (u'awssummit', 36),
 (u'infosec', 33),
 (u'security', 32),
 (u'openstacknow', 29),
 (u'renewturn', 29),
 (u'mobil1scgp', 28)]
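The `flatMap` / `map` / `reduceByKey` chain is a distributed word count. On a single machine the same result can be computed with `collections.Counter`, whose `most_common(n)` plays the role of `takeOrdered(n, key=...)`. This is a sketch with made-up tweets; `count_hashtags` is a hypothetical helper, and tied counts may come out in a different order than Spark returns them.

```python
from collections import Counter


def count_hashtags(tweets):
    """Map each lowercased hashtag to the number of times it occurs across tweets."""
    counts = Counter()
    for tweet in tweets:
        # Same steps as the Spark job: extract, lowercase, then count each occurrence
        counts.update(tag['text'].lower() for tag in tweet['entities']['hashtags'])
    return counts


sample_tweets = [
    {'entities': {'hashtags': [{'text': 'BigData'}, {'text': 'Docker'}]}},
    {'entities': {'hashtags': [{'text': 'bigdata'}]}},
    {'entities': {'hashtags': [{'text': 'BIGDATA'}, {'text': 'docker'}, {'text': 'hpc'}]}},
]
counts = count_hashtags(sample_tweets)
print(counts.most_common(2))  # [('bigdata', 3), ('docker', 2)]
```

Spark earns its keep when the tweets no longer fit comfortably in one process; for a few thousand tweets both approaches give the same counts.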

In [9]:
# Count the total number of hashtags used (the sum of all per-tag counts)
totalHashtags = countsRDD.values().sum()

# Compute the average number of hashtags per tweet (reuses filteredCount from above)
print('A total of {} hashtags gives an average of {} hashtags per tweet.'.format(
    totalHashtags, round(totalHashtags / float(filteredCount), 2)))

A total of 4030 hashtags gives an average of 1.64 hashtags per tweet.
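The printed average can be checked by hand: 4030 / 2456 is about 1.641, which rounds to 1.64. It is also the mean of the per-tweet hashtag counts, and since every tweet in the filtered set has at least one hashtag, it can never drop below 1. A small sketch; `mean_tags_per_tweet` is a hypothetical helper and `[1, 2, 3, 1, 1]` is made-up data.

```python
def mean_tags_per_tweet(tag_counts):
    """Average of per-tweet hashtag counts; equals total hashtags / number of tweets."""
    if not tag_counts:
        raise ValueError("need at least one tweet")
    return sum(tag_counts) / float(len(tag_counts))


total_hashtags = 4030
filtered_tweets = 2456
print(round(total_hashtags / float(filtered_tweets), 2))  # 1.64
print(mean_tags_per_tweet([1, 2, 3, 1, 1]))  # 1.6
```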


In [10]:
# Get a list of all hashtags used
hashtagList = (filteredTweetsRDD
               .flatMap(lambda tweet: [hashtag['text'].lower() for hashtag in tweet['entities']['hashtags']])
               .collect()
              )

In [11]:
# Print the list to use as input in word cloud software
print(' '.join(hashtagList))

artificialintelligence ai machinelearning robots technology ai artificialintelligence robotics dataanalytics digitalmarketing predictivemodeling bigdata datascience datascience algorithmic personalization bigdata analytics machinelearning ecommerce customer attention bigdata datadriven bigdata analytics datascience bigdata opendata datascience smbs security datascience machinelearning smartcities dataanalytics data coralogix bigdata reddit onlinecommunities moderation socialnetworks technology machinelearning vc datascience clickbait mobile artificialintelligence cool innovation datascience machinelearning ff followup friday bigdata iot guru machinelearning ai cloudcomputing ibm datascience scipy2015 java algorithm dataanalytics machinelearning robots 3dprinted bigdata artificialintelligence gmail news startups howto analytics interviews bigdata analytics artificialintelligence fintech finserv businessintelligence data banks bigdata machinelearning artificialintelligence neuralnetworks
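The space-separated list encodes frequency by repetition: a tag used 114 times appears 114 times. If a word cloud tool accepts weighted input instead, the counts can be written out directly. The `tag count` line format below is an assumption for illustration, not the format of any particular tool, and `weighted_lines` is a hypothetical helper.

```python
from collections import Counter


def weighted_lines(tags):
    """One 'tag count' line per distinct tag, most frequent first, ties alphabetical."""
    counts = Counter(tags)
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ['{} {}'.format(tag, n) for tag, n in ordered]


tags = ['ai', 'bigdata', 'datascience', 'bigdata', 'ai', 'bigdata']
print(' '.join(tags))  # repeated-word input, like the list printed above
print('\n'.join(weighted_lines(tags)))
```

The weighted form is much shorter than the repeated list and makes the sort order explicit, at the cost of depending on what the word cloud software expects.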