

---



In [None]:
# Root directory of the source dataset. Later cells read its 00_csv, 01_label
# and 02_tlc subfolders (each split into train/val/test), and the last path
# component is parsed to derive CLASS_NAME.
PATH_DATASET = '/content/drive/MyDrive/canAiry/data/02_2nd-dataset/04-1_TR-bearing'



---



# 준비

In [None]:
import os
import shutil
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from glob import glob
from tqdm import tqdm

In [None]:
# Expected frame size in pixels: check_csv validates each CSV grid against
# these values and resize_img resizes the JPG images to them.
IMG_WIDTH = 640
IMG_HEIGHT = 512

In [None]:
def check_abnormal(sample):
  """Tell whether a label file contains at least one non-normal annotation.

  Args:
    sample: Path to a label JSON file whose top-level "annotations" list
      holds entries with an ["attributes"]["status"] string.

  Returns:
    True if any annotation has a status other than "normal", otherwise False
    (including when the annotation list is empty).
  """
  with open(sample, "r") as label_file:
    annotations = json.load(label_file)["annotations"]
  # any() stops at the first non-normal annotation, like an early return.
  return any(ann["attributes"]["status"] != "normal" for ann in annotations)

In [None]:
def check_csv(df_temp):
  """Check that a CSV frame encodes a full IMG_HEIGHT x IMG_WIDTH grid.

  Each row of the first column is expected to be one ';'-separated string
  with a trailing ';' (the piece after the last ';' is discarded).

  Args:
    df_temp: DataFrame whose first column holds one ';'-joined row per line.

  Returns:
    True if the frame has exactly IMG_HEIGHT rows and every row splits into
    exactly IMG_WIDTH values; False otherwise (including when a cell is not
    a string or the frame has no columns).
  """
  if df_temp.shape[0] != IMG_HEIGHT:
    return False
  if df_temp.shape[1] == 0:
    return False
  # Select the first column by position; integer keys on a label-indexed
  # Series are deprecated in recent pandas.
  for cell in df_temp.iloc[:, 0]:
    # A missing/NaN cell cannot be a valid row of values.
    if not isinstance(cell, str):
      return False
    if len(cell.split(';')[:-1]) != IMG_WIDTH:
      return False
  return True

In [None]:
def get_bbox_info(path_csv):
  """Load the bounding boxes annotated for one CSV sample.

  The label path is derived from the CSV path by swapping the extension for
  '.json' and replacing '00_csv' with '01_label'.

  Args:
    path_csv: Path to a sample CSV located under a '00_csv' folder.

  Returns:
    A list with one [y1, y2, x1, x2, status] entry per annotation, where
    (x1, y1) is the top-left corner, (x2, y2) the bottom-right corner and
    status is the annotation's ["attributes"]["status"] value. Coordinates
    are clamped to be non-negative; no upper bound is applied.
  """
  # splitext only strips the final extension, so dots elsewhere in the path
  # (e.g. in a directory name) no longer truncate it.
  path_label = os.path.splitext(path_csv)[0].replace('00_csv', '01_label') + '.json'
  with open(path_label, "r", encoding="utf-8") as f:
    dict_label = json.load(f)

  bbox_info = []
  for obj in dict_label["annotations"]:
    data = obj["data"]
    x1 = data["x"]
    y1 = data["y"]
    # Compute the far edges BEFORE clamping the origin; otherwise a box that
    # starts outside the image is shifted and widened instead of trimmed.
    x2 = x1 + data["width"]
    y2 = y1 + data["height"]
    status = obj["attributes"]["status"]
    bbox_info.append([max(y1, 0), max(y2, 0), max(x1, 0), max(x2, 0), status])
  return bbox_info

> 원본 데이터셋 확인

In [None]:
# Source CSV folders, one per dataset split.
PATH_CSV_TRAIN = f"{PATH_DATASET}/00_csv/train/"
PATH_CSV_VAL = f"{PATH_DATASET}/00_csv/val/"
PATH_CSV_TEST = f"{PATH_DATASET}/00_csv/test/"

# CSV files found in each split.
csv_train = glob(f"{PATH_CSV_TRAIN}*.csv")
csv_val = glob(f"{PATH_CSV_VAL}*.csv")
csv_test = glob(f"{PATH_CSV_TEST}*.csv")

# File counts as [train, val, test].
csv_size = [len(paths) for paths in (csv_train, csv_val, csv_test)]
csv_size

[960, 120, 120]

In [None]:
# Label (JSON) folders, one per dataset split.
PATH_JSON_TRAIN = f"{PATH_DATASET}/01_label/train/"
PATH_JSON_VAL = f"{PATH_DATASET}/01_label/val/"
PATH_JSON_TEST = f"{PATH_DATASET}/01_label/test/"

# Label files found in each split.
json_train = glob(f"{PATH_JSON_TRAIN}*.json")
json_val = glob(f"{PATH_JSON_VAL}*.json")
json_test = glob(f"{PATH_JSON_TEST}*.json")

# File counts as [train, val, test].
json_size = [len(paths) for paths in (json_train, json_val, json_test)]
json_size

[960, 120, 120]

In [None]:
# JPG image folders (02_tlc), one per dataset split.
PATH_JPG_TRAIN = f"{PATH_DATASET}/02_tlc/train/"
PATH_JPG_VAL = f"{PATH_DATASET}/02_tlc/val/"
PATH_JPG_TEST = f"{PATH_DATASET}/02_tlc/test/"

# JPG files found in each split.
jpg_train = glob(f"{PATH_JPG_TRAIN}*.jpg")
jpg_val = glob(f"{PATH_JPG_VAL}*.jpg")
jpg_test = glob(f"{PATH_JPG_TEST}*.jpg")

# File counts as [train, val, test].
jpg_size = [len(paths) for paths in (jpg_train, jpg_val, jpg_test)]
jpg_size

[960, 120, 120]

> ANOMALIB 데이터셋 경로 설정

In [None]:
# Class name is the part after the first "_" in the dataset folder name.
CLASS_NAME = PATH_DATASET.split('/')[-1].split('_')[1]

# Output layout: normal/ (normal crops from train), normal/test/ (normal crops
# from val), abnormal/ (non-normal crops from train and val) and inference/
# (all crops from test).
PATH_DATASET_AD = "/content/drive/MyDrive/anomaly_detection/datasets/canAIry/"+CLASS_NAME
PATH_NORMAL = PATH_DATASET_AD + "/normal/"
# PATH_NORMAL already ends with "/", so no extra leading slash is added here.
PATH_NORMAL_TEST = PATH_NORMAL + "test/"
PATH_ABNORMAL = PATH_DATASET_AD + "/abnormal/"
PATH_INFERENCE = PATH_DATASET_AD + "/inference/"

# makedirs also creates missing parent folders and is a no-op for folders that
# already exist, so this cell can be re-run safely.
for path_dir in (PATH_DATASET_AD, PATH_NORMAL, PATH_NORMAL_TEST, PATH_ABNORMAL, PATH_INFERENCE):
  os.makedirs(path_dir, exist_ok=True)

# BBOX Crop

## 기존: 원본 JPG 이미지에서 BBOX 크롭

In [None]:
def resize_img(list_jpg):
  """Resize images to IMG_WIDTH x IMG_HEIGHT, overwriting each file in place.

  Images that already have the target size are left untouched, so re-running
  the cell does not re-encode (and further degrade) JPEGs that were already
  resized.

  Args:
    list_jpg: Iterable of image file paths; each file is replaced by its
      resized version.
  """
  target_size = (IMG_WIDTH, IMG_HEIGHT)
  for path_jpg in tqdm(list_jpg):
    # The context manager closes the file handle before it is overwritten.
    with Image.open(path_jpg) as img:
      if img.size == target_size:
        continue
      resized_img = img.resize(target_size)
    resized_img.save(path_jpg)

In [None]:
def crop_bbox(list_jpg):
  """Crop every annotated bounding box out of the given JPG images.

  For each image under a '02_tlc/<split>/' folder, the matching boxes are
  loaded with get_bbox_info and each crop is saved as
  '<sample_name>B(<index>).jpg' in a folder chosen by split and status:
    - split 'test'                  -> PATH_INFERENCE (any status)
    - status other than 'normal'    -> PATH_ABNORMAL
    - 'normal' in split 'val'       -> PATH_NORMAL_TEST
    - 'normal' in any other split   -> PATH_NORMAL
  Existing output files are never overwritten.

  Args:
    list_jpg: Iterable of JPG paths located under a '02_tlc' folder.
  """
  for jpg in tqdm(list_jpg):
    sample_name = os.path.splitext(os.path.basename(jpg))[0]
    # First path component after '02_tlc', i.e. the split folder name.
    sample_dir = jpg.split('02_tlc')[1].split('/')[1]
    # splitext strips only the final extension, so other dots in the path
    # do not truncate it.
    path_csv = os.path.splitext(jpg)[0].replace('02_tlc', '00_csv') + '.csv'
    bbox = get_bbox_info(path_csv)
    # The context manager releases the file handle once all crops are saved.
    with Image.open(jpg) as img:
      for i, b in enumerate(bbox):
        if sample_dir == 'test':
          dir_out = PATH_INFERENCE
        elif b[4] != 'normal':
          dir_out = PATH_ABNORMAL
        elif sample_dir == 'val':
          dir_out = PATH_NORMAL_TEST
        else:
          dir_out = PATH_NORMAL
        path_jpg = dir_out + sample_name + 'B(' + str(i) + ').jpg'
        # Create the image (.jpg) file only if it does not exist yet.
        if not os.path.exists(path_jpg):
          # b is [y1, y2, x1, x2, status]; PIL expects (left, upper, right, lower).
          img.crop((b[2], b[0], b[3], b[1])).save(path_jpg)

In [None]:
# Resize the images of every split (train, val, test).
# Note: resize_img writes the result back to the same file paths.
for list_jpg_split in (jpg_train, jpg_val, jpg_test):
  resize_img(list_jpg_split)

100%|██████████| 960/960 [05:20<00:00,  2.99it/s]
100%|██████████| 120/120 [00:39<00:00,  3.02it/s]
100%|██████████| 120/120 [00:38<00:00,  3.10it/s]


In [None]:
# Crop the annotated boxes of every split (train, val, test).
for list_jpg_split in (jpg_train, jpg_val, jpg_test):
  crop_bbox(list_jpg_split)

100%|██████████| 960/960 [04:16<00:00,  3.75it/s]
100%|██████████| 120/120 [00:29<00:00,  4.12it/s]
100%|██████████| 120/120 [00:28<00:00,  4.16it/s]


## 생성: CSV 온도 데이터로부터 BBOX 이미지 생성

In [None]:
def create_raw_image_bbox(list_csv):
  """Render every annotated bounding box from the CSV temperature data.

  For each CSV under a '00_csv/<split>/' folder the value grid is parsed,
  each box returned by get_bbox_info is sliced out and saved with
  plt.imsave as '<sample_name>B(<index>).jpg' in a folder chosen by split
  and status:
    - split 'test'                  -> PATH_INFERENCE (any status)
    - status other than 'normal'    -> PATH_ABNORMAL
    - 'normal' in split 'val'       -> PATH_NORMAL_TEST
    - 'normal' in any other split   -> PATH_NORMAL
  Existing output files are never overwritten.

  WARNING: a CSV that fails check_csv is deleted from disk.

  Args:
    list_csv: Iterable of CSV paths located under a '00_csv' folder.
  """
  for path_csv in tqdm(list_csv):
    # The first 4 parsed rows are dropped before validation
    # (presumably file metadata - TODO confirm against the CSV format).
    df_temp = pd.read_csv(path_csv)[4:].reset_index(drop=True)
    # First path component after '00_csv', i.e. the split folder name.
    sample_dir = path_csv.split('00_csv')[1].split('/')[1]
    sample_name = os.path.splitext(os.path.basename(path_csv))[0]

    # Check that the temperature data in the CSV covers the full pixel grid
    # (IMG_WIDTH x IMG_HEIGHT).
    if not check_csv(df_temp):
      # NOTE(review): this removes the SOURCE csv file; kept as in the
      # original workflow, but confirm that deleting input data is intended.
      if os.path.exists(path_csv):
        os.remove(path_csv)
      continue

    # Select the first column by position (integer keys on a label-indexed
    # Series are deprecated in recent pandas) and build the grid in one step;
    # its shape follows the validated data instead of a hard-coded 512x640.
    arr_image = np.array(
        [[float(v) for v in cell.split(';')[:-1]] for cell in df_temp.iloc[:, 0]])

    bbox = get_bbox_info(path_csv)
    for i, b in enumerate(bbox):
      if sample_dir == 'test':
        dir_out = PATH_INFERENCE
      elif b[4] != 'normal':
        dir_out = PATH_ABNORMAL
      elif sample_dir == 'val':
        dir_out = PATH_NORMAL_TEST
      else:
        dir_out = PATH_NORMAL
      path_jpg = dir_out + sample_name + 'B(' + str(i) + ').jpg'
      if os.path.exists(path_jpg):
        continue
      # b is [y1, y2, x1, x2, status]; cast to int because slice bounds must
      # be integers even if the label stores float coordinates.
      arr_image_bbox = arr_image[int(b[0]):int(b[1]), int(b[2]):int(b[3])]
      # A box lying outside the grid yields an empty slice that cannot be
      # rendered; skip it.
      if arr_image_bbox.size == 0:
        continue
      # Create the raw image (.jpg) file.
      plt.imsave(path_jpg, arr_image_bbox)

In [None]:
# Render the annotated boxes of every split (train, val, test) from the CSVs.
for list_csv_split in (csv_train, csv_val, csv_test):
  create_raw_image_bbox(list_csv_split)

# 확인

In [None]:
# Generated crops found in each output folder.
sample_normal = glob(f"{PATH_NORMAL}*.jpg")
sample_normal_test = glob(f"{PATH_NORMAL_TEST}*.jpg")
sample_abnormal = glob(f"{PATH_ABNORMAL}*.jpg")
sample_inference = glob(f"{PATH_INFERENCE}*.jpg")

# File counts as [normal, normal/test, abnormal, inference].
sample_size = [
    len(paths)
    for paths in (sample_normal, sample_normal_test, sample_abnormal, sample_inference)
]
sample_size

[690, 91, 375, 120]