# Libraries

In [None]:
import json
import pandas as pd
from typing import List, Dict
from sklearn.metrics import precision_recall_fscore_support
import re
from collections import OrderedDict
import os
import tempfile
from pydub import AudioSegment
from vosk import Model, KaldiRecognizer

# Formatting

In [None]:
# Reduce every raw transcription record to a single-key {"text": ...} record.
input_file_path = 'transcript_output/transcriptions_seed-gretel-similar0.3-similar0.6.jsonl'
output_file_path = 'transcript_output/formatted_transcriptions_seed-gretel-similar0.3-similar0.6.jsonl'
with open(input_file_path, 'r') as source, open(output_file_path, 'w') as sink:
    for raw_line in source:
        # Each input line is one JSON object; only its "transcription" value
        # is carried over, under the key "text".
        transcription = json.loads(raw_line)["transcription"]
        sink.write(json.dumps({"text": transcription}) + '\n')
print(f"Transformation complete. Output written to {output_file_path}")


In [None]:
# Labels that may appear as [<LABEL>_START] / [<LABEL>_END] marker tokens.
_ENTITY_LABELS = (
    'PERSON', 'PHONE', 'DATE', 'CARDINAL', 'GPE', 'LOC', 'MONEY', 'ORG',
    'EMAIL', 'CREDIT_CARD', 'BANK_ACCOUNT', 'CAR_PLATE', 'NRIC',
    'PASSPORT_NUM', 'TIME',
)
_START_TOKENS = {f'[{label}_START]': label for label in _ENTITY_LABELS}
_END_TOKENS = {f'[{label}_END]': label for label in _ENTITY_LABELS}


def extract_entities_and_clean_text(transcription):
    """Strip bracketed marker tokens and return the entity spans they delimit.

    Every ``[...]`` token is removed from the text. Recognised
    ``[<LABEL>_START]`` / ``[<LABEL>_END]`` tokens additionally open and close
    an entity; any other bracketed token is dropped without creating one.

    Args:
        transcription: Text containing the marker tokens.

    Returns:
        A ``(cleaned_transcription, entities)`` tuple. ``entities`` is a list
        of dicts with keys ``'start'``, ``'end'`` and ``'label'``. Offsets are
        character positions in ``cleaned_transcription`` (``end`` exclusive),
        so ``cleaned_transcription[e['start']:e['end']]`` is the entity text.
    """
    entities = []
    pieces = []
    # Length of the cleaned text produced so far. Offsets must be measured
    # here, not in the marked-up input, because the removed tokens shift
    # every later character to the left.
    cleaned_length = 0
    current_entity = None
    last_end = 0

    for match in re.finditer(r'\[.*?\]', transcription):
        # Keep the plain text that sits between the previous token and this one.
        piece = transcription[last_end:match.start()]
        pieces.append(piece)
        cleaned_length += len(piece)

        token = match.group(0)
        if token in _START_TOKENS:
            if current_entity is not None:
                # A new START before the matching END: close the open entity
                # at the current position instead of losing it.
                current_entity['end'] = cleaned_length
                entities.append(current_entity)
            current_entity = {'start': cleaned_length, 'label': _START_TOKENS[token]}
        elif token in _END_TOKENS and current_entity is not None:
            # Any END token closes the open entity; its label is not checked.
            current_entity['end'] = cleaned_length
            entities.append(current_entity)
            current_entity = None

        last_end = match.end()

    # Text after the last token.
    pieces.append(transcription[last_end:])
    cleaned_transcription = ''.join(pieces)

    # An entity that was never closed runs to the end of the cleaned text.
    if current_entity is not None:
        current_entity['end'] = len(cleaned_transcription)
        entities.append(current_entity)

    return cleaned_transcription, entities

def process_file(input_file_path, output_file_path):
    """Convert marker-annotated JSONL records into text + entity-span records.

    Each input line is a JSON object whose "text" value is passed to
    extract_entities_and_clean_text. The output line holds the cleaned text
    under "text" and a list of [start, end, label] triples under "entities".

    Args:
        input_file_path: JSONL file to read.
        output_file_path: JSONL file to (over)write.
    """
    with open(input_file_path, 'r') as source, open(output_file_path, 'w') as sink:
        for raw_line in source:
            record = json.loads(raw_line)
            cleaned_text, spans = extract_entities_and_clean_text(record["text"])

            # Flatten each span dict into a [start, end, label] triple.
            triples = []
            for span in spans:
                triples.append([span['start'], span['end'], span['label']])

            sink.write(json.dumps({"text": cleaned_text, "entities": triples}) + '\n')

    print(f"Transformation complete. Output written to {output_file_path}")


input_file_path = 'transcript_output/formatted_transcriptions_seed-gretel-diverse0.3+0.6-similar0.3+0.6.jsonl'
output_file_path = 'transcript_output/formatted_entities_seed-gretel-diverse0.3+0.6-similar0.3+0.6.jsonl'
process_file(input_file_path, output_file_path)

# Remove Entities

In [None]:
def remove_entities(input_file_path, output_file_path):
    """Copy a JSONL file, keeping only the "text" field of every record.

    Args:
        input_file_path: JSONL file whose records each contain a "text" key.
        output_file_path: JSONL file to (over)write with {"text": ...} records.
    """
    with open(input_file_path, 'r') as source, open(output_file_path, 'w') as sink:
        # Lines are parsed and written lazily, one at a time.
        sink.writelines(
            json.dumps({"text": json.loads(raw_line)["text"]}) + '\n'
            for raw_line in source
        )

    print(f"Text extraction complete. Output written to {output_file_path}")
# Strip everything except "text" from the augmented dataset. The output file
# is opened with mode 'w' by remove_entities, so its directory must already
# exist and any previous file at that path is overwritten.
input_file_path = 'Pipeline_Method/Final_augmented_data_base_sim_0.3_trf.jsonl'  # Replace with your actual input file
output_file_path = 'Pipeline_Method/text_only/Final_augmented_data_base_sim_0.3_trf.jsonl'
remove_entities(input_file_path, output_file_path)


# Appending ID

In [None]:
# Prefix every record with a 1-based "id" taken from its line position.
input_file = 'Pipeline_Method/text_only/Final_augmented_data_base_sim_0.3_trf.jsonl'
output_file = 'Pipeline_Method/text_only_ids/Final_augmented_data_base_sim_0.3_trf.jsonl'
with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
    for record_id, raw_line in enumerate(infile, start=1):
        # Seed the mapping with 'id' so it is serialised as the first key,
        # then layer the record's own fields on top.
        ordered_data = OrderedDict(id=record_id)
        ordered_data.update(json.loads(raw_line))

        outfile.write(json.dumps(ordered_data) + '\n')


## Timestamp Alignment

In [None]:
# model_new is Vosk model
model_path = "model_new"
model = Model(model_path)


def align_audio_with_text(audio_path, transcription):
    """Run Vosk recognition over a WAV file and return word-level timings.

    Args:
        audio_path: Path to a WAV file readable by pydub.
        transcription: Reference text for the audio. It is accepted for
            interface compatibility but not used: the timings come solely
            from the recogniser's own output.

    Returns:
        A list of ``{"start": ..., "end": ..., "word": ...}`` dicts, in the
        order the recogniser emitted the words.
    """
    # Vosk's examples require mono 16-bit PCM input. Normalise here so stereo
    # or differently-sized samples are not fed to the recogniser as-is.
    audio = AudioSegment.from_wav(audio_path).set_channels(1).set_sample_width(2)
    recognizer = KaldiRecognizer(model, audio.frame_rate)
    recognizer.SetWords(True)

    # Feed the raw PCM bytes directly: no temporary file is needed and no
    # assumption is made about the size of a WAV header.
    pcm = audio.raw_data
    chunk_size = 4000
    results = []
    for offset in range(0, len(pcm), chunk_size):
        if recognizer.AcceptWaveform(pcm[offset:offset + chunk_size]):
            results.append(json.loads(recognizer.Result()))
    # Flush whatever the recogniser is still holding after the last chunk.
    results.append(json.loads(recognizer.FinalResult()))

    # Only results that carry a 'result' list contain per-word timings.
    return [
        {"start": word["start"], "end": word["end"], "word": word["word"]}
        for result in results
        for word in result.get('result', [])
    ]
# Align every transcription with its audio file, "<audio_dir>/id<ID>.wav",
# and store the word timings next to the id and text.
audio_dir = "Audio_Files_for_testing"
transcription_file = "Pipeline_Method/text_only_ids/Final_augmented_data_base_sim_0.3_trf.jsonl"
output_file = "Pipeline_Method/aligned_output/Final_augmented_data_base_sim_0.3_trf.jsonl"
with open(transcription_file, 'r') as f:
    transcriptions = list(map(json.loads, f))

aligned_data = []
for item in transcriptions:
    item_id, item_text = item['id'], item['text']
    audio_path = f"{audio_dir}/id{item_id}.wav"
    aligned_transcription = align_audio_with_text(audio_path, item_text)
    aligned_data.append({"id": item_id, "text": item_text, "align": aligned_transcription})

# Results are written only after every file has been aligned.
with open(output_file, 'w') as f:
    f.writelines(json.dumps(item) + '\n' for item in aligned_data)


# Merging of jsonl files

In [None]:
# Attach entity annotations to the aligned records by matching on identical text.
with open('Pipeline_Method/aligned_output/Final_augmented_data_base_sim_0.3_trf.jsonl', 'r') as f:
    aligned_data = list(map(json.loads, f))

# Load the filtered test data
with open('Pipeline_Method/Final_augmented_data_base_sim_0.3_trf.jsonl', 'r') as f:
    filtered_test_data = list(map(json.loads, f))

# text -> entities lookup; if the same text occurs more than once the last
# record wins.
filtered_test_map = {}
for item in filtered_test_data:
    key = item['text']
    filtered_test_map[key] = item['entities']

# Records whose text has no counterpart receive an empty entity list.
for item in aligned_data:
    text = item['text']
    item['entities'] = filtered_test_map.get(text, [])

with open('Pipeline_Method/merged_output/Final_augmented_data_base_sim_0.3_trf.jsonl', 'w') as f:
    f.writelines(json.dumps(item) + '\n' for item in aligned_data)


# F1 Score

In [None]:
# Define the regex patterns for different entities
patterns = {
    'CREDIT_CARD': r'\d{4}-\d{4}-\d{4}-\d{4}',
    'BANK_ACCOUNT': r'\d{3}-\d{5}-\d',
    'PASSPORT_NUM': r'K\d{7}[A-Z]',
    'PHONE': r'\d{4}-\d{4}',
    'NRIC': r'[STFG]\d{7}[A-Z]'
}

# Spoken form of each character that can appear in a digit sequence.
_SPOKEN_CHARS = {
    '0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four',
    '5': 'five', '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine',
    '-': ' '
}


def _spell_out_digits(entity_text):
    """Return the entity as a word list with digits spelled out and hyphens dropped."""
    return ' '.join(_SPOKEN_CHARS.get(char, char) for char in entity_text).split()


def _locate_word_run(alignment_words, target_words, expected_index):
    """Find ``target_words`` as a contiguous run inside ``alignment_words``.

    When the run occurs more than once, the occurrence whose first index is
    closest to ``expected_index`` is chosen (earliest on ties).

    Returns:
        ``(first_index, last_index)`` of the chosen run, or ``None``.
    """
    run_length = len(target_words)
    if run_length == 0 or run_length > len(alignment_words):
        return None
    candidates = [
        index
        for index in range(len(alignment_words) - run_length + 1)
        if alignment_words[index:index + run_length] == target_words
    ]
    if not candidates:
        return None
    first = min(candidates, key=lambda index: abs(index - expected_index))
    return first, first + run_length - 1


def extract_entity_times(text, entities, alignments):
    """Translate character-offset entities into time-stamped entities.

    Args:
        text: The transcript the entity offsets refer to.
        entities: Iterable of ``(start, end, label)`` where ``start``/``end``
            are character indexes into ``text``; ``end`` is treated as
            inclusive (the slice taken is ``text[start:end + 1]``).
        alignments: List of dicts with keys ``'word'``, ``'start'`` and
            ``'end'`` describing each recognised word and its timing.

    Returns:
        A list of ``[start_time, end_time, label]`` for every entity that
        could be located in ``alignments``. Entities that cannot be located
        are skipped after printing a warning.
    """
    timestamped_entities = []
    alignment_words = [alignment['word'].lower() for alignment in alignments]
    # Fallback lookup by word only. A repeated word keeps the timing of its
    # LAST occurrence, which is why the positional search below is tried first.
    alignment_dict = {
        word: {'start': alignment['start'], 'end': alignment['end']}
        for word, alignment in zip(alignment_words, alignments)
    }

    def run_times(target_words, start_token):
        """Return (start_time, end_time) of the contiguous run, or None."""
        # The number of transcript words before the entity approximates where
        # the entity sits in the alignment; it is only used to choose between
        # several identical runs.
        expected_index = len(text[:start_token].split())
        run = _locate_word_run(alignment_words, target_words, expected_index)
        if run is None:
            return None
        first, last = run
        return alignments[first]['start'], alignments[last]['end']

    for entity in entities:
        start_token, end_token, label = entity
        if start_token < 0 or start_token >= len(text):
            print(f"Warning: Start token index out of range for entity: {entity}")
            continue
        if end_token < start_token:
            print(f"Warning: End token index is before start token for entity: {entity}")
            end_token = start_token
        if end_token >= len(text):
            end_token = len(text) - 1
        # Extract and normalize the text for the entity
        entity_text = text[start_token:end_token + 1].strip()
        if not entity_text:
            print(f"Warning: Extracted entity text is empty for entity: {entity}")
            continue
        normalized_entity_text = entity_text.lower()

        # Digit sequences are spoken digit by digit, so compare their
        # spelled-out form against the recognised words.
        if normalized_entity_text[0].isdigit() and normalized_entity_text[-1].isdigit():
            spoken_words = _spell_out_digits(normalized_entity_text)
            times = run_times(spoken_words, start_token)
            if times is None:
                # No contiguous run: fall back to looking up the first and
                # last spoken word independently.
                start_time = alignment_dict.get(spoken_words[0], {}).get('start')
                end_time = alignment_dict.get(spoken_words[-1], {}).get('end')
                if start_time is None or end_time is None:
                    print(f"Error: Could not find alignment for entity: {entity}")
                    continue
                times = (start_time, end_time)
            timestamped_entities.append([times[0], times[1], label])
            continue

        # Check if the entity label has a predefined pattern
        if label in patterns:
            # Search for the label's pattern in the recognised words themselves.
            pattern_str = ' '.join(alignment['word'] for alignment in alignments)
            match = re.search(patterns[label], pattern_str)
            if match:
                matched_text = match.group(0)
                start_time = None
                end_time = None
                current_text = ""
                for alignment in alignments:
                    # Compare with the word's original casing: the match was
                    # taken from the un-lowered words, so lower-casing here
                    # would prevent patterns containing [A-Z] from being found.
                    word = alignment['word']
                    if matched_text.startswith(current_text + word):
                        if start_time is None:
                            start_time = alignment['start']
                        current_text += word
                        end_time = alignment['end']
                        if matched_text == current_text:
                            break
                if start_time is None or end_time is None:
                    print(f"Warning: Could not find alignment for entity: {entity}")
                    continue
                timestamped_entities.append([start_time, end_time, label])
                continue

        # Otherwise match the entity word by word.
        words_in_entity = normalized_entity_text.split()
        times = run_times(words_in_entity, start_token)
        if times is None:
            # No contiguous run: take the first found word's start and the
            # last found word's end, warning about every missing word.
            start_time = None
            end_time = None
            for word in words_in_entity:
                if word in alignment_dict:
                    if start_time is None:
                        start_time = alignment_dict[word]['start']
                    end_time = alignment_dict[word]['end']
                else:
                    print(f"Warning: Word '{word}' from entity text not found in alignments.")

            if start_time is None or end_time is None:
                print(f"Warning: Could not find alignment for entity: {entity}")
                continue
            times = (start_time, end_time)

        timestamped_entities.append([times[0], times[1], label])

    return timestamped_entities

def process_jsonl(input_file, output_file):
    """Rewrite a merged JSONL file with entities expressed as time spans.

    Every input record must provide "text", "align" and "entities". The
    output record keeps "text" and replaces "entities" with the result of
    extract_entity_times.

    Args:
        input_file: JSONL file to read.
        output_file: JSONL file to (over)write.
    """
    with open(input_file, 'r') as source, open(output_file, 'w') as sink:
        for raw_line in source:
            record = json.loads(raw_line.strip())
            record_text = record['text']
            word_alignments = record['align']
            entity_spans = record['entities']

            timed_entities = extract_entity_times(record_text, entity_spans, word_alignments)

            sink.write(json.dumps({"text": record_text, "entities": timed_entities}) + '\n')


# Paths to your input and output files
input_file = 'transcript_output/merged_output/merged_data_seed_gretel_similar0.3.jsonl'
output_file = 'transcript_output/timestamped/processed_data_seed_gretel_similar0.3.jsonl'

# Process the file
process_jsonl(input_file, output_file)


In [None]:
# Define the overlap function
def calculate_overlap(start1, end1, start2, end2, threshold):
    """Tell whether two intervals overlap by at least ``threshold``.

    The overlap ratio is the length of the intersection divided by the
    length of the shorter interval.

    Args:
        start1, end1: Bounds of the first interval.
        start2, end2: Bounds of the second interval.
        threshold: Minimum overlap ratio required for a match.

    Returns:
        True when the overlap ratio is greater than or equal to ``threshold``.
    """
    overlap_duration = max(0, min(end1, end2) - max(start1, start2))
    shortest_duration = min(end1 - start1, end2 - start2)
    if shortest_duration <= 0:
        # A zero-length interval would divide by zero; it has no measurable
        # overlap, so its ratio is taken to be 0.
        overlap_ratio = 0.0
    else:
        overlap_ratio = overlap_duration / shortest_duration
    return overlap_ratio >= threshold

# Define the F1 score calculation function
def calculate_f1_scores(true_entities, predicted_entities, overlap_threshold, target_labels):
    """Score predicted entity intervals against reference intervals per label.

    Args:
        true_entities: Reference entities, each indexable as
            ``(start, end, label)``.
        predicted_entities: Predicted entities in the same layout.
        overlap_threshold: Ratio passed to calculate_overlap to decide
            whether two intervals match.
        target_labels: Labels to evaluate; entities with other labels are
            ignored.

    Returns:
        A list of dicts with keys 'Label', 'Precision', 'Recall' and
        'F1 Score': one per target label, followed by an 'Overall' entry.
    """
    def intervals_by_label(entity_list):
        # label -> [(start, end), ...], restricted to the target labels.
        grouped = {}
        for ent in entity_list:
            start, end, label = ent[0], ent[1], ent[2]
            if label in target_labels:
                grouped.setdefault(label, []).append((start, end))
        return grouped

    true_dict = intervals_by_label(true_entities)
    predicted_dict = intervals_by_label(predicted_entities)

    results = []
    all_true_labels = []
    all_pred_labels = []

    for label in target_labels:
        true_intervals = true_dict.get(label, [])
        pred_intervals = predicted_dict.get(label, [])

        y_true = []
        y_pred = []

        # Every reference interval is a positive; it is predicted positive
        # when at least one predicted interval matches it.
        for t_start, t_end in true_intervals:
            hit = any(
                calculate_overlap(t_start, t_end, p_start, p_end, overlap_threshold)
                for p_start, p_end in pred_intervals
            )
            y_true.append(1)
            y_pred.append(1 if hit else 0)

        # Every predicted interval that matches no reference interval is a
        # false positive.
        for p_start, p_end in pred_intervals:
            unmatched = True
            for t_start, t_end in true_intervals:
                if calculate_overlap(p_start, p_end, t_start, t_end, overlap_threshold):
                    unmatched = False
                    break
            if unmatched:
                y_true.append(0)
                y_pred.append(1)

        precision, recall, f1, _ = precision_recall_fscore_support(
            y_true, y_pred, average='binary', zero_division=0)

        results.append({
            'Label': label,
            'Precision': precision,
            'Recall': recall,
            'F1 Score': f1
        })

        # Pool the per-label decisions for the overall score.
        all_true_labels.extend(y_true)
        all_pred_labels.extend(y_pred)

    # NOTE(review): the overall score uses average='weighted' while the
    # per-label scores use average='binary' - confirm this is intended.
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        all_true_labels, all_pred_labels, average='weighted', zero_division=0)

    results.append({
        'Label': 'Overall',
        'Precision': overall_precision,
        'Recall': overall_recall,
        'F1 Score': overall_f1
    })

    return results

# Load data from JSONL files
def load_data(filename):
    """Parse a JSONL file and return its records as a list, one per line."""
    records = []
    with open(filename, 'r') as file:
        for raw_line in file:
            records.append(json.loads(raw_line.strip()))
    return records

# Reference and prediction files (time-stamped entities, one record per line)
true_file = 'transcript_output/timestamped/processed_data_true.jsonl'
pred_file = 'transcript_output/timestamped/processed_data_seed_gretel_similar0.3.jsonl'
# 1.0 requires the overlap to cover the whole of the shorter interval.
# NOTE(review): this was previously annotated "20% overlap", which would be
# 0.2 - confirm which value is intended.
overlap_threshold = 1.0

# Labels included in the evaluation
target_labels = ['EMAIL', 'NRIC', 'CREDIT_CARD','PHONE','PERSON','PASSPORT_NUM', 'BANK_ACCOUNT', 'CAR_PLATE']

# Load the data
true_data = load_data(true_file)
pred_data = load_data(pred_file)

# Pool the entities of all records into one flat list per file
true_entities = []
for record in true_data:
    true_entities.extend(record['entities'])

predicted_entities = []
for record in pred_data:
    predicted_entities.extend(record['entities'])

# Calculate F1 scores
results = calculate_f1_scores(true_entities, predicted_entities, overlap_threshold, target_labels)

# Create DataFrame and display results
df = pd.DataFrame(results)
df
