diff --git a/code/cpngfilters.pyx b/code/cpngfilters.pyx new file mode 100644 index 00000000..5872cd11 --- /dev/null +++ b/code/cpngfilters.pyx @@ -0,0 +1,134 @@ +#cython: boundscheck=False +#cython: wraparound=False + +from libc.stdlib cimport abs as c_abs + +cimport cpython.array + + +# TODO: I don't know how can I not return any value (void doesn't work) +cpdef int undo_filter_sub(int filter_unit, unsigned char[:] scanline, + unsigned char[:] previous, unsigned char[:] result) nogil: + """Undo sub filter.""" + + cdef int l = result.shape[0] + cdef int ai = 0 + cdef unsigned char x, a + + # Loops starts at index fu. Observe that the initial part + # of the result is already filled in correctly with + # scanline. + for i in range(filter_unit, l): + x = scanline[i] + a = result[ai] + result[i] = (x + a) & 0xff + ai += 1 + return 0 + + +cpdef int undo_filter_up(int filter_unit, unsigned char[:] scanline, + unsigned char[:] previous, unsigned char[:] result) nogil: + """Undo up filter.""" + + cdef int i + cdef int l = result.shape[0] + cdef unsigned char x, b + + for i in range(l): + x = scanline[i] + b = previous[i] + result[i] = (x + b) & 0xff + return 0 + + +cpdef int undo_filter_average(int filter_unit, unsigned char[:] scanline, + unsigned char[:] previous, unsigned char[:] result) nogil: + """Undo up filter.""" + + cdef int i, ai + cdef int l = result.shape[0] + cdef unsigned char x, a, b + + ai = -filter_unit + for i in range(l): + x = scanline[i] + if ai < 0: + a = 0 + else: + a = result[ai] + b = previous[i] + result[i] = (x + ((a + b) >> 1)) & 0xff + ai += 1 + return 0 + + +cpdef int undo_filter_paeth(int filter_unit, unsigned char[:] scanline, + unsigned char[:] previous, unsigned char[:] result) nogil: + """Undo Paeth filter.""" + + # Also used for ci. + cdef int ai = -filter_unit + cdef int l = result.shape[0] + cdef int i, pa, pb, pc, p + cdef unsigned char x, a, b, c, pr + + for i in range(l): + x = scanline[i] + if ai < 0: + a = c = 0 + else: + a = result[ai] + c = previous[ai] + b = previous[i] + p = a + b - c + pa = c_abs(p - a) + pb = c_abs(p - b) + pc = c_abs(p - c) + if pa <= pb and pa <= pc: + pr = a + elif pb <= pc: + pr = b + else: + pr = c + result[i] = (x + pr) & 0xff + ai += 1 + return 0 + + +cpdef int convert_rgb_to_rgba(unsigned char[:] row, unsigned char[:] result) nogil: + cdef int i, l, j, k + l = min(row.shape[0] / 3, result.shape[0] / 4) + for i in range(l): + j = i * 3 + k = i * 4 + result[k] = row[j] + result[k + 1] = row[j + 1] + result[k + 2] = row[j + 2] + return 0 + + +cpdef int convert_l_to_rgba(unsigned char[:] row, unsigned char[:] result) nogil: + cdef int i, l, j, k, lum + l = min(row.shape[0], result.shape[0] / 4) + for i in range(l): + j = i + k = i * 4 + lum = row[j] + result[k] = lum + result[k + 1] = lum + result[k + 2] = lum + return 0 + + +cpdef int convert_la_to_rgba(unsigned char[:] row, unsigned char[:] result) nogil: + cdef int i, l, j, k, lum + l = min(row.shape[0] / 2, result.shape[0] / 4) + for i in range(l): + j = i * 2 + k = i * 4 + lum = row[j] + result[k] = lum + result[k + 1] = lum + result[k + 2] = lum + result[k + 3] = row[j + 1] + return 0 diff --git a/code/png.py b/code/png.py index 3ba2cefe..b55dd3ab 100755 --- a/code/png.py +++ b/code/png.py @@ -180,6 +180,12 @@ import zlib # http://www.python.org/doc/2.4.4/lib/module-warnings.html import warnings +try: + import pyximport + pyximport.install() + import cpngfilters as pngfilters +except ImportError: + pass __all__ = ['Image', 'Reader', 'Writer', 'write_chunks', 'from_array'] @@ -1450,17 +1456,12 @@ def undo_filter(self, filter_type, scanline, previous): """ # :todo: Would it be better to update scanline in place? - - # Create the result byte array. It seems that the best way to - # create the array to be the right size is to copy from an - # existing sequence. *sigh* - # If we fill the result with scanline, then this allows a - # micro-optimisation in the "null" and "sub" cases. - result = array('B', scanline) + # Yes, with the Cython extension making the undo_filter fast, + # updating scanline inplace makes the code 3 times faster + # (reading 50 images of 800x800 went from 40s to 16s) + result = scanline if filter_type == 0: - # And here, we _rely_ on filling the result with scanline, - # above. return result if filter_type not in (1,2,3,4): @@ -1543,7 +1544,11 @@ def paeth(): # Call appropriate filter algorithm. Note that 0 has already # been dealt with. - (None, sub, up, average, paeth)[filter_type]() + (None, + pngfilters.undo_filter_sub, + pngfilters.undo_filter_up, + pngfilters.undo_filter_average, + pngfilters.undo_filter_paeth)[filter_type](fu, scanline, previous, result) return result def deinterlace(self, raw): @@ -2182,8 +2187,10 @@ def asRGBA(self): return width,height,pixels,meta typecode = 'BH'[meta['bitdepth'] > 8] maxval = 2**meta['bitdepth'] - 1 + maxbuffer = struct.pack('=' + typecode, maxval) * 4 * width def newarray(): - return array(typecode, [0]) * 4 * width + return array(typecode, maxbuffer) + if meta['alpha'] and meta['greyscale']: # LA to RGBA def convert(): @@ -2192,18 +2199,14 @@ def convert(): # into first three target channels, and A channel # into fourth channel. a = newarray() - for i in range(3): - a[i::4] = row[0::2] - a[3::4] = row[1::2] + pngfilters.convert_la_to_rgba(row, a) yield a elif meta['greyscale']: # L to RGBA def convert(): for row in pixels: a = newarray() - for i in range(3): - a[i::4] = row - a[3::4] = array(typecode, [maxval]) * width + pngfilters.convert_l_to_rgba(row, a) yield a else: assert not meta['alpha'] and not meta['greyscale'] @@ -2211,9 +2214,7 @@ def convert(): def convert(): for row in pixels: a = newarray() - for i in range(3): - a[i::4] = row[i::3] - a[3::4] = array(typecode, [maxval]) * width + pngfilters.convert_rgb_to_rgba(row, a) yield a meta['alpha'] = True meta['greyscale'] = False @@ -2313,6 +2314,97 @@ def _itertools_chain(*iterables): itertools.chain = _itertools_chain +# === Support for users without Cython === + +try: + pngfilters +except: + class pngfilters(object): + def undo_filter_sub(filter_unit, scanline, previous, result): + """Undo sub filter.""" + + ai = 0 + # Loops starts at index fu. Observe that the initial part + # of the result is already filled in correctly with + # scanline. + for i in range(filter_unit, len(result)): + x = scanline[i] + a = result[ai] + result[i] = (x + a) & 0xff + ai += 1 + undo_filter_sub = staticmethod(undo_filter_sub) + + def undo_filter_up(filter_unit, scanline, previous, result): + """Undo up filter.""" + + for i in range(len(result)): + x = scanline[i] + b = previous[i] + result[i] = (x + b) & 0xff + undo_filter_up = staticmethod(undo_filter_up) + + def undo_filter_average(filter_unit, scanline, previous, result): + """Undo up filter.""" + + ai = -filter_unit + for i in range(len(result)): + x = scanline[i] + if ai < 0: + a = 0 + else: + a = result[ai] + b = previous[i] + result[i] = (x + ((a + b) >> 1)) & 0xff + ai += 1 + undo_filter_average = staticmethod(undo_filter_average) + + def undo_filter_paeth(filter_unit, scanline, previous, result): + """Undo Paeth filter.""" + + # Also used for ci. + ai = -filter_unit + for i in range(len(result)): + x = scanline[i] + if ai < 0: + a = c = 0 + else: + a = result[ai] + c = previous[ai] + b = previous[i] + p = a + b - c + pa = abs(p - a) + pb = abs(p - b) + pc = abs(p - c) + if pa <= pb and pa <= pc: + pr = a + elif pb <= pc: + pr = b + else: + pr = c + result[i] = (x + pr) & 0xff + ai += 1 + undo_filter_paeth = staticmethod(undo_filter_paeth) + + def convert_la_to_rgba(row, result): + for i in range(3): + result[i::4] = row[0::2] + result[3::4] = row[1::2] + convert_la_to_rgba = staticmethod(convert_la_to_rgba) + + def convert_l_to_rgba(row, result): + """Convert a grayscale image to RGBA. This method assumes the alpha + channel in result is already correctly initialized.""" + for i in range(3): + result[i::4] = row + convert_l_to_rgba = staticmethod(convert_l_to_rgba) + + def convert_rgb_to_rgba(row, result): + """Convert an RGB image to RGBA. This method assumes the alpha + channel in result is already correctly initialized.""" + for i in range(3): + result[i::4] = row[i::3] + convert_rgb_to_rgba = staticmethod(convert_rgb_to_rgba) + # === Internal Test Support === @@ -2458,7 +2550,7 @@ def testP2(self): x,y,pixels,meta = r.asRGB8() self.assertEqual(x, 1) self.assertEqual(y, 4) - self.assertEqual(list(pixels), map(list, [a, b, b, c])) + self.assertEqual(map(list, pixels), map(list, [a, b, b, c])) def testPtrns(self): "Test colour type 3 and tRNS chunk (and 4-bit palette)." a = (50,99,50,50) @@ -2486,7 +2578,7 @@ def testRGBtoRGBA(self): x,y,pixels,meta = r.asRGBA8() # Test the pixels at row 9 columns 0 and 1. row9 = list(pixels)[9] - self.assertEqual(row9[0:8], + self.assertEqual(list(row9[0:8]), [0xff, 0xdf, 0xff, 0xff, 0xff, 0xde, 0xff, 0xff]) def testLtoRGBA(self): "asRGBA() on grey source.""" @@ -2832,6 +2924,103 @@ def testNumpyarray(self): img = from_array(pixels, 'L') img.save('testnumpyL16.png') + def paeth(self, x, a, b, c): + p = a + b - c + pa = abs(p - a) + pb = abs(p - b) + pc = abs(p - c) + if pa <= pb and pa <= pc: + pr = a + elif pb <= pc: + pr = b + else: + pr = c + return x - pr + + # test filters and unfilters + def testFilterScanlineFirstLine(self): + fo = 3 # bytes per pixel + line = [30, 31, 32, 230, 231, 232] + out = filter_scanline(0, line, fo, None) # none + self.assertEqual(list(out), [0, 30, 31, 32, 230, 231, 232]) + out = filter_scanline(1, line, fo, None) # sub + self.assertEqual(list(out), [1, 30, 31, 32, 200, 200, 200]) + out = filter_scanline(2, line, fo, None) # up + # TODO: All filtered scanlines start with a byte indicating the filter + # algorithm, except "up". Is this a bug? Should the expected output + # start with 2 here? + self.assertEqual(list(out), [30, 31, 32, 230, 231, 232]) + out = filter_scanline(3, line, fo, None) # average + self.assertEqual(list(out), [3, 30, 31, 32, 215, 216, 216]) + out = filter_scanline(4, line, fo, None) # paeth + self.assertEqual(list(out), [ + 4, self.paeth(30, 0, 0, 0), self.paeth(31, 0, 0, 0), + self.paeth(32, 0, 0, 0), self.paeth(230, 30, 0, 0), + self.paeth(231, 31, 0, 0), self.paeth(232, 32, 0, 0) + ]) + def testFilterScanline(self): + prev = [20, 21, 22, 210, 211, 212] + line = [30, 32, 34, 230, 233, 236] + fo = 3 + out = filter_scanline(0, line, fo, prev) # none + self.assertEqual(list(out), [0, 30, 32, 34, 230, 233, 236]) + out = filter_scanline(1, line, fo, prev) # sub + self.assertEqual(list(out), [1, 30, 32, 34, 200, 201, 202]) + out = filter_scanline(2, line, fo, prev) # up + self.assertEqual(list(out), [2, 10, 11, 12, 20, 22, 24]) + out = filter_scanline(3, line, fo, prev) # average + self.assertEqual(list(out), [3, 20, 22, 23, 110, 112, 113]) + out = filter_scanline(4, line, fo, prev) # paeth + self.assertEqual(list(out), [ + 4, self.paeth(30, 0, 20, 0), self.paeth(32, 0, 21, 0), + self.paeth(34, 0, 22, 0), self.paeth(230, 30, 210, 20), + self.paeth(233, 32, 211, 21), self.paeth(236, 34, 212, 22) + ]) + def testUnfilterScanline(self): + reader = Reader(bytes='') + reader.psize = 3 + scanprev = array('B', [20, 21, 22, 210, 211, 212]) + scanline = array('B', [30, 32, 34, 230, 233, 236]) + def cp(a): + return array('B', a) + + out = reader.undo_filter(0, cp(scanline), cp(scanprev)) + self.assertEqual(list(out), list(scanline)) # none + out = reader.undo_filter(1, cp(scanline), cp(scanprev)) + self.assertEqual(list(out), [30, 32, 34, 4, 9, 14]) # sub + out = reader.undo_filter(2, cp(scanline), cp(scanprev)) + self.assertEqual(list(out), [50, 53, 56, 184, 188, 192]) # up + out = reader.undo_filter(3, cp(scanline), cp(scanprev)) + self.assertEqual(list(out), [40, 42, 45, 99, 103, 108]) # average + out = reader.undo_filter(4, cp(scanline), cp(scanprev)) + self.assertEqual(list(out), [50, 53, 56, 184, 188, 192]) # paeth + def testUnfilterScanlinePaeth(self): + # This tests more edge cases in the paeth unfilter + reader = Reader(bytes='') + reader.psize = 3 + scanprev = array('B', [2, 0, 0, 0, 9, 11]) + scanline = array('B', [6, 10, 9, 100, 101, 102]) + + out = reader.undo_filter(4, scanline, scanprev) + self.assertEqual(list(out), [8, 10, 9, 108, 111, 113]) # paeth + def testIterstraight(self): + def arraify(list_of_str): + return [array('B', s) for s in list_of_str] + reader = Reader(bytes='') + reader.row_bytes = 6 + reader.psize = 3 + rows = reader.iterstraight(arraify(['\x00abcdef', '\x00ghijkl'])) + self.assertEqual(list(rows), arraify(['abcdef', 'ghijkl'])) + + rows = reader.iterstraight(arraify(['\x00abc', 'def\x00ghijkl'])) + self.assertEqual(list(rows), arraify(['abcdef', 'ghijkl'])) + + rows = reader.iterstraight(arraify(['\x00abcdef\x00ghijkl'])) + self.assertEqual(list(rows), arraify(['abcdef', 'ghijkl'])) + + rows = reader.iterstraight(arraify(['\x00abcdef\x00ghi', 'jkl'])) + self.assertEqual(list(rows), arraify(['abcdef', 'ghijkl'])) + # === Command Line Support === def _dehex(s):