Skip to content

Commit

Permalink
first run of avro-386
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip Zeyliger committed Feb 3, 2010
1 parent 7e4037d commit a01ba18
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 79 deletions.
71 changes: 52 additions & 19 deletions lang/py/src/avro/datafile.py
Expand Up @@ -16,6 +16,7 @@
"""
Read/Write Avro File Object Containers.
"""
import zlib
import uuid
import cStringIO
from avro import schema
Expand All @@ -37,9 +38,12 @@
{"name": "meta", "type": {"type": "map", "values": "bytes"}},
{"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
""" % (MAGIC_SIZE, SYNC_SIZE))
VALID_CODECS = ['null']
VALID_CODECS = ['null', 'deflate']
VALID_ENCODINGS = ['binary'] # not used yet

CODEC_KEY = "avro.codec"
SCHEMA_KEY = "avro.schema"

#
# Exceptions
#
Expand All @@ -61,9 +65,11 @@ def generate_sync_marker():
return uuid.uuid4().bytes

# TODO(hammer): make 'encoder' a metadata property
def __init__(self, writer, datum_writer, writers_schema=None):
def __init__(self, writer, datum_writer, writers_schema=None, codec=None):
"""
If the schema is not present, presume we're appending.
@param writer: File-like object to write into.
"""
self._writer = writer
self._encoder = io.BinaryEncoder(writer)
Expand All @@ -73,10 +79,14 @@ def __init__(self, writer, datum_writer, writers_schema=None):
self._block_count = 0
self._meta = {}


if writers_schema is not None:
if codec is None:
codec = 'null'
assert codec in VALID_CODECS, "Unknown codec: " + codec
self._sync_marker = DataFileWriter.generate_sync_marker()
self.set_meta('codec', 'null')
self.set_meta('schema', str(writers_schema))
self.set_meta('avro.codec', codec)
self.set_meta('avro.schema', str(writers_schema))
self.datum_writer.writers_schema = writers_schema
self._write_header()
else:
Expand All @@ -86,11 +96,11 @@ def __init__(self, writer, datum_writer, writers_schema=None):
# TODO(hammer): collect arbitrary metadata
# collect metadata
self._sync_marker = dfr.sync_marker
self.set_meta('codec', dfr.get_meta('codec'))
self.set_meta('avro.codec', dfr.get_meta('avro.codec'))

# get schema used to write existing file
schema_from_file = dfr.get_meta('schema')
self.set_meta('schema', schema_from_file)
schema_from_file = dfr.get_meta('avro.schema')
self.set_meta('avro.schema', schema_from_file)
self.datum_writer.writers_schema = schema.parse(schema_from_file)

# seek to the end of the file and prepare for writing
Expand Down Expand Up @@ -130,12 +140,22 @@ def _write_block(self):
self.encoder.write_long(self.block_count)

# write block contents
if self.get_meta('codec') == 'null':
self.writer.write(self.buffer_writer.getvalue())
uncompressed_data = self.buffer_writer.getvalue()
if self.get_meta(CODEC_KEY) == 'null':
compressed_data = uncompressed_data
elif self.get_meta(CODEC_KEY) == 'deflate':
# The first two characters and last character are zlib
# wrappers around deflate data.
compressed_data = zlib.compress(self.buffer_writer.getvalue())[2:-1]
else:
fail_msg = '"%s" codec is not supported.' % self.get_meta('codec')
fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
raise DataFileException(fail_msg)

# Write length of block
self.encoder.write_long(len(compressed_data))
# Write block
self.writer.write(compressed_data)

# write sync marker
self.writer.write(self.sync_marker)

Expand Down Expand Up @@ -177,30 +197,35 @@ class DataFileReader(object):
# TODO(hammer): allow user to specify the encoder
def __init__(self, reader, datum_reader):
    """Open an Avro object container file for reading.

    @param reader: seekable file-like object positioned at an Avro data file.
    @param datum_reader: DatumReader used to decode each datum.

    Raises DataFileException if the file declares an unsupported codec.
    """
    self._reader = reader
    # Decoder over the raw (possibly compressed) byte stream; per-block
    # datum decoding may use a separate decoder over decompressed bytes.
    self._raw_decoder = io.BinaryDecoder(reader)
    # Legacy alias kept for the older 'decoder' property; prefer raw_decoder.
    self._decoder = self._raw_decoder
    self._datum_decoder = None  # Maybe reset at every block.
    self._datum_reader = datum_reader

    # read the header: magic, meta, sync
    self._read_header()

    # ensure codec is valid; a file without an avro.codec entry means 'null'
    self.codec = self.get_meta(CODEC_KEY)
    if self.codec is None:
        self.codec = "null"
    if self.codec not in VALID_CODECS:
        raise DataFileException('Unknown codec: %s.' % self.codec)

    # get file length
    self._file_length = self.determine_file_length()

    # get ready to read
    self._block_count = 0
    self.datum_reader.writers_schema = schema.parse(self.get_meta(SCHEMA_KEY))

def __iter__(self):
    """The reader is its own iterator; iteration yields successive data."""
    return self

# read-only properties
reader = property(lambda self: self._reader)
decoder = property(lambda self: self._decoder)
raw_decoder = property(lambda self: self._raw_decoder)
datum_decoder = property(lambda self: self._datum_decoder)
datum_reader = property(lambda self: self._datum_reader)
sync_marker = property(lambda self: self._sync_marker)
meta = property(lambda self: self._meta)
Expand Down Expand Up @@ -235,7 +260,7 @@ def _read_header(self):
self.reader.seek(0, 0)

# read header into a dict
header = self.datum_reader.read_data(META_SCHEMA, META_SCHEMA, self.decoder)
header = self.datum_reader.read_data(META_SCHEMA, META_SCHEMA, self.raw_decoder)

# check magic number
if header.get('magic') != MAGIC:
Expand All @@ -250,7 +275,15 @@ def _read_header(self):
self._sync_marker = header['sync']

def _read_block_header(self):
    """Read one block's header and set up the decoder for its data.

    Reads the block's datum count into self.block_count, then prepares
    self._datum_decoder: for the 'null' codec, data is decoded straight
    from the underlying file; otherwise ('deflate') the block's bytes are
    inflated into an in-memory buffer and decoded from there.
    """
    self.block_count = self.raw_decoder.read_long()
    if self.codec == "null":
        # Skip a long; we don't need to use the length.
        self.raw_decoder.read_long()
        self._datum_decoder = self._raw_decoder
    else:
        # Compressed block data is stored as an Avro 'bytes' value
        # (length-prefixed), so read_bytes consumes exactly one block.
        data = self.raw_decoder.read_bytes()
        # -15: raw deflate stream; the writer strips the zlib header and
        # checksum bytes before writing.
        uncompressed = zlib.decompress(data, -15)
        self._datum_decoder = io.BinaryDecoder(cStringIO.StringIO(uncompressed))

def _skip_sync(self):
"""
Expand All @@ -277,7 +310,7 @@ def next(self):
else:
self._read_block_header()

datum = self.datum_reader.read(self.decoder)
datum = self.datum_reader.read(self.datum_decoder)
self.block_count -= 1
return datum

Expand Down
45 changes: 45 additions & 0 deletions lang/py/src/avro/tool.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command-line tool for manipulating Avro data files.
NOTE: The API for the command-line tool is experimental.
"""

import sys
from avro import datafile, io

def main(args=sys.argv):
    """Entry point: dispatch on the subcommand in args[1].

    Only 'dump' is supported: it prints the repr() of every datum in the
    given Avro data file ('-' reads the file from stdin).

    @param args: argv-style list; args[0] is the program name.
    @return: shell-style exit code — 0 on success, 1 on usage error
             (falls through, returning None, for unknown subcommands).
    """
    if len(args) == 1:
        # Parenthesized single-argument print is identical in Python 2
        # and valid in Python 3 (the original print statements are not).
        print("Usage: %s (dump)" % args[0])
        return 1

    if args[1] == "dump":
        if len(args) != 3:
            print("Usage: %s dump input_file" % args[0])
            return 1
        for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
            print(repr(d))
        return 0

def file_or_stdin(f):
    """Return sys.stdin when f is '-', else an open file object for path f."""
    if f == "-":
        return sys.stdin
    else:
        # open() instead of the Python-2-only file() builtin; equivalent
        # in Python 2 and portable to Python 3.
        return open(f)

# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    sys.exit(main(sys.argv))
125 changes: 65 additions & 60 deletions lang/py/test/test_datafile.py
Expand Up @@ -51,6 +51,7 @@
)

FILENAME = 'test_datafile.out'
CODECS_TO_VALIDATE = (None, 'null', 'deflate')

# TODO(hammer): clean up written files with ant, not os.remove
class TestDataFile(unittest.TestCase):
Expand All @@ -61,38 +62,40 @@ def test_round_trip(self):
print ''
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
print ''
print 'SCHEMA NUMBER %d' % (i + 1)
print '================'
print ''
print 'Schema: %s' % example_schema
print 'Datum: %s' % datum
for codec in CODECS_TO_VALIDATE:
print ''
print 'SCHEMA NUMBER %d' % (i + 1)
print '================'
print ''
print 'Schema: %s' % example_schema
print 'Datum: %s' % datum
print 'Codec: %s' % codec

# write data in binary to file 10 times
writer = open(FILENAME, 'wb')
datum_writer = io.DatumWriter()
schema_object = schema.parse(example_schema)
dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
for i in range(10):
dfw.append(datum)
dfw.close()
# write data in binary to file 10 times
writer = open(FILENAME, 'wb')
datum_writer = io.DatumWriter()
schema_object = schema.parse(example_schema)
dfw = datafile.DataFileWriter(writer, datum_writer, schema_object, codec=codec)
for i in range(10):
dfw.append(datum)
dfw.close()

# read data in binary from file
reader = open(FILENAME, 'rb')
datum_reader = io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
round_trip_data = []
for datum in dfr:
round_trip_data.append(datum)
# read data in binary from file
reader = open(FILENAME, 'rb')
datum_reader = io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
round_trip_data = []
for datum in dfr:
round_trip_data.append(datum)

print 'Round Trip Data: %s' % round_trip_data
print 'Round Trip Data Length: %d' % len(round_trip_data)
is_correct = [datum] * 10 == round_trip_data
if is_correct: correct += 1
print 'Correct Round Trip: %s' % is_correct
print ''
print 'Round Trip Data: %s' % round_trip_data
print 'Round Trip Data Length: %d' % len(round_trip_data)
is_correct = [datum] * 10 == round_trip_data
if is_correct: correct += 1
print 'Correct Round Trip: %s' % is_correct
print ''
os.remove(FILENAME)
self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))

def test_append(self):
print ''
Expand All @@ -101,44 +104,46 @@ def test_append(self):
print ''
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
print ''
print 'SCHEMA NUMBER %d' % (i + 1)
print '================'
print ''
print 'Schema: %s' % example_schema
print 'Datum: %s' % datum
for codec in CODECS_TO_VALIDATE:
print ''
print 'SCHEMA NUMBER %d' % (i + 1)
print '================'
print ''
print 'Schema: %s' % example_schema
print 'Datum: %s' % datum
print 'Codec: %s' % codec

# write data in binary to file once
writer = open(FILENAME, 'wb')
datum_writer = io.DatumWriter()
schema_object = schema.parse(example_schema)
dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
dfw.append(datum)
dfw.close()

# open file, write, and close nine times
for i in range(9):
writer = open(FILENAME, 'ab+')
dfw = datafile.DataFileWriter(writer, io.DatumWriter())
# write data in binary to file once
writer = open(FILENAME, 'wb')
datum_writer = io.DatumWriter()
schema_object = schema.parse(example_schema)
dfw = datafile.DataFileWriter(writer, datum_writer, schema_object, codec=codec)
dfw.append(datum)
dfw.close()

# read data in binary from file
reader = open(FILENAME, 'rb')
datum_reader = io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
appended_data = []
for datum in dfr:
appended_data.append(datum)
# open file, write, and close nine times
for i in range(9):
writer = open(FILENAME, 'ab+')
dfw = datafile.DataFileWriter(writer, io.DatumWriter())
dfw.append(datum)
dfw.close()

# read data in binary from file
reader = open(FILENAME, 'rb')
datum_reader = io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
appended_data = []
for datum in dfr:
appended_data.append(datum)

print 'Appended Data: %s' % appended_data
print 'Appended Data Length: %d' % len(appended_data)
is_correct = [datum] * 10 == appended_data
if is_correct: correct += 1
print 'Correct Appended: %s' % is_correct
print ''
print 'Appended Data: %s' % appended_data
print 'Appended Data Length: %d' % len(appended_data)
is_correct = [datum] * 10 == appended_data
if is_correct: correct += 1
print 'Correct Appended: %s' % is_correct
print ''
os.remove(FILENAME)
self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))

if __name__ == '__main__':
unittest.main()

0 comments on commit a01ba18

Please sign in to comment.