In [9]:
import sys, re, csv
from pathlib import Path
import pandas as pd
import numpy as np

In [5]:
# Header row for the converted CSV. It has seven names, in the same order as
# the seven values parse_file() puts into each row.
COLUMNS = ["ID", "class ID", "Recording ID", "Ship Name",
           "Date & Time", "Duration(sec)", "Distances(m)"]
           
def clean_line(s):
    """Normalise whitespace and dash variants in one raw text line.

    The non-breaking space (U+00A0) becomes a plain space; the en dash
    (U+2013), em dash (U+2014) and minus sign (U+2212) each become an ASCII
    hyphen. Leading and trailing whitespace is then stripped.

    Args:
        s: The raw line.

    Returns:
        The normalised, stripped line.
    """
    # Single-pass character table instead of a chain of replace() calls.
    substitutions = {
        0x00A0: " ",  # nbsp -> space
        0x2013: "-",  # en dash
        0x2014: "-",  # em dash
        0x2212: "-",  # minus sign
    }
    return s.translate(substitutions).strip()

def parse_file(path, encoding=None):
    """Parse a comma-separated metafile into rows ready for csv.writer.

    Each non-empty line is normalised with clean_line() and split on commas.
    The first seven fields are read as: ID, class ID, ship name, date, time,
    duration, distances. Date and time are joined as "<date>:<time>"
    (presumably YYYYMMDD:HHMMSS -- the values are not validated here).
    The Recording ID column is filled with the same value as ID.

    Lines with fewer than seven fields are reported and skipped rather than
    aborting the whole file with an IndexError.

    Args:
        path: Path of the metafile to read.
        encoding: Optional text encoding passed to open(). The default (None)
            keeps the platform's default encoding.

    Returns:
        A list of 7-element lists:
        [ID, class ID, Recording ID, ship name, date_time, duration, distances].
    """
    expected_fields = 7
    rows, bad = [], 0
    with open(path, "r", encoding=encoding) as f:
        for line_num, raw in enumerate(f, 1):
            line = clean_line(raw)

            # Skip blank lines.
            if not line:
                continue

            # Split on commas; every field is stripped once here.
            parts = [part.strip() for part in line.split(",")]

            if len(parts) < expected_fields:
                bad += 1
                print(
                    f"parse_file: {path}: line {line_num} has {len(parts)} "
                    f"field(s), expected at least {expected_fields}; skipped"
                )
                continue

            (record_id, class_id, ship_name,
             date_part, time_part, duration, distances) = parts[:expected_fields]

            # Merge date and time into a single "<date>:<time>" value.
            date_time = f"{date_part}:{time_part}"

            # Recording ID is assumed to be identical to ID; adjust if needed.
            recording_id = record_id

            rows.append([record_id, class_id, recording_id, ship_name,
                         date_time, duration, distances])

    if bad:
        print(f"parse_file: {path}: skipped {bad} malformed line(s)")

    return rows

In [8]:
# Convert the raw tug metafile into a CSV file next to it (same name + ".csv").
in_path = Path(r"E:\数据集\DeepShip\data_preprocessing\annotation\tug-metafile")
out_path = in_path.with_suffix(".csv")
rows = parse_file(in_path)
# newline="" is required by the csv module. UTF-8 is set explicitly so the
# file reads back with pandas' default encoding whatever the OS locale is.
with open(out_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(COLUMNS)
    writer.writerows(rows)
print(f"Saved: {out_path}")

Saved: E:\数据集\DeepShip\data_preprocessing\annotation\tug-metafile.csv)


In [13]:
# 1. Merge the four per-class annotation files
annotation_original_dir = Path(r"E:\数据集\DeepShip\data_preprocessing\annotation_original")
data_dir = annotation_original_dir
files = {
    "cargo": data_dir / "cargo-metafile.csv",
    "passenger": data_dir / "passengership-metafile.csv",
    "tanker": data_dir / "tanker-metafile.csv",
    "tug": data_dir / "tug-metafile.csv",
}

# Read every metafile in dict order and stack them into one frame.
dfs = [pd.read_csv(path) for path in files.values()]
merged = pd.concat(dfs, ignore_index=True)
merged["ID"] = merged["ID"].astype(int)

# 2. Add class_id: map the raw "class ID" code onto a 0-3 class index.
#    Codes that match none of the conditions get -1.
class_id = merged["class ID"].astype(int)
conditions = [
    class_id.between(70, 79),  # Cargo
    class_id.between(60, 69),  # Passengership
    class_id.between(80, 89),  # Tanker
    class_id == 52,            # Tug
]
choices = [0, 1, 2, 3]
merged["class_id"] = np.select(conditions, choices, default=-1)

# 3. Build the folder_name mapping: class index -> class directory name
data_root = Path(r"E:\数据集\DeepShip\data_preprocessing\data")
class_dirs = dict(enumerate(["Cargo", "Passengership", "Tanker", "Tug"]))

def parse_id(folder_name: str):
    """Extract the numeric ID that follows the last '-' in a folder name.

    Args:
        folder_name: A folder name such as "20171104a-2".

    Returns:
        The trailing ID as an int, or None when the name has no '-' or the
        text after the last '-' is not all digits.
    """
    _, dash, suffix = folder_name.rpartition("-")
    if not dash or not suffix.isdigit():
        return None
    return int(suffix)

# Lookup table: (class index, sample ID) -> name of the matching sub-folder.
folder_map = {}
for cid, dirname in class_dirs.items():
    cls_dir = data_root / dirname
    if cls_dir.exists():
        for sub in cls_dir.iterdir():
            if sub.is_dir():
                sid = parse_id(sub.name)
                if sid is not None:
                    folder_map[(cid, sid)] = sub.name


def _lookup_folder(row):
    """Return the folder name for one annotation row, or pd.NA if none was found."""
    key = (int(row["class_id"]), int(row["ID"]))
    return folder_map.get(key, pd.NA)


merged["folder_name"] = merged.apply(_lookup_folder, axis=1)

# 4. Write the merged annotations back to disk
merged.to_csv(r"E:\数据集\DeepShip\data_preprocessing\annotation\DeepShip.csv", index=False)

In [8]:
# Root of the audio data; the audit below looks for one sub-directory per
# class, named after the keys of meta_files.
data_root = Path(r"E:\数据集\DeepShip\data_preprocessing\data")
# Directory holding the per-class metafile CSVs.
anno_root = Path(r"E:\数据集\DeepShip\data_preprocessing\annotation_original")

# Class directory name -> annotation CSV for that class.
meta_files = {
    "Cargo": anno_root / "cargo-metafile.csv",
    "Passengership": anno_root / "passengership-metafile.csv",
    "Tanker": anno_root / "tanker-metafile.csv",
    "Tug": anno_root / "tug-metafile.csv",
}

def parse_id(folder_name: str) -> int | None:
    parts = folder_name.rsplit("-", 1)
    if len(parts) != 2 or not parts[1].isdigit():
        return None
    return int(parts[1])

# Per-class audit: compare annotated IDs with the folders on disk and make
# sure every folder holds a wav named "<ID>.wav".
# NOTE: this cell has a side effect -- a folder containing exactly one
# differently named wav gets that file renamed to "<ID>.wav".
summary = {}

for cls, meta_path in meta_files.items():
    # IDs the annotation file lists for this class.
    expected_ids = set(pd.read_csv(meta_path)["ID"].astype(int))

    cls_dir = data_root / cls
    if not cls_dir.exists():
        summary[cls] = {"error": f"目录不存在: {cls_dir}"}
        continue

    folder_ids: dict[int, Path] = {}
    invalid_names: list[str] = []
    duplicate_names: list[str] = []

    for subdir in cls_dir.iterdir():
        if not subdir.is_dir():
            continue
        sid = parse_id(subdir.name)
        if sid is None:
            invalid_names.append(subdir.name)
        elif sid in folder_ids:
            # A second folder with an already-seen ID: keep the first one and
            # record this one as a duplicate.
            duplicate_names.append(subdir.name)
        else:
            folder_ids[sid] = subdir

    observed_ids = set(folder_ids)
    missing_ids = sorted(expected_ids.difference(observed_ids))
    extra_ids = sorted(observed_ids.difference(expected_ids))

    missing_audio = []
    renamed_audio = []

    for sid, folder in folder_ids.items():
        expected_wav = folder / f"{sid}.wav"
        if expected_wav.exists():
            # Already named as expected.
            continue

        # No file with the standard name; try to fix it by renaming.
        wav_files = list(folder.glob("*.wav"))

        if not wav_files:
            missing_audio.append(f"{expected_wav}（文件夹没有 wav）")
        elif len(wav_files) == 1:
            src = wav_files[0]
            src.rename(expected_wav)
            renamed_audio.append(f"{src.name} -> {expected_wav.name}")
        else:
            # Ambiguous: more than one candidate, so nothing is renamed.
            names = ", ".join(f.name for f in wav_files)
            missing_audio.append(f"{folder}（发现多个 wav：{names}）")

    summary[cls] = {
        "annotated_count": len(expected_ids),
        "folder_count": len(observed_ids),
        "missing_ids": missing_ids,
        "extra_ids": extra_ids,
        "invalid_folder_names": invalid_names,
        "duplicate_folder_names": duplicate_names,
        "missing_wav_files": missing_audio,
        "renamed_wav_files": renamed_audio or "OK",
    }

# Report: an empty list is shown as "OK", everything else is printed as-is.
for cls, info in summary.items():
    print(cls)
    for key, value in info.items():
        shown = "OK" if isinstance(value, list) and not value else value
        print(f"  {key}: {shown}")
    print("-" * 40)

Cargo
  annotated_count: 110
  folder_count: 109
  missing_ids: [23]
  extra_ids: OK
  invalid_folder_names: OK
  duplicate_folder_names: OK
  missing_wav_files: OK
  renamed_wav_files: OK
----------------------------------------
Passengership
  annotated_count: 191
  folder_count: 191
  missing_ids: OK
  extra_ids: OK
  invalid_folder_names: OK
  duplicate_folder_names: OK
  missing_wav_files: OK
  renamed_wav_files: OK
----------------------------------------
Tanker
  annotated_count: 240
  folder_count: 240
  missing_ids: OK
  extra_ids: OK
  invalid_folder_names: OK
  duplicate_folder_names: OK
  missing_wav_files: OK
  renamed_wav_files: OK
----------------------------------------
Tug
  annotated_count: 69
  folder_count: 69
  missing_ids: OK
  extra_ids: OK
  invalid_folder_names: OK
  duplicate_folder_names: OK
  missing_wav_files: OK
  renamed_wav_files: OK
----------------------------------------


In [16]:
# Rename every annotated recording from "<ID>.wav" to "<class_id>_<ID>.wav"
# inside its own folder. Rows that cannot be processed are reported and
# skipped, so one bad row does not abort the run.
annotations = pd.read_csv(r"E:\数据集\DeepShip\data_preprocessing\annotation\DeepShip.csv")
data_root = Path(r"E:\数据集\DeepShip\data_preprocessing\data")

# Class index -> class directory name under data_root.
class_dirs = {
    0: "Cargo",
    1: "Passengership",
    2: "Tanker",
    3: "Tug",
}

for _, row in annotations.iterrows():
    cls_id = int(row["class_id"])
    sample_id = int(row["ID"])
    folder_name = row["folder_name"]

    # The annotation has no folder recorded for this row.
    if pd.isna(folder_name):
        print(f"ID:{sample_id}缺少folder_name，跳过ID:{sample_id}, class_id:{cls_id}")
        continue

    subdir = class_dirs.get(cls_id)
    if subdir is None:
        print(f"未知 class_id={cls_id}，跳过")
        continue

    src_dir = data_root / subdir / str(folder_name)
    if not src_dir.exists():
        print(f"目录不存在: {src_dir}")
        continue

    src_file = src_dir / f"{sample_id}.wav"
    dst_file = src_dir / f"{cls_id}_{sample_id}.wav"

    # Already renamed (e.g. by an earlier run of this cell): nothing to do.
    if dst_file.exists():
        print(f"目标名已存在，跳过: {dst_file}")
        continue

    # Without this guard a missing source wav raises FileNotFoundError and
    # stops the loop, leaving the remaining rows unprocessed.
    if not src_file.exists():
        print(f"源文件不存在，跳过: {src_file}")
        continue

    src_file.rename(dst_file)

目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171104-1\0_1.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171104a-2\0_2.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171105a-3\0_3.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171106-4\0_4.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171107-5\0_5.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171107b-6\0_6.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171110-7\0_7.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171111-8\0_8.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171111e-9\0_9.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171111f-10\0_10.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171111g-11\0_11.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171112-12\0_12.wav
目标名已存在，跳过: E:\数据集\DeepShip\data_preprocessing\data\Cargo\20171113-13\0_13.wa