In [None]:
import pandas as pd
from typing import List
from pprint import pprint
import numpy as np
import re
import plotly.express as px
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

In [None]:
import ast

# Load the labelled training set and the unlabelled test set.
train_df = pd.read_excel('../data/train_df.xlsx')
test_df = pd.read_excel('../data/test_df.xlsx')

# `ckip_ws` is stored as the text repr of a Python list. Parse it with
# ast.literal_eval, which only accepts literals, instead of eval(), which would
# execute arbitrary code embedded in a spreadsheet cell.
train_df['ckip_ws'] = train_df['ckip_ws'].apply(ast.literal_eval)
test_df['ckip_ws'] = test_df['ckip_ws'].apply(ast.literal_eval)

# Force every review to str so later regex / tokenizer calls never see a
# non-string cell value.
train_df['text'] = train_df['text'].apply(str)
test_df['text'] = test_df['text'].apply(str)

In [None]:
# Stratified 80/20 split of the labelled data into train and dev sets.
train_df, dev_df = train_test_split(
    train_df,
    test_size=0.2,
    stratify=train_df['score'],
    random_state=42,
)
# Later cells assign new columns onto both frames; take explicit copies so those
# assignments operate on independent frames and do not raise
# SettingWithCopyWarning.
train_df = train_df.copy()
dev_df = dev_df.copy()

In [None]:
from opencc import OpenCC

# Normalise every review through the OpenCC 's2t' configuration so all three
# splits share one script before segmentation.
cc = OpenCC('s2t')
for frame in (train_df, test_df, dev_df):
    frame['text'] = frame['text'].apply(cc.convert)

In [None]:
import jieba

jieba.initialize()
jieba.load_userdict('../data/dict.txt')

# Re-segment the converted text with jieba. This overwrites the `ckip_ws`
# column that was loaded from the spreadsheets.
for frame in (train_df, test_df, dev_df):
    frame['ckip_ws'] = frame['text'].apply(jieba.lcut)

In [None]:
# Patterns that are defined only in this cell. The emoji / number / english /
# invite-word / invite-code patterns are defined in the next cell; the copies
# that used to live here were overwritten there before any use (and the
# invite-code copy disagreed with the effective one), so they are not repeated.

# Any character that is neither a word character nor whitespace, except the
# sentiment-bearing marks ! ！ ? ？ which are deliberately kept.
punctuation_pattern = r'(?!！|!|\?|？|_|__)[^\w\s]'
# http(s) URLs and bare www. hosts, up to the next whitespace.
url_pattern = r'https?://\S+|www\.\S+'

In [None]:
# ref: https://segmentfault.com/a/1190000007594620
emoji_pattern = r'[\U0001F300-\U0001F5FF]|[\U0001F600-\U0001F64F\U0001F680-\U0001F6FF]|[\u2600-\u2B55]'
number_pattern = r'\d+'
english_pattern = r'[a-zA-Z]+'
# Groups are non-capturing so that Series.str.contains does not emit its
# "pattern has match groups" UserWarning; the set of matching rows is unchanged.
# Note: on the training set this pattern matches the same rows as r'邀請碼|推薦碼'.
invite_word_pattern = r'(?:輸入|輸入:|輸入：|我的)?(?:邀請碼|推薦碼)'
# 4-8 upper-case letters/digits that do not start with a common English word.
invite_code_pattern = r'(?!GOOD|NICE|BEST|GREAT|COOL|HAPPY)[A-Z0-9]{4,8}'
product_pattern = r'android\d{0,2}|google\d{0,2}|pixel\d{0,2}'

# Training rows whose text contains at least one match of each pattern.
emoji_matches = train_df[train_df['text'].str.contains(emoji_pattern, na=False)]
num_matches = train_df[train_df['text'].str.contains(number_pattern, na=False)]
en_matches = train_df[train_df['text'].str.contains(english_pattern, na=False)]
invite_word_matches = train_df[train_df['text'].str.contains(invite_word_pattern, na=False)]
invite_code_matches = train_df[train_df['text'].str.contains(invite_code_pattern, na=False)]

In [None]:
from sklearn.preprocessing import MultiLabelBinarizer

mlb = MultiLabelBinarizer()

# Build a fresh two-column frame (emoji list, score) rather than mutating the
# filtered slice of train_df in place, which raised SettingWithCopyWarning.
# The index is reset so it lines up positionally with the one-hot frame below.
emoji_matches = emoji_matches.reset_index(drop=True)
emoji_matches = pd.DataFrame({
    'emoji': emoji_matches['text'].apply(lambda x: re.findall(emoji_pattern, x)),
    'score': emoji_matches['score'],
})

# One indicator column per distinct emoji found in the matching reviews.
one_hot_emoji = pd.DataFrame(mlb.fit_transform(emoji_matches['emoji']),
                             columns=mlb.classes_)
# Capture the emoji column names before the label column is appended.
emojis = one_hot_emoji.columns.tolist()

one_hot_emoji['score'] = emoji_matches['score']

In [None]:
# Per-emoji histogram of review scores. Every emoji starts with a zero count for
# each star label (in this fixed key order) and is then overlaid with the
# observed value counts of the rows that contain it.
star_labels = ('5 顆星', '2 顆星', '1 顆星', '4 顆星', '3 顆星')
emoji_cnt = {}
for emoji in emojis:
    scores_with_emoji = one_hot_emoji.loc[one_hot_emoji[emoji] == 1, 'score']
    emoji_cnt[emoji] = {
        **dict.fromkeys(star_labels, 0),
        **scores_with_emoji.value_counts().to_dict(),
    }
pprint(emoji_cnt)

In [None]:
# Emojis whose total count across all star labels is below 2 are treated as too
# rare to keep.
remove_emoji = [
    symbol
    for symbol, star_counts in emoji_cnt.items()
    if sum(star_counts.values()) < 2
]
print(remove_emoji)

In [None]:
# Map every emoji to the star label it co-occurs with most often. On a tie,
# max() keeps the first label in the dict's iteration order.
emoji_cluster_mapping = {
    symbol: max(star_counts, key=star_counts.get)
    for symbol, star_counts in emoji_cnt.items()
}

def cluster_emoji(text,
                  emoji_cluster_mapping=emoji_cluster_mapping,
                  emoji_pattern=r'[\U0001F300-\U0001F5FF]|[\U0001F600-\U0001F64F\U0001F680-\U0001F6FF]|[\u2600-\u2B55]') -> str:
    '''
    Assign a star-rating cluster label to a text based on the emojis it contains.

    Each emoji is looked up in `emoji_cluster_mapping` (emoji -> label such as
    '5 顆星'); the leading digit of the label is taken as its numeric rating.
    The ratings of all mapped emojis in the text are averaged and rounded up.

    Args:
        text: raw review text.
        emoji_cluster_mapping: dict from emoji to a label whose first character
            is the star digit.
        emoji_pattern: regex used to find emojis in `text`.

    Returns:
        '<n> 顆星' where n is the ceiling of the mean rating, or '無資訊' when
        the text has no emoji or none of its emojis is in the mapping.
    '''
    ratings = [int(emoji_cluster_mapping[emoji][0])
               for emoji in re.findall(emoji_pattern, text)
               if emoji in emoji_cluster_mapping]

    # Covers both "no emoji at all" and "only unmapped emojis".
    if not ratings:
        return '無資訊'

    # Ceiling of the mean; for a single emoji this is just its own rating.
    return str(int(np.ceil(sum(ratings) / len(ratings)))) + ' 顆星'

In [None]:
# Read one stopword per line. The encoding is given explicitly so the result
# does not depend on the platform's locale default.
# NOTE(review): assumes the stopword file is UTF-8 encoded — confirm.
with open('../data/stopwords.txt', 'r', encoding='utf-8') as f:
    stopwords = f.read().splitlines()

def clean_func(word_seg_result: List[str],
               stop_words_list: List[str] = stopwords,
               remove_emoji = remove_emoji,
               emoji_pattern = emoji_pattern,
               url_pattern = url_pattern,
               number_pattern = r'\d+',
               punctuation_pattern = punctuation_pattern,
               invite_code_pattern = invite_code_pattern) -> str:
    '''
    Clean a word-segmented review and join it back into one space-separated string.

    Per token, in this order:
    1. collect the emojis it contains (each distinct emoji is kept once per
       review, unless it is listed in `remove_emoji`)
    2. strip URLs (done before punctuation stripping, which would otherwise
       destroy the '://' and '.' the URL pattern relies on)
    3. strip emojis and punctuation
    4. drop the token if it starts with an invite code or with digits
    5. lowercase, and drop the token if nothing but whitespace is left

    The collected emojis are appended after the word tokens in order of first
    appearance, then stopwords are filtered out. If that filter removes
    everything, the unfiltered tokens are returned instead so the review does
    not become empty just because it consists of stopwords.

    Args:
        word_seg_result: tokens of one review.
        stop_words_list: tokens to filter from the final result.
        remove_emoji: emojis that must not be re-appended.
        emoji_pattern, url_pattern, punctuation_pattern: regexes removed from
            each token with re.sub.
        number_pattern, invite_code_pattern: regexes tested with re.match, i.e.
            anchored at the start of the token; a match drops the whole token.

    Returns:
        The cleaned tokens joined by single spaces (possibly an empty string).
    '''
    stop_words = set(stop_words_list)
    rare_emojis = set(remove_emoji)
    # A dict is used as an insertion-ordered set so the output is deterministic.
    kept_emojis = {}
    cleaned_tokens = []

    for token in word_seg_result:
        for emoji in re.findall(emoji_pattern, token):
            if emoji not in rare_emojis:
                kept_emojis.setdefault(emoji, None)

        token = re.sub(url_pattern, '', token)
        token = re.sub(emoji_pattern, '', token)
        token = re.sub(punctuation_pattern, '', token)

        # The invite-code pattern is upper-case only, so test it before lowercasing.
        if re.match(invite_code_pattern, token) or re.match(number_pattern, token):
            continue

        token = token.lower().strip()
        # Skip emptied tokens: they would produce runs of spaces in the output
        # and '' entries when the result is split on ' ' later.
        if token:
            cleaned_tokens.append(token)

    cleaned_tokens.extend(kept_emojis)

    result = " ".join(token for token in cleaned_tokens if token not in stop_words)

    if not result:
        result = " ".join(cleaned_tokens)

    return result

In [None]:
def feature_extraction(data: pd.DataFrame, 
                       is_test: bool =True) -> pd.DataFrame:
    """
    Derive the modelling columns from a frame of segmented reviews.

    Works on a copy of `data` and adds:
      - original_comment_len: character length of `text` (computed but not
        part of the returned columns)
      - comment_len: number of tokens in `ckip_ws`
      - emoji_cluster: label returned by `cluster_emoji` for `text`
      - cleaned_text: string returned by `clean_func` for `ckip_ws`

    Args:
        data: frame with at least `index`, `text` and `ckip_ws` columns, plus
            `score` when `is_test` is False.
        is_test: when False, the `score` column is kept in the output.

    Returns:
        A frame restricted to index, text, ckip_ws, cleaned_text, comment_len,
        emoji_cluster and, for labelled data, score.
    """
    features = data.copy()
    features['original_comment_len'] = features['text'].apply(len)
    features['comment_len'] = features['ckip_ws'].apply(len)
    features['emoji_cluster'] = features['text'].apply(cluster_emoji)
    features['cleaned_text'] = features['ckip_ws'].apply(clean_func)

    selected = ['index', 'text', 'ckip_ws', 'cleaned_text', 'comment_len', 'emoji_cluster']
    if is_test:
        return features[selected]
    return features[selected + ['score']]

In [None]:
# Labelled splits keep their `score` column; the test split has none.
train_df, dev_df = (
    feature_extraction(frame, is_test=False) for frame in (train_df, dev_df)
)
test_df = feature_extraction(test_df, is_test=True)

In [None]:
# Everything that is not an identifier, raw/processed text or the label counts
# as a tabular feature.
non_feature_columns = ('index', 'text', 'ckip_ws', 'cleaned_text', 'score')
feature_set = [col for col in train_df.columns.tolist()
               if col not in non_feature_columns]

In [None]:
# Tabular features stored with the object dtype.
object_feature = list(
    filter(lambda col: train_df[col].dtype == 'object', feature_set)
)
object_feature

In [None]:
# Give every split a clean 0..n-1 index, discarding the old one.
train_df = train_df.reset_index(drop=True)
dev_df = dev_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

In [None]:
from gensim.models import Word2Vec

In [None]:
# Tokenise on any run of whitespace. Splitting on a literal " " turned repeated
# or trailing spaces into '' tokens that ended up in the Word2Vec vocabulary,
# while the vectorising helpers look words up with str.split().
train_corpus = train_df['cleaned_text'].str.split().tolist()

In [None]:
# Word2Vec hyper-parameters.
param = dict(vector_size=512, window=5, min_count=1, workers=4)

# NOTE(review): constructing the model with `sentences` presumably already
# builds the vocabulary and runs an initial training pass, before the explicit
# model.train(...) call in the next cell — confirm this double training is intended.
model = Word2Vec(train_corpus, **param)

In [None]:
# Continue training on the same corpus for 100 more epochs.
model.train(train_corpus, epochs=100, total_examples=model.corpus_count)

In [None]:
def text_to_vector(text, model):
    """
    Embed a text as the unweighted mean of its in-vocabulary word vectors.

    Args:
        text: whitespace-separated tokens.
        model: object exposing `wv` (with `key_to_index` and item lookup) and
            `vector_size`.

    Returns:
        The element-wise mean of the vectors of all tokens found in the
        vocabulary, or a zero vector of length `model.vector_size` when no
        token is known.
    """
    vocabulary = model.wv.key_to_index
    known_tokens = [token for token in text.split() if token in vocabulary]
    if known_tokens:
        return np.mean([model.wv[token] for token in known_tokens], axis=0)
    # No known token: fall back to the zero vector.
    return np.zeros(model.vector_size)

In [None]:
# Mean-pooled Word2Vec matrix (one row per review) for each split.
train_w2v, dev_w2v, test_w2v = (
    np.array([text_to_vector(doc, model) for doc in frame['cleaned_text']])
    for frame in (train_df, dev_df, test_df)
)

In [None]:
# Fit TF-IDF on the cleaned training text; the token pattern also keeps
# single-character tokens.
vectorizer = TfidfVectorizer(token_pattern=r'\b\w+\b')
X = vectorizer.fit_transform(train_df['cleaned_text'])
feature_names = vectorizer.get_feature_names_out()
# Map term -> fitted IDF weight (taken from `vectorizer.idf_`, i.e. the inverse
# document frequency only, not a per-document TF-IDF value).
tfidf_dict = dict(zip(feature_names, vectorizer.idf_))

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

In [None]:
def weighted_word_vector(text, tfidf, w2v, dimension):
    """
    Embed a text as the mean of its word vectors, each scaled by a per-word weight.

    Args:
        text: whitespace-separated tokens.
        tfidf: dict from token to weight; tokens missing from it get weight 0.
        w2v: object whose `wv` supports membership tests and item lookup.
        dimension: length of the zero vector returned when no token is known.

    Returns:
        The mean over all in-vocabulary tokens of `vector * weight` (divided by
        the token count, not by the sum of weights), or `np.zeros(dimension)`
        when no token is in the vocabulary.
    """
    known_tokens = [token for token in text.split() if token in w2v.wv]
    if len(known_tokens) == 0:
        return np.zeros(dimension)

    scaled_vectors = np.array(
        [w2v.wv[token] * tfidf.get(token, 0) for token in known_tokens]
    )
    return np.mean(scaled_vectors, axis=0)

# Compute the weighted vector for every text (done in the next cell).

In [None]:
# Weighted Word2Vec matrix (one row per review) for each split; this replaces
# the mean-pooled matrices computed earlier under the same names.
train_w2v, dev_w2v, test_w2v = (
    np.array([
        weighted_word_vector(doc, tfidf_dict, model, model.vector_size)
        for doc in frame['cleaned_text']
    ])
    for frame in (train_df, dev_df, test_df)
)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Encode the score labels as integers; the encoder is fitted on train only and
# reused for dev (and later to decode the test predictions).
label_encoder = LabelEncoder()
train_scores, dev_scores = train_df['score'], dev_df['score']

train_y = label_encoder.fit_transform(train_scores)
dev_y = label_encoder.transform(dev_scores)

In [None]:
import xgboost as xgb
from sklearn.metrics import accuracy_score

In [None]:
from sklearn.ensemble import RandomForestClassifier

# Fix the seed so the dev accuracy and the submission are reproducible across
# runs; 42 matches the seed used for the train/dev split.
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(train_w2v, train_y)

In [None]:
# Accuracy of the fitted classifier on the held-out dev split.
dev_y_pred = clf.predict(dev_w2v)
accuracy = accuracy_score(y_true=dev_y, y_pred=dev_y_pred)
print(accuracy)

In [None]:
# Predict encoded classes for the test split, then map them back to the
# original score labels with the encoder fitted on the training scores.
test_y = clf.predict(test_w2v)
test_y_pred_label = label_encoder.inverse_transform(test_y)

In [None]:
# Two-column submission: the review's `index` and its predicted score label.
submission_file = pd.DataFrame({
    'index': test_df['index'],
    'pred': test_y_pred_label,
})
submission_file.to_csv('../submission/submission.csv', index=False)