- VTT 모델에 맞는 환경을 구축하기 위하여 필요한 패키지 임포트

In [None]:
%cd ../src
import os  # 운영 체제와의 상호작용을 위한 모듈
import sys  # 시스템 특정 파라미터와 함수를 위한 모듈
import random  # 난수 생성기를 위한 모듈
from tqdm import tqdm # 진행률을 시각적으로 보여주는 모듈 
import warnings  # 경고 메시지를 관리하기 위한 모듈
warnings.filterwarnings("ignore")  # 경고 메시지를 무시하도록 설정

import numpy as np # 다차원 배열과 연산을 다루는 모듈
import pandas as pd  # 데이터 조작 및 분석을 위한 라이브러리
from sklearn.preprocessing import MinMaxScaler  # 데이터 전처리를 위한 스케일링 모듈

# 데이터 시각화를 위한 라이브러리
import matplotlib.pyplot as plt  
from matplotlib.colors import LinearSegmentedColormap
import matplotlib.colors as mcl
from matplotlib.ticker import MultipleLocator
import seaborn as sns
sns.set_style('darkgrid')
# 주피터 노트북에서 인라인으로 그래프를 표시
%matplotlib inline

from easydict import EasyDict  # 딕셔너리의 속성을 점 표기법으로 접근할 수 있도록 하는 모듈
from omegaconf import OmegaConf  # 설정 파일 관리를 위한 모듈

import torch  # 딥러닝을 위한 라이브러리
import h5py # 데이터 로드를 위한 모듈

from utils.utils import set_seed, version_build  # 유틸리티 함수들
from data_provider.waferdataset import get_dataloader  # 데이터 로더 함수
from model import build_model  # 모델 빌드 함수
from utils.utils import load_model # 모델 가중치 로더 함수


# Report the interpreter / PyTorch versions and pick the compute device.
print(f"Python version:[{sys.version}].")
print(f"PyTorch version:[{torch.__version__}].")
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
# 'cuda:0' printed here means the GPU is available and will be used.
print(f"device:[{device}].")

/home/iai3/Desktop/Bongjun/RTM/drystrip_4차/VTT_entire_wafer/src
Python version:[3.9.21 (main, Dec 11 2024, 16:24:11) 
[GCC 11.2.0]].
PyTorch version:[2.7.0+cu118].
device:[cuda:0].


모델에 필요한 argument들을 설정

In [None]:
# Run-time arguments for this notebook, exposed with attribute access.
args = EasyDict({
    # for dataloader
    'dataname': 'vtt_all_step',
    'subdataname': None,
    'window_size': 350,
    'slide_size': 1,

    # for training
    'train': True,
    'test': True,
    'resume': None,
    'model': 'VTTSAT',

    # for setting
    'seed': 72,
    'use_gpu':True,
    'gpu': 0,
    'use_multi_gpu': False,
    'devices': '0',
    'configure': './config.yaml',
    'batch_size': 16
})

# Load the YAML configuration file named by args.configure.
with open(args.configure) as f:
    config = OmegaConf.load(f)

# Print the loaded configuration (with interpolations resolved).
print(OmegaConf.to_yaml(config, resolve=True))

path: ./
log_dir: ./logs
workers: 8
vtt_all_step:
  train_dir: ../../data/all_step/train
  test_dir: ../../data/all_step/test
SMD:
  train_dir: ../data/ServerMachineDataset/train
  test_dir: ../data/ServerMachineDataset/test
  test_label_dir: ../data/ServerMachineDataset/test_label
  interpretation_label_dir: ../data/ServerMachineDataset/interpretation_label
scale: standard
loader_params:
  batch_size: 32
  shuffle: false
  num_workers: 8
  pin_memory: true
  use_val: false
VTTSAT:
  hidden_size: 128
  n_layer: 3
  n_head: 4
  resid_pdrop: 0.1
  attn_pdrop: 0.1
  time_emb: 4
  optim: adamw
  lr: 0.0001
  lradj: type1
  window_size: 350
  feature_num: 12
VTTPAT:
  hidden_size: 128
  n_layer: 3
  n_head: 4
  resid_pdrop: 0.1
  attn_pdrop: 0.1
  time_emb: 4
  optim: adamw
  lr: 0.0001
  lradj: type1
  window_size: 30
  feature_num: 12



In [None]:
# Build the log directory (<log_dir>/<dataname>/<model>) and obtain the
# directory used for this run from version_build.
logdir = os.path.join(config.log_dir, f'{args.dataname}/{args.model}')
savedir = version_build(logdir=logdir, is_train=args.train, resume=args.resume)

# Multi-GPU setup: only taken when both use_gpu and use_multi_gpu are set.
if args.use_gpu and args.use_multi_gpu:
    # Parse the comma-separated device list into integer ids.
    args.devices = args.devices.replace(' ', '')
    args.device_ids = list(map(int, args.devices.split(',')))
    args.gpu = args.device_ids[0]  # the first listed GPU becomes the default GPU

# Seed the random number generators.
set_seed(args.seed)

# Hyper-parameters of the selected model (config section named after args.model).
model_params = config[args.model]

### Inference 결과 불러오기

In [None]:
def load_inference_result(h5_path):
    """Read the arrays stored in an inference-result HDF5 file.

    Args:
        h5_path: Path of the HDF5 file to open read-only.

    Returns:
        Tuple ``(preds, actuals, dist_per_feature, lotids, wafer_numbers,
        step_nums, masks)``. ``lotids`` and ``wafer_numbers`` are converted
        to ``str`` arrays. ``dist_per_feature`` is ``None`` when the file
        has no ``dist_per_window_per_features`` dataset.
    """
    optional_key = 'dist_per_window_per_features'

    with h5py.File(h5_path, 'r') as h5_file:

        def _read(name):
            # Materialise the whole dataset in memory.
            return h5_file[name][:]

        preds = _read('pred')
        actuals = _read('actual')
        lotids = _read('lotid').astype(str)
        wafer_numbers = _read('wafer_number').astype(str)
        step_nums = _read('step_num')
        masks = _read('mask')
        if optional_key in h5_file:
            dist_per_feature = _read(optional_key)
        else:
            dist_per_feature = None

    return preds, actuals, dist_per_feature, lotids, wafer_numbers, step_nums, masks

# Requires the file written by running `main.py --test`
# (inference_result_with_metadata.h5).
h5_path = './logs/vtt_all_step/VTTSAT/version5/inference_result_with_metadata.h5'
preds, actuals, dist_per_feature, lotids, wafer_numbers, step_nums, masks = load_inference_result(h5_path)

# Reshape the mask to one row per sample and one column per time step.
# The sizes are read from `preds` instead of being hard-coded (previously
# 16808 x 350) so the cell keeps working for other result files.
# NOTE(review): assumes preds is (num_samples, window_length, num_features),
# which is how the functions below index it — confirm against main.py.
masks = masks.reshape(preds.shape[0], preds.shape[1])

feature_names = ['APC_Position', 'APC_Pressure', 'Gas1_Monitor', 'Gas6_Monitor', 'Mat_Irms', 'Mat_Phase','Mat_Vrms',
                      'Mat_VC1_Position', 'Mat_VC2_Position', 'SourcePwr_Read', 'Temp', 'Wall_Temp_Monitor']

### Inference 결과를 시각화

In [None]:
def visualize_overlay_all_wafers_subplots(
    preds, actuals, lotids, wafer_numbers, feature_names, masks,
    output_path='overlay_all_wafers_subplots.png', num_wafer_to_plot=1000, seed=42
):
    """Overlay actual and predicted series of many samples, one subplot per sensor.

    Each selected sample contributes one "Actual" (blue) and one
    "Predicted" (magenta) line per sensor, restricted to the time steps
    whose mask value equals 1. The figure is saved to ``output_path``.

    Args:
        preds, actuals: Arrays indexed as ``[sample, time, feature]``.
        lotids, wafer_numbers: Per-sample identifiers; only their length is
            used here (it defines how many samples can be drawn).
        feature_names: Subplot titles, one per feature.
        masks: Array indexed as ``[sample, time]``; 1 marks a valid step.
        output_path: Destination image file; its parent directory is
            created when missing.
        num_wafer_to_plot: Maximum number of samples to draw.
        seed: Seed for the random sample selection.
    """
    num_samples = len(lotids)
    num_features = preds.shape[2]

    # A private RandomState gives the same selection as seeding the global
    # NumPy RNG, without resetting global random state for later cells.
    rng = np.random.RandomState(seed)
    selected_indices = rng.choice(
        num_samples, size=min(num_wafer_to_plot, num_samples), replace=False
    )

    # Four columns; enough rows for every feature (3 x 4 for 12 sensors).
    n_cols = 4
    n_rows = max(1, (num_features + n_cols - 1) // n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 3 * n_rows), sharex=False)
    axes = axes.flatten()

    for d in range(num_features):
        ax = axes[d]

        for i, idx in enumerate(selected_indices):
            # Keep only the valid (unmasked) time steps of this sample.
            valid = masks[idx] == 1
            actual_seq = actuals[idx, valid, d]
            pred_seq = preds[idx, valid, d]

            # Only the first pair of lines carries a legend label.
            label_actual = 'Actual' if i == 0 else None
            label_pred = 'Predicted' if i == 0 else None

            ax.plot(actual_seq, color='blue', alpha=0.2, linewidth=0.8, label=label_actual, zorder=0)
            ax.plot(pred_seq, color='magenta', alpha=0.2, linewidth=0.8, label=label_pred, zorder=1)

        ax.set_title(feature_names[d], fontsize=10, fontweight='bold')
        ax.yaxis.set_major_locator(MultipleLocator(0.1))

        if d == 0:
            ax.legend(fontsize=8)

    # Remove the axes that have no feature assigned.
    for j in range(num_features, len(axes)):
        fig.delaxes(axes[j])

    fig.suptitle(f'Overlay of {len(selected_indices)} Wafers per Sensor', fontsize=16)
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])

    # savefig does not create missing directories.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    plt.savefig(output_path, bbox_inches='tight')
    plt.close(fig)
    print(f"✅ 저장 완료: {output_path}")


In [None]:
# Draw the overlay figure for the loaded inference results.
# num_wafer_to_plot is capped inside the function at the number of samples.
visualize_overlay_all_wafers_subplots(
    preds=preds,
    actuals=actuals,
    lotids=lotids,
    wafer_numbers=wafer_numbers,
    feature_names=feature_names,
    masks=masks,
    output_path='./logs/vtt_all_step/VTTSAT/version5/plots/overlay_all_wafers_subplots.png',
    num_wafer_to_plot=16808
)

### 웨이퍼 단위로 recipe step 구분하여 VTT score 계산

In [None]:
def compute_vtt_score_per_stepnum(preds, actuals, masks, lotids, wafer_numbers, step_nums, feature_names):
    """Compute the per-sensor VTT score of every sample, grouped by step number.

    The score of a sample is the negated sum of absolute prediction errors
    over the time steps whose mask value equals 1. Samples without any
    valid time step are skipped. Rows are emitted step by step in ascending
    step order, keeping the input order inside each step.

    Returns:
        pd.DataFrame: columns = [step, unique_id, sensor1, ..., sensorN],
        where unique_id is "<lotid>-<wafer_number>".
    """
    lot_arr = np.array(lotids).astype(str)
    wafer_arr = np.array(wafer_numbers).astype(str)
    sample_ids = np.char.add(lot_arr, np.char.add("-", wafer_arr))
    step_arr = np.array(step_nums)

    n_features = preds.shape[2]
    rows = []

    for step_value in sorted(np.unique(step_arr)):
        # Restrict every array to the samples belonging to this step.
        in_step = step_arr == step_value
        samples = zip(sample_ids[in_step], preds[in_step], actuals[in_step], masks[in_step])

        for uid, pred, actual, mask in samples:
            keep = mask == 1
            if not np.any(keep):
                continue

            # Negated summed absolute error per feature, shape (n_features,).
            score = -np.sum(np.abs(pred[keep] - actual[keep]), axis=0)

            row = {'step': int(step_value), 'unique_id': uid}
            for col in range(n_features):
                row[feature_names[col]] = score[col]
            rows.append(row)

    return pd.DataFrame.from_records(rows)


# Score every sample per recipe step ...
vtt_stepwise_scores_df = compute_vtt_score_per_stepnum(
    preds=preds,
    actuals=actuals,
    masks=masks,
    lotids=lotids,
    wafer_numbers=wafer_numbers,
    step_nums=step_nums,
    feature_names=feature_names
)

# ... and store the table as CSV (without the DataFrame index).
vtt_stepwise_scores_df.to_csv('./logs/vtt_all_step/VTTSAT/version5/vtt_scores_per_wafer.csv', index=False)

### recipe step 구분 없이 웨이퍼 단위로 VTT score 계산

In [None]:
def compute_vtt_score_all_wafer(preds, actuals, masks, lotids, wafer_numbers, feature_names):
    """Compute the per-sensor VTT score of every sample, ignoring step numbers.

    The score is the negated sum of absolute prediction errors over the
    time steps whose mask value equals 1. Samples without any valid time
    step are skipped.

    Returns:
        pd.DataFrame: columns = [unique_id, sensor1, ..., sensorN],
        where unique_id is "<lotid>-<wafer_number>".
    """
    lot_arr = np.array(lotids).astype(str)
    wafer_arr = np.array(wafer_numbers).astype(str)
    sample_ids = np.char.add(lot_arr, np.char.add("-", wafer_arr))

    n_features = preds.shape[2]
    rows = []

    for pos, uid in enumerate(sample_ids):
        keep = masks[pos] == 1
        if not np.any(keep):
            continue

        # Negated summed absolute error per feature, shape (n_features,).
        score = -np.sum(np.abs(preds[pos][keep] - actuals[pos][keep]), axis=0)

        row = {'unique_id': uid}
        for col in range(n_features):
            row[feature_names[col]] = score[col]
        rows.append(row)

    return pd.DataFrame.from_records(rows)


# Score every sample without separating recipe steps ...
vtt_all_scores_df = compute_vtt_score_all_wafer(
    preds=preds,
    actuals=actuals,
    masks=masks,
    lotids=lotids,
    wafer_numbers=wafer_numbers,
    feature_names=feature_names
)
# ... and store the table as CSV (without the DataFrame index).
vtt_all_scores_df.to_csv('./logs/vtt_all_step/VTTSAT/version5/vtt_scores_per_wafer_allstep.csv', index=False)

# 모델 code 실행

- 모델 체크포인트 로드

In [6]:
# Select the version to resume from, build the model and load its weights.
args.resume = 5
model = build_model(args, model_params, savedir)
# load_model returns four values; only the first (the weights) is used here.
weights, _, _, _ = load_model(resume=args.resume,
                              logdir=savedir)
model.model.load_state_dict(weights)

Use GPU: cuda:0
modelpath:  ./logs/vtt_all_step/VTTSAT/version5/1675.pth


<All keys matched successfully>

### VTT score highest/lowest 각각 10개씩 웨이퍼 리스트 만들기

In [None]:
def get_top_bottom_wafers_per_sensor(score_df, top_k=10, draw_plot=True, save_dir=None):
    """Find the lowest- and highest-scoring wafers for every sensor column.

    Args:
        score_df: DataFrame with a ``unique_id`` column and one score column
            per sensor (an optional ``step`` column is ignored).
        top_k: Number of wafers taken from each end of the ranking.
        draw_plot: When true, draw one strip plot of the score distribution
            per sensor, highlighting the selected wafers.
        save_dir: When given, plots are saved there (instead of shown) and
            the ranking is written to ``highest_lowest_results.csv``.

    Returns:
        dict: ``{sensor: {'bottom': [unique_id, ...], 'top': [unique_id, ...]}}``,
        both lists in ascending score order.
    """
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    sensor_columns = [col for col in score_df.columns if col not in ('step', 'unique_id')]
    result = {}
    result_rows = []

    for sensor in sensor_columns:
        ranked = score_df[['unique_id', sensor]].sort_values(by=sensor)
        bottom_rows = ranked.head(top_k)
        top_rows = ranked.tail(top_k)

        bottom_ids = bottom_rows['unique_id'].tolist()
        top_ids = top_rows['unique_id'].tolist()

        result[sensor] = {
            'bottom': bottom_ids,
            'top': top_ids,
        }

        # Long-format rows for the CSV: bottom entries first, then top.
        for rank_type, rows in (('bottom', bottom_rows), ('top', top_rows)):
            result_rows.extend(
                {'sensor': sensor, 'rank_type': rank_type, 'unique_id': uid, 'score': score}
                for uid, score in zip(rows['unique_id'], rows[sensor])
            )

        if draw_plot:
            fig = plt.figure(figsize=(6, 2.5))

            others_mask = ~score_df['unique_id'].isin(top_ids + bottom_ids)
            sns.stripplot(x=score_df[others_mask][sensor], color='gray', size=2.5, jitter=0.25, label='others')
            # Labels follow top_k instead of always claiming "10".
            sns.stripplot(x=bottom_rows[sensor], color='red', size=3, jitter=0.25, label=f'Lowest {top_k}')
            sns.stripplot(x=top_rows[sensor], color='blue', size=3, jitter=0.25, label=f'Highest {top_k}')

            plt.title(sensor)
            plt.xlabel("VTT Score")
            plt.yticks([])
            plt.legend(loc='upper right', fontsize=8)
            plt.tight_layout()

            if save_dir:
                plt.savefig(f"{save_dir}/{sensor}_score_dist.png", dpi=150)
            else:
                plt.show()
            # Release the figure; otherwise one figure per sensor stays open.
            plt.close(fig)

    if save_dir:
        df_result = pd.DataFrame(result_rows)
        df_result.to_csv(os.path.join(save_dir, 'highest_lowest_results.csv'), index=False)

    return result


# Rank wafers per sensor using the step-independent scores; plots and the
# ranking CSV are written into save_dir.
top_bottom_ids_per_sensor = get_top_bottom_wafers_per_sensor(
    vtt_all_scores_df,
    top_k=10,
    draw_plot=True,
    save_dir="./logs/vtt_all_step/VTTSAT/version5/plots/score_dist"
)


### reference, highest, lowest 실제값 시계열 plot 그리기

In [None]:
# Load the reference data set.
P_FG_ch1 = pd.read_parquet("drystrip_dataset/bigger_train_df.parquet")
interested_sensors = [
    'APC_Position', 'APC_Pressure', 'Gas1_Monitor', 'Gas6_Monitor',
    'Mat_Irms', 'Mat_Phase', 'Mat_Vrms', 'Mat_VC1_Position',
    'Mat_VC2_Position', 'SourcePwr_Read', 'Temp', 'Wall_Temp_Monitor',
]

# Keep the four identifying columns followed by the sensor columns, ordered
# by lot, wafer and time, with a fresh 0..n-1 index.
filterd_df = (
    P_FG_ch1[['time', 'lotid', 'wafer_number', 'Recipe_Step_Num'] + interested_sensors]
    .sort_values(by=["lotid", "wafer_number", "time"])
    .reset_index(drop=True)
)

In [None]:
# Rebuild the frame wafer by wafer: one group per (lotid, wafer_number).
grouped = list(filterd_df.groupby(['lotid', 'wafer_number']))
normal_groups = list(grouped)
train_df = pd.concat([wafer_df for _, wafer_df in normal_groups])
train_df_test = train_df.drop(columns='time')

# Min-max scale the sensor columns (everything after the 4 leading
# identifying columns).
scaler = MinMaxScaler()
train_data_combined = train_df.iloc[:, 4:]
train_scaled = scaler.fit_transform(train_data_combined)

# Re-attach the identifying columns. The scaled values must carry
# train_df's index: concat(axis=1) aligns on index labels, so a fresh
# RangeIndex would only match if train_df's index happened to be 0..n-1.
train_scaled = pd.concat(
    [
        train_df.iloc[:, :4],
        pd.DataFrame(train_scaled, columns=interested_sensors, index=train_df.index),
    ],
    axis=1,
)
train_scaled = train_scaled.drop(columns=['time'])

In [None]:
def highest_lowest_bottom_actuals_with_reference(
    actuals, lotids, wafer_numbers, feature_names,
    highest_lowest_df, reference_df,
    mask=None,
    output_dir="top_bottom_overlay_plots",
    ref_max_count=100
):
    """Plot, per sensor, the actual series of the top/bottom-ranked wafers over reference wafers.

    For every sensor listed in ``highest_lowest_df`` (and present in
    ``feature_names``) one figure is saved to ``output_dir``: randomly
    sampled reference wafers in gray, 'top' wafers in blue and 'bottom'
    wafers in red.

    Args:
        actuals: Array indexed as ``[sample, time, feature]``.
        lotids, wafer_numbers: Per-sample identifiers; joined as
            "<lotid>-<wafer_number>" to match ``unique_id``.
        feature_names: List of sensor names; the position gives the
            feature index in ``actuals``.
        highest_lowest_df: DataFrame with ``sensor``, ``rank_type``
            ('top'/'bottom') and ``unique_id`` columns.
        reference_df: Optional DataFrame with ``lotid``, ``wafer_number``
            and sensor columns; ``None`` disables the reference lines.
        mask: Optional array indexed as ``[sample, time]``; only steps with
            value 1 are drawn.
        output_dir: Directory for the figures (created when missing).
        ref_max_count: Maximum number of reference wafers drawn per sensor.

    Returns:
        str: Completion message containing ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)

    lotids = np.array(lotids).astype(str)
    wafer_numbers = np.array(wafer_numbers).astype(str)
    unique_ids = np.char.add(lotids, np.char.add("-", wafer_numbers))

    def _plot_ranked(ax, sensor, sensor_idx, rank_type, color, label):
        # Draw the actual series of every wafer ranked `rank_type` for `sensor`.
        ranked_ids = highest_lowest_df[
            (highest_lowest_df['sensor'] == sensor)
            & (highest_lowest_df['rank_type'] == rank_type)
        ]['unique_id']
        labelled = False
        for uid in ranked_ids:
            idxs = np.where(unique_ids == uid)[0]
            if len(idxs) == 0:
                continue
            idx = idxs[0]

            if mask is not None:
                actual_seq = actuals[idx, mask[idx] == 1, sensor_idx]
            else:
                actual_seq = actuals[idx, :, sensor_idx]

            # Label the first line that is actually drawn, so the legend
            # entry is not lost when the first id has no matching sample.
            ax.plot(actual_seq, color=color, alpha=0.6, linewidth=0.8,
                    label=None if labelled else label)
            labelled = True

    # The reference grouping does not depend on the sensor: build it once,
    # on first use, instead of once per sensor.
    ref_grouped = None
    ref_all_keys = None

    for sensor in highest_lowest_df['sensor'].unique():
        if sensor not in feature_names:
            continue

        sensor_idx = feature_names.index(sensor)

        fig, ax = plt.subplots(figsize=(10, 4))
        ax.set_title(f"{sensor} | Lowest vs Highest actuals", fontsize=13)

        # reference
        if reference_df is not None and sensor in reference_df.columns:
            if ref_grouped is None:
                ref_grouped = reference_df.groupby(['lotid', 'wafer_number'])
                ref_all_keys = list(ref_grouped.groups.keys())
            ref_keys = random.sample(ref_all_keys, min(ref_max_count, len(ref_all_keys)))
            for pos, key in enumerate(ref_keys):
                ref_series = ref_grouped.get_group(key)[sensor].values
                ax.plot(ref_series, color='#999999', alpha=1.0, linewidth=1.0,
                        label='Reference' if pos == 0 else None)

        _plot_ranked(ax, sensor, sensor_idx, 'top', 'blue', 'Highest 10 Actual')
        _plot_ranked(ax, sensor, sensor_idx, 'bottom', 'red', 'Lowest 10 Actual')

        ax.set_xlabel('Time')
        ax.set_ylabel('Sensor Value')
        ax.legend(fontsize=8)
        ax.grid(True, linestyle='--', alpha=0.5)
        plt.tight_layout()

        save_path = os.path.join(output_dir, f'{sensor}_highest_lowest_actual_overlay.png')
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)

    return f"✅ 저장 완료: {output_dir}"



In [None]:
# Reload the ranking written by get_top_bottom_wafers_per_sensor.
highest_lowest_df = pd.read_csv('./logs/vtt_all_step/VTTSAT/version5/plots/score_dist/highest_lowest_results.csv')

# Overlay the top/bottom wafers on the scaled reference wafers, one figure
# per sensor.
highest_lowest_bottom_actuals_with_reference(
    actuals=actuals,
    lotids=lotids,
    wafer_numbers=wafer_numbers,
    feature_names=feature_names,
    highest_lowest_df=highest_lowest_df,
    reference_df=train_scaled,
    mask=masks,
    output_dir="./logs/vtt_all_step/VTTSAT/version5/plots/highest_lowest_overlay",
    ref_max_count=1163
)

### 각 센서별 VTT score가 가장 높은 웨이퍼와 가장 낮은 웨이퍼의 시계열 시각화
### (sensor_groups를 수정하여 원하는 센서들을 함께 그릴 수 있음)

In [None]:
def highest_lowest_bottom_actuals_with_reference_custom_single(
    actuals, lotids, wafer_numbers, feature_names,
    highest_lowest_df, reference_df,
    mask=None,
    output_dir="custom_overlay_plots_single",
    ref_max_count=100,
    sensor_groups=None
):
    """Plot the single highest and lowest wafer of a key sensor across related sensors.

    For every key sensor in ``sensor_groups`` one figure is saved to
    ``output_dir`` with one subplot per related sensor. Each subplot shows
    randomly sampled reference wafers (gray), the first 'top' wafer of the
    key sensor (blue) and its first 'bottom' wafer (red).

    Args:
        actuals: Array indexed as ``[sample, time, feature]``.
        lotids, wafer_numbers: Per-sample identifiers; joined as
            "<lotid>-<wafer_number>" to match ``unique_id``.
        feature_names: List of sensor names; the position gives the
            feature index in ``actuals``.
        highest_lowest_df: DataFrame with ``sensor``, ``rank_type``
            ('top'/'bottom') and ``unique_id`` columns.
        reference_df: Optional DataFrame with ``lotid``, ``wafer_number``
            and sensor columns; ``None`` disables the reference lines.
        mask: Optional array indexed as ``[sample, time]``; only steps with
            value 1 are drawn.
        output_dir: Directory for the figures (created when missing).
        ref_max_count: Maximum number of reference wafers per subplot.
        sensor_groups: Optional mapping ``{key_sensor: [related sensors]}``.
            Defaults to the built-in grouping below.

    Returns:
        str: Completion message containing ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)

    lotids = np.array(lotids).astype(str)
    wafer_numbers = np.array(wafer_numbers).astype(str)
    unique_ids = np.char.add(lotids, np.char.add("-", wafer_numbers))

    if sensor_groups is None:
        sensor_groups = {
            'APC_Position': ['APC_Position', 'APC_Pressure', 'Mat_Irms', 'Mat_VC1_Position'],
            'APC_Pressure': ['APC_Pressure'],
            'Gas1_Monitor': ['Gas1_Monitor', 'Wall_Temp_Monitor'],
            'Gas6_Monitor': ['Gas6_Monitor', 'Mat_Irms', 'Mat_VC2_Position'],
            'Mat_Irms': ['Mat_Irms', 'APC_Pressure', 'Mat_Vrms', 'Wall_Temp_Monitor'],
            'Mat_Phase': ['Mat_Phase', 'Mat_VC2_Position'],
            'Wall_Temp_Monitor': ['Wall_Temp_Monitor'],
            'Temp': ['Temp', 'Wall_Temp_Monitor', 'Mat_Irms'],
            'SourcePwr_Read': ['APC_Pressure', 'SourcePwr_Read', 'Temp'],
            'Mat_Vrms': ['APC_Pressure', 'Mat_Vrms', 'Mat_VC1_Position', 'Temp'],
            'Mat_VC2_Position': ['Mat_VC2_Position', 'Mat_Vrms', 'Wall_Temp_Monitor'],
            'Mat_VC1_Position': ['Mat_VC1_Position', 'APC_Pressure', 'Mat_Phase', 'Mat_VC2_Position', 'Temp', 'Wall_Temp_Monitor'],
        }

    def _first_ranked_uid(key_sensor, rank_type):
        # First unique_id ranked `rank_type` for `key_sensor`, or None.
        ids = highest_lowest_df[
            (highest_lowest_df['sensor'] == key_sensor)
            & (highest_lowest_df['rank_type'] == rank_type)
        ]['unique_id'].values[:1]
        return ids[0] if len(ids) else None

    def _plot_wafer(ax, uid, sensor_idx, color, label):
        # Draw the (masked) actual series of wafer `uid` for one feature.
        if uid is None:
            return
        idxs = np.where(unique_ids == uid)[0]
        if len(idxs) == 0:
            return
        idx = idxs[0]
        if mask is not None:
            actual_seq = actuals[idx, mask[idx] == 1, sensor_idx]
        else:
            actual_seq = actuals[idx, :, sensor_idx]
        ax.plot(actual_seq, color=color, alpha=0.6, linewidth=1.2, label=label)

    # The reference grouping is the same for every subplot: build it once,
    # on first use.
    ref_grouped = None
    ref_all_keys = None

    for key_sensor, related_sensors in sensor_groups.items():
        fig, axes = plt.subplots(len(related_sensors), 1, figsize=(10, 3 * len(related_sensors)), sharex=True)

        # plt.subplots returns a bare Axes (not an array) for a single row.
        if len(related_sensors) == 1:
            axes = [axes]

        # The highlighted wafers depend only on the key sensor.
        top_uid = _first_ranked_uid(key_sensor, 'top')
        bottom_uid = _first_ranked_uid(key_sensor, 'bottom')

        for ax, sensor in zip(axes, related_sensors):
            if sensor not in feature_names:
                continue

            sensor_idx = feature_names.index(sensor)

            # reference
            if reference_df is not None and sensor in reference_df.columns:
                if ref_grouped is None:
                    ref_grouped = reference_df.groupby(['lotid', 'wafer_number'])
                    ref_all_keys = list(ref_grouped.groups.keys())
                ref_keys = random.sample(ref_all_keys, min(ref_max_count, len(ref_all_keys)))
                for pos, key in enumerate(ref_keys):
                    ref_series = ref_grouped.get_group(key)[sensor].values
                    ax.plot(ref_series, color='#999999', alpha=0.5, linewidth=1.0,
                            label='Reference' if pos == 0 else None)

            _plot_wafer(ax, top_uid, sensor_idx, 'blue', 'Highest')
            _plot_wafer(ax, bottom_uid, sensor_idx, 'red', 'Lowest')

            ax.set_ylabel(sensor)
            ax.legend(fontsize=7)
            ax.grid(True, linestyle='--', alpha=0.5)

        axes[-1].set_xlabel('Time')
        fig.suptitle(f"{key_sensor} | Highest vs Lowest", fontsize=13)
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        save_path = os.path.join(output_dir, f'{key_sensor}_context_overlay_single.png')
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)

    return f"✅ 저장 완료: {output_dir}"


In [None]:
# Draw, per key sensor, its highest and lowest wafer across the related
# sensors, on top of the scaled reference wafers.
highest_lowest_bottom_actuals_with_reference_custom_single(
    actuals=actuals,
    lotids=lotids,
    wafer_numbers=wafer_numbers,
    feature_names=feature_names,
    highest_lowest_df=highest_lowest_df,
    reference_df=train_scaled,
    mask=masks,
    output_dir="./logs/vtt_all_step/VTTSAT/version5/plots/highest_lowest_context_overlay_single",
    ref_max_count=1163
)

# Attention difference map 그리기

In [None]:
def get_input_by_uid(preds, actuals, lotids, wafer_numbers, masks, target_uid):
    """Return the input windows and masks that belong to one wafer id.

    Args:
        preds: Unused; kept so the signature matches the callers.
        actuals: Array indexed as ``[sample, time, feature]``; used as the
            model input.
        lotids, wafer_numbers: Per-sample identifiers (lists or arrays of
            any scalar type); they are converted to ``str`` and joined as
            "<lotid>-<wafer_number>".
        masks: Array indexed as ``[sample, time]``.
        target_uid: Id to look up, in "<lotid>-<wafer_number>" form.

    Returns:
        Tuple ``(input_windows, input_masks)`` of float tensors holding the
        matching rows of ``actuals`` and ``masks``.

    Raises:
        ValueError: If no sample matches ``target_uid``.
    """
    # Cast to str like the other helpers in this file do; np.char.add
    # rejects non-string arrays such as integer wafer numbers.
    lotids = np.asarray(lotids).astype(str)
    wafer_numbers = np.asarray(wafer_numbers).astype(str)
    unique_ids = np.char.add(lotids, np.char.add("-", wafer_numbers))

    indices = np.where(unique_ids == target_uid)[0]
    if len(indices) == 0:
        raise ValueError(f"UID '{target_uid}'에 해당하는 window 없음")

    input_windows = actuals[indices]  # the model input is the actual series
    input_masks = masks[indices]

    return torch.tensor(input_windows).float(), torch.tensor(input_masks).float()

In [None]:
def calc_diff_attns(prior, post, mask=None):
    """Average the absolute attention change over the first three axes.

    Args:
        prior, post: Arrays of equal shape with at least five axes; axis 1
            is treated as time. (NOTE(review): the original docstring gives
            the shape as (L, T, H, D, D) — confirm against the model.)
        mask: Optional array over the time axis, either 1-D ``(T,)`` or 2-D
            (only its first row is used). Truthy entries mark valid steps.

    Returns:
        The mean of ``|post - prior|`` over axes 0, 1 and 2. With a mask,
        masked-out time steps are excluded from the mean.
    """
    abs_change = np.abs(post - prior)
    reduce_axes = (0, 1, 2)

    if mask is None:
        # No masking: plain mean over all positions.
        return np.mean(abs_change, axis=reduce_axes)

    time_mask = mask[0] if mask.ndim == 2 else mask

    # Broadcast the time mask against the 5-D difference and blank out the
    # invalid steps with NaN so that nanmean ignores them.
    keep = time_mask[None, :, None, None, None]
    masked_change = np.where(keep, abs_change, np.nan)
    return np.nanmean(masked_change, axis=reduce_axes)


### 각 센서 별 vtt score 가장 높은 웨이퍼, 가장 낮은 웨이퍼들의 Attention difference map 비교를 위한 시각화

In [None]:
def plot_attn_diff_top_bottom_per_sensor(
    preds, actuals, lotids, wafer_numbers, masks,
    highest_lowest_df, model, feature_names,
    get_input_by_uid, calc_diff_attns,
    save_root='attn_diff_top_bottom', device='cuda'
):
    """Save, per sensor, side-by-side attention-difference heatmaps of its lowest and highest wafer.

    For each sensor in ``highest_lowest_df`` the first 'bottom' and first
    'top' wafer are run through ``model.model`` twice (on the input, then
    on the first prediction). The two returned attention arrays are reduced
    with ``calc_diff_attns`` and drawn as heatmaps sharing one colour range.

    Args:
        preds, actuals, lotids, wafer_numbers, masks: Inference arrays,
            forwarded to ``get_input_by_uid``.
        highest_lowest_df: DataFrame with ``sensor``, ``rank_type``
            ('top'/'bottom') and ``unique_id`` columns.
        model: Wrapper whose ``.model`` is callable as
            ``model.model(x, use_attn=True)``.
        feature_names: Sensor names used as heatmap tick labels.
        get_input_by_uid: Callable returning ``(input, mask)`` tensors.
        calc_diff_attns: Callable reducing (prior, post, mask) to a matrix.
        save_root: Directory for the figures (created when missing).
        device: Device the input tensor is moved to.

    Returns:
        str: Completion message containing ``save_root``.
    """
    os.makedirs(save_root, exist_ok=True)
    model.model.eval()

    lotids = np.array(lotids).astype(str)
    wafer_numbers = np.array(wafer_numbers).astype(str)

    for sensor in highest_lowest_df['sensor'].unique():
        if sensor not in feature_names:
            continue

        sensor_df = highest_lowest_df[highest_lowest_df['sensor'] == sensor]
        top_candidates = sensor_df[sensor_df['rank_type'] == 'top']['unique_id'].values
        bottom_candidates = sensor_df[sensor_df['rank_type'] == 'bottom']['unique_id'].values
        # Without both ends there is nothing to compare; skip instead of
        # failing with an IndexError on `.values[0]`.
        if len(top_candidates) == 0 or len(bottom_candidates) == 0:
            print(f"[스킵] {sensor} | top/bottom 웨이퍼 정보 없음")
            continue
        top_uid = top_candidates[0]
        bottom_uid = bottom_candidates[0]

        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        uids = {'Lowest': bottom_uid, 'Highest': top_uid}
        diffs = {}

        # 1. Compute both difference matrices first.
        for title, uid in uids.items():
            # Deliberately best-effort: a failure for one wafer is reported
            # and the remaining wafers/sensors are still processed.
            try:
                input_seq, input_mask = get_input_by_uid(
                    preds, actuals, lotids, wafer_numbers, masks, uid
                )
                input_seq = input_seq.to(device)
                mask_array = input_mask.cpu().numpy()

                with torch.no_grad():
                    prior_pred, [prior_vattn, _] = model.model(input_seq, use_attn=True)
                    post_pred, [post_vattn, _] = model.model(prior_pred, use_attn=True)

                prior_vattn = prior_vattn.cpu().detach().numpy()
                post_vattn = post_vattn.cpu().detach().numpy()
                diffs[title] = calc_diff_attns(prior_vattn, post_vattn, mask_array)

            except Exception as e:
                print(f"[에러] {sensor} | {title} | {uid} 처리 실패: {e}")
                diffs[title] = None

        # 2. Shared colour range; NaN entries must not poison min/max.
        valid_diffs = [d for d in diffs.values() if d is not None]
        if not valid_diffs:
            plt.close(fig)
            continue

        global_min = min(np.nanmin(d) for d in valid_diffs)
        global_max = max(np.nanmax(d) for d in valid_diffs)

        # 3. Colormap: white to a saturated colour at hue 24 degrees.
        h = 24
        colors = [
            mcl.hsv_to_rgb((h / 360, 0, 1)),
            mcl.hsv_to_rgb((h / 360, 0.5, 1)),
            mcl.hsv_to_rgb((h / 360, 1, 1))
        ]
        cmap = LinearSegmentedColormap.from_list('custom_cmap', colors, gamma=3)

        # 4. Heatmaps (left: Lowest, right: Highest).
        for i, (title, diff) in enumerate(diffs.items()):
            if diff is None:
                continue

            hm = sns.heatmap(
                diff,
                cmap=cmap,
                annot=True,
                fmt=".4f",
                annot_kws={"size": 4},
                xticklabels=feature_names,
                yticklabels=feature_names,
                cbar=True,
                ax=axes[i],
                vmin=global_min,
                vmax=global_max  # fixed range so both panels are comparable
            )

            axes[i].set_title(f"{title}", fontsize=9)
            axes[i].tick_params(axis='x', labelrotation=90, labelsize=6)
            axes[i].tick_params(axis='y', labelsize=6)
            colorbar = hm.collections[0].colorbar
            colorbar.ax.tick_params(labelsize=5)  # colour-bar tick label size

        plt.suptitle(f"{sensor} | Attention Difference (Lowest vs Highest)", fontsize=11)
        plt.subplots_adjust(wspace=0.3, hspace=0.3)
        save_path = os.path.join(save_root, f"{sensor}_attn_diff_top_bottom.png")
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)

    return f"✅ 저장 완료: {save_root}"


In [None]:
# Draw the attention-difference heatmaps for every ranked sensor.
# The device selected at the top of the notebook is reused instead of a
# hard-coded 'cuda', so the cell also runs where no GPU is available.
plot_attn_diff_top_bottom_per_sensor(
    preds=preds,
    actuals=actuals,
    lotids=lotids,
    wafer_numbers=wafer_numbers,
    masks=masks,
    highest_lowest_df=highest_lowest_df,
    model=model,
    feature_names=feature_names,
    get_input_by_uid=get_input_by_uid,
    calc_diff_attns=calc_diff_attns,
    save_root='./logs/vtt_all_step/VTTSAT/version5/plots/attn_diff_highest_lowest',
    device=device
)
