# 利用 AWS SageMaker 以及 PyTorch 实现 BERT 文本多分类
BERT text classification on SageMaker using PyTorch

本实验使用 [DBpedia 数据集](https://wiki.dbpedia.org/services-resources/dbpedia-data-set-2014#2) 以及 BERT 模型实现文本多分类。数据格式为 CSV 文件，每行依次为类别编号、标题和正文，内容举例如下：

```csv
1,"E. D. Abbott Ltd"," Abbott of Farnham E D Abbott Limited was a British coachbuilding business based in Farnham Surrey trading under that name from 1929. A major part of their output was under sub-contract to motor vehicle manufacturers. Their business closed in 1972."
1,"Schwan-Stabilo"," Schwan-STABILO is a German maker of pens for writing colouring and cosmetics as well as markers and highlighters for office use. It is the world's largest manufacturer of highlighter pens Stabilo Boss."
1,"Q-workshop"," Q-workshop is a Polish company located in Poznań that specializes in designand production of polyhedral dice and dice accessories for use in various games (role-playing gamesboard games and tabletop wargames). They also run an online retail store and maintainan active forum community.Q-workshop was established in 2001 by Patryk Strzelewicz – a student from Poznań. Initiallythe company sold its products via online auction services but in 2005 a website and online store wereestablished."
```



In [1]:
import logging
import os
import sys

# Make the project-local helper modules under ./src importable from this notebook.
sys.path.append("src")

# Send INFO-and-above log records to stdout so they appear in the cell output.
_log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(
    format=_log_format,
    handlers=[logging.StreamHandler(sys.stdout)],
    level="INFO",
)

### AWS Bucket 与 role 设置

In [None]:
import sagemaker
from sagemaker import get_execution_role

# SageMaker session; later used to look up the account's default S3 bucket.
sm_session = sagemaker.session.Session()
# IAM role handed to the training job and the model/endpoint below.
# NOTE(review): get_execution_role() is expected to run inside a SageMaker notebook environment.
role = get_execution_role()

In [None]:
### 创建 S3 存储桶路径，用于存放训练、验证、测试数据集以及 checkpoint

In [3]:
data_bucket = sm_session.default_bucket()

data_bucket_prefix = "bert-demo-<yourname>"  # replace <yourname> with your own name

# Root S3 prefix shared by every artifact of this demo.
_s3_root = f"s3://{data_bucket}/{data_bucket_prefix}"

# Full dataset (training and validation).
s3_uri_data = f"{_s3_root}/data"
s3_uri_train = f"{s3_uri_data}/train.csv"
s3_uri_val = f"{s3_uri_data}/val.csv"

# Mini dataset (training and validation) for quick experiments.
s3_uri_mini_data = f"{_s3_root}/minidata"
s3_uri_mini_train = f"{s3_uri_mini_data}/train.csv"
s3_uri_mini_val = f"{s3_uri_mini_data}/val.csv"

# Class-label file.
s3_uri_classes = f"{s3_uri_data}/classes.txt"

# Test dataset.
s3_uri_test = f"{s3_uri_data}/test.csv"

# Training output, uploaded source code, and checkpoints.
s3_output_path = f"{_s3_root}/output"
s3_code_path = f"{_s3_root}/code"
s3_checkpoint = f"{_s3_root}/checkpoint"

In [6]:
prepare_dataset: bool = True  # False skips the download / split / upload cells below

## 准备数据集

In [12]:
tmp: str = "tmp"  # local scratch directory for the downloaded dataset (deleted later by `rm -rf $tmp`)

In [None]:
%%bash -s  "$prepare_dataset"  "$s3_uri_test" "$s3_uri_classes" "$tmp"
   
# Positional arguments supplied by the `%%bash -s` line:
#   $1 prepare_dataset flag ("True"/"False"), $2 S3 URI for test.csv,
#   $3 S3 URI for classes.txt, $4 local scratch directory.
# Abort on the first failing command so a failed download/extract is not
# followed by uploads of files that do not exist.
set -e

prepare_dataset=$1
s3_test=$2
s3_classes=$3
tmp=$4

if [ "$prepare_dataset" == "True" ]
then
    echo "Downloading data.."
    wget https://github.com/saurabh3949/Text-Classification-Datasets/raw/master/dbpedia_csv.tar.gz -P "${tmp}"
    # Extract directly into ${tmp}. Extracting into the current directory and then
    # running `mv dbpedia_csv ${tmp}` nests the folder as ${tmp}/dbpedia_csv/dbpedia_csv
    # whenever ${tmp}/dbpedia_csv already exists (e.g. when the cell is re-run).
    tar -xzvf "${tmp}/dbpedia_csv.tar.gz" -C "${tmp}"

    ls -l "${tmp}/dbpedia_csv/"
    cat "${tmp}/dbpedia_csv/classes.txt"
    head -3 "${tmp}/dbpedia_csv/train.csv"

    echo aws s3 cp "${tmp}/dbpedia_csv/test.csv" "${s3_test}"
    aws s3 cp "${tmp}/dbpedia_csv/test.csv" "${s3_test}"

    aws s3 cp "${tmp}/dbpedia_csv/classes.txt" "${s3_classes}"
fi

#### 训练数据集与验证数据集划分

In [14]:
from sklearn.model_selection import train_test_split

def train_val_split(data_file, train_file_name=None, val_file_name=None, val_ratio=0.30, train_ratio=0.70):
    """Randomly split a line-oriented CSV file into a training file and a validation file.

    Each line of ``data_file`` is one record. The split uses scikit-learn's
    ``train_test_split`` with a fixed ``random_state=42``, so it is reproducible.

    Args:
        data_file: Path of the input CSV file.
        train_file_name: Output path for the training lines. Defaults to
            ``train.csv`` in the directory of ``data_file``.
        val_file_name: Output path for the validation lines. Defaults to
            ``val.csv`` in the directory of ``data_file``.
        val_ratio: Fraction of lines written to the validation file.
        train_ratio: Fraction of lines written to the training file. If
            ``val_ratio + train_ratio`` is below 1, the remaining lines are not written.

    Returns:
        Tuple ``(train_file_name, val_file_name)`` with the paths actually written.

    Note:
        If ``data_file`` is itself named ``train.csv`` and ``train_file_name`` is
        omitted, the input file is overwritten by the training split.
    """
    # The dbpedia text contains non-ASCII characters (e.g. "Poznań"), so pin UTF-8
    # instead of depending on the platform's locale encoding.
    with open(data_file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    train, val = train_test_split(lines, test_size=val_ratio, train_size=train_ratio, random_state=42)

    data_dir = os.path.dirname(data_file)
    train_file_name = train_file_name or os.path.join(data_dir, "train.csv")
    val_file_name = val_file_name or os.path.join(data_dir, "val.csv")

    with open(train_file_name, "w", encoding="utf-8") as f:
        f.writelines(train)
    print("Wrote {} records to train".format(len(train)))

    with open(val_file_name, "w", encoding="utf-8") as f:
        f.writelines(val)
    print("Wrote {} records to validation".format(len(val)))

    return train_file_name, val_file_name


In [None]:
if prepare_dataset:
    from s3_util import S3Util

    s3util = S3Util()
    l_data_dir = os.path.join(tmp, "dbpedia_csv")
    l_data_file = os.path.join(l_data_dir, "train.csv")

    # Full 70/30 split, written to names that differ from the input. With the
    # default output names the split would overwrite the downloaded train.csv, and
    # re-running this cell would re-split (and shrink) an already-split file.
    l_train, l_val = train_val_split(
        l_data_file,
        os.path.join(l_data_dir, "split_train.csv"),
        os.path.join(l_data_dir, "split_val.csv"),
    )
    s3util.upload_file(l_train, s3_uri_train)
    s3util.upload_file(l_val, s3_uri_val)

    # Mini split (1% train / 0.1% val) drawn from the 70% training split.
    l_mini_train = os.path.join(l_data_dir, "mini_train.csv")
    l_mini_val = os.path.join(l_data_dir, "mini_val.csv")
    train_val_split(l_train, l_mini_train, l_mini_val, val_ratio=0.001, train_ratio=0.01)

    s3util.upload_file(l_mini_train, s3_uri_mini_train)
    s3util.upload_file(l_mini_val, s3_uri_mini_val)
    
    

In [10]:
# Delete the local scratch directory (`tmp`) that holds the downloaded and split dataset files.
!rm -rf $tmp

## 模型训练

使用 SageMaker Spot 实例进行模型训练

In [16]:
# Training-job input channels: channel name -> S3 URI.
inputs_full = {
    "train": s3_uri_train,
    "val": s3_uri_val,
    "class": s3_uri_classes,
}

# Same channels as inputs_full, with train/val pointing at the mini dataset.
inputs_sample = {**inputs_full, "train": s3_uri_mini_train, "val": s3_uri_mini_val}

# Training on the full dataset takes about 4-5 hours; use inputs_sample for a quick check.
inputs = inputs_sample

In [17]:
## Local checkpoint directory in the training container; passed to the estimator as
## checkpoint_local_path and to the training script as the `checkpointdir` hyperparameter.
sm_localcheckpoint_dir: str = "/opt/ml/checkpoints/"

In [18]:
## Training instance type and the GPU count of each supported type (used to scale the batch size).
instance_type = "ml.p3.8xlarge"
instance_type_gpu_map = {
    "ml.p3.8xlarge": 4,
    "ml.p3.2xlarge": 1,
    "ml.p3.16xlarge": 8,
}

In [19]:
## Hyperparameters passed to the training entry point (src/main.py, not shown here).
hp = {
    "epochs": 10,
    "earlystoppingpatience": 3,
    # 8 samples per GPU. Instance types missing from instance_type_gpu_map
    # (e.g. 'local', supported by the local-mode cell) fall back to one device
    # instead of raising KeyError.
    "batch": 8 * instance_type_gpu_map.get(instance_type, 1),
    # Bare file names inside the train/val/class channels; the mini dataset uses the
    # same names, so these work for either `inputs` choice.
    "trainfile": s3_uri_train.split("/")[-1],
    "valfile": s3_uri_val.split("/")[-1],
    "classfile": s3_uri_classes.split("/")[-1],
    "gradaccumulation": 4,
    "log-level": "INFO",
    "maxseqlen": 512,
    "lr": 0.00001,
    "finetune": 0,
    "checkpointdir": sm_localcheckpoint_dir,
    "checkpointfreq": 2,
}



In [22]:
## Metric definitions: SageMaker scans the training log with each regex and records
## the first capture group as the named metric.
# Raw strings: "\d" in a normal string literal is an invalid escape sequence
# (DeprecationWarning, SyntaxWarning on Python 3.12+). The resulting patterns are unchanged.
metric_definitions = [
    {"Name": "TrainLoss", "Regex": r"###score: train_loss### (\d*[.]?\d*)"},
    {"Name": "ValidationLoss", "Regex": r"###score: val_loss### (\d*[.]?\d*)"},
    {"Name": "TrainScore", "Regex": r"###score: train_score### (\d*[.]?\d*)"},
    {"Name": "ValidationScore", "Regex": r"###score: val_score### (\d*[.]?\d*)"},
]

In [23]:
# Spot training settings: run this cell when training on Spot instances (then skip the next one).
use_spot = True
train_max_run_secs = 2 * 24 * 60 * 60  # upper bound on training run time: 48 hours
spot_wait_sec = 5 * 60  # extra headroom on top of the run time for waiting on Spot capacity
# A max-wait limit only applies to Spot jobs.
max_wait_time_secs = (train_max_run_secs + spot_wait_sec) if use_spot else None
    


In [None]:
# Local-mode overrides (only take effect when instance_type == 'local').
# Run this cell AFTER the Spot settings cell: it does not define train_max_run_secs,
# which the estimator below still needs, so skipping that cell raises NameError.
if instance_type == 'local':
    use_spot = False
    # Spot is off, so no max-wait limit applies; None matches what the Spot
    # settings cell assigns when use_spot is False.
    max_wait_time_secs = None
    wait = True
    # Use the smaller dataset when running locally.
    inputs = inputs_sample


In [24]:
job_type = "bert-classification"
base_name = f"{job_type}"  # prefix for the training job name (passed as base_job_name)

In [None]:
from sagemaker.pytorch import PyTorch

# PyTorch training job running src/main.py.
# Alternative entry point: entry_point='main_train_k_fold.py'
estimator = PyTorch(
    entry_point="main.py",
    source_dir="src",
    role=role,
    framework_version="1.4.0",
    py_version="py3",
    train_instance_count=1,
    train_instance_type=instance_type,
    hyperparameters=hp,
    output_path=s3_output_path,
    metric_definitions=metric_definitions,
    train_volume_size=30,
    code_location=s3_code_path,
    debugger_hook_config=False,
    base_job_name=base_name,
    train_use_spot_instances=use_spot,
    train_max_run=train_max_run_secs,
    train_max_wait=max_wait_time_secs,
    checkpoint_s3_uri=s3_checkpoint,
    checkpoint_local_path=sm_localcheckpoint_dir,
)

# Launch training on the selected input channels and block until it finishes.
estimator.fit(inputs, wait=True)

## 部署 BERT 模型

#### 推理容器

In [None]:
from sagemaker.pytorch import PyTorchModel
from sagemaker import get_execution_role

role = get_execution_role()

# S3 location of the model artifact produced by the training job above.
model_uri = estimator.model_data

# Serving model whose inference entry point is src/serve.py.
model = PyTorchModel(
    model_data=model_uri,
    role=role,
    py_version="py3",
    framework_version="1.4.0",
    entry_point="serve.py",
    source_dir="src",
)

# Create a real-time endpoint backed by one ml.m5.xlarge instance.
predictor = model.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")

### API调用

In [43]:
import json


class TextSerDes:
    """Serializer/deserializer pair for the endpoint.

    ``serialize`` turns texts into newline-separated UTF-8 bytes (one record per
    line). ``deserialize`` parses a JSON response body.
    """

    def serialize(self, x):
        """Encode ``x`` as newline-separated UTF-8 bytes.

        Args:
            x: An iterable of strings (one record each) or a single string.

        Returns:
            The UTF-8 encoded request payload.
        """
        # A bare string would otherwise be joined character by character.
        if isinstance(x, str):
            return x.encode("utf-8")
        return "\n".join(x).encode("utf-8")

    def deserialize(self, x, content_type):
        """Parse a JSON response body.

        Args:
            x: File-like response body whose ``read()`` returns UTF-8 bytes.
            content_type: Response content type; not inspected, the body is always parsed as JSON.

        Returns:
            The decoded JSON value.
        """
        return json.loads(x.read().decode("utf-8"))

In [None]:

# Sample records to classify, one per line of the request body (the original cell
# referenced `data` without ever defining it).
# NOTE(review): the per-line format src/serve.py expects for "text/csv" (raw text vs.
# the training CSV layout) is not visible here — confirm before relying on it.
data = [
    "E. D. Abbott Ltd Abbott of Farnham E D Abbott Limited was a British coachbuilding business based in Farnham Surrey trading under that name from 1929.",
    "Schwan-STABILO is a German maker of pens for writing colouring and cosmetics as well as markers and highlighters for office use.",
]

predictor.serializer = TextSerDes().serialize
predictor.deserializer = TextSerDes().deserialize

response = predictor.predict(
    data,
    initial_args={"Accept": "text/json", "ContentType": "text/csv"},
)

response

## 删除SageMaker Endpoint

In [None]:
# Tear down the endpoint created by model.deploy(); a deployed endpoint keeps its instance running until deleted.
predictor.delete_endpoint()