-
Notifications
You must be signed in to change notification settings - Fork 32
/
demo_p910.py
executable file
·181 lines (137 loc) · 5.23 KB
/
demo_p910.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import argparse
import csv
import sys
import pprint
import numpy as np
from scipy import linalg
def read_csv_into_2darray(csv_filepath):
"""
Read data from CSV file.
The data should be organized in a 2D matrix, separated by comma. Each row
correspond to a PVS; each column corresponds to a subject. If a vote is
missing, a 'nan' is put in place.
:param csv_filepath: filepath to the CSV file.
:return: the numpy array in 2D.
"""
with open(csv_filepath, 'rt') as datafile:
datareader = csv.reader(datafile, delimiter=',')
data = [row for row in datareader]
return np.array(data, dtype=np.float64)
def weighed_nanmean_2d(a, wts, axis):
"""
Compute the weighted arithmetic mean along the specified axis, ignoring
NaNs. It is similar to numpy's nanmean function, but with a weight.
:param a: 1D array.
:param wts: 1D array carrying the weights.
:param axis: either 0 or 1, specifying the dimension along which the means
are computed.
:return: 1D array containing the mean values.
"""
assert len(a.shape) == 2
assert axis in [0, 1]
d0, d1 = a.shape
if axis == 0:
return np.divide(
np.nansum(np.multiply(a, np.tile(wts, (d1, 1)).T), axis=0),
np.nansum(np.multiply(~np.isnan(a), np.tile(wts, (d1, 1)).T), axis=0)
)
elif axis == 1:
return np.divide(
np.nansum(np.multiply(a, np.tile(wts, (d0, 1))), axis=1),
np.nansum(np.multiply(~np.isnan(a), np.tile(wts, (d0, 1))), axis=1),
)
else:
assert False
def one_or_nan(x):
"""
Construct a "mask" array with the same dimension as x, with element NaN
where x has NaN at the same location; and element 1 otherwise.
:param x: array_like
:return: an array with the same dimension as x
"""
y = np.ones(x.shape)
y[np.isnan(x)] = float('nan')
return y
def get_sos_j(sig_r_j, o_ji):
"""
Compute SOS (standard deviation of score) for PVS j
:param sig_r_j:
:param o_ji:
:return: array containing the SOS for PVS j
"""
den = np.nansum(one_or_nan(o_ji) /
np.tile(sig_r_j ** 2, (o_ji.shape[1], 1)).T, axis=1)
s_j_std = 1.0 / np.sqrt(np.maximum(0., den))
return s_j_std
def run_alternating_projection(o_ji):
"""
Run Alternating Projection (AP) algorithm.
:param o_ji: 2D numpy array containing raw votes. The first dimension
corresponds to the PVSs (j); the second dimension corresponds to the
subjects (i). If a vote is missing, the element is NaN.
:return: dictionary containing results keyed by 'mos_j', 'sos_j', 'bias_i'
and 'inconsistency_i'.
"""
J, I = o_ji.shape
# video by video, estimate MOS by averaging over subjects
psi_j = np.nanmean(o_ji, axis=1) # mean marginalized over i
# subject by subject, estimate subject bias by comparing with MOS
b_ji = o_ji - np.tile(psi_j, (I, 1)).T
b_i = np.nanmean(b_ji, axis=0) # mean marginalized over j
MAX_ITR = 1000
DELTA_THR = 1e-8
EPSILON = 1e-8
itr = 0
while True:
psi_j_prev = psi_j
# subject by subject, estimate subject inconsistency by averaging the
# residue over stimuli
r_ji = o_ji - np.tile(psi_j, (I, 1)).T - np.tile(b_i, (J, 1))
sig_r_i = np.nanstd(r_ji, axis=0)
sig_r_j = np.nanstd(r_ji, axis=1)
# video by video, estimate MOS by averaging over subjects, inversely
# weighted by residue variance
w_i = 1.0 / (sig_r_i ** 2 + EPSILON)
# mean marginalized over i:
psi_j = weighed_nanmean_2d(o_ji - np.tile(b_i, (J, 1)), wts=w_i, axis=1)
# subject by subject, estimate subject bias by comparing with MOS,
# inversely weighted by residue variance
b_ji = o_ji - np.tile(psi_j, (I, 1)).T
# mean marginalized over j:
b_i = np.nanmean(b_ji, axis=0)
itr += 1
delta_s_j = linalg.norm(psi_j_prev - psi_j)
msg = 'Iteration {itr:4d}: change {delta_psi_j}, psi_j {psi_j}, ' \
'b_i {b_i}, sig_r_i {sig_r_i}'.format(
itr=itr, delta_psi_j=delta_s_j, psi_j=np.mean(psi_j),
b_i=np.mean(b_i), sig_r_i=np.mean(sig_r_i))
sys.stdout.write(msg + '\r')
sys.stdout.flush()
if delta_s_j < DELTA_THR:
break
if itr >= MAX_ITR:
break
psi_j_std = get_sos_j(sig_r_j, o_ji)
sys.stdout.write("\n")
mean_b_i = np.mean(b_i)
b_i -= mean_b_i
psi_j += mean_b_i
return {
'mos_j': list(psi_j),
'sos_j': list(psi_j_std),
'bias_i': list(b_i),
'inconsistency_i': list(sig_r_i),
}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--input-csv", dest="input_csv", nargs=1, type=str,
help="Filepath to input CSV file. The data should be organized in a 2D "
"matrix, separated by comma. The rows correspond to PVSs; the "
"columns correspond to subjects. If a vote is missing, input 'nan'"
" instead.", required=True)
args = parser.parse_args()
input_csv = args.input_csv[0]
o_ji = read_csv_into_2darray(input_csv)
ret = run_alternating_projection(o_ji)
pprint.pprint(ret)