In [1]:
import re
from nltk.stem import SnowballStemmer
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from collections import defaultdict
import math
import numpy as np
import scipy

In [2]:
class Tokenizer:
    """Regex-based text cleaner with optional stop-word removal and stemming.

    Two pipelines are offered:

    * ``clean_and_tokenize`` -- regex cleaning, lower-casing and whitespace
      tokenisation (returns token lists).
    * ``clean`` -- regex cleaning, lower-casing, English stop-word removal and
      Snowball stemming (returns space-joined strings).

    Both accept either a single string or a list/tuple of strings; blank
    entries of a sequence are dropped from the result.
    """

    def __init__(self):
        # Substitutions applied by clean_line() in insertion order.
        self.regex_subs = {
            # http:// and https:// URLs -> placeholder token
            r'(https?:\/\/)\S+': "0URL0",
            # "www." URLs that are not directly preceded by "http://"
            r'(?<!http://)www\.\S+': "0URL0",
            # delete a non-word character when the next character is the same
            # one, i.e. collapse runs such as "!!" or double spaces to one
            r'(\W)(?=\1)': '',
            # drop a hyphen sitting between two ASCII letters ("a-b" -> "ab")
            r'(?<=[a-zA-Z])(\-)(?=[a-zA-Z])': ''
        }

        # Punctuation patterns that clean_line() deletes outright.
        self.punctuations = [r'\.', r'\.{2,}',
                             r'\!+', r'\:+', r'\;+', r'\"+', r"\'+", r'\?+', r'\,+', r'\(|\)|\[|\]|\{|\}|\<|\>']

        # Temporary marker used by tokenize_line() to split on whitespace.
        self.delimiter = '<SPLIT>'
        self.stemmer = SnowballStemmer(language='english')
        self.stop_words = set(stopwords.words('english'))

    def clean_line(self, line):
        """Apply all substitutions, strip punctuation and lower-case ``line``.

        Lower-casing happens last, so the URL placeholder ends up as
        ``0url0`` in the returned string.
        """
        for pattern, rep in self.regex_subs.items():
            line = re.sub(pattern, rep, line)
        for pattern in self.punctuations:
            line = re.sub(pattern, '', line)
        return line.lower()

    def tokenize_line(self, line):
        """Split ``line`` on runs of whitespace and return the non-empty tokens."""
        # Raw string: '\s' in a normal string literal is an invalid escape
        # sequence and triggers a warning on current Python versions.
        line = re.sub(r'\s+', self.delimiter, line)

        pieces = (piece.strip() for piece in line.split(self.delimiter))
        return [piece for piece in pieces if piece != '']

    def clean_and_tokenize(self, lines):
        """Clean and tokenise a string, or each non-blank string of a sequence.

        Returns a list of tokens for a single string, or a list of token
        lists for a list/tuple input.
        """
        if isinstance(lines, (list, tuple)):
            return [self.tokenize_line(self.clean_line(line))
                    for line in lines if line.strip()]
        return self.tokenize_line(self.clean_line(lines))

    def _clean(self, line):
        """Clean one string, drop stop words and stem the remaining tokens."""
        kept = [self.stemmer.stem(token)
                for token in self.clean_line(line).split()
                if token not in self.stop_words]
        return " ".join(kept)

    def clean(self, lines):
        """Clean, de-stop and stem a string, or each non-blank string of a sequence.

        Returns a single string for a string input, or a list of strings for
        a list/tuple input.
        """
        if isinstance(lines, (list, tuple)):
            return [self._clean(line) for line in lines if line.strip()]
        return self._clean(lines)

In [3]:
def file_to_data(path, encoding='utf-8'):
    """Read a tab-separated file and return columns 1-3 of every data row.

    The first line of the file is treated as a header and skipped. Each
    remaining line is right-stripped and split on tabs; lines with fewer than
    four fields (for example a blank trailing line) are ignored.

    Args:
        path: Location of the tab-separated file.
        encoding: Text encoding used to open the file.

    Returns:
        A list of ``[field_1, field_2, field_3]`` string lists, or ``None``
        (after printing a message) when the file does not exist.
    """
    formatted_data = []
    try:
        with open(path, encoding=encoding) as f:
            next(f, None)  # skip the header row
            for line in f:
                fields = line.rstrip().split('\t')
                if len(fields) < 4:
                    # Too short to hold the three wanted columns; skip it
                    # instead of failing with IndexError.
                    continue
                formatted_data.append(fields[1:4])
    except FileNotFoundError:
        print("File does not exist")
        return None

    return formatted_data

In [5]:
# Dataset location settings followed by the actual load.
# NOTE(review): `path` (built from `year` and `data_file`) is not passed to the
# loader below; the call uses a separate hard-coded SICK path instead.
year = '2014'
data_file = 'images.test.tsv'
path = f'data/sts/semeval-sts/{year}/{data_file}'
data = file_to_data("data/sts/sick2014/SICK_train.txt")
# file_to_data returns None when the file is missing, in which case the
# indexing below fails.
print(data[0], len(data))

['A group of kids is playing in a yard and an old man is standing in the background', 'A group of boys in a yard is playing and a man is standing in the background', '4.5'] 4500


In [6]:
def TFIDF_clean(data):
    """Return a copy of ``data`` with both sentences of every row cleaned.

    Each row is expected to hold two sentences followed by a score; the
    sentences are passed through ``Tokenizer.clean`` and the score is carried
    over untouched.
    """
    cleaner = Tokenizer()
    return [
        [cleaner.clean(row[0]), cleaner.clean(row[1]), row[2]]
        for row in data
    ]

In [7]:
# Clean both sentences of every loaded row; scores are carried over unchanged.
tok_data = TFIDF_clean(data)

In [8]:
def train_dev_test_split(data, test_size=0.15, random_state=69):
    """Split sentence-pair rows into a training part and a held-out part.

    Despite the name, only a two-way split is produced (one call to
    ``train_test_split``).

    Args:
        data: Iterable of rows ``[sentence_a, sentence_b, score]`` where
            ``score`` is convertible with ``float``.
        test_size: Forwarded to ``train_test_split`` as ``test_size``.
        random_state: Forwarded to ``train_test_split`` as ``random_state``;
            the default keeps the split used so far.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` where the ``X`` items are
        ``[sentence_a, sentence_b]`` pairs and the ``y`` items are floats.
    """
    X, y = [], []

    for row in data:
        X.append([row[0], row[1]])
        y.append(float(row[2]))

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test

In [9]:
# Split the cleaned rows with the default test_size; the held-out part
# (X_test / y_test) is what later cells vectorise as X_dev_* and score against.
X_train, X_test, y_train, y_test = train_dev_test_split(tok_data)
print(len(X_train), len(X_test))

3825 675


In [10]:
# import pickle
# with open('data/train_data.pkl', 'wb') as f:
#     pickle.dump({'x': X_train, 'y': y_train}, f)

In [11]:
class TfidfVectorizer():
    """TF-IDF vectoriser for sentence pairs that share one vocabulary.

    Input rows are indexable items whose first two entries are
    whitespace-tokenisable strings (the "left" and "right" sentence). Both
    columns contribute to a single vocabulary and a single IDF table, and each
    column is turned into its own dense ``float32`` matrix of shape
    ``(number of rows, vocabulary size)``.

    Term frequency is the raw token count within a sentence; the IDF of a
    token is ``log((1 + num_docs) / (1 + df)) + 1``.
    """

    def __init__(self):
        # Per-sentence {vocab index: count} mappings for the fitted data.
        self.tf_l = []
        self.tf_r = []

        # vocab index -> IDF weight
        self.idf = defaultdict(int)

        # token -> column index
        self.vocab = {}
        # Tokenised left/right sentences of the fitted data.
        self.data_l = None
        self.data_r = None
        self.vocab_len = 0
        # Number of sentence pairs in the fitted data.
        self.num_docs = 0

    @staticmethod
    def _split_pairs(data):
        """Whitespace-tokenise the two sentence columns of ``data``."""
        left, right = [], []
        for items in data:
            left.append(items[0].split())
            right.append(items[1].split())
        return left, right

    def _count_terms(self, docs):
        """Count, per document, the occurrences of each in-vocabulary token."""
        counts = []
        for text in docs:
            d = defaultdict(int)
            for token in text:
                if token in self.vocab:
                    d[self.vocab[token]] += 1
            counts.append(d)
        return counts

    def _build_matrix(self, term_counts):
        """Turn per-document term counts into a dense TF-IDF matrix."""
        matrix = np.zeros((len(term_counts), self.vocab_len), dtype='float32')
        for row, counts in enumerate(term_counts):
            for index, count in counts.items():
                matrix[row][index] = count * self.idf[index]
        return matrix

    def create_vocab(self, data):
        """Tokenise ``data`` and add every unseen token to the vocabulary.

        Left-column tokens are indexed before right-column tokens. Tokens
        already present in ``self.vocab`` keep their index.
        """
        print("Creating Vocabulary...")
        self.data_l, self.data_r = self._split_pairs(data)
        self.num_docs = len(self.data_l)

        for column in (self.data_l, self.data_r):
            for text in column:
                for token in text:
                    if token not in self.vocab:
                        self.vocab[token] = self.vocab_len
                        self.vocab_len += 1

    def compute_tf(self):
        """Compute raw term counts for the data stored by ``create_vocab``."""
        print("Computing TF Scores...")
        # Rebuilt from scratch: appending to the previous lists would leave
        # stale entries in front and misalign counts with documents on a
        # second fit.
        self.tf_l = self._count_terms(self.data_l)
        self.tf_r = self._count_terms(self.data_r)

    def compute_idf(self):
        """Compute the IDF weight of every vocabulary token."""
        print("Computing IDF Scores...")
        # One pass over the corpus: each sentence contributes 1 to the
        # document frequency of every distinct token it contains.
        doc_freq = defaultdict(int)
        for column in (self.data_l or [], self.data_r or []):
            for text in column:
                for token in set(text):
                    doc_freq[token] += 1

        # NOTE(review): df is counted over both columns (up to 2 * num_docs
        # sentences) while the numerator uses num_docs (the number of pairs).
        # Kept as-is so existing scores do not change.
        self.idf = defaultdict(int)
        for token, index in self.vocab.items():
            df = doc_freq.get(token, 0)
            self.idf[index] = math.log((1 + self.num_docs)/(1 + df)) + 1

    def fit_transform(self, data):
        """Fit vocabulary and IDF on ``data`` and return its TF-IDF matrices.

        Any state from an earlier fit is discarded first.

        Returns:
            ``(X_l, X_r)``: matrices for the left and right sentence column.
        """
        # Start from an empty vocabulary so a refit is independent of
        # whatever was fitted before.
        self.vocab = {}
        self.vocab_len = 0

        self.create_vocab(data)
        self.compute_tf()
        self.compute_idf()
        print("Creating TF-IDF Vectors...")
        X_l = self._build_matrix(self.tf_l)
        X_r = self._build_matrix(self.tf_r)

        return X_l, X_r

    def transform(self, data):
        """Vectorise ``data`` with the fitted vocabulary and IDF weights.

        Tokens that are not in the vocabulary are ignored. The fitted state
        is not modified.

        Returns:
            ``(X_l, X_r)``: matrices for the left and right sentence column.
        """
        data_l, data_r = self._split_pairs(data)

        X_l = self._build_matrix(self._count_terms(data_l))
        X_r = self._build_matrix(self._count_terms(data_r))

        return X_l, X_r

In [12]:
# Fit the vocabulary and IDF weights on the training pairs only.
vectorizer = TfidfVectorizer()

X_train_l, X_train_r = vectorizer.fit_transform(X_train)
# Displayed as the cell result: (number of training pairs, vocabulary size).
X_train_r.shape

Creating Vocabulary...
Computing TF Scores...
Computing IDF Scores...
Creating TF-IDF Vectors...


(3825, 1590)

In [13]:
# import pickle
# with open('tfidf_vectorizer.pkl', 'wb') as f:
#     pickle.dump(vectorizer, f)

In [119]:
def cosine_similarity(vec_1, vec_2):
    """Return the cosine similarity of two vectors.

    Computed as ``vec_1 @ vec_2.T`` divided by the product of the two norms.
    When either input has zero norm the similarity is undefined; the dot
    product (which is 0 for an all-zero vector) is returned in that case
    instead of dividing 0 by 0 and producing NaN.
    """
    dot = vec_1 @ vec_2.T
    denom = np.linalg.norm(vec_1) * np.linalg.norm(vec_2)
    if denom == 0:
        return dot
    return dot / denom

In [135]:
# Vectorise the held-out pairs with the vocabulary/IDF fitted on the training
# pairs; tokens outside that vocabulary are ignored by transform().
X_dev_l, X_dev_r = vectorizer.transform(X_test)
X_dev_l.shape

(675, 1590)

In [136]:
def output_preds(data_l, data_r, scale=5, floor=0.1):
    """Turn row-wise cosine similarities into similarity predictions.

    For every index ``i`` of ``data_l`` the cosine similarity of
    ``data_l[i]`` and ``data_r[i]`` is computed once; a similarity of exactly
    0 (or an undefined NaN similarity) is replaced by ``floor`` and the result
    is multiplied by ``scale``.

    Args:
        data_l: Indexable collection of left vectors.
        data_r: Indexable collection of right vectors, aligned with ``data_l``.
        scale: Factor applied to every similarity.
        floor: Similarity used in place of 0 / NaN.

    Returns:
        A list with one prediction per row of ``data_l``.
    """
    preds = []
    for i in range(len(data_l)):
        sim = cosine_similarity(data_l[i], data_r[i])
        if sim == 0.0 or np.isnan(sim):
            sim = floor
        preds.append(scale * sim)
    return preds

In [138]:
# Score the held-out pairs and correlate the predictions with the gold scores.
# NOTE(review): the import cell only has `import scipy`; confirm that
# `scipy.stats` is available through it in the SciPy version in use.
preds = output_preds(X_dev_l, X_dev_r)
pearson_score, _ = scipy.stats.pearsonr(preds, y_test)
pearson_score

0.6605509998061392

## Further Steps
- Get TFIDF Vectors
- Compare vectors (cosine similarity) and scale between (0, 5) [Pipeline] (**Unsupervised/Semi-Supervised**)
  - Show the disadvantages of this approach being unsupervised:
    - Lack of reliable vocabulary (highly dependent on training vocab set and common words in test set)
    - Context (essential for meaning) is not captured in TF-IDF
- Attempt TFIDF for multilingual (en-es), create TF-IDF vectors for different languages
  - Concat the two vectors (can't be compared straight-forwardly by just taking cosine similarity, *since different word token bases, no alignment*)
  - Pass through a linear NN to predict the training similarities (**Supervised**)
  - Can act as a baseline for multilingual STS. Through this method we don't have to worry about alignment of the tokens of TF-IDF for different languages.