In [6]:
import spacy
from nltk import sent_tokenize
import re
from ciraprocessor import CiRAProcessor
from sys import stderr
from cira.src.data.labels import EventLabel
import math
import os
import time

In [7]:
class Timer:
    """Wall-clock timer that periodically appends the elapsed time to a log file.

    Attributes are kept public because callers read ``start_time`` and
    ``elapsed_time`` directly.
    """

    def __init__(self, interval=60, log_path='timer_4G.txt'):
        """Start the timer.

        Args:
            interval: Minimum number of seconds between two log entries.
            log_path: File the elapsed time is appended to.
        """
        self.start_time = time.time()
        self.interval = interval
        self.log_path = log_path
        # Seconds since start at which the last entry was written. This must be
        # on the same (relative) scale as elapsed_time: initialising it with the
        # absolute start_time made the comparison in log_time() always false.
        self.last_write_time = 0
        self.elapsed_time = 0

    def log_time(self):
        """Update ``elapsed_time`` and append it to the log file if more than
        ``interval`` seconds have passed since the last entry."""
        self.elapsed_time = time.time() - self.start_time
        if self.elapsed_time - self.last_write_time > self.interval:
            with open(self.log_path, 'a') as f:
                f.write(f'Elapsed time: {self.elapsed_time} seconds\n')
            self.last_write_time = self.elapsed_time

In [8]:
class RODExtender:
    """Extends seed RODs with verb phrases that co-occur with them.

    An ROD is handled here as a two-word verb phrase (see ``__contains_ROD``).
    Starting from each seed, sentences that contain a conditional marker, are
    classified as causal by CiRA and contain the ROD are inspected; verb
    phrases from their neighboring sentences that share a subject and a
    condition and whose PMI with the ROD exceeds ``PMI_THRESHOLD`` become new
    RODs, which are then processed the same way.
    """

    # Constants
    CONDITIONAL_MARKERS = ['if', 'unless', 'upon', 'when', 'whenever', 'as long as', 'so long as']
    PMI_THRESHOLD = 3.97
    SIMILARITY_THRESHOLD = 0.6  # spacy similarity threshold
    SPACY_MODEL = "en_core_web_lg"

    def __init__(self, input_file_path, seed_RODs: list[str], output_file_path: str):
        """Load the models, read and sentence-split the input, open the output file.

        Args:
            input_file_path: UTF-8 text file to analyse.
            seed_RODs: Seed verb phrases (two words each) to extend.
            output_file_path: File that receives the progress log and results;
                it is truncated if it exists.
        """
        classifier_causal_model_path = 'cira/model/cira-classifier.bin'
        converter_s2l_model_path = 'cira/model/cira-labeler.ckpt'
        self._cira_processor = CiRAProcessor(classifier_causal_model_path, converter_s2l_model_path)
        # Load the spaCy pipeline once; loading it per sentence is very slow.
        self._nlp = spacy.load(self.SPACY_MODEL)
        self._seed_RODs = seed_RODs
        self._current_RODs = []
        with open(input_file_path, 'r', encoding='utf-8') as f:
            self._sentences = sent_tokenize(f.read())
        # One list of verb phrases per sentence; filled in by extend().
        self._verb_phrases = None
        self._current_ROD_id = 0
        # Opened last so that a failure above does not leak the handle. UTF-8
        # matches the input encoding, since sentences are echoed into this file.
        self._output_file = open(output_file_path, 'w', encoding='utf-8')

    def __get_indices_based_on_condition(self, condition_func, sentences_subset_ids=None):
        """Return the sentence indices for which ``condition_func(sentence)`` is truthy.

        Args:
            condition_func: Callable taking a sentence string.
            sentences_subset_ids: Indices to test; all sentences if None.
                Indices outside the sentence list are ignored.
        """
        sentence_count = len(self._sentences)
        if sentences_subset_ids is None:
            sentences_subset_ids = range(sentence_count)
        return [
            index for index in sentences_subset_ids
            if 0 <= index < sentence_count and condition_func(self._sentences[index])
        ]

    def __contains_conditional_marker(self, sentence):
        """Return whether the sentence contains any conditional marker as a whole word
        (case-insensitive). Conditional and causal are used interchangeably here."""
        return any(re.search(fr'\b{marker}\b', sentence, re.IGNORECASE) for marker in self.CONDITIONAL_MARKERS)

    def __conditional_indices(self, sentences_subset_ids: list[int] = None):
        """Return the indices of sentences that contain a conditional marker."""
        return self.__get_indices_based_on_condition(self.__contains_conditional_marker, sentences_subset_ids)

    def __cira_is_causal(self, sentence):
        """Return the first element of CiRA's classification result for the sentence."""
        causal, _ = self._cira_processor.cira_classify(sentence)
        return causal

    def __cira_classified_as_causal_indices(self, sentences_subset_ids: list[int] = None):
        """Return the indices of sentences that CiRA classifies as causal."""
        return self.__get_indices_based_on_condition(self.__cira_is_causal, sentences_subset_ids)

    # noinspection PyPep8Naming
    def __contains_ROD(self, sentence):
        """Return whether the sentence contains the current ROD (two words) with
        8 or fewer words in between the two words (case-insensitive).

        The ROD checked is ``self._current_RODs[self._current_ROD_id]``. If it
        does not consist of exactly two words, a message is written to stderr
        and False is returned.
        """
        current_ROD_list = self._current_RODs[self._current_ROD_id].split()
        if len(current_ROD_list) != 2:
            stderr.write("ROD is not of length 2.\n")
            return False
        first_word, second_word = current_ROD_list
        pattern = re.compile(fr'{re.escape(first_word)} (\w+ ){{0,8}}{re.escape(second_word)}', re.IGNORECASE)
        return pattern.search(sentence) is not None

    # noinspection PyPep8Naming
    def __contain_ROD_indices(self, sentences_subset_ids: list[int] = None):
        """Return a list of indices of sentences that contain the current ROD.

        Returns an empty list (and reports on stderr) if ``_current_ROD_id``
        is not a valid index into ``_current_RODs``, so callers can always
        iterate over the result.
        """
        if not 0 <= self._current_ROD_id < len(self._current_RODs):
            stderr.write("ROD index out of bounds for current_RODs.\n")
            return []
        return self.__get_indices_based_on_condition(self.__contains_ROD, sentences_subset_ids)

    def __find_conditions_for_sentence(self, sentence) -> list[str]:
        """Return the condition texts CiRA labels inside cause events of the sentence.

        A condition whose preceding sibling label (ordered by start offset) is
        a "Negation" is prefixed with "not ".
        """
        conditions = []
        for label in self._cira_processor.cira_label(sentence):
            if not (isinstance(label, EventLabel) and label.is_cause()):
                continue
            sorted_children = sorted(label.children, key=lambda child: child.begin)
            for i, sublabel in enumerate(sorted_children):
                if sublabel.name != "Condition":
                    continue
                condition = sentence[sublabel.begin:sublabel.end]
                # Check if the condition is preceded by a negation
                if i > 0 and sorted_children[i - 1].name == "Negation":
                    condition = "not " + condition
                conditions.append(condition)
        return conditions

    def __conditions_overlap(self, sentence1, sentence2, nlp):
        """Return whether at least one condition of sentence1 and one of sentence2
        have a spaCy similarity above ``SIMILARITY_THRESHOLD``.

        May propagate an AttributeError raised by the CiRA labeler.
        """
        conditions1 = self.__find_conditions_for_sentence(sentence1)
        if not conditions1:
            # Nothing to compare against; skip labeling the second sentence.
            return False
        conditions2 = self.__find_conditions_for_sentence(sentence2)
        if not conditions2:
            return False
        # Parse each condition once instead of once per pair.
        docs2 = [nlp(condition) for condition in conditions2]
        for condition1 in conditions1:
            doc1 = nlp(condition1)
            if any(doc1.similarity(doc2) > self.SIMILARITY_THRESHOLD for doc2 in docs2):
                return True
        return False

    def __find_subjects(self, sentence) -> list[str]:
        """Return the texts of all tokens with dependency label ``nsubj``."""
        return [token.text for token in self._nlp(sentence) if token.dep_ == "nsubj"]

    def __subject_overlap(self, sentence1, sentence2, nlp) -> bool:
        """Return whether at least one nominal subject of sentence1 and one of
        sentence2 have a spaCy similarity above ``SIMILARITY_THRESHOLD``."""
        subjects1 = self.__find_subjects(sentence1)
        subjects2 = self.__find_subjects(sentence2)
        if not subjects1 or not subjects2:
            return False
        docs2 = [nlp(subject) for subject in subjects2]
        for subject1 in subjects1:
            doc1 = nlp(subject1)
            if any(doc1.similarity(doc2) > self.SIMILARITY_THRESHOLD for doc2 in docs2):
                return True
        return False

    def __extract_verb_phrases(self, sentence):
        """Extract verb phrases ("<verb lemma> <object lemma>") via dependency parsing.

        For every verb, each ``dobj``/``attr`` child yields a phrase; for a
        ``ccomp`` child, each of its ``nsubj`` children yields a phrase with the
        verb. Not suitable for complicated sentences.
        """
        verb_phrases = []
        for token in self._nlp(sentence):
            if token.pos_ != "VERB":
                continue
            for child in token.children:
                if child.dep_ in {"dobj", "attr"}:
                    verb_phrases.append(' '.join([token.lemma_, child.lemma_]))
                elif child.dep_ == "ccomp":
                    verb_phrases.extend(
                        ' '.join([token.lemma_, grandchild.lemma_])
                        for grandchild in child.children
                        if grandchild.dep_ == "nsubj"
                    )
        return verb_phrases

    def __total_count_verb_phrases(self):
        """Return the total number of verb phrases in all sentences."""
        return sum(len(verb_phrases) for verb_phrases in self._verb_phrases)

    def __identify_cooc_verb_phrases(self, ROD_sentence, neighbor_sentence, ROD, neighbor_index=None):
        """Return the neighbor sentence's verb phrases that co-occur with the ROD.

        A phrase qualifies if the two sentences share a subject and a condition
        and its PMI with the ROD is above ``PMI_THRESHOLD``.

        Args:
            ROD_sentence: Sentence containing the ROD.
            neighbor_sentence: Adjacent sentence to take candidates from.
            ROD: The verb phrase currently being extended.
            neighbor_index: Index of ``neighbor_sentence`` in the sentence
                list. If None it is looked up by value, which is slower and
                resolves duplicated sentences to their first occurrence.
        """
        nlp = self._nlp
        if not self.__subject_overlap(ROD_sentence, neighbor_sentence, nlp):
            return []
        try:
            conditions_overlap = self.__conditions_overlap(ROD_sentence, neighbor_sentence, nlp)
        except AttributeError:
            # the labelingconverter.py from cira may throw an AttributeError (for unknown reasons), so we catch it here
            return []
        if not conditions_overlap:
            return []

        if neighbor_index is None:
            neighbor_index = self._sentences.index(neighbor_sentence)
        # Bound before the loop: __pmi rebinds self._verb_phrases to a new list.
        new_verb_phrases = self._verb_phrases[neighbor_index]
        cooc_verb_phrases = []
        for verb_phrase in new_verb_phrases:
            if verb_phrase == ROD:
                continue
            pmi = self.__pmi(ROD, verb_phrase)
            self.__write_to_file("PMI of \"" + verb_phrase + "\" and \"" + ROD + "\": " + str(pmi))
            if pmi > self.PMI_THRESHOLD:
                self.__write_to_file(verb_phrase + " is a co-occurring verb phrase with " + ROD + " in the sentence: " + neighbor_sentence)
                cooc_verb_phrases.append(verb_phrase)
        return cooc_verb_phrases

    def __pmi(self, ROD, verb_phrase2):
        """Calculate the pointwise mutual information of the two verb phrases:
        PMI(x, y) = ln(P(x, y) / (P(x) * P(y))). Returns 0 if they never co-occur.

        NOTE(review): the natural logarithm is used, whereas PMI is commonly
        defined with log2; confirm which one PMI_THRESHOLD was tuned for.
        """
        # Taken before __cooc_phrase_count, which may add the ROD to sentences.
        total_count = self.__total_count_verb_phrases()
        count_xy = self.__cooc_phrase_count(ROD, verb_phrase2)
        if count_xy == 0:
            return 0
        count_x = self.__count_phrase(ROD)
        count_y = self.__count_phrase(verb_phrase2)
        return math.log((count_xy / total_count) / ((count_x / total_count) * (count_y / total_count)))

    def __count_phrase(self, phrase):
        """Return the number of occurrences of the phrase over all sentences' verb phrases."""
        return sum(sublist.count(phrase) for sublist in self._verb_phrases)

    def __cooc_phrase_count(self, ROD, phrase2):
        """Count the co-occurrences of an ROD and a verb phrase in the document.

        They are considered to co-occur if they are present in the same
        sentence or in neighboring sentences. Each occurrence is consumed once
        it has been counted, to avoid double counting.

        Side effect: the ROD is added to the verb phrases of every sentence
        that textually contains the current ROD (per ``__contains_ROD``) but
        for which it was not extracted as a verb phrase.
        """
        self._verb_phrases = [
            sublist + [ROD] if ROD not in sublist and self.__contains_ROD(self._sentences[i]) else sublist
            for i, sublist in enumerate(self._verb_phrases)
        ]
        # Working copies holding only the two phrases of interest; other
        # phrases never influence the count.
        remaining = [
            [phrase for phrase in sublist if phrase == ROD or phrase == phrase2]
            for sublist in self._verb_phrases
        ]
        cooc_verb_phrases_count = 0
        last_index = len(remaining) - 1
        for index, current in enumerate(remaining):
            if ROD in current and phrase2 in current:
                cooc_verb_phrases_count += 1
                current.remove(ROD)
                current.remove(phrase2)
            if index < last_index:
                following = remaining[index + 1]
                merged = current + following
                if ROD in merged and phrase2 in merged:
                    cooc_verb_phrases_count += 1
                    # Consume each phrase from the current sentence if it is
                    # there, otherwise from the next one.
                    for phrase in (ROD, phrase2):
                        if phrase in current:
                            current.remove(phrase)
                        else:
                            following.remove(phrase)
        return cooc_verb_phrases_count

    def __search_neighbors(self, _indices, ROD, timer):
        """Search the sentences before and after each given index for verb phrases
        co-occurring with the ROD and return them (duplicates possible)."""
        cooc_verb_phrases = []
        last_index = len(self._sentences) - 1
        for index in _indices:
            # log time here as this loop is not too time-consuming but also not too fast
            timer.log_time()
            sentence = self._sentences[index]
            if index > 0:
                cooc_verb_phrases += self.__identify_cooc_verb_phrases(
                    sentence, self._sentences[index - 1], ROD, index - 1)
            if index < last_index:
                cooc_verb_phrases += self.__identify_cooc_verb_phrases(
                    sentence, self._sentences[index + 1], ROD, index + 1)
        self.__write_to_file("self.__search_neighbors returns co-occurring verb phrases with " + ROD + ":")
        self.__write_to_file(str(cooc_verb_phrases))
        return cooc_verb_phrases

    def __write_to_file(self, message):
        """Write the message plus a newline to the output file and force it to disk.

        I/O errors are reported on stdout instead of being raised. The file is
        left open; extend() closes it.
        """
        try:
            self._output_file.write(message + "\n")
            self._output_file.flush()
            os.fsync(self._output_file.fileno())
        except IOError as e:
            print(f"An IOError occurred: {e}")

    def extend(self):
        """Extend the seed RODs to new RODs based on the loaded sentences.

        Progress and results are written to the output file, which is closed
        when this method ends, so extend() can be called only once per
        instance.

        Returns:
            A list with one entry per seed ROD (in seed order); each entry is
            the list of RODs reached from that seed, the seed itself first.

        Raises:
            Exception: Any error is re-raised after the elapsed time has been
                appended to 'timer_4G.txt'.
        """
        timer = Timer()
        try:
            self.__write_to_file("Filtering indices based on conditional markers...")
            indices = self.__conditional_indices()
            self.__write_to_file("Number of conditional sentences: " + str(len(indices)))
            indices = self.__cira_classified_as_causal_indices(indices)
            self.__write_to_file("Number of sentences classified by CiRA as causal sentences: " + str(len(indices)))
            self.__write_to_file("Extracting all verb phrases from sentences...")
            self._verb_phrases = [self.__extract_verb_phrases(sentence) for sentence in self._sentences]

            final_RODs = []
            for seed in self._seed_RODs:
                checked_RODs = []
                self._current_RODs = [seed]
                while self._current_RODs:
                    self.__write_to_file("Current RODs to check:")
                    self.__write_to_file(str(self._current_RODs))
                    new_RODs = []
                    for ROD_id, ROD in enumerate(self._current_RODs):
                        self.__write_to_file("Checking ROD: " + ROD)
                        # __contains_ROD reads the ROD through this index.
                        self._current_ROD_id = ROD_id
                        # Always filter from the full set of causal sentences;
                        # reusing the previous ROD's subset would restrict the
                        # search to sentences containing every earlier ROD.
                        ROD_indices = self.__contain_ROD_indices(indices)
                        cooc_verb_phrases = self.__search_neighbors(ROD_indices, ROD, timer)
                        for new_ROD in cooc_verb_phrases:
                            already_known = (
                                new_ROD in checked_RODs
                                or new_ROD in new_RODs
                                or new_ROD in self._current_RODs
                            )
                            if not already_known:
                                new_RODs.append(new_ROD)
                        checked_RODs.append(ROD)
                    if new_RODs:
                        self.__write_to_file("New RODs to check:")
                        self.__write_to_file(str(new_RODs))
                    else:
                        self.__write_to_file("No new RODs to check.")
                    self._current_RODs = new_RODs
                self._current_RODs = []
                # Order-preserving de-duplication keeps the output deterministic.
                final_RODs.append(list(dict.fromkeys(checked_RODs)))
                self.__write_to_file("RODs extended from seed " + seed + ":")
                self.__write_to_file(str(checked_RODs))

            # write results to file
            self.__write_to_file("FINAL RESULTS:")
            for seed_ROD, extended in zip(self._seed_RODs, final_RODs):
                self.__write_to_file("RODs extended from " + seed_ROD + ":")
                self.__write_to_file(str(extended))
            # timer.elapsed_time is only refreshed by log_time(), so compute it here.
            timer.elapsed_time = time.time() - timer.start_time
            self.__write_to_file(f'The program finished. Final elapsed time: {timer.elapsed_time} seconds\n')
            return final_RODs

        except Exception as e:
            # If an error occurs, write the current time to the timer file and re-raise
            timer.elapsed_time = time.time() - timer.start_time
            with open('timer_4G.txt', 'a') as f:
                f.write(f'Elapsed time at crash: {timer.elapsed_time} seconds\n')
            print(f"An error occurred: {e}")
            raise
        finally:
            self._output_file.close()
                             

In [9]:
# Paths to the CiRA classifier and labeler model files.
# NOTE(review): neither name is referenced by the code visible in this
# notebook; RODExtender.__init__ hard-codes the same two paths itself.
classifier_model_path = 'cira/model/cira-classifier.bin'
converter_model_path = 'cira/model/cira-labeler.ckpt'

In [10]:
# Seed verb phrases (two words each) from which the extension starts.
seed_RODs = ["abort procedure", "consider USIM"]
# Alternative input/output pairs from earlier runs; uncomment one pair to switch.
# input_path = "demo.txt"
# output_path = "extended_demo.txt"
# input_path = "processed_5G_NAS.txt"
# output_path = "5G_NAS_extended_RODs_final_final.txt"
input_path = "processed_LTE_NAS.txt"
output_path = "LTE_NAS_extended_RODs_final.txt"
# Constructing the extender loads the CiRA models and opens (truncates) output_path.
extender = RODExtender(input_path, seed_RODs, output_path)
# Runs the extension; the progress log and final results are written to output_path.
extended_RODs = extender.extend()    
# bert-base-cased weights warning is expected

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Some weights of the model checkpoint at