In [None]:
import pandas as pd
import csv
import numpy as np
import tensorflow as tf # require historical Tensorflow GPU version before 2.0 and Cuda packages.
from datetime import datetime
import pickle
import jieba
import unicodedata
import string

In [None]:
# Stop-word sets keyed by file path, so the file is read once per kernel
# session instead of once per document.
_STOPWORD_CACHE = {}


def _load_stopwords(path):
    """Return the stop-words stored one per line in `path` as a frozenset (memoised)."""
    if path not in _STOPWORD_CACHE:
        # `with` guarantees the handle is closed; utf-8-sig strips a leading BOM.
        with open(path, "r", encoding='utf-8-sig') as stopword_file:
            _STOPWORD_CACHE[path] = frozenset(stopword_file.read().splitlines())
    return _STOPWORD_CACHE[path]


def cut(text_str):
    """Segment a text with jieba and return the kept tokens joined by spaces.

    The text is NFKC-normalised, segmented with ``jieba.cut`` (accurate mode,
    HMM enabled) and filtered. A token is dropped when it

    * appears in ``Stopwords_Chinese.txt`` (compared before lower-casing),
    * is a substring of ``string.punctuation`` (this includes the empty string),
    * is numeric according to ``str.isnumeric``, or
    * is the private-use character ``'\\ue5e5'`` or a single space.

    Parameters
    ----------
    text_str : str
        Raw text to segment.

    Returns
    -------
    str
        Lower-cased surviving tokens separated by single spaces.

    Raises
    ------
    FileNotFoundError
        If ``Stopwords_Chinese.txt`` is not in the working directory.
    """
    # stopwords
    stopword_dir = "Stopwords_Chinese.txt"
    stopwords = _load_stopwords(stopword_dir)
    dropped_tokens = ['\ue5e5',' ']

    text = unicodedata.normalize('NFKC', text_str)
    seg_list = jieba.cut(text, cut_all = False, HMM = True)
    kept = [word.lower() for word in seg_list
            if word not in stopwords              # remove stopwords
            and word not in string.punctuation    # remove punctuation (substring test)
            and not word.isnumeric()              # remove digits
            and word not in dropped_tokens]
    return " ".join(kept)

In [None]:
# deep_learning_classifier
from keras.models import load_model
import tensorflow as tf
from keras.models import Model
from keras import layers
from keras import Input
from keras.layers import LSTM
from keras import optimizers
from keras import preprocessing
from keras.preprocessing.text import Tokenizer

# Set data directory
# Paths to the pickled tokenizer and to the two saved Keras models (.h5).
# Only one tokenizer (stored under model1) is loaded; deep_learning_classifier
# feeds the sequences it produces to both model1 and model2.
save_wordindex = 'trained models/model1/binary_rnn/tokenizer.pickle'
save_final_model1 = 'trained models/model1/binary_rnn/save_final_model.h5'
save_final_model2 = 'trained models/model2/binary_rnn/save_final_model.h5'

# Load model
import os
import tensorflow as tf
# Earlier spelling of the call below, kept for reference:
# tf.logging.set_verbosity(tf.logging.ERROR) # To ignore keep_dims warning
# Only log TensorFlow messages at ERROR level.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# NOTE(review): tensorflow is already imported when this variable is set;
# presumably it still takes effect because it precedes load_model -- confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "0" # Choose default GPU for computing
model1 = load_model(save_final_model1)
model2 = load_model(save_final_model2)

# loading tokenizer
# SECURITY: pickle.load can execute arbitrary code embedded in the file;
# only load tokenizer pickles that come from a trusted source.
with open(save_wordindex, 'rb') as handle:
    tokenizer = pickle.load(handle)
def deep_learning_classifier(content_string):
    """Run both loaded Keras models on one text and return their one-hot votes.

    The text is encoded with the module-level ``tokenizer``, padded to 50
    tokens and passed to ``model1`` and ``model2``. For each model the first
    prediction row is reduced to a length-2 integer array holding 1 at the
    arg-max position and 0 elsewhere.

    Parameters
    ----------
    content_string : str
        Text to classify (callers pass the space-joined output of ``cut``).

    Returns
    -------
    tuple of numpy.ndarray
        ``(vote_from_model1, vote_from_model2)``.
    """
    # Set parameters
    maxlen = 50

    # Encode the single text as one padded integer sequence
    sequences = tokenizer.texts_to_sequences([content_string])
    x_predict = np.asarray(preprocessing.sequence.pad_sequences(sequences, maxlen = maxlen))

    # Collapse each model's scores for the sample into a one-hot vote
    votes = []
    for model in (model1, model2):
        scores = model.predict(x_predict)[0]
        one_hot = np.array([0,0])
        one_hot[np.argmax(scores)] = 1
        votes.append(one_hot)

    return votes[0], votes[1]

In [None]:
# dictionary_sentiment_classifier

# Choose dictionary to create sentiment lists
dictionaries = ['NTUSD', 'Tsinghua_lijun']
sentiment = ['positive.txt','negative.txt']

# Positive: merge every dictionary's positive.txt, keeping first-seen order.
# A companion set makes the duplicate check O(1) instead of a list scan.
pos_list = []
_pos_seen = set()
for d in dictionaries:
    inputdir = 'sentiment dictionary/' + d + '/positive.txt'
    with open(inputdir, 'r', encoding = 'utf-8-sig') as f:
        print('Loading positive dictionary: %s' %d)
        for word in f:
            entry = word.strip()
            if entry not in _pos_seen:
                _pos_seen.add(entry)
                pos_list.append(entry)

# Negative: merge every dictionary's negative.txt.
# The original loop only tested membership in pos_list, so neg_list was never
# de-duplicated; entries are now de-duplicated against neg_list as well.
# NOTE(review): words already listed as positive are still skipped, exactly as
# before, so classification results do not change -- confirm that excluding
# words present in both polarities is intended.
neg_list = []
_neg_seen = set()
for d in dictionaries:
    inputdir = 'sentiment dictionary/' + d + '/negative.txt'
    with open(inputdir, 'r', encoding = 'utf-8-sig') as f:
        print('Loading negative dictionary: %s' %d)
        for word in f:
            entry = word.strip()
            if entry not in _pos_seen and entry not in _neg_seen:
                _neg_seen.add(entry)
                neg_list.append(entry)
print('Completed!')

# Create classifier function
def dict_classifier(content_string, pos_lexicon=None, neg_lexicon=None):
    """Vote on sentiment by counting lexicon hits in a whitespace-tokenised text.

    The previous implementation compared the two lists of matched words with
    ``>`` / ``<``, which orders lists lexicographically by their first
    differing string rather than by length. The vote is now based on the
    number of positive versus negative hits.

    Parameters
    ----------
    content_string : str
        Space-separated tokens.
    pos_lexicon, neg_lexicon : iterable of str, optional
        Word lists to match against. When omitted, the module-level
        ``pos_list`` / ``neg_list`` are used.

    Returns
    -------
    tuple of numpy.ndarray
        ``(label1_array, label2_array)``. ``label1_array`` is always
        ``[0, 0]`` (this classifier casts no vote for the first label).
        ``label2_array`` is ``[0, 1]`` when positive hits outnumber negative
        ones, ``[1, 0]`` in the opposite case and ``[0, 0]`` on a tie.
    """
    if pos_lexicon is None:
        pos_lexicon = pos_list
    if neg_lexicon is None:
        neg_lexicon = neg_list
    # Sets give O(1) membership tests; building them costs about one list scan.
    pos_lookup = set(pos_lexicon)
    neg_lookup = set(neg_lexicon)

    tokens = content_string.split()
    pos_count = sum(1 for token in tokens if token in pos_lookup)
    neg_count = sum(1 for token in tokens if token in neg_lookup)

    label1_array = np.array([0,0])
    label2_array = np.array([0,0])
    if pos_count > neg_count:
        label2_array = np.array([0,1])
    elif pos_count < neg_count:
        label2_array = np.array([1,0])
    return label1_array,label2_array

In [None]:
# Combined classifier
from keras import preprocessing

# Ensemble weights. Each classifier's one-hot vote is multiplied by its global
# weight and by a per-label factor before the votes are added together.
dl, svm, dic = 1, 1, 1                        # global weight: RNN, SVM, dictionary
dl_label1_acc, dl_label2_acc = 1, 1           # RNN factors for label 1 / label 2
svm_label1_acc, svm_label2_acc = 1, 1         # SVM factors for label 1 / label 2
dic_label2_acc = 1                            # dictionary factor (label 2 only)

def combined_classifier(topic_content_list,topic_list):
    """Combine the RNN, SVM and dictionary votes into two predicted labels per text.

    For every position ``num`` the weighted one-hot votes are summed per label
    and the arg-max of each sum is taken:

    * label 1: RNN (on ``topic_content_list[num]``) + SVM (on ``topic_list[num]``)
    * label 2: RNN + SVM + dictionary (on ``topic_content_list[num]``)

    Weights come from the module-level ``dl``, ``svm``, ``dic`` and ``*_acc``
    values. The sums are built with out-of-place addition: the earlier
    in-place ``+=`` on an integer vote array raised a NumPy casting error as
    soon as a later classifier was given a fractional weight.

    NOTE(review): ``svm_classifier`` is not defined in the cells visible here;
    it must already exist in the kernel or this function raises ``NameError``.

    Parameters
    ----------
    topic_content_list : list of str
        Texts for the RNN and dictionary classifiers.
    topic_list : list of str
        Texts for the SVM classifier; needs at least as many items as
        ``topic_content_list``.

    Returns
    -------
    tuple of list
        ``(label1, label2)``: arg-max indices, one per input text.
    """
    label1 = []
    label2 = []
    for num in range(len(topic_content_list)):
        # RNN classifier result
        rnn_result = deep_learning_classifier(topic_content_list[num])
        label1_array = dl*dl_label1_acc * rnn_result[0]
        label2_array = dl*dl_label2_acc * rnn_result[1]
        # SVM classifier result
        svm_result = svm_classifier(topic_list[num])
        label1_array = label1_array + svm*svm_label1_acc * svm_result[0]
        label2_array = label2_array + svm*svm_label2_acc * svm_result[1]
        # Dictionary classifier result (votes on label 2 only)
        dict_result = dict_classifier(topic_content_list[num])
        label2_array = label2_array + dic*dic_label2_acc * dict_result[1]
        # predict
        label1.append(np.argmax(label1_array))
        label2.append(np.argmax(label2_array))
        # Progress report every 200 texts (1-based count)
        processed = num + 1
        if processed % 200 == 0:
            print(processed,'out of ', len(topic_content_list))
    return label1,label2

In [None]:
# Test accuracy in training set
from sklearn.metrics import accuracy_score
import jieba
import unicodedata
import string

def test_data(seed,test_size):
    
    test_dir = "data/news corpus/subset_2650.csv"
    tdf = pd.read_csv(test_dir, sep=',', quotechar='"', encoding="utf8").sample(frac=1, random_state = seed)

    test_sublist_1 = []
    test_sublist_2 = []
    test_sublabels_1 = []
    test_sublabels_2 = []

    for index, row in tdf.iterrows():
        title = row['title']
        content = row['content']

        if index >= len(tdf) * (1 - test_size):

            # binary label
            old_label1 = tdf["topic_country"].iloc[index]
            old_label2 = tdf["sentiment"].iloc[index]

            if old_label1 == 4:
                new_label1 = 0
            else:
                new_label1 = 1
            if old_label2 == 1 or old_label2 == 2:
                new_label2 = 1
            else:
                new_label2 = 0

            test_sublist_1.append(cut(title + content)) # Record "text" column to train_list
            test_sublist_2.append(cut(title))
            test_sublabels_1.append(new_label1) # Record "label_1" column to train_labels
            test_sublabels_2.append(new_label2) # Record "label_2" column to train_labels
    test_labels_1_array = np.array(test_sublabels_1)
    test_labels_2_array = np.array(test_sublabels_2)

    return test_sublist_1,test_sublist_2,test_labels_1_array,test_labels_2_array

In [None]:
# Evaluate the combined classifier once per seed and record a loss per seed.
seed_list = []
loss_list = []

for seed in [34]:
    # test_size = 1 returns every row of the annotated file
    results = test_data(seed,1)
    test_sublist_1, test_sublist_2, test_labels_1_array, test_labels_2_array = results

    output = combined_classifier(test_sublist_1,test_sublist_2)
    label1, label2 = output

    # Test accuracy
    acc1 = accuracy_score(test_labels_1_array, np.array(label1))
    print('\nTesting accuracy of Topic label is: %.2f' % acc1)
    acc2 = accuracy_score(test_labels_2_array, np.array(label2))
    print('\nTesting accuracy of Sentiment label is: %.2f' % acc2)

    # Topic error enters linearly, sentiment error squared
    loss = (1 - acc1) + (1 - acc2) ** 2

    seed_list.append(seed)
    loss_list.append(loss)

In [None]:
# Classify the test texts left over from the last evaluated seed.
# NOTE(review): this appears to repeat the call already made inside the seed
# loop on the same inputs -- confirm whether the re-run is still needed.
results = combined_classifier(test_sublist_1,test_sublist_2)
label1, label2 = results

In [None]:
# Test accuracy
# Compare predictions with the reference labels, first label then second
acc1 = accuracy_score(y_true=test_labels_1_array, y_pred=np.array(label1))
print('\nTesting accuracy of Topic label is: %.2f' % acc1)

acc2 = accuracy_score(y_true=test_labels_2_array, y_pred=np.array(label2))
print('\nTesting accuracy of Sentiment label is: %.2f' % acc2)

In [None]:
# Apply classifier to predict

In [None]:
import pandas as pd
import csv
import numpy as np

# Set data directory
input_dir = "data/news corpus/US_contents_clean.csv"
output_dir = "data/annotated/annotated_contents.csv"

# Load data
df = pd.read_csv(input_dir, sep=',', quotechar='"', encoding="utf-8-sig")
df = df.dropna(how='any') # drop row with any missing value
df = df.sort_values(by = 'created_at')
# Renumber rows 0..n-1 after sorting. drop=True discards the old index
# directly; the previous reset_index(inplace=True) followed by
# `del df['index']` would delete a real data column if the CSV had one
# named 'index'.
df = df.reset_index(drop=True)

In [None]:
# Segment every article and predict its two labels
pred_sublist_1 = []    # segmented title + content, one entry per row of df
pred_sublist_2 = []    # segmented title only
pred_sublabels_1 = []
pred_sublabels_2 = []

# cut() NFKC-normalises the text before segmenting it
for title_text, body_text in zip(df['title'], df['content']):
    pred_sublist_1.append(cut(title_text + body_text))
    pred_sublist_2.append(cut(title_text))

results = combined_classifier(pred_sublist_1,pred_sublist_2)
label1, label2 = results

In [None]:
# Create the output file and save the labels.
# The file is opened once for the header and all rows; previously it was
# reopened in append mode for every row. Labels are looked up by position in
# iteration order, the order in which the predictions were produced, and no
# module-level name such as `sentiment` is overwritten.
with open(output_dir, "w", newline = '',encoding = 'utf-8') as csvfile:
    w = csv.writer(csvfile)
    w.writerow(['topic_country','sentiment','created_at','title','content'])
    for position, (_, row) in enumerate(df.iterrows()):
        w.writerow([
            label1[position],
            label2[position],
            row['created_at'],
            row['title'],
            row['content'],
        ])