In [None]:
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.vgg16 import preprocess_input
from keras.layers import Flatten, Dense, Dropout
from keras.preprocessing import image
from keras.applications import VGG16
from keras.models import Sequential
import matplotlib.pyplot as plt
from ultralytics import YOLO
from numpy import asarray
import tensorflow as tf
from PIL import Image
import numpy as np
import keras
import cv2
import os
from tqdm import tqdm
import pandas as pd
import json
from pathlib import Path

In [None]:
# obb_model = YOLO('yolov8l-obb.pt')
# obb_model.train(data='obb/data.yaml', epochs=45, imgsz=640, optimizer='AdamW')

# seg_model = YOLO('yolov8n-seg.pt')
# seg_model.train(data='seg/data.yaml', epochs=100, imgsz=640, optimizer='AdamW', single_cls=True)

In [None]:
# ---------------------------------------------------------------------------
# Notebook-wide configuration and model handles.
# ---------------------------------------------------------------------------

# Not referenced by any function in the visible notebook.
mask_color = (0, 0, 255)

# Size every image is resized to before it is fed to the VGG16 feature extractor.
# NOTE(review): VGG16's canonical input size is 224x224; 244 may be a typo - confirm.
img_width = img_height = 244

# Class indices compared against the segmentation model's predicted class.
ALL_L, ALL_M, BRICKS, LONGTRANS, RAVELING = range(5)

# Minimum confidence a detection needs before it is used.
THRESHOLD = 0.05

# Fill colour (3-tuple handed to cv2.fillPoly) for each distress class.
all_l_c = (255, 0, 0)
all_m_c = (0, 255, 0)
bricks_c = (0, 0, 255)
longtrans_c = (100, 100, 100)
raveling_c = (100, 100, 0)

# All-zero 640x640x3 uint8 canvas; copied whenever a fresh mask is needed.
blank_image = np.zeros((640, 640, 3), dtype=np.uint8)

# Pre-trained detectors and the convolutional feature extractor.
obb_model = YOLO('obb_model.pt')
seg_model = YOLO('seg_model.pt')
conv_base = VGG16(weights='imagenet', include_top=False, input_shape=(img_width, img_height, 3))
class Distress:
    """Accumulator for one distress category detected in a single image.

    NOTE: a class with the same name is declared again at the top of the next
    cell; that later declaration is the one in effect when the notebook runs
    top to bottom.

    Attributes:
        mask:  canvas (copy of ``blank_image``) that detected polygons are
               OR-ed into.
        color: 3-tuple used to fill this category's polygons.
        area:  initialised to 0; not updated anywhere in the visible notebook.
        cnt:   number of detections accumulated for this category.
    """
    # Class-level defaults, kept so attribute access on the class still works.
    mask = blank_image.copy()
    color = (0,0,0)
    area = 0
    cnt = 0

    def __init__(self):
        # The class-level ``mask`` is one array shared by every instance; give
        # each instance its own canvas so an in-place edit can never leak
        # between distress categories.
        self.mask = blank_image.copy()

In [None]:
class Distress:
    """Accumulator for one distress category detected in a single image.

    NOTE: a class with the same name is also declared at the end of the
    configuration cell; this declaration shadows it.

    Attributes:
        mask:  canvas (copy of ``blank_image``) that detected polygons are
               OR-ed into.
        color: 3-tuple used to fill this category's polygons.
        area:  initialised to 0; not updated anywhere in the visible notebook.
        cnt:   number of detections accumulated for this category.
    """
    # Class-level defaults, kept so attribute access on the class still works.
    mask = blank_image.copy()
    color = (0,0,0)
    area = 0
    cnt = 0

    def __init__(self):
        # The class-level ``mask`` is one array shared by every instance; give
        # each instance its own canvas so an in-place edit can never leak
        # between distress categories.
        self.mask = blank_image.copy()

def combine_distress(mask_img, new_mask, c):
    """Return ``mask_img`` with one more filled polygon OR-ed into it.

    Args:
        mask_img: mask accumulated so far.
        new_mask: polygon vertices; converted to int32 before drawing.
        c:        fill colour passed to ``cv2.fillPoly``.

    Returns:
        The bitwise OR of ``mask_img`` and a fresh canvas containing only the
        new polygon.
    """
    polygon = np.int32([new_mask])
    # Draw on a copy so the shared blank canvas is never modified.
    layer = cv2.fillPoly(blank_image.copy(), pts=polygon, color=c)
    return cv2.bitwise_or(mask_img, layer)

def pred_seg_mdl(img_path):
    """Run the segmentation model on one image and bucket its polygons by class.

    Args:
        img_path: image source handed to ``seg_model``.

    Returns:
        A 5-tuple of ``Distress`` objects in the order
        (ALL_L, ALL_M, BRICKS, LONGTRANS, RAVELING). Each carries the combined
        mask and the number of detections whose confidence exceeded
        ``THRESHOLD``.
    """
    results = seg_model(img_path, verbose=False, device=[0])

    # One accumulator per class id; insertion order is the return order.
    buckets = {}
    for class_id, fill in (
        (ALL_L, all_l_c),
        (ALL_M, all_m_c),
        (BRICKS, bricks_c),
        (LONGTRANS, longtrans_c),
        (RAVELING, raveling_c),
    ):
        bucket = Distress()
        bucket.color = fill
        buckets[class_id] = bucket

    for r in results:
        if r.masks is None:
            continue
        for idx, polygon in enumerate(r.masks.xy):
            cls = r.boxes.cls[idx].item()
            if not (r.boxes.conf[idx] > THRESHOLD):
                continue
            # A class index outside the five known ids is ignored.
            bucket = buckets.get(cls)
            if bucket is None:
                continue
            bucket.mask = combine_distress(bucket.mask, polygon, bucket.color)
            bucket.cnt += 1

    return tuple(buckets.values())

def create_keras_img_array(img):
    """Turn an image array into a single-item batch scaled by 1/255.

    Args:
        img: image array accepted by ``cv2.resize``.

    Returns:
        The image resized to ``(img_width, img_height)``, converted with
        ``image.img_to_array``, given a leading batch axis and divided by 255.
    """
    resized = cv2.resize(img, (img_width, img_height))
    batch = np.expand_dims(image.img_to_array(resized), axis=0)
    return batch / 255

def get_mask(img_path):
    """Build one combined mask holding every distress class found in an image.

    Args:
        img_path: image source forwarded to ``pred_seg_mdl``.

    Returns:
        The bitwise OR of a blank canvas with each per-class mask.
    """
    combined = blank_image.copy()
    for distress in pred_seg_mdl(img_path):
        combined = cv2.bitwise_or(combined, distress.mask)
    return combined

def apply_obb(img_path, alpha=0.3):
    """Overlay the oriented boxes predicted by ``obb_model`` on an image.

    Args:
        img_path: path of the image to read with ``cv2.imread``.
        alpha:    blend weight of the filled boxes; the image itself gets
                  ``1 - alpha``.

    Returns:
        The image with every box whose confidence exceeds ``THRESHOLD``
        blended in. The image is returned unchanged if there is no such box.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``img_path``.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread reports failure by returning None rather than raising.
        raise FileNotFoundError(f"could not read image: {img_path}")
    results = obb_model(img, verbose=False, device=[0])

    color = (255, 0, 0)
    height, width = img.shape[:2]
    # float32 keeps the arithmetic identical to scaling the float32 corner
    # array by the integer image size.
    scale = np.float32([width, height])

    for r in results:
        if r.obb is None:
            continue
        for idx, corners in enumerate(r.obb.xyxyxyxyn):
            if not (r.obb.conf[idx] > THRESHOLD):
                continue
            # Corners are normalized to 0-1 and arrive as a tensor: convert to
            # an (N, 2) array and scale to pixels. Multiplying into a new array
            # avoids writing through to memory the tensor may still own.
            pts = corners.cpu().numpy().reshape((-1, 2)) * scale
            # Fill the polygon on a copy, then alpha-blend it over the image.
            poly = cv2.fillPoly(img.copy(), pts=np.int32([pts]), color=color)
            img = cv2.addWeighted(poly, alpha, img, 1 - alpha, 0)
    return img

def extract_seg_features(img_path:str):
    """Extract paired VGG16 features for an image and its segmentation mask.

    Args:
        img_path: path of the image; read here and also passed to ``get_mask``.

    Returns:
        An array with one leading axis of length 1 wrapping the image features
        and the mask features concatenated along axis 0.
    """
    photo_batch = create_keras_img_array(cv2.imread(img_path))
    mask_batch = create_keras_img_array(get_mask(img_path))

    photo_features = conv_base.predict(photo_batch, verbose=0)
    mask_features = conv_base.predict(mask_batch, verbose=0)

    pair = np.concatenate((photo_features, mask_features), axis=0)
    return np.expand_dims(pair, axis=0)

def extract_obb_features(img_path, obb_model):
    """Extract paired VGG16 features for an image and its OBB overlay.

    Args:
        img_path:  path of the image to process.
        obb_model: kept for backward compatibility and not used here;
                   ``apply_obb`` uses the module-level ``obb_model``.

    Returns:
        An array with one leading axis of length 1 wrapping the image features
        and the overlay features concatenated along axis 0.
    """
    original_img = create_keras_img_array(cv2.imread(img_path))
    # apply_obb's second parameter is the blend weight ``alpha``. Passing the
    # model there fed a model object to cv2.addWeighted whenever a box was
    # drawn, so call it with the default alpha.
    obb_img = create_keras_img_array(apply_obb(img_path))

    # use_multiprocessing only matters for generator/Sequence input, so it is
    # dropped to match extract_seg_features.
    conv_original_img = conv_base.predict(original_img, verbose=0)
    conv_obb_img = conv_base.predict(obb_img, verbose=0)

    return np.array([np.concatenate((conv_original_img, conv_obb_img), axis=0)])

def extract_pci(train_csv):
    """Read the training CSV into a ``{image_name: pci}`` dictionary.

    The first line is treated as a header and skipped. On every other line the
    first two comma-separated fields are joined back with a comma (double
    quotes removed) to form the image name, and the third field is parsed as
    the integer PCI.

    Args:
        train_csv: path of the CSV file.

    Returns:
        dict mapping image name (str) to PCI (int). Blank lines are ignored.

    Raises:
        IndexError: if a non-blank line has fewer than three fields.
        ValueError: if the third field is not an integer.
    """
    pcis = {}
    with open(train_csv, 'r') as f:
        next(f, None)  # skip the header line; also tolerates an empty file
        for line in f:
            if not line.strip():
                # A blank line used to raise IndexError.
                continue
            fields = line.split(',')
            name = (fields[0] + ',' + fields[1]).replace('"', '')
            pcis[name] = int(fields[2])
    return pcis

In [None]:
# Ground-truth PCI per training image; looked up by file name while the
# training sets are built.
pcis = extract_pci(train_csv='train_v2/train.csv')
def train_vector(img_dir:str):
    """Build the segmentation-feature training set for a directory of images.

    Args:
        img_dir: directory whose entries are all treated as training images.

    Returns:
        ``(features, labels)``: the per-image results of
        ``extract_seg_features`` concatenated along axis 0, and a float64
        array of the matching PCI values from ``pcis``. Both are empty arrays
        when the directory is empty.

    Raises:
        KeyError: if a file in ``img_dir`` has no entry in ``pcis``.
    """
    feature_batches = []
    labels = []
    for img in tqdm(os.listdir(img_dir)):
        # Look the label up first so an unknown file fails before the
        # expensive feature extraction runs.
        label = pcis[img]
        feature_batches.append(extract_seg_features(os.path.join(img_dir, img)))
        labels.append(label)

    if not feature_batches:
        return np.array([]), np.array([])

    # One concatenate at the end replaces the per-image np.append, which
    # re-copied the whole array on every iteration.
    return np.concatenate(feature_batches, axis=0), np.array(labels, dtype=np.float64)

def train_obb_tensor(data_dir):
    """Build the OBB-feature training set for a directory of images.

    Args:
        data_dir: directory whose entries are all treated as training images.

    Returns:
        ``(x_data, y_data)``: the per-image results of
        ``extract_obb_features`` concatenated along axis 0, and a float64
        array of the matching PCI values from ``pcis``. Both are empty arrays
        when the directory is empty.

    Raises:
        KeyError: if a file in ``data_dir`` has no entry in ``pcis``.
    """
    feature_batches = []
    labels = []
    for img in tqdm(os.listdir(data_dir)):
        # Look the label up first so an unknown file fails before the
        # expensive feature extraction runs.
        label = pcis[img]
        feature_batches.append(extract_obb_features(os.path.join(data_dir, img), obb_model))
        labels.append(label)

    if not feature_batches:
        return np.array([]), np.array([])

    # One concatenate at the end replaces the per-image np.append, which
    # re-copied the whole array on every iteration.
    return np.concatenate(feature_batches, axis=0), np.array(labels, dtype=np.float64)

In [None]:
# Extract both training sets from the same image directory, then cache them on
# disk so the classifier cells can reload them without re-running extraction.
seg_x_train, seg_y_train = train_vector('train_v2/train')
obb_x_train, obb_y_train = train_obb_tensor('train_v2/train')

for _npy_name, _array in (
    ('seg_x_train.npy', seg_x_train),
    ('seg_y_train.npy', seg_y_train),
    ('obb_x_train.npy', obb_x_train),
    ('obb_y_train.npy', obb_y_train),
):
    np.save(_npy_name, _array)

In [None]:
# Train the PCI classifier on the cached segmentation features.
saved_features = np.load('seg_x_train.npy')
saved_labels = np.load('seg_y_train.npy')

# Negative PCI labels are clamped to 0 so they can index the one-hot table.
saved_labels = np.maximum(saved_labels, 0)

cop = saved_labels.copy()
# One-hot encode: row i of the 101x101 identity matrix stands for PCI == i.
new_saved_labels = np.eye(101)[saved_labels.astype(int)]

# The first 20% of the samples form the validation split, the rest train.
val_size = int(len(saved_features)*0.2)

x_val = saved_features[:val_size]
y_val = new_saved_labels[:val_size]
x_train = saved_features[val_size:]
y_train = new_saved_labels[val_size:]

# The classifier gets its own name. Rebinding `seg_model` here replaced the
# YOLO segmentation model that pred_seg_mdl() calls, so feature extraction
# broke in every cell that ran after this one.
seg_classifier = Sequential()
seg_classifier.add(Flatten(input_shape=(2,7,7,512)))
for _ in range(5):
    seg_classifier.add(Dense(256, activation='relu'))
seg_classifier.add(Dropout(0.5))
seg_classifier.add(Dense(101, activation='sigmoid'))

# NOTE(review): sigmoid + binary cross-entropy on one-hot targets treats the
# 101 classes as independent outputs; softmax + categorical cross-entropy is
# the usual choice for single-label classification - confirm this is intended.
seg_classifier.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss='BinaryCrossentropy', metrics=['mean_squared_error', 'accuracy'])
history = seg_classifier.fit(x_train, y_train, epochs=16, batch_size=16, validation_data=(x_val, y_val))
seg_classifier.save('seg_model.keras')

In [None]:
# Train the PCI classifier on the cached OBB-overlay features.
saved_features = np.load('obb_x_train.npy')
saved_labels = np.load('obb_y_train.npy')

# Negative PCI labels are clamped to 0 so they can index the one-hot table.
saved_labels = np.maximum(saved_labels, 0)

cop = saved_labels.copy()
# One-hot encode: row i of the 101x101 identity matrix stands for PCI == i.
new_saved_labels = np.eye(101)[saved_labels.astype(int)]

# The first 20% of the samples form the validation split, the rest train.
val_size = int(len(saved_features)*0.2)

x_val = saved_features[:val_size]
y_val = new_saved_labels[:val_size]
x_train = saved_features[val_size:]
y_train = new_saved_labels[val_size:]

# This model is trained on OBB features and saved as 'obb_model.keras', so it
# is named accordingly. Binding it to `seg_model` replaced the YOLO
# segmentation model that pred_seg_mdl() calls.
obb_classifier = Sequential()
obb_classifier.add(Flatten(input_shape=(2,7,7,512)))
for _ in range(5):
    obb_classifier.add(Dense(256, activation='relu'))
obb_classifier.add(Dropout(0.5))
obb_classifier.add(Dense(101, activation='sigmoid'))

# NOTE(review): sigmoid + binary cross-entropy on one-hot targets treats the
# 101 classes as independent outputs; softmax + categorical cross-entropy is
# the usual choice for single-label classification - confirm this is intended.
obb_classifier.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.000714), loss='BinaryCrossentropy', metrics=['mean_squared_error', 'accuracy'])
history = obb_classifier.fit(x_train, y_train, epochs=20, batch_size=16, validation_data=(x_val, y_val))
obb_classifier.save('obb_model.keras')

In [None]:
# Fine-tune a YOLOv8 classification model; its class names are parsed as
# integer PCI values further down.
class_model = YOLO('yolov8s-cls.pt')
class_model.train(data='class/data.yaml', epochs=35, imgsz=640, optimizer='AdamW', lr0=0.000714)

# NOTE(review): 'trainX' looks like a placeholder for the actual run directory
# created by the training call above - confirm the path before running.
model = YOLO('runs/classify/trainX/weights/best.pt')

# Predict every .jpg under the test directory (searched recursively).
test_path = Path('test_v2/test/')
rows = []
for tst_img in test_path.glob('**/*.jpg'):
    preds = model(tst_img)
    cls_dict = preds[0].names
    probs = preds[0].probs.data.cpu().numpy()
    # The most probable class name is read as the PCI, then clamped to 0..100.
    pred_pci = int(cls_dict[np.argmax(probs)])
    rows.append({
        'image_name': tst_img.name,
        'pci': min(100, max(0, pred_pci)),
    })
df_test = pd.DataFrame(rows)

df_test.to_csv("class.csv",header=True)
def gen_submit(df, name='class.json'):
    """Write a predictions frame as a JSON list of ``{image_name: pci}`` objects.

    Args:
        df:   DataFrame with an ``image_name`` and a ``pci`` column.
        name: output path. Defaults to ``'class.json'``, the file this
              function always wrote before the path became a parameter.
    """
    out_json = [{row['image_name']: row['pci']} for _, row in df.iterrows()]
    with open(name, 'w') as f:
        json.dump(out_json, f)

# Make sure the PCI column is integer-typed before it is serialised to JSON.
df_test = df_test.assign(pci=df_test['pci'].astype(int))
gen_submit(df_test)

In [None]:
def seg_pred(dir='', name=''):
    """Score one image with the classifier trained on segmentation features.

    Args:
        dir:  directory containing the image.
        name: file name of the image.

    Returns:
        The classifier's output for the image: the first (and only) row of the
        prediction batch.
    """
    # Load the trained classifier from the file the training cell saves, and
    # cache it on the function so it is read only once. The global `seg_model`
    # must stay bound to the YOLO segmentation model that
    # extract_seg_features() relies on, so it cannot double as the classifier.
    classifier = getattr(seg_pred, '_classifier', None)
    if classifier is None:
        classifier = keras.models.load_model('seg_model.keras')
        seg_pred._classifier = classifier

    path = os.path.join(dir, name)

    # extract_seg_features already returns a one-item batch, so it is passed
    # on without being wrapped in another axis.
    features = extract_seg_features(path)
    predictions = classifier.predict(features, verbose=0)
    return predictions[0]

def obb_pred(dir='', name=''):
    """Score one image with the classifier trained on OBB-overlay features.

    Args:
        dir:  directory containing the image.
        name: file name of the image.

    Returns:
        The classifier's output for the image: the first (and only) row of the
        prediction batch.
    """
    # `obb_model` is the YOLO oriented-box detector; it cannot score VGG16
    # feature tensors. Load the Keras classifier saved by the training cell
    # instead, and cache it on the function so it is read only once.
    classifier = getattr(obb_pred, '_classifier', None)
    if classifier is None:
        classifier = keras.models.load_model('obb_model.keras')
        obb_pred._classifier = classifier

    path = os.path.join(dir, name)

    # extract_obb_features already returns a one-item batch, so it is passed
    # on without being wrapped in another axis.
    features = extract_obb_features(path, obb_model)
    predictions = classifier.predict(features, verbose=0)
    return predictions[0]

def gen_submit(df, name='submission.json'):
    """Serialise predictions as a JSON list of single-entry objects.

    Args:
        df:   DataFrame with an ``image_name`` and a ``PCI`` column.
        name: path of the JSON file to write.
    """
    records = [{row['image_name']: row['PCI']} for _, row in df.iterrows()]
    with open(name, 'w') as handle:
        json.dump(records, handle)

def predict_seg_submission(test_dir):
    """Predict a PCI for every file in ``test_dir`` and write ``seg.json``.

    The PCI of an image is the index of the largest value returned by
    ``seg_pred``.

    Args:
        test_dir: directory whose entries are all treated as test images.
    """
    image_names = os.listdir(test_dir)
    best_classes = [
        np.argmax(seg_pred(test_dir, image_name))
        for image_name in tqdm(image_names)
    ]
    submission = pd.DataFrame({'image_name': image_names, 'PCI': best_classes})
    gen_submit(submission, 'seg.json')

def predict_obb_submission(test_dir):
    """Predict a PCI for every file in ``test_dir`` and write ``obb.json``.

    The PCI of an image is the index of the largest value returned by
    ``obb_pred``.

    Args:
        test_dir: directory whose entries are all treated as test images.
    """
    image_names = os.listdir(test_dir)
    best_classes = [
        np.argmax(obb_pred(test_dir, image_name))
        for image_name in tqdm(image_names)
    ]
    submission = pd.DataFrame({'image_name': image_names, 'PCI': best_classes})
    gen_submit(submission, 'obb.json')

# Write seg.json first, then obb.json, both from the same test directory.
for _write_submission in (predict_seg_submission, predict_obb_submission):
    _write_submission('test_v2/test')

In [None]:
def _read_submission(json_path):
    """Load a submission file as ``({file: value}, [files in file order])``."""
    with open(json_path, 'r') as f:
        entries = json.load(f)
    lookup = {}
    ordered_files = []
    for entry in entries:
        # Every entry is a single-key object: {image_name: pci}.
        file, value = next(iter(entry.items()))
        ordered_files.append(file)
        # If a file is listed twice, the first occurrence wins.
        lookup.setdefault(file, value)
    return lookup, ordered_files


def grab_submissions(class_json_path, seg_json_path, obb_json_path):
    """Merge the per-model submissions and write ``submission127.json``.

    For every image in the classification submission, the final PCI is the
    smaller of the classification and OBB predictions. The segmentation file
    is read but its values do not contribute to the result.

    Args:
        class_json_path: submission JSON of the YOLO classification model.
        seg_json_path:   submission JSON of the segmentation-feature classifier.
        obb_json_path:   submission JSON of the OBB-feature classifier.

    Raises:
        ValueError: if an image in the classification submission is missing
            from the OBB submission.
    """
    class_lookup, class_files = _read_submission(class_json_path)
    # Still parsed so a missing or malformed file is reported. Its per-image
    # value was looked up but never used, so that lookup is dropped.
    _read_submission(seg_json_path)
    obb_lookup, _ = _read_submission(obb_json_path)

    missing = [file for file in class_files if file not in obb_lookup]
    if missing:
        raise ValueError(f"{len(missing)} image(s) missing from {obb_json_path}, e.g. {missing[0]!r}")

    # Dictionary lookups replace list.index(), which rescanned the lists for
    # every image.
    new_values = [min(class_lookup[file], obb_lookup[file]) for file in class_files]

    df = pd.DataFrame({'image_name': class_files, 'PCI': new_values})
    gen_submit(df, 'submission127.json')

# Merge the three per-model submission files into the final submission.
grab_submissions(
    class_json_path='class.json',
    seg_json_path='seg.json',
    obb_json_path='obb.json',
)