In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm import tqdm
import random


In [None]:
# Directory that holds the label .txt files; hard-coded to a local Windows drive.
label_paths = 'D:/globit/Images/Infrared/globit_nas_07_flatfish_size_label/yolo/'
# Print every .txt file found directly inside that directory (non-recursive).
for label_path in Path(label_paths).glob('*.txt'):
    print(label_path)


In [None]:
# Parse every label file into one flat list of numeric rows (one row per
# non-empty text line) and wrap the result in a DataFrame for exploration.
all_data = []
for label_path in Path(label_paths).glob('*.txt'):
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            # A blank or whitespace-only line (e.g. a trailing newline) would
            # otherwise be appended as an empty row, which pandas pads to a
            # full row of NaN values.
            if not fields:
                continue
            all_data.append(list(map(float, fields)))

df = pd.DataFrame(all_data)
df


In [None]:
# Column layout of one label row: class id, box centre and size, then an
# (x, y, vis) triple for each keypoint in front / tail / right / left order.
column_names = ["class", "x_center", "y_center", "width", "height"]
column_names += [
    f"{part}_key_{field}"
    for part in ("front", "tail", "right", "left")
    for field in ("x", "y", "vis")
]
df.columns = column_names
df


In [None]:
# Print column dtypes, non-null counts and memory usage of the combined table.
df.info()


In [None]:
# Show per-column summary statistics (count, mean, std, min, quartiles, max).
df.describe()


In [None]:
# Histograms of the class id, the box columns and the keypoint coordinates
# (the *_vis columns are not plotted), laid out on a 5x3 grid.
cols_to_plot = ["class", "x_center", "y_center", "width", "height"] + [
    f"{part}_key_{axis}"
    for part in ("front", "tail", "right", "left")
    for axis in ("x", "y")
]

fig, axs = plt.subplots(5, 3, figsize=(18, 24))
fig.subplots_adjust(hspace=0.4, wspace=0.3)
grid = axs.flatten()

for ax, col in zip(grid, cols_to_plot):
    df[col].hist(ax=ax, bins=30)
    ax.set_title(f'{col} Histogram')
    ax.set_xlabel(col)
    ax.set_ylabel('Frequency')

# Remove the grid cells that have no column assigned to them.
for unused_ax in grid[len(cols_to_plot):]:
    fig.delaxes(unused_ax)

plt.show()


In [None]:
# One scatter panel per (keypoint, class) pair: rows are the four keypoints,
# columns are class 0 and class 1. Coordinates are multiplied by 3840x2160
# before plotting (presumably they are normalised to [0, 1] -- TODO confirm).
keypoints = ['front', 'tail', 'right', 'left']
colors = ['red', 'blue', 'green', 'purple']
x_scale, y_scale = 3840, 2160

fig, axes = plt.subplots(4, 2, figsize=(20, 24))
fig.subplots_adjust(hspace=0.3, wspace=0.3)

for row, (key, point_color) in enumerate(zip(keypoints, colors)):
    for col, class_label in enumerate([0, 1]):
        subset = df[df["class"] == class_label]
        xs = subset[f'{key}_key_x'] * x_scale
        ys = subset[f'{key}_key_y'] * y_scale

        ax = axes[row, col]
        ax.scatter(xs, ys, color=point_color, alpha=0.7)
        ax.set_title(f'Class {class_label} - {key.capitalize()} Keypoints')
        ax.set_xlabel('X Coordinate')
        ax.set_ylabel('Y Coordinate')
        ax.grid(True)
        ax.set_xlim(0, x_scale)
        ax.set_ylim(0, y_scale)

plt.show()


In [None]:
# Orientation check on the combined table. For every row the unit vector
# front->tail is crossed with the unit vector right->left; rows whose
# z-component is >= -threshold (i.e. not clearly negative) are collected.
df_errors_idx = []
threshold = 1e-3

for i, row in df.iterrows():
    vec_forward = np.array([
        row["tail_key_x"] - row["front_key_x"],
        row["tail_key_y"] - row["front_key_y"]
    ])
    vec_rl = np.array([
        row["left_key_x"] - row["right_key_x"],
        row["left_key_y"] - row["right_key_y"]
    ])

    norm_f = np.linalg.norm(vec_forward)
    norm_rl = np.linalg.norm(vec_rl)

    # Coincident keypoints give a zero-length vector with no direction; skip.
    if norm_f == 0 or norm_rl == 0:
        continue

    vec_forward_normalized = vec_forward / norm_f
    vec_rl_normalized = vec_rl / norm_rl

    # z-component of the 2-D cross product, written out explicitly because
    # np.cross on 2-element vectors is deprecated as of NumPy 2.0.
    cross_z = (vec_forward_normalized[0] * vec_rl_normalized[1]
               - vec_forward_normalized[1] * vec_rl_normalized[0])

    if cross_z >= -threshold:
        df_errors_idx.append(i)

df_errors = df.loc[df_errors_idx].reset_index(drop=True)
df_errors


In [None]:
# Draw up to 12 of the flagged rows on a 3x4 grid: the four keypoints as
# coloured dots plus dashed front-tail and right-left guide lines.
keypoints = ['front', 'tail', 'right', 'left']
colors = {'front': 'red', 'tail': 'blue', 'right': 'green', 'left': 'purple'}
x_scale, y_scale = 3840, 2160

num_cols = 4
num_rows = 3
num_samples = num_cols * num_rows

if len(df_errors) <= num_samples:
    selected_samples = df_errors
else:
    # Fixed seed so that re-running the notebook shows the same rows
    # (same seed as the per-file sampling further down).
    selected_samples = df_errors.sample(num_samples, random_state=42).reset_index(drop=True)

fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 5, num_rows * 5))
axes = axes.flatten()

for idx, row in enumerate(selected_samples.itertuples()):
    ax = axes[idx]
    for key in keypoints:
        ax.scatter(
            getattr(row, f"{key}_key_x") * x_scale,
            getattr(row, f"{key}_key_y") * y_scale,
            color=colors[key],
            # Only the first panel carries labels; the figure legend is built from it.
            label=key if idx == 0 else "",
            s=120
        )

    # Dashed guides: front-tail segment, then right-left segment.
    for start, end in (("front", "tail"), ("right", "left")):
        ax.plot(
            [getattr(row, f"{start}_key_x") * x_scale, getattr(row, f"{end}_key_x") * x_scale],
            [getattr(row, f"{start}_key_y") * y_scale, getattr(row, f"{end}_key_y") * y_scale],
            'k--', alpha=0.6
        )

    ax.set_title(f"Sample {row.Index}")
    ax.set_xlim(0, x_scale)
    ax.set_ylim(0, y_scale)
    ax.grid(True)
    ax.set_xticks([])
    ax.set_yticks([])

# Build one de-duplicated legend for the whole figure.
handles, labels = axes[0].get_legend_handles_labels()
by_label = dict(zip(labels, handles))
fig.legend(by_label.values(), by_label.keys(), loc='upper right')

# Remove grid cells that received no sample.
for idx in range(len(selected_samples), len(axes)):
    fig.delaxes(axes[idx])

plt.tight_layout()
plt.show()


In [None]:
# Load every label file into its own DataFrame, keyed by the file stem.
# Files with no data rows or with an unexpected column count are reported
# and skipped.
all_frame = {}

for label_path in Path(label_paths).glob('*.txt'):
    with open(label_path, 'r') as f:
        # Skip blank lines: parsed as an empty row they would be padded to an
        # all-NaN row and later written back out as "nan nan ...".
        data = [list(map(float, line.split())) for line in f if line.strip()]

    if not data:
        print(f"⚠️ 파일이 비어 있음: {label_path.name}")
        continue

    # A dedicated name keeps the combined `df` built earlier in the notebook
    # from being overwritten by the last file's frame.
    frame_df = pd.DataFrame(data)
    if frame_df.shape[1] != len(column_names):
        print(f"⚠️ 열 수 불일치: {label_path.name} (columns: {frame_df.shape[1]})")
        continue

    frame_df.columns = column_names
    all_frame[label_path.stem] = frame_df

all_frame


In [None]:
# Run the orientation check file by file and collect every flagged row,
# tagged with the name of the file it came from.
correction_rows = []

# `frame_df` (not `df`) so the combined table from earlier cells survives.
for frame_name, frame_df in all_frame.items():
    for idx, row in frame_df.iterrows():
        vec_forward = np.array([
            row["tail_key_x"] - row["front_key_x"],
            row["tail_key_y"] - row["front_key_y"]
        ])
        vec_rl = np.array([
            row["left_key_x"] - row["right_key_x"],
            row["left_key_y"] - row["right_key_y"]
        ])

        norm_f = np.linalg.norm(vec_forward)
        norm_rl = np.linalg.norm(vec_rl)

        # Zero-length vectors have no direction to test; skip the row.
        if norm_f == 0 or norm_rl == 0:
            continue

        vec_forward_normalized = vec_forward / norm_f
        vec_rl_normalized = vec_rl / norm_rl

        # Explicit z-component; np.cross on 2-element vectors is deprecated
        # as of NumPy 2.0.
        cross_z = (vec_forward_normalized[0] * vec_rl_normalized[1]
                   - vec_forward_normalized[1] * vec_rl_normalized[0])

        if cross_z >= -threshold:
            row_data = row.to_dict()
            row_data["file_name"] = frame_name
            correction_rows.append(row_data)

df_corrections = pd.DataFrame(correction_rows).reset_index(drop=True)
df_corrections


In [None]:
# Draw up to 12 of the rows collected in df_corrections on a 3x4 grid, each
# panel titled with its source file. Relies on keypoints / colors / x_scale /
# y_scale defined by an earlier plotting cell.
num_cols = 4
num_rows = 3
num_samples = num_cols * num_rows

if len(df_corrections) <= num_samples:
    selected_samples = df_corrections
else:
    # Fixed seed so that re-running the notebook shows the same rows.
    selected_samples = df_corrections.sample(num_samples, random_state=42).reset_index(drop=True)

fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 5, num_rows * 5))
axes = axes.flatten()

for idx, row in enumerate(selected_samples.itertuples()):
    ax = axes[idx]
    for key in keypoints:
        ax.scatter(
            getattr(row, f"{key}_key_x") * x_scale,
            getattr(row, f"{key}_key_y") * y_scale,
            color=colors[key],
            # Only the first panel carries labels; the figure legend is built from it.
            label=key if idx == 0 else "",
            s=120
        )

    # Dashed guides: front-tail segment, then right-left segment.
    for start, end in (("front", "tail"), ("right", "left")):
        ax.plot(
            [getattr(row, f"{start}_key_x") * x_scale, getattr(row, f"{end}_key_x") * x_scale],
            [getattr(row, f"{start}_key_y") * y_scale, getattr(row, f"{end}_key_y") * y_scale],
            'k--', alpha=0.6
        )

    ax.set_title(f"{row.file_name}")
    ax.set_xlim(0, x_scale)
    ax.set_ylim(0, y_scale)
    ax.grid(True)
    ax.set_xticks([])
    ax.set_yticks([])

# De-duplicate legend entries.
handles, labels = axes[0].get_legend_handles_labels()
by_label = dict(zip(labels, handles))
fig.legend(by_label.values(), by_label.keys(), loc='upper right')

# Remove unused panels.
for idx in range(len(selected_samples), len(axes)):
    fig.delaxes(axes[idx])

plt.tight_layout()
plt.show()


In [None]:
# Same per-file orientation check as before, but each flagged row also
# records its row index inside the source frame ("original_index") so it can
# be matched with its corrected counterpart later.
correction_rows = []

# `frame_df` (not `df`) so the combined table from earlier cells survives.
for frame_name, frame_df in all_frame.items():
    for idx, row in frame_df.iterrows():
        vec_forward = np.array([
            row["tail_key_x"] - row["front_key_x"],
            row["tail_key_y"] - row["front_key_y"]
        ])
        vec_rl = np.array([
            row["left_key_x"] - row["right_key_x"],
            row["left_key_y"] - row["right_key_y"]
        ])

        norm_f = np.linalg.norm(vec_forward)
        norm_rl = np.linalg.norm(vec_rl)

        # Zero-length vectors have no direction to test; skip the row.
        if norm_f == 0 or norm_rl == 0:
            continue

        vec_forward_normalized = vec_forward / norm_f
        vec_rl_normalized = vec_rl / norm_rl

        # Explicit z-component; np.cross on 2-element vectors is deprecated
        # as of NumPy 2.0.
        cross_z = (vec_forward_normalized[0] * vec_rl_normalized[1]
                   - vec_forward_normalized[1] * vec_rl_normalized[0])

        if cross_z >= -threshold:
            row_data = row.to_dict()
            row_data["file_name"] = frame_name
            row_data["original_index"] = idx
            correction_rows.append(row_data)

original_df = pd.DataFrame(correction_rows).reset_index(drop=True)
original_df


In [None]:
# Build the corrected counterpart of every flagged row by exchanging the
# right and left keypoints.
corrected_rows = []
for row_data in correction_rows:
    corrected_data = row_data.copy()
    # Swap the whole (x, y, vis) triple: the visibility flag belongs to the
    # point, so exchanging only the coordinates would leave each flag
    # attached to the other keypoint.
    for field in ("x", "y", "vis"):
        right_col, left_col = f"right_key_{field}", f"left_key_{field}"
        corrected_data[right_col] = row_data[left_col]
        corrected_data[left_col] = row_data[right_col]
    corrected_rows.append(corrected_data)

corrected_df = pd.DataFrame(corrected_rows).reset_index(drop=True)

# Names of the files that contain at least one flagged row.
unique_files = original_df['file_name'].unique()
unique_files


In [None]:
def _draw_keypoint_panel(ax, row, title, with_labels=False):
    """Draw one label row: four keypoint dots plus front-tail and right-left guides.

    `row` is an itertuples() record; `with_labels` attaches legend labels to
    the dots (used on the first panel only, from which the legend is built).
    """
    for key in keypoints:
        ax.scatter(
            getattr(row, f"{key}_key_x") * x_scale,
            getattr(row, f"{key}_key_y") * y_scale,
            color=colors[key],
            label=key if with_labels else "",
            s=120
        )
    for start, end in (("front", "tail"), ("right", "left")):
        ax.plot(
            [getattr(row, f"{start}_key_x") * x_scale, getattr(row, f"{end}_key_x") * x_scale],
            [getattr(row, f"{start}_key_y") * y_scale, getattr(row, f"{end}_key_y") * y_scale],
            'k--', alpha=0.6
        )
    ax.set_title(title)
    ax.set_xlim(0, x_scale)
    ax.set_ylim(0, y_scale)
    ax.grid(True)
    ax.set_xticks([])
    ax.set_yticks([])


# For every file with flagged rows, show up to three of them before (top row)
# and after (bottom row) the right/left swap.
for file_name in unique_files:
    file_original = original_df[original_df['file_name'] == file_name]

    # Randomly keep at most three rows per file.
    if len(file_original) > 3:
        file_original = file_original.sample(3, random_state=42)
    file_original = file_original.reset_index(drop=True)

    # Fetch the corrected rows in the SAME order as the sampled originals.
    # A plain isin() filter returns them in frame order, which can differ
    # from the shuffled sample order and pair different lines in one column.
    indices = file_original["original_index"].tolist()
    file_corrected = (
        corrected_df[corrected_df['file_name'] == file_name]
        .set_index('original_index', drop=False)
        .loc[indices]
        .reset_index(drop=True)
    )

    num_samples = len(file_original)
    # squeeze=False always yields a 2-D axes array, even for a single column.
    fig, axes = plt.subplots(2, num_samples, figsize=(num_samples * 5, 10), squeeze=False)

    paired_rows = zip(file_original.itertuples(), file_corrected.itertuples())
    for idx, (before_row, after_row) in enumerate(paired_rows):
        _draw_keypoint_panel(
            axes[0, idx], before_row,
            f"Original Line {before_row.original_index + 1}",
            with_labels=(idx == 0)
        )
        _draw_keypoint_panel(
            axes[1, idx], after_row,
            f"Corrected Line {after_row.original_index + 1}"
        )

    fig.suptitle(f"File: {file_name}", fontsize=16)

    handles, labels = axes[0, 0].get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    fig.legend(by_label.values(), by_label.keys(), loc='upper right')

    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()
    # One figure per file: release it so figures do not pile up in memory.
    plt.close(fig)


In [None]:
from pathlib import Path

# Re-run the orientation check on every per-file frame, swap the right/left
# keypoints of flagged rows, and write each modified file to save_dir.
# Files without any flagged row are not written.
save_dir = Path("corrected_labels")
save_dir.mkdir(exist_ok=True)

for frame_name, frame_df in all_frame.items():
    df_corrected = frame_df.copy()
    file_modified = False

    for idx, row in df_corrected.iterrows():
        vec_forward = np.array([
            row["tail_key_x"] - row["front_key_x"],
            row["tail_key_y"] - row["front_key_y"]
        ])
        vec_rl = np.array([
            row["left_key_x"] - row["right_key_x"],
            row["left_key_y"] - row["right_key_y"]
        ])

        norm_f = np.linalg.norm(vec_forward)
        norm_rl = np.linalg.norm(vec_rl)

        # Zero-length vectors have no direction to test; skip the row.
        if norm_f == 0 or norm_rl == 0:
            continue

        vec_forward_normalized = vec_forward / norm_f
        vec_rl_normalized = vec_rl / norm_rl

        # Explicit z-component; np.cross on 2-element vectors is deprecated
        # as of NumPy 2.0.
        cross_z = (vec_forward_normalized[0] * vec_rl_normalized[1]
                   - vec_forward_normalized[1] * vec_rl_normalized[0])

        if cross_z >= -threshold:
            # Swap the right/left keypoints as whole (x, y, vis) triples so
            # each visibility flag stays with its point. `row` is a copy made
            # by iterrows(), so it still holds the pre-swap values.
            for field in ("x", "y", "vis"):
                right_col, left_col = f"right_key_{field}", f"left_key_{field}"
                df_corrected.at[idx, right_col] = row[left_col]
                df_corrected.at[idx, left_col] = row[right_col]
            file_modified = True

    if file_modified:
        # Class id and visibility flags were parsed as floats; write them
        # back as integers when integral instead of "0.0" / "2.0".
        int_like_cols = {"class"} | {c for c in df_corrected.columns if str(c).endswith("_vis")}

        # Save as space-separated YOLO-style text, one row per line.
        save_path = save_dir / f"{frame_name}.txt"
        with open(save_path, "w") as f:
            for _, row in df_corrected.iterrows():
                fields = [
                    str(int(value)) if column in int_like_cols and float(value).is_integer() else str(value)
                    for column, value in zip(df_corrected.columns, row.tolist())
                ]
                f.write(" ".join(fields) + "\n")


In [None]:
import pandas as pd
import numpy as np
from pathlib import Path


## # 상수 설정


In [None]:
# Directory scanned for label .txt files (hard-coded local path).
LABEL_PATHS = Path('D:/생육거점 데이터/infrared_of/images/roboflow/asd.v20i.yolov8/test/labels/')

# Column layout of one label row: class id, box centre and size, then an
# (x, y, vis) triple per keypoint in front / tail / right / left order.
COLUMN_NAMES = ["class", "x_center", "y_center", "width", "height"] + [
    f"{part}_key_{field}"
    for part in ("front", "tail", "right", "left")
    for field in ("x", "y", "vis")
]

# Tolerance used by the orientation check on the cross-product z-component.
THRESHOLD = 0.001


## # 1. 데이터 불러오기 함수


In [None]:


def load_all_labels(label_dir: Path):
    """
    Read every ``*.txt`` file directly inside ``label_dir``.

    Returns a dict mapping each file's stem to a DataFrame with
    ``COLUMN_NAMES`` as columns, one row per non-blank line. A file with no
    data lines yields an empty DataFrame. A file containing any row whose
    field count differs from ``len(COLUMN_NAMES)`` is reported and skipped,
    so a single malformed file no longer aborts the whole load (and short
    rows are no longer silently padded with NaN).
    """
    expected_width = len(COLUMN_NAMES)
    all_frames = {}
    for label_file in label_dir.glob('*.txt'):
        with open(label_file, 'r') as f:
            data = [list(map(float, line.split())) for line in f if line.strip()]

        # Distinct field counts that do not match the expected layout.
        bad_widths = sorted({len(fields) for fields in data} - {expected_width})
        if bad_widths:
            print(f"⚠️ 열 수 불일치: {label_file.name} (columns: {bad_widths})")
            continue

        all_frames[label_file.stem] = pd.DataFrame(data, columns=COLUMN_NAMES)
    return all_frames


In [None]:
def combine_all_data(label_dir: Path):
    """
    Concatenate the rows of every ``*.txt`` file in ``label_dir`` into a
    single DataFrame with ``COLUMN_NAMES`` as columns. Blank lines are ignored.
    """
    rows = []
    for label_file in label_dir.glob('*.txt'):
        with open(label_file, 'r') as handle:
            for text in handle.readlines():
                stripped = text.strip()
                if not stripped:
                    continue
                rows.append([float(token) for token in stripped.split()])
    return pd.DataFrame(rows, columns=COLUMN_NAMES)


## # 2. 오류 검출 및 수정 함수


In [None]:
def compute_cross_z(row):
    """
    Return the z-component of the cross product between the unit forward
    vector (tail_key - front_key) and the unit lateral vector
    (left_key - right_key) of one label row.

    ``row`` is any mapping indexable by the ``*_key_x`` / ``*_key_y`` column
    names (e.g. a pandas Series or a dict). Returns ``None`` when either
    vector has zero length, since it then has no direction.
    """
    vec_forward = np.array([row["tail_key_x"] - row["front_key_x"],
                            row["tail_key_y"] - row["front_key_y"]])
    vec_rl = np.array([row["left_key_x"] - row["right_key_x"],
                       row["left_key_y"] - row["right_key_y"]])
    norm_f = np.linalg.norm(vec_forward)
    norm_rl = np.linalg.norm(vec_rl)
    if norm_f == 0 or norm_rl == 0:
        return None
    vec_forward_norm = vec_forward / norm_f
    vec_rl_norm = vec_rl / norm_rl
    # Written out as a0*b1 - a1*b0: np.cross on 2-element vectors is
    # deprecated as of NumPy 2.0, and this is the same arithmetic.
    return vec_forward_norm[0] * vec_rl_norm[1] - vec_forward_norm[1] * vec_rl_norm[0]


In [None]:
def get_error_indices(df, threshold=THRESHOLD):
    """
    Return the index labels of the rows whose ``compute_cross_z`` value is
    ``>= -threshold``.

    Rows for which ``compute_cross_z`` returns ``None`` are skipped. The
    comparison is inclusive so that this function reports exactly the rows
    that ``correct_dataframe`` would modify.
    """
    error_indices = []
    for idx, row in df.iterrows():
        cross_z = compute_cross_z(row)
        if cross_z is None:
            continue
        # Inclusive bound, matching correct_dataframe.
        if cross_z >= -threshold:
            error_indices.append(idx)
    return error_indices


In [None]:
def correct_dataframe(df, threshold=THRESHOLD):
    """
    Return ``(df_corrected, file_modified)``.

    ``df_corrected`` is a copy of ``df`` in which every row whose
    ``compute_cross_z`` value is ``>= -threshold`` has its right and left
    keypoints exchanged. ``file_modified`` is True when at least one row was
    changed. The input frame is not mutated.
    """
    df_corrected = df.copy()
    file_modified = False
    for idx, row in df_corrected.iterrows():
        cross_z = compute_cross_z(row)
        if cross_z is None:
            continue
        if cross_z >= -threshold:
            # Swap the right/left keypoints as whole (x, y, vis) triples so
            # each visibility flag stays with its point. `row` is a copy made
            # by iterrows(), so it still holds the pre-swap values.
            for field in ("x", "y", "vis"):
                right_col, left_col = f"right_key_{field}", f"left_key_{field}"
                df_corrected.at[idx, right_col] = row[left_col]
                df_corrected.at[idx, left_col] = row[right_col]
            file_modified = True
    return df_corrected, file_modified


## # 3. 수정된 라벨 파일 저장 함수


In [None]:
def save_corrected_labels(all_frames, save_dir_path="corrected_labels", threshold=THRESHOLD):
    """
    Correct every frame in ``all_frames`` (a dict of file stem -> DataFrame)
    and write the ones that changed to ``save_dir_path`` as ``<stem>.txt``.

    Each row is written as one space-separated line. Files in which
    ``correct_dataframe`` changed nothing are not written. The target
    directory, including missing parents, is created if needed.
    """
    save_dir = Path(save_dir_path)
    save_dir.mkdir(parents=True, exist_ok=True)

    for file_name, df in all_frames.items():
        df_corrected, file_modified = correct_dataframe(df, threshold)
        if not file_modified:
            continue

        # Class id and visibility flags were parsed as floats; write them
        # back as integers when integral instead of "0.0" / "2.0".
        int_like_cols = {"class"} | {c for c in df_corrected.columns if str(c).endswith("_vis")}

        save_path = save_dir / f"{file_name}.txt"
        with open(save_path, "w") as f:
            for _, row in df_corrected.iterrows():
                fields = [
                    str(int(value)) if column in int_like_cols and float(value).is_integer() else str(value)
                    for column, value in zip(df_corrected.columns, row.tolist())
                ]
                f.write(" ".join(fields) + "\n")
        print(f"수정된 파일 저장됨: {save_path}")


## # 4. 메인 실행부


### # (1) 데이터 결합 및 기본 통계 출력


In [None]:
# Build the combined table and print a preview, its schema and its summary
# statistics.
df_combined = combine_all_data(LABEL_PATHS)
print("결합된 데이터 미리보기:")
print(df_combined.head())
print("\nDataFrame Info:")
# DataFrame.info() writes its report to stdout itself and returns None, so
# wrapping it in print() emitted a stray "None" line after the report.
df_combined.info()
print("\nDataFrame 기술 통계:")
print(df_combined.describe())


### # (2) 오류 데이터 판별


In [None]:
# Select the rows of the combined table that fail the orientation check and
# print them under a heading.
error_indices = get_error_indices(df_combined, threshold=THRESHOLD)
df_errors = df_combined.loc[error_indices].reset_index(drop=True)
print("\n오류 데이터:", df_errors, sep="\n")


### # (3) 전체 프레임 데이터 불러오기 및 수정된 라벨 파일 저장


In [None]:
# Load each label file as its own DataFrame (keyed by file stem) ...
all_frames = load_all_labels(LABEL_PATHS)
# ... then write corrected copies of the files that needed a change into ./corrected_labels.
save_corrected_labels(all_frames, save_dir_path="corrected_labels", threshold=THRESHOLD)
