In [None]:
# =====================
# 표준 라이브러리
# =====================
import sys
import re
from pathlib import Path
from pprint import pprint
from typing import Union

# =====================
# 서드 파티 라이브러리
# =====================
import polars as pl
import polars.selectors as cs

# =====================
# 경로 설정
# =====================
PROJECT_ROOT = Path.cwd().parent
DATA_DIR = PROJECT_ROOT / "data"

if str(PROJECT_ROOT) in sys.path:
    sys.path.remove(str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT))

# =====================
# 로컬 모듈
# =====================
from src.utils import process_lazyframe_in_chunks
from src.loading import DataLoader
from src.preprocess.udi_preprocessor import UDIProcessor
from src.preprocess import (
    create_udi_preprocessor,
    create_company_preprocessor,
    create_generic_preprocessor,
    create_number_preprocessor,
)
# 새로운 위치에서 import
from src.utils.polars.patterns import get_pattern_cols
from src.preprocess.eda import analyze_null_values
from src.preprocess.transforms import replace_pattern_with_null

In [None]:
# maude 데이터 불러오기
loader1 = DataLoader(
    start=2024,
    end=2024,
    output_file = DATA_DIR / 'bronze' / 'maude_raw.parquet',
    max_workers=4
)

adapter = 'polars'
polars_kwargs = {
    'use_statistics': True,
    'parallel': 'auto',
    'low_memory': False,
    'rechunk': False,
    'cache': True,
}
maude_lf = loader1.load(adapter=adapter, **polars_kwargs)
maude_lf

In [None]:
maude_lf.select(pl.col('mdr_report_key').n_unique()).head().collect()

In [None]:
IDENTIFIER_PATTERNS = [
    r"^device_\d+_brand_name$",
    r"identifiers_\d+_id", 
    r"identifiers_\d+_issuing_agency", 
    r"identifiers_\d+_package_discontinue_date", 
    r"identifiers_\d+_package_status", 
    r"identifiers_\d+_package_type", 
    r"identifiers_\d+_quantity_per_package", 
    r"identifiers_\d+_type", 
    r"identifiers_\d+_unit_of_use_id"
]
UDI_DI_PATTERNS = [r'^identifiers_\d+_id$']
TYPE_PATTERNS = [
    r'identifiers_\d+_type'
]

CUSTOMER_PATTERNS = [r'^customer']
DEVICE_SIZE_PATTERNS = [r'^device_sizes']
STORAGE_PATTERNS = [r'^storage']
GMDN_PATTERNS = [r'^gmdn']
PREMARKET_PATTERNS = [r'^premarket']


# 1. MAUDE 데이터 준비

## 1.1 필요한 컬럼 선택
원본 MAUDE 데이터에서 전처리에 필요한 컬럼만 선택하여 메모리 사용량 최적화

In [None]:
# 기본 변수
BASE_COLS = [
    'mdr_report_key',
    'report_number', 
    'adverse_event_flag', 
    'product_problem_flag', 
    'event_type',
    'previous_use_code', 
    'single_use_flag', 
    'reprocessed_and_reused_flag',
    'product_problems'
]

DATE_COLS = [
    'date_of_event', 
    'date_received', 
    'device_date_of_manufacturer', 
]

DEVICE_COLS = [
    "device_0_manufacturer_d_name",
    "device_0_manufacturer_d_postal_code",
    "device_0_brand_name",
    "device_0_catalog_number",
    "device_0_model_number",
    "device_0_udi_di",
    "device_0_lot_number",
    "device_0_udi_public",
    "device_0_device_report_product_code",
    "device_0_device_operator",
    "device_0_openfda_device_class",
    "device_0_openfda_device_name",
]


PATIENT_COLS = [
    "patient_0_patient_age",
    "patient_0_sequence_number_outcome",
]



MDR_TEXT_PATTERNS = [
    r"^mdr_text_.*_text$",
    r"^mdr_text_.*_text_type_code$",
]

MDR_TEXT_COLS = get_pattern_cols(maude_lf, MDR_TEXT_PATTERNS[:1])
MDR_COLS = get_pattern_cols(maude_lf, MDR_TEXT_PATTERNS)
TOTAL_COLS = BASE_COLS + DATE_COLS + DEVICE_COLS + PATIENT_COLS + MDR_COLS

In [None]:
maude_lf = maude_lf.select(TOTAL_COLS)

## 1.2 초기 필터링
- **Device Class 3**: 고위험 의료기기만 선택
- **MDR Text 존재**: 부작용 보고 텍스트가 있는 행만 선택

In [None]:
maude_lf = maude_lf.filter(
    pl.col('device_0_openfda_device_class').eq('3'), 
    pl.col('mdr_text_0_text').is_not_null()
)

maude_lf.select(pl.len()).collect().item()

# 2. MAUDE 데이터 클렌징

**전략**: 저품질 수기 입력 데이터의 일관성 확보를 위한 정규화 작업
- 청크 단위 처리로 메모리 효율성 확보
- 중간 파일 저장으로 실패 시 복구 가능

In [None]:
maude_step1_path = DATA_DIR / 'silver' / 'clean_step1.parquet'
maude_step2_path = DATA_DIR / 'silver' / 'clean_step2.parquet'
maude_step3_path = DATA_DIR / 'silver' / 'clean_step3.parquet'
maude_step4_path = DATA_DIR / 'silver' / 'clean_step4.parquet'

## 2.1 의료기기명 정규화
**목적**: Product Code별로 일관된 Device Name 사용
**방법**: 최빈값(mode)을 정규 표현(canonical name)으로 선택하여 변이 제거

In [None]:
def device_name_clean(maude_lf, col_name):
    """
    의료기기명 텍스트 정규화
    
    처리 단계:
    1. 특수문자 제거 (알파벳, 숫자, 공백만 유지)
    2. 소문자 변환
    3. 불필요한 공백 정리
    
    Args:
        maude_lf: MAUDE LazyFrame
        col_name: 정규화할 컬럼명
    
    Returns:
        정규화된 LazyFrame
    """
    return maude_lf.with_columns(
        pl.col(col_name)
        .str.replace_all(r'[^a-zA-Z0-9\s]', '')  # 특수문자 제거
        .str.to_lowercase()  # 소문자 변환
        .str.strip_chars()  # 앞뒤 공백 제거
        .str.replace_all(r'\s+', ' ')  # 연속된 공백을 하나로
        .alias(col_name)
    )
    
def device_name_product_code_match(maude_lf, name_col, code_col):
    """
    Product Code별 Device Name 통일
    
    **문제**: 동일한 product code에 여러 표기 방식의 device name 존재
    **해결**: 각 product code별로 가장 빈도가 높은 이름을 정규 표현으로 사용
    
    Args:
        maude_lf: MAUDE LazyFrame
        name_col: device name 컬럼
        code_col: product code 컬럼
    
    Returns:
        통일된 device name이 적용된 LazyFrame
    """ 
    # product_code별로 가장 빈도가 높은 device_name 선택
    code_to_name = (
        maude_lf
        .select([name_col, code_col])
        .filter(pl.col(name_col).is_not_null() & pl.col(code_col).is_not_null())
        .group_by([code_col, name_col])  # 빈도 계산을 위한 그룹화
        .count()
        .sort("count", descending=True)
        .group_by(code_col)
        .first()  # 각 code별 최빈값 선택
        .select([code_col, pl.col(name_col).alias("canonical_name")])
    )
    
    # 원본 데이터에 정규 표현 매핑
    result = (
        maude_lf
        .join(code_to_name, on=code_col, how="left") 
        .with_columns(
            pl.coalesce(pl.col("canonical_name"), pl.col(name_col)).alias(name_col)
        )
        .drop("canonical_name")
    )
    
    return result

In [None]:
# 1. device_name 클린징
maude_lf = device_name_clean(maude_lf, "device_0_openfda_device_name")

# 2. product_code와 매칭하여 통일
maude_lf = device_name_product_code_match(
    maude_lf, 
    "device_0_openfda_device_name", 
    "device_0_device_report_product_code"
)

maude_lf.select(pl.len()).collect().item()

## 2.2 제조사명 정규화
**목적**: 우편번호를 기준으로 누락된 제조사명 복원 및 통일
**방법**: 동일 우편번호의 최빈 제조사명을 사용

In [None]:
def manufacturer_postal_match(maude_lf, name_col, postal_col):
    """
    우편번호 기반 제조사명 매칭
    
    **활용**: 우편번호를 통해 제조사 위치를 역추적
    - 누락된 제조사명 복원
    - 동일 제조사의 다양한 표기 통일
    
    Args:
        maude_lf: MAUDE LazyFrame
        name_col: manufacturer name 컬럼
        postal_col: postal code 컬럼
    
    Returns:
        제조사명이 보완/통일된 LazyFrame
    """
    # 우편번호별로 가장 빈도가 높은 제조사명 선택
    postal_to_name = (
        maude_lf
        .select([name_col, postal_col])
        .filter(pl.col(name_col).is_not_null() & pl.col(postal_col).is_not_null())
        .unique()
        .group_by(postal_col)
        .agg(
            pl.col(name_col).mode().first().alias("canonical_name")
        )
    )
    
    # 정규 표현을 원본 데이터에 적용
    result = (
        maude_lf
        .join(postal_to_name, on=postal_col, how="left")
        .with_columns(
            # 정규 표현이 있으면 사용, 없으면 원래 이름 유지
            pl.coalesce(pl.col("canonical_name"), pl.col(name_col)).alias(name_col)
        )
        .drop("canonical_name")
    )
    
    return result

In [None]:
maude_lf = manufacturer_postal_match(
    maude_lf, 
    'device_0_manufacturer_d_name', 
    'device_0_manufacturer_d_postal_code'
)

maude_lf.select(pl.len()).collect().item()

## 2.3 Product Code 정규화
**목적**: Product Code는 영문 대문자만 허용되므로 불필요한 문자 제거

In [None]:
def product_code_clean(maude_lf, col_name):
    """
    Product Code 정규화
    
    **규칙**: FDA Product Code는 영문 대문자만 허용
    
    Args:
        maude_lf: MAUDE LazyFrame
        col_name: product code 컬럼명
    
    Returns:
        정규화된 LazyFrame
    """
    return maude_lf.with_columns(
        pl.col(col_name)
        .str.replace_all(r'[^A-Z]', '')  # 영어 대문자 이외 문자 제거
        .alias(col_name)
    )

In [None]:
maude_lf = product_code_clean(
    maude_lf, 'device_0_device_report_product_code'
)
maude_lf.select(pl.len()).collect().item()

## 2.4 환자 나이 정규화
**목적**: 다양한 형식의 나이 데이터를 일(day) 단위로 통일
**처리**: "70 YRS", "6 MO", "30 DAYS" 등 → 일(day) 단위로 변환 후 0-120년 범위로 제한

In [None]:
def process_age_columns(
        maude_lf: pl.LazyFrame,
        src_col: str = "patient_0_patient_age"
) -> pl.LazyFrame:
    """
    환자 나이를 일(day) 단위로 정규화
    
    처리 과정:
    1. 텍스트 대문자 변환
    2. NA 패턴 제거 (UNK, NA, VARIOUS 등)
    3. 숫자와 단위 추출
    4. 단위별 일 단위 변환:
       - DAY/DAYS/D → 그대로
       - WEEK/WK → ×7
       - MONTH/MO → ×30
       - YEAR/YR → ×365
    5. 0-120세 범위로 제한 (0 ~ 43,800일)
    
    Args:
        maude_lf: MAUDE LazyFrame
        src_col: 나이 컬럼명
    
    Returns:
        나이가 일 단위로 정규화된 LazyFrame
    """
    schema_names = maude_lf.collect_schema().names()

    if src_col in schema_names:
        # 대문자 변환
        maude_lf = maude_lf.with_columns(
            pl.col(src_col)
            .cast(pl.Utf8)
            .str.to_uppercase()
            .alias("_age_text_upper")
        )
        
        # NA 패턴을 null로 변환
        na_patterns = r'UNK|NA|VARIOUS'
        maude_lf = replace_pattern_with_null(maude_lf, '_age_text_upper', na_patterns)

        # 숫자 추출
        maude_lf = maude_lf.with_columns(
            pl.col("_age_text_upper")
            .str.extract(r"(\d+)", 1)
            .cast(pl.Float64)
            .alias("_age_value")
        )

        # 단위 추출
        maude_lf = maude_lf.with_columns(
            pl.col("_age_text_upper")
            .str.extract(r"(DAY|DA|DAYS|D|WEEK|WEEKS|WK|WKS|MONTH|MONTHS|MO|YEAR|YEARS|YR|YRS)",
                          group_index=1)
            .alias("_age_unit")
        )

        # 단위별로 일 단위로 변환
        maude_lf = maude_lf.with_columns(
            pl.when(pl.col("_age_value").is_null())
              .then(None)  # 숫자 자체가 없으면 null
            .when(pl.col("_age_unit").is_null())
              .then(None)  
            .when(pl.col("_age_unit").str.contains("DAY|DA|DYAS|D"))
              .then(pl.col("_age_value"))
            .when(pl.col("_age_unit").str.contains("WEEK|WEEKS|WK|WKS"))
              .then(pl.col("_age_value") * 7)
            .when(pl.col("_age_unit").str.contains("MONTH|MONTHS|MO"))
              .then(pl.col("_age_value") * 30)
            .when(pl.col("_age_unit").str.contains("YEAR|YEARS|YR|YRS"))
              .then(pl.col("_age_value") * 365)
            .otherwise(None)
            .alias("_age_days"))
        
        # 반올림 후 0-120세 범위로 제한
        maude_lf = maude_lf.with_columns(
            pl.col("_age_days")
            .round(0)
            .cast(pl.Int64)
            .clip(0, 120*365)  # 0 ~ 43,800일
            .alias("patient_0_patient_age")
        )
        
        # 임시 컬럼 제거
        maude_lf = maude_lf.drop(["_age_value", "_age_text_upper", "_age_unit", "_age_days"])

        return maude_lf

In [None]:
maude_lf = process_age_columns(maude_lf, "patient_0_patient_age")
maude_lf.select(
    pl.col("patient_0_patient_age").min().alias("min_age"),
    pl.col("patient_0_patient_age").max().alias("max_age")
).collect()
maude_lf.select(pl.len()).collect().item()

## 2.5 날짜 정규화 및 검증
**목적**: 다양한 형식의 날짜 데이터를 Date 타입으로 통일하고 1900년대 오류 데이터 제거

In [None]:
def cast_date_cols_safe(
    maude_lf: pl.LazyFrame,
    date_cols: list[str],
    fmt: str = "%Y%m%d",
) -> pl.LazyFrame:
    """
    날짜 컬럼을 안전하게 Date 타입으로 변환
    
    처리 규칙:
    - 이미 Date 타입 → 변환 건너뛰기
    - Datetime 타입 → Date로 다운캐스트
    - 기타 (Utf8/Int 등) → 지정된 포맷으로 파싱
    
    Args:
        maude_lf: MAUDE LazyFrame
        date_cols: 날짜 컬럼 리스트
        fmt: 날짜 파싱 포맷 (기본값: "%Y%m%d")
    
    Returns:
        날짜가 Date 타입으로 변환된 LazyFrame
    """
    schema = maude_lf.collect_schema()
    exprs = []

    for col in date_cols:
        if col not in schema:
            continue

        dt = schema[col]

        # 이미 Date면 건드리지 않음
        if dt == pl.Date:
            continue

        # Datetime이면 Date로만 다운캐스트
        if dt == pl.Datetime:
            exprs.append(pl.col(col).cast(pl.Date, strict=False).alias(col))
            continue

        # 그 외(Utf8/Int 등)만 fmt로 파싱
        exprs.append(
            pl.col(col)
              .cast(pl.Utf8)
              .str.strptime(pl.Date, format=fmt, strict=False)
              .alias(col)
        )

    return maude_lf if not exprs else maude_lf.with_columns(exprs)

In [None]:
maude_lf = cast_date_cols_safe(
    maude_lf,
    date_cols=DATE_COLS
)
maude_lf.select(pl.len()).collect().item()

In [None]:
# 1900년대 날짜 데이터는 입력 오류로 간주하여 null 처리
# (사건 날짜가 1900-1999년인 경우 데이터 품질 이슈)
maude_lf = maude_lf.with_columns(
    pl.when(pl.col("date_of_event").dt.year().is_between(1900, 1999))
      .then(pl.lit(None, dtype=pl.Date))  # Date 타입 유지하면서 null 지정
      .otherwise(pl.col("date_of_event"))        
      .alias("date_of_event")
)
maude_lf.select(pl.len()).collect().item()

In [None]:
print("date_of_event 범위:")
print(maude_lf.select(
    pl.col("date_of_event").min().alias("min_date"),
    pl.col("date_of_event").max().alias("max_date"),
).collect())

display(maude_lf.select(pl.len()).head().collect().item())
maude_lf.select(pl.col('mdr_report_key').n_unique()).collect().item()

## 2.6 텍스트 전처리 (4단계 파이프라인)
**전략**: 컬럼 유형별로 특화된 전처리 적용
- 청크 단위 처리로 대용량 데이터 안정적 처리
- 각 단계 후 중간 파일 저장 (실패 복구용)

In [None]:
# Step 1: UDI DI 전처리
preprocessor1 = create_udi_preprocessor()
preprocessor1.apply_to_lazyframe(
    maude_lf, 'device_0_udi_di', maude_step1_path, chunk_size=100_000
)
del preprocessor1  # 명시적 메모리 해제

In [None]:
# Step 2: 회사명/브랜드명 전처리
maude_lf2 = pl.scan_parquet(maude_step1_path)
preprocessor2 = create_company_preprocessor()
preprocessor2.apply_to_lazyframe(
    maude_lf, 
    ['device_0_manufacturer_d_name', 'device_0_brand_name'], 
    maude_step2_path, 
    chunk_size=100_000
)
del maude_lf2, preprocessor2

In [None]:
# Step 1 중간 파일 정리
maude_step1_path.unlink(missing_ok=True)

# Step 3: 모델번호/카탈로그번호/로트번호 전처리
maude_lf3 = pl.scan_parquet(maude_step2_path)
preprocessor3 = create_number_preprocessor()
preprocessor3.apply_to_lazyframe(
    maude_lf3, 
    ['device_0_model_number', 'device_0_catalog_number', 'device_0_lot_number'], 
    maude_step3_path, 
    chunk_size=100_000
)
del maude_lf3, preprocessor3

In [None]:
# Step 2 중간 파일 정리
maude_step2_path.unlink(missing_ok=True)

# Step 4: MDR 텍스트 전처리
maude_lf4 = pl.scan_parquet(maude_step3_path)
preprocessor4 = create_generic_preprocessor()
preprocessor4.apply_to_lazyframe(
    maude_lf4, 
    MDR_TEXT_COLS,
    maude_step4_path, 
    chunk_size=100_000
)
del maude_lf4, preprocessor4

In [None]:
# Step 3 중간 파일 정리
maude_step3_path.unlink(missing_ok=True)

# 전처리 완료된 MAUDE 데이터 로드
loader4 = DataLoader(
    start=2024,
    end=2024,
    output_file=maude_step4_path,
)

cleaned_maude_lf = loader4.load(adapter=adapter, **polars_kwargs)

display(cleaned_maude_lf.select(pl.len()).head().collect())
cleaned_maude_lf.select(pl.col('mdr_report_key').n_unique()).head().collect()

## 2.7 MDR 텍스트 결합
**목적**: 분산된 여러 개의 MDR 텍스트 컬럼을 하나로 통합
**처리**: 중복 제거 후 텍스트 타입별로 포맷팅하여 결합

In [None]:
def combine_mdr_texts(lf: pl.LazyFrame, n: int = 5) -> pl.LazyFrame:
    """
    여러 MDR 텍스트 컬럼을 중복 제거하여 하나로 결합
    
    처리 과정:
    1. mdr_text_N_text 컬럼 최대 n개 선택
    2. 각 텍스트의 타입 코드 매칭
    3. 중복 텍스트 제거
    4. "[타입코드]\n텍스트" 형식으로 포맷팅
    5. 여러 텍스트를 "\n\n"으로 결합
    
    Args:
        lf: MAUDE LazyFrame
        n: 사용할 최대 텍스트 컬럼 개수
    
    Returns:
        combined_mdr_text 컬럼이 추가된 LazyFrame
    """
    cols = lf.collect_schema().names()
    
    # 최대 n개까지만 사용
    text_cols = sorted([c for c in cols if c.startswith('mdr_text_') and c.endswith('_text')])[:n]

    # (텍스트, 타입코드) 쌍 생성
    pairs = []
    for text_col in text_cols:
        type_col = re.sub(r'_text$', '_text_type_code', text_col)
        if type_col in cols:
            pairs.append((text_col, type_col))
    
    if not pairs:
        return lf.with_columns(pl.lit(None).alias('combined_mdr_text'))
    
    # 1. 중복 제거 및 포맷팅된 리스트 생성
    lf = lf.with_columns(
        pl.struct([pl.col(tc) for tc, _ in pairs] + [pl.col(ty) for _, ty in pairs])
        .map_elements(
            lambda s: deduplicate_and_format(s, pairs),
            return_dtype=pl.List(pl.String)
        )
        .alias('deduplicated_formatted')
    )
    
    # 2. 리스트를 문자열로 결합
    lf = lf.with_columns(
        pl.col('deduplicated_formatted')
        .list.join("\n\n")
        .alias('combined_mdr_text')
    )
    
    return lf.drop('deduplicated_formatted')


def deduplicate_and_format(struct_val, pairs):
    """
    텍스트 중복 제거 및 포맷팅
    
    Args:
        struct_val: 텍스트와 타입 코드를 담은 구조체
        pairs: (텍스트 컬럼, 타입 컬럼) 쌍 리스트
    
    Returns:
        포맷팅된 텍스트 리스트
    """
    seen = {}
    result = []
    
    for text_col, type_col in pairs:
        text = struct_val.get(text_col)
        type_val = struct_val.get(type_col)
        
        # 유효하고 중복되지 않은 텍스트만 추가
        if text is not None and text != "" and text not in seen:
            seen[text] = True
            type_display = type_val if type_val else ""
            result.append(f"[{type_display}]\n{text}")
    
    return result

In [None]:
# MDR 텍스트 결합 실행
combined_lf = combine_mdr_texts(cleaned_maude_lf)

display(combined_lf.select(pl.len()).collect().item())
combined_lf.select(pl.col('mdr_report_key').n_unique()).head().collect()

# 3. UDI 데이터 준비 및 매칭

**목적**: 저품질 MAUDE 데이터를 고품질 UDI 데이터로 보완
**전략**: Primary UDI-DI를 기준으로 일관된 의료기기 정보 매칭

## 3.1 UDI 데이터 로딩 및 필터링

In [None]:
# udi 데이터 불러오기
udi_loader = DataLoader(
    name='udi',
    output_file=DATA_DIR / 'bronze' / 'udi_raw.parquet',
    max_workers = 2
)

udi_lf = udi_loader.load(adapter, **polars_kwargs)
udi_lf

### Device Class 3 필터링
MAUDE와 동일하게 Class 3 의료기기만 선택

In [None]:
# device_class 컬럼 중 하나라도 '3'을 포함하는 행 필터링
udi_lf = udi_lf.filter(
    pl.any_horizontal(cs.matches('*device_class$') == '3')
)

udi_lf.select(pl.len()).collect().item()

In [None]:
# 불필요한 컬럼 제거 (고객정보, 기기크기, 저장정보, GMDN, 사전승인 정보 등)
drop_patterns = CUSTOMER_PATTERNS + DEVICE_SIZE_PATTERNS + STORAGE_PATTERNS + GMDN_PATTERNS + PREMARKET_PATTERNS
regex = "|".join(drop_patterns)

udi_lf = udi_lf.select(~cs.matches(regex))

In [None]:
udi_di_cols = get_pattern_cols(udi_lf, UDI_DI_PATTERNS)
identifiers_cols = get_pattern_cols(udi_lf, IDENTIFIER_PATTERNS)
type_cols = get_pattern_cols(udi_lf, TYPE_PATTERNS)

In [None]:
# type-udi_di 쌍 생성 (인덱스로 매칭)
def extract_index(col_name):
    """컬럼명에서 identifiers_N_ 형태의 인덱스 추출"""
    match = re.search(r'identifiers_(\d+)_', col_name)
    return int(match.group(1)) if match else None

# 각 type 컬럼에 대응하는 id 컬럼 매칭
type_id_pairs = []
for type_col in type_cols:
    idx = extract_index(type_col)
    udi_di_col = f'identifiers_{idx}_id'
    if udi_di_col in udi_di_cols:
        type_id_pairs.append((type_col, udi_di_col))

len(type_id_pairs)

In [None]:
udi_lf = cast_date_cols_safe(
    udi_lf,
    date_cols=['publish_date'],
    fmt='%Y-%m-%d'
)

udi_lf.filter(
    pl.col('publish_date').is_not_null()
).head().select('publish_date').collect()

In [None]:
udi_step1_path = DATA_DIR / 'silver' / 'udi_primary.parquet'
udi_step2_path = DATA_DIR / 'silver' / 'udi_clean.parquet'

## 3.2 Primary UDI-DI 추출 및 전처리

In [None]:
# Primary UDI-DI 추출
# UDI는 여러 개 존재할 수 있으나, "Primary"로 표시된 것만 사용
def primary_transform(lf: pl.LazyFrame):
    """
    Type이 'Primary'인 UDI-DI만 추출
    
    Returns:
        primary_udi_di 컬럼이 추가된 LazyFrame
    """
    return lf.with_columns(
        pl.coalesce([
            pl.when(pl.col(type_col).eq("Primary"))
            .then(pl.col(id_col))
            for type_col, id_col in type_id_pairs
        ]).alias('primary_udi_di')
    )

process_lazyframe_in_chunks(
    udi_lf, 
    primary_transform, 
    udi_step1_path, 
    10_000,
    desc="Primary UDI-DI extraction"
)

In [None]:
# udi 데이터 불러오기
udi_loader = DataLoader(
    name='udi',
    output_file=udi_step1_path,
)

primary_udi_lf = udi_loader.load(adapter, **polars_kwargs)

In [None]:
# UDI 회사명/브랜드명 전처리
preprocessor1 = create_company_preprocessor()

preprocessor1.apply_to_lazyframe(
    primary_udi_lf,
    ['company_name', 'brand_name'],
    udi_step2_path,
    10_000,
)

del preprocessor1, primary_udi_lf
udi_step1_path.unlink(missing_ok=True)

In [None]:
# udi 데이터 불러오기
udi_loader = DataLoader(
    name='udi',
    output_file=udi_step2_path,
)

cleaned_udi_lf = udi_loader.load(adapter, **polars_kwargs)

## 3.3 컬럼명 통일 및 매칭 준비
MAUDE와 UDI의 동일 의미 컬럼을 같은 이름으로 통일

In [None]:
# UDI 컬럼명 → 공통 컬럼명
rename_udi_lf = cleaned_udi_lf.rename({
    'company_name': 'manufacturer',
    'brand_name': 'brand',
    'version_or_model_number': 'model_number',
    'primary_udi_di': 'udi_di',
})

# MAUDE 컬럼명 → 공통 컬럼명
rename_maude_lf = combined_lf.rename({
    'device_0_manufacturer_d_name': 'manufacturer',
    'device_0_brand_name': 'brand',
    'device_0_model_number': 'model_number',
    'device_0_catalog_number': 'catalog_number',
    'device_0_lot_number': 'lot_number',
    'device_0_udi_di': 'udi_di',
    'device_0_udi_public': 'udi_public'
})

In [None]:
target_cols = [
    'manufacturer',
    'brand',
    'model_number',
    'catalog_number'
]

join_col = 'udi_di'

common_cols = target_cols + [join_col]

maude_cols = common_cols + [
    'mdr_report_key',
]

udi_cols = common_cols + udi_di_cols

In [None]:
udi_necessary_lf = rename_udi_lf.select(pl.col(udi_cols))
maude_necessary_lf = rename_maude_lf.select(pl.col(maude_cols))

maude_necessary_lf.select(pl.len()).collect().item()

In [None]:
maude_necessary_lf.select(pl.col('mdr_report_key').n_unique()).head().collect()

## 3.4 MAUDE-UDI 매칭 실행
**UDIProcessor**: 다단계 매칭 전략으로 최대한 많은 데이터 매칭
- udi_confidence: 매칭 신뢰도 (EXACT, HIGH, MEDIUM, LOW, VERY_LOW)
- device_version_id: 매칭된 기기 식별자

In [None]:
output_path=DATA_DIR / 'silver' / "maude_with_udi.parquet"

In [None]:
# UDI 매칭 프로세서 실행
processor = UDIProcessor()
result_path = processor.process(
    maude_lf=rename_maude_lf,
    udi_lf=rename_udi_lf,
    output_path=Path(output_path),
    chunk_size=10_000
)

In [None]:
loader4 = DataLoader(
    name='event',
    output_file=output_path
)

semifinal_lf = loader4.load(adapter=adapter, **polars_kwargs)
display(semifinal_lf.select(pl.len()).collect().item())
display(semifinal_lf.select(pl.col('mdr_report_key').n_unique()).head().collect().item())

In [None]:
# 브랜드명 보정: 동일 제조사+기기버전 내에서 가장 짧은 브랜드명 사용
# (긴 브랜드명은 불필요한 정보가 추가된 경우가 많음)
semifinal_lf = (
    semifinal_lf
    .with_columns(
        pl.col("brand_final")
          .drop_nulls()
          .first()
          .over(
              ["manufacturer_final", "device_version_id"],
              order_by=pl.col("brand_final").str.len_chars()
          )
          .alias("brand_final")
    )
)

# 4. 저품질 데이터 필터링

**목적**: 분석에 부적합한 저품질 행 제거
**기준**: 
- 제조사 정보 누락
- UDI 매칭 실패 또는 낮은 신뢰도
- 기기 식별자 Unknown

## 4.1 품질 필터링 기준
- 제조사 정보 필수 (manufacturer_final)
- 기기 식별자가 UNK로 시작하지 않음
- UDI 매칭 신뢰도 MEDIUM 이상 (LOW, VERY_LOW 제외)

In [None]:
filtered_lf = semifinal_lf.filter(
    pl.col('manufacturer_final').is_not_null(),  # 제조사 정보 존재
    ~ pl.col('device_version_id').str.starts_with('UNK'),  # 기기 식별자 Unknown 제외
    pl.col('udi_confidence').ne('VERY_LOW'),  # 매칭 신뢰도 필터
    pl.col('udi_confidence').ne('LOW'),
)

# 참고: Top 10 Product Code 확인 (선택적 필터링용)
top10_lst = filtered_lf.group_by('device_0_device_report_product_code').agg(
    pl.len().alias('count')
).sort('count', descending=True).head(10).collect().to_pandas()['device_0_device_report_product_code'].to_list()

filtered_lf.select(pl.len()).collect().item()

In [None]:
# filtered_lf = filtered_lf.filter(
#     pl.col('device_0_device_report_product_code').is_in(top10_lst),
# )

display(filtered_lf.select(pl.len()).collect().item())

display(
    filtered_lf.select(
        pl.col(['combined_mdr_text', 'product_problems']).n_unique()
    ).head().collect()
)

filtered_lf.group_by(['combined_mdr_text', 'product_problems']).agg(
    pl.col('mdr_report_key').n_unique()
).select(pl.len()).head().collect()

In [None]:
matched_lf = filtered_lf.clone()


# 5. 타입 변환 및 인코딩

**목적**: 데이터 타입 최적화 및 분석 준비

## 5.1 Categorical 타입 변환

### event_type → Categorical
제한된 카테고리 값으로 메모리 효율적 저장

In [None]:
def convert_event_type(matched_lf, verbose=True):
    """
    event_type을 Categorical 타입으로 변환
    
    **이유**: 
    - event_type은 제한된 카테고리 값만 가짐 (Death, Injury, Malfunction 등)
    - Categorical 변환 시 메모리 사용량 대폭 감소
    
    Args:
        matched_lf: LazyFrame
        verbose: 진행상황 출력 여부
    
    Returns:
        Categorical 타입으로 변환된 LazyFrame
    """
    return matched_lf.with_columns(
        pl.col('event_type').cast(pl.Categorical)
    )

In [None]:
matched_lf = convert_event_type( matched_lf, "event_type")

matched_lf.select(pl.len()).collect().item()

## 5.2 Boolean 타입 변환

### Flag 컬럼 Boolean 변환
Y/N, I/R 등의 플래그를 True/False로 변환

대상 컬럼:
- adverse_event_flag (부작용 발생 여부)
- product_problem_flag (제품 문제 여부)
- report_to_fda (FDA 보고 여부)
- previous_use_code (이전 사용 여부)
- device_operator (기기 사용자 유형)

In [None]:
def cast_flags_to_bool(
    matched_lf: pl.LazyFrame, 
    flag_cols: Union[str | list[str]], 
    true_val: str = 'Y', 
    false_val: str = 'N'
) -> pl.LazyFrame:
    """
    플래그 컬럼을 Boolean으로 변환
    
    변환 규칙:
    - true_val → True
    - false_val → False
    - 기타 ('I', null 등) → None
    
    Args:
        matched_lf: LazyFrame
        flag_cols: 변환할 컬럼명 또는 리스트
        true_val: True로 매핑할 값 (기본: 'Y')
        false_val: False로 매핑할 값 (기본: 'N')
    
    Returns:
        Boolean으로 변환된 LazyFrame
    """
    if isinstance(flag_cols, str):
      flag_cols = [flag_cols]
    
    schema_names = matched_lf.collect_schema().names()

    return matched_lf.with_columns([
        pl.when(pl.col(col).eq(true_val))
          .then(True)
        .when(pl.col(col).eq(false_val))
          .then(False)
        .otherwise(None)
        .alias(col)
        for col in flag_cols
        if col in schema_names
    ])

In [None]:
# Y/N 플래그 Boolean 변환
matched_lf = cast_flags_to_bool(matched_lf, [
    "adverse_event_flag",
    "product_problem_flag",
    'reprocessed_and_reused_flag',
    'single_use_flag',
    "report_to_fda"
])

matched_lf.select(pl.len()).collect().item()

In [None]:
# previous_use_code: I(Initial) → True, R(Reuse) → False
matched_lf = cast_flags_to_bool(
    matched_lf, 
    'previous_use_code', 
    true_val='I', false_val='R'
)

matched_lf.select(pl.len()).collect().item()

In [None]:
# device_operator: HEALTH PROFESSIONAL → True, LAY USER/PATIENT → False
matched_lf = cast_flags_to_bool(
    matched_lf, 
    'device_0_device_operator', 
    true_val='HEALTH PROFESSIONAL', 
    false_val='LAY USER/PATIENT'
)

matched_lf.select(pl.len()).collect().item()

## 5.3 정수형 변환
mdr_report_key를 Int32로 변환하여 메모리 최적화

In [None]:
def cast_to_int32(
    matched_lf: pl.LazyFrame,
    int_cols: str | list[str],
) -> pl.LazyFrame:
    """
    컬럼을 Int32로 안전하게 변환
    
    Args:
        matched_lf: LazyFrame
        int_cols: 변환할 컬럼명 또는 리스트
    
    Returns:
        Int32로 변환된 LazyFrame
    """
    if isinstance(int_cols, str):
        int_cols = [int_cols]
    
    schema = matched_lf.collect_schema()
    
    # 스키마에 존재하는 컬럼만 필터링
    valid_cols = [col for col in int_cols if col in schema]
    
    # 존재하지 않는 컬럼 경고
    for col in set(int_cols) - set(valid_cols):
        print(f"[WARN] Column '{col}' not found. Skipped.")
    
    if not valid_cols:
        return matched_lf
    
    return matched_lf.with_columns([
        pl.col(col).cast(pl.Int32, strict=False)
        for col in valid_cols
    ])

In [None]:
matched_lf = cast_to_int32(
    matched_lf, 'mdr_report_key'
)

matched_lf.select(pl.len()).collect().item()

# 6. 중복 사건 제거

**전략**: 같은 기기의 같은 날짜 중복 보고 제거
**어려움**: 단순 중복이 아닌 "동일 사건의 중복 보고" 판단 필요

In [None]:
# UDI 매칭 결과 컬럼명 정리 (_final 접미사 제거)
columns_with_final = [col for col in matched_lf.collect_schema().names() if '_final' in col]

matched_lf = matched_lf.with_columns([
    pl.col(col).alias(col.replace('_final', ''))
    for col in columns_with_final
]).drop(columns_with_final)

matched_lf.select(pl.len()).collect().item()

In [None]:
# 중복 판단 기준 컬럼
# 동일한 조합 = 같은 사건의 중복 보고로 간주
dedup_cols = [
    'report_number',  # 보고 번호
    'date_of_event',  # 사건 발생 날짜
    'manufacturer',  # 제조사
    'device_version_id',  # 기기 버전
    'lot_number',  # 로트 번호
    'udi_public'  # 공개 UDI
]

# Unknown/NA 패턴 정의
na_patterns = r'^None$|^UNK|NOT APPLICABLE|NOT REPORTED|^N/A$|^NA$|^$|^\s+$|^UNKNOWN$|^NI$|^NULL$'

In [None]:
# 중복 가능성이 있는 행 식별
# duplicate_cnt: 동일한 dedup_cols 조합을 가진 행의 개수
matched_lf_with_cnt = matched_lf.with_columns(
    pl.len().over(dedup_cols).alias('duplicate_cnt')
)

# 중복이 있는 행만 추출 (cnt >= 2)
# cnt가 1인 경우는 고유한 행이므로 처리 불필요
matched_lf_duplicates_only = matched_lf_with_cnt.filter(
    pl.col('duplicate_cnt') >= 2
)

duplicate_cnt = matched_lf_duplicates_only.select(pl.len()).collect().item()
print(f"중복 가능성이 있는 행: {duplicate_cnt:,}개")

unique_cnt = matched_lf.unique(subset=dedup_cols, maintain_order=True).select(pl.len()).collect().item()
print(f"고유한 조합 수: {unique_cnt:,}개")

matched_lf.select(pl.len()).collect().item()

In [None]:
def remove_na_values(matched_lf: pl.LazyFrame, dedup_cols, na_patterns, verbose=True):
    """
    NA/Unknown 값이 포함된 행 제거
    
    **이유**: NA 값으로 중복 판단 시 잘못된 매칭 가능성
    
    처리 과정:
    1. 각 dedup_col에 대해 유효값 조건 생성
       (null 아님 AND NA 패턴에 매칭 안됨)
    2. 모든 조건을 AND로 결합
    3. 필터 적용

    Args:
        matched_lf: LazyFrame
        dedup_cols: 체크할 컬럼 리스트
        na_patterns: NA/Unknown 패턴 정규식
        verbose: 진행상황 출력 여부

    Returns:
        NA 값이 제거된 LazyFrame
    """
    if verbose:
        print("NA/Unknown 값 제거")
        print(f"패턴: {na_patterns}")
    
    before_cnt = matched_lf.select(pl.len()).collect().item()
    if verbose:
        print(f"제거 전 행 개수: {before_cnt:,}개")
    
    # 각 컬럼별로 유효값 조건 생성
    conditions = []
    
    for col in dedup_cols:
        if col in matched_lf.collect_schema().names():
            cond = (
                pl.col(col).is_not_null()
                & ~pl.col(col).cast(pl.Utf8).str.to_uppercase().str.contains(na_patterns)
            )
            conditions.append(cond)
            
            if verbose:
                print(f"  '{col}': NA/Unknown 제거 조건 추가")
        else:
            if verbose:
                print(f"  '{col}': 컬럼 없음, 건너뜀")
    
    if not conditions:
        if verbose:
            print("제거할 조건 없음. 원본 반환")
        return matched_lf
    
    # 모든 조건을 AND로 결합
    final_condition = conditions[0]
    for cond in conditions[1:]:
        final_condition = final_condition & cond

    print('='*50)
    print(final_condition)
    
    # 필터 적용
    matched_lf_cleaned = matched_lf.filter(final_condition)
    
    after_cnt = matched_lf_cleaned.select(pl.len()).collect().item()
    removed_cnt = before_cnt - after_cnt
    
    if verbose:
        print(f"제거 후 행 개수: {after_cnt:,}개")
        print(f"제거된 행 개수: {removed_cnt:,}개")
    
    return matched_lf_cleaned

In [None]:
def analyze_duplicates(matched_lf, group_cols, verbose=True):
    """
    중복 데이터 분석
    
    처리 과정:
    1. 전체 행 개수 확인
    2. 고유(unique) 조합 개수 확인
    3. 중복 개수 = 전체 - 고유

    Args:
        matched_lf: LazyFrame
        group_cols: 중복 판단 기준 컬럼
        verbose: 진행상황 출력 여부

    Returns:
        tuple: (전체 개수, 고유 개수, 중복 개수)
    """
    if verbose:
        print("중복 분석")

    total_cnt = matched_lf.select(pl.len()).collect().item()
    unique_cnt = matched_lf.unique(
        subset=group_cols,
        maintain_order=True
    ).select(pl.len()).collect().item()
    
    duplicate_cnt = total_cnt - unique_cnt

    if verbose:
        print(f"전체 개수: {total_cnt:,}개")
        print(f"고유 개수: {unique_cnt:,}개")
        print(f"중복 개수: {duplicate_cnt:,}개")
        print("\n중복 판단 컬럼:")
        for i, col in enumerate(group_cols, start=1):
            print(f"{i}. {col}")

    return total_cnt, unique_cnt, duplicate_cnt

In [None]:
def remove_duplicates(matched_lf, dedup_cols, keep='first', verbose=True):
    """
    중복 데이터 제거
    
    처리 과정:
    1. dedup_cols 기준으로 중복 판단
    2. keep 옵션에 따라 첫번째/마지막 행 유지
    3. 중복 제거된 DataFrame 반환

    Args:
        matched_lf: LazyFrame
        dedup_cols: 중복 판단 컬럼 리스트
        keep: 'first' 또는 'last'
            - 'first': 첫번째 행 유지
            - 'last': 마지막 행 유지
        verbose: 진행상황 출력 여부

    Returns:
        중복이 제거된 LazyFrame
    """
    if verbose:
        print("중복 제거 시작")
        print(f"중복 판단 컬럼: {dedup_cols}")
        print(f"유지 옵션: {keep}")

    matched_lf_deduped = matched_lf.unique(
        subset=dedup_cols,
        maintain_order=True,
        keep='first'
    )

    if verbose:
        before_cnt = matched_lf.select(pl.len()).collect().item()
        after_cnt = matched_lf_deduped.select(pl.len()).collect().item()
        removed_cnt = before_cnt - after_cnt

        print(f"제거 전 행 개수: {before_cnt:,}개")
        print(f"제거 후 행 개수: {after_cnt:,}개")
        print(f"제거된 행 개수: {removed_cnt:,}개")

    return matched_lf_deduped

In [None]:
# 중복 가능성 있는 행에서 NA 값 제거
matched_lf_cleaned = remove_na_values(matched_lf_duplicates_only, dedup_cols, na_patterns)

matched_lf_cleaned.select(pl.len()).collect().item()

In [None]:
# 중복 분석
total, unique, duplicate = analyze_duplicates(matched_lf_cleaned, dedup_cols)
pprint((total, unique, duplicate))

matched_lf_cleaned.select(pl.len()).collect().item()

In [None]:
# 중복 제거 (첫번째 행만 유지)
remove = remove_duplicates(matched_lf_cleaned, dedup_cols, keep='first')

remove.select(pl.len()).collect().item()

In [None]:
remove.select(
    pl.col('combined_mdr_text').n_unique()
).head().collect()

In [None]:
matched_lf.select(pl.col('mdr_report_key').n_unique()).head().collect()

In [None]:
# Anti Join: 중복으로 판정된 행을 원본에서 제거
# (remove에 있는 mdr_report_key를 matched_lf에서 제외)
result_lf = matched_lf.join(
    remove,
    on='mdr_report_key',
    how='anti'  # 매칭되지 않는 행만 유지
)

display(result_lf.select(pl.col('mdr_report_key').n_unique()).head().collect().item())
remove.select(pl.len()).collect().item(), result_lf.select(pl.len()).collect().item(), matched_lf.select(pl.len()).collect().item()

In [None]:
matched_lf = result_lf.clone()

# 7. Categorical 인코딩

**목적**: 환자 outcome을 One-Hot Encoding으로 변환

In [None]:
def sequence_number_outcome_clean(matched_lf, col_name):
    """
    환자 outcome을 One-Hot Encoding으로 변환
    
    Outcome 코드:
    - L: Life Threatening (생명 위협)
    - H: Hospitalization (입원)
    - S: Disability (장애)
    - C: Congenital Anomaly (선천적 이상)
    - R: Required Intervention (개입 필요)
    - D: Death (사망)
    - O: Other/Unknown/Invalid (기타/알 수 없음/유효하지 않음)
    
    Args:
        matched_lf: LazyFrame
        col_name: outcome 컬럼명
    
    Returns:
        outcome_L, outcome_H, ... 컬럼이 추가된 LazyFrame
    """
    outcome_mapping = {
        'Life Threatening': 'L',
        'Hospitalization': 'H',
        'Disability': 'S',
        'Congenital Anomaly': 'C',
        'Required Intervention': 'R',
        'Death': 'D',
        'Other': 'O',
        'Invalid Data': 'O',
        'Unknown': 'O',
        'No Information': 'O',
        'Not Applicable': 'O',
    }
    
    all_outcomes = ['L', 'H', 'S', 'C', 'R', 'D', 'O']
    
    # 1. 문자열 파싱 (리스트 형태의 문자열 → 실제 리스트)
    result = matched_lf.with_columns(
        pl.col(col_name)
        .str.replace_all(r'^\[|\]$', "")  # 대괄호 제거
        .str.replace_all(r"'", "")  # 따옴표 제거
        .str.split(",")
        .list.eval(pl.element().str.strip_chars())
        .alias("_outcome_list")
    )
    
    # 2. 매핑 적용
    for key, value in outcome_mapping.items():
        result = result.with_columns(
            pl.col("_outcome_list")
            .list.eval(pl.element().str.replace(key, value))
            .alias("_outcome_list")
        )
    
    # 3. One-Hot Encoding
    for outcome in all_outcomes:
        result = result.with_columns(
            pl.col("_outcome_list")
            .list.contains(outcome)
            .alias(f"outcome_{outcome}")
        )
    
    result = result.drop("_outcome_list", col_name)
    
    return result

In [None]:
# Outcome One-Hot Encoding 적용
matched_lf = sequence_number_outcome_clean(matched_lf, "patient_0_sequence_number_outcome")

display(matched_lf.select(pl.col('mdr_report_key').n_unique()).head().collect().item())
display(matched_lf.select(pl.len()).collect().item())

matched_lf.select(['outcome_L', 'outcome_H', 'outcome_S', 'outcome_C', 'outcome_R', 'outcome_D', 'outcome_O']).head().collect()

# 8. 데이터 품질 검증

**목적**: 수기 입력 데이터의 논리적 일관성 검증 및 불일치 데이터 제거

## 8.1 Adverse Event Flag 검증
**문제**: 수기 입력된 adverse_event_flag의 신뢰도 낮음
**해결**: event_type으로 논리적 검증 후 불일치 데이터 제거

In [None]:
# event_type 기반 논리적 adverse_event_flag 생성
# Death 또는 Injury → True (부작용 발생)
# 기타 → False
matched_lf = matched_lf.with_columns(
    pl.when(pl.col("event_type").is_in(["Death", "Injury"]))
      .then(True)
      .otherwise(False)
      .alias("adverse_event_flag_logic")
)

display(matched_lf.select(pl.col('mdr_report_key').n_unique()).head().collect().item())
display(matched_lf.select(pl.len()).collect().item())

In [None]:
# 논리적 검증 결과 확인

cols = ["event_type", "adverse_event_flag", "adverse_event_flag_logic"]

# 1) True 샘플 (Death/Injury 사건)
true_df = (
    matched_lf.filter(pl.col("adverse_event_flag_logic") == True)
      .select(cols)
      .collect()
)
true_samples = true_df.sample(n=min(20, true_df.height), seed=42)

# 2) False 샘플 (Malfunction 등)
false_df = (
    matched_lf.filter(pl.col("adverse_event_flag_logic") == False)
      .select(cols)
      .collect()
)
false_samples = false_df.sample(n=min(20, false_df.height), seed=42)

# 3) 충돌 샘플 (원본 flag와 논리 파생값이 다른 경우)
conflict_df = (
    matched_lf.filter(
        pl.col("adverse_event_flag").is_not_null()
        & (pl.col("adverse_event_flag") != pl.col("adverse_event_flag_logic"))
    )
    .select(cols)
    .collect()
)
conflict_samples = conflict_df.sample(n=min(20, conflict_df.height), seed=42)


display(matched_lf.select(pl.col('mdr_report_key').n_unique()).head().collect().item())
display(matched_lf.select(pl.len()).collect().item())

# 일치하는 데이터만 유지 (불일치 데이터 제거)
logical_lf = matched_lf.filter(
    pl.col("adverse_event_flag").eq(pl.col("adverse_event_flag_logic"))
)

display(logical_lf.select(pl.col('mdr_report_key').n_unique()).head().collect().item())
display(logical_lf.select(pl.len()).collect().item())

true_samples, false_samples.to_pandas(), conflict_samples.to_pandas()

## 8.2 날짜 순서 검증
**논리적 순서**: 제조일 < 사건일 ≤ 보고일
불가능한 순서의 데이터 제거

In [None]:
# 날짜 순서가 논리적으로 타당한 행만 유지
logical_lf = logical_lf.filter(
    pl.col('device_date_of_manufacturer').lt(pl.col('date_of_event')),  # 제조일 < 사건일
    pl.col('date_of_event').le(pl.col('date_received'))  # 사건일 ≤ 보고일
)

In [None]:
display(logical_lf.select(pl.len()).collect().item())

display(
    logical_lf.select(
        pl.col(['combined_mdr_text', 'product_problems']).n_unique()
    ).head().collect()
)

logical_lf.group_by(['combined_mdr_text', 'product_problems']).agg(
    pl.col('mdr_report_key').n_unique()
).select(pl.len()).head().collect().item()

# 9. 최종 데이터 준비 및 저장

## 9.1 컬럼명 정리 및 재구성

In [None]:
# mdr_report_key 기준 정렬
logical_lf = logical_lf.sort('mdr_report_key')

In [None]:
# 컬럼명을 더 명확하고 일관성 있게 변경

rename_map = {
    'patient_0_patient_age': 'patient_age',
    'device_0_device_operator': 'operator',
    'device_0_device_report_product_code': 'product_code',
    'device_0_openfda_device_name': 'product_name',
    'combined_mdr_text': 'mdr_text',
    'device_version_id': 'udi_di',
    'previous_use_code': 'previous_use_flag',
    'date_of_event': 'date_occurred',
    'device_date_of_manufacturer': 'date_manufactured',
    'brand': 'brand_name',
    'manufacturer': 'manufacturer_name'
}

logical_lf = logical_lf.drop('udi_di').rename(rename_map)

In [None]:
# 최종 저장할 컬럼 정의

# 기본 정보
BASE_COLS = [
    'mdr_report_key',
    'adverse_event_flag',
    'product_problem_flag', 
    'date_occurred',
    'date_received', 
    'date_manufactured', 
    'event_type',
    'previous_use_flag', 
    'single_use_flag', 
    'reprocessed_and_reused_flag',
    'product_problems'
]

# 의료기기 정보
DEVICE_COLS = [
    "manufacturer_name",
    "brand_name",
    "model_number",
    "udi_di",
    "product_code",
    "operator",
    "product_name",
]

# 환자 정보
PATIENT_COLS = [
    "patient_age",
]

# MDR 텍스트
MDR_TEXT_COLS = [
    'mdr_text'
]

# Outcome (One-Hot Encoded)
OUTCOME_PATTERNS = [r'^outcome']
OUTCOME_COLS = get_pattern_cols(logical_lf, OUTCOME_PATTERNS)

# 전체 컬럼
TOTAL_COLS = BASE_COLS + DEVICE_COLS + PATIENT_COLS + MDR_TEXT_COLS + OUTCOME_COLS

In [None]:
# 최종 컬럼만 선택
final_lf = logical_lf.select(TOTAL_COLS)

# Null 값 분포 분석
final_cols = final_lf.collect_schema().names()
analyze_null_values(final_lf, final_cols)

In [None]:
final_path = DATA_DIR / 'silver' / 'maude_preprocess.parquet'

In [None]:
# 전처리 완료된 데이터를 Parquet 형식으로 저장
# - compression='zstd': 압축 알고리즘 (빠른 속도 + 좋은 압축률)
# - compression_level=3: 압축 레벨 (1-22, 3은 속도와 압축률 균형)
final_lf.sink_parquet(final_path, compression='zstd', compression_level=3, mkdir=True)