In [1]:
!sudo apt-get install libmagickwand-dev
!pip install --no-cache-dir \
    opencv-python-headless==4.6.*\
    rawpy==0.17.* \
    pandas \
    Pillow==7.1.2 \
    scikit-image==0.16.2 \
    scipy \
    tqdm \
    Wand

Reading package lists... Done
Building dependency tree       
Reading state information... Done
libmagickwand-dev is already the newest version (8:6.9.7.4+dfsg-16ubuntu6.14).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'sudo apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 20 not upgraded.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
!git clone https://github.com/andreacos/BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation

fatal: destination path 'BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation' already exists and is not an empty directory.


In [3]:
# INIT IMPORT
import io
import os
import sys
import cv2
import numpy as np
import tensorflow as tf

import pandas as pd

from glob import glob
from PIL import Image

from skimage.util import view_as_windows

In [4]:
# GET PATH
sys.path.insert(1, "BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation")

In [5]:
# IMPORT METHODS
from networks import custom_two_terms_loss_wrapper, custom_softmax_activation, custom_mse_wrapper
from utils import max_min_coefficient, label2coefficient, string2Q

In [6]:
# INIT PARAMS
model_path = "BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation/models/"
data_path = "BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation/resources/"
img_path = "test_images/"

model_file = model_path + 'model_QF1_60-98_QF2_90-2-term-loss.h5'
img_file =  data_path + '00000000_redaf7d93t.TIF_85_90.png'

# Load the table linking each pair of JPEG quality factors to the corresponding Q's coefficients
qf_map = np.load( data_path + 'qf1_qf2_map_90.npy', allow_pickle=True)

examples = [
    (77,),
    (70,90),
    (60,90),
    (98,90),
    (89,90),
]

In [7]:
# Предобработка изображений
def preprocess_input(im, target_size, scale=255.):
    if type(im) == str :
      im = cv2.imread(im)

    im = cv2.cvtColor(im, cv2.COLOR_BGR2YCrCb)[:,:,0]

    if im.shape != target_size:
        im = cv2.resize(im, target_size)

    return im.astype(np.float32) / scale

In [8]:
# Расчёт MSE
def mse (predictions, k_dct):

  result = np.mean((k_dct - predictions) **2, axis=1)

  return result

In [9]:
# Предсказание
def predict(img, k_dct, max_coeffs):
  prediction = model.predict(np.expand_dims(img, [0, -1]), verbose=0)
  predicted_label = label2coefficient(prediction.flatten(), max_coefficients=max_coeffs)

  t_mse = mse(predicted_label,k_dct)
  min_mse = np.argmin(t_mse)
  result = labels[min_mse]

  return result

In [10]:
# Сжатие изображения
def compress_image(img, *qfs):

  for qf in qfs:
    jpeg_encoded = cv2.imencode('.jpg', img, [int(cv2.IMWRITE_JPEG_QUALITY), qf])[1]
    
    jpeg_encoded_image = Image.open(io.BytesIO(jpeg_encoded))
    
    img = np.array(jpeg_encoded_image)

  return img

In [11]:
# Базовый кейс
def test_case():
  x = preprocess_input(img_file, (64, 64), 255.)
  prediction = model.predict(np.expand_dims(x, [0, -1]))

  predicted_label = label2coefficient(prediction.flatten(), max_coefficients=max_coeffs)
  print(len(predicted_label),predicted_label)

In [12]:
# Max value for coefficients
max_coeffs, _ = max_min_coefficient(quality_range=(50, 100),
                                    n_coeffs=15,
                                    zig_zag_order=True)
model = tf.keras.models.load_model(model_file,
                                    custom_objects=({'custom_softmax': custom_softmax_activation(max_coeffs),
                                                    'custom_two_terms_loss_wrapper': custom_two_terms_loss_wrapper(max_coeffs, 0.8),
                                                    'custom_mse': custom_mse_wrapper(max_coeffs)})
                                    )



In [13]:
# Готовим данные из исходных
labels = qf_map[:,:2]
k_dct = []
for rec in qf_map:
  k_dct.append(string2Q(rec[2], size=(8, 8), flatten=True)[:15])
k_dct = np.array(k_dct)

In [14]:
#qf_map
#labels
#k_dct

In [15]:
test_case()

15 [ 7  5  4  6  4  3  7  6  4  6  8  7  7  8 11]


In [16]:
# A0: Программно сгенерировать 5 принципиально различных ситуаций: однократное сжатие, 𝑄𝐹1 < 𝑄𝐹2, 𝑄𝐹1 ≪ 𝑄𝐹2, 𝑄𝐹1 > 𝑄𝐹2, 𝑄𝐹1 ≈ 𝑄𝐹2. 
anls_data = []

for e in examples:
  for path in glob(img_path+'*.tif'):
    tmp = compress_image(cv2.imread(path), *e)
    tmp = tmp[:64,:64]
    tmp = preprocess_input(tmp, (64, 64), 255.)
    pred = predict(tmp, k_dct, max_coeffs)

    anls_data.append([pred[0], e[0], np.abs(pred[0]-e[0])])
print('Complete!')

Complete!


In [17]:
# A1: Сравнить ошибку при оценивании 𝑄𝐹1 по одному патчу и при оценивании по 𝑝 = 10 патчам, выбранным из одного изображения без пересечения.
result_data = []

for e in examples:
  for path in glob(img_path+'*.tif'):
    tmp = compress_image(cv2.imread(path), *e)
    chunks = view_as_windows(tmp, (64, 64, 3), 64).reshape(-1,64,64,3)
    tmp_result = []
    for c in chunks[:10]:
      c = preprocess_input(c, (64, 64), 255.)
      pred = predict(c, k_dct, max_coeffs)

      tmp_result.append([pred[0], np.abs(pred[0]-e[0])])
      
    mean_tmp_res = np.mean(tmp_result, axis=0)
    result_data.append([mean_tmp_res[0], e[0], mean_tmp_res[1]])
print('Complete!')

Complete!


In [18]:
df_a0 = pd.DataFrame(anls_data, columns=['Prediction','QF1', 'Error'])
df_a1 = pd.DataFrame(result_data, columns=['Prediction','QF1', 'Error'])

df_a0_g = df_a0.groupby('QF1').mean()
df_a1_g = df_a1.groupby('QF1').mean()

In [19]:
# Вывод результатов А0
df_a0_g

Unnamed: 0_level_0,Prediction,Error
QF1,Unnamed: 1_level_1,Unnamed: 2_level_1
60,62.5,2.5
70,67.75,3.083333
77,75.416667,1.916667
89,92.75,3.75
98,95.583333,2.416667


In [20]:
# Вывод результатов А1
df_a1_g

Unnamed: 0_level_0,Prediction,Error
QF1,Unnamed: 1_level_1,Unnamed: 2_level_1
60,61.8,1.8
70,68.083333,2.533333
77,75.333333,2.016667
89,91.066667,3.45
98,95.591667,2.408333
