Imports

In [None]:
import bisect
import csv
import os
import numpy as np
from cvxpy import Variable, Minimize, Problem, norm
from scipy.sparse import find
from io import open
import re
from nltk.stem import *
from collections import Counter
import math
import time
from collections import deque
from argparse import ArgumentParser
from csv import reader
from os.path import basename, isdir, isfile, splitext, join
from sys import stderr

Settings.py

In [None]:
# ====================================== Timestamp parameters ======================================
# Per-file [start, end] timestamps, keyed by the input file's name.
EVENT_TIMESTAMPS = dict()

# ====================================== Preprocessing parameters ======================================
# Each IGNORE_* switch makes the preprocessor discard the matching tweets
# entirely; when a switch is off the matching fragment is stripped instead.
IGNORE_URLS = True
IGNORE_RETWEETS = True
IGNORE_DUPLICATES = True
IGNORE_USERNAME = True
# Reduce every remaining word to its stem.
STEMMING = True
# Length of one time window; -1 is a placeholder until set_parameters runs.
TIME_WINDOW = -1

# ====================================== Event detection parameters ======================================
# Number of previous periods considered; -1 is a placeholder.
PREVIOUS_PERIODS = -1

# Detection thresholds, keyed by the input file's name.
EVENT_DETECTION_THRESHOLD = dict()

BURST_DETECTION_THRESHOLD = dict()

# ====================================== Output parameters ======================================
# Summary settings; -1 marks values that have not been configured yet.
TWEETS_SUMMARY = -1
SUMMARY_SIMILARITY = -1
SUMMARY_PERIODS = -1

Util.py

In [None]:
class Util:
    """Namespace for small, stateless helper routines."""

    def __init__(self):
        pass

    @staticmethod
    def beauty_print(to_print):
        """Render a list of strings as a centred block framed by asterisks.

        :param to_print: list of strings, one per output line.
        :return: the framed block as a single string (every line, including
            the last, ends with a newline); an empty string when ``to_print``
            is empty, so the result can always be printed or written to a
            file.
        """
        if not to_print:
            return ""
        max_len = max(len(line) for line in to_print)
        border = "*" * (max_len + 4) + "\n"
        body = ["* " + line.center(max_len) + " *\n" for line in to_print]
        return str(border + "".join(body) + border)


ReadData.py

In [None]:
class Reader:
    """Reads tweet CSV files and hands the rows out one time window at a time.

    Every row must carry an integer timestamp in column 0, expressed in the
    same unit as ``Settings.TIME_WINDOW``. Files smaller than 500 MB are
    split into windows in memory up front (``load_file``); larger files are
    streamed window by window (``read_next``).
    """

    def __init__(self):
        self.file_pointer = None        # open handle while streaming a large file
        self.load_files = True          # True when the current file is held in memory
        self.loaded_file_name = ""      # path passed to the last get_tweets call
        self.loaded_clear_name = ""     # last "/"-separated component of that path
        self.starting_timestamp = 0     # start of the next window when streaming
        self.file_periods, self.starting_tweets = [], []

    @staticmethod
    def _window_length():
        """Return the window length as an int, rejecting non-positive values.

        A window shorter than one unit would never advance past a later
        timestamp, so it is reported instead of looping forever.
        """
        window = int(Settings.TIME_WINDOW)
        if window < 1:
            raise ValueError("Settings.TIME_WINDOW must be at least 1, got %r" % (Settings.TIME_WINDOW,))
        return window

    def get_tweets(self, file_name):
        """Return the rows of the next time window of ``file_name``.

        The file is (re)opened whenever ``file_name`` differs from the one
        used by the previous call.

        :return: a list of CSV rows (possibly empty for a window without
            tweets), or ``None`` once the file is exhausted.
        """
        if self.loaded_file_name != file_name:
            self.loaded_file_name = file_name
            self.loaded_clear_name = file_name.split("/")[-1]
            # Release the handle of a previously streamed file, whichever
            # mode the new file ends up using.
            if self.file_pointer is not None:
                self.file_pointer.close()
                self.file_pointer = None
            self.load_files = os.stat(self.loaded_file_name).st_size < 524288000
            if self.load_files:  # Load files with size less than 500MB
                self.load_file()
            else:
                self.starting_timestamp = Settings.EVENT_TIMESTAMPS.get(self.loaded_clear_name, [0, 0])[0]
                self.starting_tweets = []
                self.file_pointer = open(file_name, 'rb')

        return self.next_period() if self.load_files else self.read_next()

    def next_period(self):
        """Pop and return the next in-memory window, or ``None`` when none are left."""
        return self.file_periods.pop(0) if self.file_periods else None

    def load_file(self):
        """Read the whole current file and split it into consecutive windows.

        The result is stored in ``self.file_periods`` (one list of rows per
        window; windows without tweets are empty lists). Rows older than the
        current window start are skipped and reading stops at the first row
        past the configured end timestamp. A start of 0 means "use the first
        row's timestamp", an end of 0 means "no limit".
        """
        bounds = Settings.EVENT_TIMESTAMPS.get(self.loaded_clear_name, [0, 0])
        start_timestamp, end_timestamp = bounds[0], bounds[1]

        # Never leave the previous file's windows behind.
        self.file_periods = []
        if end_timestamp != 0 and end_timestamp < start_timestamp:
            return None

        window = self._window_length()
        window_end_timestamp = start_timestamp + window
        periods, current = [], []
        with open(self.loaded_file_name, 'rb') as f:
            for row in csv.reader(f):
                if not row:  # blank line
                    continue
                timestamp = int(row[0])
                if start_timestamp == 0:
                    start_timestamp = timestamp
                    window_end_timestamp = start_timestamp + window
                if timestamp < start_timestamp:
                    continue
                # Close the current window and emit one (empty) window for
                # every full window the row has jumped over.
                while timestamp > window_end_timestamp:
                    periods.append(current)
                    current = []
                    start_timestamp = window_end_timestamp
                    window_end_timestamp += window
                if timestamp > end_timestamp > 0:
                    break
                current.append(row)
        if current:
            periods.append(current)
        self.file_periods = periods

    def read_next(self):
        """Stream the rows of the next window from the open file.

        :return: the list of rows in the window (empty when the window holds
            no tweets), or ``None`` when the event end has been reached or
            the file has already been read completely.
        """
        tweets = self.starting_tweets
        self.starting_tweets = []
        event_end_timestamp = Settings.EVENT_TIMESTAMPS.get(self.loaded_clear_name, [0, 0])[1]
        window = self._window_length()
        window_end_timestamp = self.starting_timestamp + window

        # The row carried over from the previous call may lie beyond this
        # window as well; it then has to wait for a later window.
        deferred = bool(tweets) and int(tweets[0][0]) > window_end_timestamp
        limit = int(tweets[0][0]) if tweets and not deferred else 0
        if 0 < event_end_timestamp <= max(self.starting_timestamp, limit) or self.file_pointer.closed:
            self.file_pointer.close()
            return None

        if deferred:
            # Emit an empty window but keep the carried row for later.
            self.starting_tweets = tweets
            self.starting_timestamp = window_end_timestamp
            return []

        eof = True
        for row in csv.reader(self.file_pointer):
            if not row:  # blank line
                continue
            timestamp = int(row[0])
            if self.starting_timestamp == 0:
                self.starting_timestamp = timestamp
                window_end_timestamp = self.starting_timestamp + window

            if timestamp < self.starting_timestamp:
                continue

            if timestamp > window_end_timestamp or (timestamp > event_end_timestamp > 0):
                # First row of a later window: carry it over to the next call.
                self.starting_tweets = [row]
                self.starting_timestamp = window_end_timestamp
                eof = False
                break
            tweets.append(row)
        if eof:
            self.file_pointer.close()
        return tweets

    @staticmethod
    def count_tweets(file_name):
        """Return how many tweets of ``file_name`` survive preprocessing.

        Side effects: sets ``Settings.TIME_WINDOW`` to one minute (60000) and
        records the file's [first timestamp, now] pair in
        ``Settings.EVENT_TIMESTAMPS``.
        """
        from Preprocessor import Preprocessor
        from csv import reader
        from time import time
        with open(file_name, 'rb') as f:
            first_row = next(reader(f), None)
        if first_row:
            # Use the same key that get_tweets looks up.
            Settings.EVENT_TIMESTAMPS[file_name.split("/")[-1]] = [int(first_row[0]), int(round(time() * 1000))]
        Settings.TIME_WINDOW = 1 * 60000
        tweet_reader = Reader()
        c = 0
        while True:
            tweets = tweet_reader.get_tweets(file_name)
            if tweets is None:
                break
            elif len(tweets) == 0:
                continue
            tweets, full_tweets, tweets_timestamps, vocabulary, timestamps = Preprocessor.preprocess(tweets)
            c += len(tweets)
        return c


In [None]:
Preprocessor.py

In [None]:
class Preprocessor:
    """Cleans raw tweet rows and turns them into lists of (stemmed) words."""

    def __init__(self):
        pass

    # Both are filled lazily by preprocess().
    stopwords = []
    stemmer = None

    @staticmethod
    def contains_url(tweet):
        """Return True when the tweet text contains a URL."""
        return re.search(r"http[s]?\S+", tweet) is not None

    @staticmethod
    def is_retweet(tweet):
        """Return True when the tweet text contains a retweet marker ("rt user ...").

        The marker must start at a word boundary, so words that merely end
        in "rt" (e.g. "smart move ...") are not mistaken for retweets.
        """
        return re.search(r'\brt @?[a-zA-Z0-9_]+:? .*', tweet) is not None

    @staticmethod
    def contains_username(tweet):
        """Return True when the tweet text contains "@", i.e. refers to a user."""
        return '@' in tweet

    @classmethod
    def preprocess(cls, tweets):
        """Clean the given tweet rows.

        :param tweets: non-empty list of rows with the timestamp in column 0
            and the tweet text in column 1.
        :return: a 5-tuple ``(words, full_tweets, timestamps, vocabulary,
            period)`` where ``words`` holds one word list per kept tweet,
            ``full_tweets`` the cleaned text of those tweets, ``timestamps``
            their timestamps, ``vocabulary`` the sorted list of distinct
            words and ``period`` the timestamps of the first and last input
            row.
        """
        if not cls.stopwords:
            with open("stop_words.txt", 'rb') as handle:
                cls.stopwords = [re.sub(r'[\s+]', ' ', word.decode("utf-8-sig").encode("utf-8")).strip()
                                 for word in handle]
        if Settings.STEMMING and cls.stemmer is None:
            cls.stemmer = PorterStemmer()

        # A set makes the lookup independent of the order of the stop-word file.
        stopword_set = set(cls.stopwords)
        full_tweets, timestamps, result = [], [], []
        seen_tweets = set()
        unique_words_index = set()
        for row in tweets:
            timestamp = row[0]
            tweet = row[1].lower()  # Keep the text part
            if (Settings.IGNORE_URLS and cls.contains_url(tweet)) or \
                    (Settings.IGNORE_RETWEETS and cls.is_retweet(tweet)) or \
                    (Settings.IGNORE_USERNAME and cls.contains_username(tweet)):
                continue
            # When a kind of tweet is not ignored, only the offending
            # fragment is stripped from its text.
            if not Settings.IGNORE_URLS and cls.contains_url(tweet):
                tweet = re.sub(r"http[s]?\S+", "", tweet)
            if not Settings.IGNORE_RETWEETS and cls.is_retweet(tweet):
                tweet = re.sub(r'\brt @?[a-zA-Z0-9_]+:?', '', tweet)
            if not Settings.IGNORE_USERNAME and cls.contains_username(tweet):
                tweet = re.sub(r'@[a-zA-Z0-9_]+:?', '', tweet)

            tweet = re.sub(r'\W+', ' ', tweet)  # Remove special characters
            tweet = re.sub(r'\s+', ' ', tweet).strip()  # Remove spaces and new lines

            full_tweet = tweet

            words = [word for word in tweet.split(" ") if word not in stopword_set]
            if Settings.STEMMING:
                words = [cls.stemmer.stem(word) for word in words]

            key = tuple(words)
            if Settings.IGNORE_DUPLICATES and key in seen_tweets:
                continue
            seen_tweets.add(key)
            unique_words_index.update(words)
            result.append(words)
            full_tweets.append(full_tweet)
            timestamps.append(timestamp)
        # Column 0 holds the timestamp (column 1 is the text).
        return result, full_tweets, timestamps, sorted(unique_words_index), [tweets[0][0], tweets[-1][0]]


Period.py

In [None]:
class Period:
    """One time window of tweets, modelled as a weighted word co-occurrence graph.

    The graph is compared against the graphs of previous periods to decide
    whether the window contains a sub-event.
    """

    # Path of the results file; shared by all instances and overwritten by
    # every constructor call.
    file_name = ""

    def __init__(self, tweets, file_name, printing=True, write=True):
        """Preprocess ``tweets`` and build the adjacency matrix of their words.

        :param tweets: raw rows of the window, as accepted by
            ``Preprocessor.preprocess``.
        :param file_name: path of the results file that detect_event appends to.
        :param printing: print the period report to stdout when True.
        :param write: append the period report to ``file_name`` when True.
        """
        self.tweets_edges = []
        self.adjacency_matrix = None
        tweets, self.full_tweets, self.tweets_timestamps, self.vocabulary, self.timestamps = \
            Preprocessor.preprocess(tweets)

        self.tweets_number = len(tweets)
        self.generate_adjacency_matrix_dense(tweets)
        self.toPrint = ["Period starts: " + self.timestamps[0] + " Period ends: " + self.timestamps[1],
                        "Number of tweets: " + str(self.tweets_number),
                        "Vocabulary size: " + str(len(self.vocabulary))]
        self.write = write
        self.printing = printing
        Period.file_name = file_name

    def generate_adjacency_matrix_dense(self, tweets):
        """Build the symmetric word/word adjacency matrix of the given tweets.

        Every pair of distinct words in a tweet adds ``1 / (number of
        distinct words in the tweet)`` to the pair's weight. The edges of
        each tweet (as sorted two-word lists) are appended to
        ``self.tweets_edges``; the matrix is stored in
        ``self.adjacency_matrix``. Words are located in the sorted
        ``self.vocabulary`` by binary search.
        """
        size = len(self.vocabulary)
        adjacency_matrix = np.zeros((size, size))
        for tweet in tweets:
            words = set(tweet)  # Duplicates
            edges = []
            self.tweets_edges.append(edges)
            if not words:
                continue
            weight = 1.0 / len(words)
            indexes = [bisect.bisect_left(self.vocabulary, word) for word in words]
            for position, i in enumerate(indexes):
                for j in indexes[position:]:
                    if i == j:
                        continue
                    adjacency_matrix[i, j] += weight
                    adjacency_matrix[j, i] += weight
                    edges.append(sorted([self.vocabulary[i], self.vocabulary[j]]))
        self.adjacency_matrix = adjacency_matrix

    def get_edges_weight(self, edges_list, nodes_list):
        """Return this period's weight for every edge in ``edges_list``.

        :param edges_list: sequence of two-word edges.
        :param nodes_list: the words that occur in ``edges_list``.
        :return: list with one weight per edge, in order; 0 for edges with a
            word that is not part of this period's vocabulary.
        """
        nodes = {}
        for node in nodes_list:
            index = bisect.bisect_left(self.vocabulary, node)
            if index < len(self.vocabulary) and self.vocabulary[index] == node:
                nodes[node] = index

        weight_list = []
        for edge in edges_list:
            first_word, second_word = edge[0], edge[1]
            if first_word in nodes and second_word in nodes:
                low, high = sorted((nodes[first_word], nodes[second_word]))
                weight_list.append(self.adjacency_matrix[low, high])
            else:
                weight_list.append(0)
        return weight_list

    @staticmethod
    def get_nonzero_edges(matrix):
        """Return ``[row, column, value]`` for every nonzero entry of ``matrix``."""
        rows, columns, values = find(matrix)
        return [[row, column, float(value)] for row, column, value in zip(rows, columns, values)]

    def generate_vector(self):
        """Flatten the adjacency matrix into an edge-weight vector.

        :return: ``(vector, nodes, edges, weighted_edges)``: a column vector
            with one weight per nonzero matrix entry, the set of words
            involved, the matching list of two-word edges and a dict that
            maps each edge (as a sorted word tuple) to its weight.
        """
        non_zero_edges = self.get_nonzero_edges(self.adjacency_matrix)
        vector = np.zeros((len(non_zero_edges), 1))
        vector_edges = []
        vector_nodes = set()
        weighted_edges = {}
        for position, (row, column, value) in enumerate(non_zero_edges):
            vector[position] = value
            nodes = [self.vocabulary[row], self.vocabulary[column]]
            vector_edges.append(nodes)
            vector_nodes.update(nodes)
            weighted_edges[tuple(sorted(nodes))] = value
        return vector, vector_nodes, vector_edges, weighted_edges

    def detect_event(self, previous_periods):
        """Decide whether this period is a sub-event.

        The period's edge-weight vector is approximated by a convex
        combination of the same edges' weights in ``previous_periods``
        (least squares); the resulting score is compared against the
        threshold configured for the input file. Without previous periods
        the score is -1.

        :param previous_periods: sequence of earlier ``Period`` objects.
        :return: ``[timestamps, is_event, description]``.
        """
        if self.tweets_number == 0:
            return [[], False, "No tweets found in the current period."]
        period_score = -1
        vector, vector_nodes, vector_edges, weighted_edges = self.generate_vector()
        repr_tweets = self.submodular_tweets(weighted_edges, [], [], Settings.TWEETS_SUMMARY)
        if len(previous_periods) != 0:
            weights = np.zeros((len(vector_edges), len(previous_periods)))
            for column, previous in enumerate(previous_periods):
                weights[:, column] = np.asarray(previous.get_edges_weight(vector_edges, vector_nodes))

            period_score = Period.optimize(weights, vector)

        # The threshold is keyed by the input name: the results file's base
        # name with its 4-character extension replaced by ".csv".
        is_event = period_score >= Settings.EVENT_DETECTION_THRESHOLD[Period.file_name.split("/")[-1][:-4] + ".csv"]
        self.toPrint += [" "] + repr_tweets + [" "] + ["Result: " + str(is_event) + " " + str(period_score)]
        if self.printing:
            print(Util.beauty_print(self.toPrint))

        if self.write:
            with open(Period.file_name, 'a') as f:
                f.write(Util.beauty_print(["Timestamp: " + ' '.join(self.timestamps)] + repr_tweets +
                                          ["-------------------------------------------"] +
                                          ["Result: " + str(is_event)] +
                                          ["Score: " + str(period_score)]))
        timestamps = self.timestamps
        # Drop the attributes that are no longer used once the period has
        # been scored.
        del self.full_tweets
        del self.timestamps
        del self.toPrint
        del self.tweets_edges
        del self.tweets_number
        del self.tweets_timestamps
        return [timestamps, is_event, ". ".join(repr_tweets) + " || " + str(period_score)]

    @staticmethod
    def optimize(A, b):
        """Solve the constrained least squares problem and score the residual.

        Minimises ``||A x - b||`` subject to ``x >= 0`` and ``sum(x) == 1``.

        :return: the norm of the residual ``A x - b`` after its positive
            entries have been set to zero.
        """
        x = Variable(A.shape[1])
        objective = Minimize(norm(A * x - b))  # Minimize(norm(A * x - b) + lamp * norm(x, p=1))
        constraints = [0 <= x, sum(x) == 1]
        Problem(objective, constraints).solve()
        value = A.dot(x.value) - b
        value[value > 0] = 0
        return np.linalg.norm(value)

    def submodular_tweets(self, weighted_edges, excluded_tweets, excluded_edges, tweets_number):
        """Greedily pick up to ``tweets_number`` tweets as the period's summary.

        Each round selects the not yet chosen tweet whose edges that are not
        covered by earlier picks have the largest total weight (first tweet
        wins ties). A tweet is never selected twice, so the summary is
        shorter than requested when the period holds fewer tweets.

        :param weighted_edges: dict mapping edge tuples to weights.
        :param excluded_tweets: list of tweet indexes that must not be
            picked; the picked indexes are appended to it.
        :param excluded_edges: list of edge tuples that no longer count; the
            edges of the picked tweets are appended to it.
        :param tweets_number: maximum number of tweets to return.
        :return: list with the cleaned text of the chosen tweets.
        """
        summary = []
        skipped_tweets = set(excluded_tweets)
        covered_edges = set(tuple(edge) for edge in excluded_edges)
        while len(summary) < tweets_number:
            best_index, best_value = None, None
            for tweet_index, tweet_edges in enumerate(self.tweets_edges):
                if tweet_index in skipped_tweets:
                    continue
                tweet_value = 0
                for edge in tweet_edges:
                    if tuple(edge) not in covered_edges:
                        tweet_value += weighted_edges[tuple(edge)]
                if best_index is None or tweet_value > best_value:
                    best_index, best_value = tweet_index, tweet_value
            if best_index is None:  # every tweet has been used already
                break
            picked_edges = [tuple(edge) for edge in self.tweets_edges[best_index]]
            skipped_tweets.add(best_index)
            covered_edges.update(picked_edges)
            # Keep the caller's lists in sync.
            excluded_tweets.append(best_index)
            excluded_edges += picked_edges
            summary.append(self.full_tweets[best_index])
        return summary


SummaryWriter.py

In [None]:
class SummaryWriter:
    """Builds and writes the summary of an event."""

    def __init__(self):
        pass

    @classmethod
    def write_summary(cls, file_name, sentences, indexes):
        """Build the event summary and write it to ``file_name``.

        The text before the first "||" of every sentence is used. The first
        sentence is always kept; a later sentence is kept unless its cosine
        similarity to a sentence from one of the ``Settings.SUMMARY_PERIODS``
        preceding periods exceeds ``Settings.SUMMARY_SIMILARITY``.

        :param file_name: output path, or ``None`` to get the summary back.
        :param sentences: one sentence per detected sub-event (may be empty).
        :param indexes: the period number of every sentence, same order.
        :return: the list of kept sentences when ``file_name`` is ``None``,
            otherwise ``None`` (the summary is written to the file, which is
            left empty when there is nothing to summarise).
        """
        sentences = [sentence.split("||")[0] for sentence in sentences]
        # Period number -> position of its (first) sentence.
        position_of = {}
        for position, index in enumerate(indexes):
            position_of.setdefault(index, position)

        summary = sentences[:1]
        for i in range(1, len(sentences)):
            recent = [position_of[period] for period in range(indexes[i] - Settings.SUMMARY_PERIODS, indexes[i])
                      if period in position_of]
            if all(cls.cosine_similarity(sentences[i], sentences[position]) <= Settings.SUMMARY_SIMILARITY
                   for position in recent):
                summary.append(sentences[i])

        if file_name is None:
            return summary
        with open(file_name, 'w') as f:
            if summary:
                f.write(Util.beauty_print(summary))

    @classmethod
    def cosine_similarity(cls, sentence1, sentence2):
        """Return the cosine similarity of the word-count vectors of two sentences.

        The result is 0 when either sentence contains no word.
        """
        vec1 = Counter(re.findall(r'\w+', sentence1))
        vec2 = Counter(re.findall(r'\w+', sentence2))
        numerator = sum(vec1[word] * vec2[word] for word in set(vec1) & set(vec2))

        denominator1 = math.sqrt(sum(count ** 2 for count in vec1.values()))
        denominator2 = math.sqrt(sum(count ** 2 for count in vec2.values()))
        denominator = denominator1 * denominator2

        return float(numerator) / denominator if denominator != 0 else 0


BurstEventDetector.py

In [None]:
class BurstEventDetector:
    """Baseline detector: a period is a sub-event when it holds enough tweets."""

    total_tweets = {}   # results file path -> number of tweets seen so far
    file_name = ""      # results file of the most recently created detector

    def __init__(self, tweets, file_name, printing=True, write=True):
        """Preprocess ``tweets`` and update the per-file tweet counter.

        :param tweets: raw rows of the window, as accepted by
            ``Preprocessor.preprocess``.
        :param file_name: path of the results file that detect_event appends to.
        :param printing: print the period report to stdout when True.
        :param write: append the period report to ``file_name`` when True.
        """
        self.tweets, self.full_tweets, self.tweets_timestamps, self.vocabulary, self.timestamps = \
            Preprocessor.preprocess(tweets)
        self.tweets_number = len(self.tweets)
        self.file_name = file_name
        self.write = write
        self.printing = printing
        BurstEventDetector.total_tweets[file_name] = \
            BurstEventDetector.total_tweets.get(file_name, 0) + self.tweets_number
        BurstEventDetector.file_name = file_name

    def detect_event(self):
        """Decide whether the current period is a sub-event and report it.

        :return: ``[timestamps, is_event, description]``; the score that is
            reported is the number of tweets in the period.
        """
        is_event = self.contains_event()
        period_score = self.tweets_number
        toPrint = ["Period starts: " + self.timestamps[0] + " Period ends: " + self.timestamps[1],
                   "Number of tweets: " + str(self.tweets_number),
                   "Vocabulary size: " + str(len(self.vocabulary)),
                   " No summary during baseline",
                   " ",
                   "Burst detector result: " + str(is_event)]
        if self.printing:
            print(Util.beauty_print(toPrint))
        if self.write:
            with open(BurstEventDetector.file_name, 'a') as f:
                f.write(Util.beauty_print(["Timestamp: " + ' '.join(self.timestamps)] +
                                          ["No summary during baseline"] +
                                          ["-------------------------------------------"] +
                                          ["Result: " + str(is_event)] +
                                          ["Score: " + str(period_score)]))
        return [self.timestamps, is_event, "No summary during baseline || " + str(self.tweets_number)]

    def contains_event(self):
        """Return True when the period holds at least as many tweets as the threshold.

        The threshold is looked up under the results file's base name with
        its 3-character extension replaced by "csv".
        """
        return self.tweets_number >= Settings.BURST_DETECTION_THRESHOLD[self.file_name.split("/")[-1][:-3] + "csv"]


detection_process.py

In [None]:
def start(args):
    """Run the detection loop over the input file, one time window at a time.

    Creates (or truncates) ``<output>/<name>.txt`` and
    ``<output>/<name>_summary.txt``, scores every non-empty window with the
    burst detector or the optimisation-based detector and finally writes the
    summary of the windows that were flagged as sub-events.

    :param args: parsed command line arguments (see parse_arguments).
    """
    script_time = time.time()
    reader = Reader()
    file_name = splitext(basename(args.input))[0]
    results_file = join(args.output, file_name + ".txt")
    summary_file = join(args.output, file_name + "_summary.txt")
    # Start from empty output files.
    for path in (results_file, summary_file):
        open(path, 'w').close()
    # Single-string print calls give the same output on Python 2 and 3.
    print("Starting the processing of file: %s" % (file_name,))
    iterations = 0
    # Only the most recent args.periods periods are compared against.
    periods_window = deque(maxlen=args.periods)
    summary, summary_index = [], []
    while True:
        iterations += 1
        start_time = time.time()
        tweets = reader.get_tweets(args.input)
        if tweets is None:
            break
        elif len(tweets) == 0:
            if args.v:
                print("Time period %s was skipped because there was 0 tweets published." % (iterations,))
            continue
        if args.v:
            print("%s Time period %s %s" % ("#" * 25, iterations, "#" * 25))

        if args.burst:
            period = BurstEventDetector(tweets, results_file, args.v)
            event = period.detect_event()
        else:
            period = Period(tweets, results_file, args.v)
            event = period.detect_event(periods_window)
        if event[1]:
            summary.append(event[2])
            summary_index.append(iterations)
        periods_window.append(period)
        if args.v:
            print("Time period # %s , Completion time: %s" % (iterations, time.time() - start_time))
    SummaryWriter.write_summary(summary_file, summary, summary_index)

    print("Total time: %s" % (time.time() - script_time,))


Main

In [None]:
def parse_arguments():
    """Define the command line interface and parse ``sys.argv``.

    :return: the ``argparse`` namespace with the parsed options.
    """
    parser = ArgumentParser()
    add = parser.add_argument

    # Input, output and detection settings.
    add("input", type=str, help="path to the file containing the tweets that will be used as input")
    add("--output", type=str, default="./output",
        help="path to the directory where the results would be stored")
    add("--threshold", type=float, default=0,
        help="the threshold that will be used to determine if a period contains a sub-event")
    add("--periods", type=int, default=10,
        help="number of previous periods that are considered (default 10)")
    add("--duration", type=float, default=1,
        help="duration of the period set in minutes (default 1 min)")

    # Preprocessing switches: each one is True unless its flag is given.
    preprocessing_flags = (
        ("--urls", "do not remove urls during preprocessing"),
        ("--retweets", "do not remove retweets during preprocessing"),
        ("--mentions", "do not remove users' mentions during preprocessing"),
        ("--stemming", "do not perform stemming during preprocessing"),
        ("--duplicates", "do not remove duplicate tweets during preprocessing"),
    )
    for flag, description in preprocessing_flags:
        add(flag, action="store_false", help=description)

    # Mode switches: each one is False unless its flag is given.
    add("--burst", action="store_true", help="use the burst detector instead of the optimized method")
    add("--v", action="store_true", help="increase output verbosity")

    # Summary settings.
    add("--summary", type=int, default=2,
        help="number of tweets that the summary will contains (default 2)")
    add("--sp", type=int, default=3,
        help="number of previous periods where the reappearance of an sub-event is considered "
             "duplicate (default 3)")
    add("--c", type=float, default=0.6,
        help="cosine similarity threshold to compare the summaries (default 0.6)")

    return parser.parse_args()


def argument_check():
    """Parse the command line and validate the values.

    The first violated rule is reported on stderr and the program exits
    with status 1.

    :return: the validated ``argparse`` namespace.
    :raises SystemExit: when an argument is invalid.
    """
    args = parse_arguments()
    # (rule violated?, message) pairs, checked in order.
    checks = (
        (not isfile(args.input),
         'The specified input file ' + str(args.input) + ' was not found.'),
        (not isdir(args.output),
         'The specified output path ' + str(args.output) + ' is not an existing directory.'),
        (args.duration <= 0, 'The duration of the period should be a positive number.'),
        (args.periods <= 0, 'The number of previous periods should be a positive number.'),
        (args.summary <= 0, 'The number of tweets in the summary should be a positive number.'),
        (args.sp <= 0, 'The number of summary periods should be a positive number.'),
        (not (0 <= args.c <= 1), 'The cosine similarity threshold should belong in range [0, 1].'),
        (args.threshold < 0, 'The threshold should be a non-negative number.'),
    )
    for violated, message in checks:
        if violated:
            stderr.write(message + '\n')
            raise SystemExit(1)
    return args


def set_parameters(args):
    """Copy the parsed command line options into ``Settings``.

    Besides the plain options this stores the detection threshold and the
    [first timestamp, now] pair of the input file, both keyed by the input
    file's base name (the key the detectors and the reader look up).

    :param args: parsed command line arguments (see parse_arguments).
    """
    Settings.TIME_WINDOW = args.duration * 60000  # minutes -> milliseconds
    Settings.IGNORE_URLS = args.urls
    Settings.IGNORE_RETWEETS = args.retweets
    Settings.IGNORE_DUPLICATES = args.duplicates
    Settings.IGNORE_USERNAME = args.mentions
    Settings.STEMMING = args.stemming
    Settings.TWEETS_SUMMARY = args.summary
    Settings.SUMMARY_SIMILARITY = args.c
    Settings.SUMMARY_PERIODS = args.sp
    Settings.PREVIOUS_PERIODS = args.periods

    input_name = basename(args.input)
    if args.burst:
        Settings.BURST_DETECTION_THRESHOLD[input_name] = args.threshold
    else:
        Settings.EVENT_DETECTION_THRESHOLD[input_name] = args.threshold

    # The event starts at the first row's timestamp and ends "now".
    with open(args.input, 'rb') as f:
        first_row = next(reader(f), None)
    if first_row:
        # `time` is the module here (the file uses `import time`).
        Settings.EVENT_TIMESTAMPS[input_name] = [int(first_row[0]), int(round(time.time() * 1000))]


if __name__ == "__main__":
    # Script entry point: validate the command line, copy the options into
    # Settings, then run the detection loop.
    args = argument_check()
    set_parameters(args)
    start(args)
