In [1]:
from argparse import Namespace
import re

import numpy as np
import pandas
import pandas as pd
import regex as re
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [2]:
class Vocabulary:
    '''Contains dictionary of tokens and their indices'''
    def __init__(self, token_to_idx : dict = None, is_unk_token : bool = True, unk_token : str = '<UNK>'):
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx
        self._idx_to_token = {idx : token for token, idx in token_to_idx.items()}
        
        self._is_unk_token = is_unk_token
        self._unk_token = unk_token
        self.unk_index = -1

        if is_unk_token:
            self.unk_index = self.add_token(unk_token)

    def __len__(self) -> int:
        return len(self._token_to_idx)

    def to_serializable(self) -> dict:
        return {'token_to_idx' : self._token_to_idx, 'is_unk_token' : self._is_unk_token, 'unk_token' : self._unk_token}
    
    def to_json(self, filepath : str):
        with open(filepath, 'w', encoding='utf-8') as file:
            json.dump(self.to_serializable(), file, ensure_ascii=False)

    @classmethod
    def from_json(cls, filepath : str):
        with open(filepath, encoding='utf-8') as file:
            return cls.from_serializable(json.load(file))

    @classmethod
    def from_serializable(cls, serializable : dict):
        return cls(**serializable)
    
    def add_token(self, token : str) -> int:
        if token not in self._token_to_idx:
            idx = len(self._token_to_idx)
            self._token_to_idx[token] = idx
            self._idx_to_token[idx] = token
            return idx
        else:
            return self._token_to_idx[token]

    def add_tokens(self, tokens : list[str]) -> list:
        return [self.add_token(token) for token in tokens]

    def get_token_index(self, token : str) -> int:
        if token in self._token_to_idx:
            return self._token_to_idx[token]
        else:
            return self.unk_index

    def get_token(self, index : int) -> str:
        if index in self._idx_to_token:
            return self._idx_to_token[index]
        else:
            return self._unk_token

In [3]:
class SeparatorTokenizer:
    '''Simple implementation one of tokenization algorithms'''
    def __init__(self):
        pass

    def tokenize(self, text : str, separator : str = ' ') -> list:
        text = re.sub(r'([^\w\s]|_)', r' \1 ', text)
        return text.split(separator)

In [4]:
class Vectorizer:
    def __init__(self, tokens_vocab : Vocabulary, label_vocab : Vocabulary = {}):
        self.tokens_vocab = tokens_vocab
        self.label_vocab = label_vocab

    def vectorize(self, tokens : list[str]) -> np.array:
        one_hot = np.zeros(len(self.tokens_vocab), dtype=np.float32)
        for token in tokens:
            one_hot[self.tokens_vocab.get_token_index(token)] = 1
        return one_hot

    @classmethod
    def from_dataframe(cls, texts_df, threshold_freq = 25):
        pass

    @classmethod
    def from_serializable(cls, serializable : dict):
        return Vectorizer(tokens_vocab=\
                          serializable['tokens_vocab'].from_serializable(),
                          label_vocab=\
                          serializable['label_vocab'].from_serializable())
    
    def to_serializable(self) -> dict:
        return {'tokens_vocab' : self.tokens_vocab.to_serializable(), 'label_vocab' : self.label_vocab.to_serializable()}

In [5]:
class CustomDataset:
    def __init__(self, dataframe : pandas.DataFrame, tokenizer, vectorizer : Vectorizer):
        self._vectorizer = vectorizer
        self._tokenizer = tokenizer

        self._main_df = dataframe

        self._train_df = self._main_df[self._main_df.split == 'train']
        self._train_len = len(self._train_df)

        self._valid_df = self._main_df[self._main_df.split == 'validation']
        self._valid_len = len(self._valid_df)

        self._test_df = self._main_df[self._main_df.split == 'test']
        self._test_len = len(self._test_df)

        self._lookup_split = {'train' : (self._train_df, self._train_len),\
                              'validation' : (self._valid_df, self._valid_len),\
                              'test' : (self._test_df, self._test_len)}
        
        self.set_dataframe_split('train')

    def __getitem__(self, index):
        '''data and target collumns must be named 'x_data' and 'y_target'! '''
        row = self._cw_dataframe.iloc[index]
        data_vector = self._vectorizer.vectorize(self._tokenizer.tokenize(row['x_data']))
        target = self._vectorizer.label_vocab.get_token_index(row['y_target'])
        return {'x_data' : data_vector,\
                'y_target' : target}
    
    def __len__(self):
        return self._cw_df_len
    
    def set_dataframe_split(self, split='train'):
        '''Set a current data split. Allowed values: train, test, validation'''
        self._cw_dataframe, self._cw_df_len = self._lookup_split[split]

    

In [6]:
def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device='cpu'):
    dataloader = DataLoader(dataset, batch_size, shuffle, drop_last=drop_last)
    for data_dict in dataloader:
        out_data_dict = {}
        for key, tensor in data_dict.items():
            out_data_dict[key] = data_dict[key].to(device) # Sending tensors to propper device
        yield out_data_dict

In [7]:
class SentimentAnalyzer(nn.Module):
    def __init__(self, num_features):
        super().__init__()
        self.fc1 = nn.Linear(num_features, 3)
    def forward(self, x_data):
        y_out = self.fc1(x_data)
        return y_out

In [8]:
df = pd.read_csv('C:/Files/Datasets/twitter_financial_news_sentiment/sent_train.csv')

In [9]:
df['split'] = 'train'
df = df.rename(columns={'text' : 'x_data', 'label' : 'y_target'})

In [10]:
df

Unnamed: 0,x_data,y_target,split
0,$BYND - JPMorgan reels in expectations on Beyo...,0,train
1,$CCL $RCL - Nomura points to bookings weakness...,0,train
2,"$CX - Cemex cut at Credit Suisse, J.P. Morgan ...",0,train
3,$ESS: BTIG Research cuts to Neutral https://t....,0,train
4,$FNKO - Funko slides after Piper Jaffray PT cu...,0,train
...,...,...,...
9538,The Week's Gainers and Losers on the Stoxx Eur...,2,train
9539,Tupperware Brands among consumer gainers; Unil...,2,train
9540,vTv Therapeutics leads healthcare gainers; Myo...,2,train
9541,"WORK, XPO, PYX and AMKR among after hour movers",2,train


In [11]:
for i in range(len(df)):
    df.loc[i, 'x_data'] = re.sub(r'https?://.*', r'URL', df.loc[i, 'x_data'])

In [12]:
tokenizer = SeparatorTokenizer()

Первое заполнение словаря и сохранение в файл

In [13]:
tokens_vocabulary = Vocabulary()
label_vocabulary = Vocabulary()

for i in range(len(df)):
    tokens_vocabulary.add_tokens(tokenizer.tokenize(df.loc[i, 'x_data']))
    label_vocabulary.add_token(str(df.loc[i, 'y_target']))

tokens_vocabulary.to_json('tokens_vocab.json')
label_vocabulary.to_json('label_vocab.json')

In [14]:
tokens_vocabulary = Vocabulary().from_json('tokens_vocab.json')
label_vocabulary = Vocabulary().from_json('label_vocab.json')
vectorizer = Vectorizer(tokens_vocabulary, label_vocabulary)

In [15]:
dataset = CustomDataset(df, tokenizer, vectorizer)

In [18]:
batch_generator = generate_batches(dataset, 64, device='cpu')

In [19]:
for batch_idx, batch in enumerate(batch_generator):
    print(type(batch))
    print(batch['x_data'])
    print(batch['y_target'])
    input()

<class 'dict'>
tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.]])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])


KeyboardInterrupt: Interrupted by user