# <B> Preprocessing </B>
* Container: conda_pytorch_py39

## AutoReload

In [None]:
# Automatically reload imported project modules (e.g. utils.ssm) before each cell runs,
# so edits to them take effect without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import boto3

## 1. Processing job for preprocessing

In [None]:
import os
import wget
import sagemaker
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor

## 2. Parameter Store setup

In [None]:
from utils.ssm import parameter_store
# The Parameter Store helper is bound to the region of the active boto3 session.
strRegionName = boto3.session.Session().region_name
pm = parameter_store(strRegionName)

# Every other parameter key used below is namespaced with this prefix.
prefix = pm.get_params(key="PREFIX")

* Parameters for the processing job

In [None]:
# Switch between SageMaker local mode (LocalSession, instance_type 'local')
# and a managed processing instance.
local_mode = True

if not local_mode:
    # Managed job: the dataset location is read from Parameter Store.
    instance_type = "ml.m5.xlarge"  # GPU alternative: "ml.g4dn.xlarge"
    sagemaker_session = sagemaker.Session()
    data_path = pm.get_params(key=prefix + '-S3-DATA-PATH')
else:
    import os
    from sagemaker.local import LocalSession

    # Local job: the dataset is the ./data directory under the working directory.
    instance_type = 'local'
    sagemaker_session = LocalSession()
    data_path = os.path.join(os.getcwd(), "data")

# Echo the resolved configuration (each value is looked up in the same order as before).
print(f"instance-type: {instance_type}")
print(f"image-uri: {pm.get_params(key=prefix + '-IMAGE-URI')}")
print(f"role: {pm.get_params(key=prefix + '-SAGEMAKER-ROLE-ARN')}")
print(f"bucket: {pm.get_params(key=prefix + '-BUCKET')}")
print(f"dataset-path: {data_path}")
print(f"sagemaker_session: {sagemaker_session}")

* Define the processing job

In [None]:
# Processor that runs the preprocessing script inside the project's custom
# image. The image URI and execution role come from Parameter Store.
# framework_version is left as None; presumably the explicit image_uri
# determines the runtime (TODO confirm).
dataset_processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version=None,
    image_uri=pm.get_params(key=prefix + "-IMAGE-URI"),
    instance_type=instance_type,
    instance_count=1,
    role=pm.get_params(key=prefix + "-SAGEMAKER-ROLE-ARN"),
    # Name shown in the bucket. When the step is wrapped in a pipeline, the
    # name defined by the pipeline is shown instead.
    base_job_name="preprocessing",
    sagemaker_session=sagemaker_session,
)

# Container-side root under which processing inputs/outputs are placed.
proc_prefix = "/opt/ml/processing"

# S3 URIs always use "/" separators, so build the URI with str.join instead of
# os.path.join. os.path.join is OS-dependent and discards earlier components
# when a later one starts with "/". Leading/trailing slashes are stripped from
# each part, and empty parts are dropped, so the URI never contains "//".
output_path = "s3://" + "/".join(
    filter(
        None,
        (
            part.strip("/")
            for part in (
                pm.get_params(key=prefix + "-BUCKET"),
                prefix,
                "preprocessing",
                "data",
            )
        ),
    )
)

In [None]:
# Display the S3 URI the processing job will write its output to.
output_path

In [None]:
# Launch the preprocessing job. The ./code directory is shipped to the
# container, and preprocessing.py runs with the arguments listed below.
dataset_processor.run(
    # job_name="preprocessing",  # A fixed job name is required for caching to work.
    #                            # Otherwise a date/time suffix is appended to the
    #                            # processor's base_job_name and the cache never hits.
    code="preprocessing.py",  # Entry script, given relative to source_dir.
    # The code/ directory under the current working directory; put
    # preprocessing.py and requirements.txt here.
    source_dir=os.path.join(os.getcwd(), "code"),
    inputs=[
        # data_path (local ./data or the S3 data path from Parameter Store)
        # is made available in the container at <proc_prefix>/input.
        ProcessingInput(
            input_name="input-data",
            source=data_path,
            destination=os.path.join(proc_prefix, "input"),
        ),
    ],
    outputs=[
        # Files the script writes to <proc_prefix>/output are uploaded to output_path.
        ProcessingOutput(
            output_name="output-data",
            source=os.path.join(proc_prefix, "output"),
            destination=output_path,
        ),
    ],
    # NOTE(review): the two *_mount_dir values are SageMaker training-channel
    # paths, and no processing input is placed there. Confirm that
    # preprocessing.py actually needs them.
    arguments=[
        "--proc_prefix", proc_prefix,
        "--train_mount_dir", "/opt/ml/input/data/training/",
        "--test_mount_dir", "/opt/ml/input/data/testing/",
    ],
)

In [None]:
# Sync the processing job's S3 output into the local ./data/preprocessing
# directory, then display the source URI.
!aws s3 sync $output_path ./data/preprocessing --quiet
output_path

## 3. Add the processing output to Parameter Store

In [None]:
# Record where the preprocessed data lives in Parameter Store (overwriting any
# previous value), presumably so later steps can look it up by key.
pm.put_params(
    key=prefix + "-PREP-DATA-PATH",
    value=output_path,
    overwrite=True,
)