jpeg_eval.py
"""The goal is to map the JPEG encoded images' CR into MSSIM/PSNR/MSE
In other words, we will try to associate the compression ratio of the JPEG encoded images with the SSIM/PSNR/MSE
values of them regarding their original versions.
More specifically, the purpose is to generate a json file that sums up statistics in the following structure:
- quality N (aggregated images encoded w/ -quality == N)
- cr: min,max,avg,stddev
- mse: min,max,avg,stddev
- psnr: min,max,avg,stddev
- ssim: min,max,avg,stddev
This is a project secondary experiment.
"""
import os
from subprocess import Popen, PIPE
from typing import Any

import numpy as np
import pandas as pd
from numpy import ndarray
from pydicom import dcmread

import dicom_parser
import metrics
from dicom_parser import extract_attributes
from parameters import PathParameters, QUALITY_TOTAL_STEPS, MINIMUM_JPEG_QUALITY, ResultsColumnNames
from squeeze import squeeze_data

QUALITY_SPREAD: int = 1

# Quality settings
QUALITY_VALUES: np.ndarray = np.linspace(MINIMUM_JPEG_QUALITY, 100, QUALITY_TOTAL_STEPS) \
    .astype(np.ubyte)
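# For example, with MINIMUM_JPEG_QUALITY == 50 and QUALITY_TOTAL_STEPS == 6 (hypothetical values;
# the real ones come from parameters.py), QUALITY_VALUES == array([50, 60, 70, 80, 90, 100], dtype=uint8).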
# Alias
R = ResultsColumnNames


def compress_n_compare():
    """Compress the dataset images to JPEG and extract statistics.

    The purpose is to compress a set of uncompressed images to JPEG.
    Compression is performed using multiple quality configurations.
    For each quality configuration, the metrics of the resulting image (CR, MSE, PSNR and SSIM versus
    the original image) are computed and appended to a results dataframe, which is written to a CSV file.
    """
# Compress using the above quality parameters
# Save the compression ratio in a dataframe
results = pd.DataFrame(data=dict(filename=[], cr=[], mse=[], psnr=[], ssim=[]))
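    # Sketch of what one results row may end up looking like; the attribute fields in the file name
    # and the metric values are made up for illustration:
    #   filename                               cr    mse   psnr   ssim
    #   CT_HEAD_MONOCHROME2_1_12_1_q90.jpeg    7.8   3.2   42.1   0.98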
for file_name in os.listdir(PathParameters.DATASET_DICOM_PATH):
file_path: str = PathParameters.DATASET_DICOM_PATH + file_name
dcm_data = dcmread(file_path)
# Read input uncompressed image file
uncompressed_img: ndarray = dcm_data.pixel_array
print(f"Evaluating {file_name}", end="...")
for quality in QUALITY_VALUES:
body_part, bits_per_sample, color_space, modality, samples_per_pixel = extract_attributes(dcm_data)
bits_allocated = dcm_data.get(dicom_parser.BITS_ALLOCATED_TAG)
nframes: int = dicom_parser.get_number_of_frames(dcm_data, uncompressed_img.shape,
single_channel=samples_per_pixel == 1)
encoded_target_path = f"{PathParameters.DATASET_PATH}{modality.value}_{body_part.value}" \
f"_{color_space.value.replace('_', '')}_{samples_per_pixel.value}" \
f"_{bits_per_sample.value}_{nframes}.dcm"
# Encode input file
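            # dcmcjpeg is DCMTK's DICOM JPEG encoder: +ee requests extended sequential JPEG encoding
            # and +q sets the JPEG quality factor under evaluation.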
command = f"dcmcjpeg +ee +q {quality} {file_path} {encoded_target_path}"
if exec_cmd(command) is False:
print("Skipped because of error at encoding.")
continue
# Read encoded image file
img_encoded = dcmread(encoded_target_path)
encoded_pixel_array: ndarray = img_encoded.pixel_array
if not compatible_datatypes(encoded_pixel_array, uncompressed_img):
print(f"Unexpected loss of bit depth from {uncompressed_img.dtype} to {encoded_pixel_array.dtype}!")
break
            # Get the uncompressed and encoded image sizes (bytes)
uncompressed_img_size: float | Any = uncompressed_img.size * (bits_allocated.value / 8)
img_encoded_size: int = len(img_encoded.PixelData)
# Calculate CR
cr = uncompressed_img_size / img_encoded_size
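            # Worked example with made-up sizes: a single-frame 512x512 image with 16 bits allocated
            # occupies 512 * 512 * 2 = 524288 bytes; if its encoded PixelData holds 65536 bytes,
            # then cr == 8.0.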
            # Calculate the MSE and SSIM between the original and the encoded image
mse, ssim = (
metric(uncompressed_img, encoded_pixel_array) for metric in (metrics.custom_mse, metrics.custom_ssim)
)
psnr = metrics.custom_psnr(uncompressed_img, encoded_pixel_array, bits_per_sample=bits_per_sample.value)
# Write to dataframe
suffix = f'_q{quality}.jpeg'
file_name = os.path.basename(encoded_target_path).replace('.dcm', suffix)
# Ensure file name is unique (add id if need be)
if file_name in list(results["filename"].values):
file_name = file_name.replace(suffix, f"_1{suffix}")
i = 1
while file_name in results["filename"].values:
file_name = file_name.replace(f"_{i}{suffix}", f"_{i+1}{suffix}")
i += 1
results = pd.concat([pd.DataFrame(dict(
filename=[f"{file_name}"],
cr=[cr],
mse=[mse],
psnr=[psnr],
ssim=[ssim]
)), results])
print("Done!")
for generated_dcm in filter(lambda file: file.endswith(".dcm"), os.listdir(PathParameters.DATASET_PATH)):
os.remove(PathParameters.DATASET_PATH+generated_dcm)
results.to_csv(f"{PathParameters.JPEG_EVAL_RESULTS_PATH}.csv", index=False)


def compatible_datatypes(ndarray1: ndarray, ndarray2: ndarray) -> bool:
    """Check whether the two arrays share the same data type (i.e., no bit-depth change after encoding)."""
    return ndarray1.dtype == ndarray2.dtype


def exec_cmd(command) -> bool:
    """Execute a shell command and report whether it succeeded.

    :param command: Shell command to run.
    :return: True if the command exited with status 0, False otherwise.
    """
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
_, _ = p.communicate(timeout=15)
if p.returncode != 0:
print(f"\nError status {p.returncode} executing the following command: \"{command}\"."
f" Hint: Run it again to debug")
return False
return True


def check_deps():
    """Verify that the required external dependency (DCMTK's dcmcjpeg tool) is available."""
    print("Searching for the dcmcjpeg tool... ", end="")
    if os.system("which dcmcjpeg") != 0:
        print("dcmtk not found!")
        exit(1)


if __name__ == '__main__':
check_deps()
compress_n_compare()
squeeze_data(PathParameters.JPEG_EVAL_RESULTS_PATH)