In [2]:
import pandas as pd
import json
from datetime import datetime, timedelta

def prettyjson(final_dict):
    print(json.dumps(final_dict, ensure_ascii=False, indent=4))

df_nurse_record = pd.read_csv("data_processing/nursing_record_eng.csv")
unique_numbers_list = df_nurse_record['AlsUnitNo'].unique().tolist()

  df_nurse_record = pd.read_csv("data_processing/nursing_record_eng.csv")


In [3]:
# 기술적 알람 목록 로드
def load_technical_alarms(file_path="Filtered_AlarmLabelList.txt"):
    """기술적 알람 목록을 파일에서 로드하여 정규화"""
    technical_alarms = set()
    
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        
        for line in lines:
            line = line.strip()
            if not line:  # 빈 줄 건너뛰기
                continue
            
            # 슬래시로 구분된 여러 라벨 처리
            if "/" in line:
                labels = [label.strip() for label in line.split("/")]
            else:
                labels = [line]
            
            # 각 라벨을 정규화하여 저장
            for label in labels:
                if label:  # 빈 문자열이 아닌 경우만
                    normalized_label = label.lower().strip().replace(" ", "")
                    if normalized_label:
                        technical_alarms.add(normalized_label)
        
        print(f"기술적 알람 목록 로드 완료: {len(technical_alarms)}개 라벨")
        
    except Exception as e:
        print(f"기술적 알람 목록 로드 오류: {e}")
    
    return technical_alarms

# 전역 변수로 기술적 알람 목록 로드
TECHNICAL_ALARMS = load_technical_alarms()

기술적 알람 목록 로드 완료: 55개 라벨


In [4]:
def normalize_alarm_label(label):
    """알람 라벨을 비교를 위해 정규화"""
    if not label:
        return ""
    return label.lower().strip().replace(" ", "")

def is_technical_alarm(alarm_label):
    """주어진 알람 라벨이 기술적 알람인지 확인"""
    if not alarm_label:
        return False
    
    # 알람 라벨이 여러 개의 라벨로 구성된 경우 처리
    alarm_labels = []
    
    if isinstance(alarm_label, str):
        # 슬래시로 구분된 여러 라벨 처리
        if " / " in alarm_label:
            labels = [label.strip() for label in alarm_label.split(" / ")]
            alarm_labels.extend([label for label in labels if label and label != "None" and label != "[]"])
        elif "/" in alarm_label:
            labels = [label.strip() for label in alarm_label.split("/")]
            alarm_labels.extend([label for label in labels if label and label != "None" and label != "[]"])
        elif alarm_label.strip():
            label = alarm_label.strip()
            if label and label != "None" and label != "[]":
                alarm_labels.append(label)
    elif isinstance(alarm_label, (list, tuple)):
        # 리스트/튜플 형식 처리
        for item in alarm_label:
            if item and str(item).strip():
                label = str(item).strip()
                if label and label != "None" and label != "[]":
                    alarm_labels.append(label)
    
    # 알람 라벨이 없는 경우 - 기술적 알람이 아님
    if not alarm_labels:
        return False
    
    # 모든 라벨이 기술적 알람인지 확인
    clinical_count = 0
    for label in alarm_labels:
        normalized_label = normalize_alarm_label(label)
        if normalized_label not in TECHNICAL_ALARMS:
            clinical_count += 1
    
    # 하나라도 임상적 알람이 있으면 기술적 알람이 아님
    return clinical_count == 0

def has_nursing_records_in_timewindow(patient_id, alarm_timestamp, nursing_records_dict, time_window_minutes=30):
    """특정 알람 시간 기준 ±time_window_minutes 내에 간호기록이 있는지 확인"""
    try:
        # 알람 타임스탬프 파싱
        alarm_time = datetime.strptime(alarm_timestamp, "%Y-%m-%d %H:%M:%S")
        
        # 시간 범위 정의
        start_time = alarm_time - timedelta(minutes=time_window_minutes)
        end_time = alarm_time + timedelta(minutes=time_window_minutes)
        
        # 간호기록 확인
        for record_time_str, records in nursing_records_dict.items():
            try:
                # 간호기록 시간도 같은 형식으로 변환
                record_timestamp = record_time_str.replace(' ', ' ').strip()  # 공백 정규화
                record_time = datetime.strptime(record_timestamp, "%Y-%m-%d %H:%M:%S")
                
                # 시간 범위 내에 있는지 확인
                if start_time <= record_time <= end_time:
                    # 기록이 있는지 확인 (배열 또는 단일 객체 모두 지원)
                    if isinstance(records, list) and len(records) > 0:
                        return True
                    elif isinstance(records, dict):
                        return True
            except ValueError as ve:
                # 타임스탬프 파싱 실패 시 건너뛰기
                continue
        
        return False
        
    except Exception as e:
        print(f"간호기록 확인 오류 ({alarm_timestamp}): {e}")
        return False

In [None]:
len(unique_numbers_list)

In [None]:
# "지역병원코드" hospital
# "지역병원코드_코드명" hospitalNm
# "연구등록번호" AlsUnitNo
# "연구내원번호" AlsUnitNo2
# "진료일자" TimeStamp
# "전입일자" InTime
# "전출일자" OutTime
# "전입병동" InUnit
# "전입병동_코드명" InUnitCode
# "전출병동" OutUnit
# "전출병동_코드명" OutUnitCode
# "간호진단_프로토콜" Assessment
# "간호진단_프로토콜_코드명" AssessmentNm
# "간호중재" Diagonosis
# "간호중재_코드명" DiagonosisNm
# "간호활동" Implementation
# "간호활동_코드명" ImplementationNm
# "간호속성코드" Attribute
# "간호속성코드_코드명" AttributeNm
# "간호속성명칭" AttributeDetail
# "속성" AttributeDetailValue
# "Duty" Duty
# "Duty_코드명" DutyNm
# "기록당시 병동" RecordUnit
# "기록당시 병동_코드명" RecordUnitNm
# "시행일시" ImpleTime

In [5]:
def get_nursing_record(patient_id: int):
    patient_data = df_nurse_record[df_nurse_record['AlsUnitNo'] == patient_id]
    # columns = ['간호중재_코드명', '간호활동_코드명', '간호속성코드_코드명', '속성', 'Duty_코드명', '시행일시', '간호진단_프로토콜_코드명']
    columns = ['DiagonosisNm', 'ImplementationNm', 'AttributeNm', 'AttributeDetailValue', 'DutyNm', 'ImpleTime', 'AssessmentNm']
    result = patient_data[columns].sort_values('ImpleTime')

    # 시행일시별로 그룹화
    final_dict = {}
    grouped = result.groupby('ImpleTime')

    for time_key, group in grouped:
        # 각 그룹의 데이터를 리스트로 변환
        time_data = []
        for index, row in group.iterrows(): 
            time_data.append({
                "시행일시": row['ImpleTime'],
                "간호진단프로토콜(코드명)": row['AssessmentNm'],
                "간호중재(코드명)": row['DiagonosisNm'],
                "간호활동(코드명)": row['ImplementationNm'], 
                "간호속성코드(코드명)": row['AttributeNm'],
                "속성": row['AttributeDetailValue'],
                "Duty(코드명)": row['DutyNm'],
            })
        
        final_dict[time_key] = time_data
    return final_dict

# 실제 입퇴원 기간을 몰라. 그러니 해당 기간에 처음 울린 알람과 나중에 울린 알람
def get_admission_periods(patient_id):
    r = json.loads(open(f'TestData/{patient_id}.json', 'r', encoding='utf-8').read())
    admission_periods = []
    for idx, i in enumerate(sorted(list(r['DatSeq'].keys()), key=int)):
        alarms = list(r['DatSeq'][i].keys())
        admission_periods.append({
            "start": alarms[0].split(' ')[0],
            "end": alarms[-1].split(' ')[0],
            "id": f"admission{idx+1}"
        })
    return admission_periods
    

In [6]:
def get_alarms_and_waveforms(patient_id, nursing_records_dict, enable_nursing_filter=True, enable_technical_filter=True):
    """
    알람과 파형 데이터를 가져오면서 필터링을 적용
    
    Args:
        patient_id: 환자 ID
        nursing_records_dict: 간호기록 딕셔너리
        enable_nursing_filter: 간호기록 필터링 활성화 여부 (기본값: True)
        enable_technical_filter: 기술적 알람 필터링 활성화 여부 (기본값: True)
    """
    r = json.loads(open(f'TestData/{patient_id}.json', 'r', encoding='utf-8').read())
    waveforms = {}
    alarms = {}
    
    # 필터링 통계
    total_alarms = 0
    filtered_by_nursing = 0
    filtered_by_technical = 0
    filtered_by_waveform_timeout = 0

    for idx, i in enumerate(sorted(list(r['DatSeq'].keys()), key=int)):
        for ts in list(r['DatSeq'][i].keys()):
            total_alarms += 1
            
            data = r['DatSeq'][i][ts][0]
            date = ts.split(' ')[0]
            hms = ts.split(' ')[-1].split('.')[0]
            
            # 전체 타임스탬프 (알람 시간)
            alarm_timestamp = f"{date} {hms}"
            
            # 파형 타이밍 체크 (기존 로직 유지)
            II_time_diff_sec = data["II_time_diff_sec"]
            ABP_time_diff_sec = data["ABP_time_diff_sec"]
            Resp_time_diff_sec = data["Resp_time_diff_sec"]
            Pleth_time_diff_sec = data["Pleth_time_diff_sec"]
            
            if II_time_diff_sec > 60 or ABP_time_diff_sec > 60 or Resp_time_diff_sec > 60 or Pleth_time_diff_sec > 60:
                filtered_by_waveform_timeout += 1
                continue

            if data['SpO2_numeric_time_diff_sec'] > 60 or data['Pulse_numeric_time_diff_sec'] > 60 or data['ST_numeric_time_diff_sec'] > 60 or \
            data['Tskin_numeric_time_diff_sec'] > 60 or data['ABP_numeric_time_diff_sec'] > 60 or data['NBP_numeric_time_diff_sec'] > 60 or \
            data['Perf_numeric_time_diff_sec'] > 60 or data['PPV_numeric_time_diff_sec'] > 60:
                filtered_by_waveform_timeout += 1
                continue
            
            # 알람 라벨 추출
            alarm_label = str(' / '.join(data['Label'])).replace('  ', ' ').replace('?', '').replace('!', '').strip()
            
            # 기술적 알람 필터링
            if enable_technical_filter and is_technical_alarm(alarm_label):
                filtered_by_technical += 1
                continue
            
            # 간호기록 필터링
            if enable_nursing_filter and not has_nursing_records_in_timewindow(patient_id, alarm_timestamp, nursing_records_dict):
                filtered_by_nursing += 1
                continue
            
            # 필터링을 통과한 알람만 추가
            Severity = data["Severity"][0]
            Severity_color = {0: "Red", 1:"Yellow", 2:"ShortYellow", 3:"SevereCyan", 4:"Cyan", 5:"SilentCyan", 6:"White"}[Severity]
            
            II_wave = data["II_wave"]["WaveSamples_Base64_cal"]
            ABP_wave = data["ABP_wave"]["WaveSamples_Base64_cal"]
            Resp_wave = data["Resp_wave"]["WaveSamples_Base64_cal"]
            Pleth_wave = data["Pleth_wave"]["WaveSamples_Base64_cal"]
            
            if not f'admission{idx+1}' in alarms.keys():
                alarms[f'admission{idx+1}'] = {}
            if not date in alarms[f'admission{idx+1}'].keys():
                alarms[f'admission{idx+1}'][date] = []

            numeric_dict = {
                data['SpO2_numeric']['Label']: (data['SpO2_numeric']['Value'], data['SpO2_numeric_time_diff_sec']),
                data['Pulse_numeric']['Label']: (data['Pulse_numeric']['Value'], data['Pulse_numeric_time_diff_sec']),
                data['ST_numeric']['Label']: (data['ST_numeric']['Value'], data['ST_numeric_time_diff_sec']),
                data['Tskin_numeric']['Label']: (data['Tskin_numeric']['Value'], data['Tskin_numeric_time_diff_sec']),
                data['ABP_numeric']['Label']: (data['ABP_numeric']['Value'], data['ABP_numeric_time_diff_sec']),
                data['NBP_numeric']['Label']: (data['NBP_numeric']['Value'], data['NBP_numeric_time_diff_sec']),
                data['Perf_numeric']['Label']: (data['Perf_numeric']['Value'], data['Perf_numeric_time_diff_sec']),
                data['PPV_numeric']['Label']: (data['PPV_numeric']['Value'], data['PPV_numeric_time_diff_sec']),
            }

            alarms[f'admission{idx+1}'][date].append({
                "time": hms,
                "color": Severity_color,
                "id": f"{patient_id}-{date}-{hms}",
                "timestamp": alarm_timestamp
            })
            
            waveforms[alarm_timestamp] = {
                "ABP": ABP_wave,
                "Lead-II": II_wave,
                "Resp": Resp_wave,
                "Pleth": Pleth_wave,
                "Numeric": numeric_dict,
                "AlarmLabel": alarm_label
            }
    
    # 필터링 통계 출력
    print(f"\n환자 {patient_id} 알람 필터링 통계:")
    print(f"  - 전체 알람: {total_alarms}개")
    print(f"  - 파형 타임아웃으로 제거: {filtered_by_waveform_timeout}개")
    if enable_technical_filter:
        print(f"  - 기술적 알람으로 제거: {filtered_by_technical}개")
    if enable_nursing_filter:
        print(f"  - 간호기록 없어서 제거: {filtered_by_nursing}개")
    
    final_count = sum(len(alarms[adm][date]) for adm in alarms for date in alarms[adm])
    print(f"  - 최종 통과 알람: {final_count}개")

    return alarms, waveforms


In [7]:
import os

# 필터링 옵션 설정
ENABLE_NURSING_FILTER = True  # 간호기록 필터링 활성화
ENABLE_TECHNICAL_FILTER = True  # 기술적 알람 필터링 활성화

print(f"필터링 설정:")
print(f"  - 간호기록 필터링: {'활성화' if ENABLE_NURSING_FILTER else '비활성화'}")
print(f"  - 기술적 알람 필터링: {'활성화' if ENABLE_TECHNICAL_FILTER else '비활성화'}")
print()

for patient_id in list(set(unique_numbers_list) & set([int(i[:-5]) for i in os.listdir("TestData")])):
    try:
        admissions = get_admission_periods(patient_id)
        nursing_records = get_nursing_record(patient_id)
        
        # 필터링이 적용된 알람과 파형 데이터 가져오기
        alarms, waveforms = get_alarms_and_waveforms(
            patient_id, 
            nursing_records,
            enable_nursing_filter=ENABLE_NURSING_FILTER,
            enable_technical_filter=ENABLE_TECHNICAL_FILTER
        )
    except Exception as e:
        print(f"환자 {patient_id} 처리 오류: {e}")
        continue

    # 빈 날짜 및 빈 입원 기간 제거
    print(f"\n필터링 후 정리 작업:")
    
    # Step 1: 각 입원 기간에서 알람이 없는 날짜 제거
    alarms_cleaned = {}
    removed_dates = 0
    
    for admission_id, dates in alarms.items():
        dates_with_alarms = {}
        for date, alarm_list in dates.items():
            if alarm_list:  # 알람이 하나라도 있는 날짜만 유지
                dates_with_alarms[date] = alarm_list
            else:
                removed_dates += 1
                print(f"  - {admission_id}에서 날짜 {date} 제거 (알람 0개)")
        
        # Step 2: 알람이 있는 날짜가 하나라도 있는 입원 기간만 유지
        if dates_with_alarms:
            alarms_cleaned[admission_id] = dates_with_alarms
            print(f"  - {admission_id} 유지: {len(dates_with_alarms)}개 날짜에 알람 존재")
        else:
            print(f"  - {admission_id} 전체 제거: 모든 날짜에 알람 없음")
    
    # Step 3: admission_periods에서도 알람이 없는 입원 기간 제거
    admissions_cleaned = []
    removed_admissions = 0
    
    for admission in admissions:
        if admission['id'] in alarms_cleaned:
            admissions_cleaned.append(admission)
        else:
            removed_admissions += 1
            print(f"  - 입원 기간 {admission['id']} ({admission['start']} ~ {admission['end']}) 제거")
    
    # 통계 출력
    original_admission_count = len(admissions)
    cleaned_admission_count = len(admissions_cleaned)
    original_date_count = sum(len(dates) for dates in alarms.values())
    cleaned_date_count = sum(len(dates) for dates in alarms_cleaned.values())
    
    print(f"\n정리 완료:")
    print(f"  - 입원 기간: {original_admission_count}개 → {cleaned_admission_count}개 (제거: {removed_admissions}개)")
    print(f"  - 날짜: {original_date_count}개 → {cleaned_date_count}개 (제거: {removed_dates}개)")
    
    DataConverter = {
        "admission_periods": admissions_cleaned,
        "alarms": alarms_cleaned,
        "nursing_records": nursing_records,
        "waveforms": waveforms,
        "filter_settings": {
            "nursing_filter_enabled": ENABLE_NURSING_FILTER,
            "technical_filter_enabled": ENABLE_TECHNICAL_FILTER,
            "nursing_time_window_minutes": 30
        }
    }

    if not nursing_records:
        print(f"경고: 환자 {patient_id}의 간호기록이 없습니다.")

    with open(f'DATA/{patient_id}.json', 'w', encoding='utf-8') as f:
        json.dump(DataConverter, f, ensure_ascii=False)

    print(f"DATA/{patient_id}.json 파일 저장 완료!\n")

필터링 설정:
  - 간호기록 필터링: 활성화
  - 기술적 알람 필터링: 활성화


환자 1674060 알람 필터링 통계:
  - 전체 알람: 2890개
  - 파형 타임아웃으로 제거: 2863개
  - 기술적 알람으로 제거: 16개
  - 간호기록 없어서 제거: 1개
  - 최종 통과 알람: 10개

필터링 후 정리 작업:
  - admission2 유지: 1개 날짜에 알람 존재
  - 입원 기간 admission1 (2024-05-08 ~ 2024-05-08) 제거

정리 완료:
  - 입원 기간: 2개 → 1개 (제거: 1개)
  - 날짜: 1개 → 1개 (제거: 0개)
DATA/1674060.json 파일 저장 완료!

