In [4]:
# 원본 데이터셋 다운로드

# 필요한 라이브러리 불러오기
from typing import List, Union
import requests
import zipfile
import io
import json
import csv
import pandas as pd

SEARCH_URL = 'https://api.fda.gov/download.json'


# Find the openFDA device-event download URLs for a range of years.
def search_download_url(start: Union[str, int], end: Union[str, int], timeout: float = 60) -> List[str]:
    """Return the device-event partition file URLs whose year lies in [start, end].

    The download index at ``SEARCH_URL`` is read and the entries under
    ``results.device.event.partitions`` are filtered. Each entry is assumed to
    carry a ``display_name`` whose leading token is the year and a ``file`` URL.

    Args:
        start: First year to include (inclusive), as an int or a digit string.
        end: Last year to include (inclusive), as an int or a digit string.
        timeout: Seconds to wait on the index request (passed to requests).

    Returns:
        Matching ``file`` URLs as a plain list, in index order. The list is
        empty when the index has no partitions.

    Raises:
        requests.HTTPError: if the index request returns an error status.
    """
    response = requests.get(SEARCH_URL, timeout=timeout)
    # Fail loudly on an HTTP error instead of hitting a KeyError on the error payload.
    response.raise_for_status()
    partitions = response.json()['results']['device']['event']['partitions']
    event_df = pd.DataFrame(partitions)
    if event_df.empty:
        return []

    # Compare years numerically.
    # Entries without a leading 4-digit year become NaN, and between() excludes them.
    year = pd.to_numeric(
        event_df['display_name'].str.extract(r'^\s*(\d{4})', expand=False),
        errors='coerce',
    )
    in_range = year.between(int(start), int(end))
    return event_df.loc[in_range, 'file'].tolist()

# Collect the partition URLs for 2020 through 2024 (inclusive) and show how many were found.
start = 2020
end = 2024
urls = search_download_url(start=start, end=end)
print(urls, len(urls))

['https://download.open.fda.gov/device/event/2020q3/device-event-0001-of-0005.json.zip'
 'https://download.open.fda.gov/device/event/2020q3/device-event-0002-of-0005.json.zip'
 'https://download.open.fda.gov/device/event/2020q3/device-event-0003-of-0005.json.zip'
 'https://download.open.fda.gov/device/event/2020q3/device-event-0004-of-0005.json.zip'
 'https://download.open.fda.gov/device/event/2020q3/device-event-0005-of-0005.json.zip'
 'https://download.open.fda.gov/device/event/2023q3/device-event-0001-of-0007.json.zip'
 'https://download.open.fda.gov/device/event/2023q3/device-event-0002-of-0007.json.zip'
 'https://download.open.fda.gov/device/event/2023q3/device-event-0003-of-0007.json.zip'
 'https://download.open.fda.gov/device/event/2023q3/device-event-0004-of-0007.json.zip'
 'https://download.open.fda.gov/device/event/2023q3/device-event-0005-of-0007.json.zip'
 'https://download.open.fda.gov/device/event/2023q3/device-event-0006-of-0007.json.zip'
 'https://download.open.fda.gov/

In [None]:
# Find the openFDA device-event download URLs for a range of years.
def search_download_url(start: Union[str, int], end: Union[str, int], timeout: float = 60) -> List[str]:
    """Return the device-event partition file URLs whose year lies in [start, end].

    The download index at ``SEARCH_URL`` is read and the entries under
    ``results.device.event.partitions`` are filtered. Each entry is assumed to
    carry a ``display_name`` whose leading token is the year and a ``file`` URL.

    Args:
        start: First year to include (inclusive), as an int or a digit string.
        end: Last year to include (inclusive), as an int or a digit string.
        timeout: Seconds to wait on the index request (passed to requests).

    Returns:
        Matching ``file`` URLs as a plain list, in index order. The list is
        empty when the index has no partitions.

    Raises:
        requests.HTTPError: if the index request returns an error status.
    """
    response = requests.get(SEARCH_URL, timeout=timeout)
    # Fail loudly on an HTTP error instead of hitting a KeyError on the error payload.
    response.raise_for_status()
    partitions = response.json()['results']['device']['event']['partitions']
    event_df = pd.DataFrame(partitions)
    if event_df.empty:
        return []

    # Compare years numerically.
    # Entries without a leading 4-digit year become NaN, and between() excludes them.
    year = pd.to_numeric(
        event_df['display_name'].str.extract(r'^\s*(\d{4})', expand=False),
        errors='coerce',
    )
    in_range = year.between(int(start), int(end))
    return event_df.loc[in_range, 'file'].tolist()

def unzip_json(url: str, timeout: float = 60) -> dict:
    """Download a ZIP archive into memory and parse its first ``.json`` member.

    The whole archive is held in memory (``response.content``), and nothing is
    written to disk.

    Args:
        url: URL of the ZIP archive.
        timeout: Seconds to wait on the HTTP request (passed to requests).

    Returns:
        The parsed JSON document of the first member whose name ends in ``.json``.

    Raises:
        requests.HTTPError: if the download returns an error status.
        ValueError: if the archive contains no ``.json`` member.
    """
    # 1) Download the ZIP into memory; stop on HTTP errors instead of unzipping an error page.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    zip_bytes = io.BytesIO(response.content)

    # 2) Open the archive directly from memory.
    with zipfile.ZipFile(zip_bytes) as z:
        json_names = [name for name in z.namelist() if name.endswith(".json")]
        if not json_names:
            raise ValueError(f"No .json member found in ZIP archive: {url}")

        # 3) Parse the first JSON member straight from the archive stream.
        with z.open(json_names[0]) as json_file:
            data = json.load(json_file)

    return data


def collect_json_files(urls: List[str], loader=None) -> dict:
    """Load several partition files and merge them into a single payload.

    Each payload is a dict that may contain a ``results`` list (the key the other
    cells read via ``data['results']``). The merge works as follows:
    - ``results`` lists are concatenated across files, in URL order.
    - Every other top-level key keeps the value from the last file that had it.

    Args:
        urls: URLs (or other identifiers understood by ``loader``) to load, in order.
        loader: Callable mapping one URL to its parsed JSON dict. Defaults to
            ``unzip_json``. Pass another callable to read local files or to test.

    Returns:
        The merged dict; ``{}`` when ``urls`` is empty.
    """
    if loader is None:
        loader = unzip_json

    collection = {}
    for url in urls:
        data = loader(url)
        for key, value in data.items():
            if key == 'results':
                # A plain dict.update() would replace the records gathered from earlier files.
                collection.setdefault('results', []).extend(value or [])
            else:
                collection[key] = value
    return collection

In [None]:
import requests
import zipfile
import io
import json
import csv

# Download a single quarterly partition and parse it in memory.
# This reuses unzip_json (defined in the previous cell) instead of duplicating its
# download -> unzip -> json.load steps, so both paths share the same behavior.
url = "https://download.open.fda.gov/device/event/2025q3/device-event-0007-of-0009.json.zip"
data = unzip_json(url)

# # 4) JSON → CSV 변환 (로컬 JSON 저장 X)
# csv_filename = "output.csv"

# # JSON이 리스트 형태라고 가정
# with open(csv_filename, "w", newline="", encoding="utf-8") as f:
#     writer = csv.DictWriter(f, fieldnames=data[0].keys())
#     writer.writeheader()
#     writer.writerows(data)

# print("CSV 저장 완료:", csv_filename)


[{'manufacturer_contact_zip_ext': '',
  'manufacturer_g1_address_2': '',
  'event_location': '',
  'report_to_fda': '',
  'manufacturer_contact_t_name': 'MS',
  'manufacturer_contact_state': 'CA',
  'manufacturer_link_flag': 'Y',
  'manufacturer_contact_address_2': '',
  'manufacturer_g1_city': '',
  'manufacturer_contact_address_1': '6340 SEQUENCE DR.',
  'manufacturer_contact_pcity': '85820002',
  'event_type': 'Malfunction',
  'report_number': '3004753838-2025-233519',
  'type_of_report': ['Initial submission'],
  'product_problem_flag': 'Y',
  'date_received': '20250822',
  'manufacturer_address_2': '',
  'pma_pmn_number': 'DEN170088',
  'date_of_event': '20250622',
  'reprocessed_and_reused_flag': 'N',
  'manufacturer_address_1': '',
  'exemption_number': '',
  'manufacturer_contact_zip_code': '92121',
  'reporter_occupation_code': 'OTHER',
  'manufacturer_contact_plocal': '8582000200',
  'noe_summarized': '1',
  'manufacturer_contact_l_name': 'SPOTO',
  'source_type': ['Foreign',

In [30]:
import ijson
import pyarrow as pa
import pyarrow.parquet as pq
from collections import defaultdict

def infer_unified_schema(records, sample_size=1000):
    """Infer one Arrow schema covering every field seen in a sample of records.

    Without an explicit schema, ``pa.Table.from_pylist`` takes its column names
    from the first row only. Fields absent from ``records[0]`` were therefore
    silently missing from the "unified" schema. Here the changes are:
    - The column set is the union of keys over the sample, in first-seen order.
    - Each column's type is inferred from all sampled values.

    Args:
        records: List of record dicts.
        sample_size: Number of leading records to inspect.

    Returns:
        A ``pa.Schema``. A column whose sampled values are all None gets type null.
    """
    sample = records[:sample_size]
    # dict.fromkeys keeps the first-seen order while de-duplicating.
    column_names = list(dict.fromkeys(key for record in sample for key in record))
    columns = {name: [record.get(name) for record in sample] for name in column_names}
    return pa.Table.from_pydict(columns).schema

def normalize_record_to_schema(record, schema):
    """Project ``record`` onto the fields of ``schema``.

    The result has exactly one key per schema field, in schema order.
    - Fields missing from the record map to None.
    - Record keys that are not schema fields are dropped.
    """
    return {field.name: record.get(field.name) for field in schema}

def stream_json_to_parquet(json_file, parquet_file, chunk_size=10000, compression='zstd'):
    """Stream the ``results`` records of a JSON file into a Parquet file in chunks.

    How it works:
    - Records are read one at a time with ``ijson`` from the ``results`` array.
    - Each record is passed through ``clean_empty_arrays`` and buffered.
    - Records are written ``chunk_size`` at a time.
    - The schema is inferred once from the first buffered batch
      (``infer_unified_schema``).
    - Every batch, including a short final one, is normalized to that schema and
      converted with it explicitly. A batch in which some column is entirely None
      therefore no longer infers ``null`` and trips the writer's
      "schema does not match" check.

    Args:
        json_file: Path to the source JSON (an object with a ``results`` array).
        parquet_file: Output Parquet path.
        chunk_size: Number of records per written batch.
        compression: Parquet compression codec passed to ``pq.ParquetWriter``.

    NOTE(review): two limitations remain.
    - Fields that never appear in the records used for schema inference are
      dropped by the normalization step.
    - A column whose inferred type is ``null`` will make the Arrow conversion fail
      if a later batch holds a non-null value in it.
    """
    records = []
    writer = None
    schema = None

    def _write_batch(batch):
        # Create the schema and writer lazily from the first batch, then append.
        nonlocal writer, schema
        if schema is None:
            schema = infer_unified_schema(batch)
            writer = pq.ParquetWriter(parquet_file, schema, compression=compression)
            print(f"스키마 생성 완료: {len(schema)} 필드")
        normalized_records = [normalize_record_to_schema(r, schema) for r in batch]
        writer.write_table(pa.Table.from_pylist(normalized_records, schema=schema))

    print("JSON 파일 읽는 중...")
    try:
        with open(json_file, 'rb') as f:
            for i, record in enumerate(ijson.items(f, 'results.item')):
                records.append(clean_empty_arrays(record))

                # Flush one full chunk
                if len(records) >= chunk_size:
                    _write_batch(records)
                    print(f"{i + 1:,}개 레코드 처리 중...")
                    records = []

        # Remaining records (or the whole file when it is smaller than one chunk)
        if records:
            _write_batch(records)
            print(f"마지막 {len(records):,}개 레코드 처리")
    finally:
        # Always close the writer so the Parquet footer is written and the file handle released.
        if writer is not None:
            writer.close()

    print(f"완료! Parquet 파일 생성: {parquet_file}")

def clean_empty_arrays(obj):
    """Recursively replace empty-string placeholders with None.

    - dict: every value is cleaned recursively; keys are unchanged.
    - list: exactly ``[""]`` becomes None; any other list is cleaned element-wise.
    - ``""`` becomes None.
    - Every other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: clean_empty_arrays(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return None if obj == [""] else [clean_empty_arrays(item) for item in obj]
    return None if obj == "" else obj

# Example run on a locally downloaded partition file.
stream_json_to_parquet(json_file='device-event-0009-of-0009 2.json', parquet_file='output.parquet')

JSON 파일 읽는 중...
스키마 생성 완료: 84 필드
10,000개 레코드 처리 중...
20,000개 레코드 처리 중...
30,000개 레코드 처리 중...
40,000개 레코드 처리 중...
50,000개 레코드 처리 중...
마지막 9,444개 레코드 처리
완료! Parquet 파일 생성: output.parquet


In [26]:
import json
import pyarrow as pa
import pyarrow.parquet as pq
from collections import defaultdict

def flatten_dict(nested_dict, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict with joined keys.

    Rules:
    - Nested dicts are flattened recursively (``{'a': {'b': 1}}`` -> ``{'a_b': 1}``).
    - Lists made up only of dicts are expanded per index, with the index joined by
      ``sep`` like any other key part (``{'a': [{'b': 1}]}`` -> ``{'a_0_b': 1}``).
    - Any other value, including scalars, empty lists, lists of scalars and mixed
      lists (e.g. a dict alongside a None), is kept as-is under its key.
    - An empty nested dict contributes no key at all.

    Args:
        nested_dict: Dict to flatten.
        parent_key: Prefix for every produced key (used by the recursion).
        sep: Separator placed between key parts and list indices.

    Returns:
        A new flat dict. On key collisions the later key wins.
    """
    flat = {}
    for key, value in nested_dict.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key

        if isinstance(value, dict):
            flat.update(flatten_dict(value, new_key, sep=sep))
        elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
            # Only all-dict lists are expanded; checking just value[0] crashed on
            # lists like [dict, None].
            for i, item in enumerate(value):
                flat.update(flatten_dict(item, f"{new_key}{sep}{i}", sep=sep))
        else:
            flat[new_key] = value

    return flat

def clean_empty_arrays(obj):
    """Recursively replace empty-string placeholders with None.

    - dict: every value is cleaned recursively; keys are unchanged.
    - list: exactly ``[""]`` becomes None; any other list is cleaned element-wise.
    - ``""`` becomes None.
    - Every other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: clean_empty_arrays(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return None if obj == [""] else [clean_empty_arrays(item) for item in obj]
    return None if obj == "" else obj

def collect_all_columns(records):
    """Return the sorted union of the keys of every record (an empty list for no records)."""
    return sorted(set().union(*(rec.keys() for rec in records)))

def normalize_record(record, all_columns):
    """Project ``record`` onto ``all_columns``: one key per column, in column order.

    - Columns missing from the record are filled with None.
    - Record keys that are not listed in ``all_columns`` are dropped.
    """
    return {column: record.get(column) for column in all_columns}

# Non-streaming pipeline for one partition file:
# load JSON -> clean + flatten each record -> union of columns -> normalize -> Parquet,
# then write the column list to columns.txt.
print("JSON 파일 읽는 중...")
# Decode explicitly as UTF-8 (the JSON standard encoding).
# Without it, open() uses the locale's default encoding (e.g. cp949 on a
# Korean-locale Windows machine), which can fail or mis-decode this file.
with open('device-event-0009-of-0009 2.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

print(f"{len(data['results']):,}개 레코드 발견. 정리 및 평탄화 중...")

# Clean empty-string placeholders and flatten nested fields into columns
flattened_records = []
for i, record in enumerate(data['results']):
    cleaned = clean_empty_arrays(record)
    flattened = flatten_dict(cleaned)
    flattened_records.append(flattened)

    if (i + 1) % 1000 == 0:
        print(f"{i + 1:,}개 레코드 평탄화 완료...")

print(f"\n총 {len(flattened_records):,}개 레코드 평탄화 완료")

# Union of every column that appears in any record
print("\n모든 컬럼 수집 중...")
all_columns = collect_all_columns(flattened_records)
print(f"총 {len(all_columns):,}개의 고유한 컬럼 발견")

# Give every record the same column set so the Parquet schema is uniform
print("\n레코드 정규화 중...")
normalized_records = []
for i, record in enumerate(flattened_records):
    normalized = normalize_record(record, all_columns)
    normalized_records.append(normalized)

    if (i + 1) % 1000 == 0:
        print(f"{i + 1:,}개 레코드 정규화 완료...")

print("\n컬럼 샘플 (처음 20개):")
print(all_columns[:20])

# Write Parquet (column types are inferred by pyarrow from the values)
print("\nParquet로 변환 중...")
table = pa.Table.from_pylist(normalized_records)
pq.write_table(table, 'sample.parquet', compression='zstd')

print(f"\n완료! {len(normalized_records):,}개 레코드, {len(table.schema):,}개 컬럼 저장")

# Column statistics, and the column list written to columns.txt
print("\n=== 컬럼 통계 ===")
print(f"전체 컬럼 수: {len(all_columns):,}")
print("\n컬럼 목록을 columns.txt에 저장...")
with open('columns.txt', 'w', encoding='utf-8') as f:
    for col in all_columns:
        f.write(f"{col}\n")
print("완료!")

JSON 파일 읽는 중...
59,444개 레코드 발견. 정리 및 평탄화 중...
1,000개 레코드 평탄화 완료...
2,000개 레코드 평탄화 완료...
3,000개 레코드 평탄화 완료...
4,000개 레코드 평탄화 완료...
5,000개 레코드 평탄화 완료...
6,000개 레코드 평탄화 완료...
7,000개 레코드 평탄화 완료...
8,000개 레코드 평탄화 완료...
9,000개 레코드 평탄화 완료...
10,000개 레코드 평탄화 완료...
11,000개 레코드 평탄화 완료...
12,000개 레코드 평탄화 완료...
13,000개 레코드 평탄화 완료...
14,000개 레코드 평탄화 완료...
15,000개 레코드 평탄화 완료...
16,000개 레코드 평탄화 완료...
17,000개 레코드 평탄화 완료...
18,000개 레코드 평탄화 완료...
19,000개 레코드 평탄화 완료...
20,000개 레코드 평탄화 완료...
21,000개 레코드 평탄화 완료...
22,000개 레코드 평탄화 완료...
23,000개 레코드 평탄화 완료...
24,000개 레코드 평탄화 완료...
25,000개 레코드 평탄화 완료...
26,000개 레코드 평탄화 완료...
27,000개 레코드 평탄화 완료...
28,000개 레코드 평탄화 완료...
29,000개 레코드 평탄화 완료...
30,000개 레코드 평탄화 완료...
31,000개 레코드 평탄화 완료...
32,000개 레코드 평탄화 완료...
33,000개 레코드 평탄화 완료...
34,000개 레코드 평탄화 완료...
35,000개 레코드 평탄화 완료...
36,000개 레코드 평탄화 완료...
37,000개 레코드 평탄화 완료...
38,000개 레코드 평탄화 완료...
39,000개 레코드 평탄화 완료...
40,000개 레코드 평탄화 완료...
41,000개 레코드 평탄화 완료...
42,000개 레코드 평탄화 완료...
43,000개 레코드 평탄화 완료...
44,000개 레코드 평탄화 완

In [None]:
def flatten_dict(nested_dict, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict with joined keys.

    Rules:
    - Nested dicts are flattened recursively (``{'a': {'b': 1}}`` -> ``{'a_b': 1}``).
    - Lists made up only of dicts are expanded per index, with the index joined by
      ``sep`` like any other key part (``{'a': [{'b': 1}]}`` -> ``{'a_0_b': 1}``).
    - Any other value, including scalars, empty lists, lists of scalars and mixed
      lists (e.g. a dict alongside a None), is kept as-is under its key.
    - An empty nested dict contributes no key at all.

    Args:
        nested_dict: Dict to flatten.
        parent_key: Prefix for every produced key (used by the recursion).
        sep: Separator placed between key parts and list indices.

    Returns:
        A new flat dict. On key collisions the later key wins.
    """
    flat = {}
    for key, value in nested_dict.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key

        if isinstance(value, dict):
            flat.update(flatten_dict(value, new_key, sep=sep))
        elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
            # Only all-dict lists are expanded; checking just value[0] crashed on
            # lists like [dict, None].
            for i, item in enumerate(value):
                flat.update(flatten_dict(item, f"{new_key}{sep}{i}", sep=sep))
        else:
            flat[new_key] = value

    return flat

def clean_empty_arrays(obj):
    """Recursively replace empty-string placeholders with None.

    - dict: every value is cleaned recursively; keys are unchanged.
    - list: exactly ``[""]`` becomes None; any other list is cleaned element-wise.
    - ``""`` becomes None.
    - Every other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: clean_empty_arrays(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return None if obj == [""] else [clean_empty_arrays(item) for item in obj]
    return None if obj == "" else obj

def collect_all_columns_from_list(records_list):
    """Pass 1 helper: return the sorted union of flattened column names.

    Each record is passed through ``clean_empty_arrays`` and then ``flatten_dict``.
    A progress line is printed every 1,000 records and a summary at the end.
    """
    print("=== Pass 1: 모든 컬럼 수집 중 ===")
    seen_columns = set()

    for position, raw_record in enumerate(records_list, start=1):
        flat_record = flatten_dict(clean_empty_arrays(raw_record))
        seen_columns.update(flat_record.keys())

        if position % 1000 == 0:
            print(f"{position:,}개 레코드 스캔, {len(seen_columns):,}개 컬럼 발견...")

    print(f"\n총 {len(records_list):,}개 레코드, {len(seen_columns):,}개 고유 컬럼 발견")
    return sorted(seen_columns)

def normalize_record(record, all_columns):
    """Project ``record`` onto ``all_columns``: one key per column, in column order.

    - Columns missing from the record are filled with None.
    - Record keys that are not listed in ``all_columns`` are dropped.
    """
    return {column: record.get(column) for column in all_columns}

def dict_to_parquet_streaming(records_generator, parquet_file, chunk_size=5000):
    """Write nested record dicts to Parquet using a unified, all-string schema.

    Pass 1 cleans (``clean_empty_arrays``) and flattens (``flatten_dict``) every
    record exactly once. It keeps the flattened rows and collects the union of
    their column names. Pass 2 then proceeds as follows:
    - Each flattened row is normalized to that sorted column set.
    - Every non-None value is converted with ``str()``.
    - Rows are written in batches of ``chunk_size`` under the explicit string schema.

    Args:
        records_generator: Iterable of raw (nested) record dicts.
        parquet_file: Output Parquet path.
        chunk_size: Rows per written batch.

    NOTE(review): some caveats apply.
    - Despite the name, all flattened rows are held in memory between the two passes.
    - List values are stored as their Python ``str()`` repr, not as JSON.
    """
    # Pass 1: clean + flatten once, collecting every column name
    print("=== Pass 1: 모든 컬럼 수집 및 타입 샘플링 중 ===")
    all_columns = set()
    flattened_records = []

    for scanned, record in enumerate(records_generator, start=1):
        flattened = flatten_dict(clean_empty_arrays(record))
        all_columns.update(flattened.keys())
        # Keep the flattened row so Pass 2 does not clean/flatten the same record again.
        flattened_records.append(flattened)

        if scanned % 1000 == 0:
            print(f"{scanned:,}개 레코드 스캔...")

    record_count = len(flattened_records)
    all_columns = sorted(all_columns)
    print(f"\n총 {record_count:,}개 레코드, {len(all_columns):,}개 컬럼 발견")

    # Every column is a nullable string, so chunks can never disagree on types.
    schema = pa.schema([(col, pa.string()) for col in all_columns])
    print(f"스키마 생성 완료: {len(schema)} 컬럼")

    # Pass 2: normalize, stringify and write in chunks
    print("\n=== Pass 2: Parquet 변환 중 ===")
    records_buffer = []
    total_processed = 0
    writer = pq.ParquetWriter(parquet_file, schema, compression='zstd')
    try:
        for flattened in flattened_records:
            normalized = normalize_record(flattened, all_columns)
            # Stringify every value but keep None as a Parquet null
            records_buffer.append({k: (str(v) if v is not None else None) for k, v in normalized.items()})

            if len(records_buffer) >= chunk_size:
                writer.write_table(pa.Table.from_pylist(records_buffer, schema=schema))
                total_processed += len(records_buffer)
                print(f"{total_processed:,}개 처리...")
                records_buffer = []

        # Remaining rows
        if records_buffer:
            writer.write_table(pa.Table.from_pylist(records_buffer, schema=schema))
            total_processed += len(records_buffer)
    finally:
        # Close even on failure so the file handle is released and the footer is written.
        writer.close()

    print(f"\n완료! {total_processed:,}개 레코드 저장")

# 사용 예시
def record_generator(data_dict):
    """Lazily yield each record of ``data_dict['results']``, one at a time."""
    yield from data_dict['results']


=== Pass 1: 모든 컬럼 수집 중 ===
1,000개 레코드 스캔, 163개 컬럼 발견...
2,000개 레코드 스캔, 163개 컬럼 발견...
3,000개 레코드 스캔, 163개 컬럼 발견...
4,000개 레코드 스캔, 163개 컬럼 발견...
5,000개 레코드 스캔, 163개 컬럼 발견...
6,000개 레코드 스캔, 163개 컬럼 발견...
7,000개 레코드 스캔, 163개 컬럼 발견...
8,000개 레코드 스캔, 163개 컬럼 발견...
9,000개 레코드 스캔, 167개 컬럼 발견...
10,000개 레코드 스캔, 167개 컬럼 발견...
11,000개 레코드 스캔, 167개 컬럼 발견...
12,000개 레코드 스캔, 167개 컬럼 발견...
13,000개 레코드 스캔, 167개 컬럼 발견...
14,000개 레코드 스캔, 167개 컬럼 발견...
15,000개 레코드 스캔, 167개 컬럼 발견...
16,000개 레코드 스캔, 167개 컬럼 발견...
17,000개 레코드 스캔, 167개 컬럼 발견...
18,000개 레코드 스캔, 167개 컬럼 발견...
19,000개 레코드 스캔, 167개 컬럼 발견...
20,000개 레코드 스캔, 167개 컬럼 발견...
21,000개 레코드 스캔, 167개 컬럼 발견...
22,000개 레코드 스캔, 167개 컬럼 발견...
23,000개 레코드 스캔, 167개 컬럼 발견...
24,000개 레코드 스캔, 167개 컬럼 발견...
25,000개 레코드 스캔, 167개 컬럼 발견...
26,000개 레코드 스캔, 167개 컬럼 발견...
27,000개 레코드 스캔, 167개 컬럼 발견...
28,000개 레코드 스캔, 167개 컬럼 발견...
29,000개 레코드 스캔, 167개 컬럼 발견...
30,000개 레코드 스캔, 167개 컬럼 발견...
31,000개 레코드 스캔, 167개 컬럼 발견...
32,000개 레코드 스캔, 167개 컬럼 발견...
33,000개 레코드 스캔, 179개 컬

ValueError: Table schema does not match schema used to create file: 
table:
adverse_event_flag: string
date_added: string
date_changed: string
date_facility_aware: string
date_manufacturer_received: string
date_of_event: string
date_received: string
date_report: string
date_report_to_fda: string
date_report_to_manufacturer: string
device: null
device_0_brand_name: string
device_0_catalog_number: string
device_0_combination_product_flag: string
device_0_date_received: string
device_0_date_removed_flag: null
device_0_date_returned_to_manufacturer: string
device_0_device_age_text: string
device_0_device_availability: string
device_0_device_evaluated_by_manufacturer: string
device_0_device_event_key: null
device_0_device_operator: string
device_0_device_report_product_code: string
device_0_device_sequence_number: string
device_0_expiration_date_of_device: string
device_0_generic_name: string
device_0_implant_flag: null
device_0_lot_number: string
device_0_manufacturer_d_address_1: string
device_0_manufacturer_d_address_2: string
device_0_manufacturer_d_city: string
device_0_manufacturer_d_country: string
device_0_manufacturer_d_name: string
device_0_manufacturer_d_postal_code: string
device_0_manufacturer_d_state: string
device_0_manufacturer_d_zip_code: string
device_0_manufacturer_d_zip_code_ext: string
device_0_model_number: string
device_0_openfda_device_class: string
device_0_openfda_device_name: string
device_0_openfda_fei_number: list<item: string>
  child 0, item: string
device_0_openfda_medical_specialty_description: string
device_0_openfda_registration_number: list<item: string>
  child 0, item: string
device_0_openfda_regulation_number: string
device_0_other_id_number: null
device_0_udi_di: string
device_0_udi_public: string
device_date_of_manufacturer: string
distributor_address_1: null
distributor_address_2: null
distributor_city: null
distributor_name: null
distributor_state: null
distributor_zip_code: null
distributor_zip_code_ext: null
event_key: null
event_location: string
event_type: string
exemption_number: string
health_professional: string
initial_report_to_fda: string
manufacturer_address_1: null
manufacturer_address_2: null
manufacturer_city: null
manufacturer_contact_address_1: string
manufacturer_contact_address_2: string
manufacturer_contact_area_code: string
manufacturer_contact_city: string
manufacturer_contact_country: string
manufacturer_contact_exchange: string
manufacturer_contact_extension: string
manufacturer_contact_f_name: string
manufacturer_contact_l_name: string
manufacturer_contact_pcity: string
manufacturer_contact_pcountry: string
manufacturer_contact_phone_number: string
manufacturer_contact_plocal: string
manufacturer_contact_postal_code: string
manufacturer_contact_state: string
manufacturer_contact_t_name: string
manufacturer_contact_zip_code: string
manufacturer_contact_zip_ext: string
manufacturer_country: null
manufacturer_g1_address_1: string
manufacturer_g1_address_2: string
manufacturer_g1_city: string
manufacturer_g1_country: string
manufacturer_g1_name: string
manufacturer_g1_postal_code: string
manufacturer_g1_state: string
manufacturer_g1_zip_code: string
manufacturer_g1_zip_code_ext: string
manufacturer_link_flag: string
manufacturer_name: null
manufacturer_postal_code: null
manufacturer_state: null
manufacturer_zip_code: null
manufacturer_zip_code_ext: null
mdr_report_key: string
mdr_text_0_mdr_text_key: string
mdr_text_0_patient_sequence_number: string
mdr_text_0_text: string
mdr_text_0_text_type_code: string
mdr_text_10_mdr_text_key: null
mdr_text_10_patient_sequence_number: null
mdr_text_10_text: null
mdr_text_10_text_type_code: null
mdr_text_11_mdr_text_key: null
mdr_text_11_patient_sequence_number: null
mdr_text_11_text: null
mdr_text_11_text_type_code: null
mdr_text_12_mdr_text_key: null
mdr_text_12_patient_sequence_number: null
mdr_text_12_text: null
mdr_text_12_text_type_code: null
mdr_text_13_mdr_text_key: null
mdr_text_13_patient_sequence_number: null
mdr_text_13_text: null
mdr_text_13_text_type_code: null
mdr_text_1_mdr_text_key: string
mdr_text_1_patient_sequence_number: string
mdr_text_1_text: string
mdr_text_1_text_type_code: string
mdr_text_2_mdr_text_key: string
mdr_text_2_patient_sequence_number: string
mdr_text_2_text: string
mdr_text_2_text_type_code: string
mdr_text_3_mdr_text_key: string
mdr_text_3_patient_sequence_number: string
mdr_text_3_text: string
mdr_text_3_text_type_code: string
mdr_text_4_mdr_text_key: string
mdr_text_4_patient_sequence_number: string
mdr_text_4_text: string
mdr_text_4_text_type_code: string
mdr_text_5_mdr_text_key: string
mdr_text_5_patient_sequence_number: string
mdr_text_5_text: string
mdr_text_5_text_type_code: string
mdr_text_6_mdr_text_key: string
mdr_text_6_patient_sequence_number: string
mdr_text_6_text: string
mdr_text_6_text_type_code: string
mdr_text_7_mdr_text_key: string
mdr_text_7_patient_sequence_number: string
mdr_text_7_text: string
mdr_text_7_text_type_code: string
mdr_text_8_mdr_text_key: string
mdr_text_8_patient_sequence_number: string
mdr_text_8_text: string
mdr_text_8_text_type_code: string
mdr_text_9_mdr_text_key: null
mdr_text_9_patient_sequence_number: null
mdr_text_9_text: null
mdr_text_9_text_type_code: null
noe_summarized: string
number_devices_in_event: null
number_patients_in_event: null
patient_0_date_received: string
patient_0_patient_age: string
patient_0_patient_ethnicity: string
patient_0_patient_problems: list<item: string>
  child 0, item: string
patient_0_patient_race: string
patient_0_patient_sequence_number: string
patient_0_patient_sex: string
patient_0_patient_weight: string
patient_0_sequence_number_outcome: list<item: string>
  child 0, item: string
patient_0_sequence_number_treatment: list<item: string>
  child 0, item: string
pma_pmn_number: string
previous_use_code: string
product_problem_flag: string
product_problems: list<item: string>
  child 0, item: string
remedial_action: list<item: string>
  child 0, item: string
removal_correction_number: string
report_date: string
report_number: string
report_source_code: string
report_to_fda: string
report_to_manufacturer: null
reporter_country_code: string
reporter_occupation_code: string
reprocessed_and_reused_flag: string
single_use_flag: string
source_type: list<item: string>
  child 0, item: string
summary_report_flag: string
suppl_dates_fda_received: string
suppl_dates_mfr_received: string
type_of_report: list<item: string>
  child 0, item: string vs. 
file:
adverse_event_flag: string
date_added: string
date_changed: string
date_facility_aware: string
date_manufacturer_received: string
date_of_event: string
date_received: string
date_report: string
date_report_to_fda: string
date_report_to_manufacturer: string
device: null
device_0_brand_name: string
device_0_catalog_number: string
device_0_combination_product_flag: string
device_0_date_received: string
device_0_date_removed_flag: null
device_0_date_returned_to_manufacturer: string
device_0_device_age_text: string
device_0_device_availability: string
device_0_device_evaluated_by_manufacturer: string
device_0_device_event_key: null
device_0_device_operator: string
device_0_device_report_product_code: string
device_0_device_sequence_number: string
device_0_expiration_date_of_device: string
device_0_generic_name: string
device_0_implant_flag: null
device_0_lot_number: string
device_0_manufacturer_d_address_1: string
device_0_manufacturer_d_address_2: string
device_0_manufacturer_d_city: string
device_0_manufacturer_d_country: string
device_0_manufacturer_d_name: string
device_0_manufacturer_d_postal_code: string
device_0_manufacturer_d_state: string
device_0_manufacturer_d_zip_code: string
device_0_manufacturer_d_zip_code_ext: string
device_0_model_number: string
device_0_openfda_device_class: string
device_0_openfda_device_name: string
device_0_openfda_fei_number: list<item: string>
  child 0, item: string
device_0_openfda_medical_specialty_description: string
device_0_openfda_registration_number: list<item: string>
  child 0, item: string
device_0_openfda_regulation_number: string
device_0_other_id_number: null
device_0_udi_di: string
device_0_udi_public: string
device_date_of_manufacturer: string
distributor_address_1: null
distributor_address_2: null
distributor_city: null
distributor_name: null
distributor_state: null
distributor_zip_code: null
distributor_zip_code_ext: null
event_key: null
event_location: string
event_type: string
exemption_number: string
health_professional: string
initial_report_to_fda: string
manufacturer_address_1: null
manufacturer_address_2: null
manufacturer_city: null
manufacturer_contact_address_1: string
manufacturer_contact_address_2: string
manufacturer_contact_area_code: string
manufacturer_contact_city: string
manufacturer_contact_country: string
manufacturer_contact_exchange: string
manufacturer_contact_extension: string
manufacturer_contact_f_name: string
manufacturer_contact_l_name: string
manufacturer_contact_pcity: string
manufacturer_contact_pcountry: string
manufacturer_contact_phone_number: string
manufacturer_contact_plocal: string
manufacturer_contact_postal_code: string
manufacturer_contact_state: string
manufacturer_contact_t_name: string
manufacturer_contact_zip_code: string
manufacturer_contact_zip_ext: string
manufacturer_country: null
manufacturer_g1_address_1: string
manufacturer_g1_address_2: string
manufacturer_g1_city: string
manufacturer_g1_country: string
manufacturer_g1_name: string
manufacturer_g1_postal_code: string
manufacturer_g1_state: string
manufacturer_g1_zip_code: string
manufacturer_g1_zip_code_ext: string
manufacturer_link_flag: string
manufacturer_name: null
manufacturer_postal_code: null
manufacturer_state: null
manufacturer_zip_code: null
manufacturer_zip_code_ext: null
mdr_report_key: string
mdr_text_0_mdr_text_key: string
mdr_text_0_patient_sequence_number: string
mdr_text_0_text: string
mdr_text_0_text_type_code: string
mdr_text_10_mdr_text_key: null
mdr_text_10_patient_sequence_number: null
mdr_text_10_text: null
mdr_text_10_text_type_code: null
mdr_text_11_mdr_text_key: null
mdr_text_11_patient_sequence_number: null
mdr_text_11_text: null
mdr_text_11_text_type_code: null
mdr_text_12_mdr_text_key: null
mdr_text_12_patient_sequence_number: null
mdr_text_12_text: null
mdr_text_12_text_type_code: null
mdr_text_13_mdr_text_key: null
mdr_text_13_patient_sequence_number: null
mdr_text_13_text: null
mdr_text_13_text_type_code: null
mdr_text_1_mdr_text_key: string
mdr_text_1_patient_sequence_number: string
mdr_text_1_text: string
mdr_text_1_text_type_code: string
mdr_text_2_mdr_text_key: string
mdr_text_2_patient_sequence_number: string
mdr_text_2_text: string
mdr_text_2_text_type_code: string
mdr_text_3_mdr_text_key: string
mdr_text_3_patient_sequence_number: string
mdr_text_3_text: string
mdr_text_3_text_type_code: string
mdr_text_4_mdr_text_key: string
mdr_text_4_patient_sequence_number: string
mdr_text_4_text: string
mdr_text_4_text_type_code: string
mdr_text_5_mdr_text_key: string
mdr_text_5_patient_sequence_number: string
mdr_text_5_text: string
mdr_text_5_text_type_code: string
mdr_text_6_mdr_text_key: string
mdr_text_6_patient_sequence_number: string
mdr_text_6_text: string
mdr_text_6_text_type_code: string
mdr_text_7_mdr_text_key: string
mdr_text_7_patient_sequence_number: string
mdr_text_7_text: string
mdr_text_7_text_type_code: string
mdr_text_8_mdr_text_key: null
mdr_text_8_patient_sequence_number: null
mdr_text_8_text: null
mdr_text_8_text_type_code: null
mdr_text_9_mdr_text_key: null
mdr_text_9_patient_sequence_number: null
mdr_text_9_text: null
mdr_text_9_text_type_code: null
noe_summarized: string
number_devices_in_event: null
number_patients_in_event: null
patient_0_date_received: string
patient_0_patient_age: string
patient_0_patient_ethnicity: string
patient_0_patient_problems: list<item: string>
  child 0, item: string
patient_0_patient_race: string
patient_0_patient_sequence_number: string
patient_0_patient_sex: string
patient_0_patient_weight: string
patient_0_sequence_number_outcome: list<item: string>
  child 0, item: string
patient_0_sequence_number_treatment: list<item: string>
  child 0, item: string
pma_pmn_number: string
previous_use_code: string
product_problem_flag: string
product_problems: list<item: string>
  child 0, item: string
remedial_action: list<item: string>
  child 0, item: string
removal_correction_number: string
report_date: string
report_number: string
report_source_code: string
report_to_fda: string
report_to_manufacturer: null
reporter_country_code: string
reporter_occupation_code: string
reprocessed_and_reused_flag: string
single_use_flag: string
source_type: list<item: string>
  child 0, item: string
summary_report_flag: string
suppl_dates_fda_received: string
suppl_dates_mfr_received: string
type_of_report: list<item: string>
  child 0, item: string

In [37]:
import pandas as pd

# NOTE(review): pandas_kwargs is never used below.
# Passing columns=[] to read_parquet would select no columns at all.
pandas_kwargs = dict(columns=[], engine='pyarrow')

# Load the Parquet output and list its columns.
df = pd.read_parquet('output.parquet', engine='pyarrow')
list(df.columns)

['adverse_event_flag',
 'date_added',
 'date_changed',
 'date_facility_aware',
 'date_manufacturer_received',
 'date_of_event',
 'date_received',
 'date_report',
 'date_report_to_fda',
 'date_report_to_manufacturer',
 'device',
 'device_0_brand_name',
 'device_0_catalog_number',
 'device_0_combination_product_flag',
 'device_0_date_received',
 'device_0_date_removed_flag',
 'device_0_date_returned_to_manufacturer',
 'device_0_device_age_text',
 'device_0_device_availability',
 'device_0_device_evaluated_by_manufacturer',
 'device_0_device_event_key',
 'device_0_device_operator',
 'device_0_device_report_product_code',
 'device_0_device_sequence_number',
 'device_0_expiration_date_of_device',
 'device_0_generic_name',
 'device_0_implant_flag',
 'device_0_lot_number',
 'device_0_manufacturer_d_address_1',
 'device_0_manufacturer_d_address_2',
 'device_0_manufacturer_d_city',
 'device_0_manufacturer_d_country',
 'device_0_manufacturer_d_name',
 'device_0_manufacturer_d_postal_code',
 'dev

In [38]:
sorted(df.columns)

['adverse_event_flag',
 'date_added',
 'date_changed',
 'date_facility_aware',
 'date_manufacturer_received',
 'date_of_event',
 'date_received',
 'date_report',
 'date_report_to_fda',
 'date_report_to_manufacturer',
 'device',
 'device_0_brand_name',
 'device_0_catalog_number',
 'device_0_combination_product_flag',
 'device_0_date_received',
 'device_0_date_removed_flag',
 'device_0_date_returned_to_manufacturer',
 'device_0_device_age_text',
 'device_0_device_availability',
 'device_0_device_evaluated_by_manufacturer',
 'device_0_device_event_key',
 'device_0_device_operator',
 'device_0_device_report_product_code',
 'device_0_device_sequence_number',
 'device_0_expiration_date_of_device',
 'device_0_generic_name',
 'device_0_implant_flag',
 'device_0_lot_number',
 'device_0_manufacturer_d_address_1',
 'device_0_manufacturer_d_address_2',
 'device_0_manufacturer_d_city',
 'device_0_manufacturer_d_country',
 'device_0_manufacturer_d_name',
 'device_0_manufacturer_d_postal_code',
 'dev

In [18]:
df.loc[df['mdr_text_0_text_type_code'] == 'Description of Event or Problem', 'mdr_text_0_text'].tolist()

['HEALTHCARE PROFESSIONAL REPORTED RIGHT SIDE RUPTURE. DEVICE HAS BEEN EXPLANTED.',
 'IT WAS REPORTED THAT DURING A PHOTOSELECTIVE VAPORIZATION OF THE PROSTATE PROCEDURE FOR BENIGN PROSTATIC HYPERPLASIA, THERE WAS A REFLECTING GLASS RUPTURE. DUE TO THIS, THE PROCEDURE WAS COMPLETED USING ANOTHER DEVICE. THERE WERE NO PATIENT COMPLICATIONS.',
 'IT WAS REPORTED THAT A PATIENT UNDERWENT AN ANAL ATRESIA PROCEDURE IN (B)(6) 2025 AND SUTURE WAS USED. IT WAS REPORTED THAT THE SUTURE WAS BROKEN WHEN THE SURGEON ATTEMPTED TO SEW THE TISSUE. 3 PRODUCTS HAVE SAME ISSUE. CHANGED ANOTHER ONE TO COMPLETE THE SURGERY. THERE WERE NO PATIENT CONSEQUENCE REPORTED. ADDITIONAL INFORMATION WAS REQUESTED.',
 'IT WAS REPORTED THAT DURING SETUP OR INSPECTION, THE CONTROL UNIT DYONICS 25 PUMP WAS NOT REGULATING PRESSURE, RESULTING IN EXTREME PRESSURE, EXCESSIVE SWELLING, AND CLOSING OF THE JOINT DESPITE ALL TROUBLESHOOTING EFFORTS. A BACK-UP DEVICE WAS AVAILABLE, AND NO SURGICAL DELAY WAS REPORTED. THERE WAS N

In [15]:
# Distinct values across every mdr_text_<i>_text_type_code column.
# The number of narrative slots depends on the longest mdr_text list in the file,
# so discover the columns instead of assuming range(14), which raises KeyError
# when fewer slots exist.
text_cols = sorted(
    df.filter(regex=r'^mdr_text_\d+_text_type_code$').columns,
    key=lambda col: int(col.split('_')[2]),
)
unique_code = set()

for col in text_cols:
    unique_code.update(df[col].unique().tolist())

unique_code

{'Additional Manufacturer Narrative', 'Description of Event or Problem', None}