In [5]:
from sklearn.model_selection import train_test_split
import pandas as pd
import os

## Clean data

In [6]:
options = {
    'client_kwargs': {
        'endpoint_url': "https://storage.yandexcloud.net",
    }
}

data_train = pd.read_csv("s3://zoomcamp-mlops/data/twitter_training.csv", storage_options=options, header=None)
data_val = pd.read_csv("s3://zoomcamp-mlops/data/twitter_validation.csv", storage_options=options, header=None)
data = pd.concat([data_train, data_val])
data

In [7]:
data.drop_duplicates(inplace=True)
data.drop([0,1], axis=1, inplace=True)
data.columns = ['sentiment','text']
data.sentiment = data.sentiment.map({"Neutral":0, "Irrelevant":0 ,"Positive":1,"Negative":2})
data.dropna(inplace=True)
data.reset_index(drop=True, inplace=True)

In [8]:
data

Unnamed: 0,sentiment,text
0,1,im getting on borderlands and i will murder yo...
1,1,I am coming to the borders and I will kill you...
2,1,im getting on borderlands and i will kill you ...
3,1,im coming on borderlands and i will murder you...
4,1,im getting on borderlands 2 and i will murder ...
...,...,...
72134,0,♥️ Suikoden 2\n1️⃣ Alex Kidd in Miracle World\...
72135,1,Thank you to Matching funds Home Depot RW paym...
72136,0,Late night stream with the boys! Come watch so...
72137,0,⭐️ Toronto is the arts and culture capital of ...


In [9]:
def clean_text(text: str) -> str:
    replace_list = {r"i'm": 'i am',
                    r"'re": ' are',
                    r"let’s": 'let us',
                    r"'s":  ' is',
                    r"'ve": ' have',
                    r"can't": 'can not',
                    r"cannot": 'can not',
                    r"shan’t": 'shall not',
                    r"n't": ' not',
                    r"'d": ' would',
                    r"'ll": ' will',
                    r"'scuse": 'excuse',
                    ',': ' ,',
                    '.': ' .',
                    '!': ' !',
                    '?': ' ?',
                    '\s+': ' '}
    text = text.lower()
    for s in replace_list:
        text = text.replace(s, replace_list[s])
    return ' '.join(text.split())

data["text"] = data["text"].map(clean_text)
data = data[data["text"] != '']

In [10]:
data.to_parquet("s3://zoomcamp-mlops/data/origin/data.pq", storage_options=options)
data.to_parquet("s3://zoomcamp-mlops/data/origin/old/data.pq", storage_options=options)

In [11]:
train, val = train_test_split(data, test_size=0.1, stratify=data["sentiment"].tolist())

In [12]:
val, test = train_test_split(val, test_size=0.2, stratify=val["sentiment"].tolist())

In [13]:
val

Unnamed: 0,sentiment,text
35961,0,we still need the best women is clothes @ play...
43160,2,my
55760,2,@rainbow6game why the world is theme party sti...
1406,1,gonna get lost in borderlands 2 and batman: da...
65380,2,@cyberpunkgame maybe you announce release date...
...,...,...
6544,0,@ eljeeeey thank you for participating in summ...
11670,1,that i do miss basket ball more than anything ...
11848,1,this is what seems the entire world needs righ...
17343,2,nice looking mini router


In [14]:
val.to_parquet("s3://zoomcamp-mlops/data/val.pq", storage_options=options)
train.to_parquet("s3://zoomcamp-mlops/data/train.pq", storage_options=options)
test.to_parquet("s3://zoomcamp-mlops/data/test.pq", storage_options=options)

In [3]:
options = {
    'client_kwargs': {
        'endpoint_url': "https://storage.yandexcloud.net",
    }
}

val = pd.read_parquet("s3://zoomcamp-mlops/data/val.pq", storage_options=options)
train = pd.read_parquet("s3://zoomcamp-mlops/data/train.pq", storage_options=options)
test = pd.read_parquet("s3://zoomcamp-mlops/data/test.pq", storage_options=options)

In [4]:
val.to_parquet("s3://zoomcamp-mlops/data/temp/val.pq", storage_options=options)
train.to_parquet("s3://zoomcamp-mlops/data/temp/train.pq", storage_options=options)
test.to_parquet("s3://zoomcamp-mlops/data/temp/test.pq", storage_options=options)

In [5]:
val_curr = pd.read_parquet("s3://zoomcamp-mlops/data/val.pq", storage_options=options)
val_old = pd.read_parquet("s3://zoomcamp-mlops/data/temp/val.pq", storage_options=options)

In [9]:
pd.util.hash_pandas_object(val_curr).sum() == pd.util.hash_pandas_object(val_old).sum()

True

In [10]:
import hashlib

print(hashlib.sha256(val_curr.to_json().encode()).hexdigest())
print(hashlib.sha256(val_old.to_json().encode()).hexdigest())

90381ea4366b0d59eab9e862c070b8e21eb49d15f4a702a75e1d74a11d6f4d5d
90381ea4366b0d59eab9e862c070b8e21eb49d15f4a702a75e1d74a11d6f4d5d


## Dataloader

In [37]:
from __future__ import annotations

import pickle
from collections import defaultdict
from typing import Any, Dict, List

import torch
from tokenizers import Tokenizer, decoders
from tokenizers.models import WordLevel
from tokenizers.pre_tokenizers import WhitespaceSplit
from tokenizers.trainers import WordLevelTrainer
from torch import LongTensor
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset

In [28]:
import s3fs
from typing import Any
from pickle import dump
import pandas as pd

def _s3() -> s3fs.S3FileSystem:
    return s3fs.S3FileSystem(
                anon=False,
                key=os.environ["AWS_ACCESS_KEY_ID"],
                secret=os.environ["AWS_SECRET_ACCESS_KEY"],
                endpoint_url=os.environ["AWS_ENDPOINT_URL"])
    
def write_pickle_to_s3(obj: Any, s3_path: str) -> None:
    s3 = _s3()
    dump(obj, s3.open(s3_path, 'wb'))

def read_pickle_from_s3(s3_path: str) -> Any:
    s3 = _s3()
    return pickle.load(s3.open(s3_path))

def read_parquet_s3(s3_path: str) -> pd.DataFrame:
    options = {
                'client_kwargs': {
                    'endpoint_url': os.environ["AWS_ENDPOINT_URL"],
                }
            }

    return pd.read_parquet(s3_path, storage_options=options)

In [52]:
class VocabularyBpeSubWords:
    def __init__(
        self,
        n_freq: int = 5,
        batch_size: int = 5000,
        path_to_tokenizer: str = "./",
    ):

        self.stoi = dict()
        self.n_freq = n_freq
        self.batch_size = batch_size
        self.path_to_save_tokenizer = path_to_tokenizer + "vocab.pkl"
        self.special_tokens = ["<PAD>", "<UNK>"]
        
        self.tokenizer = Tokenizer(WordLevel(unk_token="<UNK>"))
        self.tokenizer.pre_tokenizer = WhitespaceSplit()
    
    def __len__(self) -> int:
        return self.tokenizer.get_vocab_size()

    def save_vocab(self):
        write_pickle_to_s3(self.__dict__, self.path_to_save_tokenizer)

    def read_vocab(self):
        self.__dict__ = read_pickle_from_s3(self.path_to_save_tokenizer)

    def _get_training_corpus(self, sents: List[str]) -> List[str]:
        len_sents = len(sents)
        for i in range(0, len_sents, self.batch_size):
            yield sents[i : i + self.batch_size]

    def build_vocabulary(self, sentences_list: List[str]):
        trainer = WordLevelTrainer(
            special_tokens=self.special_tokens,
            min_frequency=self.n_freq,
        )
        self.tokenizer.train_from_iterator(self._get_training_corpus(sentences_list), trainer=trainer)
        self.stoi = self.tokenizer.get_vocab()
        self.save_vocab()

    def numericalize(self, text: str) -> List[int]:
        tokenized: List[int] = []
        # cad add sos token in the beginning
        encoding = self.tokenizer.encode(text)
        return encoding.ids

In [59]:
class Collate:
    def __init__(self, pad_idx: int = 0):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        words = [torch.tensor(item["words"]) for item in batch]
        labels = torch.tensor([item["label"] for item in batch])
        
        # idxes to sort by sent len
        seq_lengths, perm_idx = LongTensor(list(map(len, words))).sort(0, descending=True)

        # sort by len sentence in desc
        words = [words[i] for i in perm_idx]

        # pad words: [batch_size, max_sentence_len_in_batch]
        padd_words = pad_sequence(words, padding_value=self.pad_idx, batch_first=True)

        return {"words": padd_words, 
                "label": labels,
                "seq_lengths": seq_lengths}

class QueryDataset(Dataset):
    def __init__(
        self,
        file_path: str,
        vocab_words: VocabularyWords,
    ):
        self.data: pd.DataFrame = read_parquet_s3(file_path)
        self.vocab_words = vocab_words

    def __len__(self) -> int:
        return len(self.sentences)

    def __getitem__(self, index: int) -> Dict[str, Any]:
        line: str = self.data.iloc[index]
        label: int = line["sentiment"]
        text: str = line["text"]
        numericalized_word: List[int] = self.vocab_words.numericalize(text)

        return {"text": numericalized_word, 
                "label": label}

In [60]:
v = VocabularyBpeSubWords(path_to_tokenizer="s3://zoomcamp-mlops/data/")

In [61]:
d = QueryDataset(file_path="s3://zoomcamp-mlops/data/twitter_validation.pq", vocab_words=v)

In [62]:
v.build_vocabulary(d.data.text.tolist())

s3://zoomcamp-mlops/data/vocab.pkl


In [65]:
d[999]

{'text': [32, 40, 32, 3, 331, 1, 1, 218, 387, 10, 1, 4, 500, 1, 1, 1],
 'label': 0}