In [1]:
import pandas as pd
from pathlib import Path
import pandas as pd
import numpy as np
from pathlib import Path
import random
import string
import re
from rapidfuzz import fuzz

import evaluate

from multiprocessing import Pool, cpu_count

from matplotlib import pyplot as plt

import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize

from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer

# DEBUG
from nltk import translate

[nltk_data] Downloading package punkt to
[nltk_data]     /eagle/projects/tpc/siebenschuh/envs_/bo/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


# Script that explored alternative ways to accelerate computation of BLEU/ROUGE/CAR etc. that tends to be slow

- lesson learned: naïve `pandas` implementation (parallelize over rows) was not worse that elaborate schemes in `dask`

## Speed Assessment
`get_tables.py` has been a real bottleneck.
We need 25k rows with 6 parsers (Nougat, PyMuPDF, Marker, Oreo, pypdf, Grobid). 

Compute as many metrics as pssobiel (BLEU, ROUGE, EDIT). 

25k Rows for 3 * 6 metrics to parse in less than 1 hour $\rightarrow$ 5s per 100 rows per metric.

In [2]:
class FastResponseTable:
    def __init__(self,
                 db_src_filename:str,
                 db_dst_filename:str,
                 chunk_index:int=-1,
                 chunk_size:int=-1,
                 num_cores:int=100,
                 overwrite_flag:bool = False,
                 parser_columns:list[str]=['pymupdf', 'nougat', 'grobid'],
                 root_dir:Path=Path('/lus/eagle/projects/argonne_tpc/siebenschuh/aurora_gpt/database'),
                 n:int=-1) -> None:
        """
        Assume `html` is groudntruth text
        """

        # validate
        root_dir = Path(root_dir)
        assert root_dir.is_dir(), f"Path`root_dir` does not exist or is not directory. Invalid directory: {root_dir}"
        
        # paths
        self.db_src_path = Path(root_dir) / db_src_filename
        self.db_dst_path = Path(root_dir) / db_dst_filename
        self.overwrite_flag = overwrite_flag
        self.n = n
        self.parser_columns = parser_columns
        self.chunk_index = chunk_index
        self.chunk_size = chunk_size
        self.num_cores = num_cores
        
        # raw data
        assert self.db_src_path.is_file(), f"Source CSV path invalid. No such path: {self.db_src_path}"
        if not(self.overwrite_flag):
            assert not(self.db_dst_path.is_file()), f"Destination Path invalid. File already exists at path: {self.db_dst_path}"
        
        # load df
        df_raw = pd.read_csv(self.db_src_path, sep='|')
        
        # drop NA rows
        df_proc = df_raw.dropna()

        # subset DataFrame
        if self.chunk_index!=-1 and self.chunk_size!=-1:
            # check validity 
            assert self.chunk_index >= 0, f"`chunk_index` should be non-negative (or -1) but is {chunk_index}"
            assert self.chunk_size > 0, f"`chunk_size` should be positive (or -1) but is {chunk_size}"
            
            # overwrite self.db_dst_path
            db_dst_filename = Path(db_dst_filename).stem + f'_{self.chunk_index}-{len(df_proc) // self.chunk_size}' + Path(db_dst_filename).suffix
            self.db_dst_path = Path(root_dir) / db_dst_filename
            
            # subset dataframe
            i_start, i_end = chunk_index * chunk_size, min((chunk_index+1) * chunk_size, len(df_proc))
            if i_start>= len(df_proc):
                raise ValueError(f'i_start index exceeds length of Dataframe: i_start={i_start} for len(df_proc)={len(df_proc)}')
            df_proc = df_proc.iloc[i_start:i_end]

            #  DEBUG
            print(f'len(df_proc): {len(df_proc)}, i_start/i_end: ', i_start, i_end)

        # status
        print(f'DF loaded...\n{len(df_raw)} rows\n... {len(df_proc)} after removing NANs & subsetting')
        
        # subset
        if n > 0:
            df_proc = df_proc.iloc[:round(n)]
            print(f'n={n} ... Only use first n rows.')
        
        # normalized text columns
        df_proc['html_norm'] = df_proc.apply(lambda row: self.normalize(row['html']), axis=1)
        for parser_col in self.parser_columns:
            df_proc[f'{parser_col}_norm'] = df_proc.apply(lambda row: self.normalize(row[parser_col]), axis=1)

        # latex text columns
        df_proc['html_latex'] = df_proc.apply(lambda row: self.extract_latex(row['html']), axis=1)
        for parser_col in self.parser_columns:
            df_proc[f'{parser_col}_latex'] = df_proc.apply(lambda row: self.extract_latex(row[parser_col]), axis=1)
            
        # assign
        self.df = df_proc

        pass

    def extract_latex(self, text):
        """
        Extracts LaTeX expressions from the input text and returns both the LaTeX expressions and the text with LaTeX stripped.
    
        Parameters:
            text (str): The input string containing LaTeX expressions.
    
        Returns:
            tuple: A tuple containing two elements:
                - A list of extracted LaTeX expressions.
                - A string with the LaTeX expressions removed.
        """
        # Regular expression to match LaTeX expressions
        latex_pattern = re.compile(r'(\$.*?\$|\\\[.*?\\\]|\\\(.*?\\\)|\\begin\{.*?\}.*?\\end\{.*?\})', re.DOTALL)
    
        # Extract all LaTeX expressions
        latex_expressions = latex_pattern.findall(text)
    
        # Strip the LaTeX expressions from the text
        stripped_text = latex_pattern.sub('', text).strip()
    
        return latex_expressions
        
    def remove_latex(self,
                     text:str) -> str:
        """
        Remove LaTeX formatting from a string
        """
        # Remove LaTeX commands (e.g., \textbf{...}, \emph{...}, etc.)
        text = re.sub(r'\\[a-zA-Z]+\{(.*?)\}', r'\1', text)
        
        # Remove inline math (e.g., $...$)
        text = re.sub(r'\$(.*?)\$', r'\1', text)
        
        # Remove display math (e.g., \[...\] or $$...$$)
        text = re.sub(r'\$\$(.*?)\$\$', r'\1', text)
        text = re.sub(r'\\\[(.*?)\\\]', r'\1', text)
        
        # Remove other LaTeX-specific characters (e.g., \, \%, etc.)
        text = re.sub(r'\\[a-zA-Z]+', '', text)
        
        # Remove braces and any content between them
        text = re.sub(r'\{|\}', '', text)
        
        return text
    
    def normalize(self,
                  x:str, 
                  remove_latex_flag:bool=True) -> str:
        """
        Normalize the text
        """
    
        # const
        REMOVE_PUNCT = str.maketrans("", "", string.punctuation)
    
        # remove latex
        if remove_latex_flag:
            x = self.remove_latex(x)
    
        # remove escape characters
        x = x.translate(REMOVE_PUNCT)
        x = re.sub(r"\s+", " ", x)
        x = x.lower()
        x = x.strip()
        
        return x

    # - NLTK tokenization
    def safe_word_tokenize(self, text):
        if pd.isna(text):
            return []
        
        return word_tokenize(text)
    
    def parallel_tokenize(self, series):
        """
        (Safe) tokenization of entire column of a df
        """
        
        with Pool(processes=self.num_cores) as pool:  # Explicitly set the number of processes
            return pool.map(self.safe_word_tokenize, series)

    def extract_parts(self, token_list):
        """
        Extract sublist (of tokens) post tokenization of the text column
        """
        
        length = len(token_list)

        # extract first, middle, and last 10%
        first_10 = token_list[:max(1, length // 10)]
        mid_start = length // 2 - max(1, length // 20)
        mid_end = length // 2 + max(1, length // 20)
        middle_10 = token_list[mid_start:mid_end]
        last_10 = token_list[-max(1, length // 10):]
        
        return first_10, middle_10, last_10
        
    def tokenize_columns(self,):
        """
        Tokenize html/parser text columns in dataframe
        """

        # get current df
        df_proc = pd.DataFrame(self.df)

        # parser outputs
        for parser_col in ['html'] + self.parser_columns:
            print(f'Tokenizing {parser_col} ... ')
            df_proc[f'{parser_col}_token'] = self.parallel_tokenize(df_proc[parser_col])
            df_proc[f'{parser_col}_norm_token'] = self.parallel_tokenize(df_proc[f'{parser_col}_norm'])
            print('... completed!\n')

        # append beginning/middle/end part
        for parser_col in ['html'] + self.parser_columns:
            df_proc[f'{parser_col}Beg_token'], df_proc[f'{parser_col}Mid_token'], df_proc[f'{parser_col}End_token'] = zip(*df_proc[f'{parser_col}_token'].apply(self.extract_parts))
            df_proc[f'{parser_col}Beg_norm_token'], df_proc[f'{parser_col}Mid_norm_token'], df_proc[f'{parser_col}End_norm_token'] = zip(*df_proc[f'{parser_col}_norm_token'].apply(self.extract_parts))
        
        # re-assign
        self.df = df_proc

        pass
    
    # - BLEU
    def compute_bleu(self, row, reference_col, candidate_col):
        return translate.bleu_score.sentence_bleu(
            [row[reference_col]], 
            row[candidate_col], 
            smoothing_function=translate.bleu_score.SmoothingFunction().method1
        )
    
    # Function to apply compute_bleu in parallel
    def parallel_bleu(self, df, reference_col, candidate_col):
        with Pool(processes=self.num_cores) as pool:
            results = pool.starmap(self.compute_bleu, [(row, reference_col, candidate_col) for _, row in df.iterrows()])
        return results
    
    # - METEOR SLOW
    def compute_meteor(self, row, reference_col, candidate_col):
        return translate.meteor_score.meteor_score(
            [row[reference_col]], 
            row[candidate_col]
        )
    
    # Function to apply compute_meteor in parallel
    def parallel_meteor(self, df, reference_col, candidate_col):
        with Pool(processes=self.num_cores) as pool:
            results = pool.starmap(self.compute_meteor, [(row, reference_col, candidate_col) for _, row in df.iterrows()])
        return results

    # Function to compute the approximate CER (1 minus CER)
    def compute_approx_car(self, row, reference_col, candidate_col):
        '''
        Complement of character error rate (CER); hence: character accuracy rate (CAR)
        '''
        similarity = fuzz.ratio(row[reference_col], row[candidate_col])
        return similarity / 100.0
    
    # Function to apply compute_approx_cer in parallel
    def parallel_car(self, df, reference_col, candidate_col):
        '''
        Complement of character error rate (CER); hence: character accuracy rate
        '''
        with Pool(processes = self.num_cores) as pool:
            results = pool.starmap(self.compute_approx_car, [(row, reference_col, candidate_col) for _, row in df.iterrows()])
        return results

    # ROUGE scores (rouge1, rouge2, rougeL) for a single row
    def compute_rouge(self, row, reference_col, candidate_col, scorer):
        scores = scorer.score(row[reference_col], row[candidate_col])
        return [scores['rouge1'].fmeasure, scores['rouge2'].fmeasure, scores['rougeL'].fmeasure]
    
    # Function to apply compute_rouge in parallel
    def parallel_rouge(self, df, reference_col, candidate_col, scorer):
        with Pool(processes=self.num_cores) as pool:
            results = pool.starmap(self.compute_rouge, [(row, reference_col, candidate_col, scorer) for _, row in df.iterrows()])
        return results
        

    def compute_metrics(self, normalized: bool = True) -> None:
        """
        Processes the table in parallel
        """
        
        # Copy frame of processed columns
        df_proc = pd.DataFrame(self.df)
        
        # Initialize a dictionary to store new columns
        new_columns = {}
    
        # BLEU
        print('BLEU ...')
        for parser_col in self.parser_columns:
            print(f'   {parser_col}')
            for part in ['', 'Beg', 'Mid', 'End']:
                new_columns[f'{parser_col}{part}_bleu'] = self.parallel_bleu(df_proc, f'html{part}_token', f'{parser_col}_token')
                new_columns[f'{parser_col}{part}_bleu_norm'] = self.parallel_bleu(df_proc, f'html{part}_norm_token', f'{parser_col}_norm_token')
        print('...done')
    
        # ROUGE
        print('ROUGE ...')
        scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
        for parser_col in self.parser_columns:
            print(f'   {parser_col}')
            for part in ['']:
                # raw text
                rouge_scores = self.parallel_rouge(df_proc, f'html{part}', f'{parser_col}{part}', scorer)
                # - assign
                new_columns[f'{parser_col}{part}_rouge1'] = [score[0] for score in rouge_scores]
                new_columns[f'{parser_col}{part}_rouge2'] = [score[1] for score in rouge_scores]
                new_columns[f'{parser_col}{part}_rougeL'] = [score[2] for score in rouge_scores]
        
                # normalized text
                rouge_scores_norm = self.parallel_rouge(df_proc, f'html{part}_norm', f'{parser_col}{part}_norm', scorer)
                # - assign
                new_columns[f'{parser_col}{part}_rouge1_norm'] = [score_norm[0] for score_norm in rouge_scores_norm]
                new_columns[f'{parser_col}{part}_rouge2_norm'] = [score_norm[1] for score_norm in rouge_scores_norm]
                new_columns[f'{parser_col}{part}_rougeL_norm'] = [score_norm[2] for score_norm in rouge_scores_norm]
        print('...done')
    
        # CAR (Character Accuracy Rate)
        print('CAR ...')
        for parser_col in self.parser_columns:
            print(f'   {parser_col}')
            for part in ['', 'Beg', 'Mid', 'End']:
                new_columns[f'{parser_col}{part}_car'] = self.parallel_car(df_proc, f'html{part}_token', f'{parser_col}{part}_token')
                new_columns[f'{parser_col}{part}_car_norm'] = self.parallel_car(df_proc, f'html{part}_norm_token', f'{parser_col}{part}_norm_token')
        print('...done')
    
        # Convert the new columns dictionary to a DataFrame
        new_columns_df = pd.DataFrame(new_columns)
    
        # Concatenate the new columns to df_proc
        df_proc = pd.concat([df_proc, new_columns_df], axis=1)
    
        # Assign the final DataFrame to self.df_metrics
        self.df_metrics = df_proc

        pass

    def save_table(self,) -> None:
        """Store processed table
        """
        
        # store table
        self.df_score.to_csv(self.db_dst_path, sep='|')

        pass 

    def load_table(self,) -> None:
        """Store processed table
        """
        assert self.db_dst_path.is_file(), ""
        # store table
        self.df_score = pd.read_csv(self.db_dst_path, sep='|')

        pass

In [7]:
t = FastResponseTable(root_dir='./tmp',
                      chunk_index=0,
                      chunk_size=100,
                      num_cores=8,
                      db_src_filename='df_500.csv', 
                      db_dst_filename='out_df_500.csv')

len(df_proc): 100, i_start/i_end:  0 100
DF loaded...
500 rows
... 100 after removing NANs & subsetting


In [8]:
%%time
t.tokenize_columns()

Tokenizing html ... 
... completed!

Tokenizing pymupdf ... 
... completed!

Tokenizing nougat ... 
... completed!

Tokenizing grobid ... 
... completed!

CPU times: user 3.74 s, sys: 10.9 s, total: 14.6 s
Wall time: 31.3 s


In [None]:
%%time

t.compute_metrics()

BLEU ...
   pymupdf
   nougat


Process ForkPoolWorker-273:
Process ForkPoolWorker-278:
Process ForkPoolWorker-279:
Process ForkPoolWorker-275:
Process ForkPoolWorker-276:
Process ForkPoolWorker-277:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/eagle/projects/tpc/siebenschuh/envs_/bo/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/eagle/projects/tpc/siebenschuh/envs_/bo/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/eagle/projects/tpc/siebenschuh/envs_/bo/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/eagle/projects/tpc/siebenschuh/envs_/bo/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/eagle/projects/tpc/siebenschuh/envs_/bo/lib/python3.11/multip

In [None]:
t.df.columns