In [None]:
import re
import fitz

def extract_ecg_text(pdf_path):
    """Pull age, sex and diagnostic-suggestion lines out of an ECG report PDF.

    The text of every page is concatenated and searched (case-sensitively)
    for the labels ``sex:``, ``age:`` and ``diagnostic suggestion:``.

    Args:
        pdf_path: Path of the PDF report, handed to ``fitz.open``.

    Returns:
        ``[age, sex, diagnosis]`` where ``age`` and ``sex`` are the remainder
        of the label's line with all spaces removed, and ``diagnosis`` is the
        list of non-empty lines that follow ``diagnostic suggestion:`` (also
        with spaces removed). ``None`` if any of the three labels is missing.
    """
    # The context manager closes the document even if text extraction raises.
    with fitz.open(pdf_path) as doc:
        text = "".join(page.get_text() for page in doc)

    def rest_of_line(label):
        # Value is whatever follows the label up to the end of that line.
        found = re.search(label + r"([^\n]*)", text)
        return found.group(1).replace(" ", "") if found else None

    sex = rest_of_line("sex:")
    if sex is None:
        return None

    age = rest_of_line("age:")
    if age is None:
        return None

    match = re.search(r"diagnostic suggestion:", text)
    if not match:
        return None
    # The label pattern has no capture group, so group(1) would raise
    # IndexError; take everything after the label instead.
    # NOTE(review): this runs to the end of the extracted text - confirm the
    # reports carry no footer after the diagnosis that should be trimmed.
    diagnosis = text[match.end():].replace(" ", "")
    diagnosis = [line.strip() for line in diagnosis.split("\n") if line.strip()]

    return [age, sex, diagnosis]

In [None]:
import os
import pandas as pd

# Build one metadata row per extracted signal file and persist the table.
input_folder = "data/ekg_signal"
columns = ["filename", "age", "sex", "description"]
data_rows = []

for filename in os.listdir(input_folder):
    # Only the .npy signal files drive the table; everything else is ignored.
    if not filename.lower().endswith('.npy'):
        continue
    name = os.path.splitext(filename)[0]
    # The report shares the signal's stem and lives in data/ekg_data.
    pdf_path = os.path.join("data/ekg_data", f"{name}.PDF")
    data_list = extract_ecg_text(pdf_path)
    if data_list:
        # Prefix the parsed [age, sex, diagnosis] with the shared file stem.
        data_rows.append([name, *data_list])

df = pd.DataFrame(data_rows, columns=columns)
df.to_parquet("data/ekg.parquet", engine="pyarrow")

In [None]:
import os
import subprocess

def pdf_to_svg_batch(input_folder, output_folder):
    """Convert every PDF in ``input_folder`` to an SVG in ``output_folder``.

    Files are matched by a case-insensitive ``.pdf`` suffix and converted
    with the external ``pdf2svg`` command. A PDF whose SVG already exists is
    left alone, which makes the batch resumable.

    Args:
        input_folder: Directory that is scanned for PDF files.
        output_folder: Directory that receives ``<stem>.svg``; created if
            it does not exist.

    Prints one status line per PDF: ``exist``, ``done`` or ``failed``.
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if not filename.lower().endswith('.pdf'):
            continue
        pdf_path = os.path.join(input_folder, filename)
        svg_path = os.path.join(output_folder, f"{os.path.splitext(filename)[0]}.svg")
        if os.path.exists(svg_path):
            print(f"{svg_path} exist")
            continue

        completed = subprocess.run(["pdf2svg", pdf_path, svg_path])
        if completed.returncode != 0:
            # A failed conversion must not be reported as done, and any partial
            # output must not make the next run believe the file was converted.
            if os.path.exists(svg_path):
                os.remove(svg_path)
            print(f"{svg_path} failed (pdf2svg exit code {completed.returncode})")
            continue
        print(f"{svg_path} done")
                
# Render every report PDF under data/ekg_data to an SVG under data/ekg_svg.
pdf_to_svg_batch(input_folder="data/ekg_data", output_folder="data/ekg_svg")

In [None]:
import os
import subprocess

# NOTE(review): this cell repeats the pdf_to_svg_batch definition from the
# previous cell; when both run, this later definition is the one in effect.
def pdf_to_svg_batch(input_folder, output_folder):
    """Convert every PDF in ``input_folder`` to an SVG in ``output_folder``.

    Files are matched by a case-insensitive ``.pdf`` suffix and converted
    with the external ``pdf2svg`` command. A PDF whose SVG already exists is
    left alone, which makes the batch resumable.

    Args:
        input_folder: Directory that is scanned for PDF files.
        output_folder: Directory that receives ``<stem>.svg``; created if
            it does not exist.

    Prints one status line per PDF: ``exist``, ``done`` or ``failed``.
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if not filename.lower().endswith('.pdf'):
            continue
        pdf_path = os.path.join(input_folder, filename)
        svg_path = os.path.join(output_folder, f"{os.path.splitext(filename)[0]}.svg")
        if os.path.exists(svg_path):
            print(f"{svg_path} exist")
            continue

        completed = subprocess.run(["pdf2svg", pdf_path, svg_path])
        if completed.returncode != 0:
            # A failed conversion must not be reported as done, and any partial
            # output must not make the next run believe the file was converted.
            if os.path.exists(svg_path):
                os.remove(svg_path)
            print(f"{svg_path} failed (pdf2svg exit code {completed.returncode})")
            continue
        print(f"{svg_path} done")
                
# Same conversion as the previous cell; PDFs already converted print "exist".
pdf_to_svg_batch(input_folder="data/ekg_data", output_folder="data/ekg_svg")

In [None]:
import re
import numpy as np
from scipy.interpolate import interp1d
from lxml import etree

def process_svg(svg_path, signal_path):
    """Extract twelve traces from an ECG SVG, normalise them and save as ``.npy``.

    The function walks the direct children of ``<g id="surface1">``. The
    ``<path>`` two positions before the first child ``<g>`` supplies a
    reference height; after that, ``<path>`` points are accumulated and every
    ``<g>`` closes the current trace, which is resampled with
    ``interpolate_points``.

    Args:
        svg_path: SVG file to parse.
        signal_path: Destination passed to ``np.save``.

    Returns:
        The saved ``float32`` array with one row per trace (12 rows), each row
        mean-centred and divided by the reference height. ``None`` when the
        SVG cannot be parsed or does not have the expected structure; nothing
        is written in that case.
    """
    try:
        parser = etree.XMLParser(remove_blank_text=True)
        tree = etree.parse(svg_path, parser)
    except etree.XMLSyntaxError as e:
        print(str(e))
        return None

    root = tree.getroot()
    ns = {'svg': 'http://www.w3.org/2000/svg'}

    surface1 = root.xpath('//svg:g[@id="surface1"]', namespaces=ns)
    if not surface1:
        return None
    surface1 = surface1[0]

    # Reject files whose first matching path is not stroked in pure black.
    # NOTE(review): presumably this filters out a different report layout - confirm.
    first_path = surface1.xpath('.//svg:path[1]', namespaces=ns)
    if first_path:
        style = first_path[0].get('style', '')
        if 'stroke:rgb(0%,0%,0%)' not in style:
            return None

    big_list = []      # finished, resampled traces
    medium_list = []   # raw points of the trace currently being collected
    points = []        # points of the most recently seen path
    all_elements = surface1.xpath('./*', namespaces=ns)
    total_elements = len(all_elements)
    start_index = 0

    # Locate the first direct child <g>; give up if there is none.
    while start_index < total_elements:
        if all_elements[start_index].tag.endswith('}g'):
            break
        start_index += 1
    else:
        return None

    # The reference path sits two elements before that <g>. Without this guard
    # an index of 0 or 1 would wrap around and read from the END of the list.
    if start_index < 2:
        return None

    index = start_index
    target_path = all_elements[start_index-2]
    if not target_path.tag.endswith('}path'):
        return None

    d_attr = target_path.get('d', '')
    path_points = extract_points(d_attr)

    if len(path_points) < 2:
        return None

    # Vertical distance between the last two points of the reference path,
    # divided by 12.5.
    # NOTE(review): 25 * 0.5 looks like a calibration scale - confirm its units.
    last_M = path_points[len(path_points)-2][1]
    last_L = path_points[len(path_points)-1][1]
    height = abs(last_L - last_M) / (25 * 0.5)
    if height == 0:
        # A flat reference would turn the normalisation into a division by zero.
        return None

    while index < total_elements and len(big_list) < 12:
        elem = all_elements[index]

        if elem.tag.endswith('}g'):
            # A <g> closes whatever trace has been collected so far.
            if medium_list:
                if len(medium_list) <= 5:
                    return None
                medium_arr = np.array(medium_list)
                y_coords = medium_arr[:, 1]
                unique_y, counts = np.unique(y_coords, return_counts=True)
                # More than 90% of the points share one y value: treat the
                # trace as flat and reject the whole file.
                if len(unique_y) > 0 and np.max(counts) / len(y_coords) > 0.9:
                    return None
                else:
                    # The first five traces and the twelfth are resampled to
                    # 1250 points, the others to 1240.
                    target = 1250 if len(big_list) < 5 or len(big_list) == 11 else 1240
                    interpolated = interpolate_points(medium_list[1:], target)
                    # The twelfth collected trace is stored as row 0.
                    # NOTE(review): presumably the drawing order differs from
                    # the desired lead order - confirm.
                    if len(big_list) == 11:
                        big_list.insert(0, interpolated)
                    else:
                        big_list.append(interpolated)
                medium_list = []
            index += 1
            continue

        if elem.tag.endswith('}path'):
            # Points lag by one path: a path's points (minus its first) are
            # appended only when the NEXT path is seen. `points` is not reset
            # at a <g>, so the last path before a <g> lands in the next trace.
            last_points = points
            points = extract_points(elem.get('d', ''))
            medium_list.extend(last_points[1:])
        index += 1

    if len(big_list) != 12:
        return None

    result = np.array(big_list, dtype=np.float32)
    # Centre every row on its own mean, then scale by the reference height.
    mean = np.mean(result, axis=-1, keepdims=True)
    result = (result - mean) / height
    np.save(signal_path, result)
    return result

def extract_points(d_attr):
    """Return the coordinates of the absolute ``M``/``L`` commands in a path.

    Args:
        d_attr: Value of an SVG ``<path>`` element's ``d`` attribute.

    Returns:
        A list of ``(x, y)`` float tuples in the order the commands appear.
        Other commands (including lowercase relative ones) are ignored; an
        empty or non-matching string yields ``[]``.
    """
    # A full decimal literal: optional sign, digits with an optional fraction
    # (or a bare fraction), optional exponent. The previous `[\d.]+` silently
    # skipped negative coordinates and accepted strings such as "1.2.3" that
    # float() cannot parse.
    number = r'[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?'
    pairs = re.findall(rf'[ML]\s*({number})[\s,]+({number})', d_attr)
    return [(float(x), float(y)) for x, y in pairs]

def interpolate_points(points, target, len = 1000):
    """Resample a polyline onto a uniform x grid and return its central part.

    Args:
        points: Sequence of ``(x, y)`` pairs, in any order; only one point is
            kept for each distinct x value.
        target: Number of evenly spaced samples taken between the smallest
            and the largest x.
        len: Number of samples returned, cut from the middle of the
            ``target`` samples. (The name shadows the builtin inside this
            function; it is kept for callers that pass it by keyword.)

    Returns:
        A ``float32`` array holding ``len`` linearly interpolated y values.
    """
    pts = np.array(points)
    # Order by x, then keep the first row of every distinct x value.
    pts = pts[pts[:, 0].argsort()]
    first_rows = np.unique(pts[:, 0], return_index=True)[1]
    pts = pts[np.sort(first_rows)]

    xs = pts[:, 0]
    ys = pts[:, 1]
    linear = interp1d(xs, ys, kind='linear', fill_value='extrapolate')

    grid = np.linspace(xs[0], xs[-1], target)
    resampled = linear(grid).astype(np.float32)

    # Drop the same number of samples from both ends.
    start = int((target - len) / 2)
    stop = start + len
    return resampled[start:stop]

In [None]:
import os

def svg_to_signal_batch(input_folder, output_folder):
    """Run ``process_svg`` on every SVG in ``input_folder``.

    Files are matched by a case-insensitive ``.svg`` suffix. An SVG whose
    ``<stem>.npy`` already exists in ``output_folder`` is not processed again.

    Args:
        input_folder: Directory that is scanned for SVG files.
        output_folder: Directory that receives the ``.npy`` files; created
            if it does not exist.

    Prints one status line per SVG: ``exist``, ``done`` or ``skipped``.
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if not filename.lower().endswith('.svg'):
            continue
        svg_path = os.path.join(input_folder, filename)
        signal_path = os.path.join(output_folder, f"{os.path.splitext(filename)[0]}.npy")
        if os.path.exists(signal_path):
            print(f"{signal_path} exist")
        elif process_svg(svg_path, signal_path) is None:
            # process_svg returns None when it rejected the file and saved
            # nothing, so reporting "done" here would be misleading.
            print(f"{signal_path} skipped")
        else:
            print(f"{signal_path} done")

# Extract a signal array from every SVG under data/ekg_svg.
# NOTE(review): output goes to data/ekg_signal_new, while other cells read
# data/ekg_signal - confirm which folder is intended.
svg_to_signal_batch(input_folder="data/ekg_svg", output_folder="data/ekg_signal_new")

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import utils.my_ecg_process as ecg

def save_png(signal_path, output_path):
    """Render a saved 12-row signal array as a 6x2 grid of plots in one PNG.

    Columns 125 to 625 (501 samples) of every row are drawn. Rows 0-5 fill
    the left column top to bottom and rows 6-11 the right column, labelled
    with the standard lead names. All panels share the same y limits, taken
    from the global minimum and maximum of the plotted window.

    Args:
        signal_path: ``.npy`` file loaded with ``np.load``; it must provide
            at least 12 rows.
        output_path: Destination of the PNG (150 dpi, tight bounding box).
    """
    ecg_data = np.load(signal_path)[:,125:626]
    lead_names = ["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"]

    # Shared by every panel, so computed once.
    y_min = np.min(ecg_data)
    y_max = np.max(ecg_data)
    xticks = np.arange(0, 501, 62.5)

    fig = plt.figure(figsize=(15, 15))
    try:
        fig.suptitle("12-Lead Electrocardiogram Report", fontsize=20, y=0.93)

        for row in range(6):
            for col in range(2):
                # Column-major lead index: always within 0..11 for a 6x2 grid.
                idx = row + col * 6

                ax = fig.add_subplot(6, 2, row * 2 + col + 1)
                ax.plot(ecg_data[idx], 'b-', linewidth=1.5)
                ax.text(-0.05, 0.5, lead_names[idx], fontsize=20,
                        horizontalalignment='right', verticalalignment='center', transform=ax.transAxes)

                ax.set_ylim(y_min, y_max)

                ax.set_xticks(xticks, minor=False)
                ax.set_yticks(np.linspace(y_min, y_max, 10))

                if row == 5:
                    # Only the bottom row carries time labels: sample index / 250.
                    # NOTE(review): presumably 250 samples per second - confirm.
                    ax.set_xticklabels([f"{x/250:.2f}" for x in xticks])
                    ax.set_xlabel('Time (s)', fontsize=12)
                else:
                    ax.set_xticklabels([])
                ax.set_yticklabels([])
                ax.grid(True, linestyle='--', alpha=0.3, linewidth=0.5)

        fig.savefig(output_path, dpi=150, bbox_inches='tight')
    finally:
        # Release the figure even when plotting or saving raises; otherwise
        # figures pile up in the kernel across a batch run.
        plt.close(fig)

In [None]:
import os

def signal_to_png_batch(input_folder, output_folder):
    """Render every ``.npy`` signal in ``input_folder`` to a PNG via ``save_png``.

    Files are matched by a case-insensitive ``.npy`` suffix. A signal whose
    ``<stem>.png`` already exists in ``output_folder`` is not rendered again.

    Args:
        input_folder: Directory that is scanned for ``.npy`` files.
        output_folder: Directory that receives the PNGs; created if missing.

    Prints one status line per signal: ``exist`` or ``done``.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for entry in os.listdir(input_folder):
        if not entry.lower().endswith('.npy'):
            continue

        stem = os.path.splitext(entry)[0]
        signal_path = os.path.join(input_folder, entry)
        output_path = os.path.join(output_folder, f"{stem}.png")

        if not os.path.exists(output_path):
            save_png(signal_path, output_path)
            print(f"{output_path} done")
            continue
        print(f"{output_path} exist")

# Render a preview PNG for every signal array under data/ekg_signal.
signal_to_png_batch(input_folder="data/ekg_signal", output_folder="data/ekg_png_test")