### Tracker

In [1]:
import pymongo
from datetime import datetime  
from datetime import timedelta  
import numpy as np
from collections import Counter
import math

class Tracker(object):
    """Online, entity-based clustering of a time-ordered tweet stream.

    Tweets are read from a MongoDB collection (sorted by ``date``) in
    consecutive time windows.  Each tweet is turned into a weighted vector of
    its entities and joins the most similar *active* cluster when the cosine
    similarity exceeds ``similarity_threshold``; otherwise it starts a new
    cluster.  A cluster is deactivated once nothing has been added to it for
    more than ``decay_window`` hours.  At the end of every non-empty window
    the active clusters prune low-weight entities (see ``keep_top_perc``).
    """

    def __init__(self, db_name, collection,
                 window_size=1*60,  # 1 hour, in minutes
                 update_for_first_k=200, update_for_every_n=10,
                 hashtag_boost=1.5, decay_window=1,
                 decay_factor_T=1.0/2500, keep_top_perc=95, similarity_threshold=0.3, host='localhost'):
        """Connect to MongoDB and store the clustering parameters.

        :param db_name: MongoDB database name.
        :param collection: name of the collection holding the tweets.
        :param window_size: length of a streaming window, in minutes.
        :param update_for_first_k: re-weight a cluster only while it has
            fewer than this many tweets.
        :param update_for_every_n: re-weight a cluster every time its size is
            a multiple of this value.
        :param hashtag_boost: multiplier for hashtag ('#') entity weights.
        :param decay_window: hours without additions after which a cluster
            is deactivated.
        :param decay_factor_T: per-second rate of the exponential decay used
            by decayed centroids.
        :param keep_top_perc: percentile passed to ``Cluster.clear_entities``;
            entities whose total weight is below it are dropped, so 95 keeps
            roughly the top 5% of a cluster's entities.
        :param similarity_threshold: minimum cosine similarity (exclusive) to
            join an existing cluster.
        :param host: MongoDB host.
        """
        self.db = pymongo.MongoClient(host=host)[db_name]
        self.collection = self.db[collection]
        self.clusters = []
        self.window_size = window_size
        self.update_for_first_k = update_for_first_k
        self.update_for_every_n = update_for_every_n
        self.hashtag_boost = hashtag_boost
        self.decay_window = decay_window
        self.decay_factor_T = decay_factor_T
        self.keep_top_perc = keep_top_perc
        self.similarity_threshold = similarity_threshold
        self.discarded = []

    def clusterize(self, use_decay=False):
        """Cluster every tweet of the collection, window by window.

        Each clustered tweet gets a ``'sim'`` key: the similarity to the best
        active cluster, or 0.0 when no cluster could be compared.  Tweets
        without entities are appended to ``discarded`` instead.

        :param use_decay: compare tweets with time-decayed centroids instead
            of plain centroids.
        """
        similarity_threshold = self.__similarity_threshold
        # Clusters idle for longer than this are closed for good.
        max_idle = timedelta(hours=self.decay_window)

        for window in self.stream_from_collection(self.__window_size):
            for tweet in window:
                if not tweet['entities']:
                    # Without entities the tweet has no vector: it cannot be clustered.
                    self.discarded.append(tweet)
                    continue

                best = self._best_active_cluster(tweet, use_decay, max_idle)
                if best is None:
                    # First tweet ever, or no cluster is still active.
                    tweet['sim'] = 0.0
                    self.start_new_cluster(tweet)
                    continue

                best_index, best_sim = best
                tweet['sim'] = best_sim
                if best_sim > similarity_threshold:
                    self.add_to_cluster(best_index, tweet)
                else:
                    # Even the best match is below the threshold: open a new cluster.
                    self.start_new_cluster(tweet)

            # End of window: prune low-weight entities of the active clusters.
            if window:
                self.clear_entities()

    def _best_active_cluster(self, tweet, use_decay, max_idle):
        """Return ``(cluster index, similarity)`` of the most similar active cluster.

        Returns None when no cluster is active.  Clusters whose last addition
        is older than ``max_idle`` (relative to the tweet's date) are
        deactivated as a side effect.  On ties the lowest index wins.
        """
        if not self.__clusters:
            return None
        tweet_vector = self.get_tweet_vector(tweet)
        current_time = tweet['date']
        best = None
        for index, cl in enumerate(self.__clusters):
            if not cl.isActive:
                continue
            if current_time - cl.last_add_time > max_idle:
                # No additions for too long: consider the cluster closed.
                cl.isActive = False
                continue
            if use_decay:
                cluster_vector = cl.get_labeled_decayed_centroid(current_time)
            else:
                cluster_vector = cl.get_labeled_centroid()
            sim = self.cosine_similarity(tweet_vector, cluster_vector)
            if best is None or sim > best[1]:
                best = (index, sim)
        return best

    def cosine_similarity(self, tweet, cluster):
        """Cosine similarity of two sparse vectors given as ``{key: weight}`` dicts.

        Keys missing from one dict count as 0.  Returns 0.0 when either
        vector has zero norm (e.g. empty dict) instead of NaN.
        """
        # https://stackoverflow.com/a/18424953
        keys = list(set(tweet).union(cluster))
        v1 = np.array([tweet.get(k, 0.0) for k in keys], dtype=float)
        v2 = np.array([cluster.get(k, 0.0) for k in keys], dtype=float)
        denom = np.linalg.norm(v1) * np.linalg.norm(v2)
        if denom == 0:
            return 0.0
        return np.dot(v1, v2) / denom

    def get_tweet_vector(self, tweet):
        """Return the tweet's entity vector as ``{entity: weight}``.

        weight = (1 + log2(occurrences in the tweet)) * icf(entity),
        multiplied by ``hashtag_boost`` when the entity type is '#'.
        Entities are read as ``(text, type, ...)`` sequences from
        ``tweet['entities']``; for repeated entities the last occurrence's
        type decides the boost.
        """
        entities = tweet['entities']
        counts = Counter(x[0] for x in entities)
        icfs = {}
        vector = {}
        for x in entities:
            e, etype = x[0], x[1]
            if e not in icfs:
                icfs[e] = self.icf(e)
            weight = (1 + np.log2(counts[e])) * icfs[e]
            if etype == '#':
                weight = weight * self.__hashtag_boost
            vector[e] = weight
        return vector

    def start_new_cluster(self, tweet):
        """Create a cluster seeded with ``tweet`` and return its index."""
        index = len(self.__clusters)
        c = Cluster(index, update_for_first_k=self.update_for_first_k, update_for_every_n=self.update_for_every_n,
                    hashtag_boost=self.hashtag_boost, decay_window=self.decay_window, decay_factor_T=self.decay_factor_T)
        c.add_tweet(tweet)
        self.__clusters.append(c)
        return index

    def add_to_cluster(self, cluster_index, tweet):
        """Append ``tweet`` to a cluster and re-weight it periodically.

        The cluster is re-weighted when its new size q satisfies
        ``q < update_for_first_k`` and ``q % update_for_every_n == 0``.
        """
        cluster = self.__clusters[cluster_index]
        cluster.add_tweet(tweet)
        q = len(cluster.tweets)
        if q < self.__update_for_first_k and q % self.__update_for_every_n == 0:
            self.update_cluster(cluster_index, tweet['date'])

    def resemblance(self, tweet, cluster):
        """Jaccard index between the tweet's entities and the cluster's entities.

        Returns 0.0 when both entity sets are empty.
        """
        tweet_entities = set(x[0] for x in tweet['entities'])
        cluster_entities = cluster.get_entities()
        union = tweet_entities.union(cluster_entities)
        if not union:
            return 0.0
        return float(len(tweet_entities.intersection(cluster_entities))) / float(len(union))

    def containment(self, tweet, cluster):
        """Fraction of the tweet's distinct entities that the cluster also has.

        Returns 0.0 for a tweet without entities.
        """
        tweet_entities = set(x[0] for x in tweet['entities'])
        if not tweet_entities:
            return 0.0
        cluster_entities = cluster.get_entities()
        return float(len(tweet_entities.intersection(cluster_entities))) / float(len(tweet_entities))

    def clear_entities(self):
        """Prune low-weight entities from every active cluster (``keep_top_perc``)."""
        for cluster in self.__clusters:
            if cluster.isActive:
                cluster.clear_entities(self.__keep_top_perc)

    def icf(self, entity):
        """Inverse cluster frequency of ``entity`` (IDF computed over clusters).

        Returns ``1 + ln(total clusters / clusters containing entity)``, or 1
        when there are no clusters or none of them contains the entity.
        """
        total = len(self.__clusters)
        if total == 0:
            return 1
        count = sum(1 for cluster in self.__clusters if cluster.hasEntity(entity))
        if count == 0:
            return 1
        return 1 + np.log(float(total) / count)

    def update_cluster(self, cluster_index, current_time):
        """Re-weight a cluster's entity-by-tweet matrix ``M`` in place.

        For every tweet j and every entity e occurring in it:
        ``M[e, j] += (1 + log2(count of e in j)) * icf(e) * resemblance(tweet j, cluster)``,
        multiplied by ``hashtag_boost`` for '#' entities, and the entity's
        ``last_update`` is set to ``current_time``.  Cells of entities absent
        from a tweet are left untouched.
        """
        cl = self.__clusters[cluster_index]
        M = cl.M
        entity_by_row = {v['index']: e for (e, v) in cl.entities.items()}
        # icf only depends on which clusters contain an entity; that does not
        # change during this update, so each value is computed once.
        icf_cache = {}
        for tweet_index in range(M.shape[1]):
            tweet = cl.tweets[tweet_index]
            counter = Counter(x[0] for x in tweet['entities'])
            # Resemblance depends only on the tweet and the cluster: hoisted.
            sim = self.resemblance(tweet, cl)
            for i in range(M.shape[0]):
                e = entity_by_row[i]
                if e not in counter:
                    continue
                if e not in icf_cache:
                    icf_cache[e] = self.icf(e)
                weight = (1 + np.log2(counter[e])) * icf_cache[e]
                if cl.entities[e]['type'] == '#':
                    weight = weight * self.__hashtag_boost
                M[i, tweet_index] = M[i, tweet_index] + weight * sim
                cl.entities[e]['last_update'] = current_time

    def stream_from_collection(self, window_size=1*60, selection=None):
        """Return an iterator over consecutive time windows of documents.

        Documents matching ``selection`` (a MongoDB ``$match`` filter, e.g.
        ``{"rumor": "obama"}``; default: all) are sorted by ``date``.  The
        first window starts at the earliest date; every window spans
        ``window_size`` minutes and includes a document dated exactly at its
        end.  Windows with no documents are yielded as empty lists.  All
        windows are materialized up front rather than keeping the aggregation
        cursor open while the (slow) clustering runs.
        """
        if selection is None:
            selection = {}
        pipeline = [{'$match': selection}, {'$sort': {'date': 1}}]
        step = timedelta(minutes=window_size)

        windows = []
        current = []
        end = None
        for record in self.collection.aggregate(pipeline):
            if end is None:
                end = record['date'] + step
            # Close the current window (and any empty ones) until the record fits.
            while record['date'] > end:
                windows.append(current)
                current = []
                end = end + step
            current.append(record)

        # Close the last window.
        windows.append(current)
        return iter(windows)

    def get_labels(self, cluster_size_gte=100):
        """Collect ground-truth and predicted labels of tweets in large clusters.

        Only clusters with at least ``cluster_size_gte`` tweets are used.

        :returns: ``(gt_labels, pred_labels, gt_relev_labels, gt_labels_numeric)``:
            each tweet's ``'rumor'``, its cluster index, its relevance label
            (``'11'``-``'14'`` collapsed into 1, otherwise ``int(label)``) and
            the rumor mapped to an integer id in order of first appearance.
        """
        gt_labels = []
        pred_labels = []
        gt_relev_labels = []

        for i, cl in enumerate(self.clusters):
            if len(cl.tweets) < cluster_size_gte:
                continue
            for t in cl.tweets:
                gt_labels.append(t['rumor'])
                pred_labels.append(i)
                if t['label'] in ('11', '12', '13', '14'):
                    gt_relev_labels.append(1)
                else:
                    gt_relev_labels.append(int(t['label']))

        # Deterministic mapping (first appearance); purity-style metrics do
        # not depend on which integer each rumor receives.
        label_ids = {}
        for label in gt_labels:
            label_ids.setdefault(label, len(label_ids))
        gt_labels_numeric = [label_ids[k] for k in gt_labels]

        return gt_labels, pred_labels, gt_relev_labels, gt_labels_numeric

    def coverage(self, cluster_size_threshold=100):
        """Fraction of clustered tweets that belong to clusters with at least
        ``cluster_size_threshold`` tweets.

        Returns 0.0 (and prints a notice) when there are no clustered tweets.
        """
        covered = sum(len(cl.tweets) for cl in self.clusters if len(cl.tweets) >= cluster_size_threshold)
        total = sum(len(cl.tweets) for cl in self.clusters)
        if total == 0:
            print("No cluster over threshold")
            return 0.0
        return float(covered) / total

    #---------------GETTERS AND SETTERS

    @property
    def similarity_threshold(self):
        return self.__similarity_threshold

    @similarity_threshold.setter
    def similarity_threshold(self, similarity_threshold):
        self.__similarity_threshold = similarity_threshold

    @property
    def clusters(self):
        return self.__clusters

    @clusters.setter
    def clusters(self, clusters):
        self.__clusters = clusters

    @property
    def window_size(self):
        return self.__window_size

    @window_size.setter
    def window_size(self, window_size):
        self.__window_size = window_size

    @property
    def update_for_first_k(self):
        return self.__update_for_first_k

    @update_for_first_k.setter
    def update_for_first_k(self, update_for_first_k):
        self.__update_for_first_k = update_for_first_k

    @property
    def update_for_every_n(self):
        return self.__update_for_every_n

    @update_for_every_n.setter
    def update_for_every_n(self, update_for_every_n):
        self.__update_for_every_n = update_for_every_n

    @property
    def hashtag_boost(self):
        return self.__hashtag_boost

    @hashtag_boost.setter
    def hashtag_boost(self, hashtag_boost):
        self.__hashtag_boost = hashtag_boost

    @property
    def decay_window(self):
        return self.__decay_window

    @decay_window.setter
    def decay_window(self, decay_window):
        self.__decay_window = decay_window

    @property
    def decay_factor_T(self):
        return self.__decay_factor_T

    @decay_factor_T.setter
    def decay_factor_T(self, decay_factor_T):
        self.__decay_factor_T = decay_factor_T

    @property
    def keep_top_perc(self):
        return self.__keep_top_perc

    @keep_top_perc.setter
    def keep_top_perc(self, keep_top_perc):
        self.__keep_top_perc = keep_top_perc

    @property
    def discarded(self):
        return self.__discarded

    @discarded.setter
    def discarded(self, discarded):
        self.__discarded = discarded
        

## Cluster

In [379]:
from collections import Counter
from collections import OrderedDict

import numpy as np

class Cluster(object):
    """A cluster of tweets together with its entity-by-tweet weight matrix ``M``.

    Row ``entities[e]['index']`` of ``M`` holds the weights of entity ``e``;
    column ``j`` corresponds to ``tweets[j]`` (tweets are appended in order).
    A freshly added column holds raw entity counts; ``Tracker.update_cluster``
    may later re-weight it in place.
    """

    def __init__(self, cluster_index, update_for_first_k=200, update_for_every_n=10,
                 hashtag_boost=1.5, decay_window=1, decay_factor_T=1.0/2500):
        """
        :param cluster_index: position of this cluster in the tracker's list.
        :param update_for_first_k: stored re-weighting parameter.
        :param update_for_every_n: stored re-weighting parameter.
        :param hashtag_boost: multiplier applied to hashtag rows of the centroids.
        :param decay_window: stored decay window (hours, as used by the tracker).
        :param decay_factor_T: per-second rate of the exponential decay applied
            by ``get_decayed_centroid``.
        """
        self.update_for_first_k = update_for_first_k
        self.update_for_every_n = update_for_every_n
        self.hashtag_boost = hashtag_boost
        self.frequencies = Counter()
        # entity -> {'index': row in M, 'type': entity type, 'last_update': datetime}
        self.entities = OrderedDict()
        self.tweets = []
        self.M = np.zeros((len(self.tweets), len(self.entities)))
        self.cluster_index = cluster_index
        self.decay_window = decay_window
        self.decay_factor_T = decay_factor_T
        self.last_add_time = None
        self.isActive = True

    def add_tweet(self, tweet):
        """Append ``tweet``: add a column to ``M`` and a row for every new entity.

        The new column is filled with the raw number of occurrences of each
        entity in ``tweet['entities']``; ``last_add_time`` becomes the tweet's date.
        """
        entities = self.__entities
        self.__last_add_time = tweet['date']
        self.__tweets.append(tweet)

        # New all-zero column for the tweet; its index is the old column count.
        M = self.__M
        j = M.shape[1]
        M = np.hstack([M, np.zeros((M.shape[0], 1))])

        # Register entities never seen in this cluster, one new zero row each.
        new_entities = {e[0]: e[1] for e in tweet['entities'] if e[0] not in entities}
        for e, e_type in new_entities.items():
            entities[e] = {'index': len(entities), 'type': e_type, 'last_update': tweet['date']}
        if new_entities:
            M = np.vstack([M, np.zeros((len(new_entities), M.shape[1]))])

        # Fill column j with occurrence counts (other rows stay 0).
        counts = Counter(x[0] for x in tweet['entities'])
        for e, n in counts.items():
            M[entities[e]['index'], j] = n

        self.__M = M

    def _normalized_boosted_centroid(self, A):
        """Centroid of matrix ``A`` (same layout as ``M``), one value per row.

        Each column is scaled as ``0.5 + 0.5 * A / column max`` (columns whose
        max is 0 are kept as they are), hashtag rows are multiplied by
        ``hashtag_boost``, and every row is averaged over the columns.
        """
        # https://stackoverflow.com/questions/21870727/python-divide-values-in-cell-by-max-in-each-column
        col_max = np.max(A, axis=0)
        # A column max of 0 happens when all entities of a tweet were pruned;
        # those columns are discarded by np.where, so silence the 0/0 warnings.
        with np.errstate(divide='ignore', invalid='ignore'):
            scaled = 0.5 + 0.5 * A / col_max
        norm_M = np.where(col_max == 0, A, scaled)

        for v in self.__entities.values():
            if v['type'] == '#':
                norm_M[v['index']] = norm_M[v['index']] * self.__hashtag_boost

        return [np.mean(r) for r in norm_M]

    def get_decayed_centroid(self, current_time):
        """Centroid computed after decaying each entity row by its age.

        Row i is multiplied by ``exp(-seconds since last_update * decay_factor_T)``
        before normalization; see ``_normalized_boosted_centroid``.
        """
        decayed_M = np.zeros(self.__M.shape)
        T = self.decay_factor_T
        for v in self.__entities.values():
            delta_t = (current_time - v['last_update']).total_seconds()
            i = v['index']
            decayed_M[i] = self.__M[i] * np.exp(-delta_t * T)
        return self._normalized_boosted_centroid(decayed_M)

    def get_centroid(self):
        """Centroid of ``M``: per-column max normalization, hashtag boost, row mean."""
        return self._normalized_boosted_centroid(self.__M)

    def get_labeled_centroid(self):
        """Return ``get_centroid()`` as a ``{entity: value}`` dict."""
        centroid = self.get_centroid()
        return {e: centroid[v['index']] for e, v in self.__entities.items()}

    def get_labeled_decayed_centroid(self, current_time):
        """Return ``get_decayed_centroid(current_time)`` as a ``{entity: value}`` dict."""
        centroid = self.get_decayed_centroid(current_time)
        return {e: centroid[v['index']] for e, v in self.__entities.items()}

    def get_index(self, entity):
        """Row of ``entity`` in ``M``, or None when the entity is unknown."""
        try:
            return self.__entities[entity]['index']
        except (KeyError, TypeError):
            return None

    def get_entity_type(self, entity):
        """Type of ``entity`` as recorded when it was first added (raises KeyError if unknown)."""
        return self.__entities[entity]['type']

    def hasEntity(self, entity):
        """True when the cluster currently tracks ``entity``."""
        return entity in self.__entities

    def get_entities(self):
        """The entities currently tracked by the cluster."""
        return self.__entities.keys()

    def clear_entities(self, perc=95):
        """Drop entities whose total weight (row sum of ``M``) is below the
        ``perc``-th percentile of all row sums, then re-index the survivors.

        Survivors keep their relative row order, so ``M`` and the stored
        indices stay aligned.  A cluster with no entity rows is left unchanged.
        """
        row_sums = np.asarray(self.__M).sum(axis=1)
        if row_sums.size == 0:
            return
        threshold = np.percentile(row_sums, perc)
        to_del = np.where(row_sums < threshold)[0]
        if to_del.size == 0:
            return

        entity_by_row = {v['index']: e for (e, v) in self.__entities.items()}
        doomed = set(to_del.tolist())
        for k in doomed:
            del self.__entities[entity_by_row[k]]
        self.__M = np.delete(self.__M, to_del, axis=0)

        # Walk the old rows in order so the new indices match the rows left in M.
        survivors = [entity_by_row[i] for i in sorted(entity_by_row) if i not in doomed]
        for new_index, e in enumerate(survivors):
            self.__entities[e]['index'] = new_index

#-----------------------GETTERS, SETTERS----------------------------------

    @property
    def cluster_index(self):
        return self.__cluster_index

    @cluster_index.setter
    def cluster_index(self, cluster_index):
        self.__cluster_index = cluster_index

    @property
    def entities(self):
        return self.__entities

    @entities.setter
    def entities(self, entities):
        self.__entities = entities

    @property
    def M(self):
        return self.__M

    @M.setter
    def M(self, M):
        self.__M = M

    @property
    def frequencies(self):
        return self.__frequencies

    @frequencies.setter
    def frequencies(self, frequencies):
        self.__frequencies = frequencies

    @property
    def tweets(self):
        return self.__tweets

    @tweets.setter
    def tweets(self, tweets):
        self.__tweets = tweets

    @property
    def update_for_first_k(self):
        return self.__update_for_first_k

    @update_for_first_k.setter
    def update_for_first_k(self, update_for_first_k):
        self.__update_for_first_k = update_for_first_k

    @property
    def update_for_every_n(self):
        return self.__update_for_every_n

    @update_for_every_n.setter
    def update_for_every_n(self, update_for_every_n):
        self.__update_for_every_n = update_for_every_n

    @property
    def hashtag_boost(self):
        return self.__hashtag_boost

    @hashtag_boost.setter
    def hashtag_boost(self, hashtag_boost):
        self.__hashtag_boost = hashtag_boost

    @property
    def decay_window(self):
        return self.__decay_window

    @decay_window.setter
    def decay_window(self, decay_window):
        self.__decay_window = decay_window

    @property
    def decay_factor_T(self):
        return self.__decay_factor_T

    @decay_factor_T.setter
    def decay_factor_T(self, decay_factor_T):
        self.__decay_factor_T = decay_factor_T

    @property
    def last_add_time(self):
        return self.__last_add_time

    @last_add_time.setter
    def last_add_time(self, last_add_time):
        self.__last_add_time = last_add_time

    @property
    def isActive(self):
        return self.__isActive

    @isActive.setter
    def isActive(self, isActive):
        self.__isActive = isActive

In [368]:
# Tracker over the rumors collection: similarity threshold 0.4, prune entities
# below the 50th weight percentile, deactivate clusters idle for 24 hours.
jd = Tracker(
    'inforet',
    'rumors_entity_filtered',
    similarity_threshold=0.4,
    keep_top_perc=50,
    decay_window=24,
)

In [369]:
# Echo the tracker configuration, one value per line; `clusters` is still
# empty because clusterize() has not run yet.
print("\n".join(str(getattr(jd, _attr)) for _attr in (
    'window_size', 'update_for_first_k', 'update_for_every_n', 'hashtag_boost',
    'decay_window', 'decay_factor_T', 'keep_top_perc', 'clusters')))

3600
200
10
1.5
24
0.0004
50
[]


In [None]:
# Time a full clustering run.
# NOTE: `import datetime` rebinds the global name `datetime` to the module,
# shadowing the `datetime` class imported in the first cell; from here on it
# must be used as `datetime.datetime`.
import datetime

a = datetime.datetime.now()
print "start ", a

# Cluster every document of the collection, comparing against decayed centroids.
jd.clusterize(use_decay=True)
b = datetime.datetime.now()

print "end ", b
print "delta", b-a

In [225]:
# Number of clusters formed, then number of tweets discarded for having no entities.
print("%d\n%d" % (len(jd.clusters), len(jd.discarded)))

1188
0


In [226]:
# Inspect cluster 2's weight matrix M: one row per entity, one column per tweet.
cl = jd.clusters[2]
cl.M.shape

(16L, 1L)