### Import Modules

In [17]:
import torch

import nltk
from nltk.tokenize import TreebankWordTokenizer, RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer

from sklearn.feature_extraction.text import TfidfVectorizer

import numpy as np
import pickle
import os, re, json, random
from collections import defaultdict

# Fetch the NLTK corpora used later for lemmatization (wordnet) and stop-word filtering.
for nltk_resource in ('wordnet', 'stopwords'):
    nltk.download(nltk_resource)

# Select the compute device: MPS when torch reports it as available, otherwise CPU.
device = torch.device("mps" if torch.backends.mps.is_available() else 'cpu')

[nltk_data] Downloading package wordnet to /Users/kimjin-
[nltk_data]     seong/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/kimjin-
[nltk_data]     seong/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Load Dataset

In [2]:
# Location of the corpus: one text file per document.
train_dataset_path = './dataset/AP_corpus_one_line_per_sentence'

# os.listdir() returns entries in arbitrary order, so sort them to keep document
# indices stable between runs. Hidden entries (e.g. a macOS .DS_Store) and
# sub-directories are not corpus documents and are skipped.
train_dataset_list = sorted(
    name
    for name in os.listdir(train_dataset_path)
    if not name.startswith('.') and os.path.isfile(f'{train_dataset_path}/{name}')
)
# Misspelled legacy name, kept in case a later cell still refers to it.
tarin_dataset_list = train_dataset_list

# Raw corpus: one string per file, in sorted file-name order.
text = []

for file in train_dataset_list:
    file_path = f'{train_dataset_path}/{file}'
    # NOTE(review): assumes the corpus files are UTF-8 (or plain ASCII) - confirm.
    with open(file_path, 'r', encoding='utf-8') as f:
        text.append(f.read())

### Text Cleaning

In [3]:
def textCleaning(sentence, is_stem=False):
    """Turn one raw text string into a list of cleaned word tokens.

    Pipeline: lowercase -> split on runs of word characters -> drop English
    stop words -> WordNet lemmatization -> optional Porter stemming -> strip
    characters matched by the noise pattern -> strip digits -> drop stop words
    again -> drop tokens of length <= 2.

    Stop words are filtered once *before* lemmatizing/stemming, because those
    steps can rewrite a stop word into a form that is no longer in the stop
    list (e.g. the Porter stem of "this" is "thi"), and once more at the end so
    everything the previous implementation removed is still removed.
    NOTE(review): caches pickled with the older ordering may contain such
    leftover stems; regenerate them if exact consistency matters.

    Args:
        sentence: Raw text to clean.
        is_stem: When True, apply Porter stemming after lemmatization.

    Returns:
        list[str]: The cleaned tokens, in their original order.
    """
    # Built once per call (previously built twice).
    stop_words = set(stopwords.words('english'))

    # Lowercase, then keep only runs of word characters; punctuation is dropped here.
    #tokenizer = TreebankWordTokenizer()
    tokenizer = RegexpTokenizer(r'\w+')
    tokens = tokenizer.tokenize(sentence.lower())

    # Filter stop words while tokens are still in their surface form.
    tokens = [token for token in tokens if token not in stop_words]

    # lemmatization
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(token) for token in tokens]

    # stemming
    if is_stem:
        stemmer = PorterStemmer()
        tokens = [stemmer.stem(token) for token in tokens]

    # Noise pattern: @mentions, any character outside [0-9A-Za-z space tab],
    # URLs, a leading "rt", and "http" plus the following character.
    # The mention group previously read `@\[A-Za-z0-9]+` (stray backslash); the
    # tokenizer above never emits '@' or '[', so correcting it changes no output.
    noise = re.compile(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?")
    digits = re.compile(r'\d+')

    processed_sentence = []
    for token in tokens:
        # Strip noise first, then digits (same order as before), once per token.
        token = digits.sub('', noise.sub('', token))

        # Tokens emptied by the substitutions fail the length check as well.
        if len(token) > 2 and token not in stop_words:
            processed_sentence.append(token)

    return processed_sentence

def getPadded(tokens, max_len):
    """Right-pad a token list with '[PAD]' entries up to ``max_len``.

    Args:
        tokens: List of tokens.
        max_len: Target length.

    Returns:
        A new list of length ``max_len`` when ``tokens`` is shorter than
        ``max_len``; otherwise the very same ``tokens`` object, untouched
        (longer inputs are not truncated).
    """
    missing = max_len - len(tokens)
    if missing <= 0:
        return tokens
    return tokens + ['[PAD]' for _ in range(missing)]

def preprocessing(text, save_root=None, is_save=False):
    """Clean every document with ``textCleaning`` (stemming enabled) and optionally cache the result.

    Token lists are returned unpadded; no padding is applied here.

    Args:
        text: Sequence of raw document strings.
        save_root: Directory that receives ``text_list_cleaning.pkl``.
            Required when ``is_save`` is True.
        is_save: When True, pickle the cleaned token lists under ``save_root``.

    Returns:
        list[list[str]]: One cleaned token list per input document.

    Raises:
        ValueError: If ``is_save`` is True but ``save_root`` is None.
    """
    # Fail before the slow cleaning loop rather than at the final write.
    if is_save and save_root is None:
        raise ValueError('save_root must be provided when is_save=True')

    dataset_len = len(text)
    text_clean = []

    for idx, sentence in enumerate(text):
        text_clean.append(textCleaning(sentence, is_stem=True))

        # progress line every 100 documents
        if idx % 100 == 0:
            print(f'{idx+1:5d}/{dataset_len:5d} complete')

    # save files
    if is_save:
        with open(f'{save_root}/text_list_cleaning.pkl', 'wb') as file:
            pickle.dump(text_clean, file)

    return text_clean

def loadData(save_root):
    """Read the cached cleaned corpus from ``<save_root>/text_list_cleaning.pkl``.

    Args:
        save_root: Directory containing the pickle file.

    Returns:
        The unpickled object stored in that file.
    """
    cache_path = f'{save_root}/text_list_cleaning.pkl'

    # pickle can execute code while loading: only open caches you created yourself.
    with open(cache_path, 'rb') as handle:
        return pickle.load(handle)

In [4]:
save_root = './dataset'

# Reuse the cached cleaning result when it exists; otherwise build it from the raw
# documents and write the cache, so this cell also runs on a fresh checkout.
if os.path.exists(f'{save_root}/text_list_cleaning.pkl'):
    text_clean = loadData(save_root)
else:
    text_clean = preprocessing(text, save_root, True)

### Create Vocab

In [5]:
def createVocab(word_tokens, save_root=None, is_save=False, min_tfidf=0.00005):
    """Build a word -> index vocabulary, keeping words by mean TF-IDF score.

    Each token list is joined into a string, a ``TfidfVectorizer`` is fitted on
    those strings, and every word whose TF-IDF averaged over all documents is at
    least ``min_tfidf`` is kept. '[PAD]' and '[UNK]' take indices 0 and 1; the
    kept words follow in sorted order.

    Args:
        word_tokens: Iterable of token lists, one per document.
        save_root: Directory that receives ``vocab.json``. Required when
            ``is_save`` is True.
        is_save: When True, write the vocabulary to ``<save_root>/vocab.json``.
        min_tfidf: Minimum mean TF-IDF a word needs to be kept.

    Returns:
        dict[str, int]: The vocabulary mapping.

    Raises:
        ValueError: If ``is_save`` is True but ``save_root`` is None.
    """
    if is_save and save_root is None:
        raise ValueError('save_root must be provided when is_save=True')

    # TfidfVectorizer takes strings, so join each token list back into one string.
    text_clean_str = [' '.join(sequence) for sequence in word_tokens]

    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(text_clean_str)

    # Average per word directly on the sparse matrix. Calling .toarray() first
    # would allocate a dense documents x vocabulary array just to take a mean.
    tfidf_scores = np.asarray(tfidf_matrix.mean(axis=0)).ravel()

    # Keep the words whose average score reaches the threshold.
    filtered_vocab = [
        word
        for word, score in zip(vectorizer.get_feature_names_out(), tfidf_scores)
        if score >= min_tfidf
    ]

    # Reserve index 0 for '[PAD]' and index 1 for '[UNK]'.
    final_vocab_list = ['[PAD]', '[UNK]'] + sorted(filtered_vocab)
    final_vocab_dict = {word: idx for idx, word in enumerate(final_vocab_list)}

    if is_save:
        with open(f'{save_root}/vocab.json', 'w') as json_file:
            json.dump(final_vocab_dict, json_file)

    return final_vocab_dict

def loadVocab(save_root):
    """Read the vocabulary mapping stored at ``<save_root>/vocab.json``.

    Args:
        save_root: Directory containing ``vocab.json``.

    Returns:
        The parsed JSON content of that file.
    """
    vocab_path = f'{save_root}/vocab.json'
    with open(vocab_path, 'r') as handle:
        return json.load(handle)

In [6]:
# Load the saved vocabulary when present; otherwise build it from the cleaned
# corpus and save it, so this cell also runs on a fresh checkout.
if os.path.exists('./dataset/vocab.json'):
    vocab = loadVocab('./dataset')
else:
    vocab = createVocab(text_clean, './dataset', True)
print(f'Vocab Length: {len(vocab)}')

Vocab Length: 37690


### Integer Encoding

In [7]:
def encodeTokens(corpus_tokens, word_to_index, save_root=None, is_save=False):
    """Replace every token with its vocabulary index.

    Tokens missing from ``word_to_index`` are mapped to the index of '[UNK]'.
    A progress line is rewritten in place after each document.

    Args:
        corpus_tokens: Sequence of token lists, one per document.
        word_to_index: Mapping from token to integer index; must contain '[UNK]'.
        save_root: Directory that receives ``text_list_cleaning_encoded.pkl``.
        is_save: When True, pickle the encoded corpus under ``save_root``.

    Returns:
        list[list[int]]: One list of indices per input document.
    """
    total = len(corpus_tokens)
    encoded_corpus = []

    for position, document in enumerate(corpus_tokens, start=1):
        ids = []
        for word in document:
            ids.append(word_to_index.get(word, word_to_index['[UNK]']))
        encoded_corpus.append(ids)
        print(f'[{position:3d} / {total:3d}] complete', end='\r')

    if not is_save:
        return encoded_corpus

    with open(f'{save_root}/text_list_cleaning_encoded.pkl', 'wb') as handle:
        pickle.dump(encoded_corpus, handle)
    return encoded_corpus

def loadEncodedTokens(save_root):
    """Read the cached encoded corpus from ``<save_root>/text_list_cleaning_encoded.pkl``.

    Args:
        save_root: Directory containing the pickle file.

    Returns:
        The unpickled object stored in that file.
    """
    encoded_path = f'{save_root}/text_list_cleaning_encoded.pkl'

    # pickle can execute code while loading: only open caches you created yourself.
    with open(encoded_path, 'rb') as handle:
        return pickle.load(handle)

# Load the cached encoded corpus when present; otherwise encode the cleaned
# corpus and save it, so this cell also runs on a fresh checkout.
if os.path.exists('./dataset/text_list_cleaning_encoded.pkl'):
    encoded_tokens = loadEncodedTokens('./dataset')
else:
    encoded_tokens = encodeTokens(text_clean, vocab, './dataset', True)

### Negative Sampling

In [27]:
def create_training_pairs(encoded_sentences, vocab_size, window_size=2, negative_samples=1):
    """Build (center, word) training pairs with negative sampling.

    For every position in every sentence, each neighbour within ``window_size``
    positions gives a positive pair ``((center, neighbour), 1)``. After each
    positive pair, ``negative_samples`` negative pairs ``((center, word), 0)``
    follow, where ``word`` is drawn uniformly from ``1 .. vocab_size - 1``
    (index 0 is never drawn) and redrawn until it does not occur in the sentence.

    If every index in ``1 .. vocab_size - 1`` occurs in a sentence, no valid
    negative exists; only positive pairs are emitted for that sentence rather
    than redrawing forever.

    Args:
        encoded_sentences: Iterable of sequences of integer word indices.
        vocab_size: Size of the vocabulary.
        window_size: Number of neighbours considered on each side of the center.
        negative_samples: Negative pairs generated per positive pair.

    Returns:
        list[tuple[tuple[int, int], int]]: ``((center, word), label)`` entries
        with label 1 for observed pairs and 0 for sampled negatives.
    """
    training_data = []
    highest_id = vocab_size - 1  # negatives are drawn from 1..highest_id

    for cnt, sentence in enumerate(encoded_sentences, start=1):
        sentence_length = len(sentence)

        # Set membership makes each rejection test O(1). Scanning the sentence
        # list on every draw made long documents take quadratic time.
        sentence_words = set(sentence)

        # Rejection sampling can only end if some id in 1..highest_id is absent.
        blocked = sum(1 for word in sentence_words if 1 <= word <= highest_id)
        can_sample = blocked < highest_id

        for idx, center_word in enumerate(sentence):
            start = max(0, idx - window_size)
            end = min(sentence_length, idx + window_size + 1)

            for neighbor_idx in range(start, end):
                if neighbor_idx == idx:
                    continue

                # positive sample
                training_data.append(((center_word, sentence[neighbor_idx]), 1))

                if not can_sample:
                    continue

                # negative samples
                for _ in range(negative_samples):
                    negative_word = random.randint(1, highest_id)
                    while negative_word in sentence_words:
                        negative_word = random.randint(1, highest_id)
                    training_data.append(((center_word, negative_word), 0))

        # One progress line per sentence. Printing per token flooded the output,
        # and the sentence counter was never advanced.
        print(f'[{cnt}] {sentence_length:5d} / {sentence_length}')

    return training_data

vocab_size = len(vocab)
window_size = 2  # neighbours considered on each side of the center word
negative_samples = 1  # negative pairs generated per positive pair

# Negative words are drawn with the `random` module; fix the seed so the
# generated pairs are the same on every run.
random.seed(42)

# Build the training pairs using the context window and negative sampling.
training_data = create_training_pairs(encoded_tokens, vocab_size, window_size, negative_samples)

# Show only the first few generated pairs.
for data in training_data[:10]:
    print(data)

len(training_data)

[0.10%] 49609 / 49609
[100.10%]  8806 / 75463

KeyboardInterrupt: 