# 데이터 전처리

In [1]:
import csv
import io
import json
import os

from loguru import logger
from PIL import Image
import ray
import tensorflow as tf

In [2]:
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

workdir = os.path.join(os.getenv('HOME'), 'aiffel/mpii')
os.chdir(workdir)

num_train_shards = 64
num_val_shards = 8
ray.init()
ray.shutdown()
tf.get_logger().setLevel('ERROR')

2022-01-24 17:48:33,754	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


### json 파싱

- json 파일들은 이미지에 담겨있는 사람의 pose keypoint 정보들을 가지고 있어서 Pose Estimation을 위한 label로 삼을 수 있다.

- json 파일을 열어서 샘플로 annotation정보를 1개만 출력.

In [3]:
import json, os

json_file_path = os.getenv('HOME')+'/aiffel/mpii/mpii_human_pose_v1_u12_2/train.json'

with open(json_file_path) as train_json:
    train_annos = json.load(train_json)
    json_formatted_str = json.dumps(train_annos[0], indent=2)
    print(json_formatted_str)

{
  "joints_vis": [
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1,
    1
  ],
  "joints": [
    [
      620.0,
      394.0
    ],
    [
      616.0,
      269.0
    ],
    [
      573.0,
      185.0
    ],
    [
      647.0,
      188.0
    ],
    [
      661.0,
      221.0
    ],
    [
      656.0,
      231.0
    ],
    [
      610.0,
      187.0
    ],
    [
      647.0,
      176.0
    ],
    [
      637.0201,
      189.8183
    ],
    [
      695.9799,
      108.1817
    ],
    [
      606.0,
      217.0
    ],
    [
      553.0,
      161.0
    ],
    [
      601.0,
      167.0
    ],
    [
      692.0,
      185.0
    ],
    [
      693.0,
      240.0
    ],
    [
      688.0,
      313.0
    ]
  ],
  "image": "015601864.jpg",
  "scale": 3.021046,
  "center": [
    594.0,
    257.0
  ]
}


`joints`가 label로 사용할 Keypoint의 label.

이미지 형상과 사람의 포즈에 따라 모든 label이 이미지에 나타나지 않기 때문에 `joints_vis`를 이용해서 실제로 사용할 수 있는 keypoint인지 나타낸다.

MPII의 경우 1(visible)/0(non) 으로만 나누어지기 때문에 쉽다.

coco의 경우 2/1/0 으로 표현하여 occlusion 상황까지 label화 되어 있다.

#### `joints` 순서는

- 0 - 오른쪽발목
- 1 - 오른쪽 무릎
- 2 - 오른쪽 엉덩이
- 3 - 왼쪽엉덩이
- 4 - 왼쪽무릎
- 5 - 왼쪽발목
- 6 - 골반
- 7 - 가슴
- 8 - 목
- 9 - 머리 위
- 10 - 오른쪽 손목
- 11 - 오른쪽 팔꿈치
- 12 - 오른쪽 어깨
- 13 - 왼쪽 어깨
- 14 - 왼쪽 팔꿈치
- 15 - 왼쪽 손목


#### `scale`과 `center`

- 높이 = scale * 200px
- center는 사람의 중심점

In [4]:
# json annotation을 파싱하는 함수
def parse_one_annotation(anno, image_dir):
    filename = anno['image']
    joints = anno['joints']
    joints_visibility = anno['joints_vis']
    annotation = {
        'filename':filename,
        'filepath':os.path.join(image_dir, filename),
        'joints_visibility':joints_visibility,
        'joints':joints,
        'center':anno['center'],
        'scale':anno['scale']
    }
    return annotation

- image의 전체 path를 묶어 dict 타입의 label로 만들어낸다.
- 이 label을 가지고 학습을 진행한다.

# tfrecord

### tfrecord 파일 만들기

- 실제 프로젝트에서는 튜토리얼 데이터셋보다 훨씬 큰 크기의 데이터를 다뤄야하는데 일반적으로 학습과정에서 gpu의 연산속도보다 HDD I/O가 느리기 때문에 병목 현상이 발생하고 실험효율성이 떨어진다.

#### 학습데이터를 어떻게 빠르게 읽는가? ==> 중요

- data read(또는 prefetch) 또는 데이터 변환단계
- gpu 학습과 병렬적으로 수행되도록 prefetch를 적용해야한다.
- 수행방법은 tf.data 의 map함수이용, cache에 저장해두는 방법사용

tensorflow에서는 데이터셋을 tfrecord 형태로 표현하기 위한 자동화도구 제공.

tfrecord는 binary record sequence를 저장하기 위한 형식

내부적으로 protocol buffer 라는 것을 이용
#### [protocol buffer](https://developers.google.com/protocol-buffers/?hl=ko)

- protobuf는 직렬화 데이터 라이브러리
- 데이터셋 크기가 크기때문에 빠른 학습을 위해서 이 정보를 tfrecord 파일로 변환

In [5]:
# tfrecord 파일로 변환
import ray

def build_tf_records(annotations, total_shards, split):
    chunks = chunkify(annotations, total_shards)
    futures = [
        # train_0001_of_0064.tfrecords
        build_single_tfrecord.remote(
            chunk, './tfrecords_mpii/{}_{}_of_{}.tfrecords'.format(split,
                                                                   str(i+1).zfill(4),
                                                                   str(total_shards).zfill(4))) for i, chunk in enumerate(chunks)
    ]
    ray.get(futures)

#### 용어 정리
- annotation 을 total_shards 개수로 나눈다.(chunkify)(train:64개, val:8개)
- build_single_tfrecord 함수를 통해 tfrecord로 저장
- 각 chunk끼리 dependency가 없기 때문에 병렬처리가능, ray를 사용


#### annotation을 shard로 나눠야하는 이유?
- I/O 병목을 피하기 위해 입력 파일을 여러개로 나눈 뒤, 병렬적으로 prefetch하는 것이 학습 속도를 빠르게한다.
- 데이터를 읽는 호스트보다 최소 10배 많은 파일을 보유하는것이 좋다. 동시에 각 파일은 I/O prefetch의 이점을 누릴 수 있도록 충분히 커야한다.(최소 10MB이상, 이상적으로는 100MB 이상)



튜토리얼대로 annotation을 적절한 개수로 나눈다.

In [6]:
# annotation을 적절한 개수로 나누는 함수
def chunkify(l,n):
    size = len(l)//n
    start = 0
    results = []
    for i in range(n-1):
        results.append(l[start:start+size])
        start += size
    results.append(l[start:])
    return results

- l은 annotation, n은 shards개수
- shard개수 단위로 annotation list를 나누어서 새로운 list를 만든다.
- numpy array라고 가정하면 (size, shard, anno_content) 정도의 shape을 가진다.

In [7]:
# tfrecord 1개를 저장하는 함수
@ray.remote
def build_single_tfrecord(chunk, path):
    print('start to build tf records for' + path)
    
    with tf.io.TFRecordWriter(path) as writer:
        for anno_list in chunk:
            tf_example = generate_tfexample(anno_list)
            writer.write(tf_example.SerializeToString())
            
    print('finished building tf records for'+path)

- TFRecordWriter를 이용하여 anno_list를 shard개수 단위로 작성
- generate_tfexample 함수를 사용
- **write 할 때 string으로 serialize해야한다.**

In [8]:
# tf.example
def generate_tfexample(anno):
    filename = anno['filename']
    filepath = anno['filepath']
    with open(filepath, 'rb') as image_file:
        content = image_file.read()

    image = Image.open(filepath)
    if image.format != 'JPEG' or image.mode != 'RGB':
        image_rgb = image.convert('RGB')
        with io.BytesIO() as output:
            image_rgb.save(output, format="JPEG", quality=95)
            content = output.getvalue()

    width, height = image.size
    depth = 3

    c_x = int(anno['center'][0])
    c_y = int(anno['center'][1])
    scale = anno['scale']

    x = [
        int(joint[0]) if joint[0] >= 0 else int(joint[0]) 
        for joint in anno['joints']
    ]
    y = [
        int(joint[1]) if joint[1] >= 0 else int(joint[0]) 
        for joint in anno['joints']
    ]
    # 0 - invisible, 1 - occluded, 2 - visible
    v = [0 if joint_v == 0 else 2 for joint_v in anno['joints_visibility']]

    feature = {
        'image/height':
        tf.train.Feature(int64_list=tf.train.Int64List(value=[height])),
        'image/width':
        tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
        'image/depth':
        tf.train.Feature(int64_list=tf.train.Int64List(value=[depth])),
        'image/object/parts/x':
        tf.train.Feature(int64_list=tf.train.Int64List(value=x)),
        'image/object/parts/y':
        tf.train.Feature(int64_list=tf.train.Int64List(value=y)),
        'image/object/center/x': 
        tf.train.Feature(int64_list=tf.train.Int64List(value=[c_x])),
        'image/object/center/y': 
        tf.train.Feature(int64_list=tf.train.Int64List(value=[c_y])),
        'image/object/scale':
        tf.train.Feature(float_list=tf.train.FloatList(value=[scale])),
        'image/object/parts/v':
        tf.train.Feature(int64_list=tf.train.Int64List(value=v)),
        'image/encoded':
        _bytes_feature(content),
        'image/filename':
        _bytes_feature(filename.encode())
    }

    return tf.train.Example(features=tf.train.Features(feature=feature))

- 정의한 json의 python type의 값들을 tfexample에 사용할 수 있는 값으로 변환
- image 파일은 byte로 변환. bitmap으로 저장하게 되면 파일 용량이 커지므로 jpeg 타입이 아닌 경우 jpeg로 변화 후 content로 불러서 저장
- 각 label값을 tf.train.Feature로 저장. 데이터타입에 주의
- 이미지는 byte 인코딩 된 값을 그대로 넣는다.

In [9]:
def _bytes_feature(value):
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

# Ray

- Ray는 파이썬을 위한 간단한 분산 애플리케이션 API

#### multiprocessing과 Ray의 사용상 차이점은?
- multiprocenssing : 병렬화를 위해 추상적 구조를 새로 설계
- Ray : 쓰던 코드에서 거의 수정없이 병렬화 할 수 있다.

In [10]:
# tfrecords_mpii.py
import csv
import io
import json
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

from loguru import logger
from PIL import Image
import ray
import tensorflow as tf

num_train_shards = 64
num_val_shards = 8
ray.init()
tf.get_logger().setLevel('ERROR')


def chunkify(l, n):
    size = len(l) // n
    start = 0
    results = []
    for i in range(n - 1):
        results.append(l[start:start + size])
        start += size
    results.append(l[start:])
    return results


def _bytes_feature(value):
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy(
        )  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def generate_tfexample(anno):
    filename = anno['filename']
    filepath = anno['filepath']
    with open(filepath, 'rb') as image_file:
        content = image_file.read()

    image = Image.open(filepath)
    if image.format != 'JPEG' or image.mode != 'RGB':
        image_rgb = image.convert('RGB')
        with io.BytesIO() as output:
            image_rgb.save(output, format="JPEG", quality=95)
            content = output.getvalue()

    width, height = image.size
    depth = 3

    c_x = int(anno['center'][0])
    c_y = int(anno['center'][1])
    scale = anno['scale']

    # x = [
    #     joint[0] / width if joint[0] >= 0 else joint[0]
    #     for joint in anno['joints']
    # ]
    # y = [
    #     joint[1] / height if joint[1] >= 0 else joint[0]
    #     for joint in anno['joints']
    # ]
    x = [
        int(joint[0]) if joint[0] >= 0 else int(joint[0]) 
        for joint in anno['joints']
    ]
    y = [
        int(joint[1]) if joint[1] >= 0 else int(joint[0]) 
        for joint in anno['joints']
    ]
    # 0 - invisible, 1 - occluded, 2 - visible
    v = [0 if joint_v == 0 else 2 for joint_v in anno['joints_visibility']]

    feature = {
        'image/height':
        tf.train.Feature(int64_list=tf.train.Int64List(value=[height])),
        'image/width':
        tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
        'image/depth':
        tf.train.Feature(int64_list=tf.train.Int64List(value=[depth])),
        'image/object/parts/x':
        tf.train.Feature(int64_list=tf.train.Int64List(value=x)),
        'image/object/parts/y':
        tf.train.Feature(int64_list=tf.train.Int64List(value=y)),
        'image/object/center/x': 
        tf.train.Feature(int64_list=tf.train.Int64List(value=[c_x])),
        'image/object/center/y': 
        tf.train.Feature(int64_list=tf.train.Int64List(value=[c_y])),
        'image/object/scale':
        tf.train.Feature(float_list=tf.train.FloatList(value=[scale])),
        # 'image/object/parts/x':
        # tf.train.Feature(float_list=tf.train.FloatList(value=x)),
        # 'image/object/parts/y':
        # tf.train.Feature(float_list=tf.train.FloatList(value=y)),
        'image/object/parts/v':
        tf.train.Feature(int64_list=tf.train.Int64List(value=v)),
        'image/encoded':
        _bytes_feature(content),
        'image/filename':
        _bytes_feature(filename.encode())
    }

    return tf.train.Example(features=tf.train.Features(feature=feature))


@ray.remote
def build_single_tfrecord(chunk, path):
    print('start to build tf records for ' + path)

    with tf.io.TFRecordWriter(path) as writer:
        for anno_list in chunk:
            tf_example = generate_tfexample(anno_list)
            writer.write(tf_example.SerializeToString())

    print('finished building tf records for ' + path)


def build_tf_records(annotations, total_shards, split):
    chunks = chunkify(annotations, total_shards)
    futures = [
        # train_0001_of_0064.tfrecords
        build_single_tfrecord.remote(
            chunk, './tfrecords_mpii/{}_{}_of_{}.tfrecords'.format(
                split,
                str(i + 1).zfill(4),
                str(total_shards).zfill(4),
            )) for i, chunk in enumerate(chunks)
    ]
    ray.get(futures)


def parse_one_annotation(anno, image_dir):
    filename = anno['image']
    joints = anno['joints']
    joints_visibility = anno['joints_vis']
    annotation = {
        'filename': filename,
        'filepath': os.path.join(image_dir, filename),
        'joints_visibility': joints_visibility,
        'joints': joints,
        'center': anno['center'],
        'scale' : anno['scale']
    }
    return annotation


def main():
    print('Start to parse annotations.')
    if not os.path.exists('./tfrecords_mpii'):
        os.makedirs('./tfrecords_mpii')

    with open('./mpii_human_pose_v1_u12_2/train.json') as train_json:
        train_annos = json.load(train_json)
        train_annotations = [
            parse_one_annotation(anno, './images/')
            for anno in train_annos
        ]
        print('First train annotation: ', train_annotations[0])
        del (train_annos)

    with open('./mpii_human_pose_v1_u12_2/validation.json') as val_json:
        val_annos = json.load(val_json)
        val_annotations = [
            parse_one_annotation(anno, './images/') for anno in val_annos
        ]
        print('First val annotation: ', val_annotations[0])
        del (val_annos)

    print('Start to build TF Records.')
    build_tf_records(train_annotations, num_train_shards, 'train')
    build_tf_records(val_annotations, num_val_shards, 'val')

    print('Successfully wrote {} annotations to TF Records.'.format(
        len(train_annotations) + len(val_annotations)))


if __name__ == '__main__':
    main()

2022-01-24 17:48:39,165	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


Start to parse annotations.
First train annotation:  {'filename': '015601864.jpg', 'filepath': './images/015601864.jpg', 'joints_visibility': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'joints': [[620.0, 394.0], [616.0, 269.0], [573.0, 185.0], [647.0, 188.0], [661.0, 221.0], [656.0, 231.0], [610.0, 187.0], [647.0, 176.0], [637.0201, 189.8183], [695.9799, 108.1817], [606.0, 217.0], [553.0, 161.0], [601.0, 167.0], [692.0, 185.0], [693.0, 240.0], [688.0, 313.0]], 'center': [594.0, 257.0], 'scale': 3.021046}
First val annotation:  {'filename': '005808361.jpg', 'filepath': './images/005808361.jpg', 'joints_visibility': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'joints': [[804.0, 711.0], [816.0, 510.0], [908.0, 438.0], [1040.0, 454.0], [906.0, 528.0], [883.0, 707.0], [974.0, 446.0], [985.0, 253.0], [982.7591, 235.9694], [962.2409, 80.0306], [869.0, 214.0], [798.0, 340.0], [902.0, 253.0], [1067.0, 253.0], [1167.0, 353.0], [1142.0, 478.0]], 'center': [966.0, 340.0], 'scale': 4.7

[2m[36m(pid=629)[0m finished building tf records for ./tfrecords_mpii/train_0036_of_0064.tfrecords
[2m[36m(pid=629)[0m start to build tf records for ./tfrecords_mpii/train_0039_of_0064.tfrecords
[2m[36m(pid=631)[0m finished building tf records for ./tfrecords_mpii/train_0037_of_0064.tfrecords
[2m[36m(pid=631)[0m start to build tf records for ./tfrecords_mpii/train_0040_of_0064.tfrecords
[2m[36m(pid=630)[0m finished building tf records for ./tfrecords_mpii/train_0038_of_0064.tfrecords
[2m[36m(pid=630)[0m start to build tf records for ./tfrecords_mpii/train_0041_of_0064.tfrecords
[2m[36m(pid=631)[0m finished building tf records for ./tfrecords_mpii/train_0040_of_0064.tfrecords
[2m[36m(pid=631)[0m start to build tf records for ./tfrecords_mpii/train_0042_of_0064.tfrecords
[2m[36m(pid=629)[0m finished building tf records for ./tfrecords_mpii/train_0039_of_0064.tfrecords
[2m[36m(pid=629)[0m start to build tf records for ./tfrecords_mpii/train_0043_of_0064.tfrec

`cd ~/mpii_folder && python tfrecords_mpii.py` 를 터미널을 통해 수행한다,10분예상

In [14]:
# 200Mb 정도의 tfrecords들이 72개 만들어짐.
!cd ~/aiffel/mpii/tfrecords_mpii && ls | wc

     72      72    2072


# Data label 로 만들기

tfrecords 파일을 읽고 전처리를 할 수 있는 dataloader

In [15]:
import tensorflow as tf

class Preprocessor(object):
    def __init__(self,
                 image_shape=(256, 256, 3),
                 heatmap_shape=(64, 64, 16),
                 is_train=False):
        self.is_train = is_train
        self.image_shape = image_shape
        self.heatmap_shape = heatmap_shape

    def __call__(self, example):
        features = self.parse_tfexample(example)
        image = tf.io.decode_jpeg(features['image/encoded'])

        if self.is_train:
            random_margin = tf.random.uniform([1], 0.1, 0.3)[0]
            image, keypoint_x, keypoint_y = self.crop_roi(image, features, margin=random_margin)
            image = tf.image.resize(image, self.image_shape[0:2])
        else:
            image, keypoint_x, keypoint_y = self.crop_roi(image, features)
            image = tf.image.resize(image, self.image_shape[0:2])

        image = tf.cast(image, tf.float32) / 127.5 - 1
        heatmaps = self.make_heatmaps(features, keypoint_x, keypoint_y)

        # print (image.shape, heatmaps.shape, type(heatmaps))

        return image, heatmaps

`Preprocessor` 클래스 코드에서 `__call__()` 메소드 내부에서 진행되는 주요 과정을 정리하면

- tfrecord 파일이기 때문에 병렬로 읽는 것은 tf가 지원. `self.parse_tfexample()`에 구현되어있고, 이 함수를 통해 `tf.tensor`로 이루어진 dictionary형태의 `features`를 얻을 수 있다.
- image는 `features['image/encoded']` 형태로 사용할 수 있고 tfrecord를 저장할 때 jpeg encoding 된 값을 넣었으므로 `tf.io.decode_jpeg()`로 decoding 하여 tensor 형태의 이미지를 얻는다.
- `crop_roi()` 메소드를 이용해 해당 이미지를 학습하기 편하도록 트릭적용
- `make_heatmaps()` 메소드로 label을 heatmap으로 나타낸다.

In [16]:
def parse_tfexample(self, example_proto):
        image_feature_description = {
            'image/height': tf.io.FixedLenFeature([], tf.int64),
            'image/width': tf.io.FixedLenFeature([], tf.int64),
            'imaage/depth': tf.io.FixedLenFeature([], tf.int64),
            'image/object/parts/x': tf.io.VarLenFeature(tf.int64),
            'image/object/parts/y': tf.io.VarLenFeature(tf.int64),
            'image/object/parts/v': tf.io.VarLenFeature(tf.int64),
            'image/object/center/x': tf.io.FixedLenFeature([], tf.int64),
            'image/object/center/y': tf.io.FixedLenFeature([], tf.int64),
            'image/object/scale': tf.io.FixedLenFeature([], tf.float32),
            'image/encoded': tf.io.FixedLenFeature([], tf.string),
            'image/filename': tf.io.FixedLenFeature([], tf.string),
        }
        return tf.io.parse_single_example(example_proto,
                                          image_feature_description)

- tfrecord 파일 형식을 저장한 data type feature에 맞게 parsing한다.