In [1]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
import math
from collections import defaultdict
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer

## Loading the data set

In [2]:
# Read the preprocessed Eclipse and Firefox bug-report datasets.
# Forward slashes keep the relative paths usable on Windows and POSIX alike.
df = pd.read_csv("../new_dataset/eclipse/eclipse_new.csv")
df2 = pd.read_csv("../new_dataset/firefox/firefox_new.csv")

# Stack the two dataframes into one. DataFrame.append was deprecated and then
# removed in pandas 2.0; pd.concat is the supported equivalent and, like
# append, keeps each frame's original row index.
df = pd.concat([df, df2])

# Print the first 5 rows of the dataframe (preprocessed_description column only)
print(df['preprocessed_description'].head())

0    km pm pr deletion indicator sync viewer subtle...
1    setup project contains gif resource release pr...
2    current vcm api repository adapter either pess...
3    become synchronized project repository use dif...
4    iresource setlocal ha problem method replaces ...
Name: preprocessed_description, dtype: object


  df = df.append(df2)


## Split the data into training and test sets

In [3]:
# Hold out 20% of the combined bug reports for testing; the fixed
# random_state makes the partition reproducible across runs.
X_train, X_test = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)

# Show a single sample row from each partition (training first, then testing)
for partition in (X_train, X_test):
    print(partition.head(1))

        bug_id                                        description dup_id  \
84372  88542.0  Is it my imagination or is the only way to cha...     []   

      priority                           preprocessed_description  
84372       P3  imagination way change data set chart go prope...  
          bug_id                                        description dup_id  \
240900  264207.0  RAP 1.2 M5\n\nReproducable in controls demo\n\...     []   

       priority                           preprocessed_description  
240900       P3  rap reproducable control demo open expandbar t...  


In [4]:
# Report the (rows, columns) shape of each partition
train_shape = X_train.shape
test_shape = X_test.shape
print(f"Training data size: {train_shape}")
print(f"Testing data size: {test_shape}")

Training data size: (382667, 5)
Testing data size: (95667, 5)


## TF-IDF Algorithm

In [16]:
class TfIdfVectorizer(BaseEstimator, TransformerMixin):
    """
    Convert a collection of raw documents to a matrix of TF-IDF features.

    Documents are lower-cased, split on whitespace, optionally filtered
    against a stop-word list and expanded into word n-grams. Each matrix
    entry is ``tf * idf`` where ``tf`` is the token count divided by the
    number of tokens in the document and
    ``idf = log(n_docs / (doc_freq + 1)) + 1``.

    Parameters
    ----------
    max_df : float, default=1.0
        Fraction of the corpus. When building the vocabulary, ignore terms
        whose document frequency is strictly higher than
        ``max_df * n_documents`` (corpus-specific stop words).

    min_df : int or float, default=0.1
        Absolute document count. When building the vocabulary, ignore terms
        that appear in strictly fewer than ``min_df`` documents. Because every
        observed term occurs in at least one document, any value <= 1
        (including the default) keeps all terms.

    stop_words : list, default=None
        If a list, it contains the stop words to be removed from the documents.

    ngram_range : tuple, default=(1, 1)
        The lower and upper boundary of the range of n-values for different n-grams
        to be extracted.

    Attributes
    ----------
    vocab : dict
        Maps each retained token to its column index. Indices are contiguous
        (``0 .. len(vocab) - 1``) and assigned in sorted token order.

    idf : dict
        Maps each retained token to its inverse document frequency.
    """
    def __init__(self, max_df=1.0, min_df=0.1, stop_words=None, ngram_range=(1, 1)):
        self.max_df = max_df
        self.min_df = min_df
        self.stop_words = stop_words
        self.ngram_range = ngram_range
        self.vocab = {}
        self.idf = {}

    def fit(self, documents, y=None):
        """
        Learn the vocabulary and idf from the documents.

        Parameters
        ----------
        documents : iterable of str
            The raw documents to learn from.

        y : ignored
            Accepted only for compatibility with the scikit-learn
            estimator calling convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        # Document frequency (DF): in how many documents each token appears
        doc_count = 0
        doc_freq = defaultdict(int)
        for doc in documents:
            doc_count += 1
            for token in set(self._tokenize(doc)):
                doc_freq[token] += 1

        # Keep only tokens inside the [min_df, max_df * n_docs] window.
        # Sorting makes the column order independent of set/dict iteration order.
        max_count = self.max_df * doc_count
        kept_tokens = sorted(
            token for token, count in doc_freq.items()
            if count >= self.min_df and count <= max_count
        )

        # Number the columns AFTER filtering so indices are contiguous and always
        # fall inside the (n_samples, len(vocab)) matrix built by transform().
        self.vocab = {token: index for index, token in enumerate(kept_tokens)}

        # Inverse document frequency (IDF); the +1 in the denominator smooths the
        # ratio and the trailing +1 keeps the weight strictly positive.
        self.idf = {
            token: math.log(doc_count / (doc_freq[token] + 1)) + 1
            for token in kept_tokens
        }

        return self

    def transform(self, documents):
        """
        Transform documents to document-term matrix.

        Parameters
        ----------
        documents : sequence of str
            The raw documents to be vectorized (must support ``len``).

        Returns
        -------
        X : array of shape (n_samples, n_features)
            The dense TF-IDF matrix; tokens outside the learned vocabulary
            are ignored.
        """
        tfidf_matrix = np.zeros((len(documents), len(self.vocab)))

        for row, doc in enumerate(documents):
            term_freq = self._calculate_tf(self._tokenize(doc))

            # Only tokens seen (and retained) during fit get a column
            for token, freq in term_freq.items():
                column = self.vocab.get(token)
                if column is not None:
                    tfidf_matrix[row, column] = freq * self.idf[token]

        return tfidf_matrix

    def fit_transform(self, documents, y=None):
        """
        Learn the vocabulary and idf, return document-term matrix.

        Parameters
        ----------
        documents : sequence of str
            The raw documents to be vectorized. They are traversed twice
            (once to fit, once to transform), so a one-shot iterator is not
            suitable here.

        y : ignored
            Accepted only for compatibility with the scikit-learn
            estimator calling convention.

        Returns
        -------
        X : array of shape (n_samples, n_features)
            The transformed document-term matrix.
        """
        return self.fit(documents).transform(documents)

    def _tokenize(self, document):
        """
        Tokenize the document into n-grams.

        Parameters
        ----------
        document : str
            A single document. Non-string values (for example ``None`` or a
            NaN read from a CSV with missing text) yield no tokens.

        Returns
        -------
        tokens : list of str
            The list of tokens (n-grams) in the document.
        """
        # Missing text is treated as an empty document instead of raising
        if not isinstance(document, str):
            return []

        # Simple tokenization: lower-case and split by whitespace
        words = document.lower().split()
        if self.stop_words:
            # A set gives constant-time membership checks per word
            stop_words = set(self.stop_words)
            words = [word for word in words if word not in stop_words]

        # Generate every n-gram for n in [ngram_range[0], ngram_range[1]]
        tokens = []
        for n in range(self.ngram_range[0], self.ngram_range[1] + 1):
            for start in range(len(words) - n + 1):
                tokens.append(' '.join(words[start:start + n]))

        return tokens

    def _calculate_tf(self, tokens):
        """
        Calculate term frequency (TF) for the document.

        Parameters
        ----------
        tokens : list of str
            The list of tokens (n-grams) in the document.

        Returns
        -------
        tf : dict
            Maps each token to its count divided by the total number of
            tokens. An empty token list yields an empty dict.
        """
        # Count the occurrences of each token in the document
        counts = defaultdict(int)
        for token in tokens:
            counts[token] += 1

        # Normalize by document length; with no tokens the comprehension is
        # empty, so the division is never evaluated.
        total_tokens = len(tokens)
        return {token: count / total_tokens for token, count in counts.items()}

In [20]:
# Use TF-IDF as a feature extraction technique.
# Note: this is the TfidfVectorizer imported from scikit-learn, not the
# TfIdfVectorizer class defined in this notebook. The vocabulary and IDF
# weights are learned on the training split only and reused for the test split.
vectorizer = TfidfVectorizer(ngram_range=(1, 1))
train_descriptions = X_train['preprocessed_description']
test_descriptions = X_test['preprocessed_description']
X_train_tfidf = vectorizer.fit_transform(train_descriptions)
X_test_tfidf = vectorizer.transform(test_descriptions)

In [None]:
# Cosine-similarity cut-off: a training report scoring at or above this value
# against a test report is predicted to be its duplicate.
SIMILARITY_THRESHOLD = 0.9

# make a dictionary to store the test example bug_id as a key and the predicted duplicates as values
predicted_duplicates_dict = {}

# make a dictionary to store the test example bug_id as a key and the corrected duplicates as values
true_duplicates_dict = {}

# Extract the columns once instead of rebuilding the arrays on every iteration
test_bug_ids = X_test['bug_id'].values
test_dup_ids = X_test['dup_id'].values
train_bug_ids = X_train['bug_id'].values

# Iterate over each example of the test data
for i in range(X_test_tfidf.shape[0]):
    test_bug_id = test_bug_ids[i]

    # Calculate the cosine similarity between the test example and all the training examples
    similarity = cosine_similarity(X_test_tfidf[i], X_train_tfidf)

    # Bug ids of the training examples whose similarity reaches the threshold
    # (similarity has a single row, so the column indices identify the matches)
    matched_bug_ids = train_bug_ids[np.where(similarity >= SIMILARITY_THRESHOLD)[1]]

    # -1 is the sentinel for "no duplicate predicted"
    bug_ids = matched_bug_ids if len(matched_bug_ids) > 0 else [-1]

    predicted_duplicates_dict[test_bug_id] = list(bug_ids)
    print ("Predicted Duplicates for Bug ID {}: {}".format(test_bug_id, bug_ids))

    # Print the bug ID of the test example
    print("Test Bug ID:", test_bug_id)

    # along with the bug_id of the training example
    print("Training Bug IDs:", matched_bug_ids)

    # Parse the ground-truth duplicates: a semicolon-separated string in which
    # '[]' marks "no duplicates". Every empty piece is dropped (not just the
    # first one), and a non-string cell (e.g. a missing value) is treated as
    # having no duplicates instead of raising.
    raw_dup_id = test_dup_ids[i]
    if isinstance(raw_dup_id, str):
        dup_tokens = [piece.strip() for piece in raw_dup_id.split(';')]
        dup_tokens = [piece for piece in dup_tokens if piece and piece != '[]']
    else:
        dup_tokens = []

    # make the dup_ids list as integers; -1 is the sentinel for "no true duplicate"
    dup_ids = [int(piece) for piece in dup_tokens] or [-1]

    true_duplicates_dict[test_bug_id] = list(dup_ids)
    print("True Duplicates for Bug ID {}: {}".format(test_bug_id, dup_ids))

    print("\n")
    
# calculate the true_positive, false_positive, true_negative, false_negative
true_positive = 0
false_positive = 0
true_negative = 0
false_negative = 0

# Every test bug report contributes exactly one outcome. -1 is the sentinel for
# "no duplicate" in both the predicted and the true lists:
# 1. predicted -1, true -1                        -> true negative
# 2. predicted -1, true has bug ids               -> false negative
# 3. predicted has bug ids, true -1               -> false positive
# 4. both have bug ids and at least one predicted
#    id is a true duplicate                       -> true positive
# 5. both have bug ids but none of the predicted
#    ids is a true duplicate                      -> false positive
#
# NOTE(review): case 5 is counted as a false positive because every reported
# duplicate is wrong; the true duplicate is also missed, so move it to
# false_negative if recall is meant to penalise it instead.

for key, predicted_ids in predicted_duplicates_dict.items():
    true_ids = true_duplicates_dict[key]
    predicted_none = -1 in predicted_ids
    true_none = -1 in true_ids

    if predicted_none and true_none:
        true_negative += 1
    elif predicted_none:
        false_negative += 1
    elif true_none:
        false_positive += 1
    elif any(bug_id in true_ids for bug_id in predicted_ids):
        # Counted once per test report, like the other three outcomes
        true_positive += 1
    else:
        false_positive += 1

def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# calculate the precision, recall, and f1 score
# Each ratio falls back to 0.0 when its denominator is zero (for example when
# no duplicate was predicted at all) instead of raising ZeroDivisionError.
precision = _safe_ratio(true_positive, true_positive + false_positive)
recall = _safe_ratio(true_positive, true_positive + false_negative)
f1 = _safe_ratio(2 * precision * recall, precision + recall)
accuracy_score = _safe_ratio(
    true_positive + true_negative,
    true_positive + true_negative + false_positive + false_negative,
)

# write the results to a file
with open("results.txt", "w") as f:
    f.write("Accuracy Score: " + str(accuracy_score) + "\n")
    f.write("Precision: " + str(precision) + "\n")
    f.write("Recall: " + str(recall) + "\n")
    f.write("F1 Score: " + str(f1) + "\n")