In [None]:
import os
import time
import json
import datetime
import tweepy
import operator
import random

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import svm

In [None]:
# Account pairs that are classified against each other (two screen names per pair).
similar_users = ["astro_ricky", "AstroVicGlover"]
different_users = ["DaveMustaine", "jack"]

# Twitter application credentials. They are read from the environment so that
# real keys never have to be saved inside the notebook; when the variables are
# unset the values fall back to empty strings.
twitter_api_key = os.environ.get("TWITTER_API_KEY", "")
twitter_api_secret = os.environ.get("TWITTER_API_SECRET", "")

# Directory holding one cached "<name>.json" file per downloaded timeline.
collections_dir = "collections"
# Paging limit applied to every timeline download.
max_pages = 100
# Retries granted to a single page before it is skipped.
max_page_fail_retries = 3
# Number of pages skipped in a row after which a download is abandoned.
max_consecutive_pages_failed = 3
# Seconds slept after a successfully retrieved page.
request_wait = 1
# Seconds slept after a failed page request.
page_fail_wait = 30

# Fraction of each user's tweets held out as the test set.
test_set_percentage = 0.3

In [None]:
# Status attributes that copy_tweet copies unchanged (same key, same value)
# from the status object's attribute dict into the stored tweet dict.
tweet_attributes = [
    "display_text_range",
    "entities",
    "favorite_count",
    "full_text",
    "id",
    "retweet_count",
    "truncated"
]

def copy_attributes(dest, src, attributes):
    """Copy the listed keys from ``src`` into ``dest``.

    A key that is missing from ``src`` is not copied; instead a line
    naming the key and the value of ``src["id"]`` (or ``None`` when that
    is missing as well) is printed, and copying continues with the next key.

    Args:
        dest: mapping that receives the values (modified in place).
        src: mapping the values are read from.
        attributes: iterable of keys to copy.
    """
    for key in attributes:
        try:
            dest[key] = src[key]
        except KeyError:
            # Identify the offending record in the message when possible.
            try:
                source_id = src["id"]
            except KeyError:
                source_id = None
            print("KeyError: {}, {}".format(key, source_id))

def copy_tweet(tweet):
    """Build the dict that is stored for a single status object.

    The keys listed in ``tweet_attributes`` are copied from the status'
    attribute dict.  In addition the result holds:

    * ``author_screen_name``   -- ``tweet.author.screen_name``
    * ``created_at_timestamp`` -- ``tweet.created_at`` as a POSIX timestamp
    * ``retweeted_status``     -- ``True``/``False`` flag telling whether the
      status has a ``retweeted_status`` attribute (the nested status itself
      is not stored)
    * ``extended_entities``    -- only when the status has that attribute

    Args:
        tweet: status object as returned by the tweepy client.

    Returns:
        dict describing the tweet.
    """
    dest = {}
    copy_attributes(dest, vars(tweet), tweet_attributes)
    dest["author_screen_name"] = tweet.author.screen_name

    created_at = tweet.created_at
    if created_at.tzinfo is None:
        # datetime.timestamp() interprets a naive datetime as *local* time,
        # which shifts the stored epoch by the machine's UTC offset.
        # NOTE(review): naive values are taken to be UTC, which is how the
        # Twitter API reports created_at -- confirm for the tweepy version used.
        created_at = created_at.replace(tzinfo=datetime.timezone.utc)
    dest["created_at_timestamp"] = created_at.timestamp()

    dest["retweeted_status"] = hasattr(tweet, "retweeted_status")
    if hasattr(tweet, "extended_entities"):
        dest["extended_entities"] = tweet.extended_entities
    return dest

In [None]:
def retrieve_comments(twitter_client, name):
    """Download a user's timeline page by page.

    Pages are numbered from 1 and are requested up to and including
    ``max_pages``.  A request that raises ``tweepy.TweepError`` is retried up
    to ``max_page_fail_retries`` times, sleeping ``page_fail_wait`` seconds
    after every failure, before the page is skipped; the download is
    abandoned once ``max_consecutive_pages_failed`` pages in a row have been
    skipped.  A request that succeeds but returns no statuses is taken to be
    the end of the timeline and stops the download immediately.

    Args:
        twitter_client: client object providing ``user_timeline``.
        name: value passed as ``id`` to ``user_timeline``.

    Returns:
        list of tweet dicts (see ``copy_tweet``) in retrieval order.
    """

    tweets = []
    page = 1
    page_fail_retries = 0
    consecutive_pages_failed = 0

    print("Retrieving {}".format(name))

    # `page` is 1-based, so the bound has to be inclusive for max_pages
    # pages to actually be requested.
    while page <= max_pages and consecutive_pages_failed < max_consecutive_pages_failed:

        try:
            statuses = twitter_client.user_timeline(id=name, tweet_mode="extended", page=page)
        except tweepy.TweepError:
            # None marks a failed request, as opposed to an empty page.
            statuses = None

        if statuses:
            tweets.extend(copy_tweet(status) for status in statuses)

            # Report the page that was just retrieved, before advancing.
            print("Retrieved Tweets: {} (Pages: {}/{})".format(len(tweets), page, max_pages), end="\r")

            page += 1
            page_fail_retries = 0
            consecutive_pages_failed = 0

            time.sleep(request_wait)
            continue

        if statuses is not None:
            # The request succeeded but the page is empty: there is nothing
            # left to fetch, so retrying and sleeping would only waste time.
            break

        if page_fail_retries < max_page_fail_retries:
            page_fail_retries += 1

            print("Failed to retrieve page {} attempts {}/{}, sleeping {} seconds".format(
                page,
                page_fail_retries,
                max_page_fail_retries,
                page_fail_wait))
            print("Retrieved Tweets: {} (Pages: {}/{})".format(len(tweets), page, max_pages), end="\r")

        else:
            # Retries exhausted: give up on this page and move to the next.
            consecutive_pages_failed += 1

            print("{} consecutive failed attempts on page {}, skipping page {}, (Consecutive pages failed {}/{}), sleeping {} seconds".format(
                max_page_fail_retries,
                page, page,
                consecutive_pages_failed,
                max_consecutive_pages_failed,
                page_fail_wait))
            print("Retrieved Tweets: {} (Pages: {}/{})".format(len(tweets), page, max_pages), end="\r")

            page += 1
            page_fail_retries = 0

        time.sleep(page_fail_wait)

    # Terminate the "\r" progress line.
    print()

    return tweets

def get_collection(twitter_client, name):
    """Download a user's tweets and wrap them in a collection record.

    Args:
        twitter_client: client object handed on to ``retrieve_comments``.
        name: account to download; also stored as the collection name.

    Returns:
        dict with the keys ``collection_name``, ``tweets`` and ``date``
        (ISO-formatted local time taken after the download finished).
    """
    # Values are evaluated in order, so the date is taken after the download.
    return {
        "collection_name": name,
        "tweets": retrieve_comments(twitter_client, name),
        "date": datetime.datetime.now().isoformat()
    }

In [None]:
def get_collection_path(collection_dir, name):
    """Return the path of the JSON file for collection ``name`` inside ``collection_dir``."""
    file_name = "{}.json".format(name)
    return os.path.join(collection_dir, file_name)

def get_collections(names):
    """Return the tweet collection of every account in ``names``.

    A collection already saved under ``collections_dir`` is read from its
    JSON file; otherwise it is downloaded and then written to that file.
    The Twitter client is only created when the first download is needed.

    Args:
        names: iterable of account names.

    Returns:
        dict mapping each name to its collection dict.
    """

    os.makedirs(collections_dir, exist_ok=True)

    client = None
    result = {}

    for user_name in names:

        path = get_collection_path(collections_dir, user_name)

        if not os.path.isfile(path):
            # Nothing cached yet: download, then persist for later runs.
            if not client:
                auth = tweepy.AppAuthHandler(twitter_api_key, twitter_api_secret)
                client = tweepy.API(auth)

            data = get_collection(client, user_name)

            print("Saving collection: {}".format(path))
            with open(path, "w") as handle:
                json.dump(data, handle)

        else:
            # Reuse the saved download.
            print("Using saved collection: {}".format(path))
            with open(path) as handle:
                data = json.load(handle)

            print("  Tweets: {}".format(len(data["tweets"])))

        result[user_name] = data

    return result

In [None]:
# Load the timeline of every configured account, downloading the ones that
# have no saved JSON file yet.
twitter_collections = get_collections(similar_users + different_users)

In [None]:
def split_training_test(corpus, test_percentage):
    """Split a corpus chronologically into training and test texts.

    Args:
        corpus: iterable of dicts with the keys ``'text'`` and ``'timestamp'``.
        test_percentage: fraction (0..1) of the entries to use for testing.

    Returns:
        ``(training, test)`` -- two lists of texts; the training list holds
        the oldest entries, the test list the most recent ones.
    """

    chronological = sorted(corpus, key=lambda entry: entry['timestamp'])
    texts = [entry['text'] for entry in chronological]

    split_at = int(len(texts) * (1-test_percentage))

    return texts[:split_at], texts[split_at:]

def extract_corpus(collections):
    """Extract the training and test texts of every collection.

    Only the ``display_text_range`` slice of ``full_text`` is kept for each
    tweet.  Tweets flagged as retweets and tweets whose display range is
    empty are left out.  The remaining texts are split with
    ``split_training_test`` using ``test_set_percentage``.

    Args:
        collections: dict mapping a collection name to its collection dict.

    Returns:
        dict mapping each collection name to
        ``{'training': [...], 'test': [...]}``.
    """

    corpus_sets = {}

    for name, collection in collections.items():

        entries = []

        for tweet in collection["tweets"]:
            text_range = tweet["display_text_range"]
            start, end = text_range[0], text_range[1]

            # Skip retweets and tweets without any displayable text.
            if tweet["retweeted_status"] or end == start:
                continue

            entries.append({'text': tweet["full_text"][start:end], 'timestamp': tweet['created_at_timestamp']})

        training, test = split_training_test(entries, test_set_percentage)
        corpus_sets[name] = {'training': training, 'test': test}

    return corpus_sets

In [None]:
# Reduce every collection to its training/test lists of tweet texts.
collections_corpus_sets = extract_corpus(twitter_collections)

In [None]:
def vectorize_corpus(corpus, collections):
    """Build labelled TF-IDF training and test data for the given collections.

    Each collection gets an integer label in the order it appears in
    ``collections`` (0, 1, ...).  The training texts and their labels are
    shuffled; the test data keeps its original order.  The vectorizer is
    fitted on the training texts only and then applied to the test texts.

    Args:
        corpus: dict as returned by ``extract_corpus``.
        collections: names of the collections to combine.

    Returns:
        dict with the keys ``'collections'``, ``'label_map'`` (label ->
        collection name), ``'training'`` and ``'test'``; the latter two are
        dicts holding ``'labels'`` and ``'vectors'``.
    """

    label_map = {}
    training_texts, training_labels = [], []
    test_texts, test_labels = [], []

    for label, collection in enumerate(collections):

        subset = corpus[collection]

        training_texts += subset['training']
        test_texts += subset['test']

        training_labels += [label] * len(subset['training'])
        test_labels += [label] * len(subset['test'])

        label_map[label] = collection

    # Two identically seeded generators permute both equally long lists in
    # the same way, which keeps every text aligned with its label.
    random.Random(1024).shuffle(training_texts)
    random.Random(1024).shuffle(training_labels)

    vectorizer = TfidfVectorizer(stop_words='english')
    training_vectors = vectorizer.fit_transform(training_texts)
    test_vectors = vectorizer.transform(test_texts)

    return {'collections': collections,
            'label_map': label_map,
            'training': {'labels': training_labels, 'vectors': training_vectors},
            'test': {'labels': test_labels, 'vectors': test_vectors}}

In [None]:
# Build one labelled TF-IDF data set per user pair; every call fits its own
# vectorizer on that pair's training texts.
similar_users_vectors = vectorize_corpus(collections_corpus_sets, similar_users)

different_users_vectors = vectorize_corpus(collections_corpus_sets, different_users)

In [None]:
def calculate_confusion_matrix(ground_truth, prediction, num_classes=2):
    """Count how often each true label was predicted as each label.

    Args:
        ground_truth: iterable of true integer labels.
        prediction: iterable of predicted integer labels, aligned with
            ``ground_truth``.
        num_classes: number of distinct labels; labels must lie in
            ``range(num_classes)``.  Defaults to 2 (binary classification).

    Returns:
        ``num_classes`` x ``num_classes`` list of lists where
        ``matrix[true][predicted]`` is the number of such pairs.

    Raises:
        IndexError: if a label is ``>= num_classes``.
    """

    # Build independent rows; `[[0] * n] * n` would alias a single row.
    confusion_matrix = [[0] * num_classes for _ in range(num_classes)]

    for gt, p in zip(ground_truth, prediction):
        confusion_matrix[gt][p] += 1

    return confusion_matrix

def calculate_accuracy(ground_truth, prediction):
    """Return the fraction of predictions that equal the true label.

    Args:
        ground_truth: sequence of true labels.
        prediction: iterable of predicted labels, aligned with ``ground_truth``.

    Returns:
        Number of matching pairs divided by ``len(ground_truth)``.

    Raises:
        ZeroDivisionError: if ``ground_truth`` is empty.
    """

    matches = sum(1 for expected, predicted in zip(ground_truth, prediction) if expected == predicted)

    return matches / len(ground_truth)

def print_confusion_matrix_accuracy(confusion_matrix, accuracy, label_map):
    """Print a 2x2 confusion matrix as a table, followed by the accuracy.

    Columns are left-aligned and 16 characters wide.  The header row and the
    first column hold the two label names; the accuracy (three decimals) is
    appended to the last matrix row, and an empty line ends the output.

    Args:
        confusion_matrix: 2x2 nested sequence indexed as ``[true][predicted]``.
        accuracy: number printed after the second row.
        label_map: mapping from the labels 0 and 1 to their display names.
    """

    row_layout = '{:<16}{:<16}{:<16}'

    print(row_layout.format(' ', label_map[0], label_map[1]))
    print(row_layout.format(label_map[0], *confusion_matrix[0][:2]))

    second_row = confusion_matrix[1]
    print('{:<16}{:<16}{:<16}  Accuracy: {:0.3f}'.format(label_map[1], second_row[0], second_row[1], accuracy))
    print()

def train_predict(vectors, *args, **kwargs):
    """Train an SVM on the training vectors and evaluate it on the test vectors.

    The classifier configuration, the confusion matrix and the accuracy are
    printed.

    Args:
        vectors: dict as returned by ``vectorize_corpus``.
        *args, **kwargs: passed on unchanged to ``svm.SVC``.

    Returns:
        ``(confusion_matrix, accuracy)`` for the test set.
    """

    print('Configuration:', args, kwargs)

    model = svm.SVC(*args, **kwargs)

    training = vectors['training']
    model.fit(training['vectors'], training['labels'])

    test = vectors['test']
    predicted = model.predict(test['vectors'])

    matrix = calculate_confusion_matrix(test['labels'], predicted)
    score = calculate_accuracy(test['labels'], predicted)

    print_confusion_matrix_accuracy(matrix, score, vectors['label_map'])

    return matrix, score

In [None]:
# Train and evaluate an SVM with its default configuration on each user pair.
train_predict(similar_users_vectors)
train_predict(different_users_vectors)