In [1]:
import tensorflow as tf
# import imquality
# from imquality import datasets
from utils import *
import cv2
import numpy as np
import tensorflow_datasets as tfds
import pandas as pd

def image_preprocess(image: tf.Tensor) -> tf.Tensor:
    image = tf.cast(image, tf.float32)
    image = tf.image.rgb_to_grayscale(image)
    image_low = gaussian_filter(image, 16, 7/6)
    image_low = rescale(image_low, 1/4, method=tf.image.ResizeMethod.BICUBIC)
    image_low = tf.image.resize(image_low, size=image_shape(
        image), method=tf.image.ResizeMethod.BICUBIC)

    return image-tf.cast(image_low, image.dtype)


def error_map(reference: tf.Tensor, distorted: tf.Tensor, p: float = 0.2):
    return tf.pow(tf.abs(reference-distorted), p)


def reliability_map(distorted: tf.Tensor, alpha: float) -> tf.Tensor:
    return 2/(1+tf.exp(-alpha*tf.abs(distorted)))-1


def average_reliability_map(distorted: tf.Tensor, alpha: float) -> tf.Tensor:
    r = reliability_map(distorted, alpha)
    return r/tf.reduce_mean(r)


def loss(model, x, y_true, r):
    y_pred = model(x)
    return tf.reduce_mean(tf.square((y_true-y_pred)*r))


def gradient(model, x, y_true, r):
    with tf.GradientTape() as tape:
        loss_value = loss(model, x, y_true, r)
        return loss_value, tape.gradient(loss_value, model.trainable_variables)


optimizer = tf.optimizers.Nadam(learning_rate=10**-4)


def calculate_error_map(features):
    I_d = image_preprocess(features['dist_img'])
    I_r = image_preprocess(features['ref_img'])
    # I_d = image_preprocess(features[0])
    # I_r = image_preprocess(features[1])
    # I_d, I_r = features
    r = rescale(average_reliability_map(I_d, 0.2), 1/4)
    e_gt = rescale(error_map(I_r, I_d, 0.2), 1/4)

    return (I_d, e_gt, r)
# builder = datasets.LiveIQA(data_dir="./LIVE/")
# builder.download_and_prepare(download_dir="./LIVE/")
# builder = tfds.core.builder_from_directory()

# ds = builder.as_dataset(shuffle_files=True)['train']
# ds = ds.shuffle(1024).batch(1)
data_dir = './imquality/datasets/kadid10k'
df = pd.read_csv(f'{data_dir}/dmos.csv')
df_dict = df.to_dict('records')
# train = ds.map(calculate_error_map)
data = []
# print('heyyeyeyey')
j = 0
for i in df_dict:
    frame = cv2.imread(f'{data_dir}/images/{i["dist_img"]}')
    frame = np.expand_dims(frame, axis=0)
    im = tf.constant(frame)
    # im = image_preprocess(im)
    frame2 = cv2.imread(f'{data_dir}/images/{i["ref_img"]}')
    frame2 = np.expand_dims(frame2, axis=0)
    im2 = tf.constant(frame2)
    # im2 = image_preprocess(im2)
    data.append({
        'dist_img': im,
        'ref_img': im2,
        'dmos': i['dmos'],
    })
    j+=1
    if j==5:
        break
    # print('ab')
# print(data[:5])
print('aye')
train = map(calculate_error_map, data)
input = tf.keras.Input(shape=(None, None, 1),
                       batch_size=1, name='original_image')
f = tf.keras.layers.Conv2D(48, (3, 3), name='Conv1',
                           activation='relu', padding='same')(input)
f = tf.keras.layers.Conv2D(48, (3, 3), name='Conv2',
                           activation='relu', padding='same', strides=(2, 2))(f)
f = tf.keras.layers.Conv2D(64, (3, 3), name='Conv3',
                           activation='relu', padding='same')(f)
f = tf.keras.layers.Conv2D(64, (3, 3), name='Conv4',
                           activation='relu', padding='same', strides=(2, 2))(f)
f = tf.keras.layers.Conv2D(64, (3, 3), name='Conv5',
                           activation='relu', padding='same')(f)
f = tf.keras.layers.Conv2D(64, (3, 3), name='Conv6',
                           activation='relu', padding='same')(f)
f = tf.keras.layers.Conv2D(128, (3, 3), name='Conv7',
                           activation='relu', padding='same')(f)
f = tf.keras.layers.Conv2D(128, (3, 3), name='Conv8',
                           activation='relu', padding='same')(f)
g = tf.keras.layers.Conv2D(1, (1, 1), name='Conv9',
                           padding='same', activation='linear')(f)

objective_error_model = tf.keras.Model(input, g, name='objective_error_model')
objective_error_model.summary()


for epoch in range(10):
    print(epoch)
    epoch_accuracy = tf.keras.metrics.MeanSquaredError()
    step = 0
    for I_d, e_gt, r in train:
        loss_value, gradients = gradient(objective_error_model, I_d, e_gt, r)
        _ = optimizer.apply_gradients(
            zip(gradients, objective_error_model.trainable_weights))
        _ = epoch_accuracy(e_gt, objective_error_model(I_d))

        if(step % 100 == 0):
            print("epoch:%s step:%s loss= %s" %
                  (epoch, step, epoch_accuracy.result().numpy()))
        step += 1


aye
Model: "objective_error_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 original_image (InputLayer)  [(1, None, None, 1)]     0         
                                                                 
 Conv1 (Conv2D)              (1, None, None, 48)       480       
                                                                 
 Conv2 (Conv2D)              (1, None, None, 48)       20784     
                                                                 
 Conv3 (Conv2D)              (1, None, None, 64)       27712     
                                                                 
 Conv4 (Conv2D)              (1, None, None, 64)       36928     
                                                                 
 Conv5 (Conv2D)              (1, None, None, 64)       36928     
                                                                 
 Conv6 (Conv2D)              (1, None, No

In [36]:
v = tf.keras.layers.GlobalAveragePooling2D(data_format='channels_last')(f)
h = tf.keras.layers.Dense(128, activation='relu')(v)
h = tf.keras.layers.Dense(1)(h)
subjective_error_model = tf.keras.Model(
    input, h, name='subjective_error_model')
subjective_error_model.compile(optimizer=optimizer, loss=tf.losses.MeanSquaredError(
), metrics=[tf.metrics.MeanSquaredError()])
subjective_error_model.summary()


def calculate_subjective_score(features):
    # I_d = image_preprocess(features['dist_img'])
    mos = features['dmos']
    return mos # (I_d, mos)
train2X = list(map(lambda _: [image_preprocess(_['dist_img'])], data))
train2Y = np.array(list(map(calculate_subjective_score, data)))
train2Y = train2Y.reshape((1, train2Y.shape[0]))

Model: "subjective_error_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 original_image (InputLayer)  [(1, None, None, 1)]     0         
                                                                 
 Conv1 (Conv2D)              (1, None, None, 48)       480       
                                                                 
 Conv2 (Conv2D)              (1, None, None, 48)       20784     
                                                                 
 Conv3 (Conv2D)              (1, None, None, 64)       27712     
                                                                 
 Conv4 (Conv2D)              (1, None, None, 64)       36928     
                                                                 
 Conv5 (Conv2D)              (1, None, None, 64)       36928     
                                                                 
 Conv6 (Conv2D)              (1, None, None,

In [34]:
train2X[0].shape

TensorShape([1, 384, 512, 1])

In [37]:
# print(list(train2))
history = subjective_error_model.fit(train2X, train2Y, epochs=100)
subjective_error_model.save_weights("models/")
tf.saved_model.save(subjective_error_model, "models2/")

# sample = next(iter(ds))
sample = data[0]
I_d = image_preprocess(sample['dist_img'])
target = sample['dmos'][0]
prediction = subjective_error_model.predict(I_d)[0][0]
print(prediction)


Epoch 1/100


ValueError: in user code:

    File "C:\Users\Krishna Baghel\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1021, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\Krishna Baghel\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1010, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\Krishna Baghel\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1000, in run_step  **
        outputs = model.train_step(data)
    File "C:\Users\Krishna Baghel\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 859, in train_step
        y_pred = self(x, training=True)
    File "C:\Users\Krishna Baghel\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\Krishna Baghel\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\input_spec.py", line 200, in assert_input_compatibility
        raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'

    ValueError: Layer "subjective_error_model" expects 1 input(s), but it received 5 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, 384, 512, 1) dtype=float32>, <tf.Tensor 'IteratorGetNext:1' shape=(None, 384, 512, 1) dtype=float32>, <tf.Tensor 'IteratorGetNext:2' shape=(None, 384, 512, 1) dtype=float32>, <tf.Tensor 'IteratorGetNext:3' shape=(None, 384, 512, 1) dtype=float32>, <tf.Tensor 'IteratorGetNext:4' shape=(None, 384, 512, 1) dtype=float32>]


In [38]:
import numpy as np
import pandas as pd
from scipy.io import loadmat

def mat2csv(file_mat, index=False):
    mat = loadmat(file_mat, squeeze_me=True)
    print(mat)
    for colname in mat.keys():
        # ignore private column names
        if colname.startswith("__"):
            continue
        
        for fieldname in mat[colname].dtype.names:

            # ignore struct fields different from "data"
            # rename or uncomment it to match your needs
            if fieldname != "data":
                continue

            # get dataset values from "data" field for current column
            element = mat[colname][fieldname].item()

            # it could exist other instances (eg. str) and other dimensions (eg. 1D)
            # update these lines if you need something different than this
            if isinstance(element, np.ndarray):
                if len(element.shape) == 2:
                    _, cols = element.shape
                    data = {f"x{i}" : element[:, i] for i in range(cols)}
                    pd.DataFrame(data).to_csv(f"{colname}.csv", index=False)

mat2csv("./dmos.mat")

TypeError: 'NoneType' object is not iterable