# [Module 9.1] preprocess.py 파일 이해하기

이 노트북은 preprocess.py 파일의 모듈(함수)가 어떻게 쓰이는지, 노트북 2.0, 3.0에서 사용한 Use Case 기준으로 정리 합니다.
- (1) 전처리 모델 학습(Training Feature Transformer)
- (2) 전처리 모델 배치 추론 (Inference Feature Transformer)
- (3) Realtime Endpoint 추론 


preprocess.py는 크게 아래 다섯개의 모듈(함수)로 구성 되어 있습니다.
다섯개의 모듈이 위의 Use Case를 지원 합니다.


- main() 함수
```python
    if __name__ == '__main__':
```
- input_fn(input_data, request_content_type)
- model_fn(model_dir)
- predict_fn(input_data, model)
- output_fn(prediction, accept)

Reference: <br>
아래 블로그의 Step 4: Create preprocessing script 단락을 보시면 조금 더 설명이 있습니다. 참고 하세요.<br>
Blog: [Preprocess input data before making predictions using Amazon SageMaker inference pipelines and Scikit-learn](https://aws.amazon.com/blogs/machine-learning/preprocess-input-data-before-making-predictions-using-amazon-sagemaker-inference-pipelines-and-scikit-learn/)



## (1) 전처리 모델 학습(Training Feature Transformer)

학습은 아래의 SKLearn Estimator의 fit()를 통해서 합니다.

```python
sklearn_preprocessor = SKLearn(
    entry_point=script_path, # script_path: preprocess.py
    role=role,
    train_instance_type="local")
sklearn_preprocessor.fit({'train': s3_input_train})
```
위의 Estimator SKLearn.fit() 함수는 인스턴스를 뛰우고, sagemaker_sklearn_container 이미지를 다운로드 받습니다. <br>
sagemaker_sklearn_container 에서는 아래 명령어를 실행 합니다. 
```python
/miniconda3/bin/python -m preprocessing
```
위 명령어는 preprocessing.py 파일을 실행하기에 **main() 만을** 실행합니다.
```
if __name__ == '__main__':
    ...
    ...
    preprocessor.fit(concat_data)
    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))
    print("saved model!")
``` 
joblib.dump() 는 /opt/ml/model/model.joblib 에 저장을 하게 하고, 이 파일은 SageMaker의 의해서 S3로 업로드 하게 됩니다. (에: s3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-08-08-02-58-52-731/output/model.tar.gz)



## (2) 전처리 모델 배치 추론 (Inference Feature Transformer)

위에서 학습을 한 전처리 모델을 가지고 Raw Train, Validation, Test 데이타를 제공하여 전처리 데이타(preprocessed data)를 만드는 과정 입니다. 

내부적으로 **input data (input_fn() 호출) --> 전처리모델.transform() (model_fn(), predict_fn() 호출)** 하여 전처리된 데이타가 생성이 됩니다. (<font color="red">**output_fn()은 사용 안함**</font>)

predict_fn() 는 전처리 모델 배치 추론의 컨테이너에 아래와 같이 환경 변수에 영향을 받습니다.
model_fn(), input_fn() 는 아래 내용 참고 하세요
```python
scikit_learn_inferencee_model = sklearn_preprocessor.create_model(
    env={'TRANSFORM_MODE': 'feature-transform'})
```
위의 환경 변수로 인해서 아래 predict_fn() 안에서 _is_feature_transform() True이고 이에 해당하는 코드가 실행이 됩니다. features = model.transform(input_data) 가 실행 됩니다.

```python
def predict_fn(input_data, model):
    """Preprocess input data
    
    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:
    
        rest of features either one hot encoded or standardized
    """

    
    if _is_feature_transform():
        features = model.transform(input_data)


        if label_column in input_data:
            # Return the label (as the first column) and the set of features.
            return np.insert(features.toarray(), 0, pd.get_dummies(input_data[label_column])['True.'], axis=1)
        else:
            # Return only the set of features
            return features
    
    # 아래는 후처리에서 사용이 되고, 여기서는 사용을 하지 않습니다. 
    if _is_inverse_label_transform():
        features = input_data.iloc[:,0]>0.5
        features = features.values
        return features
```


### model_fn(model_dir) 함수

SageMaker는 model_dir 의 경로로 부터 모델을 리턴한다

```python
def model_fn(model_dir):
    """Deserialize fitted model
    """
    if _is_feature_transform():
        preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
        return preprocessor



### input_fn(input_data, request_content_type) 함수

input_fn 함수는 request_content_type로서 'text/csv'를 받게 되고, 입력 컬럼수에 따라서 컬럼 이름을 설정하여 dataframe을 리턴 합니다.

```python
def input_fn(input_data, request_content_type):
    if _is_feature_transform():
        if content_type == 'text/csv':
            # Read the raw input data as CSV.
            df = pd.read_csv(StringIO(input_data),  header=None)
            if len(df.columns) == len(feature_columns_names) + 1:
                # This is a labelled example, includes the  label
                df.columns = feature_columns_names + [label_column]
            elif len(df.columns) == len(feature_columns_names):
                # This is an unlabelled example.
                df.columns = feature_columns_names
            return df
        else:
            raise ValueError("{} not supported by script!".format(content_type))
    
    # 아래는 후처리에서 사용이 되고, 여기서는 사용을 하지 않습니다.     
    if _is_inverse_label_transform():
        if (content_type == 'text/csv' or content_type == 'text/csv; charset=utf-8'):
            # Read the raw input data as CSV.
            df = pd.read_csv(StringIO(str_buffer),  header=None)
            logging.info(f"Shape of the requested data: '{df.shape}'")
            return df
        else:
            raise ValueError("{} not supported by script!".format(content_type))
```


## (3) Realtime Endpoint 추론 

Inference Pipeline의 One model을 만들고 Endpoint를 생성 합니다.
아래와 같이 instance를 제공하여 추론을 요청하면, 결과가 False로 나옵니다
```
instance: 
 KS,186,510,400-6454,no,no,0,137.8,97,23.43,187.7,118,15.95,146.4,85,6.59,8.7,6,2.35,1
Churn result?: 
 b'False\n'
```


아래는 Realtime Endpoint에 위의 Instance 추론 요청을 하였고, 이후에 CloudWatch에서 로그를 확인 하였습니다.<br>

- Container-1 (전처리)
    - input_fn --> predict_fn --> output_fn 을 하였고, predict_fn에서는 model.transform(input_data)를 통하여 전치리를 하였습니다.
- Container-2 (XGBoost)
    - csv 입력값을 받아서, 모델에서 결과값을 제공 합니다.
- Container-3 (후처리)
    - input_fn --> predict_fn --> output_fn 으로 호출을 하며, predict_fn에서 XGBoost에서 받은 Score 값을 False 로 변환 합니다.


![Fig9.1.inferenceflow](img/Fig9.1.inference_flow.png)

### output_fn(prediction, accept) 함수

```python
def output_fn(prediction, accept):
    """Format prediction output
    
    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    
    accept = 'text/csv'
    if type(prediction) is not np.ndarray:
        prediction=prediction.toarray()
    
   
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this 
        script.".format(accept))
```                            

```