In [1]:
# made by Gibeom LEE, HI LAB
# 1M to 100k.ipynb 와 달리, folder_path안에 있는 모든 csv파일을 전부 축소 시켜줘서, 다른 폴더에 저장해주는 코드입니다.

In [2]:
import os
import glob
import numpy as np
import pandas as pd
from scipy.signal import find_peaks
from loky import ProcessPoolExecutor
from concurrent.futures import as_completed


In [3]:

# ================= 사용자 설정 =================
folder_path = r"C:\Users\user\Desktop\Drive파일\HI Lab\0. Projects\0. On going\2. Aloe inspired DEG\0. 실험자료\4. Data measurement\251019_데모 데이터\conventional 2차"

height        = 0.1          # 피크 감지 임계값 (노이즈 레벨 고려)
distance      = 150          # 피크 간 최소 거리(샘플)
num_samples   = 100000    # 최종 행 수 (정확히 맞춤)
time_mode     = "similar"    # "similar-대충 plot용" | "exact-정확한 DFT용"
MAX_WORKERS   = max(1, (os.cpu_count() or 4) - 1)  # 병렬 프로세스 수
SEED          = 1234         # 재현성용 시드
# =================================================

In [4]:
# 출력 폴더
parent_dir = os.path.dirname(folder_path)
original_folder_name = os.path.basename(folder_path)
out_dir = os.path.join(parent_dir, f"minimized_{original_folder_name}")
os.makedirs(out_dir, exist_ok=True)

# ---------- 공통 유틸 ---------
def read_csv_loose(path: str) -> pd.DataFrame:
    try:
        df = pd.read_csv(
            path, header=None, skiprows=2, engine="c",
            sep=",", on_bad_lines="error", encoding="utf-8-sig"
        )
    except Exception:
        df = pd.read_csv(
            path, header=None, skiprows=2, engine="python",
            sep=r"[,\t;]+", on_bad_lines="skip", encoding="utf-8-sig"
        )
    df = df.apply(pd.to_numeric, errors="coerce").dropna(how="all")
    if df.shape[1] < 2:
        raise ValueError("유효한 채널(열)이 부족합니다. (최소 2열 필요: Time + 1채널)")
    df.columns = ["Time"] + [f"Ch{i}" for i in range(1, df.shape[1])]
    return df

def union_peaks(df: pd.DataFrame, height: float, distance: int) -> np.ndarray:
    """
    모든 채널(Ch1..ChN)의 ±피크 인덱스 합집합.
    """
    idx_list = []
    for ch in df.columns[1:]:
        y = df[ch].to_numpy(dtype=float, copy=False)
        p_pos, _ = find_peaks(y, height=height, distance=distance)
        p_neg, _ = find_peaks(-y, height=height, distance=distance)
        if p_pos.size: idx_list.append(p_pos)
        if p_neg.size: idx_list.append(p_neg)
    if not idx_list:
        return np.array([], dtype=int)
    # 중복 제거 + 정렬 1회
    return np.unique(np.concatenate(idx_list).astype(int, copy=False))

def select_indices(total_len: int, peaks: np.ndarray, num_samples: int, rng: np.random.Generator) -> np.ndarray:
    """
    피크 우선 + 균등 샘플로 인덱스 선택, 중복 제거, 최종 행수 강제 보정.
    """
    if peaks.size >= num_samples:
        return np.sort(rng.choice(peaks, num_samples, replace=False))
    need = num_samples - peaks.size
    all_idx = np.arange(total_len, dtype=int)
    non_peaks = np.setdiff1d(all_idx, peaks, assume_unique=True)
    if need > 0 and non_peaks.size > 0:
        # 균등 간격 인덱스 선택(빠르고 예측 가능)
        add = non_peaks[np.linspace(0, non_peaks.size - 1, need, dtype=int)]
        sel = np.unique(np.concatenate((peaks, add)))
    else:
        sel = np.unique(peaks)
    # 강제 보정
    if sel.size > num_samples:
        sel = sel[:num_samples]
    elif sel.size < num_samples and sel.size > 0:
        pad = rng.choice(sel, num_samples - sel.size, replace=True)
        sel = np.sort(np.concatenate((sel, pad)))
    return sel

# ---------- 모드 구현 ----------
def mode_similar(df: pd.DataFrame, selected_indices: np.ndarray) -> pd.DataFrame:
    """
    균일 시간 격자(0초 시작)로 재배열.
    - 선택된 지점은 최근접 그리드에 실제 샘플값(피크 포함)으로 스냅 → 피크 크기/모양 보존
    - 비어 있는 구간은 선형보간으로 채움
    """
    t0 = float(df["Time"].iloc[0])
    t1 = float(df["Time"].iloc[-1])
    t_orig = df["Time"].to_numpy(dtype=float, copy=False) - t0

    sel = np.asarray(selected_indices, dtype=int)
    N = sel.size
    # 균일 격자: 0 ~ round(T), endpoint=False
    T = max(0.0, t1 - t0)
    T_round = round(T)
    if N <= 1:
        t_new = np.zeros(N, dtype=float)
    else:
        t_new = np.linspace(0.0, T_round, N, endpoint=False)

    out = pd.DataFrame({"Time": t_new})
    # 선형보간으로 채워두기
    for ch in df.columns[1:]:
        y = df[ch].to_numpy(dtype=float, copy=False)
        out[ch] = np.interp(t_new, t_orig, y)

    if N == 0:
        return out

    # 선택 샘플 → 최근접 버킷
    t_sel = t_orig[sel]
    j = np.searchsorted(t_new, t_sel, side="left")
    j = np.clip(j, 0, N - 1)

    # 좌/우 버킷 비교로 진짜 최근접 찾기
    # (벡터화: 왼/오 인덱스 계산 후, 더 가까운 쪽 선택)
    left = np.maximum(j - 1, 0)
    right = np.minimum(j + 1, N - 1)
    # 거리 계산
    dist_j = np.abs(t_sel - t_new[j])
    dist_l = np.abs(t_sel - t_new[left])
    dist_r = np.abs(t_sel - t_new[right])
    # 초기 nearest=j, 더 가까우면 교체
    nearest = j.copy()
    mask_l = dist_l < np.abs(t_sel - t_new[nearest])
    nearest = np.where(mask_l, left, nearest)
    mask_r = dist_r < np.abs(t_sel - t_new[nearest])
    nearest = np.where(mask_r, right, nearest)

    # 충돌 시 절댓값 큰 값 유지
    for ch in df.columns[1:]:
        y = df[ch].to_numpy(dtype=float, copy=False)
        y_sel = y[sel]
        # 같은 버킷에 여러 값이 몰릴 수 있으므로, 버킷별 최대 절댓값 선택
        # 현재 out[ch]와 비교하여 더 큰 절댓값이면 덮어쓰기
        current = out[ch].to_numpy()
        # 한 번에 적용: 같은 버킷에 여러 항목이면 마지막 것이 남으므로,
        # 버킷 단위로 최대치 인덱스를 구해 적용
        # 간단/빠른 방법: 반복(채널 수가 적으므로 충분히 빠름)
        for k in range(N):
            b = nearest[k]
            if abs(y_sel[k]) >= abs(current[b]):
                current[b] = y_sel[k]
        out[ch] = current

    return out

def mode_exact(df: pd.DataFrame, selected_indices: np.ndarray) -> pd.DataFrame:
    """
    원래 시간/값 그대로(비균일 간격). 피크 시점/값 절대 변경 없음.
    """
    return df.iloc[selected_indices].copy()

def process_one(file_path: str) -> str:
    """
    한 파일 처리(개별 프로세스에서 실행). 로그 문자열 반환.
    """
    rng = np.random.default_rng(SEED)  # 각 프로세스 동일 시드 → 결과 재현 가능(원하면 os.getpid()로 섞어도 됨)
    try:
        df = read_csv_loose(file_path)
        peaks = union_peaks(df, height=height, distance=distance)
        num_peaks = peaks.size

        sel = select_indices(len(df), peaks, num_samples, rng)

        if time_mode == "similar":
            out_df = mode_similar(df, sel)
        elif time_mode == "exact":
            out_df = mode_exact(df, sel)
        else:
            return f"[ERR] {os.path.basename(file_path)}: time_mode='{time_mode}'가 올바르지 않습니다."

        save_path = os.path.join(out_dir, os.path.basename(file_path))
        out_df.to_csv(save_path, index=False, header=False)

        return (f"{os.path.basename(file_path)} → 총 피크(합집합): {num_peaks:,d} | "
                f"{time_mode} / {len(out_df):,d}행 → {save_path}")
    except Exception as e:
        return f"[ERR] {os.path.basename(file_path)}: {e}"

def main():
    files = glob.glob(os.path.join(folder_path, "*.csv"))
    if not files:
        print("CSV 파일이 없습니다.")
        return

    print(f"총 {len(files)}개 파일 병렬 처리 시작 "
          f"(mode={time_mode}, workers={MAX_WORKERS} - LOKY 사용)...\n")

    results = []
    
    # ProcessPoolExecutor를 loky 것으로 사용합니다.
    # mp_context 인수를 제거하고 기존과 같이 사용합니다.
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futs = {ex.submit(process_one, p): p for p in files}
        for fut in as_completed(futs):
            p = futs[fut]
            try:
                print(fut.result())
            except Exception as e:
                print(f"[ERR] {os.path.basename(p)}: {e}")

    # 완료 로그
    for line in results:
        print(line)

if __name__ == "__main__":
    main()


총 6개 파일 병렬 처리 시작 (mode=similar, workers=27 - LOKY 사용)...

ConvDEG_3-6.csv → 총 피크(합집합): 27,567 | similar / 100,000행 → C:\Users\user\Desktop\Drive파일\HI Lab\0. Projects\0. On going\2. Aloe inspired DEG\0. 실험자료\4. Data measurement\251019_데모 데이터\minimized_conventional 2차\ConvDEG_3-6.csv
ConvDEG_3-3.csv → 총 피크(합집합): 27,136 | similar / 100,000행 → C:\Users\user\Desktop\Drive파일\HI Lab\0. Projects\0. On going\2. Aloe inspired DEG\0. 실험자료\4. Data measurement\251019_데모 데이터\minimized_conventional 2차\ConvDEG_3-3.csv
ConvDEG_3-5.csv → 총 피크(합집합): 27,553 | similar / 100,000행 → C:\Users\user\Desktop\Drive파일\HI Lab\0. Projects\0. On going\2. Aloe inspired DEG\0. 실험자료\4. Data measurement\251019_데모 데이터\minimized_conventional 2차\ConvDEG_3-5.csv
ConvDEG_3-2.csv → 총 피크(합집합): 26,966 | similar / 100,000행 → C:\Users\user\Desktop\Drive파일\HI Lab\0. Projects\0. On going\2. Aloe inspired DEG\0. 실험자료\4. Data measurement\251019_데모 데이터\minimized_conventional 2차\ConvDEG_3-2.csv
ConvDEG_3-1.csv → 총 피크(합집합): 25,598 | simi