# 베이스 코드

In [1]:
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Input folder and the CSV files it contains
ess_name = 'preprocessing_A/00391262(0002)'

ess_list = glob.glob(os.path.join(ess_name, '*.csv'))

# Output folder for the per-channel arrays (A_numpy/00391262(0002))
output_folder = os.path.join('A_numpy', '00391262(0002)')
os.makedirs(output_folder, exist_ok=True)

for ess_path in ess_list:
    base_filename = os.path.splitext(os.path.basename(ess_path))[0]
    df = pd.read_csv(ess_path)

    # Timestamp column: fall back to "clct_dt" when "colec_dt" is absent.
    if "colec_dt" not in df.columns:
        if "clct_dt" in df.columns:
            df.rename(columns={'clct_dt': 'colec_dt'}, inplace=True)
        else:
            continue  # neither timestamp column exists: skip this file

    # Voltage columns: when no "cell_volt_N" column exists, rename
    # "cel_volt_01", "cel_volt_02", ... to "cell_volt_1", "cell_volt_2", ...
    if not any(col.startswith("cell_volt_") for col in df.columns):
        rename_dict = {}
        for col in df.columns:
            if col.startswith("cel_volt_"):
                num_str = col.replace("cel_volt_", "")
                try:
                    rename_dict[col] = f"cell_volt_{int(num_str)}"
                except ValueError:
                    pass  # non-numeric suffix: leave the column as it is
        if rename_dict:
            df.rename(columns=rename_dict, inplace=True)

    # Every day-level filter below keys on "cell_volt_1"; skip the file
    # instead of failing with a KeyError when that column is missing.
    if "cell_volt_1" not in df.columns:
        print(f"{base_filename}: cell_volt_1 컬럼 없음, 스킵")
        continue

    df["colec_dt"] = pd.to_datetime(df["colec_dt"])

    # Keep only the first row per timestamp. Without this, the left merge
    # below repeats duplicated minutes, a day ends up with more than 1440
    # rows and cannot be stored in a (days, 1440) array.
    df = df.drop_duplicates(subset="colec_dt", keep="first")

    # Re-index onto a complete one-minute timeline; missing minutes become NaN.
    start_time = df["colec_dt"].min()
    end_time = df["colec_dt"].max()
    new_time_range = pd.date_range(start=start_time, end=end_time, freq="min")
    new_df = pd.DataFrame({"colec_dt": new_time_range})
    merged_df = pd.merge(new_df, df, on="colec_dt", how="left")

    # Drop the index column left over from an earlier CSV export, if present.
    if "Unnamed: 0" in merged_df.columns:
        merged_df = merged_df.drop("Unnamed: 0", axis=1)

    # Rows whose first cell reads below 3.3 are treated as invalid for all cells.
    cel_volt_cols = [col for col in merged_df.columns if col.startswith("cell_volt_")]
    # Make sure the columns are float so NaN can be stored without upcasting.
    merged_df[cel_volt_cols] = merged_df[cel_volt_cols].astype(float)
    merged_df.loc[merged_df["cell_volt_1"] < 3.3, cel_volt_cols] = np.nan

    # Remove every date whose NaN count in "cell_volt_1" exceeds nan_threshold.
    nan_threshold = 1400
    merged_df["date"] = merged_df["colec_dt"].dt.date
    nan_counts = merged_df.groupby("date")["cell_volt_1"].apply(lambda x: x.isna().sum())
    dates_to_drop = nan_counts[nan_counts > nan_threshold].index
    merged_nan_remove_df = merged_df[~merged_df["date"].isin(dates_to_drop)].reset_index(drop=True)

    # Linear interpolation of the remaining gaps. It runs over the rows that
    # are left after the date removal above, in their concatenated order.
    merged_nan_remove_df[cel_volt_cols] = merged_nan_remove_df[cel_volt_cols].interpolate(method="linear")

    # Keep a date only if, summed over all cells, at least 400 samples per
    # cell are at or above voltage_threshold.
    voltage_threshold = 3.6
    required_valid_count = 400 * len(cel_volt_cols)
    valid_counts = merged_nan_remove_df.groupby("date")[cel_volt_cols].apply(
        lambda x: (x >= voltage_threshold).sum().sum()
    )
    dates_to_keep = valid_counts[valid_counts >= required_valid_count].index
    merged_valid_volt_df = merged_nan_remove_df[
        merged_nan_remove_df["date"].isin(dates_to_keep)
    ].reset_index(drop=True)

    # Exclude the first and the last remaining date. With two or fewer dates
    # nothing would be left, so skip the file instead of indexing an empty frame.
    valid_dates = merged_valid_volt_df["date"].unique()
    if len(valid_dates) <= 2:
        print(f"{base_filename}: 유효일 부족")
        continue
    final_df = merged_valid_volt_df[
        ~merged_valid_volt_df["date"].isin([valid_dates[0], valid_dates[-1]])
    ].reset_index(drop=True)

    # Slice the frame by date once; the slices are reused for every channel.
    unique_dates = final_df["date"].unique()
    daily_frames = [(date, final_df[final_df["date"] == date]) for date in unique_dates]

    # Build and save one 2D array (number of dates, 1440) per voltage channel.
    for volt in cel_volt_cols:
        daily_rows = []
        for date, daily_df in daily_frames:
            daily_series = daily_df[volt]
            if daily_series.shape[0] != 1440:
                # Defensive: an incomplete day cannot fill a 1440-wide row.
                print(f"{base_filename}, {date}, {volt}: {daily_series.shape[0]} rows")
                continue
            daily_rows.append(daily_series.to_numpy())
        if not daily_rows:
            print(f"{base_filename}: {volt} 유효 데이터 없음")
            continue
        # Plain float array: loadable with np.load without allow_pickle.
        X_array = np.array(daily_rows, dtype=float)
        # The channel name is part of the output file name.
        np.save(
            os.path.join(output_folder, f"{base_filename}_{volt}.npy"),
            X_array
        )


# 00454547, 00370960

In [5]:
import os
import glob
import numpy as np
import pandas as pd

def process_bank_data_2d(df, bank_label, base_filename, output_folder):
    """
    Preprocess one bank's rows and save every voltage channel as a 2D array.

    The frame is re-indexed onto a one-minute timeline, filtered day by day,
    interpolated, and written as one ``.npy`` file per ``cell_volt_*`` column
    with shape (number of kept dates, 1440).

    Args:
        df: DataFrame holding a timestamp column ("colec_dt" or "clct_dt")
            and voltage columns ("cell_volt_N" or "cel_volt_NN"). It is
            copied, so the caller's frame is not modified.
        bank_label: Bank identifier; only used in the printed messages.
        base_filename: Prefix of the output file names and of the messages.
        output_folder: Existing directory the ``.npy`` files are written to.

    Returns:
        None. Results are written to disk; skipped cases are reported via
        ``print``.
    """
    frame = df.copy()

    # 1. Timestamp column: accept "clct_dt" as an alias of "colec_dt".
    if "colec_dt" not in frame.columns:
        if "clct_dt" not in frame.columns:
            print(f"{base_filename} - bank {bank_label}: 시간 컬럼 없음, 스킵")
            return
        frame.rename(columns={'clct_dt': 'colec_dt'}, inplace=True)

    # 2. Normalise voltage column names ("cel_volt_01" -> "cell_volt_1"),
    #    but only when no "cell_volt_*" column exists yet.
    if not any(col.startswith("cell_volt_") for col in frame.columns):
        rename_dict = {}
        for col in frame.columns:
            if col.startswith("cel_volt_"):
                num = col.replace("cel_volt_", "")
                if num.isdigit():
                    rename_dict[col] = f"cell_volt_{int(num)}"
        if rename_dict:
            frame.rename(columns=rename_dict, inplace=True)

    # Without any voltage column there is nothing to export; return early
    # instead of failing on volt_cols[0] further down.
    volt_cols = [c for c in frame.columns if c.startswith("cell_volt_")]
    if not volt_cols:
        print(f"{base_filename} - bank {bank_label}: 전압 컬럼 없음, 스킵")
        return

    # 3. Convert to datetime and re-index onto a complete one-minute timeline.
    frame["colec_dt"] = pd.to_datetime(frame["colec_dt"])
    start, end = frame["colec_dt"].min(), frame["colec_dt"].max()
    full_idx = pd.DataFrame({"colec_dt": pd.date_range(start, end, freq='min')})
    merged = pd.merge(full_idx, frame, on="colec_dt", how="left")
    # Collapse duplicated timestamps so each minute appears exactly once.
    merged = merged.groupby("colec_dt").first().reset_index()

    # 4. Drop the leftover index column, if present.
    if "Unnamed: 0" in merged.columns:
        merged.drop(columns=["Unnamed: 0"], inplace=True)

    # 5. Invalidate rows whose first voltage channel reads below 3.3, then
    #    drop every date with more than nan_thr missing values in that channel.
    # Make sure the columns are float so NaN can be stored without upcasting.
    merged[volt_cols] = merged[volt_cols].astype(float)
    merged.loc[merged[volt_cols[0]] < 3.3, volt_cols] = np.nan
    merged["date"] = merged["colec_dt"].dt.date
    nan_thr = 1400
    bad = merged.groupby("date")[volt_cols[0]].apply(lambda x: x.isna().sum())
    ok_days = bad[bad <= nan_thr].index
    merged = merged[merged["date"].isin(ok_days)].reset_index(drop=True)

    # 6. Interpolate the remaining gaps and keep only dates with enough
    #    samples at or above vt_thr (400 per channel, summed over channels).
    merged[volt_cols] = merged[volt_cols].interpolate(method="linear")
    vt_thr, req_count = 3.6, 400 * len(volt_cols)
    vc = merged.groupby("date")[volt_cols].apply(
        lambda day_df: (day_df >= vt_thr).sum().sum()
    )
    keep = vc[vc >= req_count].index
    merged = merged[merged["date"].isin(keep)].reset_index(drop=True)

    # 7. Drop the first and the last remaining date.
    dates = merged["date"].unique()
    if len(dates) <= 2:
        print(f"{base_filename} - bank {bank_label}: 유효일 부족")
        return
    merged = merged[~merged["date"].isin([dates[0], dates[-1]])].reset_index(drop=True)
    unique_dates = merged["date"].unique()

    # Slice by date once; the slices are reused for every channel.
    day_frames = [(d, merged[merged["date"] == d]) for d in unique_dates]

    # 8. Build and save one (dates, 1440) array per voltage channel.
    for volt in volt_cols:
        rows = []
        for d, day in day_frames:
            series = day[volt]
            if series.shape[0] != 1440:
                # An incomplete day cannot fill a 1440-wide row; report and skip it.
                print(f"{base_filename}, bank{bank_label}, {d}, {volt}: {series.shape[0]} rows")
                continue
            rows.append(series.to_numpy())
        if not rows:
            print(f"{base_filename} - bank {bank_label}: {volt} 유효 데이터 없음")
            continue
        X = np.array(rows, dtype=float)
        out_path = os.path.join(output_folder, f"{base_filename}_{volt}.npy")
        np.save(out_path, X)
        print(f"Saved: {out_path} shape={X.shape}")

# Usage example: split every CSV of the folder by bank number ("bsc_fg_no")
# and run the 2D preprocessing separately for bank 1 and bank 2.
input_folder = 'preprocessing_A/00454547(0002)'
ess_files = glob.glob(os.path.join(input_folder, '*.csv'))
base = os.path.basename(input_folder)

# One output folder per bank.
out1 = f"A_numpy/{base}_1"
out2 = f"A_numpy/{base}_2"
os.makedirs(out1, exist_ok=True)
os.makedirs(out2, exist_ok=True)

for path in ess_files:
    df = pd.read_csv(path)
    name = os.path.splitext(os.path.basename(path))[0]
    bank_frames = [
        (1, df[df['bsc_fg_no'] == 1], out1),
        (2, df[df['bsc_fg_no'] == 2], out2),
    ]
    for bank_no, bank_df, bank_folder in bank_frames:
        # Banks without any rows in this file are skipped.
        if bank_df.empty:
            continue
        process_bank_data_2d(bank_df, bank_no, name, bank_folder)


Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_1.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_2.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_3.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_4.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_5.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_6.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_7.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_8.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_9.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_10.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_11.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cell_volt_12.npy shape=(36, 1440)
Saved: A_numpy/00454547(0002)_1/rack4_module1_cel

In [6]:
import os
import glob
import numpy as np
import pandas as pd

def process_bank_data_2d(df, bank_label, base_filename, output_folder):
    """
    Preprocess one bank's rows and save every voltage channel as a 2D array.

    The frame is re-indexed onto a one-minute timeline, filtered day by day,
    interpolated, and written as one ``.npy`` file per ``cell_volt_*`` column
    with shape (number of kept dates, 1440).

    Args:
        df: DataFrame holding a timestamp column ("colec_dt" or "clct_dt")
            and voltage columns ("cell_volt_N" or "cel_volt_NN"). It is
            copied, so the caller's frame is not modified.
        bank_label: Bank identifier; only used in the printed messages.
        base_filename: Prefix of the output file names and of the messages.
        output_folder: Existing directory the ``.npy`` files are written to.

    Returns:
        None. Results are written to disk; skipped cases are reported via
        ``print``.
    """
    frame = df.copy()

    # 1. Timestamp column: accept "clct_dt" as an alias of "colec_dt".
    if "colec_dt" not in frame.columns:
        if "clct_dt" not in frame.columns:
            print(f"{base_filename} - bank {bank_label}: 시간 컬럼 없음, 스킵")
            return
        frame.rename(columns={'clct_dt': 'colec_dt'}, inplace=True)

    # 2. Normalise voltage column names ("cel_volt_01" -> "cell_volt_1"),
    #    but only when no "cell_volt_*" column exists yet.
    if not any(col.startswith("cell_volt_") for col in frame.columns):
        rename_dict = {}
        for col in frame.columns:
            if col.startswith("cel_volt_"):
                num = col.replace("cel_volt_", "")
                if num.isdigit():
                    rename_dict[col] = f"cell_volt_{int(num)}"
        if rename_dict:
            frame.rename(columns=rename_dict, inplace=True)

    # Without any voltage column there is nothing to export; return early
    # instead of failing on volt_cols[0] further down.
    volt_cols = [c for c in frame.columns if c.startswith("cell_volt_")]
    if not volt_cols:
        print(f"{base_filename} - bank {bank_label}: 전압 컬럼 없음, 스킵")
        return

    # 3. Convert to datetime and re-index onto a complete one-minute timeline.
    frame["colec_dt"] = pd.to_datetime(frame["colec_dt"])
    start, end = frame["colec_dt"].min(), frame["colec_dt"].max()
    full_idx = pd.DataFrame({"colec_dt": pd.date_range(start, end, freq='min')})
    merged = pd.merge(full_idx, frame, on="colec_dt", how="left")
    # Collapse duplicated timestamps so each minute appears exactly once.
    merged = merged.groupby("colec_dt").first().reset_index()

    # 4. Drop the leftover index column, if present.
    if "Unnamed: 0" in merged.columns:
        merged.drop(columns=["Unnamed: 0"], inplace=True)

    # 5. Invalidate rows whose first voltage channel reads below 3.3, then
    #    drop every date with more than nan_thr missing values in that channel.
    # Make sure the columns are float so NaN can be stored without upcasting.
    merged[volt_cols] = merged[volt_cols].astype(float)
    merged.loc[merged[volt_cols[0]] < 3.3, volt_cols] = np.nan
    merged["date"] = merged["colec_dt"].dt.date
    nan_thr = 1400
    bad = merged.groupby("date")[volt_cols[0]].apply(lambda x: x.isna().sum())
    ok_days = bad[bad <= nan_thr].index
    merged = merged[merged["date"].isin(ok_days)].reset_index(drop=True)

    # 6. Interpolate the remaining gaps and keep only dates with enough
    #    samples at or above vt_thr (400 per channel, summed over channels).
    merged[volt_cols] = merged[volt_cols].interpolate(method="linear")
    vt_thr, req_count = 3.6, 400 * len(volt_cols)
    vc = merged.groupby("date")[volt_cols].apply(
        lambda day_df: (day_df >= vt_thr).sum().sum()
    )
    keep = vc[vc >= req_count].index
    merged = merged[merged["date"].isin(keep)].reset_index(drop=True)

    # 7. Drop the first and the last remaining date.
    dates = merged["date"].unique()
    if len(dates) <= 2:
        print(f"{base_filename} - bank {bank_label}: 유효일 부족")
        return
    merged = merged[~merged["date"].isin([dates[0], dates[-1]])].reset_index(drop=True)
    unique_dates = merged["date"].unique()

    # Slice by date once; the slices are reused for every channel.
    day_frames = [(d, merged[merged["date"] == d]) for d in unique_dates]

    # 8. Build and save one (dates, 1440) array per voltage channel.
    for volt in volt_cols:
        rows = []
        for d, day in day_frames:
            series = day[volt]
            if series.shape[0] != 1440:
                # An incomplete day cannot fill a 1440-wide row; report and skip it.
                print(f"{base_filename}, bank{bank_label}, {d}, {volt}: {series.shape[0]} rows")
                continue
            rows.append(series.to_numpy())
        if not rows:
            print(f"{base_filename} - bank {bank_label}: {volt} 유효 데이터 없음")
            continue
        X = np.array(rows, dtype=float)
        out_path = os.path.join(output_folder, f"{base_filename}_{volt}.npy")
        np.save(out_path, X)
        print(f"Saved: {out_path} shape={X.shape}")

# Usage example: split every CSV of the folder by bank number ("bsc_fg_no")
# and run the 2D preprocessing separately for bank 1 and bank 2.
input_folder = 'preprocessing_A/00370960(0002)'
ess_files = glob.glob(os.path.join(input_folder, '*.csv'))
base = os.path.basename(input_folder)

# One output folder per bank.
out1 = f"A_numpy/{base}_1"
out2 = f"A_numpy/{base}_2"
os.makedirs(out1, exist_ok=True)
os.makedirs(out2, exist_ok=True)

for path in ess_files:
    df = pd.read_csv(path)
    name = os.path.splitext(os.path.basename(path))[0]
    bank_frames = [
        (1, df[df['bsc_fg_no'] == 1], out1),
        (2, df[df['bsc_fg_no'] == 2], out2),
    ]
    for bank_no, bank_df, bank_folder in bank_frames:
        # Banks without any rows in this file are skipped.
        if bank_df.empty:
            continue
        process_bank_data_2d(bank_df, bank_no, name, bank_folder)


Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_1.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_2.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_3.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_4.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_5.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_6.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_7.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_8.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_9.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_10.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_11.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cell_volt_12.npy shape=(34, 1440)
Saved: A_numpy/00370960(0002)_1/rack4_module1_cel

# 2.A.R7M17C6

In [7]:
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Input folder and the CSV files it contains
ess_name = 'preprocessing_A/2.A.R7M17C6'
ess_list = glob.glob(os.path.join(ess_name, '*.csv'))

# Output folder for the per-channel arrays (A_numpy/2.A.R7M17C6)
output_folder = os.path.join('A_numpy', '2.A.R7M17C6')
os.makedirs(output_folder, exist_ok=True)

for ess_path in ess_list:
    base_filename = os.path.splitext(os.path.basename(ess_path))[0]
    df = pd.read_csv(ess_path)

    # Timestamp column: fall back to "clct_dt" when "colec_dt" is absent.
    if "colec_dt" not in df.columns:
        if "clct_dt" in df.columns:
            df.rename(columns={'clct_dt': 'colec_dt'}, inplace=True)
        else:
            continue  # neither timestamp column exists: skip this file

    # Voltage columns: when no "cell_volt_N" column exists, rename
    # "cel_volt_01", "cel_volt_02", ... to "cell_volt_1", "cell_volt_2", ...
    if not any(col.startswith("cell_volt_") for col in df.columns):
        rename_dict = {}
        for col in df.columns:
            if col.startswith("cel_volt_"):
                num_str = col.replace("cel_volt_", "")
                try:
                    rename_dict[col] = f"cell_volt_{int(num_str)}"
                except ValueError:
                    pass  # non-numeric suffix: leave the column as it is
        if rename_dict:
            df.rename(columns=rename_dict, inplace=True)

    # Every day-level filter below keys on "cell_volt_1"; skip the file
    # instead of failing with a KeyError when that column is missing.
    if "cell_volt_1" not in df.columns:
        print(f"{base_filename}: cell_volt_1 컬럼 없음, 스킵")
        continue

    df["colec_dt"] = pd.to_datetime(df["colec_dt"])

    # Remove duplicated timestamps: only the first row per timestamp is kept.
    df = df.drop_duplicates(subset="colec_dt", keep="first")

    # Re-index onto a complete one-minute timeline; missing minutes become NaN.
    start_time = df["colec_dt"].min()
    end_time = df["colec_dt"].max()
    new_time_range = pd.date_range(start=start_time, end=end_time, freq="min")
    new_df = pd.DataFrame({"colec_dt": new_time_range})
    merged_df = pd.merge(new_df, df, on="colec_dt", how="left")

    # Drop the index column left over from an earlier CSV export, if present.
    if "Unnamed: 0" in merged_df.columns:
        merged_df = merged_df.drop("Unnamed: 0", axis=1)

    # Rows whose first cell reads below 3.3 are treated as invalid for all cells.
    cel_volt_cols = [col for col in merged_df.columns if col.startswith("cell_volt_")]
    # Make sure the columns are float so NaN can be stored without upcasting.
    merged_df[cel_volt_cols] = merged_df[cel_volt_cols].astype(float)
    merged_df.loc[merged_df["cell_volt_1"] < 3.3, cel_volt_cols] = np.nan

    # Remove every date whose NaN count in "cell_volt_1" exceeds nan_threshold.
    nan_threshold = 1400
    merged_df["date"] = merged_df["colec_dt"].dt.date
    nan_counts = merged_df.groupby("date")["cell_volt_1"].apply(lambda x: x.isna().sum())
    dates_to_drop = nan_counts[nan_counts > nan_threshold].index
    merged_nan_remove_df = merged_df[~merged_df["date"].isin(dates_to_drop)].reset_index(drop=True)

    # Linear interpolation of the remaining gaps. It runs over the rows that
    # are left after the date removal above, in their concatenated order.
    merged_nan_remove_df[cel_volt_cols] = merged_nan_remove_df[cel_volt_cols].interpolate(method="linear")

    # Keep a date only if, summed over all cells, at least 400 samples per
    # cell are at or above voltage_threshold.
    voltage_threshold = 3.6
    required_valid_count = 400 * len(cel_volt_cols)
    valid_counts = merged_nan_remove_df.groupby("date")[cel_volt_cols].apply(
        lambda x: (x >= voltage_threshold).sum().sum()
    )
    dates_to_keep = valid_counts[valid_counts >= required_valid_count].index
    merged_valid_volt_df = merged_nan_remove_df[
        merged_nan_remove_df["date"].isin(dates_to_keep)
    ].reset_index(drop=True)

    # Exclude the first and the last remaining date. With two or fewer dates
    # nothing would be left, so skip the file instead of indexing an empty frame.
    valid_dates = merged_valid_volt_df["date"].unique()
    if len(valid_dates) <= 2:
        print(f"{base_filename}: 유효일 부족")
        continue
    final_df = merged_valid_volt_df[
        ~merged_valid_volt_df["date"].isin([valid_dates[0], valid_dates[-1]])
    ].reset_index(drop=True)

    # Slice the frame by date once; the slices are reused for every channel.
    unique_dates = final_df["date"].unique()
    daily_frames = [(date, final_df[final_df["date"] == date]) for date in unique_dates]

    # Build and save one 2D array (number of dates, 1440) per voltage channel.
    for volt in cel_volt_cols:
        daily_rows = []
        for date, daily_df in daily_frames:
            daily_series = daily_df[volt]
            if daily_series.shape[0] != 1440:
                # Defensive: an incomplete day cannot fill a 1440-wide row.
                print(f"{base_filename}, {date}, {volt}: {daily_series.shape[0]} rows")
                continue
            daily_rows.append(daily_series.to_numpy())
        if not daily_rows:
            print(f"{base_filename}: {volt} 유효 데이터 없음")
            continue
        # Plain float array: loadable with np.load without allow_pickle.
        X_array = np.array(daily_rows, dtype=float)
        # The channel name is part of the output file name.
        np.save(
            os.path.join(output_folder, f"{base_filename}_{volt}.npy"),
            X_array
        )
