# 解释 *PhysioNet - 心电图图像数字化* 竞赛的基线

这个笔记本为比赛贡献了三个创新：
1. 它展示了如何在扫描的心电图中找到标记（在 `MarkerFinder` 类中的对象检测）。
1. 它可视化了如何通过自上而下的平面扫描从类型 3 和 11 的图像中提取时间序列（函数 `convert_scanned_color()`）。
1. 它展示了如何通过神经网络从灰度图像中提取时间序列（函数 `convert_scanned_grayscale()`）。

您可以根据耐心和 GPU 配额选择有或没有 GPU 运行此笔记本。

## 训练数据概览

训练集中有 977 个心电图（id），占用 84 GByte。每个心电图有 9 个 PNG 文件和一个 CSV 文件。

每个心电图有九种图像类型：
- 0001 由 ECG-image-kit 生成的原始彩色心电图图像。
- 0003 彩色打印并扫描的图像。→ 由 `convert_scanned_color()` 处理
- 0004 彩色打印并扫描的黑白图像。→ 由 `convert_scanned_grayscale()` 处理
- 0005 彩色打印图像的手机照片。
- 0006 在笔记本电脑屏幕上显示的心电图的手机照片。
- 0009 被染色和浸泡的打印心电图的手机照片。
- 0010 严重损坏的打印心电图的手机照片。
- 0011 带有霉菌的打印心电图图像扫描，彩色。→ 由 `convert_scanned_color()` 处理
- 0012 带有霉菌的打印心电图图像扫描，黑白。→ 由 `convert_scanned_grayscale()` 处理

训练集中的采样频率为每秒 250、256、500、512、1000、1025。

## 名誉殿堂 / 致谢

这个笔记本受到了以下工作影响：
- @sasaleaf ([边缘修复 | 小改进](https://www.kaggle.com/code/sasaleaf/edge-fix-small-improvement))
- @guntasdhanjal ([心电图数字化 - 信号平滑增强](https://www.kaggle.com/code/guntasdhanjal/ecg-digitization-signal-smoothing-enhancement))
- @seowoohyeon ([PhysioNet_adjust](https://www.kaggle.com/code/seowoohyeon/physionet-adjust))
- @antonoof ([大型 EDA 和统计模型]())

感谢他们发表他们的想法。

In [None]:
import pandas as pd
import numpy as np
import cv2
from glob import glob
import matplotlib.pyplot as plt
from collections import defaultdict
from tqdm import tqdm
from scipy.signal import medfilt

from sklearn.model_selection import KFold
from sklearn.metrics import r2_score

from tensorflow.keras.layers import Input, Dense, Activation, Reshape, GaussianNoise
from tensorflow.keras.initializers import Constant
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, TerminateOnNaN

# 辅助函数

In [None]:
# Competition metric
# From https://www.kaggle.com/code/metric/physionet-ecg-signal-extraction-metric
from typing import Tuple

import numpy as np
import pandas as pd

import scipy.optimize
import scipy.signal


LEADS = ['I', 'II', 'III', 'aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6']
MAX_TIME_SHIFT = 0.2
PERFECT_SCORE = 384


class ParticipantVisibleError(Exception):
    pass


def compute_power(label: np.ndarray, prediction: np.ndarray) -> Tuple[float, float]:
    if label.ndim != 1 or prediction.ndim != 1:
        raise ParticipantVisibleError('Inputs must be 1-dimensional arrays.')
    finite_mask = np.isfinite(prediction)
    if not np.any(finite_mask):
        raise ParticipantVisibleError("The 'prediction' array contains no finite values (all NaN or inf).")

    prediction[~np.isfinite(prediction)] = 0
    noise = label - prediction
    p_signal = np.sum(label**2)
    p_noise = np.sum(noise**2)
    return p_signal, p_noise


def compute_snr(signal: float, noise: float) -> float:
    if noise == 0:
        # Perfect reconstruction
        snr = PERFECT_SCORE
    elif signal == 0:
        snr = 0
    else:
        snr = min((signal / noise), PERFECT_SCORE)
    return snr


def align_signals(label: np.ndarray, pred: np.ndarray, max_shift: float = float('inf')) -> np.ndarray:
    if np.any(~np.isfinite(label)):
        raise ParticipantVisibleError('values in label should all be finite')
    if np.sum(np.isfinite(pred)) == 0:
        raise ParticipantVisibleError('prediction can not all be infinite')

    # Initialize the reference and digitized signals.
    label_arr = np.asarray(label, dtype=np.float64)
    pred_arr = np.asarray(pred, dtype=np.float64)

    label_mean = np.mean(label_arr)
    pred_mean = np.mean(pred_arr)

    label_arr_centered = label_arr - label_mean
    pred_arr_centered = pred_arr - pred_mean

    # Compute the correlation between the reference and digitized signals and locate the maximum correlation.
    correlation = scipy.signal.correlate(label_arr_centered, pred_arr_centered, mode='full')

    n_label = np.size(label_arr)
    n_pred = np.size(pred_arr)

    lags = scipy.signal.correlation_lags(n_label, n_pred, mode='full')
    valid_lags_mask = (lags >= -max_shift) & (lags <= max_shift)

    max_correlation = np.nanmax(correlation[valid_lags_mask])
    all_max_indices = np.flatnonzero(correlation == max_correlation)
    best_idx = min(all_max_indices, key=lambda i: abs(lags[i]))
    time_shift = lags[best_idx]
    start_padding_len = max(time_shift, 0)
    pred_slice_start = max(-time_shift, 0)
    pred_slice_end = min(n_label - time_shift, n_pred)
    end_padding_len = max(n_label - n_pred - time_shift, 0)
    aligned_pred = np.concatenate((np.full(start_padding_len, np.nan), pred_arr[pred_slice_start:pred_slice_end], np.full(end_padding_len, np.nan)))

    def objective_func(v_shift):
        return np.nansum((label_arr - (aligned_pred - v_shift)) ** 2)

    if np.any(np.isfinite(label_arr) & np.isfinite(aligned_pred)):
        results = scipy.optimize.minimize_scalar(objective_func, method='Brent')
        vertical_shift = results.x
        aligned_pred -= vertical_shift
    return aligned_pred


def _calculate_image_score(group: pd.DataFrame) -> float:
    """帮助函数用于计算单个图像组的总 SNR 分数。"""

    unique_fs_values = group['fs'].unique()
    if len(unique_fs_values) != 1:
        raise ParticipantVisibleError('Sampling frequency should be consistent across each ecg')
    sampling_frequency = unique_fs_values[0]
    if sampling_frequency != int(len(group[group['lead'] == 'II']) / 10):
        raise ParticipantVisibleError('The sequence_length should be sampling frequency * 10s')
    sum_signal = 0
    sum_noise = 0
    for lead in LEADS:
        sub = group[group['lead'] == lead]
        label = sub['value_true'].values
        pred = sub['value_pred'].values

        aligned_pred = align_signals(label, pred, int(sampling_frequency * MAX_TIME_SHIFT))
        p_signal, p_noise = compute_power(label, aligned_pred)
        sum_signal += p_signal
        sum_noise += p_noise
    return compute_snr(sum_signal, sum_noise)


def score(solution: pd.DataFrame, submission: pd.DataFrame, row_id_column_name: str) -> float:
    """计算PhysioNet 2025比赛中多个ECG导联和图像的平均信噪比（SNR）。
最终得分是所有唯一图像上不同线路的SNR总和的平均值。
参数：
    solution: 包含真实值的DataFrame。预期列：'id'及每个导联一个列。
    submission: 包含预测值的DataFrame。预期列：'id'及每个导联一个列。
    row_id_column_name: 唯一标识符列的名称，通常为'id'。
返回：
    最终的比赛得分。

示例
--------
>>> import pandas as pd
>>> import numpy as np
>>> row_id_column_name = "id"
>>> solution = pd.DataFrame({'id': ['343_0_I', '343_1_I', '343_2_I', '343_0_III', '343_1_III','343_2_III','343_0_aVR', '343_1_aVR','343_2_aVR',\
'343_0_aVL', '343_1_aVL', '343_2_aVL', '343_0_aVF', '343_1_aVF','343_2_aVF','343_0_V1', '343_1_V1', '343_2_V1','343_0_V2', '343_1_V2','343_2_V2',\
'343_0_V3', '343_1_V3', '343_2_V3','343_0_V4', '343_1_V4', '343_2_V4', '343_0_V5', '343_1_V5','343_2_V5','343_0_V6', '343_1_V6','343_2_V6',\
'343_0_II', '343_1_II','343_2_II', '343_3_II', '343_4_II', '343_5_II','343_6_II', '343_7_II','343_8_II','343_9_II','343_10_II','343_11_II'],\
'fs': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],\
'value':[0.1,0.3,0.4,0.6,0.6,0.4,0.2,0.3,0.4,0.5,0.2,0.7,0.2,0.3,0.4,0.8,0.6,0.7, 0.2,0.3,-0.1,0.5,0.6,0.7,0.2,0.9,0.4,0.5,0.6,0.7,0.1,0.3,0.4,\
0.6,0.6,0.4,0.2,0.3,0.4,0.5,0.2,0.7,0.2,0.3,0.4]})
>>> submission = solution.copy()
>>> round(score(solution, submission, row_id_column_name), 4)
25.8433
>>> submission.loc[0, 'value'] = 0.9 # 引入一些噪声
>>> round(score(solution, submission, row_id_column_name), 4)
13.6291
>>> submission.loc[4, 'value'] = 0.3 # 引入一些噪声
>>> round(score(solution, submission, row_id_column_name), 4)
13.0576

>>> solution = pd.DataFrame({'id': ['343_0_I', '343_1_I', '343_2_I', '343_0_III', '343_1_III','343_2_III','343_0_aVR', '343_1_aVR','343_2_aVR',\
'343_0_aVL', '343_1_aVL', '343_2_aVL', '343_0_aVF', '343_1_aVF','343_2_aVF','343_0_V1', '343_1_V1', '343_2_V1','343_0_V2', '343_1_V2','343_2_V2',\
'343_0_V3', '343_1_V3', '343_2_V3','343_0_V4', '343_1_V4', '343_2_V4', '343_0_V5', '343_1_V5','343_2_V5','343_0_V6', '343_1_V6','343_2_V6',\
'343_0_II', '343_1_II','343_2_II', '343_3_II', '343_4_II', '343_5_II','343_6_II', '343_7_II','343_8_II','343_9_II','343_10_II','343_11_II'],\
'fs': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],\
'value':[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]})
>>> round(score(solution, submission, row_id_column_name), 4)
-384
>>> submission = solution.copy()
>>> round(score(solution, submission, row_id_column_name), 4)
25.8433

>>> # 测试对齐
>>> label = np.array([0, 1, 2, 1, 0])
>>> pred = np.array([0, 1, 2, 1, 0])
>>> aligned = align_signals(label, pred)
>>> expected_array = np.array([0, 1, 2, 1, 0])
>>> np.allclose(aligned, expected_array, equal_nan=True)
True

>>> # 测试 2：垂直偏移（直流偏移）应被移除
>>> label = np.array([0, 1, 2, 1, 0])
>>> pred = np.array([10, 11, 12, 11, 10])
>>> aligned = align_signals(label, pred)
>>> expected_array = np.array([0, 1, 2, 1, 0])
>>> np.allclose(aligned, expected_array, equal_nan=True)
True

>>> # 测试 3：时间偏移应被校正
>>> label = np.array([0, 0, 1, 2, 1, 0., 0.])
>>> pred = np.array([1, 2, 1, 0, 0, 0, 0])
>>> aligned = align_signals(label, pred)
>>> expected_array = np.array([np.nan, np.nan, 1, 2, 1, 0, 0])
>>> np.allclose(aligned, expected_array, equal_nan=True)
True

>>> # 测试 4：max_shift限制阻止最佳对齐
>>> label = np.array([0, 0, 0, 0, 1, 2, 1]) # 峰值距离较远
>>> pred = np.array([1, 2, 1, 0, 0, 0, 0])
>>> aligned = align_signals(label, pred, max_shift=10)
>>> expected_array = np.array([ np.nan, np.nan, np.nan, np.nan, 1, 2, 1])
>>> np.allclose(aligned, expected_array, equal_nan=True)
True"""
    for df in [solution, submission]:
        if row_id_column_name not in df.columns:
            raise ParticipantVisibleError(f"'{row_id_column_name}' column not found in DataFrame.")
        if df['value'].isna().any():
            raise ParticipantVisibleError('NaN exists in solution/submission')
        if not np.isfinite(df['value']).all():
            raise ParticipantVisibleError('Infinity exists in solution/submission')

    submission = submission[['id', 'value']]
    merged_df = pd.merge(solution, submission, on=row_id_column_name, suffixes=('_true', '_pred'))
    merged_df['image_id'] = merged_df[row_id_column_name].str.split('_').str[0]
    merged_df['row_id'] = merged_df[row_id_column_name].str.split('_').str[1].astype('int64')
    merged_df['lead'] = merged_df[row_id_column_name].str.split('_').str[2]
    merged_df.sort_values(by=['image_id', 'row_id', 'lead'], inplace=True)
    image_scores = merged_df.groupby('image_id').apply(_calculate_image_score, include_groups=False)
    return max(float(10 * np.log10(image_scores.mean())), -PERFECT_SCORE)

In [None]:
def plot_training_history(history):
    """绘制Keras训练历史。"""
    if len(history['loss']) >= 2:
        _, axs = plt.subplots(1, 1, figsize=(6, 3), squeeze=False)
        axs = axs.ravel()
        axs[0].plot(np.arange(len(history['loss'])) + 1, history['loss'], ':', label='train_loss')
        axs[0].plot(np.arange(len(history['val_loss'])) + 1, history['val_loss'], label='val_loss')
        axs[0].legend()
        axs[0].set_title('Training history')
        plt.show()

## 读取元数据和标签

In [None]:
train = pd.read_csv('/kaggle/input/physionet-ecg-image-digitization/train.csv')
test = pd.read_csv('/kaggle/input/physionet-ecg-image-digitization/test.csv')
label_dict = {}
for idx, row in tqdm(train.iterrows(), total=len(train)):
    label_dict[idx] = pd.read_csv(f'/kaggle/input/physionet-ecg-image-digitization/train/{row.id}/{row.id}.csv')

## 循环遍历训练数据集

我们定义了一个生成器函数 `train_images_and_labels`，它循环遍历训练图像的一个子集。

In [None]:
def train_images_and_labels(start=None, end=None, image_types=None, use_tqdm=True):
    """生成器函数，它产生训练图像的一个子集。
    
    参数
    start: 切片的起始索引
    end: 切片的结束索引
    image_types: 选择的图像类型列表"""
    t = train.iloc[start:end]
    iterable = t.iterrows()
    if use_tqdm:
        iterable = tqdm(iterable, total=len(t))
    for idx, row in iterable:
        png_paths = sorted(glob(f'/kaggle/input/physionet-ecg-image-digitization/train/{row.id}/{row.id}-*.png'))
        labels = label_dict[idx]
        for path in png_paths:
            img_type = int(path[-8:-4])
            if image_types is None or img_type in image_types:
                ima = cv2.imread(path)

                # The following lines document the possible shapes for every image type in train
                # Test files may be different
                shape = ima.shape
                assert len(shape) == 3
                assert (img_type == 1) <= (shape == (1700, 2200, 3)) # 200 pixels per inch on Letter paper
                assert (img_type == 3) <= (shape[0] == 1652)
                assert (img_type == 4) <= (shape[0] == 1652)
                assert (img_type == 5) <= (shape in {(3024, 4032, 3), (1344, 1008, 3), (4032, 3024, 3)})
                assert (img_type == 6) <= ((shape == (4000, 3000, 3) or (shape == (3000, 4000, 3))))
                assert (img_type == 9) <= ((shape == (3024, 4032, 3) or (shape == (4032, 3024, 3))))
                assert (img_type == 10) <= ((shape == (3024, 4032, 3) or (shape == (4032, 3024, 3))))
                assert (img_type == 11) <= (shape[0] == 1652)
                assert (img_type == 12) <= (shape[0] == 1652)

                yield idx, ima, img_type, labels


## 平均 ECG

对于回归任务，真实标签的平均值常常是一个良好的基线。 

我们计算每个导联的平均时间序列，以便我们可以预测这一平均值，对于我们的模型无法处理的图像类型。

In [None]:
def fit_mean_model(train, verbose=False):
    """计算时间序列的最小值、最大值和均值。"""
    mean_dict = defaultdict(list)
    for idx, row in tqdm(train.iterrows(), total=len(train)):
        labels = label_dict[idx]
        for lead in labels.columns:
            values = labels[lead]
            values = values[~values.isna()]
            mean_dict[lead].append(values)
    
    for lead in mean_dict.keys():
        # Upsample every time series to 20000 samples
        mean_dict[lead] = [
            np.interp(np.linspace(0, len(values)-1, 20000), np.arange(len(values)), values)
            for values in mean_dict[lead]
        ]

        # Stack all ECGs
        mean_dict[lead] = np.stack(mean_dict[lead])

        # Plot the mean ECG
        if verbose:
            m = mean_dict[lead].mean(axis=0)
            # s = mean_dict[lead].std(axis=0)
            plt.figure(figsize=(6, 1.5))
            plt.title(f"Mean curve for {lead}")
            plt.plot(m)
            # plt.plot(m-s/30)
            # plt.plot(m+s/30)
            plt.axhline(0, color='gray')
            plt.ylabel('mV')
            plt.gca().get_xaxis().set_visible(False)
            plt.show()

    return mean_dict

def validate_mean_model(val, mean_dict):
    snr_list = []
    for idx, row in tqdm(val.iterrows(), total=len(val)):
        labels = label_dict[idx]
        # Evaluate the signal-to-noise ratio
        sum_signal = 0
        sum_noise = 0
        for lead in labels.columns:
            label = labels[lead]
            label = label[~ label.isna()]
            pred = mean_dict[lead].mean(axis=0)
            pred = np.interp(np.linspace(0, 1, len(label)), np.linspace(0, 1, len(pred)), pred)
            assert len(label) == len(pred)
    
            aligned_pred = align_signals(label, pred, int(row.fs * MAX_TIME_SHIFT))
            p_signal, p_noise = compute_power(label, aligned_pred)
            sum_signal += p_signal
            sum_noise += p_noise
    
        snr = compute_snr(sum_signal, sum_noise)
        snr_list.append(snr)
    
    snr = np.array(snr_list).mean()
    val_score = max(float(10 * np.log10(snr)), -PERFECT_SCORE)
    print(f"# Validation SNR for mean prediction: {snr:.2f} {val_score=:.2f}")

# # Validate the mean model
# train_test_split_loc = 780
# mean_dict = fit_mean_model(train.iloc[:train_test_split_loc], verbose=True)
# validate_mean_model(train.iloc[train_test_split_loc:], mean_dict)

# Refit the mean model to the full dataset
mean_dict = fit_mean_model(train, verbose=True)


# 使用 MarkerFinder 寻找导联端点

在解码图像之前，了解 ECG 中 17 个导联端点的坐标是很重要的。以下单元格定义了类 `MarkerFinder`，该类确定这些点。通过模式匹配函数 `cv2.matchTemplate()` 找到 13 个点；四条线的右端点被推断为其他向量的线性组合。

In [None]:
class MarkerFinder:
    """这个类在扫描的心电图图像中找到13个标记，并猜测4个线条的端点。"""
    # From https://www.kaggle.com/code/ambrosm/ecg-original-explained-baseline
    
    def __init__(self, show_templates=False):
        # Derive the templates from type 1 images
        # np.max keeps the gridlines and markers and removes the ecg lines
        ima = np.max([
            cv2.imread('/kaggle/input/physionet-ecg-image-digitization/train/4292118763/4292118763-0001.png'),
            cv2.imread('/kaggle/input/physionet-ecg-image-digitization/train/4289880010/4289880010-0001.png'),
            cv2.imread('/kaggle/input/physionet-ecg-image-digitization/train/4284351157/4284351157-0001.png'),
        ], axis=0)

        # Template points in global coordinates of type 1 images
        absolute_points = np.zeros((17, 2), dtype=int)
        for i in range(3):
            absolute_points[5 * i] = np.array([707 + 284 * i, 118]) # y, x
            for j in range(1, 5):
                absolute_points[5 * i + j] = np.array([707 + 284 * i, 118 + 492 * j])
        absolute_points[5 * 3] = np.array([1535, 118])
        absolute_points[5 * 3 + 1] = np.array([1535, 118 + 492 * 4])

        # Top left corner of template rectangle
        template_positions = [None] * 17
        for i in range(len(absolute_points)):
            if absolute_points[i][1] < 118 + 492 * 4:
                if i % 5 == 0:
                    template_positions[i] = (absolute_points[i][0] - 87, absolute_points[i][1] - 50) # y, x
                else:
                    template_positions[i] = (absolute_points[i][0] - 37, absolute_points[i][1] - 13)

        # Height and width of the templates
        template_sizes = np.array([(105, 60)] * 17) # height, width

        # Transform the points to relative coordinates (inside the template)
        template_points = [np.array([absolute_points[i][0] - template_positions[i][0],
                                     absolute_points[i][1] - template_positions[i][1]])
                           if template_positions[i] is not None
                           else None
                           for i in range(len(absolute_points))]

        # Save the template matrices
        templates = [None] * 17
        for i in range(len(template_positions)):
            if template_points[i] is not None:
                template = (ima[template_positions[i][0]:template_positions[i][0]+template_sizes[i][0],
                            template_positions[i][1]:template_positions[i][1]+template_sizes[i][1]])
                templates[i] = template

        # Plot the template matrices
        if show_templates:
            _, axs = plt.subplots(4, 4, figsize=(5, 7))
            for i in range(len(template_positions)):
                if template_points[i] is not None:
                    template = templates[i].copy()
                    cv2.rectangle(template,
                                  (template_points[i][1]-1, template_points[i][0]-1),
                                  (template_points[i][1]+1, template_points[i][0]+1), 
                                  [255, 0, 0], 2)
                    axs[i // 5, i % 5].imshow(template)
            for i in range(13, len(axs.ravel())):
                axs.ravel()[i].axis('off')
            plt.tight_layout()
            plt.suptitle('The templates for the 13 markers', y=1.01)
            plt.show()

        self._absolute_points = absolute_points
        self._template_positions = template_positions
        self._template_sizes = template_sizes
        self._template_points = template_points
        self._templates = templates
        
    def find_markers(self, ima, warn=False, plot=False, title=''):
        """返回 17 个标记，作为大小为 2 的整数数组（行，列）的列表

        参数：
        ima：形状为 (1652, height, 3) 的数组"""
        
        if ima.shape[0] != 1652:
            raise ValueError("Implemented only for scanned images (image types 3, 4, 11, 12)")

        markers = np.full((17, 2), -1)

        # Find 13 template-based markers
        for j in range(len(self._templates)):
            if self._template_points[j] is not None:
                t = self._template_positions[j][0]-100
                l = max(self._template_positions[j][1]-100, 0)
                search_range = (ima[t:self._template_positions[j][0]+100+self._template_sizes[j][0],
                                l:self._template_positions[j][1]+250+self._template_sizes[j][0]])
                res = cv2.matchTemplate(search_range, self._templates[j], cv2.TM_CCOEFF)
                min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
    
                top_left = max_loc
                if warn and max_val < 3e7:
                    bottom_right = (top_left[0] + self._templates[j].shape[1],
                                    top_left[1] + self._templates[j].shape[0])
                    print(j, top_left, max_val)
                    search_range = search_range.copy()
                    cv2.rectangle(search_range, top_left, bottom_right, 0, 2)
                    plt.imshow(search_range)
                    plt.show()
                markers[j] = np.array((t + top_left[1] + self._template_points[j][0],
                                       l + top_left[0] + self._template_points[j][1]))

        # Guess the ends of the first three lines (can be outside the bounding box of the image)
        for i in range(3):
            m = markers[5 * i + 3] * 2 - markers[5 * i + 2]
            markers[5 * i + 4] = m

        # Guess the end of the fourth line (can be outside the bounding box of the image)
        markers[16] = ((markers[14] * (284 + 260) - markers[9] * 260) / 284).astype(int)

        if plot:
            ima = ima.copy()
            for m in markers:
                if m is not None:
                    cv2.rectangle(ima, (m[1]-40, m[0]-40), (m[1]+40, m[0]+40), (255, 0, 0), 2)
            # plt.figure(figsize=(12, 8))
            plt.imshow(ima)
            plt.title(title)
            plt.show()

        return markers

    # def baseline(self, i):
    #     """类型1图像中第i条基线的y坐标"""
    #     if i not in [0, 1, 2, 3]:
    #         raise ValueError("i must be in [0, 1, 2, 3]")
    #     return self._absolute_points[5 * i][0]
        
    @staticmethod
    def lead_info(lead):
        """指定哪些标记标记了引导的开始和结束。"""
        begin, end = {
            'I': (0, 1),
            'II-subset': (5, 6),
            'III': (10, 11),
            'aVR': (1, 2),
            'aVL': (6, 7),
            'aVF': (11, 12),
            'V1': (2, 3),
            'V2': (7, 8),
            'V3': (12, 13),
            'V4': (3, 4),
            'V5': (8, 9),
            'V6': (13, 14),
            'II': (15, 16),
        }[lead]
        return begin // 5, begin, end

    def demo(self, ima, warn=False, title=''):
        """用红色标记绘制图像"""
        markers = self.find_markers(ima, warn, plot=True, title=title)

mf = MarkerFinder(show_templates=False)

ima = cv2.imread('/kaggle/input/physionet-ecg-image-digitization/train/1026034238/1026034238-0011.png') # correct
mf.demo(ima, warn=False, title='Scanned ECG with 17 line endpoints')

# 解决方案 1：对扫描彩色图像进行数字化的平面扫描

我们定义函数 `convert_scanned_color()`，它将图像转换为十二个时间序列。这个函数实现了比赛的主要任务，但它仅适用于类型 3 和 11 的图像。它尚未推广到具有黑色网格背景或手机照片的图像。

算法从上到下扫描图像。扫描过程中检测到的第一批黑色像素定义了第一行。扫描在该行下方的白色像素上继续，下一批黑色像素定义了第二行。第三行和第四行也依此类推。

在我们获得四行之后，使用 `MarkerFinder` 找到的标记来选择形成 12 个导联的片段。

那么我为什么选择类型 3 和 11 呢？有两个原因：首先，扫描图像的质量高于手机照片，并且它们总是具有相同的比例（80 像素每 mV）。其次，颜色使得黑色 ECG 线与红色网格线易于区分。

In [None]:
def find_line_by_topdown_sweep(ima):
    """找到图像中的最顶部黑线并移除它。

    参数：
    ima：二维布尔图像数组（False = 黑色，True = 白色），将被更新

    返回值：
    top：矩阵中每列的最顶部黑色像素
    bottom：每列中最顶部黑色像素下方的最顶部白色像素"""
    # Find the topmost black pixels
    top = np.argmin(ima, axis=0) # topmost black (False) pixel per column; 0 if there are no black pixels

    # Extend into columns without black pixels
    median_top = int(np.median(top))
    top[top == 0] = median_top
    top[top > median_top + 250] = median_top
    
    # Bound from above
    strip_width = 64
    for strip_left in range(0, ima.shape[1], strip_width):
        median_top_strip = int(np.median(top[strip_left:strip_left+strip_width]))
        strip = ima[:median_top_strip-100, strip_left:strip_left+strip_width]
        all_white = strip.all(axis=1)
        if all_white.size > 0:
            first_white_row = np.argmax(all_white[::-1])
            if first_white_row > 0 or all_white[-1]:
                first_white_row = median_top_strip - 100 - first_white_row
                ami = np.argmin(ima[first_white_row:, strip_left:strip_left+strip_width], axis=0)
                top[strip_left:strip_left+strip_width] = np.where(ami != 0, first_white_row + ami, median_top)
    top[top > median_top + 250] = median_top

    # Bound from below
    strip_width = 64
    for strip_left in range(0, ima.shape[1], strip_width):
        median_top_strip = int(np.median(top[strip_left:strip_left+strip_width]))
        strip = ima[median_top_strip+80:, strip_left:strip_left+strip_width]
        all_white = strip.all(axis=1)
        if all_white.size > 0:
            first_white_row = np.argmax(all_white)
            if first_white_row > 0 or all_white[0]:
                first_white_row += median_top_strip + 80
                mask = top > first_white_row
                mask[:strip_left] = False
                mask[strip_left+strip_width:] = False
                top[mask] = median_top_strip

    # Paint black everything above
    mask = np.tile(np.arange(len(ima)).reshape(-1, 1), reps=(1, ima.shape[1]))
    mask = mask >= top # True for lower part of image
    ima &= mask # paint black whatever is above the line

    # Find the topmost white pixels
    bottom = np.argmax(ima, axis=0) # topmost white (True) pixel per column; 0 if there are no white pixels

    # Paint white everything above
    bottomx = np.maximum(bottom, np.median(top) + 100) # overpaints the letters
    mask = np.tile(np.arange(len(ima)).reshape(-1, 1), reps=(1, ima.shape[1]))
    mask = mask < bottomx # True for upper part of image
    ima |= mask # paint white whatever is above the line
    ima[:,:-1] |= mask[:,1:]
    ima[:,1:] |= mask[:,:-1]

    return top, bottom

def get_lead_from_top_bottom(tops, bottoms, lead, n_timesteps, markers):
    """提取并重新采样 ECG 线中的一个导联。

参数：
tops: 形状为 (image_width, ) 的 4 个数组的列表
bottoms: 形状为 (image_width, ) 的 4 个数组的列表
lead: 12 个导联标签中的一个（字符串）
n_timesteps: 所需样本数量（整数）
markers: 作为大小为 2 的整数数组（行，列）的列表的 17 个标记"""

    # Select the markers and determine the baseline
    line, begin, end = mf.lead_info(lead)
    top = tops[line]
    bottom = bottoms[line]
    begin, end = markers[begin], markers[end]
    baseline = np.linspace(begin[0], end[0], end[1] - begin[1])
    
    pred0 = (top[begin[1]:end[1]] + bottom[begin[1]:end[1]]) / 2
    baseline = baseline[:len(pred0)] # in case end is outside the image (only idx 202 and some test)
    pred = baseline - pred0

    # Scale
    pred /= 80 # 80 pixels = 1 mV

    # Fix pixels obscured by the markers
    if lead in ['aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6']:
        # first four pixels can be obscured by the marker
        pred[:4] = np.where(pred[:4] > 0.2, pred[4], pred[:4])
    if lead in ['I', 'II-subset', 'III', 'aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3']:
        # last five pixels can be obscured by the marker
        pred[-5:] = np.where(pred[-5:] > 0.2, pred[-6], pred[-5:])
    if lead in ['I', 'II-subset', 'III', 'II']:
        # first two pixels can be obscured by the marker
        pred[:2] = pred[2]

    # Upsample
    pred = np.interp(np.linspace(0, 1, n_timesteps),
                     np.linspace(0, 1, len(pred)),
                     pred)

    # Fix implausible predictions
    # From https://www.kaggle.com/code/antonoof/large-eda-and-statistical-model
    OUTLIER_LOW_THRESHOLD = -1.5
    OUTLIER_HIGH_THRESHOLD = 0.9
    outlier_mask = (pred < OUTLIER_LOW_THRESHOLD) | (pred > OUTLIER_HIGH_THRESHOLD)
    if np.any(outlier_mask):
        for i in np.where(outlier_mask)[0]:
            start_idx = max(0, i - 3)
            end_idx = min(len(pred), i + 4)
            neighbors = pred[start_idx:end_idx]
            valid_neighbors = neighbors[(neighbors >= OUTLIER_LOW_THRESHOLD) & (neighbors <= OUTLIER_HIGH_THRESHOLD)]
            if len(valid_neighbors) > 0:
                pred[i] = np.median(valid_neighbors)
            else:
                pred[i] = 0

    # Median filter proposed by @guntasdhanjal
    pred = medfilt(pred, kernel_size=5)

    return pred

def convert_scanned_color(ima, markers, n_timesteps, verbose=False):
    """将扫描的彩色图像（类型 3 或 11）转换为 12 导联。

    此函数首先从图像中提取四条线。由于这四条线具有不可忽视的宽度，我们构建两个列表：
    - tops = 线条中最上面的黑色像素的 y 坐标
    - bottoms = 线条下方最上面的白色像素的 y 坐标
    这两个列表都是形状为 (image_width, ) 的 4 个数组的列表

    参数：
    ima：具有高度 1652 和宽度 ≈2200 的 3 通道 BGR 图像。
    markers：17 个标记，作为大小为 2 的整数数组（行，列）列表
    n_timesteps：每个导联所需的样本数量（字典）

    返回：
    preds：包含 12 个时间序列的字典"""
    # Crop the image and convert to black and white
    # We use only the red channel (channel 2) so that the red gridlines disappear
    # False = black, True = white
    # The text at the top of the image is discarded.
    crop_top = 400
    ima = ima[crop_top:, :, 2] > 160

    # Denoise single and double black pixels
    iima = ima.astype(np.uint8)
    ima = (iima[:-2, :-2] + iima[:-2, 1:-1] + iima[:-2, 2:]
           + iima[1:-1, :-2] + iima[1:-1, 1:-1] + iima[1:-1, 2:]
           + iima[2:, :-2] + iima[2:, 1:-1] + iima[2:, 2:]) >= 7

    # Clean the bottom border
    baseline_II = markers[15:17][0].max() - crop_top
    all_white = ima[baseline_II:, markers[15][1]:markers[16][1]].all(axis=1)
    ama = np.argmax(all_white)
    if ama > 0 or all_white[0]:
        ima[baseline_II + ama:] = True
    
    # Plot the denoised black-and-white image
    if verbose:
        plt.figure(figsize=(6, 4))
        plt.imshow(ima)
        plt.title('Denoised black-and-white')
        plt.show()
        # cv2.imwrite('ima-bw.png', ima.astype(int) * 255)

    # Find the four lines
    tops, bottoms = [], []
    for i in range(4):
        top, bottom = find_line_by_topdown_sweep(ima)
        tops.append(top)
        bottoms.append(bottom)
    if verbose:
        left = max(markers[:, 1].min() - 80, 0)
        right = markers[:, 1].max() + 80
        _, axs = plt.subplots(4, 1, sharex=True, figsize=(12, 8))
        for i in range(4):
            for j in range(5):
                if 5*i+j < len(markers):
                    axs[i].axvline(markers[5*i+j][1], color='gray')
            axs[i].plot(np.arange(left, right), tops[i][left:right], color='b', label='top')
            axs[i].plot(np.arange(left, right), bottoms[i][left:right], color='m', label='bottom')
            axs[i].invert_yaxis()
            axs[i].set_ylabel('pixel')
            axs[i].legend()
            axs[i].set_title(f"Extracted line {i}")
        plt.show()

    # Transform to global coordinates
    tops = [t + crop_top for t in tops]
    bottoms = [b + crop_top for b in bottoms]

    # Extract the twelve leads from the four lines
    # (as the first part of II is duplicated, we extract it twice
    # and take the average)
    n_timesteps['II-subset'] = n_timesteps['I']
    preds = {}
    for i, lead in enumerate(LEADS + ['II-subset']):
        pred = get_lead_from_top_bottom(tops, bottoms, lead, n_timesteps[lead], markers)
        preds[lead] = pred

    preds['II'][:len(preds['II-subset'])] = (preds['II'][:len(preds['II-subset'])] + preds['II-subset']) / 2
    del preds['II-subset']

    # Apply Einthoven's law
    apply_einthoven(preds)

    return preds

def apply_einthoven(preds):
    """应用爱因霍芬定律以提高预测。

    三个等式为
    I + III - II = 0
    aVR + avL + aVF = 0
    2 * aVR - 2 * aVF + 3 * II = 0
    
    参数：
    pred：时间序列的字典，将被更新"""
    residual = preds['I'] + preds['III'] - preds['II'][:len(preds['III'])]
    correction = residual / 3
    preds['I'] -= correction
    preds['III'] -= correction
    preds['II'][:len(preds['III'])] += correction
    
    residual = preds['aVR'] + preds['aVL'] + preds['aVF']
    correction = residual / 3
    preds['aVR'] -= correction
    preds['aVL'] -= correction
    preds['aVF'] -= correction

    residual = 2 * preds['aVR'] - 2 * preds['aVF'] + 3 * preds['II'][len(preds['I']):len(preds['I'])+len(preds['aVR'])]
    correction = residual / 17
    preds['aVR'] -= 2 * correction
    preds['aVF'] += 2 * correction
    preds['II'][len(preds['I']):len(preds['I'])+len(preds['aVR'])] -= 3 * correction


## 解决方案 1 的验证

我们数字化了一些训练图像，绘制输出并计算信噪比。

如果仔细查看这些图表，您会很容易获得改进的 ideas。这些图表显示：
1. 原始图像
2. 去噪图像
3. 四条提取的心电图线
4. y_true 与 y_pred

验证很重要：如果您每天只能获得五次提交的反馈，您的进展将会慢得多。

In [None]:
def validate_algorithm(convert, start=None, end=None, image_types=None):
    """转换一些训练图像，绘制输出并计算信噪比。"""
    if len(test) != 24:
        # test file when saving has 2*12=24 rows
        return # when submitting, nobody will see the validation output
    snr_list = []
    index_list = []
    is_first_ecg = True # we plot only the images of the first ECG
    for idx, ima, img_type, labels in train_images_and_labels(start=start,
                                                              end=end,
                                                              image_types=image_types,
                                                              use_tqdm=False):

        # Find the 17 line endpoints
        markers = mf.find_markers(ima, plot=is_first_ecg, title='Image with 17 markers')

        # Convert the image to 12 leads
        n_timesteps = {lead: (~ labels[lead].isna()).sum() for lead in LEADS}
        preds = convert(ima, markers, n_timesteps, verbose=is_first_ecg)
        
        # Evaluate the signal-to-noise ratio, plot y_true vs. y_pred
        if is_first_ecg:
            _, axs = plt.subplots(6, 2, figsize=(12, 14))
        sum_signal = 0
        sum_noise = 0
        for i, lead in enumerate(LEADS):
            label = labels[lead]
            label = label[~ label.isna()]
            pred = preds[lead]
    
            aligned_pred = align_signals(label, pred, int(row.fs * MAX_TIME_SHIFT))
            p_signal, p_noise = compute_power(label, aligned_pred)
            sum_signal += p_signal
            sum_noise += p_noise

            if is_first_ecg:
                ax = axs.T.ravel()[i]
                ax.set_title(lead)
                ax.plot(label.values, label='y_true')
                ax.plot(pred, label='y_pred')
                ax.set_xlabel('timestep')
                ax.set_ylabel('mV')
                ax.legend()
        if is_first_ecg:
            plt.tight_layout()
            plt.suptitle('y_true vs. y_pred', y=1.01)
            plt.show()
        snr = compute_snr(sum_signal, sum_noise)
        print(f"{idx=:4d} {img_type=:2d} SNR: {snr:5.2f}")
        snr_list.append(snr)
        index_list.append([idx, img_type])

        if is_first_ecg:
            print('\n')
            is_first_ecg = False

    print(np.array(snr_list).mean(), len(snr_list))
    snr = (np.array(snr_list).mean() - 1) / 9 * len(image_types) + 1
    val_score = max(float(10 * np.log10(snr)), -PERFECT_SCORE)
    print(f"# Average SNR: {snr:.2f} {val_score=:.2f} {image_types}")
    snr_df = pd.DataFrame(index_list, columns=['idx', 'type'])
    snr_df['snr'] = snr_list
    snr_df.to_csv('~snr.csv', index=False)

validate_algorithm(convert_scanned_color, start=400, end=410, image_types=[3, 11])


# 解决方案 2：用于数字化灰度图像的神经网络

平面扫掠是一种不错的算法，但当图像过于嘈杂时，它会失败。它甚至无法处理黑色网格线。那么我们如何将这个 ECG 图像数字化竞赛转变为一个可处理的回归任务呢？

多亏了 `MarkerFinder` 类，我们知道 ECG 线的起点和终点，甚至可以将真实标签映射到图像的像素上。我们现在在这些标记之间将图像切割成细的垂直条纹（高 600 像素，宽 11 像素）。ECG 线在某个高度与条纹相交，预测这个高度就是一个回归任务。

请看下面的图示，它显示了一些随机的条纹。回归目标（在 0 到 600 之间的数字）显示在每个条纹的上方。尝试辨别以下项目：
1. 与条纹相交的 ECG 线（条纹足够高，因此通常会有多条线，其中一条应该位于图示顶部指示的位置）
2. 水平网格线
3. 垂直网格线
4. 显示导联开始的标记
5. 导联标识（字母 'a' 和 'V'）
6. 其他噪声

条纹的大小（600\*11 = 6600 像素）被选定为神经网络（具有 6600 个输入和一个输出）可以在 Kaggle 笔记本中轻松训练。

## 准备训练数据

我们将训练数据放入形状为 (40000, 6600) 的数组 X 和形状为 (40000, ) 的数组 y 中。

In [None]:
# Preparing the training data
n_train_nn, n_training_samples_per_line = 40000, 30
h0, w0 = 600, 11

X = np.zeros((n_train_nn, h0 * w0), dtype=np.float32)
y = np.zeros((n_train_nn, ), dtype=np.float32)

def prepare_dataset(train):
    i_train_nn = 0
    rng = np.random.default_rng(1)
    t = tqdm(total=n_train_nn, position=0)
    for idx, row in train.iterrows():
        labels = pd.read_csv(f'/kaggle/input/physionet-ecg-image-digitization/train/{row.id}/{row.id}.csv')
        png_paths = sorted(glob(f'/kaggle/input/physionet-ecg-image-digitization/train/{row.id}/{row.id}-*.png'))
        for path in png_paths:
            img_type = int(path[-8:-4])
            if img_type in [4, 12]:
                ima = cv2.imread(path)
                markers = mf.find_markers(ima)
                ima = ima.mean(axis=2)
                ll = np.array([labels['aVR'].fillna(0) + labels['V1'].fillna(0),
                               labels['aVL'].fillna(0) + labels['V2'].fillna(0),
                               labels['aVF'].fillna(0) + labels['V3'].fillna(0),])
                for i in range(3):
                    for j in range(1, 3):
                        yl, xl = markers[5*i+j]
                        yp, xp = markers[5*i+j+1] - markers[5*i+j]
                        lead = ['', 'aVR', 'V1', '', '', '', 'aVL', 'V2', '', '', '' ,'aVF', 'V3'][5*i+j]
                        l = labels[lead]
                        l = l[~l.isna()]
                        l = np.interp(np.linspace(0, 1, xp),
                                      np.linspace(0, 1, len(l)),
                                      l) * 80 # true labels in pixel coordinates
                        assert xp > 0 # yp may be negative
                        for k in range(n_training_samples_per_line):
                            alpha = rng.uniform()
                            x0, y0 = int(xl + alpha * xp), int(yl + alpha * yp)
                            X[i_train_nn] = ima[y0-h0//2:y0+h0//2, x0-w0//2:x0+w0//2+1].ravel()
                            y[i_train_nn] = h0 / 2 - l[x0-xl]
                            i_train_nn += 1
                            t.update(1)
                            if i_train_nn == n_train_nn:
                                t.close()
                                print(f"last idx used: {idx}")
                                return
        
prepare_dataset(train.iloc[200:])
print(X.shape, y.shape, X.size*4) # (n_train_nn, w0*h0) (n_train_nn, ) 40000 -> last idx used: 311
X_orig = X.copy()
X = X / 255.0 - 1 # offset so that X.max() = 0
assert np.isfinite(X).all()
assert np.isfinite(y).all()

# X is a scaled pixel intensity, 0 = white, -1 = black
# y is amplitude in pixels, usually between 0 and h0-1, h0//2 is baseline. In rare cases it is below 0 or above h0

In [None]:
l = [17254, 23409, 3713, 6368, 13561, 12331, 21, 9871, 3718, 21152, 9611, 5918, 22371, 33636, 22909, 14500]
_, axs = plt.subplots(1, len(l), sharey=True, sharex=True, figsize=(9, 12))
for i, j in enumerate(l):
    # print(i, j)
    axs[i].imshow(X[j].reshape((h0, w0)))
    axs[i].set_title(f"{y[j]:.0f}")
    axs[i].get_xaxis().set_visible(False)
# plt.savefig('stripes.png')
plt.show()

## 网络

我们实现了一个具有四个隐藏层（700万参数）的神经网络并对其进行训练。如果一切顺利，R2得分应约为0.75，对应于信噪比4。可以随意尝试其他网络架构。

In [None]:
def regression_model():
    """一个具有 4 个隐藏层的密集前馈网络。"""
    initial_lr = 0.005
    noise = 0.3
    x_input = Input(shape=(X.shape[-1], ))
    x = x_input
    x = GaussianNoise(noise)(x)
    x = Dense(units=1024, activation='selu')(x)
    x = Dense(units=512, activation='selu')(x)
    x = Dense(units=256, activation='selu')(x)
    x = Dense(units=256, activation='selu')(x)
    x_output = Dense(units=1, activation='linear', bias_initializer=Constant(h0 / 2))(x)

    model = Model(inputs=x_input, outputs=x_output)
    return model, initial_lr

model, _ = regression_model()
model(X[0:5]).shape
model.summary()

In [None]:
# Train the regression model
# Inputs for training: X and y
epochs = 70
verbose = 2
batch_size = 64

grayscale_model_list = []
oof = np.zeros_like(y)
kf = KFold(shuffle=True, random_state=1)
for fold, (idx_tr, idx_va) in enumerate(kf.split(X)):
    X_tr = X[idx_tr]
    X_va = X[idx_va]
    y_tr = y[idx_tr]
    y_va = y[idx_va]

    model, initial_lr = regression_model()

    # Train the complete model
    model.compile(
        optimizer=Adam(learning_rate=initial_lr),
        loss=MeanSquaredError(),
    )
    print(y_va.var())
    history = model.fit(
        X_tr, y_tr,
        validation_data=(X_va, y_va),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[EarlyStopping(patience=6, min_delta=0.1),
                   ReduceLROnPlateau(factor=0.5, patience=2, verbose=1, min_delta=0.1, min_lr=initial_lr/63),
                   TerminateOnNaN()],
        verbose=verbose
    )
    history = history.history
    plot_training_history(history)

    y_pred = model.predict(X_va, batch_size=1024, verbose=0).ravel()
    oof[idx_va] = y_pred
    mse = np.square(y_va - y_pred).mean()
    r2 = r2_score(y_va, y_pred)
    print(f'# Fold {fold} {mse:6.2f} {r2:4.2f} {np.var(y_va):6.2f} {len(X)}*{h0}*{w0} {batch_size}\n')
    grayscale_model_list.append(model)

    # Regression
    plt.figure(figsize=(16, 4))
    plt.subplot(1, 2, 1)
    plt.hist(y, bins=50, density=True, label='y_true')
    plt.hist(y_pred, bins=50, density=True, alpha=0.6, label='y_pred')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.scatter(y_pred, y_va, s=1)
    plt.xlabel('y_pred')
    plt.ylabel('y_true')
    plt.gca().set_aspect('equal')
    plt.show()
    break

# Fold 0  75.41 0.76 317.96 *Model 0 initial_lr=0.005 noise=0.3* 40000*600*11 64

In [None]:
del X, y

## 解决方案 2 的验证

我们已经使用 train.iloc\[200:\] 来训练神经网络。因此，我们可以使用 train.iloc\[:200\] 进行验证。我们处理了一些图像并评估信噪比。

顺便提一下，灰度模型也适用于彩色图像，因此我们可以将神经网络的预测与平面扫描的预测结合起来。

In [None]:
def get_grayscale_lead(ima, lead, n_timesteps, markers):
    """从 ECG 图像中提取并重新采样一个导联。

    如果可能，函数返回时间序列，否则返回 None。
    
    参数：
    ima：单通道（灰度图像）
    lead：12 个导联标签中的一个（字符串）
    n_timesteps：所需样本数量（整数）
    markers：大小为 2 的整数数组（行，列）的 17 个标记列表

    返回：
    pred：形状为 (n_timesteps, ) 的数组或 None"""
    line, begin, end = mf.lead_info(lead)

    X_list = []
    width = markers[end][1] - markers[begin][1]
    slope = (markers[end][0] - markers[begin][0]) / width

    for x0 in range(markers[begin][1], markers[end][1]):
        y0 = int(markers[begin][0] + slope * (x0 - markers[begin][1]))
        X = ima[y0-h0//2:y0+h0//2, x0-w0//2:x0+w0//2+1]
        if X.size == 0:
            X = np.full((h0, w0), 255) # all white
        else:
            if X.shape[0] < h0:
                X = np.vstack([X, np.full((h0 - X.shape[0], X.shape[1]), 255)]) # pad bottom white
            if X.shape[1] < w0:
                X = np.hstack([X, np.full((X.shape[0], w0 - X.shape[1]), 255)]) # pad right white
        X_list.append(X)
    X = np.stack(X_list).reshape(len(X_list), -1).astype(np.float32)
    assert X.shape[1] == w0 * h0
    assert np.isfinite(X).all()

    X = X / 255.0 - 1

    pred = h0 / 2 - np.mean([model.predict(X, batch_size=1024, verbose=0).ravel() for model in grayscale_model_list], axis=0)

    # Scale
    pred /= 80 # 80 pixels = 1 mV

    # Upsample
    pred = np.interp(np.linspace(0, 1, n_timesteps),
                     np.linspace(0, 1, len(pred)),
                     pred)
    
    # Fix implausible predictions
    pred = np.where(np.abs(pred) <= 2, pred, 0)
        
    return pred


def convert_scanned_grayscale(ima, markers, n_timesteps, verbose=False):
    """将扫描的灰度图像（类型4或12）转换为12个时间序列。

    参数：
    ima：3通道BGR图像，高度为1652，宽度≈2200。
    markers：17个标记，作为大小为2的整数数组（行，列）的列表
    n_timesteps：每个导联所需的样本数量（字典）

    返回：
    preds：包含12个时间序列的字典或None"""
    # Drop the color channels / convert to grayscale
    ima = ima.mean(axis=2)

    n_timesteps['II-subset'] = n_timesteps['I']
    preds = {}
    for i, lead in enumerate(LEADS + ['II-subset']):
        preds[lead] = get_grayscale_lead(ima, lead, n_timesteps[lead], markers)

    if preds['II'] is not None and preds['II-subset'] is not None:
        preds['II'][:len(preds['II-subset'])] = (preds['II'][:len(preds['II-subset'])] + preds['II-subset']) / 2
    elif preds['II-subset'] is not None:
        preds['II'] = np.zeros(n_timesteps['II'])
        preds['II'][:len(preds['II-subset'])] = preds['II-subset']
    del preds['II-subset']

    # Apply Einthoven's law
    apply_einthoven(preds)
    
    return preds

def convert_scanned_both(ima, markers, n_timesteps, verbose=False):
    """将扫描的彩色图像（类型 3 或 11）转换为 12 个时间序列。

    该函数使用平面扫描算法集成神经网络。

    参数：
    ima：高度为 1652，宽度约为 2200 的 3 通道 BGR 图像。
    markers：17 个标记，作为大小为 2 的整数数组（行，列）的列表
    n_timesteps：每个导线所需的样本数量（字典）

    返回：
    preds：包含 12 个时间序列的字典或 None"""
    keys = list(n_timesteps.keys())
    pred1 = convert_scanned_grayscale(ima, markers, n_timesteps, verbose=False)
    pred2 = convert_scanned_color(ima, markers, n_timesteps, verbose=False)

    def ensemble_two(pred1, pred2):
        if pred1 is not None and pred2 is not None:
            return (pred1 + pred2) / 2
        if pred1 is not None:
            return pred1
        return pred2
        
    preds = {k: ensemble_two(pred1[k], pred2[k]) for k in keys}
    return preds


validate_algorithm(convert_scanned_grayscale, start=400, end=410, image_types=[4, 12])
validate_algorithm(convert_scanned_both, start=400, end=410, image_types=[3, 11])


# 数字化和提交测试图像

我们对类型为3、4、11和12的所有测试图像进行数字化（这些图像是通过扫描仪获取的）。对于所有其他图像（即手机照片），我们提交平均训练标签。

In [None]:
def is_color_image(ima):
    """测试三通道图像是否具有颜色。"""
    return ima.std(axis=2).mean() != 0

In [None]:
submission_data = []
old_id = None
leads = None
for idx, row in test.iterrows():
    if row.id != old_id:
        path = f"/kaggle/input/physionet-ecg-image-digitization/test/{row.id}.png"
        # path = '/kaggle/input/physionet-ecg-image-digitization/train/1006427285/1006427285-0004.png'
        # path = '/kaggle/input/physionet-ecg-image-digitization/train/1006427285/1006427285-0011.png'
        ima = cv2.imread(path)
        shape = ima.shape
        good_shape = shape[0] == 1652 # scanned images have 1652 rows
        
        if good_shape:
            # Find the 17 line endpoints
            markers = mf.find_markers(ima)

            # Convert the image to 12 time series
            n_timesteps = {lead: row.fs * 10 if lead == 'II' else row.fs * 10 // 4 for lead in LEADS}
            if is_color_image(ima):
                preds = convert_scanned_both(ima, markers, n_timesteps, verbose=False)
            else:
                preds = convert_scanned_grayscale(ima, markers, n_timesteps, verbose=False)

        else: # we cannot interpret the image -> predict the mean
            preds = None
            
        old_id = row.id

    if row.lead == 'II':
        assert row.number_of_rows == row.fs * 10
    else:
        assert row.number_of_rows == row.fs * 10 // 4

    if preds is not None:
        pred = preds[row.lead]
    else:
        pred = mean_dict[row.lead].mean(axis=0)
        pred = np.interp(np.linspace(0, 1, row.number_of_rows),
                         np.linspace(0, 1, len(pred)),
                         pred)
    assert len(pred) == row.number_of_rows

    for timestep in range(row.number_of_rows):
        signal_id = f"{row.id}_{timestep}_{row.lead}"
        submission_data.append({
            'id': signal_id,
            'value': pred[timestep]
        })

submission_df = pd.DataFrame(submission_data)
print(f"Length: {len(submission_df)}")
submission_df.to_csv('submission.csv', index=False)
!head submission.csv