In [None]:
# ============================================================================
# HR Engine에서 diff < 0인 경우 제거된 매핑 데이터 수집 및 저장
# ============================================================================
# 
# 목적:
# - hr_calculator_engine_v4.R에서 clean_data로부터 diff < 0인 경우를 제거하는데,
#   이렇게 제거된 케이스들에 대한 매핑 데이터를 수집하여 저장
# - 저장된 데이터는 RR 쪽에서 outcome_dt 기반으로 생성한 매핑 데이터와 비교 검증에 사용
# - 검증이 성공하면 RR 쪽에 대한 새로운 engine 구축 예정
#
# 저장 위치:
# - /home/hashjamm/results/disease_network/hr_rr_mapping_validation_results/
#
# 저장 형식:
# - Parquet 파일 형식으로 저장
# - all_diff_negative_info: case == 1 & diff < 0인 케이스의 상세 정보 (단일 소스)
#   * HR과 RR 모두 case==1인 경우만 매핑 데이터 수집하므로, 검증을 위해 case==1인 경우만 저장
#   * 이후 이 데이터를 기반으로 각 매핑 데이터(edge_pids, edge_index_key_seq, edge_key_seq)를 추출
#
# ============================================================================

# ============================================================================
# 1. 라이브러리 로드
# ============================================================================

library(data.table)
library(arrow)
library(dplyr)
library(tidyr)  # expand_grid 함수 사용을 위해 필요
library(duckdb)
library(DBI)
library(glue)
library(future)  # 병렬 처리용
library(furrr)  # 병렬 처리용
library(progressr)  # 진행 상황 추적용

cat("라이브러리 로드 완료.\n\n")

In [None]:
# ============================================================================
# 2. 설정 변수 및 경로
# ============================================================================

# 저장 디렉토리 설정
output_dir <- "/home/hashjamm/results/disease_network/hr_rr_mapping_validation_results"
dir.create(output_dir, recursive = TRUE, showWarnings = FALSE)

# 데이터 파일 경로 설정 (실행 시 수정 가능)
matched_parquet_folder_path <- "/home/hashjamm/project_data/disease_network/matched_date_parquet/"
outcome_parquet_file_path <- "/home/hashjamm/project_data/disease_network/outcome_table.parquet"

# Follow-up 기간 설정
fu <- 8  # Follow-up 기간 (실행 시 수정 가능)

# 병렬 처리 설정
n_cores <- 15  # 사용할 코어 수 (시스템에 맞게 조정)

cat("설정 완료.\n\n")

In [None]:
# ============================================================================
# 3. 함수 정의
# ============================================================================

# ============================================================================
# 함수: diff < 0인 케이스의 매핑 데이터 추출
# ============================================================================
# 
# 목적: case == 1 & diff < 0인 케이스의 상세 정보를 수집하여 all_diff_negative_info에 저장
# - HR과 RR 모두 case==1인 경우만 매핑 데이터 수집하므로, 검증을 위해 case==1인 경우만 저장
#
# Args:
#   clean_data: 전처리된 데이터 (data.table)
#   cause_abb: 원인 질병 코드
#   outcome_abb: 결과 질병 코드
#   fu: Follow-up 기간
#   diff_negative_mapping_list: diff < 0 케이스의 매핑 데이터를 저장할 리스트 (참조 전달)
#
# Returns:
#   diff_negative_mapping_list에 데이터 추가 (in-place 수정)
#
# ============================================================================
extract_diff_negative_mapping <- function(
    clean_data, 
    cause_abb, 
    outcome_abb, 
    fu,
    diff_negative_mapping_list
) {
    # diff < 0인 케이스 추출 (제거되기 전)
    # [수정] case == 1인 경우만 저장 (HR과 RR 모두 case==1인 경우만 매핑 데이터 수집)
    # - HR: case == 1 & status == 1인 경우만 매핑 데이터 수집
    # - RR: case == 1 & outcome_status == 1인 경우만 매핑 데이터 수집
    # - 따라서 검증을 위해서는 case == 1 & diff < 0인 경우만 저장하는 것이 맞음
    diff_negative_data <- clean_data[diff < 0 & case == 1]
    
    if (nrow(diff_negative_data) > 0) {
        # key 생성 (cause_abb_outcome_abb_fu 형식)
        key <- paste(cause_abb, outcome_abb, fu, sep = "_")
        
        # [개선] all_diff_negative_info만 저장 (단일 소스)
        # - case == 1 & diff < 0인 케이스의 상세 정보 저장
        # - 이후 이 데이터를 기반으로 각 매핑 데이터를 추출할 수 있음
        # - 검증 및 디버깅 목적
        if (!"all_diff_negative_info" %in% names(diff_negative_mapping_list)) {
            diff_negative_mapping_list[["all_diff_negative_info"]] <- list()
        }
        # all_diff_negative_info에 저장할 데이터프레임 생성
        # - 필요한 모든 컬럼 포함 (index_key_seq, key_seq도 포함하여 이후 매핑 데이터 추출 가능)
        info_df <- data.frame(
            person_id = diff_negative_data$person_id,
            matched_id = diff_negative_data$matched_id,
            case = diff_negative_data$case,
            status = diff_negative_data$status,
            diff = diff_negative_data$diff,
            index_date = diff_negative_data$index_date,
            final_date = diff_negative_data$final_date,
            event_date = diff_negative_data$event_date,
            stringsAsFactors = FALSE
        )
        
        # index_key_seq와 key_seq 추가 (매핑 데이터 추출에 필요)
        if ("index_key_seq" %in% names(diff_negative_data)) {
            info_df$index_key_seq <- diff_negative_data$index_key_seq
        }
        if ("key_seq" %in% names(diff_negative_data)) {
            info_df$key_seq <- diff_negative_data$key_seq
        }
        
        diff_negative_mapping_list[["all_diff_negative_info"]][[key]] <- info_df
    }
    
    return(invisible(diff_negative_mapping_list))
}

# ============================================================================
# 함수: 매핑 데이터를 Parquet 파일로 저장
# ============================================================================
save_diff_negative_mapping_to_parquet <- function(
    diff_negative_mapping_list,
    output_dir,
    fu
) {
    cat(sprintf("--- diff < 0 케이스 매핑 데이터 저장 중 (fu=%d) ---\n", fu))
    
    # [개선] all_diff_negative_info만 저장 (단일 소스)
    if ("all_diff_negative_info" %in% names(diff_negative_mapping_list) && 
        length(diff_negative_mapping_list[["all_diff_negative_info"]]) > 0) {
        cat("  - all_diff_negative_info 저장 중...\n")
        # 리스트를 하나의 데이터프레임으로 결합 (key 컬럼 추가)
        all_info_list <- diff_negative_mapping_list[["all_diff_negative_info"]]
        all_info_df_list <- lapply(names(all_info_list), function(key) {
            info_df <- all_info_list[[key]]
            info_df$key <- key
            return(info_df)
        })
        all_info_combined <- bind_rows(all_info_df_list)
        parquet_file <- file.path(output_dir, sprintf("all_diff_negative_info_fu%d.parquet", fu))
        arrow::write_parquet(all_info_combined, parquet_file)
        cat(sprintf("    ✓ 완료: %s (%d개 행, %d개 키)\n", basename(parquet_file), nrow(all_info_combined), length(all_info_list)))
    } else {
        cat("  - all_diff_negative_info: 데이터 없음\n")
    }
    
    cat("--- 저장 완료 ---\n\n")
}

# ============================================================================
# 함수: 질병 코드 목록 가져오기 (hr_calculator_engine_v4.R의 get_disease_codes_from_path와 동일)
# ============================================================================
get_disease_codes_from_path <- function(matched_parquet_folder_path) {
    codes <- toupper(gsub("matched_(.*)\\.parquet", "\\1", list.files(matched_parquet_folder_path, pattern = "matched_.*\\.parquet")))
    return(sort(codes))
}

# ============================================================================
# 함수: 하나의 cause-outcome 조합 처리 (병렬 처리용)
# ============================================================================
process_one_combination <- function(
    cause_abb,
    outcome_abb,
    fu,
    matched_parquet_folder_path,
    outcome_parquet_file_path
) {
    # 결과를 저장할 리스트 초기화
    result_list <- list()
    result_list[["all_diff_negative_info"]] <- list()
    
    tryCatch({
        # 1. DuckDB 쿼리로 데이터 로드
        con_duck <- dbConnect(duckdb::duckdb())
        matched_parquet_file_path <- file.path(matched_parquet_folder_path, sprintf("matched_%s.parquet", tolower(cause_abb)))
        
        if (!file.exists(matched_parquet_file_path)) {
            dbDisconnect(con_duck, shutdown = TRUE)
            return(NULL)  # 파일이 없으면 NULL 반환
        }
        
        query <- glue::glue(
            "SELECT m.*, o.recu_fr_dt, o.abb_sick, o.key_seq\n",
            "FROM read_parquet('{matched_parquet_file_path}') AS m\n",
            "LEFT JOIN (\n",
            "    SELECT person_id, recu_fr_dt, abb_sick, key_seq\n",
            "    FROM read_parquet('{outcome_parquet_file_path}')\n",
            "    WHERE abb_sick = '{outcome_abb}'\n",
            ") AS o ON m.person_id = o.person_id"
        )
        clean_data <- as.data.table(dbGetQuery(con_duck, query))
        dbDisconnect(con_duck, shutdown = TRUE)
        
        # 2. 전처리 (hr_calculator_engine_v4.R의 process_batch와 동일)
        clean_data[, `:=`(
            index_date = as.IDate(index_date, "%Y%m%d"),
            death_date = as.IDate(paste0(dth_ym, "15"), "%Y%m%d"),
            end_date = as.IDate(paste0(2003 + fu, "1231"), "%Y%m%d"),
            event_date = as.IDate(recu_fr_dt, "%Y%m%d")
        )]
        clean_data[, final_date := fifelse(
            !is.na(event_date),
            pmin(event_date, end_date, na.rm = TRUE),
            pmin(death_date, end_date, na.rm = TRUE)
        )]
        clean_data[, status := fifelse(
            !is.na(event_date),
            fifelse(event_date <= final_date, 1, 0),
            fifelse(!is.na(death_date) & death_date <= final_date, 2, 0)
        )]
        clean_data[, diff := final_date - index_date]
        
        # 3. diff < 0 & case == 1인 케이스 추출
        diff_negative_data <- clean_data[diff < 0 & case == 1]
        
        if (nrow(diff_negative_data) > 0) {
            key <- paste(cause_abb, outcome_abb, fu, sep = "_")
            
            # all_diff_negative_info에 저장할 데이터프레임 생성
            info_df <- data.frame(
                person_id = diff_negative_data$person_id,
                matched_id = diff_negative_data$matched_id,
                case = diff_negative_data$case,
                status = diff_negative_data$status,
                diff = diff_negative_data$diff,
                index_date = diff_negative_data$index_date,
                final_date = diff_negative_data$final_date,
                event_date = diff_negative_data$event_date,
                stringsAsFactors = FALSE
            )
            
            # index_key_seq와 key_seq 추가
            if ("index_key_seq" %in% names(diff_negative_data)) {
                info_df$index_key_seq <- diff_negative_data$index_key_seq
            }
            if ("key_seq" %in% names(diff_negative_data)) {
                info_df$key_seq <- diff_negative_data$key_seq
            }
            
            result_list[["all_diff_negative_info"]][[key]] <- info_df
        }
        
        rm(clean_data, diff_negative_data)
        gc()
        
        return(result_list)
        
    }, error = function(e) {
        # 에러 발생 시 NULL 반환 (에러는 상위에서 처리)
        tryCatch({
            if (exists("con_duck")) dbDisconnect(con_duck, shutdown = TRUE)
        }, error = function(e2) {})
        return(NULL)
    })
}

# ============================================================================
# 함수: all_diff_negative_info에서 각 매핑 데이터 추출
# ============================================================================
extract_mappings_from_all_diff_negative_info <- function(
    all_diff_negative_info_df,
    output_dir,
    fu
) {
    cat(sprintf("=== all_diff_negative_info에서 매핑 데이터 추출 시작 (fu=%d) ===\n\n", fu))
    
    # key별로 그룹화하여 처리
    if (!"key" %in% names(all_diff_negative_info_df)) {
        stop("Error: 'key' 컬럼이 없습니다. all_diff_negative_info에 key 컬럼이 포함되어야 합니다.")
    }
    
    # 매핑 데이터를 저장할 리스트 초기화
    edge_pids_mapping <- list()
    edge_index_key_seq_mapping <- list()
    edge_key_seq_mapping <- list()
    
    # key별로 처리
    unique_keys <- unique(all_diff_negative_info_df$key)
    cat(sprintf("처리할 키 수: %d개\n\n", length(unique_keys)))
    
    for (key in unique_keys) {
        # 해당 key의 데이터 추출
        key_data <- all_diff_negative_info_df[all_diff_negative_info_df$key == key, ]
        
        # case==1 & status==1인 케이스만 필터링 (RR 프로세스와 일치하도록 엣지 정의 유지)
        edge_cases <- key_data[key_data$case == 1 & key_data$status == 1, ]
        
        if (nrow(edge_cases) > 0) {
            # edge_pids 매핑 (person_id)
            if ("person_id" %in% names(edge_cases)) {
                edge_pids_mapping[[key]] <- edge_cases$person_id
            }
            
            # edge_index_key_seq 매핑
            if ("index_key_seq" %in% names(edge_cases)) {
                valid_index_key_seq <- edge_cases$index_key_seq[!is.na(edge_cases$index_key_seq)]
                if (length(valid_index_key_seq) > 0) {
                    edge_index_key_seq_mapping[[key]] <- valid_index_key_seq
                }
            }
            
            # edge_key_seq 매핑 (outcome key_seq)
            if ("key_seq" %in% names(edge_cases)) {
                valid_key_seq <- edge_cases$key_seq[!is.na(edge_cases$key_seq)]
                if (length(valid_key_seq) > 0) {
                    edge_key_seq_mapping[[key]] <- valid_key_seq
                }
            }
        }
    }
    
    cat(sprintf("추출 완료:\n"))
    cat(sprintf("  - edge_pids: %d개 키\n", length(edge_pids_mapping)))
    cat(sprintf("  - edge_index_key_seq: %d개 키\n", length(edge_index_key_seq_mapping)))
    cat(sprintf("  - edge_key_seq: %d개 키\n\n", length(edge_key_seq_mapping)))
    
    return(list(
        edge_pids = edge_pids_mapping,
        edge_index_key_seq = edge_index_key_seq_mapping,
        edge_key_seq = edge_key_seq_mapping
    ))
}

# ============================================================================
# 함수: 추출된 매핑 데이터를 Parquet 파일로 저장
# ============================================================================
save_extracted_mappings_to_parquet <- function(
    mappings_list,
    output_dir,
    fu
) {
    cat(sprintf("=== 추출된 매핑 데이터 저장 시작 (fu=%d) ===\n\n", fu))
    
    # edge_pids_diff_negative 저장
    if (length(mappings_list$edge_pids) > 0) {
        cat("  - edge_pids_diff_negative 저장 중...\n")
        df_pids <- data.frame(
            key = names(mappings_list$edge_pids),
            stringsAsFactors = FALSE
        )
        df_pids$values <- I(mappings_list$edge_pids)
        parquet_file <- file.path(output_dir, sprintf("edge_pids_diff_negative_fu%d.parquet", fu))
        arrow::write_parquet(df_pids, parquet_file)
        cat(sprintf("    ✓ 완료: %s (%d개 키)\n", basename(parquet_file), nrow(df_pids)))
    } else {
        cat("  - edge_pids_diff_negative: 데이터 없음\n")
    }
    
    # edge_index_key_seq_diff_negative 저장
    if (length(mappings_list$edge_index_key_seq) > 0) {
        cat("  - edge_index_key_seq_diff_negative 저장 중...\n")
        df_index <- data.frame(
            key = names(mappings_list$edge_index_key_seq),
            stringsAsFactors = FALSE
        )
        df_index$values <- I(mappings_list$edge_index_key_seq)
        parquet_file <- file.path(output_dir, sprintf("edge_index_key_seq_diff_negative_fu%d.parquet", fu))
        arrow::write_parquet(df_index, parquet_file)
        cat(sprintf("    ✓ 완료: %s (%d개 키)\n", basename(parquet_file), nrow(df_index)))
    } else {
        cat("  - edge_index_key_seq_diff_negative: 데이터 없음\n")
    }
    
    # edge_key_seq_diff_negative 저장
    if (length(mappings_list$edge_key_seq) > 0) {
        cat("  - edge_key_seq_diff_negative 저장 중...\n")
        df_key_seq <- data.frame(
            key = names(mappings_list$edge_key_seq),
            stringsAsFactors = FALSE
        )
        df_key_seq$values <- I(mappings_list$edge_key_seq)
        parquet_file <- file.path(output_dir, sprintf("edge_key_seq_diff_negative_fu%d.parquet", fu))
        arrow::write_parquet(df_key_seq, parquet_file)
        cat(sprintf("    ✓ 완료: %s (%d개 키)\n", basename(parquet_file), nrow(df_key_seq)))
    } else {
        cat("  - edge_key_seq_diff_negative: 데이터 없음\n")
    }
    
    cat("\n=== 저장 완료 ===\n\n")
}

cat("함수 정의 완료.\n\n")

## 사용 예시

### 예시 1: all_diff_negative_info 파일 생성 (모든 cause-outcome 조합)

아래 셀은 `all_diff_negative_info_fu*.parquet` 파일을 생성하는 예시 코드입니다.

**사전 요구사항:**
- matched parquet 파일과 outcome parquet 파일이 있어야 합니다.
- 위 셀들에서 함수 정의가 완료되어 있어야 합니다.

**주의사항:**
- fu=8에 대해 모든 cause-outcome 조합을 처리합니다.
- 처리 시간이 오래 걸릴 수 있으므로 진행 상황을 확인하세요.


In [None]:
# ============================================================================
# 4. 실행 예시 1: all_diff_negative_info 파일 생성 (모든 cause-outcome 조합)
# ============================================================================
# 
# 이 셀은 모든 cause-outcome 조합에 대해 diff < 0 & case == 1인 케이스를 추출하여
# all_diff_negative_info_fu*.parquet 파일을 생성하는 예시입니다.
#
# 주의: 이 셀은 셀 2에서 설정한 변수들(fu, matched_parquet_folder_path,
# outcome_parquet_file_path, output_dir, n_cores)을 그대로 사용합니다.
# 필요시 아래 주석을 해제하여 재정의할 수 있습니다.
#

# 설정 재정의 (필요시에만 주석 해제하여 수정)
# fu <- 8  # Follow-up 기간
# matched_parquet_folder_path <- "/home/hashjamm/project_data/disease_network/matched_date_parquet/"
# outcome_parquet_file_path <- "/home/hashjamm/project_data/disease_network/outcome_table.parquet"
# output_dir <- "/home/hashjamm/results/disease_network/hr_rr_mapping_validation_results"
# n_cores <- 15  # 병렬 처리 코어 수

# 실행 코드
cat("=== 모든 cause-outcome 조합에 대한 diff < 0 & case == 1 케이스 수집 시작 ===\n\n")

# 질병 코드 목록 로드
disease_codes <- get_disease_codes_from_path(matched_parquet_folder_path)
cat(sprintf("발견된 질병 코드 수: %d개\n", length(disease_codes)))
cat(sprintf("질병 코드 목록: %s\n\n", paste(disease_codes, collapse = ", ")))

# 모든 cause-outcome 조합 생성 (cause != outcome)
all_combinations <- tidyr::expand_grid(cause_abb = disease_codes, outcome_abb = disease_codes) %>%
    filter(cause_abb != outcome_abb) %>%
    mutate(fu = fu)

total_combinations <- nrow(all_combinations)
cat(sprintf("총 처리할 조합 수: %d개\n\n", total_combinations))

# 병렬 처리 설정
cat(sprintf("병렬 처리 설정: %d개 코어 사용\n\n", n_cores))

# progressr 핸들러 설정
progressr::handlers(global = TRUE)
options(progressr.enable = TRUE)
progressr::handlers(progressr::handler_void())  # 터미널 출력 비활성화 (Jupyter와 충돌 방지)

plan(multisession, workers = n_cores, gc = TRUE, earlySignal = TRUE)
required_packages <- c("data.table", "duckdb", "DBI", "arrow", "dplyr", "glue", "tidyr")

# 진행 상황 추적
start_time <- Sys.time()

# 병렬 처리로 모든 조합 처리
progressr::with_progress({
    p <- progressr::progressor(steps = total_combinations)
    
    results <- future_lapply(1:nrow(all_combinations), function(i) {
        cause_abb <- all_combinations$cause_abb[i]
        outcome_abb <- all_combinations$outcome_abb[i]
        
        result <- process_one_combination(
            cause_abb = cause_abb,
            outcome_abb = outcome_abb,
            fu = fu,
            matched_parquet_folder_path = matched_parquet_folder_path,
            outcome_parquet_file_path = outcome_parquet_file_path
        )
        
        p()  # 진행 상황 업데이트
        return(result)
    }, future.seed = TRUE, future.packages = required_packages)
})

plan(sequential)  # 병렬 처리 종료

# 결과 합치기
cat("\n=== 결과 합치기 시작 ===\n")
diff_negative_mapping_list <- list()
diff_negative_mapping_list[["all_diff_negative_info"]] <- list()

success_count <- 0
error_count <- 0

for (i in 1:length(results)) {
    if (!is.null(results[[i]]) && 
        "all_diff_negative_info" %in% names(results[[i]]) &&
        length(results[[i]][["all_diff_negative_info"]]) > 0) {
        # 결과 합치기
        for (key in names(results[[i]][["all_diff_negative_info"]])) {
            diff_negative_mapping_list[["all_diff_negative_info"]][[key]] <- results[[i]][["all_diff_negative_info"]][[key]]
        }
        success_count <- success_count + 1
    } else if (is.null(results[[i]])) {
        error_count <- error_count + 1
    }
}

cat(sprintf("처리 완료: 성공 %d개, 실패 %d개, 총 %d개\n", success_count, error_count, total_combinations))
cat(sprintf("수집된 키 수: %d개\n", length(diff_negative_mapping_list[["all_diff_negative_info"]])))
cat(sprintf("소요 시간: %.1f초\n\n", as.numeric(difftime(Sys.time(), start_time, units = "secs"))))

# 모든 조합의 데이터를 하나의 Parquet 파일로 저장
if (length(diff_negative_mapping_list) > 0 && 
    "all_diff_negative_info" %in% names(diff_negative_mapping_list) &&
    length(diff_negative_mapping_list[["all_diff_negative_info"]]) > 0) {
    cat("=== Parquet 파일 저장 시작 ===\n")
    save_diff_negative_mapping_to_parquet(
        diff_negative_mapping_list = diff_negative_mapping_list,
        output_dir = output_dir,
        fu = fu
    )
    cat(sprintf("생성된 파일: %s\n", file.path(output_dir, sprintf("all_diff_negative_info_fu%d.parquet", fu))))
} else {
    cat("diff < 0 & case == 1인 케이스가 없어 파일을 생성하지 않았습니다.\n")
}


### 예시 2: all_diff_negative_info에서 매핑 데이터 추출

아래 셀은 생성된 `all_diff_negative_info_fu*.parquet` 파일에서 매핑 데이터를 추출하는 예시 코드입니다.

**사전 요구사항:**
- `all_diff_negative_info_fu*.parquet` 파일이 이미 생성되어 있어야 합니다.
- 위 셀들에서 함수 정의가 완료되어 있어야 합니다.


## Engine 버전 사용 가이드

### 대규모 데이터 처리 시 권장 방법

위의 예시 코드는 개발 및 테스트용입니다. 대규모 데이터 처리 시에는 **Engine 버전**을 사용하는 것을 권장합니다:

#### 파일 위치
- `hr_rr_mapping_validation_engine.R`: 메인 R 스크립트 (배치 처리 + 즉시 디스크 저장)
- `hr_rr_mapping_validation_manager.sh`: 자동 재시작 래퍼 스크립트

#### 사용 방법

```bash
# 1. sudoers 설정 (한 번만)
echo "hashjamm ALL=(ALL) NOPASSWD: /usr/bin/systemd-run" | sudo tee /etc/sudoers.d/systemd-run
sudo chmod 0440 /etc/sudoers.d/systemd-run

# 2. 실행 (fu=8)
cd /home/hashjamm/codes/disease_network
./hr_rr_mapping_validation_manager.sh 8

# 3. 백그라운드 실행 (tmux 권장)
tmux new-session -d -s validation './hr_rr_mapping_validation_manager.sh 8'
tmux attach-session -t validation  # 진행 상황 확인
```

#### 주요 특징
- ✅ 배치 처리 방식으로 메모리 사용량 제한
- ✅ 즉시 디스크 저장으로 메모리 부하 방지
- ✅ 자동 재시작 메커니즘 (메모리 부하 시)
- ✅ 완료 작업 추적으로 중복 처리 방지
- ✅ 실시간 진행 상황 및 리소스 모니터링

#### 자세한 사용 가이드
`HR_RR_MAPPING_VALIDATION_README.md` 파일을 참조하세요.

In [None]:
# ============================================================================
# 4. 실행 예시 2: all_diff_negative_info에서 매핑 데이터 추출 및 저장
# ============================================================================
#
# 주의: 이 셀은 셀 2에서 설정한 변수들(fu, output_dir)을 그대로 사용합니다.
# 필요시 아래 주석을 해제하여 재정의할 수 있습니다.
#

# 설정 재정의 (필요시에만 주석 해제하여 수정)
# fu <- 8  # Follow-up 기간
# output_dir <- "/home/hashjamm/results/disease_network/hr_rr_mapping_validation_results"

all_info_file <- file.path(output_dir, sprintf("all_diff_negative_info_fu%d.parquet", fu))

# 파일 존재 여부 확인
if (!file.exists(all_info_file)) {
    cat(sprintf("경고: 파일이 존재하지 않습니다: %s\n", all_info_file))
    cat("먼저 예시 1 셀을 실행하여 all_diff_negative_info 파일을 생성해야 합니다.\n")
} else {
    cat(sprintf("=== all_diff_negative_info 파일 로드 시작 ===\n"))
    cat(sprintf("파일 경로: %s\n\n", all_info_file))
    
    # 1. all_diff_negative_info 파일 로드
    all_info_df <- arrow::read_parquet(all_info_file)
    cat(sprintf("로드 완료: 총 %d개 행\n", nrow(all_info_df)))
    cat(sprintf("컬럼: %s\n\n", paste(names(all_info_df), collapse = ", ")))
    
    # 2. 매핑 데이터 추출
    cat("=== 매핑 데이터 추출 시작 ===\n\n")
    mappings <- extract_mappings_from_all_diff_negative_info(
        all_diff_negative_info_df = all_info_df,
        output_dir = output_dir,
        fu = fu
    )
    
    # 3. 추출된 매핑 데이터 저장
    cat("\n=== 매핑 데이터 저장 시작 ===\n\n")
    save_extracted_mappings_to_parquet(
        mappings_list = mappings,
        output_dir = output_dir,
        fu = fu
    )
    
    # 4. 결과 요약
    cat("=== 처리 완료 ===\n")
    cat(sprintf("생성된 파일:\n"))
    cat(sprintf("  - %s\n", file.path(output_dir, sprintf("edge_pids_diff_negative_fu%d.parquet", fu))))
    cat(sprintf("  - %s\n", file.path(output_dir, sprintf("edge_index_key_seq_diff_negative_fu%d.parquet", fu))))
    cat(sprintf("  - %s\n", file.path(output_dir, sprintf("edge_key_seq_diff_negative_fu%d.parquet", fu))))
}
