In [1]:
import os
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Model added : physio-seg-public

In [3]:
# Kaggle paths
DATA_DIR = r"/kaggle/input/physionet-ecg-image-digitization/"
MODEL_DIR = r"/kaggle/input/physio-seg-public/pytorch/net3_009_4200/1"
TEST_IMG_DIR = os.path.join(DATA_DIR, r"test")
TEST_CSV = os.path.join(DATA_DIR, r"test.csv")
SAMPLE_SUB = os.path.join(DATA_DIR, r"sample_submission.parquet")
OUT_SUB = r"/kaggle/working/submission.csv"

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cpu


# Model - Minimal UNet Blocks

In [5]:
class ConvBNReLU(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.conv(x)

In [6]:
def center_crop(tensor, target_h, target_w):
    _, _, h, w = tensor.shape
    dh = (h - target_h) // 2
    dw = (w - target_w) // 2
    return tensor[:, :, dh:dh + target_h, dw:dw + target_w]

def match_tensor(x, ref):
    """
    Crops x spatially so that x.shape[-2:] == ref.shape[-2:]
    """
    _, _, h, w = x.shape
    _, _, rh, rw = ref.shape

    dh = h - rh
    dw = w - rw

    if dh > 0:
        x = x[:, :, dh // 2 : h - (dh - dh // 2), :]
    if dw > 0:
        x = x[:, :, :, dw // 2 : w - (dw - dw // 2)]

    return x


In [7]:
class DecoderBlock(nn.Module):
    def __init__(self, in_ch, skip_ch, out_ch):
        super().__init__()
        self.conv1 = ConvBNReLU(in_ch + skip_ch, out_ch)
        self.conv2 = ConvBNReLU(out_ch, out_ch)
    
    
    def forward(self, x, skip):
        x = F.interpolate(x, scale_factor=2, mode="nearest")

        if x.shape[-2:] != skip.shape[-2:]:
            skip = match_tensor(skip, x)

        x = torch.cat([x, skip], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        return x

# ResNet34 Encoder (Torchvision-Free)

In [8]:
class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        from torchvision.models import resnet34
        m = resnet34(weights=None)

        self.layer0 = nn.Sequential(m.conv1, m.bn1, m.relu)
        self.layer1 = nn.Sequential(m.maxpool, m.layer1)
        self.layer2 = m.layer2
        self.layer3 = m.layer3
        self.layer4 = m.layer4

    def forward(self, x):
        x0 = self.layer0(x)
        x1 = self.layer1(x0)
        x2 = self.layer2(x1)
        x3 = self.layer3(x2)
        x4 = self.layer4(x3)
        return x0, x1, x2, x3, x4

# Full UNet Model (Matches physio-seg)

In [9]:
class ECGUNet(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = Encoder()

        self.decoder4 = DecoderBlock(512, 256, 256)
        self.decoder3 = DecoderBlock(256, 128, 128)
        self.decoder2 = DecoderBlock(128, 64, 64)
        self.decoder1 = DecoderBlock(64, 64, 16)
        
        self.head     = nn.Conv2d(16, 4, kernel_size=1)


    def forward(self, x):
        x0, x1, x2, x3, x4 = self.encoder(x)
        d4 = self.decoder4(x4, x3)
        d3 = self.decoder3(d4, x2)
        d2 = self.decoder2(d3, x1)
        d1 = self.decoder1(d2, x0)
        return self.head(d1)

# Load physio-seg Weights

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ECGUNet().to(device)

ckpt = torch.load(
    "/kaggle/input/physio-seg-public/pytorch/net3_009_4200/1/iter_0004200.pt",
    map_location=device
)

state = ckpt["state_dict"] if "state_dict" in ckpt else ckpt

clean = {}
for k, v in state.items():
    if "num_batches_tracked" in k:
        continue
    clean[k.replace("decoder.block.", "decoder.")\
            .replace("pixel.", "head.")] = v

model.load_state_dict(clean, strict=False)
model.eval()

print("Model loaded successfully")

Model loaded successfully


# Image Preprocessing

In [11]:
def load_image(path, pad=32):
    img = cv2.imread(path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32) / 255.0

    h, w, _ = img.shape

    pad_h = (pad - h % pad) % pad
    pad_w = (pad - w % pad) % pad

    img = np.pad(
        img,
        ((0, pad_h), (0, pad_w), (0, 0)),
        mode="constant",
        constant_values=0
    )

    img = torch.from_numpy(img).permute(2, 0, 1)  # [3, H, W]
    img = img.unsqueeze(0)                         # [1, 3, H, W]
    return img

# Inference + Submission

In [12]:
test_files = sorted(os.listdir(TEST_IMG_DIR))

def file_to_id(fname):
    return os.path.splitext(fname)[0]   # removes .png

test_ids = [file_to_id(f) for f in test_files]

In [13]:
# def load_image(path):
#     img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
#     img = img.astype(np.float32) / 255.0
#     img = torch.from_numpy(img).unsqueeze(0).unsqueeze(0)
#     return img

In [14]:
# def load_image(path):
#     img = cv2.imread(path)                 # BGR, 3-channel
#     img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
#     img = img.astype(np.float32) / 255.0

#     img = torch.from_numpy(img).permute(2, 0, 1)  # [3, H, W]
#     img = img.unsqueeze(0)                         # [1, 3, H, W]
#     return img

In [15]:
rows = []

for id_, fname in zip(test_ids, test_files):
    img_path = os.path.join(TEST_IMG_DIR, fname)
    with torch.no_grad():
        x = load_image(img_path).to(device)
        feats = model.encoder(x)[-1]
        value = feats.mean().item()

    rows.append({
        "id": id_,
        "value": float(value)
    })

df = pd.DataFrame(rows)
df.to_csv(OUT_SUB, index=False)

In [16]:
df

Unnamed: 0,id,value
0,1053922973,0.022935
1,2352854581,0.02295


In [17]:
# assert list(df.columns) == ["id", "value"]
# assert df.isnull().sum().sum() == 0
# assert len(df) == len(test_ids)

# print(df.head())
# print(df.dtypes)
# print("Rows:", len(df))

In [18]:
# @torch.no_grad()
# def predict_mask(img_path):
#     x = load_image(img_path).to(device)
#     y = model(x)                  # [1, 4, H, W] logits

#     probs = torch.sigmoid(y)      # independent channels

#     ECG_CLASS = 1   # <-- try 1 first (see note below)
#     ecg = probs[:, ECG_CLASS]     # [1, H, W]

#     # mask = (ecg > 0.5).float()    # Day-1 threshold
#     # keep only top X% most confident pixels
#     thr = torch.quantile(ecg, 0.995)   # start with 99.5 percentile
#     mask = (ecg >= thr).float()

#     print(
#     ecg.min().item(),
#     ecg.mean().item(),
#     ecg.max().item(),
#     mask.mean().item()
# )
    


#     return mask.squeeze().cpu().numpy()

In [19]:
# IMAGE_IDS = sorted(os.listdir(TEST_IMG_DIR))

# records = []

# for name in tqdm(IMAGE_IDS):
#     img_path = os.path.join(TEST_IMG_DIR, name)

#     mask = predict_mask(img_path)

#     # simple Day-1 threshold
#     binary = (mask > 0.5).astype(np.uint8)

#     # flatten for submission (example format)
#     rle = binary.flatten().tolist()

#     records.append({
#         "image_id": name,
#         "prediction": " ".join(map(str, rle))
#     })

In [20]:
# df = pd.DataFrame(records)
# df.to_csv(OUT_SUB, index=False)
# print("submission.csv saved")

In [21]:
# df.head()

In [22]:
# df.iloc[0,1]

In [23]:
# img_path = os.path.join(TEST_IMG_DIR, r'1053922973.png')

# with torch.no_grad():
#     x = load_image(img_path).to(device)
#     y = model(x)
#     # y = torch.sigmoid(y)

# for c in range(4):
#     print(c, y[0, c].mean().item(), y[0, c].max().item())