In [15]:
import tensorflow as tf
import os
from model2 import unet_model
from keras.utils import normalize
from glob import glob
import numpy as np
import cv2
import matplotlib.pyplot as plt
from patchify import patchify
import tifffile as tiff
from PIL import Image
from sklearn.model_selection import train_test_split
import random
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical
from keras.metrics import MeanIoU
from tensorflow.keras.models import load_model
import csv

### Input pipeline

In [16]:
# Read an sample to get correct configuration
path = "../data/Sandstone/images/image_0001.png"
img = cv2.imread(path, cv2.IMREAD_COLOR)
h,w,c = img.shape
h,w,c

(256, 256, 3)

In [17]:
# Configuration
colors = [[0, 0, 0], [0, 153, 255], [102, 255, 153], [0, 204, 153]]
cf = {}
cf['n'] = 4
cf['h'] = 256
cf['w'] = 256
cf['c'] = 1
cf['p'] =  16
cf['n_patches'] = (cf['h']  * cf['w']) // (cf['p'] * cf['p'])
cf['dropout'] =  0.1
cf['flat_p_shape'] = (
    cf['n_patches'],
    cf['p'] * cf['p'] * cf['c'],  # one slice
    1
)
model_path = os.path.join("../model/", "unet_sandstone_20_inp.keras")
csv_path = os.path.join("files", "log.csv")

In [18]:
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)    

class InputPipeline():
    def __init__(self, cf, path):
        self.cf = cf
        self.path = path
        self.load_dataset()
        
    def split_dataset(self):
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(self.images, self.masks, test_size=0.2, random_state=42)
        self.X_val, self.X_test, self.y_val, self.y_test = train_test_split(self.X_val, self.y_val, test_size=0.5, random_state=42)
        print(
            len(self.X_train), len(self.y_train), 
            len(self.X_val), len(self.y_val),
            len(self.X_test), len(self.y_test)     
        )
        
    def load_dataset(self):
        self.images = sorted(glob(os.path.join(self.path, "images", "*.png")))
        self.masks = sorted(glob(os.path.join(self.path, "masks", "*.png")))
        self.split_dataset()
        
    def read_image(self, path):
        path = path.decode()
        image = cv2.imread(path, cv2.IMREAD_COLOR)
        image = cv2.resize(image, (self.cf["h"], self.cf["w"]))
        image = image / 255.0

        patch_shape = (self.cf["p"], self.cf["p"], self.cf["c"])   # (16, 16, 1)
        patches = patchify(image, patch_shape, self.cf["p"])       # (256, 16, 16, 1) ~ (256, 256, 1)
        patches = np.reshape(patches, self.cf["flat_p_shape"])     # (256, 16 x 16 x 1) ~ flatten the the patch
        patches = patches.astype(np.float32)
        return patches

    def read_mask(self, path):
        path = path.decode()
        mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (self.cf["h"], self.cf["w"]))
        mask = mask.astype(np.int32)
        return mask
    
    def tf_parse(self, x, y):
        def _parse(x, y):
            x = self.read_image(x)
            y = self.read_mask(y)
            y = tf.one_hot(y, cf["n"])
            return x, y

        x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
        x.set_shape(cf["flat_p_shape"])
        y.set_shape([cf["h"], cf["w"], cf["n"]])
        return x, y
            
    def tf_dataset(self, batch=1):
        self.train_ds = tf.data.Dataset.from_tensor_slices((self.X_train, self.y_train))
        self.train_ds = self.train_ds.map(self.tf_parse).batch(batch).prefetch(10)
        
        self.val_ds = tf.data.Dataset.from_tensor_slices((self.X_val, self.y_val))
        self.val_ds = self.val_ds.map(self.tf_parse).batch(batch).prefetch(10)
        
        self.test_ds = tf.data.Dataset.from_tensor_slices((self.X_test, self.y_test))
        self.test_ds = self.test_ds.map(self.tf_parse).batch(batch).prefetch(10)
        
        return self.train_ds, self.val_ds, self.test_ds

In [19]:
dataset_path = "../data/Sandstone/"
ippl = InputPipeline(cf, dataset_path)
train_ds, val_ds, test_ds = ippl.tf_dataset()
print(train_ds)
print(val_ds)
print(test_ds)

204 204 26 26 26 26
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 256, 256, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None, 256, 256, 4), dtype=tf.float32, name=None))>
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 256, 256, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None, 256, 256, 4), dtype=tf.float32, name=None))>
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 256, 256, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None, 256, 256, 4), dtype=tf.float32, name=None))>


In [20]:
print(tf.data.Dataset.cardinality(train_ds))
print(tf.data.Dataset.cardinality(val_ds))
print(tf.data.Dataset.cardinality(test_ds))

tf.Tensor(204, shape=(), dtype=int64)
tf.Tensor(26, shape=(), dtype=int64)
tf.Tensor(26, shape=(), dtype=int64)


In [21]:
for image, mask in train_ds.batch(1).take(1):
    print(image.shape, mask.shape)
    

(1, 1, 256, 256, 1) (1, 1, 256, 256, 4)


In [22]:
model = unet_model(cf)
model.compile(
    loss="categorical_crossentropy", 
    optimizer='adam',
    metrics=['accuracy']
)
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 256, 256, 1  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_19 (Conv2D)             (None, 256, 256, 16  160         ['input_2[0][0]']                
                                )                                                                 
                                                                                                  
 dropout_9 (Dropout)            (None, 256, 256, 16  0           ['conv2d_19[0][0]']              
                                )                                                           

In [23]:
history = model.fit(
    train_ds,
    verbose=1, 
    epochs=20, 
    validation_data=val_ds, 
    shuffle=False
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


### Save models and results

In [25]:
# Extracting the training history
train_accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
train_loss = history.history['loss']
val_loss = history.history['val_loss']

# Define the file name
file_name = f'../results/unet_sandstone_20_inp.csv'

# Writing the data into a CSV file
with open(file_name, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Epoch', 'Train Accuracy', 'Val Accuracy', 'Train Loss', 'Val Loss'])
    for epoch in range(len(train_accuracy)):
        writer.writerow([epoch+1, train_accuracy[epoch], val_accuracy[epoch], train_loss[epoch], val_loss[epoch]])

print(f"Training history has been saved to {file_name}")

Training history has been saved to unet_sandstone_20_inp.csv


In [26]:
model.save('../models/unet_sandstone_20_inp.hdf5')   # architectyre_dataset_epochs
_, acc = model.evaluate(test_ds)
print("Accuracy is = ", (acc * 100.0), "%")

Accuracy is =  89.8931086063385 %


### Visualization

### Run inference