This notebook contains code that performs a dynamic sentiment analysis based on emojis.

Emojis are extracted from tweets and mapped to a sentiment label (-1, 0, or 1); the remaining tweet text is then used to predict that label.

In [None]:
import glob
import json
import string

def get_full_text(tweet):
    """Return the text of a tweet payload, preferring the extended form.

    When the tweet dict has an 'extended_tweet' entry, its 'full_text' is
    returned; otherwise the top-level 'text' field is returned.
    """
    return tweet['extended_tweet']['full_text'] if 'extended_tweet' in tweet else tweet['text']
    
def clean_up_text(text):
    """Normalize whitespace in ``text``.

    Newline, carriage-return and tab characters are deleted (nothing is put in
    their place), every run of spaces becomes a single space, and leading and
    trailing whitespace is stripped.
    """
    # Delete the three control characters outright.
    without_breaks = text.translate({ord(ch): None for ch in '\n\r\t'})
    # Dropping the empty pieces of a split on ' ' collapses each run of spaces to one.
    single_spaced = ' '.join(piece for piece in without_breaks.split(' ') if piece)
    return single_spaced.strip()
        
def get_orig_text(tweet):
    """Return the cleaned text of the tweet at the bottom of a retweet chain.

    Follows 'retweeted_status' entries until a tweet without one is reached,
    then returns that tweet's full text passed through clean_up_text.
    """
    source = tweet
    while 'retweeted_status' in source:
        source = source['retweeted_status']
    return clean_up_text(get_full_text(source))
    
def get_raw_tweets(foldername,stop,root='Downloads'):
    """Read tweets from every file in ``<root>/<foldername>/`` and return their normalized text.

    Each file is read line by line and every non-blank line is parsed as one
    JSON tweet object. The text of the original (non-retweet) tweet is obtained
    with get_orig_text. Tweets whose text contains 'http://' or 'https://' are
    dropped; the others have ASCII punctuation removed and are lower-cased.

    Parameters
    ----------
    foldername : str
        Name of the sub-folder of ``root`` that holds the tweet files.
    stop : int or float
        Maximum number of kept tweets (counted before de-duplication).
        Pass ``float('inf')`` to read every file completely.
    root : str, optional
        Directory that contains ``foldername``; defaults to 'Downloads'.

    Returns
    -------
    list of str
        The unique normalized tweet texts, in first-seen order.
    """
    # Sorted so the read order (and therefore the result order) is the same on every run.
    filepaths = sorted(glob.glob(root+'/'+foldername+'/*'))
    strip_punctuation = str.maketrans('', '', string.punctuation)
    unlimited = stop == float('inf')
    # max(..., 1) keeps the modulo below defined when stop < 20 or fewer than 20 files exist.
    tweet_step = 1 if unlimited else max(int(stop//20), 1)
    file_step = max(len(filepaths)//20, 1)

    raw_tweets = []
    for files_read, filepath in enumerate(filepaths, start=1):
        # `with` closes the file even when a line fails to parse.
        with open(filepath, 'r', encoding='utf-8') as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines between JSON records

                text = get_orig_text(json.loads(line))
                if ('http://' in text) or ('https://' in text):
                    continue

                raw_tweets.append(text.translate(strip_punctuation).lower())

                if len(raw_tweets) >= stop:
                    # dict.fromkeys de-duplicates while keeping first-seen order.
                    return list(dict.fromkeys(raw_tweets))

                if (not unlimited) and (len(raw_tweets) % tweet_step == 0):
                    print(str(len(raw_tweets))+' of '+str(stop)+' tweets read')

        if unlimited and (files_read % file_step == 0):
            print(str(files_read)+' of '+str(len(filepaths))+' files read ('+
                  str(len(raw_tweets))+' total tweets)')

    return list(dict.fromkeys(raw_tweets))

# Read both tweet folders in full and concatenate them (first 'BackLAOut', then 'BackNYOut').
raw_tweets = (get_raw_tweets('BackLAOut',float('inf'))
              + get_raw_tweets('BackNYOut',float('inf')))

In [None]:
# All tweets as one space-separated string; used below for character and word counts.
joined_tweets = ' '.join(raw_tweets)

# Quick look: number of tweets and the first ten of them.
len(raw_tweets), raw_tweets[:10]

In [None]:
from collections import Counter, OrderedDict

def get_counts(i):
    """Count how often each element occurs while iterating over ``i``.

    Returns a collections.Counter keyed by element, in first-seen order.
    """
    # iter() forces element-by-element counting even if ``i`` happens to be a mapping.
    return Counter(iter(i))

# Character frequencies (iterating a str yields its characters) and
# frequencies of the space-delimited words across all tweets.
raw_char_counts = get_counts(joined_tweets)
raw_word_counts = get_counts(joined_tweets.split(' '))
    
def display_top_10_counts(d):
    """Return the ten highest-count entries of ``d`` as an OrderedDict, largest first.

    ``d`` maps items to counts. Entries with equal counts keep the relative
    order they have in ``d``.
    """
    ranked = sorted(d.items(), key=lambda item: item[1], reverse=True)
    return OrderedDict(ranked[:10])

# Show the ten most frequent characters and the ten most frequent words.
display_top_10_counts(raw_char_counts), display_top_10_counts(raw_word_counts)

In [None]:
# Hand-assigned sentiment label for each emoji of interest. Values are -1, 0 or 1
# (apparently negative / neutral / positive). Some entries are multi-codepoint
# variants (a base emoji followed by a skin-tone modifier) of a base emoji that is
# also listed on its own.
# NOTE(review): '💙' is labelled -1 while the other coloured hearts are labelled 1 --
# confirm this is intended.
emoji_sentiment = {'☹': -1, '☺':  1, '♥':  1, '❣':  1, '❤':  1, '🐱':  1, '🐵':  0, 
                   '👅':  1, '👺': -1, '👿': -1, '💑':  1, '💓':  1, '💔': -1, '💕':  1,
                   '💖':  1, '💗':  1, '💘':  1, '💙': -1, '💚':  1, '💛':  1, '💜':  1,
                   '💝':  1, '💞':  1, '💟':  1, '🖤': -1, '😀':  1, '😁':  1, '😂':  1,
                   '😃':  1, '😄':  1, '😅':  1, '😆':  1, '😇':  1, '😈':  1, '😉':  1,
                   '😊':  1, '😋':  1, '😌':  1, '😍':  1, '😎':  1, '😏':  1, '😐':  0,
                   '😑': -1, '😒': -1, '😓': -1, '😔': -1, '😕': -1, '😖': -1, '😗':  1,
                   '😘':  1, '😙':  1, '😚':  1, '😛':  1, '😜':  1, '😝':  1, '😞': -1,
                   '😟': -1, '😠': -1, '😡': -1, '😢': -1, '😣': -1, '😤': -1, '😥': -1,
                   '😦': -1, '😧': -1, '😨': -1, '😩': -1, '😪':  0, '😫': -1, '😬':  0,
                   '😭': -1, '😮':  0, '😯':  0, '😰': -1, '😱': -1, '😲': -1, '😳': -1,
                   '😴':  0, '😵': -1, '😶':  0, '😷':  0, '😸':  1, '😹':  0, '😺':  1,
                   '😻':  1, '😼':  1, '😽':  1, '😾': -1, '😿': -1, '🙀':  0, '🙁': -1,
                   '🙂':  1, '🙃':  1, '🙄': -1, '🙈':  0, '🙉':  0, '🙊':  0, '🤍':  1,
                   '🤎':  1, '🤐':  0, '🤑':  1, '🤒': -1, '🤓':  1, '🤔':  0, '🤕': -1,
                   '🤖':  0, '🤗':  1, '🤛':  1, '🤛🏻':  1, '🤛🏼':  1, '🤛🏽':  1, '🤛🏾':  1,
                   '🤛🏿':  1, '🤜':  1, '🤜🏻':  1, '🤜🏼':  1, '🤜🏽':  1, '🤜🏾':  1, '🤜🏿':  1,
                   '🤠':  1, '🤡':  1, '🤢': -1, '🤣':  0, '🤤':  1, '🤥': -1, '🤦': -1,
                   '🤦🏻': -1, '🤦🏼': -1, '🤦🏽': -1, '🤦🏾': -1, '🤦🏿': -1, '🤧':  0, '🤨':  0,
                   '🤪':  1, '🤫':  0, '🤬': -1, '🤭':  1, '🤮': -1, '🤯':  0, '🥰':  1,
                   '🥱':  0, '🥳':  1, '🥴':  1, '🥵': -1, '🥶': -1, '🥺': -1, '🧐':  0,
                   '🧡':  1}
def clean_tweets(X):
    """Keep only tweets that contain a known emoji and isolate each emoji as its own token.

    Every character of a tweet that is a key of ``emoji_sentiment`` is surrounded
    by spaces so that a later ``split(' ')`` yields the emoji as a separate word.
    Tweets without any such character are dropped. After whitespace clean-up,
    words that start with '@' are replaced by an empty string.

    The tweet is rebuilt in a single pass over its characters. Rebuilding it
    once per matching emoji would append a full extra copy of the text for
    every distinct emoji in the tweet.

    Parameters
    ----------
    X : sequence of str
        Tweet texts.

    Returns
    -------
    list of str
        The unique cleaned tweets, in first-seen order.
    """
    total = len(X)
    # Report about every 10%; max(..., 1) avoids a modulo by zero for fewer than 10 tweets.
    report_every = max(total // 10, 1)

    cleaned_tweets = []
    for count, origtweet in enumerate(X, start=1):
        if count % report_every == 0:
            print(str(round(100*count/total, 1))+'% read')

        pieces = []
        has_emoji = False
        for char in origtweet:
            # Multi-codepoint keys (skin-tone variants) start with a base emoji that
            # is itself a key, so a per-character lookup also detects those tweets.
            if char in emoji_sentiment:
                has_emoji = True
                pieces.append(' '+char+' ') # this is so we can parse out the emoji
            else:
                pieces.append(char)
        if not has_emoji:
            continue

        # startswith() is safe on an empty word, unlike word[0].
        words = ['' if word.startswith('@') else word
                 for word in clean_up_text(''.join(pieces)).split(' ')]
        cleaned_tweets.append(' '.join(words))

    # dict.fromkeys de-duplicates while keeping a reproducible first-seen order.
    return list(dict.fromkeys(cleaned_tweets))

# Emoji-bearing tweets with each emoji separated out as its own space-delimited token.
cleaned_tweets = clean_tweets(raw_tweets)

In [None]:
batch_size = 64

def split_tr_val_te(X, percent_tr=0.5, percent_val=0.25):
    """Split ``X`` into consecutive validation, training and test slices.

    The training size is ``percent_tr`` of ``len(X)`` rounded down to a whole
    number of ``batch_size`` batches. Of what remains, the share
    ``percent_val / (1 - percent_tr)`` goes to validation and the rest to test.
    The slices are taken in the order validation, training, test.

    Parameters
    ----------
    X : sequence
        Any sliceable sequence (e.g. a list).
    percent_tr : float, optional
        Target training fraction, in [0, 1). Defaults to 0.5.
    percent_val : float, optional
        Target validation fraction, in [0, 1 - percent_tr]. Defaults to 0.25.

    Returns
    -------
    (X_tr, X_val, X_te)

    Raises
    ------
    ValueError
        If the fractions are outside the ranges above.
    """
    if not 0 <= percent_tr < 1:
        raise ValueError('percent_tr must be in [0, 1)')
    if not 0 <= percent_val <= 1 - percent_tr:
        raise ValueError('percent_val must be in [0, 1 - percent_tr]')

    n = len(X)
    tr_size = batch_size*int(percent_tr*n/batch_size)
    val_size = int((n-tr_size)*(percent_val/(1-percent_tr)))

    # Explicit end indices: the negative form X[-te_size:] returns all of X
    # (and X[val_size:-te_size] nothing) whenever the test size is 0.
    tr_end = val_size + tr_size
    X_val = X[:val_size]
    X_tr = X[val_size:tr_end]
    X_te = X[tr_end:]

    if n:  # the fractions are undefined for empty input
        print(len(X_tr)/n,len(X_val)/n,len(X_te)/n,len(X_tr)/batch_size)
    return X_tr, X_val, X_te

# Partition the cleaned tweets into training, validation and test subsets.
cleaned_tweets_tr,cleaned_tweets_val,cleaned_tweets_te = split_tr_val_te(cleaned_tweets)

In [None]:
def get__X_and__y(cleaned_tweets):
    """Turn emoji-tokenized tweets into parallel lists of texts and sentiment labels.

    A tweet is skipped when any known emoji directly follows the text 'not '.
    Otherwise the space-delimited words that are keys of ``emoji_sentiment`` are
    collected. The tweet is kept only if those emojis all map to one and the
    same sentiment value (several emojis are fine as long as they agree). For
    a kept tweet the matched emojis are removed from the text, the text is
    passed through clean_up_text, and the shared sentiment becomes its label.

    Returns
    -------
    (_X, _y) : (list of str, list of int)
    """
    _X, _y = [], []
    for tweet in cleaned_tweets:
        if any('not '+e in tweet for e in emoji_sentiment):
            continue

        emojis = [word for word in tweet.split(' ') if word in emoji_sentiment]
        sentiments = [emoji_sentiment[e] for e in emojis]
        # Zero emojis, or emojis with conflicting sentiments, give no usable label.
        if len(set(sentiments)) != 1:
            continue

        text = tweet
        for e in emojis:
            text = text.replace(e,'')
        _X.append(clean_up_text(text))
        _y.append(sentiments[0])
    return _X, _y
# Emoji-free texts and their sentiment labels for each of the three subsets.
_X_tr_strs, _y_tr = get__X_and__y(cleaned_tweets_tr)
_X_val_strs, _y_val = get__X_and__y(cleaned_tweets_val)
_X_te_strs, _y_te = get__X_and__y(cleaned_tweets_te)

In [None]:
def split_tweets_into_words(_X):
    """Split every tweet string in ``_X`` on single spaces; returns a list of word lists."""
    return [tweet.split(' ') for tweet in _X]
# Token lists (one list of words per tweet) for each subset.
_X_tr = split_tweets_into_words(_X_tr_strs)
_X_val = split_tweets_into_words(_X_val_strs)
_X_te = split_tweets_into_words(_X_te_strs)

In [None]:
import numpy as np

# def get_most_frequent_keys(d,n):
#     a = np.array(list(d.values()))
#     thresh = min(a[np.argpartition(a,-n)][-n:])
#     inds = a > 0.9 * thresh
#     d = dict(np.array(list(d.items()))[inds])
#     for k in d:
#         d[k] = int(d[k])
#     odct = OrderedDict(sorted(d.items(),key=lambda x:x[1], reverse=True))
#     return odct

# Tally how many training tweets carry each sentiment label (-1, 0, 1).
emoji_counts = dict.fromkeys((-1, 0, 1), 0)

for i in _y_tr:
    emoji_counts[i] = emoji_counts[i] + 1
emoji_counts

In [None]:
# Flatten all three subsets (train, then validation, then test) into one long
# token list by joining on spaces and splitting again; `words` is the set of
# distinct tokens across all subsets.
runonsentence = ' '.join([' '.join(tweet) for tweet in _X_tr]+
                         [' '.join(tweet) for tweet in _X_val]+
                         [' '.join(tweet) for tweet in _X_te]).split(' ')
words = set(runonsentence)

def get_counts_adv(_X):
    """Count token occurrences in ``_X`` over the full vocabulary ``words``.

    Every token in the module-level ``words`` set gets an entry, starting at 0,
    so tokens absent from ``_X`` are reported with a count of 0.
    """
    tally = dict.fromkeys(words, 0)
    for tokens in _X:
        for token in tokens:
            tally[token] += 1
    return tally

# Per-subset token counts, each covering the whole vocabulary (0 where a token is absent).
wc_tr = get_counts_adv(_X_tr)
wc_val = get_counts_adv(_X_val)
wc_te = get_counts_adv(_X_te)

# Total number of tokens across all subsets.
len(runonsentence)

In [None]:
# Partition the vocabulary: `nonzeros` are tokens that occur in all three subsets,
# `zeros` are tokens missing from at least one of them.
zeros, nonzeros = [], []
for w in words:
    in_every_subset = wc_tr[w] and wc_val[w] and wc_te[w]
    (nonzeros if in_every_subset else zeros).append(w)
len(zeros),len(nonzeros)

In [None]:
runonsentence = np.array(runonsentence,dtype='str')

# Blank out every token that is missing from at least one subset. A single
# membership mask replaces one full-array string comparison per such token.
if zeros:
    runonsentence[np.isin(runonsentence, zeros)] = ''

In [None]:
word_counts = get_counts(runonsentence)

assert(min(list(word_counts.values())) > 1)

# Sort by token. `key` belongs to sorted(): passed to OrderedDict() it is stored as
# an extra dictionary entry, which inflates vocabulary_size and adds a bogus token.
# Ascending order is what was actually in effect before (the misplaced reverse=True
# never reached sorted()), so token ids stay unchanged; add reverse=True inside
# sorted() if descending order is wanted.
word_counts = OrderedDict(sorted(word_counts.items(), key=lambda x: x[0]))

vocabulary_size = len(word_counts)
vocabulary_size

In [None]:
# T gets one row per tweet: column 0 is the subset id (0 = train, 1 = validation,
# 2 = test) and column 1 is the tweet's token count.
T=np.array([(0,len(x)) for x in _X_tr]+
           [(1,len(x)) for x in _X_val]+
           [(2,len(x)) for x in _X_te])
# Append a third column holding each row's running index 0..len(T)-1.
T=np.vstack([T.T,np.arange(len(T))]).T
tr=len(_X_tr)
val=len(_X_val)
# Rebase the running index so column 2 becomes the tweet's position inside its own
# subset: validation and test rows lose the train count, test rows also lose the
# validation count.
T[:,2][tr:] -= tr
T[:,2][tr+val:] -= val
# Preview the first two tweets and the first two T rows of each subset.
_X_tr[0:2],_X_val[0:2],_X_te[0:2],T[0:2],T[tr:tr+2],T[tr+val:tr+val+2]

In [None]:
# Copy the tokens of runonsentence back into the per-tweet word lists, in place.
# `i` is the position in runonsentence; for each row t of T, t[0] selects the
# subset, t[2] the tweet within it and t[1] the number of tokens to copy.
i = 0
for t in T:
    for j in range(t[1]):
        if t[0] == 0:
            _X_tr[t[2]][j] = runonsentence[i]
        elif t[0] == 1:
            _X_val[t[2]][j] = runonsentence[i]
        else:
            _X_te[t[2]][j] = runonsentence[i]
        i += 1
# Preview the first two tweets of each subset after the write-back.
_X_tr[0:2],_X_val[0:2],_X_te[0:2]

In [None]:
from keras.preprocessing import sequence
from keras.utils import to_categorical

def convert_to_id(_X):
    """Map every word of every tweet in ``_X`` to its integer id via ``word2id``.

    Raises KeyError for a word that has no entry in ``word2id``.
    """
    id_sequences = []
    for tweet in _X:
        id_sequences.append([word2id[word] for word in tweet])
    return id_sequences

def convert_to_one_hot(_y):
    """One-hot encode the labels in ``_y`` into 3 columns using keras' to_categorical.

    NOTE(review): the labels passed in by this notebook are drawn from -1/0/1;
    presumably -1 ends up in the last column through negative indexing inside
    to_categorical -- confirm against the installed Keras version.
    """
    #return enc.transform(np.array(_y).reshape(-1,1)).toarray()
    return to_categorical(_y, num_classes=3)

# Assign consecutive integer ids to the vocabulary, following the iteration
# order of word_counts.
word2id = {}
for i,word in enumerate(word_counts):
    word2id[word] = i

# Bring every id sequence to a common length of max_words.
# NOTE(review): pad_sequences presumably pads with 0 by default, and id 0 is also
# assigned to the first entry of word_counts -- confirm this overlap is acceptable.
max_words = 140
X_tr= sequence.pad_sequences(convert_to_id(_X_tr), maxlen=max_words)
X_val = sequence.pad_sequences(convert_to_id(_X_val), maxlen=max_words)
X_te = sequence.pad_sequences(convert_to_id(_X_te), maxlen=max_words)

# One-hot targets for each subset.
y_tr = convert_to_one_hot(_y_tr)
y_val = convert_to_one_hot(_y_val)
y_te = convert_to_one_hot(_y_te)

# Preview the first ten padded sequences and their targets.
X_tr[:10], y_tr[:10]

This concludes the data preparation. The remaining cells build, train, and evaluate the model.

In [None]:
from sklearn.utils import class_weight

# compute_class_weight returns one weight per entry of `classes`, i.e. in label
# order [-1, 0, 1]. Keras' fit(class_weight=...) instead expects a dict keyed by
# class index, meaning the one-hot column. convert_to_one_hot hands the raw labels
# to to_categorical, where label -1 selects the last column, so the column for a
# label is `label % 3` (0 -> 0, 1 -> 1, -1 -> 2).
_balanced_weights = class_weight.compute_class_weight('balanced', classes=np.arange(3)-1, y=_y_tr)
class_weights = {int(label) % 3: float(weight)
                 for label, weight in zip(np.arange(3)-1, _balanced_weights)}

_y_tr[:10], class_weights

In [None]:
from keras import Sequential
from keras.layers import Embedding, LSTM, Dense, Dropout

# Build the network layer by layer. The order of the statements matters:
# calc_n_neurons() inspects whichever layer is last in `model` when it is called.
embedding_size=32
model=Sequential()
model.add(Embedding(vocabulary_size, embedding_size, input_length=max_words))

def calc_n_neurons():
    """Derive a layer width from the training-set size and the current last layer.

    Computes ``max(int(N / (2 * alpha * (Ni + No))), 1)`` where ``N = len(X_tr)``,
    ``alpha = log2(N)``, and ``Ni`` / ``No`` are the last dimensions of the input
    and output of ``model.layers[-1]``. Prints alpha, Ni, No and the result.
    """
    alpha = np.log2(len(X_tr))
    Ni = model.layers[-1].input.shape[-1]
    No = model.layers[-1].output.shape[-1]
    n_neurons = max(int(len(X_tr) / (2 * alpha * (Ni + No))), 1)
    print(alpha,Ni,No,n_neurons)
    return n_neurons

# Two stacked LSTM layers, each followed by 20% dropout; the first returns the full
# sequence so the second LSTM can consume it. The final layer has 3 softmax outputs.
model.add(LSTM(calc_n_neurons(), return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(calc_n_neurons()))
model.add(Dropout(0.2))
model.add(Dense(3, activation='softmax'))

model.summary()

In [None]:
# The targets are 3-column one-hot vectors (see convert_to_one_hot), hence the
# categorical cross-entropy loss; accuracy is tracked as the metric.
model.compile(loss='categorical_crossentropy', 
              optimizer='adam', 
              metrics=['accuracy'])

In [None]:
from keras.callbacks import ModelCheckpoint
# Checkpoint to `filepath`, keeping only the weights with the best (maximum)
# monitored 'val_accuracy'.
# NOTE(review): the monitored metric name depends on the Keras version
# ('val_acc' in older releases) -- confirm it matches the training logs.
filepath = 'temp.hdf5'
checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
num_epochs = 3
# Train on the training subset, validating on the validation subset after each epoch.
model.fit(X_tr, y_tr, validation_data=(X_val, y_val), batch_size=batch_size,
          epochs=num_epochs, class_weight=class_weights, callbacks=[checkpoint])

In [None]:
# Restore the checkpointed weights saved during training.
model.load_weights(filepath)
# Predict the whole test set in one batched call. This yields the same
# (n_samples, 3) array as calling predict once per sample and stacking the
# results, without the per-call overhead.
yhat_te = model.predict(X_te)
# Per-class difference between the one-hot targets and the predicted probabilities.
y_te - yhat_te