In [14]:
import os
import re
from io import BytesIO
import base64
from PIL import Image, UnidentifiedImageError
from paddleocr import PaddleOCR
from tqdm.auto import tqdm
from bs4 import BeautifulSoup
import numpy as np
import shutil
import logging
import pandas as pd
import chardet
import warnings
import pydicom
import requests


# Set logging level for PaddleOCR
# Only records of level ERROR and above pass through the "ppocr" logger.
logging.getLogger("ppocr").setLevel(logging.ERROR)

# Initialize PaddleOCR
# A single module-level engine (English, angle classifier enabled) that
# extract_image_tags() reuses for every image.
ocr = PaddleOCR(use_angle_cls=True, lang='en')

# Suppress UserWarning messages whose originating module name matches "pydicom".
warnings.filterwarnings("ignore", category=UserWarning, module="pydicom")

# Initialize global DataFrame variables
# done_df collects folders whose report was rewritten; not_done_df collects
# folders whose report produced no registration number. Both are rebound by
# erase_and_save_details() and written to disk by save_dataframes().
# NOTE(review): done_file / not_done_file are only assigned inside
# erase_and_save_details(), so save_dataframes() cannot be called before it.
done_df = pd.DataFrame(columns=['Folder'])
not_done_df = pd.DataFrame(columns=['Folder'])

# Function to save DataFrames to Excel
def save_dataframes():
    """Write the two progress-tracking DataFrames to their Excel files.

    Reads the module-level ``done_df`` / ``not_done_df`` frames and the
    ``done_file`` / ``not_done_file`` paths (the paths are assigned by
    ``erase_and_save_details``). The index column is not written.
    """
    # The names are only read here, so they resolve to the module globals
    # without needing a ``global`` declaration.
    done_df.to_excel(excel_writer=done_file, index=False)
    not_done_df.to_excel(excel_writer=not_done_file, index=False)

def extract_image_tags(soup):
    """Return the first image whose OCR text contains a "Reg" marker.

    Every ``<img>`` in *soup* is loaded (over HTTP(S) when its ``src`` is a
    URL, otherwise by treating ``src`` as base64 data), run through the
    module-level PaddleOCR engine, and searched case-insensitively for
    ``Reg`` followed by any text.

    Args:
        soup: Parsed document exposing ``find_all('img')``; each tag must
            support ``.get('src')``.

    Returns:
        ``(src, reg_no, extracted_text)`` for the first matching image, where
        ``reg_no`` is ``"Reg. No."`` followed by the stripped text after the
        match; ``(None, None, None)`` when no image matches.
    """
    for img_tag in soup.find_all('img'):
        # An <img> without a usable src has nothing to decode; skip it
        # instead of raising KeyError and aborting the whole report.
        src = img_tag.get('src')
        if not src:
            continue

        if src.startswith(('http://', 'https://')):
            image = url_to_image(src)
        else:
            image = base64_to_image(src)

        if image is None:
            continue

        result = ocr.ocr(np.array(image), cls=True)
        # Indexing element[1][0] assumes each OCR element is
        # [box, (text, score)] -- TODO confirm for the installed PaddleOCR.
        # ``result or []`` tolerates an engine that returns None for an image.
        extracted_text = ' '.join(
            element[1][0]
            for line in (result or [])
            if line
            for element in line
            if element
        )

        match = re.search(r"Reg(.*)", extracted_text, re.IGNORECASE)
        if match:
            reg_no = f"Reg. No.{match.group(1).strip()}"
            return src, reg_no, extracted_text
    return None, None, None

def base64_to_image(base64str):
    """Decode a base64 string (optionally a ``data:`` URI) into a PIL image.

    Args:
        base64str: Base64 text; anything up to and including the last comma
            is discarded before decoding.

    Returns:
        The opened ``PIL.Image`` object, or ``None`` (after logging a
        warning) when the data is not valid base64 or not a recognisable
        image.
    """
    # Keep only what follows the last comma (the whole string if there is none).
    payload = base64str.rpartition(",")[2]

    # Fix padding issues: extend with '=' up to the next multiple of four.
    payload += '=' * (-len(payload) % 4)

    try:
        return Image.open(BytesIO(base64.b64decode(payload)))
    except (UnidentifiedImageError, base64.binascii.Error) as e:
        logging.warning(f"Unidentified image file or invalid base64 string: {e}")
        return None

def url_to_image(url, timeout=30):
    """Download *url* and open the response body as a PIL image.

    Args:
        url: HTTP(S) address of the image.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely on an
            unresponsive host, which would stall the whole batch.

    Returns:
        The opened ``PIL.Image`` object, or ``None`` (after logging a
        warning) when the request fails, times out, returns an error status,
        or the body is not a recognisable image.
    """
    try:
        # A timeout surfaces as requests.Timeout, a RequestException
        # subclass, so it is handled by the except clause below.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return Image.open(BytesIO(response.content))
    except (requests.RequestException, UnidentifiedImageError) as e:
        logging.warning(f"Failed to fetch or identify image from URL: {url} - {e}")
        return None

def replace_image_with_text(soup, original_src, text):
    """Replace the first ``<img>`` whose ``src`` contains *original_src*.

    Args:
        soup: Parsed document exposing ``find_all('img')``.
        original_src: Text to look for inside each image's ``src``
            (substring match).
        text: Replacement passed to the tag's ``replace_with``.

    Returns:
        None. The document is modified in place; nothing happens when no
        image matches.
    """
    for tag in soup.find_all('img'):
        # .get() so that an <img> lacking a src attribute is skipped rather
        # than raising KeyError before the target image is reached.
        if original_src in tag.get('src', ''):
            tag.replace_with(text)
            break

def extract_patient_details_from_table(soup):
    """Read patient/study fields from the bordered details table.

    The table is laid out as rows of four cells: label, value, label, value.
    Rows with any other number of cells are ignored. When the patient name
    contains digits, everything from the first digit onward is taken as the
    age (overriding any "Age:" cell) and removed from the name.

    Args:
        soup: Parsed document exposing ``find(name, attrs)``.

    Returns:
        A 9-tuple ``(patient_id, patient_name, age, sex, accession, modality,
        physician, study, study_date)``. Fields that are not present are
        ``None``; all nine are ``None`` when the table is not found.
    """
    # NOTE(review): 'cellapdding' looks like a misspelling of 'cellpadding'.
    # It is left unchanged because it may mirror the attribute exactly as it
    # appears in the report HTML -- confirm against a sample report.
    table = soup.find('table', {'border': '1', 'cellapdding': '0', 'cellspacing': '0'})

    # Cell label -> result field.
    field_for_label = {
        "Patient ID:": "patient_id",
        "Patient Name:": "patient_name",
        "Age:": "age",
        "Sex:": "sex",
        "Accession:": "accession",
        "Modality:": "modality",
        "Physician:": "physician",
        "Study:": "study",
        "Study Date:": "study_date",
    }
    details = dict.fromkeys(field_for_label.values())

    if table is not None:
        for row in table.find_all('tr'):
            cells = row.find_all('td')
            if len(cells) != 4:
                continue
            texts = [cell.get_text(strip=True) for cell in cells]
            # Left pair first, then right pair, so a label repeated in the
            # same row resolves to the right-hand value.
            for label, value in ((texts[0], texts[1]), (texts[2], texts[3])):
                field = field_for_label.get(label)
                if field is not None:
                    details[field] = value

        # The name may be missing from the table; only split off an age when
        # there is a name to search (re.search(None) would raise TypeError).
        name = details["patient_name"]
        if name:
            age_match = re.search(r'\d+', name)
            if age_match:
                details["age"] = name[age_match.start():].strip()
                details["patient_name"] = name[:age_match.start()].strip()

    return (
        details["patient_id"],
        details["patient_name"],
        details["age"],
        details["sex"],
        details["accession"],
        details["modality"],
        details["physician"],
        details["study"],
        details["study_date"],
    )

def erase_and_save_details(input_folder, error_folder):
    """Anonymize the 'Approved*' reports under *input_folder* and log the details.

    For every sub-folder not already listed in ``done_folders.xlsx``:

    * a folder containing no ``.html`` file is moved into *error_folder*;
    * otherwise each file whose name starts with ``Approved`` is parsed, its
      images are OCR'd for a registration number, the labelled ``<td>`` cells
      are read (and some of them blanked), the matching image is replaced by
      the registration text, and the result is written next to the original
      under the same name with ``Approved`` replaced by ``Annonymized``; the
      original file is then deleted.

    Args:
        input_folder: Directory whose entries are the per-study folders.
        error_folder: Directory that receives folders without a report and
            the Excel outputs (``done_folders.xlsx``,
            ``not_done_folders.xlsx``, ``details.xlsx``).

    Side effects:
        Rebinds the module globals ``done_df``, ``not_done_df``,
        ``done_file`` and ``not_done_file``; moves, writes and removes files;
        prints a summary.
    """
    global done_df, not_done_df, done_file, not_done_file
    # NOTE(review): count is incremented once per rewritten report file,
    # although the final message reports it as a number of folders.
    count = 0
    no_report = []

    # Initialize an empty DataFrame to store the extracted data
    columns = ["Folder", "Patient ID", "Patient Name", "Age", "Gender", "Study", "Modality", "Study Date", "Accession", "Physician", "Extracted Text", "Reg No"]
    data_df = pd.DataFrame(columns=columns)

    # Load existing done folders from Excel if it exists
    done = []
    done_file = os.path.join(error_folder, 'done_folders.xlsx')
    not_done_file = os.path.join(error_folder, 'not_done_folders.xlsx')

    if os.path.exists(done_file):
        done_df = pd.read_excel(done_file)
        done = done_df['Folder'].tolist()

    # Ensure directories exist
    os.makedirs(error_folder, exist_ok=True)

    # Iterate through each folder in the input folder
    # NOTE(review): every entry is passed to os.listdir() below, so a plain
    # file directly inside input_folder would presumably raise here -- confirm
    # that input_folder only ever contains directories.
    for folder in tqdm(os.listdir(input_folder)):
        if folder not in done:
            inside_folder = os.path.join(input_folder, folder)
            html_files = [f for f in os.listdir(inside_folder) if f.endswith('.html')]

            if not html_files:
                # No report at all: set the whole folder aside.
                shutil.move(inside_folder, os.path.join(error_folder, folder))
                no_report.append(folder)
            else:
                # Selection is by the 'Approved' name prefix only; the
                # '.html' check above is not re-applied to each file.
                for filename in os.listdir(inside_folder):
                    if filename.startswith('Approved'):
                        # Read the HTML file
                        report_file_path = os.path.join(inside_folder, filename)

                        # First pass in binary mode to let chardet guess the encoding.
                        # NOTE(review): if chardet reports no encoding (None),
                        # open() below falls back to the platform default.
                        with open(report_file_path, 'rb') as f:
                            raw_data = f.read()
                            encoding_result = chardet.detect(raw_data)
                            file_encoding = encoding_result['encoding']

                        with open(report_file_path, 'r', encoding=file_encoding) as f:
                            content = f.read()

                        soup = BeautifulSoup(content, 'html.parser')

                        # OCR the embedded images and look for a registration number.
                        src, reg_no, extracted_text = extract_image_tags(soup)

                        if src and reg_no:
                            patient_name_value = None
                            patient_id_value = None
                            age_value = None
                            sex_value = None
                            study_value = None
                            modality_value = None
                            study_date_value = None
                            accession_value = None
                            physician_value = None

                            # Find and erase 'Patient Name' and 'Patient ID' values
                            # Each <td> is matched through the text of its first <b>
                            # child. The value is always read BEFORE the cell's last
                            # child node is blanked, so the order inside each branch
                            # matters.
                            for tag in soup.find_all('td'):
                                if tag.find('b') and 'Patient Name' in tag.find('b').text:
                                    patient_name_value = tag.get_text().strip().replace('Patient Name:', '').strip()
                                    # Digits inside the name are treated as the start of the age.
                                    age_match = re.search(r'\d+', patient_name_value)
                                    if (age_match):
                                        age_value = patient_name_value[age_match.start():].strip()
                                        # Remove the age part from patient_name_value
                                        patient_name_value = patient_name_value[:age_match.start()].strip()
                                    # Replace text after 'Patient Name' with empty string
                                    tag.contents[-1].replace_with('')

                                if tag.find('b') and 'Patient ID' in tag.find('b').text:
                                    patient_id_value = tag.get_text().strip().replace('Patient ID:', '').strip()
                                    tag.contents[-1].replace_with('')

                                # Sex, Study and Modality are recorded but left in the document.
                                if tag.find('b') and 'Sex' in tag.find('b').text:
                                    sex_value = tag.get_text().strip().replace('Sex:', '').strip()

                                # 'Study' without a following space, so that labels such as
                                # 'Study Date' are not picked up by this branch.
                                if tag.find('b') and 'Study' in tag.find('b').text and not 'Study ' in tag.find('b').text:
                                    study_value = tag.get_text().strip().replace('Study:', '').strip()

                                if tag.find('b') and 'Modality' in tag.find('b').text:
                                    modality_value = tag.get_text().strip().replace('Modality:', '').strip()

                                if tag.find('b') and 'Study Date' in tag.find('b').text:
                                    study_date_value = tag.get_text().strip().replace('Study Date:', '').strip()
                                    tag.contents[-1].replace_with('')

                                # Matched on 'Accession', but only the exact label
                                # 'Accession Number:' is stripped from the value.
                                if tag.find('b') and 'Accession' in tag.find('b').text:
                                    accession_value = tag.get_text().strip().replace('Accession Number:', '').strip()
                                    tag.contents[-1].replace_with('')

                                # Matched on 'Physician', but only the exact label
                                # 'Referring Physician:' is stripped from the value.
                                if tag.find('b') and 'Physician' in tag.find('b').text:
                                    physician_value = tag.get_text().strip().replace('Referring Physician:', '').strip()
                                    tag.contents[-1].replace_with('')

                            # Insert the age value into the <b>Age</b> tag if it exists
                            if age_value:
                                # Age was split off the patient name: write it into the Age cell.
                                for tag in soup.find_all('td'):
                                    if tag.find('b') and 'Age' in tag.find('b').text:
                                        # Replace ':' with ': ' + age_value
                                        tag.contents[-1].replace_with(f':{age_value}')
                            else:
                                # Otherwise read the Age cell; an empty cell is recorded as "0".
                                for tag in soup.find_all('td'):
                                    if tag.find('b') and 'Age' in tag.find('b').text:
                                        age_value = tag.get_text().strip().replace('Age:', '').strip()
                                        if not age_value:
                                            age_value = "0"

                            # Use the new extraction function if primary extraction fails
                            # "Fails" means every value above is still empty/None.
                            # NOTE(review): values obtained through this fallback are
                            # recorded but nothing here blanks them in the document.
                            if not any([patient_name_value, patient_id_value, age_value, sex_value, study_value, modality_value, study_date_value, accession_value, physician_value]):
                                (patient_id_value, patient_name_value, age_value, sex_value, accession_value, modality_value, physician_value, study_value, study_date_value) = extract_patient_details_from_table(soup)

                            # Add the extracted values to the DataFrame
                            new_row = {
                                "Folder": folder,
                                "Patient ID": patient_id_value,
                                "Patient Name": patient_name_value,
                                "Age": age_value,
                                "Gender": sex_value,
                                "Study": study_value,
                                "Modality": modality_value,
                                "Study Date": study_date_value,
                                "Accession": accession_value,
                                "Physician": physician_value,
                                "Extracted Text": extracted_text,
                                "Reg No": reg_no
                            }

                            data_df = pd.concat([data_df, pd.DataFrame([new_row])], ignore_index=True)

                            # Swap the OCR'd image for its registration text.
                            replace_image_with_text(soup, src, reg_no)

                            # Write the rewritten report under the new name, using the
                            # detected encoding, then delete the original file.
                            new_file = os.path.join(inside_folder, filename.replace('Approved', 'Annonymized'))
                            with open(new_file, 'w', encoding=file_encoding) as file:
                                file.write(str(soup))
                            os.remove(os.path.join(inside_folder, filename))

                            count += 1
                            # Progress is persisted after every report.
                            done_df = pd.concat([done_df, pd.DataFrame({"Folder": [folder]})], ignore_index=True)
                            save_dataframes()

                        else:
                            # No registration number found: leave the file untouched and log the folder.
                            not_done_df = pd.concat([not_done_df, pd.DataFrame({"Folder": [folder]})], ignore_index=True)
                            save_dataframes()

    # details.xlsx only holds the rows collected during this call.
    data_df.to_excel(os.path.join(error_folder, "details.xlsx"), index=False)
    print(f"Processed {count} folders with extracted details saved to 'details.xlsx'.")
    print(f"Folders with no report: {len(no_report)}")

    # Save the DataFrames to Excel after processing all folders
    save_dataframes()

def modify_dicom_files(root_path, excel_path):
    """Strip identifying elements from every ``.dic`` file under *root_path*.

    For each file (rewritten in place under the same name):

    * when PatientName is present, a trailing age such as ``45Y`` / ``45Yrs``
      is copied into PatientAge, then PatientName is removed;
    * PatientID is removed when present;
    * InstitutionName is replaced by a numeric counter; the name-to-counter
      mapping is kept in the Excel file at *excel_path*;
    * a fixed list of further tags is removed from the dataset and from the
      file meta header.

    Args:
        root_path: Directory searched recursively for ``.dic`` files.
        excel_path: Excel file holding the ``InstitutionName`` / ``Counter``
            mapping; loaded if it exists and always written back.

    Raises:
        Whatever ``pydicom.dcmread`` or pandas raise; the institution mapping
        is still written before the exception propagates.
    """
    # Tags to remove from every file. Per the DICOM data dictionary these are
    # presumably: two private tags, Original Attributes Sequence, Operators'
    # Name, Implementation Version Name, Referring Physician's Name, Source
    # Application Entity Title and Patient ID -- verify before extending.
    tag_numbers = [(0x0021, 0x0012), (0x0400, 0x0561), (0x0009, 0x0010), (0x0008, 0x1070), (0x0002, 0x0013), (0x0008, 0x0090), (0x0002, 0x0016), (0x0010, 0x0020)]

    # Check if the master Excel file exists
    if os.path.exists(excel_path):
        institution_df = pd.read_excel(excel_path)
    else:
        institution_df = pd.DataFrame(columns=["InstitutionName", "Counter"])

    dicom_files = [
        os.path.join(root, file)
        for root, _dirs, files in os.walk(root_path)
        for file in files
        if file.endswith('.dic')
    ]

    try:
        for dicom_path in tqdm(dicom_files, desc="Processing DICOM files"):
            dicom_image = pydicom.dcmread(dicom_path, force=True)

            if hasattr(dicom_image, 'PatientName'):
                # Extract the patient's age from the end of PatientName
                patient_name = str(dicom_image.PatientName)
                age_match = re.search(r'(\d+)(?:Y(?:rs?)?|Year?)$', patient_name, re.IGNORECASE)
                if age_match:
                    # NOTE(review): only the digits are stored. DICOM's Age
                    # String format is nnnY (e.g. "045Y"); confirm what
                    # downstream consumers expect before changing this.
                    dicom_image.PatientAge = age_match.group(1)
                del dicom_image.PatientName

            # PatientID can be absent even when PatientName is present, so it
            # is checked on its own instead of being deleted unconditionally.
            if hasattr(dicom_image, 'PatientID'):
                del dicom_image.PatientID

            # Replace the institution name with its counter, allocating a new
            # counter the first time a name is seen.
            if hasattr(dicom_image, 'InstitutionName'):
                institution_name = str(dicom_image.InstitutionName)
                if institution_name not in institution_df['InstitutionName'].values:
                    new_counter = len(institution_df) + 1
                    new_entry = pd.DataFrame({"InstitutionName": [institution_name], "Counter": [new_counter]})
                    institution_df = pd.concat([institution_df, new_entry], ignore_index=True)
                else:
                    new_counter = institution_df[institution_df['InstitutionName'] == institution_name]['Counter'].values[0]
                dicom_image.InstitutionName = str(new_counter)

            # Group 0002 elements are held in the separate file meta header,
            # so membership in the main dataset alone never finds them.
            file_meta = getattr(dicom_image, 'file_meta', None)
            for tag_number in tag_numbers:
                if tag_number in dicom_image:
                    del dicom_image[tag_number]
                elif file_meta is not None and tag_number in file_meta:
                    del file_meta[tag_number]

            # Save the modified DICOM image with the same name; a failed save
            # is reported and the run continues with the next file.
            try:
                dicom_image.save_as(dicom_path)
            except Exception as e:
                print(f"Error saving DICOM file: {dicom_path}")
                print(f"Error message: {str(e)}")
    finally:
        # Files already rewritten carry only the counter, so the mapping must
        # be persisted even if a later file aborts the loop.
        institution_df.to_excel(excel_path, index=False)

# Study folders live here; backslashes are normalised to forward slashes.
input_folder = r"E:\Anonymize app\data".replace('\\', '/')
# Sibling directory for folders without a report and for the Excel outputs.
error_folder = input_folder + "-err"
# Workbook holding the institution-name -> counter mapping.
excel_path = error_folder + "/institute.xlsx"

# Step 1: rewrite the HTML reports and record the extracted details.
erase_and_save_details(input_folder, error_folder)

# Step 2: strip identifying elements from the DICOM files.
modify_dicom_files(input_folder, excel_path)


  0%|          | 0/4 [00:00<?, ?it/s]

Processed 4 folders with extracted details saved to 'details.xlsx'.
Folders with no report: 0


Processing DICOM files:   0%|          | 0/8 [00:00<?, ?it/s]