Skip to content

Commit

Permalink
implement a parse_schema function
Browse files Browse the repository at this point in the history
  • Loading branch information
scottbelden committed Jul 9, 2018
1 parent 5729d04 commit 057d716
Show file tree
Hide file tree
Showing 18 changed files with 393 additions and 347 deletions.
6 changes: 5 additions & 1 deletion benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import datetime
import time

from fastavro import writer, reader, schemaless_writer, schemaless_reader
from fastavro import writer, reader, schemaless_writer, schemaless_reader, parse_schema
from fastavro._timezone import utc

from fastavro.validation import validate, validate_many


def write(schema, records, runs=1):
times = []
schema = parse_schema(schema)
for _ in range(runs):
iostream = BytesIO()
start = time.time()
Expand All @@ -22,6 +23,7 @@ def write(schema, records, runs=1):

def write_schemaless(schema, records, runs=1):
times = []
schema = parse_schema(schema)
for _ in range(runs):
for record in records:
iostream = BytesIO()
Expand All @@ -36,6 +38,7 @@ def write_schemaless(schema, records, runs=1):
def validater(schema, records, runs=1):
times = []
valid = []
schema = parse_schema(schema)
for _ in range(runs):
start = time.time()
valid = validate_many(records, schema)
Expand All @@ -59,6 +62,7 @@ def read(iostream, runs=1):

def read_schemaless(iostream, schema, num_records, runs=1):
times = []
schema = parse_schema(schema)
for _ in range(runs):
for _ in range(num_records):
iostream.seek(0)
Expand Down
16 changes: 6 additions & 10 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ Example
-------

::

# Writing
from fastavro import writer
from fastavro import writer, reader, parse_schema

schema = {
'doc': 'A weather reading.',
Expand All @@ -37,6 +35,7 @@ Example
{'name': 'temp', 'type': 'int'},
],
}
parsed_schema = parse_schema(schema)

# 'records' can be an iterable (including generator)
records = [
Expand All @@ -46,17 +45,13 @@ Example
{u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
]

# Writing
with open('weather.avro', 'wb') as out:
writer(out, schema, records)
writer(out, parsed_schema, records)

# Reading
import fastavro

with open('weather.avro', 'rb') as fo:
reader = fastavro.reader(fo)
schema = reader.schema

for record in reader:
for record in reader(fo):
print(record)


Expand All @@ -68,6 +63,7 @@ Documentation

reader
writer
schema
validation
command_line_script

Expand Down
4 changes: 4 additions & 0 deletions docs/schema.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fastavro.schema
===============

.. autofunction:: fastavro._schema_py.parse_schema
16 changes: 1 addition & 15 deletions fastavro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,14 @@
import fastavro.schema
import fastavro.validation


def _acquaint_schema(schema):
"""Add a new schema to the schema repo.
Parameters
----------
schema: dict
Schema to add to repo
"""
fastavro.read.acquaint_schema(schema)
fastavro.write.acquaint_schema(schema)


reader = iter_avro = fastavro.read.reader
block_reader = fastavro.read.block_reader
schemaless_reader = fastavro.read.schemaless_reader
writer = fastavro.write.writer
schemaless_writer = fastavro.write.schemaless_writer
acquaint_schema = _acquaint_schema
fastavro.schema.acquaint_schema = _acquaint_schema
is_avro = fastavro.read.is_avro
validate = fastavro.validation.validate
parse_schema = fastavro.schema.parse_schema

__all__ = [
n for n in locals().keys() if not n.startswith('_')
Expand Down
46 changes: 15 additions & 31 deletions fastavro/_read.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ import json
from ._six import (
btou, iteritems, is_str, str2ints, fstint
)
from ._schema import (
extract_record_type, extract_named_schemas_into_repo, populate_schema_defs,
extract_logical_type
)
from ._schema import extract_record_type, extract_logical_type, parse_schema
from ._schema_common import SCHEMA_DEFS
from ._read_common import (
SchemaResolutionError, MAGIC, SYNC_SIZE, HEADER_SCHEMA
Expand Down Expand Up @@ -498,12 +495,6 @@ LOGICAL_READERS = {
'long-time-micros': read_time_micros,
}

READERS = {}


cpdef read_data(fo, writer_schema, reader_schema=None):
return _read_data(fo, writer_schema, reader_schema)


cpdef _read_data(fo, writer_schema, reader_schema=None):
"""Read data from file object according to schema."""
Expand Down Expand Up @@ -542,8 +533,11 @@ cpdef _read_data(fo, writer_schema, reader_schema=None):
elif record_type == 'record' or record_type == 'error':
data = read_record(fo, writer_schema, reader_schema)
else:
fn = READERS[record_type]
data = fn(fo, writer_schema, reader_schema)
return _read_data(
fo,
SCHEMA_DEFS[record_type],
SCHEMA_DEFS.get(reader_schema)
)
except ReadError:
raise EOFError('cannot read %s from %s' % (record_type, fo))

Expand Down Expand Up @@ -594,16 +588,6 @@ except ImportError:
pass


def acquaint_schema(schema):
"""Extract schema into READERS"""
extract_named_schemas_into_repo(
schema,
READERS,
lambda schema: lambda fo, _, r_schema: read_data(
fo, schema, SCHEMA_DEFS.get(r_schema)),
)


def _iter_avro_records(fo, header, codec, writer_schema, reader_schema):
for block in _iter_avro_blocks(fo, header, codec, writer_schema, reader_schema):
for record in block:
Expand Down Expand Up @@ -671,19 +655,19 @@ class file_reader:
k: btou(v) for k, v in iteritems(self._header['meta'])
}

self.schema = self.writer_schema = \
json.loads(self.metadata['avro.schema'])
self.schema = json.loads(self.metadata['avro.schema'])
self.codec = self.metadata.get('avro.codec', 'null')

self.reader_schema = reader_schema

if self.writer_schema == reader_schema:
if self.schema == reader_schema:
# No need for the reader schema if they are the same
reader_schema = None

acquaint_schema(self.writer_schema)
self.writer_schema = parse_schema(self.schema, _write_hint=False)

if reader_schema:
populate_schema_defs(reader_schema)
self.reader_schema = parse_schema(reader_schema, _write_hint=False)

self._elems = None

Expand Down Expand Up @@ -721,16 +705,16 @@ class block_reader(file_reader):


cpdef schemaless_reader(fo, writer_schema, reader_schema=None):
acquaint_schema(writer_schema)

if writer_schema == reader_schema:
# No need for the reader schema if they are the same
reader_schema = None

writer_schema = parse_schema(writer_schema)

if reader_schema:
populate_schema_defs(reader_schema)
reader_schema = parse_schema(reader_schema)

return read_data(fo, writer_schema, reader_schema)
return _read_data(fo, writer_schema, reader_schema)


cpdef is_avro(path_or_buffer):
Expand Down
57 changes: 25 additions & 32 deletions fastavro/_read_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
from .six import (
xrange, btou, iteritems, is_str, str2ints, fstint
)
from .schema import (
extract_record_type, extract_named_schemas_into_repo, populate_schema_defs,
extract_logical_type
)
from .schema import extract_record_type, extract_logical_type, parse_schema
from ._schema_common import SCHEMA_DEFS
from ._read_common import (
SchemaResolutionError, MAGIC, SYNC_SIZE, HEADER_SCHEMA
Expand Down Expand Up @@ -461,16 +458,26 @@ def read_data(fo, writer_schema, reader_schema=None):

if reader_schema and record_type in AVRO_TYPES:
match_schemas(writer_schema, reader_schema)
try:
data = READERS[record_type](fo, writer_schema, reader_schema)

reader_fn = READERS.get(record_type)
if reader_fn:
try:
data = reader_fn(fo, writer_schema, reader_schema)
except StructError:
raise EOFError('cannot read %s from %s' % (record_type, fo))

if 'logicalType' in writer_schema:
fn = LOGICAL_READERS.get(logical_type)
if fn:
return fn(data, writer_schema, reader_schema)

return data
except StructError:
raise EOFError('cannot read %s from %s' % (record_type, fo))
else:
return read_data(
fo,
SCHEMA_DEFS[record_type],
SCHEMA_DEFS.get(reader_schema)
)


def skip_sync(fo, sync_marker):
Expand Down Expand Up @@ -513,16 +520,6 @@ def snappy_read_block(fo):
pass


def acquaint_schema(schema):
"""Extract schema into READERS"""
extract_named_schemas_into_repo(
schema,
READERS,
lambda schema: lambda fo, _, r_schema: read_data(
fo, schema, SCHEMA_DEFS.get(r_schema)),
)


def _iter_avro_records(fo, header, codec, writer_schema, reader_schema):
"""Return iterator over avro records."""
for block in _iter_avro_blocks(fo, header, codec, writer_schema,
Expand Down Expand Up @@ -593,19 +590,19 @@ def __init__(self, fo, reader_schema=None):
k: btou(v) for k, v in iteritems(self._header['meta'])
}

self.schema = self.writer_schema = \
json.loads(self.metadata['avro.schema'])
self.schema = json.loads(self.metadata['avro.schema'])
self.codec = self.metadata.get('avro.codec', 'null')

self.reader_schema = reader_schema

if self.writer_schema == reader_schema:
if self.schema == reader_schema:
# No need for the reader schema if they are the same
reader_schema = None

acquaint_schema(self.writer_schema)
self.writer_schema = parse_schema(self.schema, _write_hint=False)

if reader_schema:
populate_schema_defs(reader_schema)
self.reader_schema = parse_schema(reader_schema, _write_hint=False)

self._elems = None

Expand All @@ -631,13 +628,11 @@ class reader(file_reader):
Reader schema
Example::
from fastavro import reader
with open('some-file.avro', 'rb') as fo:
avro_reader = reader(fo)
schema = avro_reader.schema
for record in avro_reader:
process_record(record)
"""
Expand All @@ -663,13 +658,11 @@ class block_reader(file_reader):
Reader schema
Example::
from fastavro import block_reader
with open('some-file.avro', 'rb') as fo:
avro_reader = block_reader(fo)
schema = avro_reader.schema
for block in avro_reader:
process_block(block)
"""
Expand Down Expand Up @@ -698,22 +691,22 @@ def schemaless_reader(fo, writer_schema, reader_schema=None):
be given to allow for schema migration
Example::
parsed_schema = fastavro.parse_schema(schema)
with open('file.avro', 'rb') as fp:
record = fastavro.schemaless_reader(fp, schema)
record = fastavro.schemaless_reader(fp, parsed_schema)
Note: The ``schemaless_reader`` can only read a single record.
"""
acquaint_schema(writer_schema)

if writer_schema == reader_schema:
# No need for the reader schema if they are the same
reader_schema = None

writer_schema = parse_schema(writer_schema)

if reader_schema:
populate_schema_defs(reader_schema)
reader_schema = parse_schema(reader_schema)

return read_data(fo, writer_schema, reader_schema)

Expand Down
Loading

0 comments on commit 057d716

Please sign in to comment.