In [53]:
"""
pip install kagglehub==0.3.3
pip install scikit-learn==1.5.2
pip install textdistance==4.6.2
"""

'\npip install kagglehub==0.3.3\npip install scikit-learn==1.5.2\npip install textdistance==4.6.2\n'

In [54]:
# create negatives
def create_negatives(df: pd.DataFrame) -> pd.Series:
    while True:
        index_shuffled = pd.Series(df.index).sample(frac=1).values
        if (index_shuffled == df.index).sum() == 0:
            break
    neg = df["description_y"][index_shuffled]
    neg.index = df.index 

    df["same"] = 1
    df_neg = pd.concat([df["description_x"], neg], axis=1)
    df_neg["same"] = 0
    return pd.concat([df, df_neg]).sample(frac=1)


In [55]:
from typing import List, Union, Tuple
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd
import textdistance
import re


class WeightedSims:
    def __init__(self):
        self.whitespace_pattern = re.compile(r"\s+")
        self.digits_pattern = re.compile(r"\d+")

    def rescale(self, ser: pd.Series, min: float, max: float = 1) -> pd.Series:
        return min + (ser - ser.min()) * (max - min) / (ser.max() - ser.min())

    def fit(self, texts_train: pd.Series, texts_test: pd.Series, min_weight: float):
        """
        for simplicity assumes that test data is not streamed, but completely given
        - train data is used to learn/assign weights to characters
        - test data is merely used to assign a naive "neutral" weight of 1 to characters that don't appear in train data
        """
        count_vectorizer = CountVectorizer(analyzer='char')
        char_counts = count_vectorizer.fit_transform(texts_train)
        char_counts_df = pd.DataFrame(char_counts.toarray(), columns=count_vectorizer.get_feature_names_out())
        char_counts_rel = (char_counts_df.sum() / char_counts_df.sum().sum()).sort_values(ascending=False)
        # treat chars that only appear in test as absolutely rare
        self.weights = (1 - char_counts_rel)
        self.weights = self.rescale(self.weights, min=min_weight)
        chars_test = list(set(texts_test.str.cat()))
        for char in [c for c in chars_test if c not in self.weights.index]:
            self.weights.loc[char] = 1

        vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(3, 3), norm=None, binary=False)
        ngrams_count = pd.DataFrame(vectorizer.fit_transform(texts_train).toarray(), columns=vectorizer.get_feature_names_out())
        self.weights_3gram = (1 - (ngrams_count.sum() / ngrams_count.sum().sum()).sort_values())
        self.weights_3gram = self.rescale(self.weights_3gram, min=min_weight)
        # treat ngrams that only appear in test as absolutely rare
        vectorizer.fit(texts_test)
        ngrams_test = vectorizer.get_feature_names_out()
        for ngram in ngrams_test:
            if ngram not in self.weights_3gram.index:
                self.weights_3gram.loc[ngram] = 1

        vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 2), norm=None, binary=False)
        ngrams_count = pd.DataFrame(vectorizer.fit_transform(texts_train).toarray(), columns=vectorizer.get_feature_names_out())
        self.weights_2gram = (1 - (ngrams_count.sum() / ngrams_count.sum().sum()).sort_values())
        self.weights_2gram = self.rescale(self.weights_2gram, min=min_weight)
        # treat ngrams that only appear in test as absolutely rare
        vectorizer.fit(texts_test)
        ngrams_test = vectorizer.get_feature_names_out()
        for ngram in ngrams_test:
            if ngram not in self.weights_2gram.index:
                self.weights_2gram.loc[ngram] = 1

    def jaccard_char_level(self, text1: str, text2: str) -> float:
        if len(text1) == 0 or len(text2) == 0:
            return 0.
        elif text1 == text2:
            return 1.
        union_ids = list(set([c for c in text1] + [c for c in text2]))
        intersection = self.weights[list(set([c for c in text1 if c in text2] + [c for c in text2 if c in text1]))]
        union = self.weights[union_ids]
        return float(intersection.sum() / union.sum())
    
    def jaccard_initials(self, text1: str, text2: str) -> float:
        initials_text1 = "".join([w[0] for w in re.split(pattern=self.whitespace_pattern, string=text1)])
        initials_text2 = "".join([w[0] for w in re.split(pattern=self.whitespace_pattern, string=text2)])
        return self.jaccard_char_level(text1=initials_text1, text2=initials_text2)
    
    def jaccard_digits(self, text1: str, text2: str) -> float:
        digits_text1 = "".join(re.findall(pattern=self.digits_pattern, string=text1))
        digits_text2 = "".join(re.findall(pattern=self.digits_pattern, string=text2))
        return self.jaccard_char_level(digits_text1, digits_text2)
    
    def hamming(self, text1: str, text2: str, overshoot_weight: float = 0.5, sort: bool = False) -> float:
        if len(text1) == 0 and len(text2) == 0:
            return 0.
        elif text1 == text2:
            return 1.
        # sort
        if sort:
            text1 = " ".join(sorted(re.split(pattern=self.whitespace_pattern, string=text1)))
            text2 = " ".join(sorted(re.split(pattern=self.whitespace_pattern, string=text2)))
        # determine long and short text
        if len(text1) >= len(text2):
            text_long = text1
            text_short = text2
        else:
            text_long = text2
            text_short = text1
        # calculate weighted similarity
        total = 0
        sim = 0
        for position in range(len(text_long)):
            if position < len(text_short):
                total += max(self.weights[text_long[position]], self.weights[text_short[position]])
                if text_short[position] == text_long[position]:
                    sim += self.weights[text_short[position]]
            else:
                total += self.weights[text_long[position]] * overshoot_weight
        return sim / total
    
    def longest_common_substring(self, text1: str, text2: str) -> float:
        """
        hello, he eats -> he
        """
        weight1 = self.weights.loc[[c for c in text1]].sum()
        weight2 = self.weights.loc[[c for c in text2]].sum()
        weight_total = max(weight1, weight2)

        common_substring = textdistance.lcsstr(text1, text2)
        weight_common = self.weights.loc[[c for c in common_substring]].sum()
        return float(weight_common / weight_total)
    
    def longest_common_subseq(self, text1: str, text2: str) -> float:
        """
        hello, he will -> hell (longest common substring with skipping)
        """
        weight1 = self.weights.loc[[c for c in text1]].sum()
        weight2 = self.weights.loc[[c for c in text2]].sum()
        weight_total = max(weight1, weight2)

        common_subsseq = textdistance.lcsseq(text1, text2)
        weight_common = self.weights.loc[[c for c in common_subsseq]].sum()
        return float(weight_common / weight_total)
    
    def ngrams(self, text1: str, text2: str, n: int = 3) -> float:
        if len(text1) == 0 or len(text2) == 0:
            return 0.
        elif text1 == text2:
            return 1.
        
        if n == 3:
            weights = self.weights_3gram
        elif n == 2:
            weights = self.weights_2gram
        
        ngrams1 = [text1[i:i+n] for i in range(len(text1) - n + 1)]
        ngrams2 = [text2[i:i+n] for i in range(len(text2) - n + 1)]
        union = list(set(ngrams1 + ngrams2))
        intersection = [n for n in ngrams1 if n in ngrams2]
        weights_union = weights[union].sum()
        weights_intersecion = weights[intersection].sum()
        return weights_intersecion / weights_union


In [56]:
import copy
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score


def add_features(df: pd.DataFrame, weighted_sims):
    df["2gram"] = df.apply(lambda row: weighted_sims.ngrams(row["description_x"], row["description_y"], 2), axis=1)
    df["3gram"] = df.apply(lambda row: weighted_sims.ngrams(row["description_x"], row["description_y"], 3), axis=1)
    df["damerau_levenshtein"] = df.apply(lambda row: textdistance.damerau_levenshtein(row["description_x"], row["description_y"]) / max(len(row["description_x"]), len(row["description_y"])), axis=1)
    """
    df["levenshtein"] = df.apply(lambda row: textdistance.levenshtein(row["description_x"], row["description_y"]) / max(len(row["description_x"]), len(row["description_y"])), axis=1)
    df["lcsseq_unweighted"] = df.apply(lambda row: len(textdistance.lcsseq(row["description_x"], row["description_y"])) / max(len(row["description_x"]), len(row["description_y"])), axis=1)
    df["lcsseq"] = df.apply(lambda row: weighted_sims.longest_common_subseq(row["description_x"], row["description_y"]), axis=1)
    df["lcsstr_unweighted"] = df.apply(lambda row: len(textdistance.lcsstr(row["description_x"], row["description_y"])) / max(len(row["description_x"]), len(row["description_y"])), axis=1)
    df["lcsstr"] = df.apply(lambda row: weighted_sims.longest_common_substring(row["description_x"], row["description_y"]), axis=1)
    df["cosine"] = df.apply(lambda row: textdistance.cosine(row["description_x"], row["description_y"]), axis=1)
    df["jaccard_unweighted"] = df.apply(lambda row: textdistance.jaccard(row["description_x"], row["description_y"]), axis=1)
    df["jaccard_char_level"] = df.apply(lambda row: weighted_sims.jaccard_char_level(row["description_x"], row["description_y"]), axis=1)
    df["jaccard_initials"] = df.apply(lambda row: weighted_sims.jaccard_initials(row["description_x"], row["description_y"]), axis=1)
    df["hamming"] = df.apply(lambda row: weighted_sims.hamming(row["description_x"], row["description_y"]), axis=1)
    """
    weights_original = copy.deepcopy(weighted_sims.weights)
    weighted_sims.weights = pd.Series(index=weighted_sims.weights.index, data=[1] * len(weighted_sims.weights))
    """
    df["jaccard_initials_unweighted"] = df.apply(lambda row: weighted_sims.jaccard_initials(row["description_x"], row["description_y"]), axis=1)
    df["hamming_unweighted"] = df.apply(lambda row: weighted_sims.hamming(row["description_x"], row["description_y"]), axis=1)
    """
    df["2gram_unweighted"] = df.apply(lambda row: weighted_sims.ngrams(row["description_x"], row["description_y"], 2), axis=1)
    df["3gram_unweighted"] = df.apply(lambda row: weighted_sims.ngrams(row["description_x"], row["description_y"], 3), axis=1)
    weighted_sims.weights = weights_original
    return df


def eval(features: List[str], df_train: pd.DataFrame, df_test: pd.DataFrame):
    cls = RandomForestClassifier(min_samples_split=3)
    cls.fit(X=df_train[features], y=df_train["same"])
    pred_test = cls.predict(X=df_test[features])
    return accuracy_score(y_true=df_test["same"], y_pred=pred_test)

In [59]:
import kagglehub
import pandas as pd


# download data
df_path = kagglehub.dataset_download("rishisankineni/text-similarity")
df_train_orig = pd.read_csv(df_path + "/train.csv")
df_train_orig = df_train_orig[["description_x", "description_y"]].drop_duplicates()
df_test_orig = pd.read_csv(df_path + "/test.csv")
df_test_orig = df_test_orig[["description_x", "description_y"]].drop_duplicates()

accs = []
for i in range(10):
    print(i)
    accs_inner = []
    df_train = create_negatives(df_train_orig)
    df_test = create_negatives(df_test_orig)
    weighted_sims = WeightedSims()
    weighted_sims.fit(
        texts_train=pd.concat([df_train["description_x"], df_train["description_y"]]), 
        texts_test=pd.concat([df_test["description_x"], df_test["description_y"]]),
        min_weight=0.9)
    df_train = add_features(df_train, weighted_sims=weighted_sims)
    df_test = add_features(df_test, weighted_sims=weighted_sims)
    feats = [c for c in df_train.columns if c not in ["description_x", "description_y", "same"]]
    for f in feats:
        accs_inner.append(eval(features=[f], df_train=df_train, df_test=df_test))
    accs.append(accs_inner)

0
1
2
3
4
5
6
7
8
9


In [60]:
pd.DataFrame(accs, columns=feats).mean().sort_values(ascending=False)

damerau_levenshtein    0.951647
3gram_unweighted       0.939535
3gram                  0.939341
2gram                  0.938953
2gram_unweighted       0.938953
dtype: float64