### Convert ECB Model Guide to machine-readable format

This code aims to process and clean a document from the European Central Bank (ECB) about internal models. The code provides a structured approach to extracting, cleaning, and processing text from the document.

#### Overview

1. **Rules for Text Formatting**: There are 4 primary rules that dictate how the text should be formatted based on specific patterns.
2. **Text Extraction**: The code imports necessary libraries and sets up the environment. It defines functions to extract text from a PDF using two different methods (`pypdf` and `pdfminer`).
3. **Text Cleaning**: It cleans the text, particularly removing headers and footers and joining separated sentences.
4. **Text Processing**: The code processes the text to follow the formatting rules mentioned above. It identifies titles and subtitles and formats them appropriately.
5. **Data Extraction**: The formatted text is then broken down into structured data with levels of headings and body content.
6. **Text Cleaning with GPT-3**: The code utilizes OpenAI's GPT-3 to perform additional spell checks on the text.
7. **Embedding**: The code generates embeddings for the cleaned text using OpenAI's embeddings utility.

#### Detailed Breakdown

- **Rules for Text Formatting**:
    - Rule 1: Format lines like "x.y [text]" with "###".
    - Rule 2: Format lines like "x [text]" followed by lines from Rule 1 with "##".
    - Rule 3: Format lines like "x.x.x [text]" with "####".
    - Rule 4: Format lines like "x. [text]" with a preceding newline.

- **Text Extraction**:
    - Two methods to extract text from a PDF (`extract_pdf_pypdf` and `extract_pdf_pdfminer`).
    - Save extracted text to `.txt` files.

- **Text Cleaning**:
    - Count words in the extracted files.
    - Remove unwanted headers and footers.
    - Process text to concatenate separated sentences.

- **Text Processing**:
    - Apply the formatting rules to the text. For example, if a line matches the format "x.y [text]", it will be prefixed with "###".

- **Data Extraction**:
    - The processed text is broken down into a DataFrame where each row represents a paragraph or section of the document. The DataFrame has columns for each level of heading (from main heading to sub-sub-heading) and a column for the body content.
    - Some columns are added to the DataFrame for further analysis, such as counting words and checking specific patterns.

- **Text Cleaning with GPT-3**:
    - The text is sent to GPT-3 for spell-checking. This is particularly useful for words that may have been incorrectly split during the conversion from PDF to text.
    - The corrected text replaces the original in the DataFrame.

- **Embedding**:
    - The cleaned text is then processed to generate embeddings. This is useful for future applications such as similarity checks, clustering, etc.

In [1]:
import pandas as pd
from pypdf import PdfReader
import re
from pdfminer.high_level import extract_text
from tqdm import tqdm
import openai
import time
import tiktoken
from openai.embeddings_utils import get_embedding

# Settings
tqdm.pandas()
pd.set_option('display.max_colwidth', 500)
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', 1000)

#### A. Import the ECB Guide

In [None]:
# Extract document using different readers
def extract_pdf_pypdf(pdf_path):
    reader = PdfReader(pdf_path)
    full_text = ""
    for page in reader.pages:
        full_text += page.extract_text()
    return full_text

def extract_pdf_pdfminer(pdf_path):
    return extract_text(pdf_path)

def save_to_txt(text, output_path):
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write(text)

# Extract and save files
pdf_path = "ssm.pubcon230622_guide.en.pdf"
text = extract_pdf_pypdf(pdf_path)
text_pdfminer = extract_pdf_pdfminer(pdf_path)

save_to_txt(text, 'trim_guide_pypdf.txt')
save_to_txt(text_pdfminer, 'trim_guide_pdfminer.txt')

In [None]:
#### A. Import the ECB Guide

In [None]:
def count_words(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
        
    # Use regex to tokenize the text and find words with more than 3 letters
    words = re.findall(r'\b\w{4,}\b', text)
    return len(words)

# Paths to the three text files
file1 = 'trim_guide_pypdf.txt'
file2 = 'trim_guide_pdfminer.txt'
file3 = 'ssm.pubcon230622_guide.en.txt'

# Get word counts for each file
count1 = count_words(file1)
count2 = count_words(file2)
count3 = count_words(file3)

# Print comparison
print(f"Words with more than 3 letters in {file1}: {count1}")
print(f"Words with more than 3 letters in {file2}: {count2}")
print(f"Words with more than 3 letters in {file3}: {count3}")

#### B. Basic cleansing activities

In [None]:
# Remove header and footer
def remove_ecb_guide_from_file(filename):
    # Pattern to match the unwanted lines
    pattern = r'^ECB guide to internal models\s*–.*?\d+'
    
    # Read the file line by line and clean each line
    with open(filename, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        cleaned_lines = [re.sub(pattern, '', line) for line in lines]
    
    # Write the cleaned lines back to the file
    with open('trim_guide_pypdf_no_footer.txt', 'w', encoding='utf-8') as file:
        file.writelines(cleaned_lines)

# Call the function
filename = "trim_guide_pypdf.txt"
remove_ecb_guide_from_file(filename)


#### C. Create inventory of headings and convert the doc to Pandas

In [None]:
def version_to_list(version_str):
    return list(map(int, version_str.split('.')))

def increment_by_one(v1, v2):
    if (v1 == "") | (v2 == ""):
        return True

    v1_list = version_to_list(v1)
    v2_list = version_to_list(v2)
    
    if len(v1_list) != len(v2_list):
        return False
    
    diff_count = 0
    for a, b in zip(v1_list, v2_list):
        if b < a:
            return False
        if b - a > 1:
            return False
        if b - a == 1:
            diff_count += 1
        if b - a == 0 and diff_count > 0:
            return False
            
    return diff_count == 1

def count_parts(s):
    return len(s.split('.'))

In [None]:
def process_text_file(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Add a sentinel line at the end to make sure we process the last line correctly
    lines.append('Sentinel line')

    last_seen_main = ""
    last_seen_sub = ""
    last_seen_sub_sub = ""
    footnote = "0"
    
    num_par = ""

    processed_lines = []
    i = 0
    while i < len(lines) - 1:
        # print(i, len(lines))
        line = lines[i].strip()
        next_line = lines[i+1].strip()


        k = re.match(r'^(\d+(\.\d+){0,2}) (\s*[A-Z])', line)
        if k:
            num_par = k.groups()[0]
            # print(i, num_par)
        
        # New Rule: Appending # to a sentence if the next non-empty line starts with 1 and matches the criteria of Rule 2
        j = i + 1
        while j < len(lines) and not lines[j].strip():  # Skip empty lines
            j += 1
        
        if j < len(lines) and lines[j].startswith('1') and re.match(r'^\d+ [A-Z]', lines[j]) and re.match(r'^[A-Z]', line):
            processed_lines.append('# ' + line + '\n')
        # Rule 1
        elif (re.match(r'^\d+\.\d+ [A-Z]', line) is not None) & increment_by_one(last_seen_sub, num_par):

            if next_line[0].islower():
                line = line + " " + next_line
                i = i + 1
            
            processed_lines.append('\n')
            processed_lines.append('### ' + line + '\n')
            processed_lines.append('\n')
            last_seen_sub = num_par

        elif (re.match(r'^(\d+\.\d+\.\d+) {1,3}[A-Z]', line) is not None):
            if next_line[0].islower():
                line = line + " " + next_line
                i = i + 1
                
            processed_lines.append('\n')
            processed_lines.append('#### ' + line + '\n')
            processed_lines.append('\n')
            last_seen_sub_sub = num_par
        # Rule 2 todo
        elif re.match(r'^\d+ [A-Z]', line) and (re.match(r'^\d+\.\d+ [A-Z]', lines[i + 1]) or re.match(r'^\d+\. [A-Z]', lines[i + 1])):

            if next_line[0].islower():
                line = line + " " + next_line
                i = i + 1
                
            processed_lines.append('\n')
            processed_lines.append('## ' + line + '\n')
            processed_lines.append('\n')
            last_seen_main = num_par
            last_seen_sub = ""
        # Rule 4
        elif re.match(r'^\d+\. [A-Z]', line):
            processed_lines.append('\n')
            processed_lines.append(line + '\n')

        elif (re.match(r'(^\d+)\s{2}([A-Z"“]+)', line) is not None):
            processed_lines.append('\n')
            processed_lines.append('FOOTNOTE ' + line + '\n')
        
        else:
            processed_lines.append(line + '\n')
            footnote = num_par
        i += 1

    # Write to the modified file
    with open('trim_guide_pypdf_mod_v6.txt', 'w', encoding='utf-8') as f:
        f.writelines(processed_lines)

# Call the function
filename = 'trim_guide_pypdf_no_footer.txt'
process_text_file(filename)


In [None]:
import re
import pandas as pd

def starts_with_integer_ends_with_dot(s: str) -> int:
    """
    Check if the given string starts with an integer and ends with a dot, or if it starts with the word "Table", or if it ends with a floating point.
    Args:
    - s (str): Input string to check.
    
    Returns:
    - int: 1 if conditions are met, otherwise 0.
    """
    pattern_1 = r'^\d+.*\.$'
    pattern_2 = r'^Table'
    pattern_3 = r'^\d+.*\.\d+$'
    
    if re.match(pattern_1, s) or re.match(pattern_2, s) or re.match(pattern_3, s):
        return 1
    return 0

def extract_data(text: str) -> pd.DataFrame:
    """
    Extract data from the given text based on various conditions and create a DataFrame.
    Args:
    - text (str): Input text.
    
    Returns:
    - pd.DataFrame: Extracted data in DataFrame format.
    """
    lines = text.split('\n')
    data = []
    
    # Initialize labels
    level_0_label, level_1_label, level_2_label, level_3_label, level_4_label, level_5_label = [None]*6
    body_buffer = []
    
    for i in range(len(lines) - 1):
        stripped_line = lines[i].strip()
        stripped_line_2 = lines[i+1].strip()

        # Detect headings based on our markers
        if stripped_line.startswith("FOOTNOTE"):
            match = re.search(r"FOOTNOTE\s*(\d+)", stripped_line) 
            level_5_label = f'Footnote {match.group(1)}'
            body_buffer.append(stripped_line)
            
        elif stripped_line.startswith("#####"):
            level_4_label = stripped_line.replace("#####", "").strip()
            level_5_label = None
            
        elif stripped_line.startswith("####"):
            level_3_label = stripped_line.replace("####", "").strip()
            level_4_label, level_5_label = None, None
            
        elif stripped_line.startswith("###"):
            level_2_label = stripped_line.replace("###", "").strip()
            level_3_label, level_4_label, level_5_label = [None]*3
            
        elif stripped_line.startswith("##"):
            level_1_label = stripped_line.replace("##", "").strip()
            level_2_label, level_3_label, level_4_label, level_5_label = [None]*4
            
        elif stripped_line.startswith("#"):
            level_0_label = stripped_line.replace("#", "").strip()
            level_1_label, level_2_label, level_3_label, level_4_label, level_5_label = [None]*5
            
        else:
            body_buffer.append(stripped_line)

        # Create body buffer

        # Condition 1: Check if the line is empty and there's content in the buffer
        is_empty_line_with_buffer = (not stripped_line) and body_buffer
        
        # Condition 2a: The next line is either empty or starts with a lowercase letter
        next_line_starts_lower = len(stripped_line_2) == 0 or stripped_line_2[0].islower()
        
        # Condition 2b: The current line ends with a period but not with 'e.g.' or 'i.e.'
        current_line_special_end = stripped_line.endswith('.') and not (stripped_line.endswith('e.g.') or stripped_line.endswith('i.e.'))
        
        # Combine the conditions
        should_end_paragraph = is_empty_line_with_buffer or (next_line_starts_lower and current_line_special_end)
        
        # Use the final condition in the 'if' statement
        if should_end_paragraph:
            # Exclude blank lines from body buffer
            cleaned_body_buffer = [line for line in body_buffer if line]
            if cleaned_body_buffer:
                data.append([level_0_label, level_1_label, level_2_label, level_3_label, level_4_label, level_5_label, ' '.join(body_buffer)])
                level_5_label = None
                        
            body_buffer = []

    # Add the last buffered body text if present
    if body_buffer:
        data.append([level_0_label, level_1_label, level_2_label, level_3_label, level_4_label, level_5_label, ' '.join(body_buffer)])
        level_5_label = None

    # Convert data to Pandas DataFrame
    df = pd.DataFrame(data, columns=['Level_0_Label', 'Level_1_Label', 'Level_2_Label', 'Level_3_Label', 'Level_4_Label', 'Level_5_Label', 'Body'])

    # Add new columns based on various conditions
    df['num_of_words'] = df['Body'].apply(lambda x: sum(1 for word in x.split() if len(word) >= 3))
    df['no_fault_detected'] = df['Body'].apply(starts_with_integer_ends_with_dot)
    df['Body'] = df['Body'].str.strip()

    return df

#### D. After-processing

In [None]:
def clean_text_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """
    Cleans text columns based on specified patterns.
    
    Parameters:
    - df: DataFrame containing the columns to clean.
    - columns: List of column names to apply the cleaning on.
    
    Returns:
    - DataFrame with cleaned columns.
    """
    for col in columns:
        df[col] = df[col].str.replace('  ', ' ', regex=True) \
                         .str.replace(' -', '-', regex=True) \
                         .str.replace('regulato ry', 'regulatory', regex=False) \
                         .str.replace('B anking', 'Banking', regex=False) \
                         .str.replace('t he', 'the', regex=False) \
                         .str.replace('fo llow', 'follow', regex=False) \
                         .str.replace('\(  ', '(', regex=True) \
                         .str.replace('m aturity', 'maturity', regex=False) \
                         .str.replace('sy stems', 'systems', regex=False) \
                         .str.replace('Granularity ,', 'Granularity,', regex=False) \
                         .str.replace('interna l', 'internal', regex=False) \
                         .str.replace('data24', 'data 24', regex=False) \
                         .str.replace('co nsolidations', 'consolidations', regex=False) \
                         .str.replace('Relevan t', 'Relevant', regex=False) \
                         .str.replace('Regula tory', 'Regulatory', regex=False) \
                         .str.replace('Relev ant', 'Relevant', regex=False) \
                         .str.replace('applicat ion', 'application', regex=False) \
                         .str.replace('Quantificat ion', 'Quantification', regex=False) \
                         .str.replace('bac k-testing', 'back-testing', regex=False) \
                         .str.replace('adj ustments', 'adjustments', regex=False) \
                         .str.replace('inte rnal', 'internal', regex=False) \
                         .str.replace('rati ng', 'rating', regex=False) \
                         .str.replace('Remediatio n', 'Remediation', regex=False) \
        
    return df

In [None]:
with open('trim_guide_pypdf_mod_v6_human_review.txt', 'r', encoding='utf-8') as file:
    text = file.read()

df = extract_data(text)
columns_to_clean = ['Body', 'Level_0_Label', 'Level_1_Label', 'Level_2_Label']
df = clean_text_columns(df, columns_to_clean)

In [None]:
# Define the regular expression pattern for "number space capital letter"
pattern = r'^\d\s[A-Z]'

# Find all occurrences of numbers that follow a lowercase letter or a lowercase letter and a dot
df['temp_extracted'] = df['Body'].str.findall(r'[a-z][\.:,]?\d+')

# Remove the preceding lowercase letter and optional dot or colon, and convert to integers
def clean_and_convert(lst):
    new_lst = []
    for item in lst:
        number = re.search(r'\d+', item)
        if number:
            new_lst.append(int(number.group()))
    return new_lst

df['footnotes'] = df['temp_extracted'].apply(clean_and_convert)
df['footnote_flag'] = ~df['Level_5_Label'].isna()

# Drop the temporary column
df.drop(columns=['temp_extracted'], inplace=True)

In [None]:
df2 = df.copy()

# Setting new index to start at 1
df2.index = range(1, len(df) + 1)

# Variable to keep track of the previous row
previous_text = None
previous_index = None

# Extract footnotes
condition = (df2['footnote_flag'] == 1)
footnote_df = df2[condition].copy()
df2.drop(df2.index[condition], inplace=True)

# Create an empty DataFrame to store the results
rows_list = []  # To store DataFrame pieces for concatenation

# Loop through the rows
for index, row in df2.iterrows():
    # print(5, index, previous_index)

    curr_concat = ''.join(map(str, row[['Level_1_Label', 'Level_2_Label', 'Level_3_Label']]))
    
    if previous_text is not None:
        # Check if the text in the current row starts without a number
        if (not row['Body'][0].isdigit()) and (previous_index is not None and index - previous_index != 1) and (curr_concat == previous_concat):
            concatenated_text = previous_text + row['Body']
            merged_row = row.copy()
            merged_row['Body'] = concatenated_text
            rows_list.append(merged_row)
           
            # Skip storing this row as previous since it's already concatenated
            previous_text = None
            previous_row = None
            previous_index = index
            continue
        else:
            # If condition not met, add the previous row to the frames list
            rows_list.append(previous_row)

    # Store the current row as previous for the next iteration
    previous_text = row['Body']
    previous_row = row.copy()
    previous_index = index
    previous_concat = ''.join(map(str, row[['Level_1_Label', 'Level_2_Label', 'Level_3_Label']]))

# Append the last row if it was not concatenated
if previous_text is not None:
    rows_list.append(row)

# Concatenate all DataFrame pieces
new_df = pd.DataFrame(rows_list)
new_df = pd.concat([new_df, footnote_df])

# Add a new column based on the condition
new_df['no_fault_detected'] = new_df['Body'].apply(starts_with_integer_ends_with_dot)

In [None]:
# Create full path
new_df['full_label'] = new_df['Level_0_Label'].str.cat([
                                                        df['Level_1_Label'], 
                                                        df['Level_2_Label'], 
                                                        df['Level_3_Label']
                                                       ], 
                                                       sep=' > ', 
                                                       na_rep='').str.strip(' > ')

#  Identify a non-digit character (\D) followed by a digit character (\d) (cycle5 >> cycle 5)
new_df['Body'] = new_df['Body'].str.replace(r'(\D)(\d)', r'\1 \2', regex=True)

In [None]:
new_df.head()

#### E. Correct the spelling errors introduced during conversion with ChatGPT

In [None]:
def gpt_spellcheck(phrase, max_retries=20, timeout=120):
    retries = 0
    while retries <= max_retries:
        try:
            messages = [
                {"role": "system", "content": "You perform a spellcheck and return correct sentences. "\
                     "The ONLY focus is on words that have been incorrectly split during the pdf conversion. "\
                     "No introduction or other explanations are necessary, just return corrected version of the text."},
                {"role": "user", "content": phrase},
            ]
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo-0613",
                messages=messages,
                request_timeout=timeout,
            )
            response_message = response["choices"][0]["message"]['content']
            return response_message
        
        except openai.error.OpenAIError as e:
            print(f"An error occurred: {e}. Retrying...")
        
        except Exception as e:
            print(f"An unexpected error occurred: {e}. Retrying...")
        
        retries += 1
        time.sleep(1)
    
    print("Maximum retries reached. Exiting...")
    return None

# Initialize or reload a DataFrame column for checked sentences
if "checked_sentence" not in new_df.columns:
    new_df["checked_sentence"] = ""

# Start from the last successful index
last_successful_index = new_df[new_df['checked_sentence'] != ""].index.max()
start_index = 0 if pd.isna(last_successful_index) else last_successful_index + 1

# Apply the function to the DataFrame starting from the last successful index
for i, row in tqdm(new_df.iloc[start_index:].iterrows(), total=new_df.iloc[start_index:].shape[0]):
    checked_sentence = gpt_spellcheck(row["Body"])
    
    if checked_sentence is not None:
        new_df.loc[i, "checked_sentence"] = checked_sentence
        # Optionally, save the DataFrame at each step to keep the successful results
        new_df.to_excel('gpt3_spellcheck_v2_checked.xlsx', index=False)

#### F. Create embeddings 

In [None]:
# embedding model parameters
embedding_model = "text-embedding-ada-002"
embedding_encoding = "cl100k_base"  # this the encoding for text-embedding-ada-002
max_tokens = 8000  # the maximum for text-embedding-ada-002 is 8191

In [None]:
def get_embedding_openai(phrase, max_retries=20, timeout=120):
    retries = 0
    while retries <= max_retries:
        try:
            response_message = get_embedding(phrase, engine=embedding_model)
            return response_message
        
        except openai.error.OpenAIError as e:
            print(f"An error occurred: {e}. Retrying...")
        
        except Exception as e:
            print(f"An unexpected error occurred: {e}. Retrying...")
        
        retries += 1
        time.sleep(1)
    
    print("Maximum retries reached. Exiting...")
    return None


df["combined"] = ("Source: " + df['full_label'] + "; Content: " + df['checked_sentence'].str.strip())

# embedding model parameters
embedding_model = "text-embedding-ada-002"
embedding_encoding = "cl100k_base"  # this the encoding for text-embedding-ada-002
max_tokens = 8000  # the maximum for text-embedding-ada-002 is 8191

# Initialize or reload a DataFrame column for checked sentences
if "embedding" not in new_df.columns:
    new_df["embedding"] = ""

# Start from the last successful index
last_successful_index = new_df[new_df['embedding'] != ""].index.max()
start_index = 0 if pd.isna(last_successful_index) else last_successful_index + 1

# Apply the function to the DataFrame starting from the last successful index
for i, row in tqdm(new_df.iloc[start_index:].iterrows(), total=new_df.iloc[start_index:].shape[0]):
    embedding_result = get_embedding_openai(row["combined"])
    
    if checked_sentence is not None:
        new_df.loc[i, "embedding"] = str(embedding_result)

#### G. Final after-processing and saving of results

In [None]:
# Add index as the first column
new_df = new_df.reset_index()
new_df.rename(columns={'index': 'Index'}, inplace=True)

# Create a pickle with the final results
new_df.to_pickle("ecb_guide_embeddings.pkl")
new_df.to_excel("ecb_guide_embeddings.xlsx")

In [None]:
new_df.head()