![Twitter](https://upload.wikimedia.org/wikipedia/commons/thumb/5/51/Twitter_logo.svg/469px-Twitter_logo.svg.png)

In [None]:
import os
import time

import nltk
import twitter
from pymongo import MongoClient

# Module metadata. Note that `__all__` is empty, so `from <module> import *`
# exports no names from this file.
__version__ = '1.1'
__all__ = []
__author__ = 'Axel Oehmichen - ao1011@imperial.ac.uk'

In [None]:
# We download the NLTK resources needed by nltk.word_tokenize and nltk.pos_tag,
# which are used in the language-processing step.
# Newer NLTK releases look these up under new ids ("punkt_tab",
# "averaged_perceptron_tagger_eng") while older ones use the original ids.
# Requesting both sets keeps the notebook working on either version: an id
# unknown to the installed NLTK only makes nltk.download report an error and
# return False; it does not raise.
for _nltk_resource in ("punkt", "punkt_tab",
                       "averaged_perceptron_tagger",
                       "averaged_perceptron_tagger_eng"):
    nltk.download(_nltk_resource)

def prepare_tweet(tweet_json):
    """Return a shallow copy of a tweet with its nested user replaced by its id.

    The ``user`` entry of the copy holds ``str(tweet_json["user"]["id"])``
    instead of the full user object. The mapping passed in is left untouched.

    Args:
        tweet_json: the tweet as a mapping (or anything ``dict()`` accepts).

    Returns:
        A new ``dict`` with the same keys, in the same order.
    """
    flattened = dict(tweet_json)
    # Swap the embedded user object for the string form of its id; updating an
    # existing key keeps its position in the dict.
    flattened.update(user=str(flattened["user"]["id"]))
    return flattened

def _user_id_variants(twitter_user_id):
    """Return the values a users-collection ``id`` field may hold for this user.

    The marker document is inserted with ``id`` exactly as passed in, but the
    later ``$set`` of the API's user JSON overwrites ``id`` with the API's own
    value. That value may be an integer (``prepare_tweet`` converts the same
    field with ``str()``; NOTE(review): confirm the API type). Matching both
    the string and integer forms keeps the "already processed" check working
    on re-runs.
    """
    variants = [twitter_user_id]
    if isinstance(twitter_user_id, str) and twitter_user_id.isdecimal():
        variants.append(int(twitter_user_id))
    elif isinstance(twitter_user_id, int) and not isinstance(twitter_user_id, bool):
        variants.append(str(twitter_user_id))
    return variants


def insert_timeline_into_mongo(twitter_user_id, api, MONGO_URL):
    """Download a user's recent timeline and store it in MongoDB.

    Tweets are written to ``twitter.tweets`` (each passed through
    ``prepare_tweet``). The user's profile, taken from the first tweet
    retrieved, is written to ``twitter.twitterUsers``. Users that already have
    a document in ``twitterUsers`` are skipped entirely, so re-running does
    not duplicate their tweets.

    Args:
        twitter_user_id: Twitter user id passed to ``api.GetUserTimeline``.
        api: authenticated ``twitter.Api`` client.
        MONGO_URL: MongoDB connection string.

    Returns:
        The status string ``"OK"``.
    """
    count = 100  # We retrieve at most 100 tweets per request
    max_count = 400  # Upper bound on the total number of tweets retrieved
    max_id = None  # Only fetch tweets with id <= max_id; None starts at the newest
    current_count = 0
    profile_user = None  # user object attached to the first tweet retrieved

    mongo_client = MongoClient(MONGO_URL)
    try:
        users_collection = mongo_client.twitter.twitterUsers
        tweets_collection = mongo_client.twitter.tweets

        # Guard against processing the same user twice (a re-run, or a retried
        # task if this ever runs on Spark workers). The marker document below
        # stores the id under the top-level "id" field, so that is what we query.
        already_seen = {"id": {"$in": _user_id_variants(twitter_user_id)}}
        if users_collection.count_documents(already_seen) == 0:
            users_collection.insert_one({"id": twitter_user_id})
            while current_count < max_count:
                timeline_chunk = api.GetUserTimeline(
                    twitter_user_id,
                    max_id=max_id,
                    count=min(count, max_count - current_count),
                )
                if not timeline_chunk:
                    break  # no older tweets left (or the timeline is empty)
                if profile_user is None:
                    profile_user = timeline_chunk[0].user
                # max_id is inclusive, so step just below the oldest tweet we
                # already have; otherwise it is fetched and stored twice.
                # NOTE(review): assumes Status.id is an integer.
                max_id = timeline_chunk[-1].id - 1
                # We insert the tweets into the collection
                tweets_collection.insert_many(
                    [prepare_tweet(tweet._json) for tweet in timeline_chunk]
                )
                current_count += len(timeline_chunk)

            # We insert our user to the user collection, provided at least one
            # tweet (and hence its profile) was retrieved.
            if profile_user is not None:
                users_collection.update_one(
                    {"id": twitter_user_id},
                    {"$set": dict(profile_user._json)},
                    upsert=False,
                )
    finally:
        # Close the connection even when the API or the database raises.
        mongo_client.close()
    return "OK"

In [None]:
def process_user(twitter_user_id, api, MONGO_URL):
    """Fetch and store one user's timeline, pairing the outcome with the user id.

    Args:
        twitter_user_id: id of the user to process.
        api: authenticated Twitter API client.
        MONGO_URL: MongoDB connection string.

    Returns:
        A ``(twitter_user_id, status)`` tuple, where ``status`` is the value
        returned by ``insert_timeline_into_mongo``.
    """
    return twitter_user_id, insert_timeline_into_mongo(twitter_user_id, api, MONGO_URL)

# Main Program

We now set all the parameters required to access Twitter and the MongoDB database.

In [None]:
import os

# Twitter key and secret for OAuth.
# Each value is taken from the environment when the variable is set, so real
# secrets do not have to be written into the notebook; otherwise the
# placeholder is used.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "XXX")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "YYY")

access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "AAA")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "BBB")

api = twitter.Api(consumer_key=consumer_key,
                  consumer_secret=consumer_secret,
                  access_token_key=access_token,
                  access_token_secret=access_token_secret)

# The users chosen are (Twitter user ids, as strings)
user_ids = ["813286", "1976143068", "52544275", "14260960", "3235334092", "3191500397"]

# Address of the mongo cluster (overridable through the MONGO_URL variable)
MONGO_URL = os.environ.get("MONGO_URL", "mongodb://mongo:27017/")

We retrieve the timelines of the specified users and print "OK" once each user has been processed.

In [None]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext
# RDD of user ids, used later to run the language-processing step on Spark.
users_ids_rdd = sc.parallelize(user_ids)

# The timelines are downloaded sequentially on the driver, re-using the single
# `api` client defined above. NOTE(review): presumably kept off the Spark
# workers because the API client may not serialize; confirm before moving it.
# Each call returns (user_id, "OK"); print it so progress is visible.
for user_id in user_ids:
    print(process_user(user_id, api, MONGO_URL))

# Natural Language Processing

### We run some light natural language processing on the tweets and insert the results into a new collection.
If you want to explore the NLTK library further, see http://www.nltk.org/

In [None]:
def process_tweets_for_user(twitter_user_id, MONGO_URL):
    """Tokenize and part-of-speech tag every stored tweet of one user.

    Reads the user's tweets from ``twitter.tweets``, matching on the ``user``
    field (the stringified user id written by ``prepare_tweet``). Writes one
    ``{"text", "tokens", "tagged"}`` document per tweet to
    ``twitter.processedTweets``.

    Args:
        twitter_user_id: id compared as-is against each tweet's ``user`` field.
        MONGO_URL: MongoDB connection string.

    Returns:
        The status string ``"Processed"``.
    """
    mongo_client = MongoClient(MONGO_URL)
    try:
        tweets_collection = mongo_client.twitter.tweets
        tweets_processed = mongo_client.twitter.processedTweets

        docs = []
        for tweet in tweets_collection.find({"user": twitter_user_id}):
            text = tweet["text"]
            tokens = nltk.word_tokenize(text)
            docs.append({"text": text,
                         "tokens": tokens,
                         "tagged": nltk.pos_tag(tokens),
                         })
        # One round trip instead of one per tweet; insert_many rejects an empty
        # list, hence the guard for users without stored tweets.
        if docs:
            tweets_processed.insert_many(docs)
    finally:
        # This runs inside a Spark task (see the RDD map below), so always
        # release the connection, including when tokenizing/tagging raises.
        mongo_client.close()
    return "Processed"

In [None]:
# Lazily map every user id to its NLP processing step; Spark runs nothing until
# an action (such as collect()) is invoked on the resulting RDD.
process_status = users_ids_rdd.map(
    lambda uid: process_tweets_for_user(uid, MONGO_URL)
)

In [None]:
# Trigger the Spark job; yields one "Processed" status string per user id.
process_status.collect()

# Exercises

**The reference documentation for PyMongo is available at this address:** https://pymongo.readthedocs.io/en/stable/

Queries:
* Count the number of tweets and users
* Print out the name of all the users inserted
* Find the most retweeted tweet
* Find the shortest tweet
* Count all the words and characters used in the tweets and find the top 5 most used

