# SageMaker Processing을 활용한 PDF 기반 QA 데이터 생성 배치 파이프라인 자동화
이 노트북은 AWS SageMaker Processing 기능을 활용하여 PDF 파일로부터 QA(질문-답변) 데이터셋을 자동으로 생성하는 배치 파이프라인 예제입니다.


아래 이미지는 SageMaker Processing Job을 활용한 PDF QA 데이터 생성 파이프라인의 아키텍처를 보여줍니다:

![PDF QA 추출 아키텍처](../assets/images/processing_job_architecture.png)


이 아키텍처는 PDF 문서를 입력으로 받아 질문-답변 쌍을 자동으로 생성하는 전체 프로세스를 나타냅니다.
이 노트북에서는 SageMaker Processing Job을 사용하여 PDF 문서로부터 QA 데이터셋을 생성하는 방법을 단계별로 설명합니다. 이 과정은 대량의 PDF 다중 문서를 처리하고, 각 문서에서 질문과 답변 쌍을 추출하여 QA 데이터셋을 구축하는 데 유용합니다.
주요 단계는 다음과 같습니다:

1. SageMaker 세션 및 IAM 역할 설정
2. S3 버킷 및 데이터 경로 지정
3. 입력 PDF 파일을 S3로 업로드
4. 커스텀 Docker 이미지를 활용한 ScriptProcessor 구성
5. Processing Job 실행 및 상태 모니터링
6. 결과 데이터 다운로드

이 파이프라인을 통해 대용량 PDF 문서로부터 손쉽게 QA 데이터셋을 생성할 수 있습니다.


In [1]:
# 필요한 라이브러리와 모듈을 임포트합니다.
import os
import boto3
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from datetime import datetime



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


# SageMaker 세션 및 IAM 역할 설정

In [2]:
sagemaker_session = sagemaker.Session() # SageMaker 세션을 생성합니다.
role = sagemaker.get_execution_role()  # 현재 실행 중인 노트북 인스턴스의 역할

# S3 버킷 및 프리픽스 설정

S3는 SageMaker Processing Job의 입력 데이터 레이크(data lake) 저장소 역할을 합니다.  
PDF와 같은 원본 데이터를 S3에 업로드하면, SageMaker Processing Job이 해당 S3 경로에서 데이터를 읽어와 처리할 수 있습니다.  
S3에서 대용량의 데이터를 효율적으로 관리하고, 다양한 SageMaker 작업에서 재사용할 수 있습니다.

In [3]:
bucket = sagemaker_session.default_bucket()
prefix = "pdf-qa-generation"
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
input_prefix = f"{prefix}/input"
output_prefix = f"{prefix}/output/{timestamp}"

In [4]:
input_prefix

'pdf-qa-generation/input'

# 입력 PDF를 S3에 업로드

In [5]:
# 입력 PDF를 S3에 업로드
pdf_local_path = "data/fsi_data.pdf"  # 로컬 PDF 파일 경로
pdf_s3_key = f"{input_prefix}/pdf/fsi_data.pdf"
s3_client = boto3.client('s3')
s3_client.upload_file(pdf_local_path, bucket, pdf_s3_key)
print(f"PDF 업로드 완료: s3://{bucket}/{pdf_s3_key}")

PDF 업로드 완료: s3://sagemaker-us-east-1-975050087315/pdf-qa-generation/input/pdf/fsi_data.pdf


# 커스텀 Docker 이미지 ECR(AWS container hub)에 푸시 

- 00_sagemaker_environment_setup.ipynb 의 2. Set default parameters 출력정보를 참조하세요

In [12]:
# ECR 로그인 - AWS 인증 수행
aws ecr get-login-password --region <your-region> | docker login --username AWS --password-stdin <your-account-id>.dkr.ecr.<your-region>.amazonaws.com
# 로컬 이미지에 ECR 태그 지정
docker tag qa-extractor <your-account-id>.dkr.ecr.<your-region>.amazonaws.com/qa-extractor
# ECR 저장소 생성
aws ecr create-repository --repository-name qa-extractor --region <your-region>
# 이미지를 ECR로 푸시
docker push <your-account-id>.dkr.ecr.<your-region>.amazonaws.com/qa-extractor

Using default tag: latest
The push refers to repository [975050087315.dkr.ecr.us-east-1.amazonaws.com/qa-extractor]

[1B0027f30e: Preparing 
[1B2fee2d00: Preparing 
[1B7760677d: Preparing 
[1Ba171f478: Preparing 
[1B60d37ee0: Preparing 
[1Bcb3fbb9d: Preparing 
[1B032f1539: Preparing 
[1B7877232e: Preparing 
[1B270bc1c6: Preparing 
[1B3bb9c80f: Preparing 
[1Bffc45974: Preparing 
[1Bf7829cb7: Preparing 
[1B869b72ab: Preparing 
[14Blatest: digest: sha256:6d3f70675c4cae250e1b8b5aca8fb7d88d9ff5cb0b17ea9064e0d74c1e519c42 size: 3265


# 커스텀 Docker 이미지 URI 설정
- 아래는 예시 URI로, 실제 환경에 맞게 수정해야 합니다.

In [13]:
#custom_image_uri = "740128158828.dkr.ecr.us-east-1.amazonaws.com/qa-extractor:latest"
custom_image_uri = "<your-account-id>.dkr.ecr.<your-region>.amazonaws.com/<your-ecr-image>:latest"

# ScriptProcessor 설정

아래 코드는 SageMaker의 `ScriptProcessor` 객체를 생성하는 예시입니다.  
`ScriptProcessor`는 커스텀 Docker 이미지를 활용하여 Python 스크립트를 실행할 수 있도록 도와주는 SageMaker Processing의 필수 구성 요소입니다.

- `base_job_name`: 생성되는 Processing Job의 기본 이름을 지정합니다.
- `image_uri`: ECR에 저장된 커스텀 Docker 이미지의 URI를 입력합니다.
- `command`: 컨테이너 내에서 실행할 명령어(여기서는 `python3`)를 지정합니다.
- `instance_type`: unstructured Processing Job이 실행될 gpu 인스턴스 타입을 지정합니다. (`ml.g5.4xlarge` 등)
- `instance_count`: 사용할 인스턴스 개수를 지정합니다.
- `role`: SageMaker가 리소스에 접근할 때 사용할 IAM 역할을 지정합니다.
- `sagemaker_session`: SageMaker 세션 객체를 전달합니다.
- `max_runtime_in_seconds`: 작업의 최대 실행 시간을 초 단위로 지정합니다.

이 설정으로 PDF에서 QA 데이터를 추출하는 커스텀 스크립트를 대규모로 효율적으로 실행할 수 있습니다.

In [14]:
processor = ScriptProcessor(
    base_job_name="pdf-qa-generator",
    image_uri=custom_image_uri,
    command=["python3"],   # 실행 명령만 포함
    instance_type="ml.c7i.4xlarge", #"ml.g5.4xlarge" 이벤트실습계정에는 processing job의 gpu 쿼터가 없음
    instance_count=1,
    role=role,
    sagemaker_session=sagemaker_session,
    max_runtime_in_seconds=7200
)

# Processing Job 실행

아래 코드는 SageMaker Processing Job을 실행하는 예시입니다.  
`processor.run()` 메서드를 통해 지정한 Python 스크립트(`processing.py`)를 커스텀 Docker 컨테이너에서 실행합니다.  
입력 PDF 파일이 저장된 S3 경로를 컨테이너의 입력 디렉터리(`/opt/ml/processing/input/pdf`)로 전달하고,  
처리 결과(QA 쌍 데이터)는 `/opt/ml/processing/output`에서 S3 출력 경로로 저장됩니다.

- `wait=False`와 `logs=False` 옵션을 사용하여 비동기(Non-blocking)로 작업을 실행합니다.
- 실행이 시작되면 출력 데이터가 저장될 S3 경로가 출력됩니다.

- `arguments` 파라미터는 실행할 스크립트에 추가 인자를 전달할 때 사용합니다.  
    예시에서는 다음과 같은 인자를 전달합니다:
       
    - `--domain`: QA 데이터셋을 생성할 도메인(여기서는 "International Finance")
    - `--num_questions`: 각 문서에서 생성할 질문 개수(여기서는 5개)
    - `--model_id`: 사용할 생성형 AI 모델의 ID(여기서는 Claude 3 Sonnet)

In [15]:
# Processing Job 실행
processing_job = processor.run(
    code="processing.py",  # 스크립트 파일 이름
    inputs=[
        ProcessingInput(
            source=f"s3://{bucket}/{input_prefix}/pdf",
            destination="/opt/ml/processing/input/pdf"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="qa_pairs",
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket}/{output_prefix}"
        )
    ],
    arguments=[
        "--domain", "International Finance",
        "--num_questions", "5",
        "--model_id", "anthropic.claude-3-sonnet-20240229-v1:0"  # 원하는 모델 ID 지정
    ],
    wait=False,  # 비동기 실행
    logs=False
)
#print(f"Processing Job 시작: {processing_job.job_name}")
print(f"출력 데이터는 다음 위치에 저장됩니다: s3://{bucket}/{output_prefix}")

출력 데이터는 다음 위치에 저장됩니다: s3://sagemaker-us-east-1-975050087315/pdf-qa-generation/output/2025-05-29-08-40-07


In [16]:
# (선택 사항) Job 상태 확인
job_status = processor.jobs[-1].describe()
print(f"Job 상태: {job_status['ProcessingJobStatus']}")

Job 상태: InProgress


In [17]:
# (선택 사항) Job이 완료될 때까지 기다리기
processor.jobs[-1].wait(logs=True)
print("Processing Job 완료!")

..................[34mThe `max_size` parameter is deprecated and will be removed in v4.26. Please specify in `size['longest_edge'] instead`.[0m
[34m입력 디렉토리: /opt/ml/processing/input[0m
[34m출력 디렉토리: /opt/ml/processing/output[0m
[34mPDF 파일 경로: /opt/ml/processing/input/pdf/fsi_data.pdf[0m
[34m출력 파일 경로: /opt/ml/processing/output/qa_pairs.jsonl[0m
[34mAWS 리전: us-east-1[0m
[34mPDF 질문 생성 작업 시작...[0m
[34m도메인: International Finance, 질문 수: 5, 모델: anthropic.claude-3-sonnet-20240229-v1:0[0m
[34mAWS Bedrock 클라이언트 설정 중...[0m
[34m추출된 요소 수: 3[0m
[34m요소 1/3 처리 중... (텍스트 길이: 2143)[0m
[34m```json[0m
[34m{[0m
[34m    "QUESTION": "외환수급 불균형 구조가 형성된 주요 원인은 무엇입니까?",[0m
[34m    "ANSWER": "외화 유입 대비 유출 우위가 지속되어 외환수급 불균형 구조가 형성되었습니다."[0m
[34m},[0m
[34m{
    "QUESTION": "우리나라의 대외건전성이 견고해진 주요 지표는 무엇입니까?",[0m
[34m    "ANSWER": "외환보유액이 4,000억불을 상회하고, 순대외금융자산이 1조불에 이르는 등 우리나라의 대외건전성이 견고해졌습니다."[0m
[34m},[0m
[34m{[0m
[34m    "QUESTION": "외환수급 균형을 위해 정부가 추진하려는 정책 방향은 무엇입니까?",[0m


# 결과 다운로드

In [21]:

s3_client.download_file(bucket, f"{output_prefix}/qa_pairs.jsonl", "qa_pairs.jsonl")
print("결과 다운로드 완료: qa_pairs.jsonl")

결과 다운로드 완료: qa_pairs.jsonl


# S3 저장경로 확인

SageMaker Processing Job의 입력 및 출력 데이터가 저장된 S3 경로를 확인할 수 있습니다.

In [19]:
print(f"출력 데이터 S3 경로: s3://{bucket}/{output_prefix}/qa_pairs.jsonl")


출력 데이터 S3 경로: s3://sagemaker-us-east-1-975050087315/pdf-qa-generation/output/2025-05-29-08-40-07/qa_pairs.jsonl


In [22]:
import json

with open("qa_pairs.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        qa = json.loads(line)
        print(json.dumps(qa, ensure_ascii=False, indent=2))
        print("-" * 40)

{
  "QUESTION": "외환수급 불균형 구조가 형성된 주요 원인은 무엇입니까?",
  "ANSWER": "외화 유입 대비 유출 우위가 지속되어 외환수급 불균형 구조가 형성되었습니다."
}
----------------------------------------
{
  "QUESTION": "우리나라의 대외건전성이 견고해진 주요 지표는 무엇입니까?",
  "ANSWER": "외환보유액이 4,000억불을 상회하고, 순대외금융자산이 1조불에 이르는 등 우리나라의 대외건전성이 견고해졌습니다."
}
----------------------------------------
{
  "QUESTION": "외환수급 균형을 위해 정부가 추진하려는 정책 방향은 무엇입니까?",
  "ANSWER": "외환수급 균형을 위해 실물경제 및 외화자금시장 안정에 도움이 되는 방향으로 외환 유입 관련 규제들을 완화하려고 합니다."
}
----------------------------------------
{
  "QUESTION": "건전성 규제 완화 방안 중 선물환포지션 한도 조정 내용은 무엇입니까?",
  "ANSWER": "건전성 규제 완화 방안으로 국내은행의 선물환포지션 한도를 50%에서 75%로, 외국은행 국내지점의 한도를 250%에서 375%로 상향 조정합니다."
}
----------------------------------------
{
  "QUESTION": "정부가 외환수급 균형을 위한 정책을 추진하는 전제 조건은 무엇입니까?",
  "ANSWER": "정부는 견조한 대외건전성을 유지하는 범위 내에서 외환수급 균형을 위한 정책을 추진할 것입니다."
}
----------------------------------------
{
  "QUESTION": "은행권이 외화운용 제약에 따른 어려움과 WGBI 편입 및 RFI 진입 등에 따른 거래상대방 확대 등을 근거로 제기한 바는 무엇입니까?",
  "ANSWER": "은행권은 현행 선물환포지션 한도 150%를 상향 