In [70]:
# Import necessary modules
import os
import sys
import csv

# Fetch the ColBERT sources: clone on first run, otherwise update the existing checkout.
# The relative path 'ColBERT/' is resolved against the current working directory.
# NOTE(review): the exit status returned by os.system is ignored, so a failed
# clone/pull is not detected here; the checkout is also not pinned to a commit.
if not os.path.exists('ColBERT/'):
    os.system('git clone https://github.com/stanford-futuredata/ColBERT.git')
else:
    # If ColBERT repository exists, pull the latest changes
    os.system('git -C ColBERT/ pull')

# Put the checkout first on sys.path so `import colbert` resolves to it.
sys.path.insert(0, 'ColBERT/')

# Editable install of the checkout through an IPython shell escape (`!`), so this
# line only works inside IPython/Jupyter. The bracketed list presumably requests
# the 'faiss-gpu' and 'torch' extras - confirm against ColBERT's setup.py.
!pip install -e ColBERT/['faiss-gpu','torch']

# Import all necessary ColBERTv2 modules
import colbert
from colbert import Indexer, Searcher
from colbert.infra import Run, RunConfig, ColBERTConfig
from colbert.data import Queries, Collection

Obtaining file:///content/ColBERT
  Preparing metadata (setup.py) ... [?25l[?25hdone
Installing collected packages: colbert-ai
  Attempting uninstall: colbert-ai
    Found existing installation: colbert-ai 0.2.17
    Uninstalling colbert-ai-0.2.17:
      Successfully uninstalled colbert-ai-0.2.17
  Running setup.py develop for colbert-ai
Successfully installed colbert-ai-0.2.17


In [71]:
"""
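      Describes FileHandlerData, defined below: a helper class that reads the
      document (dock) files, the queries and the relevance judgments from disk,
      and writes evaluation metrics to a CSV file.

      Attributes:
          folder_path (str): The path to the folder containing the dock files.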
"""
class FileHandlerData:
    """Loads the dock (document) files, queries and relevance judgments, and exports metrics.

    Attributes:
        folder_path (str): The path to the folder containing the dock files.
    """

    def __init__(self, folder_path):
        """Initializes the FileHandlerData with the specified folder path.

        Args:
            folder_path (str): Path to the folder containing files.
        """
        self.folder_path = folder_path

    def read_dock_files(self):
        """Reads every dock file in ``self.folder_path`` into a list indexed by file number.

        Each file name must be convertible with ``int()``; the file's text is
        stored at that integer position. Positions without a matching file
        (including position 0 when numbering starts at 1) keep the placeholder
        string ``"Null"``.

        Errors are reported with ``print`` instead of being raised, so the
        caller receives whatever was loaded before the failure (possibly an
        empty list).

        Returns:
            list: Texts of the dock files, indexed by file number.
        """
        dock_file_texts = []

        try:
            # Sort numerically so the files are read in index order.
            dock_files = sorted(os.listdir(self.folder_path), key=int)

            # The largest index has to be found numerically as well: a plain
            # max() on the names compares strings, so "9" beats "10" and the
            # list would be allocated too short for the higher indices.
            highest_index = int(max(dock_files, key=int))

            # Pre-fill with the "Null" placeholder; +1 because list indices start at 0.
            dock_file_texts = ["Null"] * (highest_index + 1)

            for dock_file in dock_files:
                with open(os.path.join(self.folder_path, dock_file), "r", encoding="utf-8") as f:
                    dock_file_texts[int(dock_file)] = f.read()

        except FileNotFoundError as e:
            print(f"File '{e.filename}' not found.")
        except Exception as e:
            print(f"Error: {e}")

        return dock_file_texts

    def read_queries_from_file(self, file_path):
        """Reads queries from a text file, one query per line.

        Only the line terminator is removed; any other leading or trailing
        whitespace stays part of the query. Blank lines are kept as empty
        strings, so list position ``i`` always corresponds to line ``i``.

        Args:
            file_path (str): Path to the input file containing queries.

        Returns:
            list: A list containing queries read from the file.

        Raises:
            FileNotFoundError: If the specified file is not found.
            Exception: If an error occurs during file reading.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                return [line.strip("\n") for line in file]
        except FileNotFoundError as e:
            raise FileNotFoundError(f"File '{file_path}' not found.") from e
        except Exception as e:
            # Keep the original error attached as the cause of the generic one.
            raise Exception(f"Error: {e}") from e

    def read_relevance_scores_from_file(self, file_path):
        """Reads the relevance entries for each query from a file.

        Args:
            file_path (str): The path to the file containing the relevance scores.

        Returns:
            dict: Maps the zero-based line number to the list of
                whitespace-separated tokens on that line. Tokens are kept as
                strings; an empty line maps to an empty list.

        Raises:
            FileNotFoundError: If the specified file is not found.
            Exception: If an error occurs during file reading.
        """
        try:
            # UTF-8 is stated explicitly to match the other readers in this class.
            with open(file_path, 'r', encoding="utf-8") as f:
                return {line_number: line.split() for line_number, line in enumerate(f)}
        except FileNotFoundError as e:
            raise FileNotFoundError(f"File '{file_path}' not found.") from e
        except Exception as e:
            raise Exception(f"Error: {e}") from e

    def export_metrics(self, query_metrics, csv_file_path="query_metrics_output.csv"):
        """Writes evaluation metrics to a CSV file, one row per query.

        Rows follow the iteration order of ``query_metrics``. Only the columns
        listed in the header are written; other keys in a metrics dict (for
        example "Query Text") are ignored.

        Args:
            query_metrics (dict): A dictionary containing evaluation metrics for each query.
            csv_file_path (str, optional): The path to the CSV file. Defaults to "query_metrics_output.csv".
        """
        header = ["Precision", "Recall", "F1-score", "True Positives", "False Positives", "False Negatives", "AP", "RR"]

        # newline='' lets the csv module control line endings itself.
        with open(csv_file_path, mode="w", newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(header)

            for metrics in query_metrics.values():
                csv_writer.writerow([
                    f"{metrics['Precision']:.2f}",
                    f"{metrics['Recall']:.2f}",
                    f"{metrics['F1-score']:.2f}",
                    metrics['True Positives'],
                    metrics['False Positives'],
                    metrics['False Negatives'],
                    f"{metrics['AP']:.2f}",
                    f"{metrics['RR']:.2f}",
                ])

In [72]:
class ColBert_Search_Engine:
    """Small wrapper that builds a ColBERT index over a text collection and queries it."""

    def __init__(self, index_name, dock_file_texts):
        """Stores the index name and the collection that will be indexed and searched.

        Args:
            index_name (str): Name of the index for the search engine.
            dock_file_texts (list): List containing texts of dock files for indexing.
        """
        self.dock_file_texts = dock_file_texts
        self.index_name = index_name

    def create_index(self, doc_maxlen, nbits , kmeans_niters):
        """Builds the index over ``self.dock_file_texts``, overwriting any existing one.

        The three arguments are passed straight through to ``ColBERTConfig``.

        Args:
            doc_maxlen (int): Forwarded as ``doc_maxlen`` (maximum document length).
            nbits (int): Forwarded as ``nbits``.
            kmeans_niters (int): Forwarded as ``kmeans_niters``.

        Returns:
            Indexer: The Indexer instance that was used to build the index.
        """
        checkpoint = 'colbert-ir/colbertv2.0'
        run = Run()

        with run.context(RunConfig(nranks=1, experiment='notebook')):
            settings = ColBERTConfig(
                doc_maxlen=doc_maxlen,
                nbits=nbits,
                kmeans_niters=kmeans_niters,
            )
            built = Indexer(checkpoint=checkpoint, config=settings)
            built.index(self.index_name, collection=self.dock_file_texts, overwrite=True)

        return built

    def search_queries(self, queries, _number_):
        """Runs every query against the index and collects the ranked passages.

        Each query is also echoed to stdout as it is searched.

        Args:
            queries (list): List of queries to search for.
            _number_ (int): Number of top passages to retrieve for each query.

        Returns:
            dict: Maps the zero-based position of each query to a list of
                  (query, passage_id, passage_score, passage_rank) tuples.
        """
        run = Run()
        with run.context(RunConfig(experiment='notebook')):
            searcher = Searcher(self.index_name, collection=self.dock_file_texts)

        results_dict = {}
        for position, query in enumerate(queries):
            print(f"Query: [{query}]")
            hits = searcher.search(query, k=_number_)
            # The search result is unpacked as (id, rank, score) triples, while
            # the stored tuple puts the score before the rank.
            results_dict[position] = [
                (query, pid, score, rank) for pid, rank, score in zip(*hits)
            ]

        return results_dict

In [73]:
class EvaluationMetrics:
    """Static helpers computing set-based and rank-based retrieval metrics."""

    @staticmethod
    def calculate_confusion_matrix(retrieved_docs, relevant_docs):
        """Calculate confusion matrix for binary classification.

        Both inputs are compared as sets, so duplicates are counted once.

        Args:
            retrieved_docs (list): List of retrieved document IDs.
            relevant_docs (list): List of relevant document IDs.

        Returns:
            tuple: True positives, false positives, false negatives, true negatives.
        """
        retrieved = set(retrieved_docs)
        relevant = set(relevant_docs)

        true_positives = len(retrieved & relevant)
        false_positives = len(retrieved - relevant)
        false_negatives = len(relevant - retrieved)
        true_negatives = 0  # In information retrieval, TN is usually not applicable

        return true_positives, false_positives, false_negatives, true_negatives

    @staticmethod
    def calculate_precision_recall(retrieved_docs, relevant_docs):
        """Calculate precision and recall.

        Args:
            retrieved_docs (list): List of retrieved document IDs.
            relevant_docs (list): List of relevant document IDs.

        Returns:
            tuple: Precision and recall. Each is 0 when its denominator is 0.
        """
        true_positives, false_positives, false_negatives, _ = EvaluationMetrics.calculate_confusion_matrix(
            retrieved_docs, relevant_docs)

        retrieved_total = true_positives + false_positives
        relevant_total = true_positives + false_negatives

        precision = 0 if retrieved_total == 0 else true_positives / retrieved_total
        recall = 0 if relevant_total == 0 else true_positives / relevant_total
        return precision, recall

    @staticmethod
    def calculate_f1_score(precision, recall):
        """Calculate F1-score.

        Args:
            precision (float): Precision value.
            recall (float): Recall value.

        Returns:
            float: F1-score, or 0 when precision and recall are both 0.
        """
        # Calculate F1-score: 2 * (precision * recall) / (precision + recall)
        if precision + recall == 0:
            return 0
        return 2 * (precision * recall) / (precision + recall)

    @staticmethod
    def calculate_ap(results_dict, relevance_scores):
        """Calculate Average Precision (AP) for the given results and relevance scores.

        AP is the sum of precision@rank over the ranks holding a relevant
        document, divided by the number of relevant documents. A query with no
        relevant documents gets 0.0.

        Args:
            results_dict (dict): Maps each query index to its ranked list of
                (query, passage_id, passage_score, passage_rank) tuples.
            relevance_scores (dict): A dictionary containing query indices and lists of relevant document IDs.

        Returns:
            dict: A dictionary with query indices as keys and their corresponding AP scores as values.
        """
        query_metrics = {}

        for query_index, retrieved_docs in results_dict.items():
            relevant_docs = set(int(doc_id) for doc_id in relevance_scores[query_index])

            # One pass with a running hit counter instead of re-counting the
            # prefix of the ranking at every relevant position.
            hits = 0
            precision_sum = 0.0
            for rank, (_, doc_id, _, _) in enumerate(retrieved_docs, start=1):
                # int() keeps the membership test consistent with calculate_rr,
                # so IDs given as strings are matched as well.
                if int(doc_id) in relevant_docs:
                    hits += 1
                    precision_sum += hits / rank

            query_metrics[query_index] = precision_sum / len(relevant_docs) if relevant_docs else precision_sum

        return query_metrics

    @staticmethod
    def calculate_rr(query_results, relevance_scores):
        """Calculate Reciprocal Rank (RR) for the given results and relevance scores.

        Args:
            query_results (dict): A dictionary where each key is a query index, and the corresponding value is a
                                ranked list of (query, passage_id, passage_score, passage_rank) tuples.
            relevance_scores (dict): A dictionary where each key is a query index, and the corresponding value is a list
                                    containing relevant passage IDs.

        Returns:
            dict: A dictionary where each key is a query index, and the corresponding value is the Reciprocal Rank (RR)
                for that query (0 when no relevant document was retrieved).
        """
        rr_values = {}

        for query_index, results_list in query_results.items():
            relevant_docs = set(int(doc_id) for doc_id in relevance_scores[query_index])

            # The rank is the 1-based position in the list, not the stored passage_rank field.
            reciprocal_rank = 0
            for i, (_, doc_id, _, _) in enumerate(results_list, start=1):
                if int(doc_id) in relevant_docs:
                    reciprocal_rank = 1 / i
                    break

            rr_values[query_index] = reciprocal_rank

        return rr_values



In [74]:
def main():
    """Runs the whole experiment: load data, build the index, search, evaluate, export.

    Search results are printed to stdout and the per-query metrics are written
    to "query_metrics_output.csv".

    Raises:
        ValueError: If the relevance file has fewer lines than there are queries.
    """
    # Specify the folder path containing dock files
    folder_path_docs = "/content/drive/MyDrive/docs"

    # Specify the file path containing the Queries
    # (alternative input: "/content/drive/MyDrive/data/Queries.txt")
    file_path_queries = "/content/drive/MyDrive/data/Queries_20"

    # Specify the file path containing the evaluation data relevant texts
    # (alternative input: "/content/drive/MyDrive/data/Relevant.txt")
    file_path_evaluation = "/content/drive/MyDrive/data/Relevant_20"

    # ColBERT Parameters
    # Number of ranked texts results the search engine returns
    k_number = 400
    doc_maxlen=500 # truncate passages at 500 tokens
    kmeans_niters=14 # kmeans_niters specifies the number of iterations of k-means clustering.
    nbits=4  # encode each dimension with 4 bits

    # Load the collection, the queries and the relevance judgments
    dock_file_handler = FileHandlerData(folder_path_docs)
    dock_file_texts = dock_file_handler.read_dock_files()
    queries = dock_file_handler.read_queries_from_file(file_path_queries)
    relevance_scores = dock_file_handler.read_relevance_scores_from_file(file_path_evaluation)

    # Every query needs its own relevance line. Check this before the expensive
    # indexing and search steps instead of failing with a KeyError afterwards.
    if len(relevance_scores) < len(queries):
        raise ValueError(
            f"Found {len(queries)} queries but only {len(relevance_scores)} relevance lines."
        )

    indexer_name = 'Le_Indexer'
    search_engine = ColBert_Search_Engine(indexer_name, dock_file_texts)
    search_engine.create_index(doc_maxlen, nbits, kmeans_niters)

    results_dict = search_engine.search_queries(queries, k_number)

    # Comment out the following section if you do NOT want to print the search
    # engine results for each query
    for query_number, results in results_dict.items():
       print(f"Results for query: [{query_number+1}]")
       for result_query, passage_id, passage_score, passage_rank in results:
            print(f"Query: {result_query} Passage ID: {passage_id}, Score: {passage_score:.1f}, Rank [{passage_rank}]")
       print("\n")

    # Initialize an empty dictionary to store evaluation metrics for each query
    query_metrics = {}

    # Iterate through all queries
    for query_index in range(0, len(queries)):
        query_text = queries[query_index]  # Get the query text for the current query
        retrieved_docs = results_dict[query_index]
        relevant_docs = set(int(doc_id) for doc_id in relevance_scores[query_index])

        # Calculate precision, recall, F1-score, and confusion matrix for the current query
        retrieved_doc_ids = set(doc_id for _, doc_id, _, _ in retrieved_docs)
        precision, recall = EvaluationMetrics.calculate_precision_recall(retrieved_doc_ids, relevant_docs)
        f1_score = EvaluationMetrics.calculate_f1_score(precision, recall)
        true_positives, false_positives, false_negatives, _ = EvaluationMetrics.calculate_confusion_matrix(
            retrieved_doc_ids, relevant_docs)

        # Calculate AP and RR for the current query (both helpers take dicts keyed by query index)
        ap_values = EvaluationMetrics.calculate_ap({query_index: retrieved_docs},
                                                  {query_index: relevance_scores[query_index]})
        rr_values = EvaluationMetrics.calculate_rr({query_index: retrieved_docs},
                                                  {query_index: relevance_scores[query_index]})

        # Store metrics in the dictionary including query text
        query_metrics[query_index] = {
            "Query Text": query_text,  # Include query text in the dictionary
            "Precision": precision,
            "Recall": recall,
            "F1-score": f1_score,
            "True Positives": true_positives,
            "False Positives": false_positives,
            "False Negatives": false_negatives,
            "AP": ap_values[query_index],
            "RR": rr_values[query_index]
        }

    # Write the CSV once, after all queries are evaluated, instead of rewriting
    # the whole file on every loop iteration.
    dock_file_handler.export_metrics(query_metrics , "query_metrics_output.csv")

# Run the pipeline only when this code is executed directly (as a script or a
# notebook cell, where __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()



[Jan 28, 15:46:37] #> Note: Output directory /content/experiments/notebook/indexes/Le_Indexer already exists


[Jan 28, 15:46:37] #> Will delete 10 files already at /content/experiments/notebook/indexes/Le_Indexer in 20 seconds...
#> Starting...
#> Joined...
[Jan 28, 15:47:40] #> Loading codec...
[Jan 28, 15:47:40] #> Loading IVF...
[Jan 28, 15:47:40] #> Loading doclens...


100%|██████████| 1/1 [00:00<00:00, 421.79it/s]

[Jan 28, 15:47:40] #> Loading codes and residuals...



100%|██████████| 1/1 [00:00<00:00, 60.54it/s]

Query: [What are the effects of calcium on the physical properties of mucus from CF patients]

#> QueryTokenizer.tensorize(batch_text[0], batch_background[0], bsize) ==
#> Input: . What are the effects of calcium on the physical properties of mucus from CF patients, 		 True, 		 None
#> Output IDs: torch.Size([32]), tensor([  101,     1,  2054,  2024,  1996,  3896,  1997, 13853,  2006,  1996,
         3558,  5144,  1997, 14163,  7874,  2013, 12935,  5022,   102,   103,
          103,   103,   103,   103,   103,   103,   103,   103,   103,   103,
          103,   103], device='cuda:0')
#> Output Mask: torch.Size([32]), tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0], device='cuda:0')

Query: [Can one distinguish between the effects of mucus hypersecretion and infection on the submucosal glands of the respiratory tract in CF]





[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Query: What histochemical differences have been described between normal and CF respiratory epithelia Passage ID: 548, Score: 12.3, Rank [239]
Query: What histochemical differences have been described between normal and CF respiratory epithelia Passage ID: 192, Score: 12.3, Rank [240]
Query: What histochemical differences have been described between normal and CF respiratory epithelia Passage ID: 776, Score: 12.3, Rank [241]
Query: What histochemical differences have been described between normal and CF respiratory epithelia Passage ID: 998, Score: 12.3, Rank [242]
Query: What histochemical differences have been described between normal and CF respiratory epithelia Passage ID: 559, Score: 12.3, Rank [243]
Query: What histochemical differences have been described between normal and CF respiratory epithelia Passage ID: 1136, Score: 12.3, Rank [244]
Query: What histochemical differences have been described between normal and