# Abstruct


# 0. Setting

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import nibabel as nib

import platform
import sys
import os
import yaml
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import seaborn as sns
import cv2
import random

# Find session type
def find_session_type():
    # windows
    if os.name == 'nt':
        path = '../input/data/'

        import japanize_matplotlib
        sns.set(font="IPAexGothic")

    elif platform.system()  == 'Darwin':
        # Mac
        path = '../input/data/'
        return 'mac'
    
    elif os.name == 'posix':
    # Kaggle
        if 'KAGGLE_DATA_PROXY_TOKEN' in os.environ.keys():
            print('This is kaggle session')
            return 'kaggle'

    # Google Colab
        else:
            print('This is colab session')
            # セッションの残り時間の確認
            !cat /proc/uptime | awk '{print $1 /60 /60 /24 "days (" $1 / 60 / 60 "h)"}'
            return 'colab'
    # Mac
    # elif 'MAC' in os.environ.keys():
    #     print('This is mac session')
    #     return 'mac'
# Example usage:
if find_session_type() == 'kaggle':
    print("Running in a Kaggle notebook")
elif find_session_type() == 'mac':
    print("Running on a Mac")
    path = '../input/data/'

elif find_session_type() == 'colab':
    from google.colab import drive
    print("Running in Google Colab")
    drive.mount('/content/drive')
    os.makedirs('/content/logs', exist_ok=True)
    os.makedirs('/content/kaggle/input', exist_ok=True)
    os.makedirs('/content/kaggle/output', exist_ok=True)
else:
    print("Not running in a Kaggle notebook or Google Colab")

In [None]:
import time
import random

class Config():
    def __init__(self):
        # General settings
        self.debug = True
        self.random_seed = 42
        self.num_workers = 4
        self.train_split = 0.8
        self.model_name = "yolov8n"
        self.batch_size = 2
        random.seed(self.random_seed)
        np.random.seed(self.random_seed)
        torch.manual_seed(self.random_seed)

        cpu_count = os.cpu_count()
        if find_session_type() == 'kaggle':
            self.num_workers = min(4, cpu_count)  # Kaggle has limit
        elif find_session_type() == 'colab':
            self.num_workers = min(2, cpu_count)  # Colab is often limited
        else:
            self.num_workers = max(2, cpu_count // 2)  # Local: use half of CPUs

        if find_session_type() == 'kaggle':
            self.working_dir = '/kaggle/working/'
            self.input_dir = '/kaggle/input/'
            self.output_dir = '/kaggle/output/'
        elif find_session_type() == 'colab':
            self.working_dir = '/content/drive/MyDrive/kaggle/BYU-motor-detection/'
            self.input_dir = '/content/kaggle/input/'
            self.output_dir = '/content/kaggle/output/'
        elif find_session_type() == 'mac':
            self.working_dir = '/working'
            self.input_dir = '/input/'
            self.output_dir = '/output/'
        
        self.data_dir = os.path.join(self.input_dir, 'byu-locating-bacterial-flagellar-motors-2025')
        
        self.yolo_dataset_dir = os.path.join(self.working_dir, 'yolo_dataset')
        self.yolo_images_train = os.path.join(self.yolo_dataset_dir, 'images', 'train')
        self.yolo_images_val = os.path.join(self.yolo_dataset_dir, 'images', 'val')
        self.yolo_labels_train = os.path.join(self.yolo_dataset_dir, 'labels', 'train')
        self.yolo_labels_val = os.path.join(self.yolo_dataset_dir, 'labels', 'val')
        self.yolo_weights_dir = os.path.join(self.working_dir, 'yolo_weights')


        # 3d Encoder settings
        self.strides = (2, 2, 2)
        self.in_channels = 1
        

        # YOLO settings
        self.img_size = 640
        self.yolo_epochs = 100
        self.yolo_pretrained_weights = "yolov8n.pt"
        self.yolo_box_size = 30

        # Depth Estimation settings
        self.depth_epochs = 100
        self.depth_size_profile = 

class GPUProfiler:
    def __init__(self, name):
        self.name = name
        self.start_time = None

    def __enter__(self):
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        self.start_time = time.time()
        return self

    def __exit__(self, *args):
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        elapsed = time.time() - self.start_time
        # print(f"[PROFILE] {self.name}: {elapsed:.3f}s")

config = Config()

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
if device.startswith('cuda'):
    # Set CUDA optimization flags
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False
    torch.backends.cuda.matmul.allow_tf32 = True  # Allow TF32 on Ampere GPUs
    torch.backends.cudnn.allow_tf32 = True

    # Print GPU info
    gpu_name = torch.cuda.get_device_name(0)
    gpu_mem = torch.cuda.get_device_properties(0).total_memory / 1e9  # Convert to GB
    print(f"Using GPU: {gpu_name} with {gpu_mem:.2f} GB memory")

    # Get available GPU memory and set batch size accordingly
    free_mem = gpu_mem - torch.cuda.memory_allocated(0) / 1e9
    config.batch_size = max(8, min(32, int(free_mem * 4)))  # 4 images per GB as rough estimate
    print(f"Dynamic batch size set to {config.batch_size} based on {free_mem:.2f}GB free memory")
else:
    print("GPU not available, using CPU")
    config.batch_size = 4  # Reduce batch size for CPU

# 1 Preprocess Pipeline

## 1.1 3D-2D encoder

(1) 強度正規化・クリッピング
- HU値は与えられていないため強度の正規化は省略
(2) 最小限のパディング
- 3D Encoderのダウンサンプリング段数に合わせて、各空間軸を2のn乗に合わせる
    - Downsampling stride: 32
- バッチ処理はバッチ内最大サイズに揃える
(3) Tensor化
- torch.Tensorに変換する
(4) Down-sampling
- 各層で半分に3d-down-sampling
(5) 

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import nibabel as nib  # or pydicom, SimpleITK, etc.

# DataLoader
dataset = Patch3DDataset(image_paths, patch_size=(64,64,64), transforms=my_preproc)
loader  = DataLoader(dataset, batch_size=2, num_workers=config.num_workers, pin_memory=True)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import yolo

class Patch3DDataset(Dataset):
    def __init__(self, image_paths, patch_size=(64,64,64), transforms=None):
        self.paths = image_paths
        self.patch_size = patch_size
        self.transforms = transforms

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # 1) ボリュームをメモリマップ or メインメモリへ読み込み
        img = nib.load(self.paths[idx]).get_fdata(dtype=np.float32)
        D, H, W = img.shape

        # 2) ランダムに patch の開始位置をサンプリング
        pd, ph, pw = self.patch_size
        d0 = np.random.randint(0, max(D - pd + 1, 1))
        h0 = np.random.randint(0, max(H - ph + 1, 1))
        w0 = np.random.randint(0, max(W - pw + 1, 1))

        # 3) スライスアウト
        patch = img[d0:d0+pd, h0:h0+ph, w0:w0+pw]

        # 4) 必要なら前処理／正規化
        if self.transforms:
            patch = self.transforms(patch)

        # 5) Tensor へ変換
        return torch.from_numpy(patch).unsqueeze(0)  # [1, D, H, W]

class Dynamic3D2DEncoder(nn.Module):
    def __init__(self, in_ch=1, base_ch=32, strides=(2,2,2)):
        super().__init__()
        # 3D Encoder：各層で半分にダウンサンプリング
        layers = []
        ch = in_ch
        for _ in range(3):  # 4層ダウンサンプリング
            layers += [
                nn.Conv3d(ch, base_ch, kernel_size=3, padding=1),
                nn.BatchNorm3d(base_ch), nn.ReLU(inplace=True),
                nn.MaxPool3d(strides)
            ]
            ch = base_ch
            base_ch *= 2
        self.encoder3d = nn.Sequential(*layers)
        # 深度を 1 に縮約
        self.depth_pool = nn.AdaptiveAvgPool3d((1, None, None))

    def forward(self, x3d, yolo_size=(640,640)):
        # x3d: [B,1,D_i,H_i,W_i]　※D_i,H_i,W_i は動的
        feat3d = self.encoder3d(x3d)            # → [B, C, D', H', W']
        feat3d = self.depth_pool(feat3d)        # → [B, C, 1, H', W']
        feat2d = feat3d.squeeze(2)              # → [B, C, H', W']
        # YOLO が想定する (H_y, W_y) に合わせてリサイズ
        feat2d = F.interpolate(feat2d,
                              size=yolo_size,
                              mode='bilinear',
                              align_corners=False)
        return feat2d  # → [B, C, H_y, W_y]


# 2. Detecting 2d position

Encoderに通して2次元に畳み込みを行った画像群にYOLOを適用する。
1. YOLO形式のannotationを生成する
- bboxは既存のものを流用する
2. YOLOに入力
3. Prediction
- 推論を行う。出力されるのは$(x,y)$のみであるため、Z方向に対しては別のモデルを用いて予測を行う

In [None]:
class TwoStageDetector(nn.Module):
    def __init__(self, num_classes, yolo_size=(640,640)):
        super().__init__()
        self.encoder = Dynamic3D2DEncoder(in_ch=1, base_ch=32)
        # YOLO のバックボーン最初のチャネル数を合わせる
        self.yolo = YOLOv5(backbone_in_ch=32, num_classes=num_classes)
        self.yolo_size = yolo_size

    def forward(self, x3d):
        feat2d = self.encoder(x3d, yolo_size=self.yolo_size)
        detections = self.yolo(feat2d)  # 通常の YOLO 推論
        return detections
    
    def predict_on_samples(self, model):
        val_dir = os.path.join(config.data_dir, 'val')
        val_images = sorted(os.listdir(val_dir))
        num_samples = min(num_samples, len(val_images))
        samples = random.sample(val_images, num_samples)
        fig, axes = plt.subplots(2, 2, figsize=(12, 12))
        axes = axes.flatten()

        for i, img_file in enumerate(samples):
            if i >= len(axes):
                break

            img_path = os.path.join(val_dir, img_file)
            results = model.predict(img_path, conf=0.25)[0]
            img = Image.open(img_path)
            axes[i].imshow(np.array(img), cmap='gray')

            try:
                parts = img_file.split('_')
                y_part = [p for p in parts if p.startswith('y')]
                x_part = [p for p in parts if p.startswith('x')]
                if y_part and x_part:
                    y_gt = int(y_part[0][1:])
                    x_gt = int(x_part[0][1:].split('.')[0])
                    rect_gt = Rectangle((x_gt - config.yolo_box_size//2, y_gt - config.yolo_box_size//2),
                                        config.yolo_box_size,
                                        config.yolo_box_size,
                                        linewidth=1, edgecolor='g',
                                        facecolor='none')
                    axes[i].add_patch(rect_gt)
            except:
                pass
        
            if len(results.boxes) > 0:
                boxes = results.boxes.xyxy.cpu().numpy()
                confs = results.boxes.conf.cpu().numpy()
                for box, conf in zip(boxes, confs):
                    x1, y1, x2, y2 = box
                    rect_pred = Rectangle((x1, y1), x2-x1, y2-y1, linewidth=1, edgecolor='r', facecolor='none')
                    axes[i].add_patch(rect_pred)
                    axes[i].text(x1, y1-5, f'{conf:.2f}', color='red')

            axes[i].set_title(f"Image: {img_file}\nGT (green) vs Pred (red)")
        plt.tight_layout()
        plt.savefig(os.path.join(config.working_dir, 'predictions.png'))
        plt.show()

In [None]:
def fix_yaml_paths(yaml_path):
    """
    Fix the paths in the YAML file to match the actual Kaggle directories.

    Args:
        yaml_path (str): Path to the original dataset YAML file.

    Returns:
        str: Path to the fixed YAML file.
    """
    print(f"Fixing YAML paths in {yaml_path}")
    with open(yaml_path, 'r') as f:
        yaml_data = yaml.safe_load(f)

    if 'path' in yaml_data:
        yaml_data['path'] = yolo_dataset_dir #TODO: Change path

    fixed_yaml_path = os.path.join(WORKING_DIR, "fixed_dataset.yaml")
    with open(fixed_yaml_path, 'w') as f:
        yaml.dump(yaml_data, f)

    print(f"Created fixed YAML at {fixed_yaml_path} with path: {yaml_data.get('path')}")
    return fixed_yaml_path

def plot_dfl_loss_curve(run_dir):
    """
    Plot the DFL loss curves for training and validation, marking the best model.

    Args:
        run_dir (str): Directory where the training results are stored.
    """
    results_csv = os.path.join(run_dir, 'results.csv')
    if not os.path.exists(results_csv):
        print(f"Results file not found at {results_csv}")
        return

    results_df = pd.read_csv(results_csv)
    train_dfl_col = [col for col in results_df.columns if 'train/dfl_loss' in col]
    val_dfl_col = [col for col in results_df.columns if 'val/dfl_loss' in col]

    if not train_dfl_col or not val_dfl_col:
        print("DFL loss columns not found in results CSV")
        print(f"Available columns: {results_df.columns.tolist()}")
        return

    train_dfl_col = train_dfl_col[0]
    val_dfl_col = val_dfl_col[0]

    best_epoch = results_df[val_dfl_col].idxmin()
    best_val_loss = results_df.loc[best_epoch, val_dfl_col]

    plt.figure(figsize=(10, 6))
    plt.plot(results_df['epoch'], results_df[train_dfl_col], label='Train DFL Loss')
    plt.plot(results_df['epoch'], results_df[val_dfl_col], label='Validation DFL Loss')
    plt.axvline(x=results_df.loc[best_epoch, 'epoch'], color='r', linestyle='--',
                label=f'Best Model (Epoch {int(results_df.loc[best_epoch, "epoch"])}, Val Loss: {best_val_loss:.4f})')
    plt.xlabel('Epoch')
    plt.ylabel('DFL Loss')
    plt.title('Training and Validation DFL Loss')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)

    plot_path = os.path.join(run_dir, 'dfl_loss_curve.png')
    plt.savefig(plot_path)
    plt.savefig(os.path.join(WORKING_DIR, 'dfl_loss_curve.png')) #TODO: save to working dir

    print(f"Loss curve saved to {plot_path}")
    plt.close()

    return best_epoch, best_val_loss

def train_yolo_model(yaml_path, pretrained_weights_path, epochs, batch_size, img_size):
    """
    Train the YOLO model using the specified parameters.

    Args:
        yaml_path (str): Path to the dataset YAML file.
        pretrained_weights (str): Path to the pretrained weights file.
        epochs (int): Number of training epochs.
        batch_size (int): Batch size for training.
        img_size (int): Image size for training.

    Returns:
        str: Path to the trained model weights.
    """
    print(f"Loading pre-trained weights from {pretrained_weights_path}")
    model = YOLO(pretrained_weights_path)

    results = model.train(
        data=yaml_path,
        epochs=epochs,
        batch_size=batch_size,
        img_size=img_size,
        device=device,
        project=config.yolo_weights_dir,
        name='motor_detector',
        exist_ok=True,
        patience=5,
        save_period=5,
        val=True,
        save_best=True,
        verbose=True
    )

    run_dir = os.path.join(config.yolo_weights_dir, 'motor_detector')
    best_epoch, best_val_loss = plot_dfl_loss_curve(run_dir)
    if best_epoch is not None:
        print(f"Best epoch: {best_epoch}, Best validation DFL loss: {best_val_loss:.4f}")
    
    return model, results

# 3 Regression Z position

YOLOで検出した$(x,y)$に基づき、適切なZ座標を1次元畳み込みしたものから求める

In [None]:
class DepthRegressor1D(nn.Module):
    def __init__(self, in_channels=1, base_ch=16, D_max=800):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(in_channels, base_ch, 3, padding=1), nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(base_ch, base_ch*2, 3, padding=1), nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        self.fc = nn.Linear(base_ch*2, 1)

    def forward(self, depth_profile):
        # depth_profile: [B, D_i] … 元ボリューム V[z] を y,x でサンプリングしたもの
        f = self.net(depth_profile.unsqueeze(1))  # → [B, C, 1]
        return self.fc(f.squeeze(-1))              # → [B,1]

# Evaluate the results
from sklearn.metrics import fbeta_score
def distance_metric(self, 
    solution: pd.DataFrame,
    submission: pd.DataFrame,
    thresh_ratio: float,
    min_radius: float,
):
    coordinate_cols = ['Motor axis 0', 'Motor axis 1', 'Motor axis 2']
    label_tensor = solution[coordinate_cols].values.reshape(len(solution), -1, len(coordinate_cols))
    predicted_tensor = submission[coordinate_cols].values.reshape(len(submission), -1, len(coordinate_cols))
    # Find the minimum euclidean distances between the true and predicted points
    solution['distance'] = np.linalg.norm(label_tensor - predicted_tensor, axis=2).min(axis=1)
    # Convert thresholds from angstroms to voxels
    solution['thresholds'] = solution['Voxel spacing'].apply(lambda x: (min_radius * thresh_ratio) / x)
    solution['predictions'] = submission['Has motor'].values
    solution.loc[(solution['distance'] > solution['thresholds']) & (solution['Has motor'] == 1) & (submission['Has motor'] == 1), 'predictions'] = 0
    return solution['predictions'].values

def score(self, solution:pd.DataFrame, submission:pd.DataFrame, min_radius:float=1000, beta:float=2) -> float:
    """
    Parameters:
    solution (pd.DataFrame): DataFrame containing ground truth motor positions.
    submission (pd.DataFrame): DataFrame containing predicted motor positions.

    Returns:
    float: FBeta score.

    Example
    --------
    >>> solution = pd.DataFrame({
    ...     'tomo_id': [0, 1, 2, 3],
    ...     'Motor axis 0': [-1, 250, 100, 200],
    ...     'Motor axis 1': [-1, 250, 100, 200],
    ...     'Motor axis 2': [-1, 250, 100, 200],
    ...     'Voxel spacing': [10, 10, 10, 10],
    ...     'Has motor': [0, 1, 1, 1]
    ... })
    >>> submission = pd.DataFrame({
    ...     'tomo_id': [0, 1, 2, 3],
    ...     'Motor axis 0': [100, 251, 600, -1],
    ...     'Motor axis 1': [100, 251, 600, -1],
    ...     'Motor axis 2': [100, 251, 600, -1]
    ... })
    >>> score(solution, submission, 1000, 2)
    0.3571428571428571
    """

    solution = solution.sort_values('tomo_id').reset_index(drop=True)
    submission = submission.sort_values('tomo_id').reset_index(drop=True)

    filename_equiv_array = solution['tomo_id'].eq(submission['tomo_id'], fill_value=0).values

    if np.sum(filename_equiv_array) != len(solution['tomo_id']):
        raise ValueError('Submitted tomo_id values do not match the sample_submission file')

    submission['Has motor'] = 1
    # If any columns are missing an axis, it's marked with no motor
    select = (submission[['Motor axis 0', 'Motor axis 1', 'Motor axis 2']] == -1).any(axis='columns')
    submission.loc[select, 'Has motor'] = 0

    cols = ['Has motor', 'Motor axis 0', 'Motor axis 1', 'Motor axis 2']
    assert all(col in submission.columns for col in cols)

    # Calculate a label of 0 or 1 using the 'has motor', and 'motor axis' values
    predictions = self.distance_metric(
        solution,
        submission,
        thresh_ratio=1.0,
        min_radius=min_radius,
    )

    return fbeta_score(solution['Has motor'].values, predictions, beta=beta)


In [None]:
def main():
    print("Starting 3D-2D Encoder Training")
    print("Starting YOLOv8 Training")
    print("Starting Depth Estimation Training")

Z方向の長さが異なる場合
サンプルの深さ $ D_i $ が異なる問題への対策
1. 補完リサイズ
- 下の深度プロファイル( $ D_i $)を事前に決めた固定長$D_t$にF.interpolateでリサイズ
- F.interpolate(profile.unsqueeze(1), size=D_t, mode='linear').squeeze(1)を用いて線型補完を行う
2. Adaptive Pooling
- nn.AdaptiveAvgPool1d(D_t)でどの長さの入力でも自動的に$D_t$長にマッピング

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# バッチサイズ2, チャネル16, 深さ50, 高さ200, 幅300 のダミー入力
x = torch.randn(2, 16, 50, 200, 300)

# ① 出力を (1, 10, 10) に揃えたい場合
pool = nn.AdaptiveAvgPool3d((1, 10, 10))
y = pool(x)
# → y.shape == (2, 16, 1, 10, 10)

# ② 深さはそのまま, 高さ・幅だけ (100,100) にしたい場合
pool_hw = nn.AdaptiveAvgPool3d((None, 100, 100))
y2 = pool_hw(x)
# → y2.shape == (2, 16, 50, 100, 100)

# ③ グローバル平均プーリングとして使う場合
global_pool = nn.AdaptiveAvgPool3d(1)  
# 1 は (1,1,1) と同義
y3 = global_pool(x)
# → y3.shape == (2, 16, 1, 1, 1)

print("y.shape:", y.shape)
print("y2.shape:", y2.shape)
print("y3.shape:", y3.shape)

y.shape: torch.Size([2, 16, 1, 10, 10])
y2.shape: torch.Size([2, 16, 50, 100, 100])
y3.shape: torch.Size([2, 16, 1, 1, 1])
