In [None]:
## Natus Embla KeyLogic PSG Report Extractor

In [None]:
!pip install python-docx pymupdf olefile tqdm

In [248]:
# Root folder that is searched recursively for the PDF and DOC/DOCX reports.
folder_path = "<FOLDER PATH HERE>"

In [249]:
import os
import re
import fitz  # PyMuPDF
from docx import Document
import pandas as pd
import olefile
from tqdm import tqdm

# Show every column when a DataFrame is displayed (no column truncation).
pd.set_option('display.max_columns', None)

# Translation table: wrongly decoded character -> intended Turkish letter.
# Presumably these arise when Turkish text is decoded with a Western European
# code page; confirm against the actual PDFs.
_TURKISH_CHARACTER_FIXES = str.maketrans({
    'Þ': 'Ş',
    'þ': 'ş',
    'Ý': 'İ',
    'ý': 'ı',
    'Ð': 'Ğ',
    'ð': 'ğ',
})


def correct_turkish_characters(text):
    """
    Replaces the characters Þ þ Ý ý Ð ð with the Turkish letters Ş ş İ ı Ğ ğ.

    All other characters (including Ç, ç, Ö, ö, Ü, ü) are left untouched.

    Parameters:
    text (str or None): Text to repair. None and the empty string are returned
        unchanged so that the result of a failed text extraction can be passed
        straight through.

    Returns:
    str or None: The repaired text, or the input itself when it is None/empty.
    """
    if not text:
        # pdf_to_text() returns None when a file cannot be read; do not crash on it.
        return text
    return text.translate(_TURKISH_CHARACTER_FIXES)

# Function to extract text from a DOC file using olefile
def doc_to_text(doc_path):
    """
    Returns the printable-ASCII content of a legacy .doc file as one string.

    The raw 'WordDocument' stream is read and every byte in the range 32..126
    is kept, in order. All other bytes are dropped, which means non-ASCII
    letters (for example 'ı' or 'ş') and line breaks never appear in the result.

    Parameters:
    doc_path (str): Path of the .doc file.

    Returns:
    str or None: The filtered text, or None when the file has no
        'WordDocument' stream or cannot be read (the error is printed).
    """
    ole = None
    try:
        ole = olefile.OleFileIO(doc_path)
        if not ole.exists('WordDocument'):
            return None
        data = ole.openstream('WordDocument').read()
        # Iterating over bytes yields ints; join once instead of growing a string.
        return ''.join(chr(byte) for byte in data if 32 <= byte <= 126)
    except Exception as e:
        print(f"Error processing DOC file {doc_path}: {e}")
        return None
    finally:
        # Always release the file handle, also on the error paths.
        if ole is not None:
            ole.close()

# Function to extract text from a DOCX file
def docx_to_text_with_tables(file_path):
    """
    Returns the paragraph and table text of a DOCX file as one string.

    Paragraph texts come first, one per line. They are followed by one line per
    table row, in which the stripped cell texts are joined with ' | '.

    Parameters:
    file_path (str): Path of the .docx file.

    Returns:
    str or None: The combined text, or None if the file could not be
        processed (the error is printed).
    """
    try:
        document = Document(file_path)
        paragraph_lines = [paragraph.text for paragraph in document.paragraphs]
        table_lines = [
            ' | '.join(cell.text.strip() for cell in table_row.cells)
            for table in document.tables
            for table_row in table.rows
        ]
        return '\n'.join(paragraph_lines + table_lines)
    except Exception as e:
        print(f"Error processing DOCX file {file_path}: {e}")
        return None

# Function to preprocess DOCX content
def preprocess_docx_content(content):
    """
    Reshapes '|'-separated lines so that each line holds at most two cells.

    A line with at least two '|' separators is broken after its second cell:
    the first two cells stay on the line and everything after the second '|'
    moves to a new line. A single trailing '|' is removed from the result.

    Parameters:
    content (str): Text with '\\n'-separated lines.

    Returns:
    str: The reshaped text.
    """
    def _reshape(raw_line):
        pieces = raw_line.split('|', 2)
        if len(pieces) == 3:
            first, second, remainder = pieces
            raw_line = f"{first}|{second}\n{remainder}"
        return raw_line[:-1] if raw_line.endswith('|') else raw_line

    return '\n'.join(map(_reshape, content.split('\n')))

# Function to extract information after a keyword
def extract_information(preprocessed_text, keyword):
    """
    Returns the cell that follows 'keyword |' in the text.

    The keyword is matched literally (after stripping surrounding whitespace).
    The captured value runs up to the next '|' or line break.

    Parameters:
    preprocessed_text (str or None): Text to search.
    keyword (str): Label that precedes the '|' separator.

    Returns:
    str or None: The stripped value, or None when the text is empty/None or
        the keyword is not followed by a '|' and a value.
    """
    if not preprocessed_text:
        return None
    keyword_escaped = re.escape(keyword.strip())
    # search() scans every position, so leading whitespace before the keyword
    # needs no separate pattern.
    match = re.search(rf'{keyword_escaped}\s*\|\s*([^\|\n]+)', preprocessed_text)
    return match.group(1).strip() if match else None

# Function to extract plain text between keywords
def extract_plaintext(preprocessed_text, keyword, end_keywords=None):
    """
    Returns the text that follows a keyword.

    With end_keywords, the result is everything (line breaks included) between
    the keyword and the first occurrence of any end keyword; if no end keyword
    follows, nothing is returned. Without end_keywords, the result stops at the
    next '|' or line break.

    Parameters:
    preprocessed_text (str): Text to search.
    keyword (str): Literal start marker (surrounding whitespace is ignored).
    end_keywords (list of str, optional): Literal stop markers.

    Returns:
    str or None: The stripped text, or None when there is no match.
    """
    keyword_escaped = re.escape(keyword.strip())
    if not end_keywords:
        found = re.search(rf'{keyword_escaped}\s*([^\|\n]+)', preprocessed_text)
    else:
        end_keywords_escaped = '|'.join(re.escape(end_kw.strip()) for end_kw in end_keywords)
        found = re.search(
            rf'{keyword_escaped}\s*(.*?)\s*(?:{end_keywords_escaped})',
            preprocessed_text,
            re.DOTALL,
        )
    return found.group(1).strip() if found else None

# Function to extract text from a PDF file
def pdf_to_text(file_path):
    """
    Returns the text of all pages of a PDF, concatenated in page order.

    Parameters:
    file_path (str): Path of the PDF file.

    Returns:
    str or None: The extracted text, or None if the file could not be
        processed (the error is printed).
    """
    try:
        # The context manager closes the document even if a page fails, so
        # handles do not pile up when thousands of files are processed.
        with fitz.open(file_path) as document:
            return "".join(
                document.load_page(page_num).get_text()
                for page_num in range(len(document))
            )
    except Exception as e:
        print(f"Error processing PDF file {file_path}: {e}")
        return None

# Function to extract all text after a keyword
def extract_text_after_keyword(text, keyword):
    """
    Returns the text that follows 'keyword:' in the text.

    The keyword is matched literally; the ':' is appended here, so pass the
    keyword without it.

    Parameters:
    text (str): Text to search.
    keyword (str): Label that precedes the colon.

    Returns:
    str or None: The stripped text after the first 'keyword:', or None when
        the keyword does not occur.
    """
    # Escape the keyword so that labels such as "T.C" or "N3 (SWS)" are not
    # interpreted as regular-expression syntax.
    keyword_escaped = re.escape(keyword)
    # NOTE(review): in this raw string '\\n' matches a literal backslash followed
    # by 'n', not a line break, so in practice the capture runs to the end of
    # the text. Left unchanged; confirm whether end-of-line was intended.
    pattern = re.compile(rf'{keyword_escaped}:\s*(.*?)\s*(?=\\n|$)', re.DOTALL)
    match = pattern.search(text)
    return match.group(1).strip() if match else None

# Function to get the next line after a keyword
def get_next_line_after_keyword(preprocessed_text, keyword):
    """
    Returns the line that follows the first line containing the keyword.

    Parameters:
    preprocessed_text (str): Text with '\\n'-separated lines.
    keyword (str): Literal text to look for (surrounding whitespace is ignored).

    Returns:
    str or None: The stripped next line, or None when the keyword is not found
        or the first line containing it is the last line of the text.
    """
    needle = keyword.strip()
    rows = preprocessed_text.split('\n')

    for position, current in enumerate(rows):
        if needle not in current:
            continue
        # Only the first hit counts; an empty slice means there is no next line.
        following = rows[position + 1:position + 2]
        return following[0].strip() if following else None

    return None

def get_text_after_keyword_until_eol(content, keyword):
    """
    Returns the rest of the first line that contains the keyword, split into columns.

    The text after the first occurrence of the keyword is stripped of
    surrounding whitespace and of leading/trailing '|' characters, then split
    on '|'.

    Parameters:
    content (str): Text with '\\n'-separated lines.
    keyword (str): Literal text to look for (surrounding whitespace is ignored).

    Returns:
    list of str or None: The stripped columns, or None when no line contains
        the keyword.
    """
    needle = keyword.strip()

    for line in content.split('\n'):
        position = line.find(needle)
        if position < 0:
            continue
        remainder = line[position + len(needle):]
        cells = remainder.strip().strip('|').split('|')
        return [cell.strip() for cell in cells]

    return None

def extract_summary(text, keyword, line_count):
    """
    Reads two consecutive blocks of lines that follow a keyword.

    The first line_count lines after the line on which the first occurrence of
    the keyword ends are returned as column names, the next line_count lines
    as values. Blocks are shorter (possibly empty) near the end of the text.

    Parameters:
    text (str): Text with '\\n'-separated lines.
    keyword (str): Literal text to look for.
    line_count (int): Number of lines in each block.

    Returns:
    tuple: (column_names, values) as two lists of lines, or (None, None) when
        the keyword does not occur.
    """
    hit = text.find(keyword)
    if hit == -1:
        return None, None

    # Index of the line on which the keyword ends.
    keyword_line = text.count('\n', 0, hit + len(keyword))
    all_lines = text.split('\n')

    names_from = keyword_line + 1
    values_from = names_from + line_count
    return all_lines[names_from:values_from], all_lines[values_from:values_from + line_count]

def extract_value_lineskip(text, keyword, line_offset):
    """
    Returns the line located line_offset lines after a keyword line.

    The keyword must make up a whole line on its own, optionally followed by
    whitespace. The offset is counted from the line on which the match ends.
    Because the trailing whitespace of the pattern may run across line breaks,
    that is not the keyword line itself when blank lines follow it.

    Parameters:
    text (str): Text with '\\n'-separated lines.
    keyword (str): Literal text of the keyword line.
    line_offset (int): Number of lines to move down from the matched line.

    Returns:
    str or None: The stripped target line, or None when the keyword line is
        not found or the offset points past the last line (a message is
        printed in the latter case).
    """
    keyword_escaped = re.escape(keyword)
    found = re.search(rf'^{keyword_escaped}\s*$', text, flags=re.MULTILINE)
    if found is None:
        return None

    rows = text.split('\n')
    wanted = text.count('\n', 0, found.end()) + line_offset
    if wanted >= len(rows):
        print(f"Line offset {line_offset} goes beyond the number of lines in the text.")
        return None
    return rows[wanted].strip()

def get_all_doc_files(folder_path):
    """
    Collects the paths of all Word files below a folder.

    Parameters:
    folder_path (str): Root folder, searched recursively.

    Returns:
    list of str: Paths of all files whose extension is .doc or .docx,
        compared case-insensitively (so '.Docx' is found as well).
    """
    doc_files = []
    for root, _dirs, files in os.walk(folder_path):
        for file_name in files:
            if file_name.lower().endswith(('.docx', '.doc')):
                doc_files.append(os.path.join(root, file_name))
    return doc_files

def get_all_pdf_files(folder_path):
    """
    Collects the paths of all PDF files below a folder.

    Parameters:
    folder_path (str): Root folder, searched recursively.

    Returns:
    list of str: Paths of all files whose extension is .pdf, compared
        case-insensitively (so '.Pdf' is found as well).
    """
    pdf_files = []
    for root, _dirs, files in os.walk(folder_path):
        for file_name in files:
            if file_name.lower().endswith('.pdf'):
                pdf_files.append(os.path.join(root, file_name))
    return pdf_files

In [250]:
# Function to process files
def process_files(folder_path):
    """
    Builds one table row per PDF report found under a folder.

    For every PDF the text is extracted and scanned for the keywords configured
    below. If a DOC/DOCX file with the same base name exists anywhere under the
    folder, the text after "Tanı:" and "Sonuç:" is taken from it as well.

    Parameters:
    folder_path (str): Root folder, searched recursively.

    Returns:
    pd.DataFrame: One row per PDF. The "File" column holds the PDF path; the
        remaining columns are named after the keywords. A PDF that cannot be
        read still gets a row, without the PDF-derived columns.
    """
    pdf_files = get_all_pdf_files(folder_path)
    doc_files = get_all_doc_files(folder_path)

    # Index the Word reports by base name once instead of rescanning the whole
    # list for every PDF. setdefault keeps the first path seen for a name,
    # which is what a first-match linear search would return.
    doc_by_base_name = {}
    for doc in doc_files:
        doc_by_base_name.setdefault(os.path.splitext(os.path.basename(doc))[0], doc)

    # Markers at which the free text after "Tanı:" / "Sonuç:" is cut off.
    end_keywords = ["Saygılarımızla", "Prof", "Uz", "Doç"]

    # CUSTOMIZE THESE LISTS FOR YOUR OWN REPORT.
    # keywords          -> extract_text_after_keyword (pass the label without ':').
    # keywords_nextline -> the line after the first line containing the keyword.
    # keywords_eol      -> the rest of the line after the keyword.
    # keyword_lineskip  -> (keyword, line_offset): like nextline but skipping lines;
    #                      the keyword must be a line on its own.
    keywords = []
    keywords_nextline = ["Name:", "Date of Birth:", "Gender:", "Height:", "Weight:", "Age:", "RDI:"]
    keywords_eol = ["Total Recording Time:", "Lights Off Clock Time:", "T.C:", "TC:"]
    keyword_lineskip = [
        ("Sleep Period:", 22), ("Wake After Sleep Onset:", 22), ("Total Sleep Time:", 22), ("Sleep Onset:", 22),
        ("Sleep Efficiency:", 22), ("Number of Awakenings:", 22), ("Sleep Latency to N1:", 22), ("Sleep Latency to N2:", 22),
        ("Sleep Latency to N3 (SWS):", 22), ("Stage R Latency from Sleep", 22),
        ("Apnea + Hypopnea (A+H):", 18), ("Obstructive Apnea:", 22), ("Central Apnea:", 22), ("Mixed Apnea:", 22),
        ("Hypopnea (All)", 21), ("Oxygen Desaturation Events", 21),
        ("Limb Movement:", 20), ("PLMS:", 20),
        ("N1", 5), ("N2", 6), ("N3", 7), ("R", 8), ("Wake", 9),
        # = 12. NOTE(review): presumably the difference of two line numbers in a
        # text dump of a sample report - confirm.
        ("Total Arousals", 169 - 157),
        ("Average Heart Rate during Sleep:", 24), ("Highest Heart Rate during Sleep:", 24),
        ("Highest Heart Rate during Recording:", 24), ("Lowest Heart Rate during Sleep:", 24),
        ("Lowest Heart Rate during Recording:", 24)
    ]

    data = []

    for pdf_file in tqdm(pdf_files, desc="Processing files", unit="file"):
        row = {"File": pdf_file}
        pdf_content = pdf_to_text(pdf_file)
        # pdf_to_text() returns None for an unreadable PDF; skip the character
        # repair in that case instead of crashing the whole run.
        pdf_content_corrected = correct_turkish_characters(pdf_content) if pdf_content else pdf_content
        base_name = os.path.splitext(os.path.basename(pdf_file))[0]

        # Find the corresponding DOCX/DOC file
        doc_file = doc_by_base_name.get(base_name)

        if doc_file:
            if doc_file.lower().endswith('.docx'):
                content = docx_to_text_with_tables(doc_file)
            else:
                content = doc_to_text(doc_file)

            if content:
                preprocessed_text = preprocess_docx_content(content)
                row["Tanı"] = extract_plaintext(preprocessed_text, "Tanı:", end_keywords)
                row["Sonuç"] = extract_plaintext(preprocessed_text, "Sonuç:", end_keywords)
        else:
            row["Tanı"] = "Rapor bulunamadı."

        # Extract information from the PDF file
        if pdf_content_corrected:
            for keyword in keywords:
                row[keyword] = extract_text_after_keyword(pdf_content_corrected, keyword)

            for keyword in keywords_nextline:
                value = get_next_line_after_keyword(pdf_content_corrected, keyword)
                if value is not None:
                    row[keyword] = value.replace('|', '')

            for keyword in keywords_eol:
                value_array = get_text_after_keyword_until_eol(pdf_content_corrected, keyword)
                if value_array:
                    # Only one value per keyword is stored: the last of the first
                    # five '|'-separated columns (the single value when the line
                    # has no '|').
                    # NOTE(review): confirm that the last column, not the first,
                    # is the wanted one for multi-column lines.
                    row[keyword] = value_array[:5][-1]

            for keyword, line_offset in keyword_lineskip:
                row[keyword] = extract_value_lineskip(pdf_content_corrected, keyword, line_offset)

        data.append(row)

    df = pd.DataFrame(data)
    return df

In [251]:
# Run the extraction for every PDF found under folder_path.
df = process_files(folder_path)

Processing files: 100%|███████████████████| 3016/3016 [00:49<00:00, 60.52file/s]


In [252]:
def strip_units_from_all_columns(df, units):
    """
    Removes the given unit strings from the values of every column.

    Each column is first cast to str (so missing values become text as well),
    every occurrence of a unit is deleted and the result is stripped of
    surrounding whitespace. The DataFrame is modified in place.

    Parameters:
    df (pd.DataFrame): The DataFrame to process.
    units (list): A list of unit strings to delete; they are matched literally.

    Returns:
    pd.DataFrame: The same DataFrame object, with all columns holding strings.
    """
    # Alternation of the escaped units, e.g. 'minutes|%'.
    unit_pattern = '|'.join(re.escape(unit) for unit in units)

    for name in df.columns:
        as_text = df[name].astype(str)
        without_units = as_text.str.replace(unit_pattern, '', regex=True)
        df[name] = without_units.str.strip()

    return df

# Strip the unit 'minutes' from all columns (this also casts every column to str)
units_to_strip = ['minutes']
df = strip_units_from_all_columns(df, units_to_strip)

In [None]:
def extract_cinsiyet_yas(df):
    """
    Replaces the "Total Recording Time:" column with its parenthesised part.

    Despite its name, this function only handles the total recording time: the
    text inside the first pair of parentheses of each value is stored in a new
    column "Total Recording Time" and the source column is dropped.

    Parameters:
    df (pd.DataFrame): Frame with a string column "Total Recording Time:".

    Returns:
    pd.DataFrame: A new frame with the replaced column (NaN where a value has
        no parentheses). The input frame itself is returned, unchanged, when
        the source column is missing (a message is printed).
    """
    source_column = "Total Recording Time:"

    # Check if the column exists in the DataFrame
    if source_column not in df.columns:
        print(f"Column '{source_column}' not found in the DataFrame.")
        return df

    # Text between the first '(' and the matching ')' of each value
    parenthesised = df[source_column].str.extract(r'\(([^()]*)\)')[0]

    # Add the new column and drop the source column without touching the input
    return df.assign(**{"Total Recording Time": parenthesised}).drop(columns=[source_column])

df = extract_cinsiyet_yas(df)

# The ID label is read under two spellings, "TC:" and "T.C:"; fold both into a
# single 'TC' column. A missing value is the literal string "nan" here because
# all columns were cast to str when the units were stripped.
tc_source_columns = [name for name in ('TC:', 'T.C:') if name in df.columns]


def _merge_tc(row):
    """Return whichever spelling holds a value; both joined by a space if both do."""
    tc_plain = row.get('TC:', "nan")
    tc_dotted = row.get('T.C:', "nan")
    if tc_dotted == "nan":
        return tc_plain
    if tc_plain == "nan":
        return tc_dotted
    return tc_plain + " " + tc_dotted


# Only merge when at least one spelling was found in some report; this also
# keeps an existing 'TC' column intact if the cell is run a second time.
if tc_source_columns:
    df['TC'] = df.apply(_merge_tc, axis=1)
    df = df.drop(columns=tc_source_columns)


In [253]:
def process_tani(tani):
    """
    Splits a diagnosis text into its individual, cleaned items.

    Tabs become spaces and the text is split on line breaks. From each line a
    leading run of up to 10 list-numbering characters (whitespace, digits,
    '.', ':', '-', '/') and all trailing periods are removed. Lines that end
    up empty are skipped.

    Parameters:
    tani (str): Diagnosis text; any non-string value yields an empty list.

    Returns:
    list of str: The cleaned items, in their original order.
    """
    if not isinstance(tani, str):
        return []

    cleaned_parts = []
    for part in tani.replace('\t', ' ').split('\n'):
        # The dash is escaped so that it is a literal: written as '.-:' it formed
        # a character range that left leading dashes in place. '/' is listed
        # explicitly because that range happened to include it.
        part = re.sub(r'^[\s\d./:\-]{1,10}', '', part.strip())
        # Remove trailing periods
        part = part.rstrip('.')
        if part:
            cleaned_parts.append(part)

    return cleaned_parts

# Merge 'Sonuç' into 'Tanı'. All columns were cast to str earlier, so a missing
# value is either "None" (the extraction returned None) or "nan" (the key was
# absent from the row); both must count as missing.
MISSING_TEXT = {"None", "nan"}


def _merge_tani_sonuc(row):
    """Join the available 'Tanı' and 'Sonuç' texts with a space ('' if both are missing)."""
    candidates = (row.get('Tanı'), row.get('Sonuç'))
    present = [text for text in candidates if isinstance(text, str) and text not in MISSING_TEXT]
    return " ".join(present)


df['Tanı'] = df.apply(_merge_tani_sonuc, axis=1)

# 'Sonuç' does not exist when no Word report produced any text
df = df.drop(columns=['Sonuç'], errors='ignore')

# Apply the processing function and spread the items over one column per item
new_cols = pd.DataFrame(df['Tanı'].apply(process_tani).tolist(), index=df.index)

# Name the new columns Tanı_1, Tanı_2, ... before combining
new_cols.columns = [f'Tanı_{i + 1}' for i in range(new_cols.shape[1])]

# Combine the new columns with the original dataframe
df = pd.concat([df.drop(columns=['Tanı']), new_cols], axis=1)

In [None]:
# Device stamp
df['device'] = "embla"

# Flag (1/0) the reports whose file name contains "PSG". 'File' holds the full
# path, so only its last component is tested; otherwise a parent folder with
# "PSG" in its name would flag every report.
df['is_PSG'] = df['File'].map(os.path.basename).str.contains('PSG', regex=False).astype(int)

# Preview only the first rows: the full table holds thousands of patient records.
display(df.head())

In [259]:
# Write the table to an Excel file in the current working directory.
df.to_excel("export_embla.xlsx", index=False, header=True)
print("Data exported!")

Data exported!
