GH-32: Add generic reducer to output raw files

- It is named RawReducer
- Handles output to a single file or to multiple files (multiple-file output requires feathers)
- Includes working examples that generate Tokyo Cabinets, constant DBs (cdb), and JSON Lines output
- Last but not least, unit tests!
commit 4feb7d73c2e669117779790632ad0aa92ea2ffb0 1 parent 1ff5fef
Daniel Graña authored
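
For context (not part of this commit): a RawReducer-based reducer is a callable that consumes the whole (key, values) stream, and its opts attribute carries the extra job options ('outputformat' set to 'raw', plus the feathers partitioner settings in multiple-output mode). Below is a minimal sketch of how such a reducer might be wired into a dumbo job script; the mapper, the tab-separated input layout, and the opts= keyword of dumbo.run are assumptions for illustration, not something this commit defines.

# hypothetical job script using the new CDBReducer; not part of this commit
import dumbo
from dumbo.lib.cdbreducer import CDBReducer

def mapper(key, value):
    # assumes each input line is "key<TAB>value"
    k, v = value.split('\t', 1)
    yield k, v

if __name__ == '__main__':
    reducer = CDBReducer()  # CDBReducer(multipleoutput=True) additionally needs feathers
    # reducer.opts is the list of (option, value) pairs defined by RawReducer
    dumbo.run(mapper, reducer, opts=reducer.opts)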
dumbo/lib/cdbreducer.py (new file, 32 lines)
@@ -0,0 +1,32 @@
+import os
+from tempfile import mkstemp
+import cdb
+
+from .rawreducer import RawReducer, chunkedread, CHUNKSIZE
+
+
+class CDBFactory(object):
+    """A RawReducer factory suitable to generate constant dbs from dumbo jobs
+
+    For more info on constant dbs see http://cr.yp.to/cdb.html
+    """
+    chunksize = CHUNKSIZE
+
+    def __init__(self):
+        fd, self.fn = mkstemp('.cdb', dir=os.getcwd())
+        os.close(fd)
+        self.maker = cdb.cdbmake(self.fn, self.fn + '.tmp')
+
+    def __call__(self, key, values):
+        for value in values:
+            self.maker.add(key, value)
+
+    def close(self):
+        self.maker.finish()
+        for chk in chunkedread(self.fn, chunksize=self.chunksize):
+            yield chk
+        os.unlink(self.fn)
+
+
+class CDBReducer(RawReducer):
+    factory = CDBFactory
dumbo/lib/jsonlinesreducer.py (new file, 12 lines)
@@ -0,0 +1,12 @@
+try:
+    import json
+except ImportError:
+    import simplejson as json
+
+from .rawreducer import RawReducer
+
+
+class JsonLinesReducer(RawReducer):
+
+    def factory(self):
+        return lambda key, values: (json.dumps(v) + '\n' for v in values)
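
A quick illustration (not from the commit) of what JsonLinesReducer yields in single-output mode: each value is serialised with json.dumps and newline-terminated, and RawReducer attaches the path None to every chunk.

>>> from dumbo.lib.jsonlinesreducer import JsonLinesReducer
>>> red = JsonLinesReducer()
>>> list(red([('k1', [{'a': 1}]), ('k2', [[1, 2]])]))
[(None, '{"a": 1}\n'), (None, '[1, 2]\n')]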
dumbo/lib/rawreducer.py (new file, 72 lines)
@@ -0,0 +1,72 @@
+"""A reducer base class to output one or multiple files in its raw fileformat"""
+from itertools import groupby
+
+
+class RawReducer(object):
+    """Reducer to generate outputs in raw file format"""
+
+    multipleoutput = False
+    singleopts = [
+        ('outputformat', 'raw'),
+    ]
+    multipleopts = [
+        ('getpath', 'yes'),
+        ('outputformat', 'raw'),
+        ('partitioner', 'fm.last.feathers.partition.Prefix'),
+        ('jobconf', 'feathers.output.filename.strippart=true'),
+    ]
+
+    def __init__(self, factory=None, multipleoutput=None):
+        if factory:
+            self.factory = factory
+        if multipleoutput is not None:
+            self.multipleoutput = multipleoutput
+        self.opts = self.multipleopts if self.multipleoutput else self.singleopts
+
+    def __call__(self, data):
+        if not self.multipleoutput:
+            data = (((None, key), values) for key, values in data)
+
+        proc = self.factory()
+        for path, group in groupby(data, lambda x: x[0][0]):
+            proc = self.factory()
+            for (_, key), values in group:
+                for chk in proc(key, values) or ():
+                    yield path, chk
+
+            close = getattr(proc, 'close', tuple)
+            for chk in close() or ():
+                yield path, chk
+
+    def factory(self):
+        """Processor factory used to consume reducer input (one per path on multiple outputs)
+
+        Must return a callable (aka processor) that accepts two parameters
+        "key" and "values", and returns an iterable of strings or None.
+
+        The processor may have a close() method that returns an iterable of
+        strings or None. This method is called when the last key-values pair
+        for a path is seen.
+
+        """
+        return lambda key, values: values
+
+CHUNKSIZE = 2*1024*1024  # default chunk size to read a file
+def chunkedread(filename_or_fileobj, chunksize=CHUNKSIZE):
+    """Returns a generator that reads a file in chunks"""
+    if hasattr(filename_or_fileobj, 'read'):
+        fileobj = filename_or_fileobj
+        needclose = False
+    else:
+        fileobj = open(filename_or_fileobj, 'rb')
+        needclose = True
+
+    try:
+        content = fileobj.read(chunksize)
+        while content:
+            yield content
+            content = fileobj.read(chunksize)
+    finally:
+        if needclose:
+            fileobj.close()
+
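
The factory() docstring above is the whole contract: the factory returns a processor, i.e. a callable taking (key, values) and returning an iterable of strings or None, optionally with a close() method whose output is flushed once the last pair for a path has been seen. Here is a small factory following that contract (hypothetical, not part of the commit); it writes tab-separated lines plus a record-count trailer:

from dumbo.lib.rawreducer import RawReducer

class TSVFactory(object):
    """Hypothetical processor: emits key<TAB>value lines plus a count trailer."""

    def __init__(self):
        self.count = 0

    def __call__(self, key, values):
        for value in values:
            self.count += 1
            yield '%s\t%s\n' % (key, value)

    def close(self):
        # called once per output path, after its last key/values pair
        yield '# %d records\n' % self.count

class TSVReducer(RawReducer):
    factory = TSVFactory

Passing multipleoutput=True (or setting it on the subclass) would produce one such file per path prefix, relying on the feathers options listed in multipleopts.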
dumbo/lib/tokyocabinetreducer.py (new file, 36 lines)
@@ -0,0 +1,36 @@
+import os
+from tempfile import mkstemp
+from tokyo.cabinet import HDB, HDBOWRITER, HDBOCREAT
+
+from .rawreducer import RawReducer, chunkedread, CHUNKSIZE
+
+
+class TokyoCabinetFactory(object):
+    """A RawReducer factory suitable to generate tokyocabinets from dumbo jobs"""
+
+    dbcls = HDB
+    flags = HDBOWRITER | HDBOCREAT
+    methodname = 'putasync'
+    chunksize = CHUNKSIZE
+
+    def __init__(self):
+        fd, self.fn = mkstemp('.db', 'tc-', os.getcwd())
+        os.close(fd)
+        self.db = self.dbcls()
+        self.db.setxmsiz(0)
+        self.db.open(self.fn, self.flags)
+        self.add = getattr(self.db, self.methodname)
+
+    def __call__(self, key, values):
+        for value in values:
+            self.add(key, value)
+
+    def close(self):
+        self.db.close()
+        for chk in chunkedread(self.fn, chunksize=self.chunksize):
+            yield chk
+        os.unlink(self.fn)
+
+
+class TokyoCabinetReducer(RawReducer):
+    factory = TokyoCabinetFactory
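
Because TokyoCabinetFactory reads the database class, open flags, insert method and chunk size from class attributes, variants are just small subclasses, much like the BDBFactory used in the tests below. A hedged sketch (assuming the tokyo.cabinet BDB class offers a plain put method; the names here are illustrative):

from tokyo.cabinet import BDB, BDBOWRITER, BDBOCREAT
from dumbo.lib.tokyocabinetreducer import TokyoCabinetFactory, TokyoCabinetReducer

class BTreeCabinetFactory(TokyoCabinetFactory):
    # hypothetical variant: B-tree file, synchronous puts, smaller read chunks
    dbcls = BDB
    flags = BDBOWRITER | BDBOCREAT
    methodname = 'put'
    chunksize = 512 * 1024

class BTreeCabinetReducer(TokyoCabinetReducer):
    factory = BTreeCabinetFactory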
tests/testcdbreducer.py (new file, 41 lines)
@@ -0,0 +1,41 @@
+import os
+import unittest
+from tempfile import mkstemp
+import cdb
+
+from dumbo.lib.cdbreducer import CDBReducer, CDBFactory
+
+
+class CDBTestCase(unittest.TestCase):
+
+    def test_default(self):
+        proc = CDBFactory()
+        self.assertEqual(proc('k1', ['v1']), None)
+        self.assertEqual(proc('k2', ['v2', 'v3']), None)
+        chunks = proc.close()
+        fn = mkstemp()[1]
+        fo = open(fn, 'wb')
+        for chk in chunks:
+            self.assertTrue(len(chk) <= proc.chunksize)
+            fo.write(chk)
+        fo.close()
+
+        db = cdb.init(fn)
+        self.assertEqual([(k, db[k]) for k in db.keys()],
+                         [('k1', 'v1'), ('k2', 'v2')])
+        os.remove(fn)
+
+    def test_reducer(self):
+        red = CDBReducer()
+        output = red(zip('abcde', '12345'))
+
+        fn = mkstemp()[1]
+        fo = open(fn, 'wb')
+        fo.writelines(v for k, v in output)
+        fo.close()
+
+        db = cdb.init(fn)
+        self.assertEqual([(k, db[k]) for k in db.keys()],
+                         [('a', '1'), ('b', '2'), ('c', '3'), ('d', '4'), ('e', '5')])
+        os.remove(fn)
+
tests/testrawreducer.py (new file, 109 lines)
@@ -0,0 +1,109 @@
+import os
+import unittest
+from tempfile import mkstemp
+from cStringIO import StringIO
+from dumbo.lib.rawreducer import RawReducer, chunkedread
+
+
+DATA = [('k1', ['v1a', 'v1b']), ('k2', ['v2c']), ('k3', ['v3d', 'v3e', 'v3f'])]
+MULTIDATA = sorted(((str(i), k), [v]) for k, vs in DATA for i, v in enumerate(vs))
+
+
+class RawReducerTestCase(unittest.TestCase):
+
+    def test_default_factory(self):
+        red = RawReducer()
+        self.assertEqual(list(red(iter(DATA))),
+                         [(None, 'v1a'), (None, 'v1b'), (None, 'v2c'),
+                          (None, 'v3d'), (None, 'v3e'), (None, 'v3f')])
+
+        red = RawReducer(multipleoutput=True)
+        self.assertEqual(list(red(iter(MULTIDATA))),
+                         [('0', 'v1a'), ('0', 'v2c'), ('0', 'v3d'),
+                          ('1', 'v1b'), ('1', 'v3e'), ('2', 'v3f')])
+
+    def test_custom_factory(self):
+        def first_value_factory():
+            return lambda k, v: [v[0]]
+
+        red = RawReducer(first_value_factory)
+        self.assertEqual(list(red(iter(DATA))),
+                         [(None, 'v1a'), (None, 'v2c'), (None, 'v3d')])
+
+        red = RawReducer(first_value_factory, multipleoutput=True)
+        self.assertEqual(list(red(iter(MULTIDATA))),
+                         [('0', 'v1a'), ('0', 'v2c'), ('0', 'v3d'),
+                          ('1', 'v1b'), ('1', 'v3e'), ('2', 'v3f')])
+
+    def test_custom_factory_with_close(self):
+        class CloseFactory(object):
+            def __init__(self):
+                self.items = []
+
+            def __call__(self, key, values):
+                self.items.extend(values)
+
+            def close(self):
+                return self.items
+
+        red = RawReducer(CloseFactory)
+        self.assertEqual(list(red(iter(DATA))),
+                         [(None, 'v1a'), (None, 'v1b'), (None, 'v2c'),
+                          (None, 'v3d'), (None, 'v3e'), (None, 'v3f')])
+
+        red = RawReducer(CloseFactory, multipleoutput=True)
+        self.assertEqual(list(red(iter(MULTIDATA))),
+                         [('0', 'v1a'), ('0', 'v2c'), ('0', 'v3d'),
+                          ('1', 'v1b'), ('1', 'v3e'), ('2', 'v3f')])
+
+    def test_extending_rawreducer_class(self):
+        class DummyFactory(object):
+            def __call__(self, key, values):
+                yield key
+
+        class DummyReducer(RawReducer):
+            factory = DummyFactory
+
+        red = DummyReducer()
+        self.assertEqual(list(red(iter(DATA))),
+                         [(None, 'k1'), (None, 'k2'), (None, 'k3')])
+
+        red = DummyReducer(multipleoutput=True)
+        self.assertEqual(list(red(iter(MULTIDATA))),
+                         [('0', 'k1'), ('0', 'k2'), ('0', 'k3'),
+                          ('1', 'k1'), ('1', 'k3'), ('2', 'k3')])
+
+        class MultiDummyReducer(RawReducer):
+            factory = DummyFactory
+            multipleoutput = True
+
+        red = MultiDummyReducer()
+        self.assertEqual(list(red(iter(MULTIDATA))),
+                         [('0', 'k1'), ('0', 'k2'), ('0', 'k3'),
+                          ('1', 'k1'), ('1', 'k3'), ('2', 'k3')])
+
+
+class ChunkedReadTestCase(unittest.TestCase):
+
+    def test_chunkedread_on_fileobject(self):
+        fo = StringIO('one\nbig\nchunk\nof\ndata\n')
+        chunks = chunkedread(fo, chunksize=10)
+        self.assertEqual(chunks.next(), 'one\nbig\nch')
+        self.assertEqual(chunks.next(), 'unk\nof\ndat')
+        self.assertEqual(chunks.next(), 'a\n')
+        self.assertRaises(StopIteration, chunks.next)
+        fo.close()
+
+    def test_chunkedread_on_filename(self):
+        fn = mkstemp()[1]
+        try:
+            fo = open(fn, 'wb')
+            fo.write('one\nbig\nchunk\nof\ndata\n')
+            fo.close()
+            chunks = chunkedread(fn, chunksize=10)
+            self.assertEqual(chunks.next(), 'one\nbig\nch')
+            self.assertEqual(chunks.next(), 'unk\nof\ndat')
+            self.assertEqual(chunks.next(), 'a\n')
+            self.assertRaises(StopIteration, chunks.next)
+        finally:
+            os.unlink(fn)
tests/testtokyocabinetreducer.py (new file, 68 lines)
@@ -0,0 +1,68 @@
+import unittest
+import os
+from tempfile import mkstemp
+from tokyo.cabinet import HDB, HDBOREADER, BDB, BDBOREADER, BDBOWRITER, BDBOCREAT
+
+from dumbo.lib.tokyocabinetreducer import TokyoCabinetReducer, TokyoCabinetFactory
+
+
+class TokyoCabinetTestCase(unittest.TestCase):
+
+    def test_default(self):
+        proc = TokyoCabinetFactory()
+        self.assertEqual(proc('k1', ['v1']), None)
+        self.assertEqual(proc('k2', ['v2', 'v3']), None)
+        chunks = proc.close()
+        fn = mkstemp()[1]
+        fo = open(fn, 'wb')
+        for chk in chunks:
+            self.assertTrue(len(chk) <= proc.chunksize)
+            fo.write(chk)
+        fo.close()
+
+        db = HDB()
+        db.open(fn, HDBOREADER)
+        self.assertEqual(list(db.iteritems()), [('k1', 'v1'), ('k2', 'v3')])
+        db.close()
+        os.remove(fn)
+
+    def test_extended(self):
+        class BDBFactory(TokyoCabinetFactory):
+            dbcls = BDB
+            flags = BDBOWRITER | BDBOCREAT
+            methodname = 'addint'
+            chunksize = 10  # very small
+
+        proc = BDBFactory()
+        self.assertEqual(proc('k1', [2]), None)
+        self.assertEqual(proc('k2', [3, 6]), None)
+        chunks = proc.close()
+        fn = mkstemp()[1]
+        fo = open(fn, 'wb')
+        for chk in chunks:
+            self.assertTrue(len(chk) <= 10)
+            fo.write(chk)
+        fo.close()
+
+        db = BDB()
+        db.open(fn, BDBOWRITER)
+        self.assertEqual(len(db), 2)
+        self.assertEqual(db.addint('k1', 0), 2)
+        self.assertEqual(db.addint('k2', 0), 9)
+        db.close()
+        os.remove(fn)
+
+    def test_reducer(self):
+        red = TokyoCabinetReducer()
+        output = red(zip('abcde', '12345'))
+
+        fn = mkstemp()[1]
+        fo = open(fn, 'wb')
+        fo.writelines(v for k, v in output)
+        fo.close()
+        db = HDB()
+        db.open(fn, HDBOREADER)
+        self.assertEqual(list(db.iteritems()),
+                         [('a', '1'), ('b', '2'), ('c', '3'), ('d', '4'), ('e', '5')])
+        db.close()
+        os.remove(fn)
