In [None]:
import os

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras
import matplotlib.pyplot as plt
import imgaug.augmenters as iaa
import segmentation_models as sm

from PIL import Image 
from segmentation_models import Unet
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.losses import binary_crossentropy
from sklearn.model_selection import train_test_split

sm.set_framework('tf.keras')
sm.framework()

In [None]:
""" 데이터의 경로 지정 """

train_images_path = '../input/severstal-steel-defect-detection/train_images'
train_df = pd.read_csv('../input/severstal-steel-defect-detection/train.csv') 

In [None]:
""" 훈련용 이미지의 이름 목록 저장"""
train_image_names = os.listdir(train_images_path)
train_image_names = pd.DataFrame(train_image_names, columns =['ImageId'])
train_image_names # train 폴더에는 12568개의 이미지가 존재함을 확인

In [None]:
""" 
pandas의 merge를 이용하여 두 데이터프레임 train.csv와 train image 목록을 ImageId를 기준으로 병합
7095개의 결함과 결함이 없는 5902개의 이미지를 결합하여 결과적으로 12997개의 행이 됨
"""
train_df = pd.merge(train_df,train_image_names,how = 'outer',on = ['ImageId','ImageId'])
train_df = train_df.fillna(' ')
train_df # 12997(7095+5902) x 3 (ImageId, ClassId, EncodedPixels)

In [None]:
""" 
ImageId를 행 인덱스로, ClassId를 열 인덱스로, EncodedPixels를 분석할 열로 지정
numpy 합을 이용하되, 없는 값은 공백인 ' '로 채움
"""
train_data = pd.pivot_table(train_df, values='EncodedPixels', index='ImageId',columns='ClassId', aggfunc=np.sum,fill_value= ' ').astype(str)
train_data # 이미지별 결함의 위치로 변경하며 12568개로 돌아옴

In [None]:
""" train_df에는 ClassId가 공백인, 결함이 없는 이미지가 있었으므로 공백 열이 생성됨 """
train_data.columns

In [None]:
""" 결함이 없으므로 아무런 데이터가 없는 열임 """
train_data[' '].value_counts()

In [None]:
""" 삭제해줌 """
del train_data[' ']
train_data.columns

In [None]:
"""
index 설정을 리셋하여 ClassId 설정을 지운다.
"""
train_data = train_data.reset_index()
train_data

In [None]:
""" ClasssId 1, 2, 3, 4를 결함 1, 2, 3, 4로 수정 """
train_data.columns = ['ImageId','Defect_1','Defect_2','Defect_3','Defect_4']
train_data # Defect 1, 2, 3, 4의 위치를 12568개의 이미지에 각각 표시

In [None]:
"""
각각의 이미지가 결함을 가지고 있는지 알려주는 hasDefect, 결함 1, 2, 3, 4를 가지고 있는지를 알려주는 hasDefect_1/2/3/4 열 생성
"""

insert_column = []
for i in range(len(train_data)):
    if (train_data['Defect_1'][i]== ' ' and train_data['Defect_2'][i]== ' ' and train_data['Defect_3'][i]==' ' and train_data['Defect_4'][i]==' '):
        insert_column.append(0)
    else:
        insert_column.append(1)
train_data['hasDefect'] = insert_column

insert_column = []
for i in range(len(train_data)):
    if train_data['Defect_1'][i]==' ':
        insert_column.append(0)
    else:
        insert_column.append(1)
train_data['hasDefect_1'] = insert_column

insert_column = []
for i in range(len(train_data)):
    if train_data['Defect_2'][i]==' ':
        insert_column.append(0)
    else:
        insert_column.append(1)
train_data['hasDefect_2'] = insert_column

insert_column = []
for i in range(len(train_data)):
    if train_data['Defect_3'][i]==' ':
        insert_column.append(0)
    else:
        insert_column.append(1)
train_data['hasDefect_3'] = insert_column

insert_column = []
for i in range(len(train_data)):
    if train_data['Defect_4'][i]==' ':
        insert_column.append(0)
    else:
        insert_column.append(1)
train_data['hasDefect_4'] = insert_column

train_data

In [None]:
plt.figure(figsize = (4.8, 4)) 
plt.xlabel('number of defects in each image')
plt.ylabel('Number')
plt.bar([str(0),str(1),str(2),str(3)], train_data[['hasDefect_1','hasDefect_2','hasDefect_3','hasDefect_4']].sum(axis = 1).value_counts().sort_index(),
        color = ['orange','blue','green','red'] , width = 0.8)
plt.title('Distribution of number of defects in each image')

xlocs, xlabs = plt.xticks()
for i, v in zip( [0,1,2,3], train_data[['hasDefect_1','hasDefect_2','hasDefect_3','hasDefect_4']].sum(axis = 1).value_counts().sort_index()):
    plt.text(xlocs[i] - 0.15, v + 0.5, str(v))
plt.show()

In [None]:
plt.figure(figsize = (4.8, 4)) 
plt.xlabel('Defect type')
plt.ylabel('Number')
plt.bar([1,2,3,4],train_data[['hasDefect_1','hasDefect_2','hasDefect_3','hasDefect_4']].sum(axis = 0),
        color = ['red','green','blue','orange'] , width = 0.8)
xlocs, xlabs = plt.xticks()
plt.title('Distribution of steel among defect types')
for i, v in zip([1,2,3,4],train_data[['hasDefect_1','hasDefect_2','hasDefect_3','hasDefect_4']].sum(axis = 0)):
    plt.text(xlocs[i] - 0.15, v + 0.5, str(v))
plt.show()

**분석 결과 대다수의 이미지는 결함이 한 개이나, 다중 결함을 갖는 결함이 존재.  
또한 데이터의 분포가 매우 불균형하여, 증강이 필요**

In [None]:
"""
minority preference는 데이터가 소수인 클래스를 의미
소수 레이블을 우선적으로 사용하여 다중 결함 클래스에서 2, 4, 1, 3 순으로 데이터를 확인
"""

insert_column = []
for i in range(len(train_data)):
    if train_data['hasDefect_2'].iloc[i]==1:
        insert_column.append(2)
    elif train_data['hasDefect_4'].iloc[i]==1:
        insert_column.append(4)
    elif train_data['hasDefect_1'].iloc[i]==1:
        insert_column.append(1)
    elif train_data['hasDefect_3'].iloc[i]==1:
        insert_column.append(3)
    else:
        insert_column.append(0)
        
train_data['minority_preference']= insert_column
train_data

In [None]:
X = train_data.copy()
X_train, X_test = train_test_split(X, test_size = 0.2, stratify = X['minority_preference'],random_state=2022)
X_train, X_validation = train_test_split(X_train, test_size = 0.25, stratify = X_train['minority_preference'],random_state=2022)

print(X_train.shape, X_validation.shape, X_test.shape) # train, validation, test를 대략 6:2:2로 분리 (7540:2514:2514)

In [None]:
X_train.to_csv('../input/severstal-steel-defect-detection/X_train.csv')
X_validation.to_csv('../input/severstal-steel-defect-detection/X_validation.csv')
X_test.to_csv('../input/severstal-steel-defect-detection/X_test.csv')

In [None]:
del X
del train_data
del train_df

In [None]:
def rle2maskResize(rle):
    """ RLE 형식으로 코딩된 Encodedpixels를 mask 형태로 분리 """
    if (pd.isnull(rle))|(rle==''): 
        return np.zeros((height,width) ,dtype=np.uint8)
    
    mask= np.zeros( 256 * 1600 ,dtype=np.uint8)

    array = np.asarray([int(x) for x in rle.split()])
    starts = array[0::2]-1
    lengths = array[1::2]    
    for index, start in enumerate(starts):
        mask[int(start):int(start+lengths[index])] = 1
    
    if height == 128 and width == 800:
        return mask.reshape( (256, 1600), order='F' )[::2,::2]
    return mask.reshape( (256, 1600), order='F' )

In [None]:
""" keras를 이용하여 커스텀 데이터로더 생성 """
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, df, batch_size = 16, subset="train", shuffle=False, preprocess=None, augmentation=None, info={}):
        super().__init__()
        self.df = df
        self.shuffle = shuffle
        self.subset = subset
        self.batch_size = batch_size
        self.preprocess = preprocess
        self.augmentation = augmentation
        self.info = info
        self.path = '../input/severstal-steel-defect-detection/'
        self.data_path = self.path + 'train_images/'

        self.on_epoch_end()
        
    def __len__(self):
        return int(np.floor(len(self.df) / self.batch_size))
    
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.df))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
    
    def __getitem__(self, index): 
        X = np.empty((self.batch_size, height, width, 3)).astype(np.float16)
        y = np.empty((self.batch_size, height, width, 4)).astype(np.float16)
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        for i,f in enumerate(self.df['ImageId'].iloc[indexes]):
            self.info[index*self.batch_size+i]=f
            if height == 128 and width == 800:
                X[i,] = Image.open(self.data_path + f).resize((800,128))
            else:
                X[i,] = Image.open(self.data_path + f)
            for j in range(4):
                y[i,:,:,j] = rle2maskResize(self.df['Defect_'+str(j+1)].iloc[indexes[i]])
        
        if self.augmentation != None and self.subset == 'train': # 증강은 train일때만, 입력했다면 적용
            for train_image in X:
                train_image = self.augmentation(train_image)
                
            for train_label in y:
                train_label = self.augmentation(train_label)

        if self.preprocess != None: # 전처리를 입력했다면 적용
            X = self.preprocess(X)
            
        return X, y

In [None]:
""" 전처리 """
def rescaling(item): # 값 범위를 0~255(pixel)에서 0~1로 정규화
    return item * 1./255

In [None]:
""" imgaug 라이브러리를 이용한 이미지 데이터 증강 """
def flip_augmentation(item):
    aug2 = iaa.Fliplr(1) # parameter * 180 도 만큼 이미지를 수평으로 뒤집음, 여기서는 좌우 대칭이 됨
    aug3 = iaa.Flipud(1) # parameter * 180 도 만큼 이미지를 수직으로 뒤집음, 여기서는 상하 대칭이 됨

    a = np.random.uniform()
    if a < 0.33:
        item = aug2.augment_image(item)
    elif a < 0.66:
        item = aug3.augment_image(item)
    return item

In [None]:
# https://www.kaggle.com/code/bigironsphere/loss-function-library-keras-pytorch
def dice_coef(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def focal_tversky_loss(targets, inputs, alpha=0.3, beta=0.7, gamma=0.75, smooth=1e-6): 
    #flatten label and prediction tensors
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)

    #True Positives, False Positives & False Negatives
    TP = K.sum((inputs * targets))
    FP = K.sum(((1-targets) * inputs))
    FN = K.sum((targets * (1-inputs)))
               
    Tversky = (TP + smooth) / (TP + alpha*FP + beta*FN + smooth)  
    FocalTversky = K.pow((1 - Tversky), gamma)
        
    return FocalTversky

In [None]:
BATCH_SIZE = 16
epochs = 30
height = 128
width = 800

In [None]:
train_dataset =      DataGenerator(X_train, batch_size= BATCH_SIZE, subset = "train", shuffle=True,
                            preprocess = rescaling, augmentation = flip_augmentation)

validation_dataset = DataGenerator(X_validation, batch_size= BATCH_SIZE, subset = 'validation',
                            preprocess = rescaling)

test_dataset =       DataGenerator(X_test, batch_size= BATCH_SIZE, subset = 'test',
                            preprocess = rescaling)

assert train_dataset[0][0].shape == (BATCH_SIZE, height, width, 3)
assert validation_dataset[0][0].shape == (BATCH_SIZE, height, width, 3)
assert test_dataset[0][0].shape == (BATCH_SIZE, height, width, 3)

assert train_dataset[0][1].shape == (BATCH_SIZE, height, width, 4)
assert validation_dataset[0][1].shape == (BATCH_SIZE, height, width, 4)
assert test_dataset[0][1].shape == (BATCH_SIZE, height, width, 4)

In [None]:
model = Unet('resnet18', input_shape=(height, width, 3), classes=4, activation='sigmoid') # segmentation_model.Unet Unet
model.compile(optimizer='adam', loss = focal_tversky_loss, metrics=[dice_coef])
model.summary()

In [None]:
""" validation loss가 최소인 가중치를 저장 """
callback = [ 
    tf.keras.callbacks.ModelCheckpoint(
      'best_weight.h5', 
      save_weights_only=True, 
      save_best_only=True,
      mode='min', 
      monitor='val_loss')
] 

In [None]:
""" 모델 학습 """

history = model.fit(
    train_dataset, 
    epochs = epochs,
    validation_data = validation_dataset,
    callbacks = [callback]
)

In [None]:
fig, (loss_plot, dice_plot) = plt.subplots(nrows = 1,ncols = 2,figsize=(25, 6))

loss_plot.plot(history.history['loss'], 'r', label = 'train loss')
loss_plot.plot(history.history['val_loss'], 'b', label = 'validation loss')
loss_plot.legend()
loss_plot.set_xlabel('epoch')
loss_plot.set_ylabel('loss')


dice_plot.plot(history.history['dice_coef'], 'r', label = 'train dice coefficient')
dice_plot.plot(history.history['val_dice_coef'], 'b', label = 'validation dice coefficient')
dice_plot.legend()
dice_plot.set_xlabel('epoch')
dice_plot.set_ylabel('dice coefficient')

In [None]:
""" 테스트 데이터셋으로 평가하여 성능 측정 """
model.evaluate(test_dataset)

In [None]:
def mask2pad(mask, pad=2):
    """
    Enlarge Mask to include more space around the defect
    """
    w = mask.shape[1]
    h = mask.shape[0]
    
    # MASK UP
    for k in range(1,pad,2):
        temp = np.concatenate([mask[k:,:],np.zeros((k,w))],axis=0)
        mask = np.logical_or(mask,temp)
    # MASK DOWN
    for k in range(1,pad,2):
        temp = np.concatenate([np.zeros((k,w)),mask[:-k,:]],axis=0)
        mask = np.logical_or(mask,temp)
    # MASK LEFT
    for k in range(1,pad,2):
        temp = np.concatenate([mask[:,k:],np.zeros((h,k))],axis=1)
        mask = np.logical_or(mask,temp)
    # MASK RIGHT
    for k in range(1,pad,2):
        temp = np.concatenate([np.zeros((h,k)),mask[:,:-k]],axis=1)
        mask = np.logical_or(mask,temp)
    
    return mask 

In [None]:
def mask2contour(mask, width=3):
    """
    Convert mask to its contour
    """
    w = mask.shape[1]
    h = mask.shape[0]
    mask2 = np.concatenate([mask[:,width:],np.zeros((h,width))],axis=1)
    mask2 = np.logical_xor(mask,mask2)
    mask3 = np.concatenate([mask[width:,:],np.zeros((width,w))],axis=0)
    mask3 = np.logical_xor(mask,mask3)
    return np.logical_or(mask2,mask3)

In [None]:
defects = list(X_test[X_test['Defect_1']!=''].sample(3).index)
defects += list(X_test[X_test['Defect_2']!=''].sample(3).index)
defects += list(X_test[X_test['Defect_3']!=''].sample(7).index)
defects += list(X_test[X_test['Defect_4']!=''].sample(3).index)

valid_batches = DataGenerator(X_test[X_test.index.isin(defects)], preprocess = rescaling)
preds = model.predict(valid_batches,verbose=1)

In [None]:
print('Plotting predictions...')
print('KEY: yellow=defect1, green=defect2, blue=defect3, magenta=defect4')

for i,batch in enumerate(valid_batches):
    plt.figure(figsize=(20,36))
    for k in range(16):
        plt.subplot(16,2,2*k+1)
        img = batch[0][k,]
        img = Image.fromarray(img.astype('uint8'))
        img = np.array(img)
        dft = 0
        extra = '  has defect '
        for j in range(4):
            msk = batch[1][k,:,:,j]
            if np.sum(msk)!=0:
                dft=j+1
                extra += ' '+str(j+1)
            msk = mask2pad(msk,pad=2)
            msk = mask2contour(msk,width=3)
            if j==0: # yellow
                img[msk==1,0] = 235 
                img[msk==1,1] = 235
            elif j==1: img[msk==1,1] = 210
            elif j==2: img[msk==1,2] = 255
            elif j==3: # magenta
                img[msk==1,0] = 255
                img[msk==1,2] = 255
        if extra=='  has defect ': extra =''
        plt.title('Train '+ X_test.iloc[16*i+k,0]+extra)
        plt.axis('off') 
        plt.imshow(img)
        plt.subplot(16,2,2*k+2)
        if dft!=0:
            msk = preds[16*i+k,:,:,dft-1]
            mx = np.round(np.max(msk),3)
            plt.title('Predict Defect '+str(dft)+'  (max pixel = '+str(mx)+')')
            plt.imshow(msk, cmap=plt.cm.gray)
        else:
            plt.title('Predict No Defect')
            plt.imshow(np.zeros((height,width)), cmap=plt.cm.gray)
        plt.axis('off')
    plt.subplots_adjust(wspace=0.05)
    plt.show()