Skip to content

Commit

Permalink
Add support and testing for 'dict' in SchemaGenerator (fixes #58) (#63)
Browse files Browse the repository at this point in the history
* Add support and testing for 'dict' in SchemaGenerator

* Fix lines lengths for flake8

* Reformat verify_data_chunk so that git diff is smaller
  • Loading branch information
ZiggerZZ committed Dec 7, 2020
1 parent d5c3cd3 commit f315ac2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
35 changes: 19 additions & 16 deletions bigquery_schema_generator/generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,21 @@ def __init__(
# If CSV, force keep_nulls = True
self.keep_nulls = True if (input_format == 'csv') else keep_nulls

# If JSON, sort the schema using the name of the column to be
# If JSON or dict, sort the schema using the name of the column to be
# consistent with 'bq load'.
# If CSV, preserve the original ordering because 'bq load` matches the
# CSV column with the respective schema entry using the position of the
# column in the schema.
self.sorted_schema = (input_format == 'json')
self.sorted_schema = (input_format in {'json', 'dict'})

self.line_number = 0
self.error_logs = []

def log_error(self, msg):
    """Record *msg* in self.error_logs, tagged with the line number of the
    input record currently being processed (self.line_number).
    """
    entry = {'line_number': self.line_number, 'msg': msg}
    self.error_logs.append(entry)

def deduce_schema(self, file, *, schema_map=None):
"""Loop through each newlined-delimited line of 'file' and deduce the
def deduce_schema(self, input_data, *, schema_map=None):
"""Loop through each element of 'input_data' and deduce the
BigQuery schema. The schema is returned as a recursive map that contains
both the database schema and some additional metadata about each entry.
It has the following form:
Expand Down Expand Up @@ -171,9 +171,11 @@ def deduce_schema(self, file, *, schema_map=None):
"""

if self.input_format == 'csv':
reader = csv.DictReader(file)
reader = csv.DictReader(input_data)
elif self.input_format == 'json' or self.input_format is None:
reader = json_reader(file)
reader = json_reader(input_data)
elif self.input_format == 'dict':
reader = input_data
else:
raise Exception(f"Unknown input_format '{self.input_format}'")

Expand Down Expand Up @@ -202,11 +204,12 @@ def deduce_schema(self, file, *, schema_map=None):
raise json_object
else:
self.log_error(
'Record should be a JSON Object but was a'
f' {type(json_object)}'
'Record should be a JSON Object '
f'but was a {type(json_object)}'
)
if not self.ignore_invalid_lines:
raise Exception('Record must be a JSON Object')
raise Exception(f'Record must be a JSON Object '
f'but was a {type(json_object)}')
finally:
logging.info(f'Processed {self.line_number} lines')

Expand Down Expand Up @@ -714,15 +717,15 @@ def run(
print(file=output_file)


def json_reader(file):
def json_reader(input_data):
"""A generator that converts an iterable of newline-delimited JSON objects
('file' could be a 'list' for testing purposes) into an iterable of Python
dict objects. If the line cannot be parsed as JSON, the exception thrown by
the json.loads() is yielded back, instead of the json object. The calling
code can check for this exception with an isinstance() function, then
continue processing the rest of the file.
('input_data' could be a 'list' for testing purposes) into an iterable of
Python dict objects. If the line cannot be parsed as JSON, the exception
thrown by the json.loads() is yielded back, instead of the json object.
The calling code can check for this exception with an isinstance() function,
then continue processing the rest of the file.
"""
for line in file:
for line in input_data:
try:
yield json.loads(line)
except Exception as e:
Expand Down
34 changes: 29 additions & 5 deletions tests/test_generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from bigquery_schema_generator.generate_schema import convert_type
from bigquery_schema_generator.generate_schema import is_string_type
from bigquery_schema_generator.generate_schema import json_full_path
from bigquery_schema_generator.generate_schema import json_reader
from .data_reader import DataReader


Expand Down Expand Up @@ -432,6 +433,7 @@ class TestDataChunksFromFile(unittest.TestCase):
schema matches the one produced by SchemaGenerator.deduce_schema(). Multiple
test cases are stored in TESTDATA_FILE. The data_reader.py module knows how
to parse that file.
JSON chunks are verified as JSON but also as dict.
"""

TESTDATA_FILE = 'testdata.txt'
Expand All @@ -456,6 +458,15 @@ def test_all_data_chunks(self):
raise e

def verify_data_chunk(self, chunk):
    """Verify the given testdata chunk twice: once through the normal
    CSV/JSON code path, and once again treating JSON input as a Python
    dict stream (the 'dict' input_format).
    """
    for use_dict in (False, True):
        self.verify_data_chunk_as_csv_json_dict(chunk=chunk, as_dict=use_dict)

def verify_data_chunk_as_csv_json_dict(self, *, chunk, as_dict):
"""Verify the given chunk from the testdata.txt file. If `as_dict` is
True, then if the input_format of the chunk is 'json', pretend
that the input data was given as an internal Python dict, and verify
the 'input_format=dict' code path in SchemaGenerator.
"""
chunk_count = chunk['chunk_count']
line_number = chunk['line_number']
data_flags = chunk['data_flags']
Expand All @@ -471,10 +482,23 @@ def verify_data_chunk(self, chunk):
expected_schema = chunk['schema']
existing_schema = chunk['existing_schema']

print(
f"Test chunk: {chunk_count}; line_number: {line_number}; "
f"first record: {records[0]}"
)
if as_dict:
if input_format == 'json':
print(
f"Test chunk: {chunk_count}; line_number: {line_number}; "
f"input_format='dict'"
)
input_format = 'dict'
records = json_reader(records)
else:
# Don't bother converting CSV data chunks into Python dict.
return
else:
print(
f"Test chunk: {chunk_count}; line_number: {line_number}; "
f"first record: {records[0]}"
)

# Generate schema.
generator = SchemaGenerator(
input_format=input_format,
Expand Down Expand Up @@ -549,7 +573,7 @@ def test_bq_schema_to_map_round_trip_permutations(self):
schema_map = bq_schema_to_map(schema)
for input_format_and_mode in valid_input_formats_and_modes:
for keep_null_param in valid_keep_null_params:
for quotes_are_strings in\
for quotes_are_strings in \
valid_quoted_values_are_strings:
generator = SchemaGenerator(
input_format=input_format_and_mode[0],
Expand Down

0 comments on commit f315ac2

Please sign in to comment.