#### **LMR-Text Local Alignment Search Class**

Idea: The idea is to inspire from BLAST, Basic Local Alignment Search Tool for genomics data and develop light and simple alignment search tool for LMR text. We have to take raw prediction from the model and find a match within the initl tweet to identify the correct word the model is trying to predict.

In [10]:
import re
from typing import Literal
import pandas as pd

- Version 1: DynamicProgramming

In [89]:
class DynamicTextAligner:
    def __init__(self, text, subtext):
        self.text = text
        self.subtext = subtext
        self.subtext_chunks = subtext.split()
        self.chunk_ids = list(range(len(self.subtext_chunks)))
        self.text_words_offsets = self._get_text_word_offsets()
    
    def _get_text_word_offsets(self):
        words = self.text.split()
        word_offsets = []
        current_position = 0
        
        for word in words:
            start_offset = self.text.find(word, current_position)
            end_offset = start_offset + len(word) - 1
            word_offsets.append({
                'word': word,
                'start_offset': start_offset,
                'end_offset': end_offset
            })
            current_position = end_offset + 1

        return word_offsets
    
    def find_chunk_positions(self):
        results = []
        text_len = len(self.text)

        current_pos = 0
        for idx, chunk in enumerate(self.subtext_chunks):
            chunk_len = len(chunk)
            
            # Search for the chunk starting from the current position
            match = None
            for i in range(current_pos, text_len - chunk_len + 1):
                if self.text[i:i + chunk_len] == chunk:
                    match = (i, i + chunk_len - 1)
                    break
            
            if match:
                start_offset, end_offset = match
                results.append({
                    'chunk_id': idx,
                    'chunk': chunk,
                    'start_offset': start_offset,
                    'end_offset': end_offset
                })
                current_pos = end_offset + 1

        return results
    
    def merge_consecutive_words(self, words):
        merged_words = []
        i = 0
        while i < len(words):
            current_word = words[i]
            while i + 1 < len(words) and current_word['end_offset'] + 2 == words[i + 1]['start_offset']:
                next_word = words[i + 1]
                current_word['word'] += f" {next_word['word']}"
                current_word['end_offset'] = next_word['end_offset']
                i += 1 
            merged_words.append(current_word)
            i += 1
        return merged_words
    
    def merge_consecutive_words(self, words):
        merged_words = []
        i = 0
        while i < len(words):
            current_word = words[i]
            if i + 1 < len(words):
                next_word = words[i + 1]
                if current_word['end_offset'] + 2 == next_word['start_offset']:
                    merged_word = {
                        'word': f"{current_word['word']} {next_word['word']}",
                        'start_offset': current_word['start_offset'],
                        'end_offset': next_word['end_offset']
                    }
                    merged_words.append(merged_word)
                    i += 2
                    continue
            merged_words.append(current_word)
            i += 1
        return merged_words

    def get_alignment(self, mode: Literal["dict", "flat", "groups", "flat_groups", "sort_flat_groups"] = "dict"):
        matches = self.find_chunk_positions()
        aligned_words = []
        remaining_word_offsets = self.text_words_offsets.copy()

        for match in matches:
            chunk_start = match['start_offset']
            chunk_end = match['end_offset']

            for i, word_info in enumerate(remaining_word_offsets):
                word_start = word_info['start_offset']
                word_end = word_info['end_offset']

                if word_start <= chunk_start and word_end >= chunk_end:
                    aligned_words.append(word_info)

                    del remaining_word_offsets[i]
                    break

        if mode == "flat":
            output = [word['word'] for word in aligned_words]
        elif mode == "groups":
            output = self.merge_consecutive_words(aligned_words)
        elif mode == "flat_groups":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = " ".join([word['word'] for word in merged_words])
        elif mode == "flat_sorted_groups":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = " ".join(sorted([word['word'] for word in merged_words]))
        else:
            output = aligned_words
        return output

    
    # def display_results(self):
    #     matches = self.find_chunk_positions()
    #     for match in matches:
    #         print(f"Chunk ID: {match['chunk_id']}, Chunk: '{match['chunk']}', Start: {match['start_offset']}, End: {match['end_offset']}")

- Version 2: DynamicProgramming

In [133]:
class DynamicTextAligner:
    def __init__(self, text, subtext):
        self.text = text
        self.subtext = subtext
        self.subtext_chunks = subtext.split()
        self.chunk_ids = list(range(len(self.subtext_chunks)))
        self.text_words_offsets = self._get_text_word_offsets()
    
    def _get_text_word_offsets(self):
        words = self.text.split()
        word_offsets = []
        current_position = 0
        
        for word in words:
            start_offset = self.text.find(word, current_position)
            end_offset = start_offset + len(word) - 1
            word_offsets.append({
                'word': word,
                'start_offset': start_offset,
                'end_offset': end_offset,
                'length': len(word)
            })
            current_position = end_offset + 1

        return word_offsets

    def find_chunk_positions(self):
        results = []
        text_len = len(self.text)

        # Find all possible matches for each chunk
        all_matches = []
        for idx, chunk in enumerate(self.subtext_chunks):
            chunk_len = len(chunk)
            chunk_matches = []

            for i in range(text_len - chunk_len + 1):
                if self.text[i:i + chunk_len] == chunk:
                    # Find which word this matches
                    for word_data in self.text_words_offsets:
                        if word_data['start_offset'] <= i <= word_data['end_offset']:
                            start_offset = i
                            end_offset = start_offset + chunk_len - 1
                            chunk_matches.append({
                                'chunk_id': idx,
                                'chunk': chunk,
                                'start_offset': start_offset,
                                'end_offset': end_offset,
                                'word': word_data['word'],
                                'word_length': word_data['length']
                            })
            all_matches.append(chunk_matches)

        # Apply constraints to get the best match
        filtered_matches = []
        previous_end = -1

        for i, chunk_matches in enumerate(all_matches):
            # Sort by word length (descending) to prioritize longest word matches
            chunk_matches = sorted(chunk_matches, key=lambda x: x['word_length'], reverse=True)

            if i + 1 < len(all_matches):
                next_chunk_matches = all_matches[i + 1]
                if next_chunk_matches:
                    next_min_start_offset = min([m['start_offset'] for m in next_chunk_matches])
                else:
                    next_min_start_offset = float('inf')  # No next chunk means no overlap constraint
            else:
                next_min_start_offset = float('inf')

            # Filter matches to ensure end_offset is lower than the next chunk's min start_offset
            chunk_matches = [
                match for match in chunk_matches if match['end_offset'] < next_min_start_offset
            ]

            # If multiple matches remain, select the one with the longest word length
            if chunk_matches:
                best_match = max(chunk_matches, key=lambda x: x['word_length'])
                filtered_matches.append(best_match)
                previous_end = best_match['end_offset']

        return filtered_matches
    
    def find_chunk_positions(self):
        results = []
        text_len = len(self.text)

        # Find all possible matches for each chunk
        all_matches = []
        for idx, chunk in enumerate(self.subtext_chunks):
            chunk_len = len(chunk)
            chunk_matches = []

            for i in range(text_len - chunk_len + 1):
                if self.text[i:i + chunk_len] == chunk:
                    # Find which word this matches
                    for word_data in self.text_words_offsets:
                        if word_data['start_offset'] <= i <= word_data['end_offset']:
                            start_offset = i
                            end_offset = start_offset + chunk_len - 1
                            chunk_matches.append({
                                'chunk_id': idx,
                                'chunk': chunk,
                                'start_offset': start_offset,
                                'end_offset': end_offset,
                                'word': word_data['word'],
                                'word_length': word_data['length']
                            })
            all_matches.append(chunk_matches)

        # Apply constraints to get the best match for each chunk
        filtered_matches = []
        previous_end = -1

        for i, chunk_matches in enumerate(all_matches):
            # Sort by word length (descending) to prioritize longest word matches
            chunk_matches = sorted(chunk_matches, key=lambda x: x['word_length'], reverse=True)

            if i + 1 < len(all_matches):
                next_chunk_matches = all_matches[i + 1]
                if next_chunk_matches:
                    next_min_start_offset = min([m['start_offset'] for m in next_chunk_matches])
                else:
                    next_min_start_offset = float('inf')  # No next chunk means no overlap constraint
            else:
                next_min_start_offset = float('inf')

            # Filter matches to ensure end_offset is lower than the next chunk's min start_offset
            chunk_matches = [
                match for match in chunk_matches if match['end_offset'] < next_min_start_offset
            ]

            # If multiple matches remain, select the one with the longest word length
            if chunk_matches:
                for match in chunk_matches:
                    if match['start_offset'] > previous_end:
                        filtered_matches.append(match)
                        previous_end = match['end_offset']

        return filtered_matches
    
    def display_results(self):
        matches = self.find_chunk_positions()
        for match in matches:
            print(f"Chunk ID: {match['chunk_id']}, Chunk: '{match['chunk']}', Start: {match['start_offset']}, "
                  f"End: {match['end_offset']}, Word: '{match['word']}', Word Length: {match['word_length']}")
    
    def merge_consecutive_words(self, words):
        merged_words = []
        i = 0
        while i < len(words):
            current_word = words[i]
            while i + 1 < len(words) and current_word['end_offset'] + 2 == words[i + 1]['start_offset']:
                next_word = words[i + 1]
                current_word['word'] += f" {next_word['word']}"
                current_word['end_offset'] = next_word['end_offset']
                i += 1 
            merged_words.append(current_word)
            i += 1
        return merged_words
    
    def merge_consecutive_words(self, words):
        merged_words = []
        i = 0
        while i < len(words):
            current_word = words[i]
            if i + 1 < len(words):
                next_word = words[i + 1]
                if current_word['end_offset'] + 2 == next_word['start_offset']:
                    merged_word = {
                        'word': f"{current_word['word']} {next_word['word']}",
                        'start_offset': current_word['start_offset'],
                        'end_offset': next_word['end_offset']
                    }
                    merged_words.append(merged_word)
                    i += 2
                    continue
            merged_words.append(current_word)
            i += 1
        return merged_words

    def get_alignment(self, mode: Literal["dict", "flat", "groups", "flat_groups", "sort_flat_groups"] = "dict"):
        matches = self.find_chunk_positions()
        aligned_words = []
        remaining_word_offsets = self.text_words_offsets.copy()

        for match in matches:
            chunk_start = match['start_offset']
            chunk_end = match['end_offset']

            for i, word_info in enumerate(remaining_word_offsets):
                word_start = word_info['start_offset']
                word_end = word_info['end_offset']

                if word_start <= chunk_start and word_end >= chunk_end:
                    aligned_words.append(word_info)

                    del remaining_word_offsets[i]
                    break

        if mode == "flat":
            output = [word['word'] for word in aligned_words]
        elif mode == "groups":
            output = self.merge_consecutive_words(aligned_words)
        elif mode == "flat_groups":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = " ".join([word['word'] for word in merged_words])
        elif mode == "flat_sorted_groups":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = " ".join(sorted([word['word'] for word in merged_words]))
        else:
            output = aligned_words
        return output

- Version 3: More like **BLAST**

In [202]:
class BiDirectionalTextAligner:
    def __init__(self, text, subtext):
        self.text = text
        self.subtext = subtext
        self.text_chunks = text.split()
        self.subtext_chunks = subtext.split()
        self.text_word_offsets = self._get_text_word_offsets()

    def _get_text_word_offsets(self):
        words = self.text_chunks
        word_offsets = []
        current_position = 0
        for word in words:
            start_offset = self.text.find(word, current_position)
            end_offset = start_offset + len(word) - 1
            word_offsets.append({
                'word': word,
                'start_offset': start_offset,
                'end_offset': end_offset,
                'length': len(word)
            })
            current_position = end_offset + 1
        return word_offsets

    def find_word_by_offsets(self, start_offset, end_offset):
        for index, entry in enumerate(self.text_word_offsets):
            if entry['start_offset'] <= start_offset <= entry['end_offset'] and \
                entry['start_offset'] <= end_offset <= entry['end_offset']:
                return (entry['word'], index)
        return None

    def _find_matches(self, chunk):
        """Find all positions in the text where the chunk can match."""
        matches = []
        for i, word in enumerate(self.text_chunks):
            start_pos = word.find(chunk)  # Search for chunk anywhere in the word
            while start_pos != -1:  # As long as the chunk is found in the word
                # Compute the start and end offsets of the match within the full text
                start_offset = self.text_word_offsets[i]['start_offset'] + start_pos
                end_offset = start_offset + len(chunk) - 1
                matches.append({
                    'chunk': chunk,
                    'start_offset': start_offset,
                    'end_offset': end_offset,
                    'word': word,
                    'word_index': i,
                    'match_start_in_word': start_pos,  # Position of chunk in the word
                    'word_length': len(word),
                })
                # Look for the next occurrence of the chunk within the same word (in case it appears multiple times)
                start_pos = word.find(chunk, start_pos + 1)
        return matches

    def _extend_match(self, match, chunk_id):
        """Extend the match to the left and right, counting insertions and deletions and collecting matches."""
        insertion_count = 0
        deletion_count = 0
        chunks_found = 1
        matches_found = [match]

        # Forward search (right direction)
        current_subtext_idx = chunk_id + 1
        cursor = match['end_offset'] + 1
        while cursor < len(self.text):
            if current_subtext_idx != len(self.subtext_chunks):
                next_chunk = self.subtext_chunks[current_subtext_idx]

                if self.text[cursor] != next_chunk[0]:
                    insertion_count += 1
                    cursor += 1
                else:
                    c = 0
                    for i, char in enumerate(next_chunk):
                        if char == self.text[cursor+i]:
                            c += 1
                    if c == len(next_chunk):
                        chunks_found += 1
                        cursor += len(next_chunk)
                        new_start_offset = cursor - len(next_chunk)
                        new_end_offset = cursor - 1
                        text_word, text_index = self.find_word_by_offsets(new_start_offset, new_end_offset)
                        matches_found.append({
                            'chunk': next_chunk,
                            'start_offset': new_start_offset,
                            'end_offset': new_end_offset,
                            'word': text_word,
                            'word_index': text_index,
                            'word_length': len(text_word),
                        })
                        current_subtext_idx += 1
                    else:
                        deletion_count += (len(next_chunk) - c)
                        cursor += 1
            else:
                break

        # Backward search (left direction)
        current_subtext_idx = chunk_id - 1
        cursor = match['start_offset'] - 1
        while cursor >= 0:
            if current_subtext_idx >= 0:
                prev_chunk = self.subtext_chunks[current_subtext_idx]

                if self.text[cursor] != prev_chunk[-1]:
                    insertion_count += 1
                    cursor -= 1
                else:
                    c = 0
                    for i, char in enumerate(reversed(prev_chunk)):
                        if char == self.text[cursor-i]:
                            c += 1
                    if c == len(prev_chunk):
                        chunks_found += 1
                        cursor -= len(prev_chunk)
                        new_start_offset = cursor + 1
                        new_end_offset = cursor + len(prev_chunk)
                        text_word, text_index = self.find_word_by_offsets(new_start_offset, new_end_offset)
                        matches_found.append({
                            'chunk': prev_chunk,
                            'start_offset': new_start_offset,
                            'end_offset': new_end_offset,
                            'word': text_word,
                            'word_index': text_index,
                            'word_length': len(text_word),
                        })
                        current_subtext_idx -= 1
                    else:
                        deletion_count += (len(prev_chunk) - c)
                        cursor -= 1
            else:
                break

        return {
            'chunks_found': chunks_found,
            'insertion_count': insertion_count,
            'deletion_count': deletion_count,
            'matches_found': matches_found
        }

    def align(self):
        all_results = []

        # For each subtext chunk, find matches and extend them
        for chunk_id, chunk in enumerate(self.subtext_chunks):
            matches = self._find_matches(chunk)
            for match in matches:
                result = self._extend_match(match, chunk_id)
                all_results.append(result)

        return all_results

    def display_best_result(self):
        results = self.align()

        # Scoring: maximize chunks found, minimize insertions and deletions
        best_result = max(
            results, 
            key=lambda r: (r['chunks_found'], -r['insertion_count'], -r['deletion_count'])
        )

        print(f"Best Match:")
        matches = best_result['matches_found']
        for match in matches:
            print(f"Chunk: {match['chunk']}, Reference_Word: '{match['word']}', Start: {match['start_offset']}, End: {match['end_offset']}")

- Example Usage

In [203]:
# Example usage
text = "woo is good for this job job"
subtext = "wo oo ob"

text = "Other parts of Maryland also saw significant damage from Sundays storms including this Baltimore city neighborhood Dundalk and Catonsville Rain totals spanned from 1 to 10 inches across Maryland ECFlood"
subtext = "Maryland Baltimore Maryland"

aligner = BiDirectionalTextAligner(text, subtext)
aligner.display_best_result()

Best Match:
Chunk: Maryland, Reference_Word: 'Maryland', Start: 186, End: 193
Chunk: Baltimore, Reference_Word: 'Baltimore', Start: 87, End: 95
Chunk: Maryland, Reference_Word: 'Maryland', Start: 15, End: 22


- Example usage

In [201]:
text = "ELLI ELLICOTT CITY, Md. (WJZ)– Last Sunday, heavy down pours caused major flooding in both Howard and Baltimore Counties. Main Street in Ellicott City was slammed by the deluge that led to the death of a Maryland National  via"
subtext = "LL T Baltimore El lic ott City Maryland"

text = "UPDATE on A man in his 40s is missing from Ellicott City Maryland where this water was raging down Main Street Video Courtesy Baltimore Sun flooding MarylandFlooding"
subtext = "El lic ott City Maryland Baltimore"

text = "Other parts of Maryland also saw significant damage from Sundays storms including this Baltimore city neighborhood Dundalk and Catonsville Rain totals spanned from 1 to 10 inches across Maryland ECFlood"
subtext = "Maryland Baltimore Maryland"

aligner = DynamicTextAligner(text, subtext)
aligner.display_results()

Chunk ID: 0, Chunk: 'Maryland', Start: 15, End: 22, Word: 'Maryland', Word Length: 8
Chunk ID: 2, Chunk: 'Maryland', Start: 186, End: 193, Word: 'Maryland', Word Length: 8


In [137]:
aligner.get_alignment(mode="dict")

[{'word': 'Maryland', 'start_offset': 15, 'end_offset': 22, 'length': 8},
 {'word': 'Maryland', 'start_offset': 186, 'end_offset': 193, 'length': 8}]

In [127]:
aligner.get_alignment(mode="flat")

['Ellicott', 'City', 'Maryland', 'Baltimore']

#### **Heuristic 1**

In [36]:
def find_indices(text, locations):
    try:
        if locations.strip() == '':
            return ' '
        words = locations.split()
        
        combinations = []
        for length in range(1, 4):
            for i in range(len(words) - length + 1):
                combination = ' '.join(words[i:i + length])
                combinations.append(combination)
        indices = []
        for comb in combinations:
            for match in re.finditer(re.escape(comb), text, re.IGNORECASE):
                indices.append((match.start(), match.end(), comb))

        # keep only indices with the longest match. 
        indices = sorted(indices, key=lambda x: len(x[2]), reverse=True)
        # drop duplicated start indices 
        indices = [indices[0]] + [x for i, x in enumerate(indices[1:], 1) if x[0] not in [y[0] for y in indices[:i]]]
        # drop indices that are contained in other indices
        indices = [x for i, x in enumerate(indices) if not any(x[0] >= y[0] and x[1] <= y[1] for y in indices[:i] + indices[i+1:])]

        # return group of words corresponding to the indices in text 
        words_from_text = []
        for start, end, comb in indices:
            words_from_text.append(text[start:end])
    except IndexError: 
        words_from_text = find_substring_indices(text, locations)

    return " ".join(sorted(set(words_from_text)))

def find_substring_indices(text, locations):
    # Generate all possible substrings of the word
    substrings = [locations[i:j] for i in range(len(locations)) for j in range(i + 1, len(locations) + 1)]
    indices = []
    for substring in substrings:
        for match in re.finditer(re.escape(substring), text, re.IGNORECASE):
            indices.append((match.start(), match.end(), substring))

    # keep only indices with the longest match.
    indices = sorted(indices, key=lambda x: len(x[2]), reverse=True)

    # drop duplicated start indices
    indices = [indices[0]] + [x for i, x in enumerate(indices[1:], 1) if x[0] not in [y[0] for y in indices[:i]]]

    # drop indices that are contained in other indices
    indices = [x for i, x in enumerate(indices) if not any(x[0] >= y[0] and x[1] <= y[1] for y in indices[:i] + indices[i+1:])]

    # keep indices of words long of at least 2 characters
    indices = [x for x in indices if len(x[2]) >= 3]

    # words_from_text 
    words_from_text = []
    for start, end, substring in indices:
        words_from_text.append(substring)

    # sort words_from_text
    words_from_text = sorted(words_from_text)

    return words_from_text

def heuristic_postprocess_1(row):
    _id    = row['tweet_id']
    text  = row['raw_prediction']
    tweet = row['text']
    
    if not isinstance(text, str) or not text.strip():
        return " "
    else:
        # 1 - Clean Special char
        replacements = {
            " ##": "",
            "##": "",
            ",": "",
            "U . S .": "U.S.",
            "U . S": "U.S.",
            "U S": "U.S.",
            "L . A .": "L.A.",
            "L . A": "L.A.",
            "L A": "L.A.",
            "P . R .": "P.R.",
            "P . R": "P.R.",
            "P R": "P.R.",
            "N . C .": "N.C.",
            "N . N": "N.C.",
            "N C": "N.C.",
            "D . C .": "D.C.",
            "D . C": "D.C.",
            "D C": "D.C."
        }
        for word, replacement in replacements.items():
            text = text.replace(word, replacement)
        
        #"""
        # 2 - Special Replace
        text = re.sub(r'\bM\b', 'Md.', text)
        text = re.sub(r'\bElliot\b', '', text)
        text = re.sub(r'\bMat\b', 'Matti', text)
        text = re.sub(r'\bSD\b', 'SDMA', text)
        text = re.sub(r'\bZ\b', 'Zimba', text)
        text = re.sub(r'\btt\b', 'Hutt', text)
        text = re.sub(r'\bbe\b', 'Brooklyn', text)
        text = re.sub(r'\bly\b', 'welly', text)
        text = re.sub(r'\bgree\b', 'greece', text)
        text = re.sub(r'\bAt\b', 'Attica', text)
        
        
        # 3 - Join City or County or New as one word
        pattern1 = r'\b(\w+)\s(city|CITY|City|county|COUNTY|County)\b'
        pattern2 = r'\b(New|NEW|new|United|United__Arab|East)\s(\w+)\b'
        def replace_func1(match):
            first_word = match.group(1)
            city_word = match.group(2)
            return f'{first_word}__{city_word}'
        def replace_func2(match):
            first_word = match.group(1)
            next_word = match.group(2)
            return f'{first_word}__{next_word}'
        text = re.sub(pattern1, replace_func1, text)
        text = re.sub(pattern2, replace_func2, text)
        text = re.sub(pattern2, replace_func2, text)
        
        # 4 - Remove repeated groups of words
        words = text.split()
        seen_words = set()
        unique_words = []
        for word in words:
            if word not in seen_words:
                seen_words.add(word)
                unique_words.append(word)
        
        # 5 - Sort location in Alphabetic order
        unique_words = [place.replace("__", " ") for place in unique_words]
        unique_words = sorted(unique_words)
        text = " ".join(unique_words)
        
        # 6 - Remove words with length lower than 2
        text = " ".join([word for word in text.split() if len(word) >= 2])
        #"""
        
        # Desiré processing
        #text = find_indices(tweet, text)
        
        return text.strip()

#### **Heuristic Text Aligner protopyte**

In [128]:
class DynamicTextAligner:
    """
    **LMR-Text Local Alignment Search Class**
    Idea: The idea is to inspire from BLAST, Basic Local Alignment Search Tool for genomics data 
    and develop light and simple alignment search tool for LMR text. We have to take raw 
    prediction from the model and find a match within the initl tweet to identify the correct 
    word the model is trying to predict.
    """
    def __init__(self, text, subtext):
        self.text = text
        self.subtext = subtext
        self.subtext_chunks = subtext.split()
        self.chunk_ids = list(range(len(self.subtext_chunks)))
        self.text_words_offsets = self._get_text_word_offsets()
    
    def _get_text_word_offsets(self):
        words = self.text.split()
        word_offsets = []
        current_position = 0
        
        for word in words:
            start_offset = self.text.find(word, current_position)
            end_offset = start_offset + len(word) - 1
            word_offsets.append({
                'word': word,
                'start_offset': start_offset,
                'end_offset': end_offset,
                'length': len(word)
            })
            current_position = end_offset + 1

        return word_offsets
    
    def find_chunk_positions(self):
        results = []
        text_len = len(self.text)

        # Find all possible matches for each chunk
        all_matches = []
        for idx, chunk in enumerate(self.subtext_chunks):
            chunk_len = len(chunk)
            chunk_matches = []

            for i in range(text_len - chunk_len + 1):
                if self.text[i:i + chunk_len] == chunk:
                    # Find which word this matches
                    for word_data in self.text_words_offsets:
                        if word_data['start_offset'] <= i <= word_data['end_offset']:
                            start_offset = i
                            end_offset = start_offset + chunk_len - 1
                            chunk_matches.append({
                                'chunk_id': idx,
                                'chunk': chunk,
                                'start_offset': start_offset,
                                'end_offset': end_offset,
                                'word': word_data['word'],
                                'word_length': word_data['length']
                            })
            all_matches.append(chunk_matches)

        # Apply constraints to get the best match
        filtered_matches = []
        previous_end = -1

        for i, chunk_matches in enumerate(all_matches):
            # Sort by word length (descending) to prioritize longest word matches
            chunk_matches = sorted(chunk_matches, key=lambda x: x['word_length'], reverse=True)

            if i + 1 < len(all_matches):
                next_chunk_matches = all_matches[i + 1]
                if next_chunk_matches:
                    next_min_start_offset = min([m['start_offset'] for m in next_chunk_matches])
                else:
                    next_min_start_offset = float('inf')  # No next chunk means no overlap constraint
            else:
                next_min_start_offset = float('inf')

            # Filter matches to ensure end_offset is lower than the next chunk's min start_offset
            chunk_matches = [
                match for match in chunk_matches if match['end_offset'] < next_min_start_offset
            ]

            # If multiple matches remain, select the one with the longest word length
            if chunk_matches:
                best_match = max(chunk_matches, key=lambda x: x['word_length'])
                filtered_matches.append(best_match)
                previous_end = best_match['end_offset']

        return filtered_matches
    
    def merge_consecutive_words(self, words):
        merged_words = []
        i = 0
        while i < len(words):
            current_word = words[i]
            if i + 1 < len(words):
                next_word = words[i + 1]
                if current_word['end_offset'] + 2 == next_word['start_offset']:
                    merged_word = {
                        'word': f"{current_word['word']} {next_word['word']}",
                        'start_offset': current_word['start_offset'],
                        'end_offset': next_word['end_offset']
                    }
                    merged_words.append(merged_word)
                    i += 2
                    continue
            merged_words.append(current_word)
            i += 1
        return merged_words

    def heuristic_post_processing(self, merged_words):
        return " ".join(sorted(set([word['word'] for word in merged_words])))
    
        """
        # 1 - Replace special cases
        punctuations = [",", ";", ":", "#", "(", ")", "\"", "[", "]", "?"]
        output = [word['word'].split("’")[0] for word in merged_words]
        output = [word.replace(".,", ".") for word in output]
        output = [subword for word in output for subword in word.split('/')]
        words  = [word.translate(str.maketrans('', '', ''.join(punctuations))) for word in output]

        # 2 - Handle hyphens and capital letters
        processed_words = []
        for word in words:
            if word.isupper():
                processed_words.append(word)
            else:
                processed_word = word.replace("-", " ")
                processed_words.append(processed_word)
                
        # 3 - Process dots in words
        final_words = []
        for word in processed_words:
            if "." in word and word.count(".") < 2:
                if word.endswith("Md."):
                    final_words.append(word)
                else:
                    final_words.append(word.replace(".", ""))
            else:
                final_words.append(word)
                
        final_words = sorted(final_words)
        output = " ".join(final_words)
        return output
        """

    def get_alignment(self, mode: Literal["dict", "flat", "groups", "flat_groups", "flat_sorted_groups", "flat_sorted_groups+heur"] = "dict"):
        matches = self.find_chunk_positions()
        aligned_words = []
        remaining_word_offsets = self.text_words_offsets.copy()

        for match in matches:
            chunk_start = match['start_offset']
            chunk_end = match['end_offset']

            for i, word_info in enumerate(remaining_word_offsets):
                word_start = word_info['start_offset']
                word_end = word_info['end_offset']

                if word_start <= chunk_start and word_end >= chunk_end:
                    aligned_words.append(word_info)

                    del remaining_word_offsets[i]
                    break

        if mode == "flat":
            output = [word['word'] for word in aligned_words]
        elif mode == "groups":
            output = self.merge_consecutive_words(aligned_words)
        elif mode == "flat_groups":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = " ".join([word['word'] for word in merged_words])
        elif mode == "flat_sorted_groups":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = " ".join(sorted([word['word'] for word in merged_words]))
        elif mode == "flat_sorted_groups+heur":
            merged_words = self.merge_consecutive_words(aligned_words)
            output = self.heuristic_post_processing(merged_words)
        else:
            output = aligned_words
        return output

    
    def display_results(self):
        matches = self.find_chunk_positions()
        for match in matches:
            print(f"Chunk ID: {match['chunk_id']}, Chunk: '{match['chunk']}', Start: {match['start_offset']}, End: {match['end_offset']}")

# ----------------------------------------------------------------------------------------------------------------------------------------

def heuristic_pre_processing(sentence):
        words = sentence.split()

        # 1 - Replace special cases
        punctuations = [",", ";", ":", "#", "(", ")", "\"", "[", "]", "?", "!"]
        output = [word.split("’")[0] for word in words]
        output = [word.replace(".,", ".") for word in output]
        output = [subword for word in output for subword in word.split('/')]
        words  = [word.translate(str.maketrans('', '', ''.join(punctuations))) for word in output]

        # 2 - Handle hyphens and capital letters
        processed_words = []
        for word in words:
            if word.isupper():
                processed_words.append(word)
            else:
                processed_word = word.replace("-", " ") 
                processed_words.append(processed_word)
                
        # 3 - Process dots in words
        final_words = []
        for word in processed_words:
            if "@" not in word:
                if "." in word and word.count(".") < 2:
                    if word.endswith("Md."):
                        final_words.append(word)
                    else:
                        final_words.append(word.replace(".", ""))
                else:
                    final_words.append(word)
                
        output = " ".join(final_words)
        return output

def TLAST_postprocess(row, with_text=False):
    generated_text  = row['raw_prediction']
    targeted_text   = heuristic_pre_processing(row['text'])
    
    if not isinstance(generated_text, str) or not generated_text.strip():
        return " "
    
    # Clean Special Clean
    replacements = {
        # " ##": "",
        # "##": "",
        "#": "",
    }
    for word, replacement in replacements.items():
        generated_text = generated_text.replace(word, replacement)
    
    # Call TLAST
    cleaned_text = DynamicTextAligner(targeted_text, generated_text).get_alignment(
        mode="flat_sorted_groups+heur"
    )
    if with_text:
        return cleaned_text.strip(), targeted_text
    return cleaned_text.strip()

In [129]:
df = pd.read_csv('./train_inference_tlast3.csv')
df = df.drop(columns=["Unnamed: 0", "prediction", "WER"])

In [130]:
df[['prediction', 'cleaned_text']] = df.apply(lambda x: pd.Series(TLAST_postprocess(x, True)), axis=1)
#df['prediction'] = df.apply(heuristic_postprocess_1, axis=1)
df.head(10)

Unnamed: 0,tweet_id,text,location,raw_prediction,prediction,cleaned_text
0,ID_1001136696589631488,"Flash floods struck a Maryland city on Sunday,...",Maryland,Maryland,Maryland,Flash floods struck a Maryland city on Sunday ...
1,ID_1001136950345109504,State of emergency declared for Maryland flood...,Maryland,Maryland,Maryland,State of emergency declared for Maryland flood...
2,ID_1001137334056833024,Other parts of Maryland also saw significant d...,Baltimore Maryland,Maryland Baltimore Maryland,Maryland,Other parts of Maryland also saw significant d...
3,ID_1001138374923579392,"Catastrophic Flooding Slams Ellicott City, Mar...",Ellicott City Maryland,El ##lic ##ott City Maryland,Ellicott City Maryland,Catastrophic Flooding Slams Ellicott City Mary...
4,ID_1001138377717157888,WATCH: 1 missing after flash #FLOODING devasta...,Ellicott City Maryland,El ##lic ##ott City Maryland,Ellicott City Maryland,WATCH 1 missing after flash FLOODING devastate...
5,ID_1001141769155891200,Watch Live: Aerials of damage after historic f...,Ellicott City Maryland,El ##lic ##ott City Maryland,Ellicott City Maryland,Watch Live Aerials of damage after historic fl...
6,ID_1001143589630427136,One person is reported missing as a state of e...,Maryland,Maryland,Maryland,One person is reported missing as a state of e...
7,ID_1001143679267098624,Monday May 28 - Morning Report: National Guard...,Arlington Maryland,Maryland Arlington,Arlington Maryland,Monday May 28 Morning Report National Guards...
8,ID_1001144314897018880,One man is still missing after flash flooding ...,Ellicott City Maryland,El ##lic ##ott City Maryland,Ellicott City Maryland,One man is still missing after flash flooding ...
9,ID_1001144944717914112,RT @KCCINews: State of emergency declared in M...,Maryland,Maryland,Maryland,RT State of emergency declared in Maryland aft...


- Compute score

In [131]:
import jiwer
def compute_wer_eval(df, col1='location', col2='prediction'):
    def calculate_wer(row):
        return jiwer.wer(str(row[col1]), str(row[col2]))
    df['WER'] = df.apply(calculate_wer, axis=1)
    average_wer = df['WER'].mean()
    return df, average_wer

# Eval
df, average_wer = compute_wer_eval(df, col2='prediction')
df.to_csv("./train_pred.csv")
average_wer

0.5133498770639104

- Filter problematic lines

In [132]:
data_filtered = df[df['location'].notna()]
data_filtered = data_filtered[data_filtered['WER'] != 0]
data_filtered = data_filtered.drop(columns=['tweet_id'])
data_filtered = data_filtered.reset_index(drop=True)
data_filtered.to_csv("problematic_train_tlast1.csv")
data_filtered

Unnamed: 0,text,location,raw_prediction,prediction,cleaned_text,WER
0,Other parts of Maryland also saw significant d...,Baltimore Maryland,Maryland Baltimore Maryland,Maryland,Other parts of Maryland also saw significant d...,0.500000
1,Howard County Executive Allan Kittleman said M...,Howard,Howard County,Howard County,Howard County Executive Allan Kittleman said M...,1.000000
2,RT @CristianiCasco: State of Emergency declare...,Maryland,El ##lic ##ott ##C ##ity,Catastrophic EllicottCity,RT State of Emergency declared Catastrophic fl...,2.000000
3,I liked a @YouTube video CRAZY MARYLAND FLOOD...,Ellicott City MARYLAND Maryland,MA ##R ##Y ##LA ##ND El ##lic ##ott City,CRAZY MARYLAND Ellicott City,I liked a video CRAZY MARYLAND FLOODING Flas...,1.000000
4,"During the devastating Maryland flash floods, ...",Ellicott City Maryland,Maryland El ##lic ##ott,Ellicott Maryland,During the devastating Maryland flash floods w...,0.333333
...,...,...,...,...,...,...
4478,RT @tomtravel2: Detailed hotel damage assessme...,Caribbean,,,,1.000000
4479,Death toll from the 7.1-magnitude earthquake w...,Mexico Mexico City,Mexico Mexico City,City Mexico,Death toll from the 71 magnitude earthquake wh...,0.666667
4480,"Welcome to Thailand Ambassador Meir Shlomo, an...",Israels Mexico City Thailand,Thailand Israel ##s Mexico,Mexico Thailand Ambassador,Welcome to Thailand Ambassador Meir Shlomo and...,0.750000
4481,Help Mexico after the terrible earthquake of S...,Mexico,Mexico,FuerzaMexico,Help Mexico after the terrible earthquake of S...,1.000000
