In [40]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import nltk.classify.util as util
from nltk.classify import NaiveBayesClassifier
from nltk.metrics import BigramAssocMeasures
from nltk.collocations import BigramCollocationFinder as BCF
import itertools
import pickle

import os.path
from statistics import mode
from nltk.classify import ClassifierI
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.corpus import wordnet
from nltk import pos_tag

nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/vincent/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/vincent/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package wordnet to /Users/vincent/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [41]:
emoji_dict = {}
with open('emoji.txt', 'r', encoding='latin-1') as emoji_file:
    for line in emoji_file:
        line = line.strip()
        if line:
            emoji, value = line.split('\t')
            emoji_dict[emoji] = int(value)

In [42]:
def replace_emojis(text, emoji_dict):
    for emoji, value in emoji_dict.items():
        if value == 1:
            text = re.sub(re.escape(emoji), 'happy', text)
        elif value == -1:
            text = re.sub(re.escape(emoji), 'sad', text)
    return text

In [43]:
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

#vincent
def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return wordnet.ADJ
    elif treebank_tag.startswith('V'):
        return wordnet.VERB
    elif treebank_tag.startswith('N'):
        return wordnet.NOUN
    elif treebank_tag.startswith('R'):
        return wordnet.ADV
    else:
        return None
    
# def preprocess_text(text):
#     # Remove special characters
#     text = re.sub(r"[^a-zA-Z0-9\s]", "", text)

#     # Convert to lowercase
#     text = text.lower()

#     # Remove stopwords
#     stop_words = set(stopwords.words('english'))
#     words = word_tokenize(text)
#     filtered_words = [word for word in words if word.lower() not in stop_words]
#     text = ' '.join(filtered_words)

#     return text

def preprocess_text(text):
    # Convert to lowercase
    text = text.lower()
    
    # Remove special characters
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)

    # Tokenize the words
    words = word_tokenize(text)
    
    # Apply stemming
    stemmer = PorterStemmer()
    words = [stemmer.stem(word) for word in words]

    # Apply lemmatization
    lemmatizer = WordNetLemmatizer()
    tagged = pos_tag(words)
    words = [lemmatizer.lemmatize(word, pos=get_wordnet_pos(pos)) if get_wordnet_pos(pos) else word for word, pos in tagged]

    # Remove stopwords
    stop_words = set(stopwords.words('english'))
    filtered_words = [word for word in words if word not in stop_words]
    
    return ' '.join(filtered_words)


# Preprocess positive text
positive_preprocessed = []
with open('positive.txt', 'r', encoding='latin-1') as pos_file:
    for line in pos_file:
        comment = line.strip()
        comment = replace_emojis(comment, emoji_dict)
        comment = preprocess_text(comment)
        positive_preprocessed.append(comment)

# Preprocess negative text
negative_preprocessed = []
with open('negative.txt', 'r', encoding='latin-1') as neg_file:
    for line in neg_file:
        comment = line.strip()
        comment = replace_emojis(comment, emoji_dict)
        comment = preprocess_text(comment)
        negative_preprocessed.append(comment)


In [44]:
all_texts = positive_preprocessed + negative_preprocessed
from nltk.probability import FreqDist

# Create frequency distribution
word_freq = FreqDist()

for text in all_texts:
    words = text.split()
    word_freq.update(words)
known_words = {word for word, freq in word_freq.items() if freq > 3}
#len(known_words)

In [45]:
import math
total_words = 0
num_comments = len(all_texts)

for comment in all_texts:
    total_words += len(comment.split())

average_words = total_words / num_comments

variance = 0
for comment in all_texts:
    num_words = len(comment.split())
    variance += (num_words - average_words) ** 2

std_deviation = math.sqrt(variance / num_comments)
M = round(average_words + std_deviation)

def comment_to_vector(comment, known_words, M):
    words = comment.split()
    vector = []
    for i in range(M):
        if i < len(words):
            word = words[i]
            if word in known_words:
                vector.append(known_words.index(word) + 1)
            else:
                vector.append(0)
        else:
            vector.append(0)
    return vector

# Convert preprocessed comments to vector representation
positive_comment_vectors = []
for comment in positive_preprocessed:
    vector = comment_to_vector(comment, list(known_words), M)
    positive_comment_vectors.append(vector)
    

negative_comment_vectors = []
for comment in negative_preprocessed:
    vector = comment_to_vector(comment, list(known_words), M)
    negative_comment_vectors.append(vector)
positive_comment_vectors

[[2691,
  2536,
  2857,
  2928,
  2012,
  0,
  3933,
  2921,
  1387,
  2959,
  2214,
  1266,
  1993,
  0,
  2534,
  0],
 [384,
  2544,
  793,
  2853,
  482,
  989,
  607,
  0,
  3761,
  1720,
  1109,
  1232,
  0,
  2905,
  113,
  2315],
 [2598, 0, 3734, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [769, 3749, 3933, 3564, 2681, 0, 1568, 2260, 3349, 0, 0, 0, 0, 0, 0, 0],
 [472, 737, 3667, 2784, 3564, 2366, 0, 80, 1194, 4280, 3749, 2479, 0, 0, 0, 0],
 [2467,
  2922,
  2214,
  1473,
  1902,
  3174,
  4417,
  2959,
  799,
  1097,
  4007,
  2403,
  0,
  0,
  0,
  0],
 [734, 3667, 1743, 98, 2882, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [4269, 2384, 1632, 2921, 1908, 219, 153, 871, 1, 0, 1568, 822, 0, 0, 0, 0],
 [807, 4274, 2401, 2051, 0, 1484, 2985, 3311, 2400, 3798, 3572, 0, 0, 0, 0, 0],
 [3886, 4471, 2089, 734, 879, 1090, 2669, 964, 473, 0, 0, 0, 0, 0, 0, 0],
 [2344, 2467, 2162, 1262, 920, 4137, 1254, 4427, 0, 0, 0, 0, 0, 0, 0, 0],
 [283, 1839, 0, 1022, 518, 1574, 4385, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 

In [46]:
# from sklearn.model_selection import train_test_split
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Embedding, Conv1D, Dense, Dropout, Flatten
# import numpy as np

# positive_comment_vectors = np.array(positive_comment_vectors)
# negative_comment_vectors = np.array(negative_comment_vectors)

# all_comment_vectors = np.concatenate((positive_comment_vectors, negative_comment_vectors))

# positive_labels = np.ones(len(positive_comment_vectors))
# negative_labels = np.zeros(len(negative_comment_vectors))

# all_labels = np.concatenate((positive_labels, negative_labels))

# X_train, X_test, y_train, y_test = train_test_split(all_comment_vectors, all_labels, test_size=0.2, random_state=42)

# embedding_dim = 300

# # Define the model
# model = Sequential()
# model.add(Embedding(input_dim=len(known_words) + 1, output_dim=embedding_dim, input_length=M))
# model.add(Conv1D(filters=32, kernel_size=3, activation='relu'))
# model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
# model.add(Dense(units=50, activation='relu'))
# model.add(Dropout(0.5))
# model.add(Dense(units=1, activation='tanh'))

# # Compile the model
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# # Train the model
# model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))


In [47]:
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.preprocessing.text import Tokenizer
# from tensorflow.keras.preprocessing.sequence import pad_sequences

# # 更改预处理函数，添加词干提取和词形还原
# from nltk.stem import WordNetLemmatizer, SnowballStemmer
# nltk.download('wordnet')

# stemmer = SnowballStemmer("english")
# lemmatizer = WordNetLemmatizer()

# def preprocess_text(text):
#     # Remove special characters
#     text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
#     # Convert to lowercase
#     text = text.lower()
#     # Remove stopwords
#     stop_words = set(stopwords.words('english'))
#     words = word_tokenize(text)
#     # Add stemming and lemmatization
#     filtered_words = [lemmatizer.lemmatize(stemmer.stem(word)) for word in words if word.lower() not in stop_words]
#     text = ' '.join(filtered_words)
#     return text

# # Tokenization and padding
# tokenizer = Tokenizer()
# tokenizer.fit_on_texts(all_texts) # Use all available texts to build the token dictionary

# X = tokenizer.texts_to_sequences(all_texts)
# X = pad_sequences(X)  # Use default padding ('pre')

# # Change the labels to a numpy array
# Y = np.array(all_labels)

# X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# # 优化模型结构，这里使用LSTM替换CNN
# embedding_dim = 300
# vocab_size = len(tokenizer.word_index) + 1  # Adding 1 because 0 is used for padding

# model = Sequential()
# model.add(Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=X.shape[1]))
# model.add(LSTM(64, return_sequences=True))
# model.add(LSTM(32))
# model.add(Dense(64, activation='relu'))
# model.add(Dropout(0.5))
# model.add(Dense(1, activation='sigmoid'))  # Using sigmoid as the activation function in the last layer, as this is a binary classification problem

# # Using Adam optimizer that adjusts learning rate automatically
# optimizer = Adam(learning_rate=0.001)

# model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# # Increasing the number of training epochs
# model.fit(X_train, y_train, epochs=20, batch_size=32, validation_data=(X_test, y_test))


In [48]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, Dense, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping
import numpy as np

positive_comment_vectors = np.array(positive_comment_vectors)
negative_comment_vectors = np.array(negative_comment_vectors)

all_comment_vectors = np.concatenate((positive_comment_vectors, negative_comment_vectors))

positive_labels = np.ones(len(positive_comment_vectors))
negative_labels = np.zeros(len(negative_comment_vectors))

all_labels = np.concatenate((positive_labels, negative_labels))

X_train, X_test, y_train, y_test = train_test_split(all_comment_vectors, all_labels, test_size=0.2, random_state=42)

embedding_dim = 300

# Define the model
model = Sequential()
model.add(Embedding(input_dim=len(known_words) + 1, output_dim=embedding_dim, input_length=M))
model.add(Conv1D(filters=32, kernel_size=3, activation='relu'))
model.add(Dropout(0.5)) # Dropout layer for regularization
model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
model.add(Dropout(0.5)) # Dropout layer for regularization
model.add(Dense(units=50, activation='relu'))
model.add(Dropout(0.5)) # Dropout layer for regularization
model.add(Flatten())
model.add(Dense(units=1, activation='tanh'))

# Compile the model
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# Add early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=3)

# Train the model
model.fit(X_train, y_train, epochs=20, batch_size=32, validation_data=(X_test, y_test), callbacks=[early_stopping])


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20


<keras.src.callbacks.History at 0x2c5136190>

In [54]:
import tensorflow as tf
from tensorflow.keras import layers

class MultiHeadSelfAttention(layers.Layer):
    def __init__(self, embed_dim, num_heads=8):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError("embedding dimension = %d should be divisible by number of heads = %d" % (embed_dim, num_heads))
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)
        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
        output = self.combine_heads(concat_attention)
        return output


class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential([layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


embed_dim = 32  # Embedding size for each


In [55]:
from tensorflow.keras import regularizers
from tensorflow.keras.layers import Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy

class TransformerClassifier(tf.keras.Model):
    def __init__(self, vocab_size, maxlen, embed_dim, num_heads, ff_dim):
        super(TransformerClassifier, self).__init__()
        self.embedding = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
        self.dropout = Dropout(0.5)
        self.pool = layers.GlobalAveragePooling1D()
        self.classifier = layers.Dense(1, activation='sigmoid', 
                                       kernel_regularizer=regularizers.l2(0.01))

    def call(self, inputs):
        x = self.embedding(inputs)
        x = self.transformer_block(x)
        x = self.dropout(x)
        x = self.pool(x)
        x = self.classifier(x)
        return x

# Parameters for the model
vocab_size = 20000  
maxlen = 200  
embed_dim = 32  
num_heads = 2  
ff_dim = 32  

# Initialize the model
model = TransformerClassifier(vocab_size, maxlen, embed_dim, num_heads, ff_dim)

# Compile the model
model.compile(optimizer=Adam(), loss=BinaryCrossentropy(), metrics=[BinaryAccuracy()])

# Set up early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=2)

# Train the model
history = model.fit(
    X_train, y_train, 
    epochs=10, 
    batch_size=32, 
    validation_data=(X_test, y_test), 
    callbacks=[early_stopping]
)

# Evaluate the model
test_loss, test_accuracy = model.evaluate(X_test, y_test)

print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")




Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Test Loss: 0.6570149064064026
Test Accuracy: 0.7351148724555969
