-
Notifications
You must be signed in to change notification settings - Fork 732
/
reader.py
152 lines (113 loc) · 4.54 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
__copyright__ = "Copyright 2016-2018, Netflix, Inc."
__license__ = "Apache, Version 2.0"
import os
import numpy as np
class YuvReader(object):
SUPPORTED_YUV_8BIT_TYPES = ['yuv420p',
'yuv422p',
'yuv444p',
]
SUPPORTED_YUV_10BIT_LE_TYPES = ['yuv420p10le',
'yuv422p10le',
'yuv444p10le',
]
# ex: for yuv420p, the width and height of U/V is 0.5x, 0.5x of Y
UV_WIDTH_HEIGHT_MULTIPLIERS_DICT = {'yuv420p': (0.5, 0.5),
'yuv422p': (0.5, 1.0),
'yuv444p': (1.0, 1.0),
'yuv420p10le': (0.5, 0.5),
'yuv422p10le': (0.5, 1.0),
'yuv444p10le': (1.0, 1.0),
}
def __init__(self, filepath, width, height, yuv_type):
self.filepath = filepath
self.width = width
self.height = height
self.yuv_type = yuv_type
self._asserts()
self.file = open(self.filepath, 'rb')
def close(self):
self.file.close()
# make YuvReader withable, e.g.:
# with YuvReader(...) as yuv_reader:
# ...
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
# make YuvReader iterable, e.g.:
# for y, u, v in yuv_reader:
# ...
def __iter__(self):
return self
def next(self):
try:
return self.next_y_u_v()
except EOFError:
raise StopIteration
@property
def num_bytes(self):
self._assert_file_exist()
return os.path.getsize(self.filepath)
@property
def num_frms(self):
w_multiplier, h_multiplier = self._get_uv_width_height_multiplier()
if self._is_10bitle():
num_frms = float(self.num_bytes) / self.width / self.height / (1.0 + w_multiplier * h_multiplier * 2) / 2
elif self._is_8bit():
num_frms = float(self.num_bytes) / self.width / self.height / (1.0 + w_multiplier * h_multiplier * 2)
else:
assert False
assert num_frms.is_integer(), 'Number of frames is not integer: {}'.format(num_frms)
return int(num_frms)
def _get_uv_width_height_multiplier(self):
self._assert_yuv_type()
return self.UV_WIDTH_HEIGHT_MULTIPLIERS_DICT[self.yuv_type]
def _assert_yuv_type(self):
assert (self.yuv_type in self.SUPPORTED_YUV_8BIT_TYPES
or self.yuv_type in self.SUPPORTED_YUV_10BIT_LE_TYPES), \
'Unsupported YUV type: {}'.format(self.yuv_type)
def _assert_file_exist(self):
assert os.path.exists(self.filepath), \
"File does not exist: {}".format(self.filepath)
def _asserts(self):
# assert YUV type
self._assert_yuv_type()
# assert file exists
self._assert_file_exist()
# assert file size: if consists of integer number of frames
assert isinstance(self.num_frms, int)
def _is_8bit(self):
return self.yuv_type in self.SUPPORTED_YUV_8BIT_TYPES
def _is_10bitle(self):
return self.yuv_type in self.SUPPORTED_YUV_10BIT_LE_TYPES
def next_y_u_v(self):
y_width = self.width
y_height = self.height
uv_w_multiplier, uv_h_multiplier = self._get_uv_width_height_multiplier()
uv_width = int(y_width * uv_w_multiplier)
uv_height = int(y_height * uv_h_multiplier)
if self._is_10bitle():
pix_type = np.uint16
elif self._is_8bit():
pix_type = np.uint8
else:
assert False
y = np.fromfile(self.file, pix_type, count=y_width*y_height)
if y.size == 0:
raise EOFError
u = np.fromfile(self.file, pix_type, count=uv_width*uv_height)
if u.size == 0:
raise EOFError
v = np.fromfile(self.file, pix_type, count=uv_width*uv_height)
if v.size == 0:
raise EOFError
y = y.reshape(y_height, y_width)
u = u.reshape(uv_height, uv_width)
v = v.reshape(uv_height, uv_width)
if self._is_10bitle():
return y.astype(np.double) / 4.0, u.astype(np.double) / 4.0, v.astype(np.double) / 4.0
elif self._is_8bit():
return y.astype(np.double), u.astype(np.double), v.astype(np.double)
else:
assert False