From 43bc4632dcc9248ab0361397434f345c20f9132d Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Tue, 1 Dec 2020 20:57:26 -0800 Subject: [PATCH 1/8] Squash merge of all commits in abroglesc-existing-schema branch --- Makefile | 2 +- README.md | 59 +- bigquery_schema_generator/generate_schema.py | 298 +++++-- tests/data_reader.py | 110 ++- tests/test_generate_schema.py | 123 ++- tests/testdata.txt | 888 ++++++++++++++++++- 6 files changed, 1335 insertions(+), 145 deletions(-) diff --git a/Makefile b/Makefile index 7268aff..99e2189 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ tests: python3 -m unittest flake8: - flake8 bigquery_schema_generator \ + flake8 bigquery_schema_generator tests \ --count \ --ignore W503 \ --show-source \ diff --git a/README.md b/README.md index aa8fb7f..3ee4d3a 100644 --- a/README.md +++ b/README.md @@ -235,13 +235,14 @@ as shown by the `--help` flag below. Print the built-in help strings: -``` +```bash $ generate-schema --help -usage: generate_schema.py [-h] [--input_format INPUT_FORMAT] [--keep_nulls] - [--quoted_values_are_strings] [--infer_mode] - [--debugging_interval DEBUGGING_INTERVAL] - [--debugging_map] [--sanitize_names] - [--ignore_invalid_lines] +usage: generate-schema [-h] [--input_format INPUT_FORMAT] [--keep_nulls] + [--quoted_values_are_strings] [--infer_mode] + [--debugging_interval DEBUGGING_INTERVAL] + [--debugging_map] [--sanitize_names] + [--ignore_invalid_lines] + [--existing_schema_path EXISTING_SCHEMA_PATH] Generate BigQuery schema from JSON or CSV file. @@ -261,6 +262,10 @@ optional arguments: standard --ignore_invalid_lines Ignore lines that cannot be parsed instead of stopping + --existing_schema_path EXISTING_SCHEMA_PATH + File that contains the existing BigQuery schema for a + table. This can be fetched with: `bq show --schema + :: ``` #### Input Format (`--input_format`) @@ -282,7 +287,7 @@ array or empty record as its value, the field is suppressed in the schema file. This flag enables this field to be included in the schema file. In other words, using a data file containing just nulls and empty values: -``` +```bash $ generate_schema { "s": null, "a": [], "m": {} } ^D @@ -291,7 +296,7 @@ INFO:root:Processed 1 lines ``` With the `keep_nulls` flag, we get: -``` +```bash $ generate-schema --keep_nulls { "s": null, "a": [], "m": {} } ^D @@ -331,7 +336,7 @@ consistent with the algorithm used by `bq load`. However, for the `BOOLEAN`, normal strings instead. This flag disables type inference for `BOOLEAN`, `INTEGER` and `FLOAT` types inside quoted strings. -``` +```bash $ generate-schema { "name": "1" } ^D @@ -365,6 +370,12 @@ feature for JSON files, but too difficult to implement in practice because fields are often completely missing from a given JSON record (instead of explicitly being defined to be `null`). +In addition to the above, this option, when used in conjunction with +--existing_schema_map, will allow fields to be relaxed from REQUIRED to NULLABLE +if they were REQUIRED in the existing schema and NULL rows are found in the new +data we are inferring a schema from. In this case it can be used with either +input_format, CSV or JSON. + See [Issue #28](https://github.com/bxparks/bigquery-schema-generator/issues/28) for implementation details. @@ -374,7 +385,7 @@ By default, the `generate_schema.py` script prints a short progress message every 1000 lines of input data. This interval can be changed using the `--debugging_interval` flag. 
-``` +```bash $ generate-schema --debugging_interval 50 < file.data.json > file.schema.json ``` @@ -385,7 +396,7 @@ the bookkeeping metadata map which is used internally to keep track of the various fields and their types that were inferred using the data file. This flag is intended to be used for debugging. -``` +```bash $ generate-schema --debugging_map < file.data.json > file.schema.json ``` @@ -435,6 +446,20 @@ deduction logic will handle any missing or extra columns gracefully. Fixes [Issue #49](https://github.com/bxparks/bigquery-schema-generator/issues/49). +#### Existing Schema Path (`--existing_schema_path`) +There are cases where we would like to start from an existing BigQuery table schema +rather than starting from scratch with a new batch of data we would like to load. +In this case we can specify the path to a local file on disk that is our existing +bigquery table schema. This can be generated via the following bq cli command: +```bash +bq show --schema :. > existing_table_schema.json +``` + +We can then run generate-schema with the additional option +```bash +--existing_schema_path existing_table_schema.json +``` + ## Schema Types ### Supported Types @@ -534,7 +559,7 @@ compatibility rules implemented by **bq load**: Here is an example of a single JSON data record on the STDIN (the `^D` below means typing Control-D, which indicates "end of file" under Linux and MacOS): -``` +```bash $ generate-schema { "s": "string", "b": true, "i": 1, "x": 3.1, "t": "2017-05-22T17:10:00-07:00" } ^D @@ -569,7 +594,7 @@ INFO:root:Processed 1 lines ``` In most cases, the data file will be stored in a file: -``` +```bash $ cat > file.data.json { "a": [1, 2] } { "i": 3 } @@ -596,7 +621,7 @@ $ cat file.schema.json Here is the schema generated from a CSV input file. The first line is the header containing the names of the columns, and the schema lists the columns in the same order as the header: -``` +```bash $ generate-schema --input_format csv e,b,c,d,a 1,x,true,,2.0 @@ -634,7 +659,7 @@ INFO:root:Processed 3 lines ``` Here is an example of the schema generated with the `--infer_mode` flag: -``` +```bash $ generate-schema --input_format csv --infer_mode name,surname,age John @@ -701,7 +726,7 @@ json.dump(schema, output_file, indent=2) I wrote the `bigquery_schema_generator/anonymize.py` script to create an anonymized data file `tests/testdata/anon1.data.json.gz`: -``` +```bash $ ./bigquery_schema_generator/anonymize.py < original.data.json \ > anon1.data.json $ gzip anon1.data.json @@ -709,7 +734,7 @@ $ gzip anon1.data.json This data file is 290MB (5.6MB compressed) with 103080 data records. Generating the schema using -``` +```bash $ bigquery_schema_generator/generate_schema.py < anon1.data.json \ > anon1.schema.json ``` diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index c3f3bd7..0d0f3f1 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -105,17 +105,6 @@ def __init__( # rest of the file. self.ignore_invalid_lines = ignore_invalid_lines - # 'infer_mode' is supported for only input_format = 'csv' because - # the header line gives us the complete list of fields to be expected in - # the CSV file. In JSON data files, certain fields will often be - # completely missing instead of being set to 'null' or "". 
If the field - # is not even present, then it becomes incredibly difficult (not - # impossible, but more effort than I want to expend right now) to figure - # out which fields are missing so that we can mark the appropriate - # schema entries with 'filled=False'. - if infer_mode and input_format != 'csv': - raise Exception("infer_mode requires input_format=='csv'") - # If CSV, force keep_nulls = True self.keep_nulls = True if (input_format == 'csv') else keep_nulls @@ -130,11 +119,11 @@ def __init__( self.error_logs = [] def log_error(self, msg): - self.error_logs.append({'line': self.line_number, 'msg': msg}) + self.error_logs.append({'line_number': self.line_number, 'msg': msg}) # TODO: BigQuery is case-insensitive with regards to the 'name' of the # field. Verify that the 'name' is unique regardless of the case. - def deduce_schema(self, file): + def deduce_schema(self, file, *, schema_map=None): """Loop through each newlined-delimited line of 'file' and deduce the BigQuery schema. The schema is returned as a recursive map that contains both the database schema and some additional metadata @@ -170,7 +159,8 @@ def deduce_schema(self, file): The 'filled' entry indicates whether all input data records contained the given field. If the --infer_mode flag is given, the 'filled' entry - is used to convert a NULLABLE schema entry to a REQUIRED schema entry. + is used to convert a NULLABLE schema entry to a REQUIRED schema entry or + to relax an existing field in schema_map from REQUIRED to NULLABLE. The function returns a tuple of 2 things: * an OrderedDict which is sorted by the 'key' of the column name @@ -189,7 +179,9 @@ def deduce_schema(self, file): else: raise Exception("Unknown input_format '%s'" % self.input_format) - schema_map = OrderedDict() + if schema_map is None: + schema_map = OrderedDict() + try: for json_object in reader: @@ -232,18 +224,35 @@ def deduce_schema_for_line(self, json_object, schema_map, base_path=None): nested record that leads to this specific entry. """ for key, value in json_object.items(): - schema_entry = schema_map.get(key) + # The canonical key is the lower-cased version of the sanitized key + # so that the case of the field name is preserved when generating + # the schema but we don't create invalid, duplicate, fields since + # BigQuery is case insensitive + canonical_key = self.sanitize_name(key).lower() + schema_entry = schema_map.get(canonical_key) new_schema_entry = self.get_schema_entry( key=key, value=value, - base_path=base_path, + base_path=base_path ) - schema_map[key] = self.merge_schema_entry( + schema_map[canonical_key] = self.merge_schema_entry( old_schema_entry=schema_entry, new_schema_entry=new_schema_entry, - base_path=base_path, + base_path=base_path ) + def sanitize_name(self, value): + ''' Sanitizes a column name within the schema. 
+ + We explicitly choose to not perform the lowercasing here as this + cause us to lose case sensitivity when generating the final schema + ''' + if self.sanitize_names: + new_value = re.sub('[^a-zA-Z0-9_]', '_', value[:127]) + else: + new_value = value + return new_value + def merge_schema_entry( self, old_schema_entry, @@ -282,10 +291,22 @@ def merge_schema_entry( # new 'soft' does not clobber old 'hard' if old_status == 'hard' and new_status == 'soft': + mode = self.merge_mode(old_schema_entry, + new_schema_entry, + base_path) + if mode is None: + return None + old_schema_entry['info']['mode'] = mode return old_schema_entry # new 'hard' clobbers old 'soft' if old_status == 'soft' and new_status == 'hard': + mode = self.merge_mode(old_schema_entry, + new_schema_entry, + base_path) + if mode is None: + return None + new_schema_entry['info']['mode'] = mode return new_schema_entry # Verify that it's soft->soft or hard->hard @@ -303,11 +324,18 @@ def merge_schema_entry( new_type = new_info['type'] new_mode = new_info['mode'] + full_old_name = json_full_path(base_path, old_name) + full_new_name = json_full_path(base_path, new_name) + # Defensive check, names should always be the same. if old_name != new_name: - raise Exception( - 'old_name (%s) != new_name(%s), should never happen' % - (old_name, new_name)) + if old_name.lower() != new_name.lower(): + raise Exception( + 'old_name (%s) != new_name(%s), should never happen' % + (full_old_name, full_new_name)) + else: + # preserve old name if case is different + new_info['name'] = old_info['name'] # Recursively merge in the subfields of a RECORD, allowing # NULLABLE to become REPEATED (because 'bq load' allows it). @@ -318,12 +346,13 @@ def merge_schema_entry( old_info['mode'] = 'REPEATED' self.log_error( ('Converting schema for "%s" from NULLABLE RECORD ' - 'into REPEATED RECORD') % old_name) + 'into REPEATED RECORD') % full_old_name) elif old_mode == 'REPEATED' and new_mode == 'NULLABLE': # TODO: Maybe remove this warning output. It was helpful during # development, but maybe it's just natural. self.log_error( - 'Leaving schema for "%s" as REPEATED RECORD' % old_name) + 'Leaving schema for "%s" as REPEATED RECORD' % + full_old_name) # RECORD type needs a recursive merging of sub-fields. We merge into # the 'old_schema_entry' which assumes that the 'old_schema_entry' @@ -340,31 +369,75 @@ def merge_schema_entry( ) return old_schema_entry + new_mode = self.merge_mode(old_schema_entry, + new_schema_entry, + base_path) + if new_mode is None: + return None + new_schema_entry['info']['mode'] = new_mode + + # For all other types... + if old_type != new_type: + # Check that the converted types are compatible. 
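+            # convert_type() returns the looser compatible type following the
+            # compatibility rules described in the README (for example,
+            # INTEGER merged with FLOAT yields FLOAT), or None when the two
+            # types cannot be reconciled.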
+ candidate_type = convert_type(old_type, new_type) + if not candidate_type: + self.log_error( + f'Ignoring field with mismatched type: ' + f'old=({old_status},{full_old_name},{old_mode},{old_type});' + ' ' + f'new=({new_status},{full_new_name},{new_mode},{new_type})') + return None + + new_info['type'] = candidate_type + return new_schema_entry + + def merge_mode(self, old_schema_entry, new_schema_entry, base_path): + old_info = old_schema_entry['info'] + new_info = new_schema_entry['info'] + old_mode = old_info['mode'] + old_name = old_info['name'] + old_type = old_info['type'] + old_status = old_schema_entry['status'] + new_mode = new_info['mode'] + new_name = new_info['name'] + new_type = new_info['type'] + new_status = new_schema_entry['status'] full_old_name = json_full_path(base_path, old_name) full_new_name = json_full_path(base_path, new_name) - - # For all other types, the old_mode must be the same as the new_mode. It - # might seem reasonable to allow a NULLABLE {primitive_type} to be - # upgraded to a REPEATED {primitive_type}, but currently 'bq load' does - # not support that so we must also follow that rule. - if old_mode != new_mode: + # If the old field is a REQUIRED primitive (which could only have come + # from an existing schema), the new field can be either a + # NULLABLE(filled) or a NULLABLE(unfilled). + if old_mode == 'REQUIRED' and new_mode == 'NULLABLE': + # If the new field is filled, then retain the REQUIRED. + if new_schema_entry['filled']: + return old_mode + else: + # The new field is not filled (i.e. an empty or null field). + # If --infer_mode is active, then we allow the REQUIRED to + # revert back to NULLABLE. + if self.infer_mode: + return new_mode + else: + self.log_error( + f'Ignoring non-RECORD field with mismatched mode.' + ' cannot convert to NULLABLE because infer_schema not' + ' set: ' + f'old=({old_status},{full_old_name},{old_mode},' + f'{old_type});' + f' new=({new_status},{full_new_name},{new_mode},' + f'{new_type})' + ) + return None + elif old_mode != new_mode: self.log_error( f'Ignoring non-RECORD field with mismatched mode: ' - f'old=({old_status},{full_old_name},{old_mode},{old_type}); ' - f'new=({new_status},{full_new_name},{new_mode},{new_type})') - return None - - # Check that the converted types are compatible. - candidate_type = convert_type(old_type, new_type) - if not candidate_type: - self.log_error( - f'Ignoring field with mismatched type: ' - f'old=({old_status},{full_old_name},{old_mode},{old_type}); ' - f'new=({new_status},{full_new_name},{new_mode},{new_type})') + f'old=({old_status},{full_old_name},{old_mode},' + f'{old_type});' + f' new=({new_status},{full_new_name},{new_mode},' + f'{new_type})' + ) return None - - new_info['type'] = candidate_type - return new_schema_entry + return old_mode def get_schema_entry(self, key, value, base_path=None): """Determines the 'schema_entry' of the (key, value) pair. 
Calls @@ -377,6 +450,7 @@ def get_schema_entry(self, key, value, base_path=None): value_mode, value_type = self.infer_bigquery_type(value) if not value_mode or not value_type: return None + sanitized_key = self.sanitize_name(key) if value_type == 'RECORD': new_base_path = json_full_path(base_path, key) @@ -403,7 +477,7 @@ def get_schema_entry(self, key, value, base_path=None): ('info', OrderedDict([ ('fields', fields), ('mode', value_mode), - ('name', key), + ('name', sanitized_key), ('type', value_type), ])), ]) @@ -413,7 +487,7 @@ def get_schema_entry(self, key, value, base_path=None): ('filled', False), ('info', OrderedDict([ ('mode', 'NULLABLE'), - ('name', key), + ('name', sanitized_key), ('type', 'STRING'), ])), ]) @@ -423,7 +497,7 @@ def get_schema_entry(self, key, value, base_path=None): ('filled', False), ('info', OrderedDict([ ('mode', 'REPEATED'), - ('name', key), + ('name', sanitized_key), ('type', 'STRING'), ])), ]) @@ -434,7 +508,7 @@ def get_schema_entry(self, key, value, base_path=None): ('info', OrderedDict([ ('fields', OrderedDict()), ('mode', value_mode), - ('name', key), + ('name', sanitized_key), ('type', 'RECORD'), ])), ]) @@ -452,7 +526,7 @@ def get_schema_entry(self, key, value, base_path=None): ('filled', filled), ('info', OrderedDict([ ('mode', value_mode), - ('name', key), + ('name', sanitized_key), ('type', value_type), ])), ]) @@ -582,17 +656,19 @@ def flatten_schema(self, schema_map): keep_nulls=self.keep_nulls, sorted_schema=self.sorted_schema, infer_mode=self.infer_mode, - sanitize_names=self.sanitize_names, - ) + input_format=self.input_format) - def run(self, input_file=sys.stdin, output_file=sys.stdout): + def run(self, input_file=sys.stdin, + output_file=sys.stdout, schema_map=None): """Read the data records from the input_file and print out the BigQuery schema on the output_file. The error logs are printed on the sys.stderr. Args: input_file: a file-like object (default: sys.stdin) output_file: a file-like object (default: sys.stdout) + schema_map: the existing bigquery schema_map we start with """ - schema_map, error_logs = self.deduce_schema(input_file) + schema_map, error_logs = self.deduce_schema(input_file, + schema_map=schema_map) for error in error_logs: logging.info("Problem on line %s: %s", error['line'], error['msg']) @@ -700,13 +776,11 @@ def is_string_type(thetype): ] -def flatten_schema_map( - schema_map, - keep_nulls=False, - sorted_schema=True, - infer_mode=False, - sanitize_names=False, -): +def flatten_schema_map(schema_map, + keep_nulls=False, + sorted_schema=True, + infer_mode=False, + input_format='json'): """Converts the 'schema_map' into a more flatten version which is compatible with BigQuery schema. @@ -768,20 +842,26 @@ def flatten_schema_map( keep_nulls=keep_nulls, sorted_schema=sorted_schema, infer_mode=infer_mode, - sanitize_names=sanitize_names, + input_format=input_format ) elif key == 'type' and value in ['QINTEGER', 'QFLOAT', 'QBOOLEAN']: - # Convert QINTEGER -> INTEGER, similarly for QFLAT and QBOOLEAN. + # Convert QINTEGER -> INTEGER, similarly for QFLOAT and QBOOLEAN new_value = value[1:] elif key == 'mode': - if infer_mode and value == 'NULLABLE' and filled: + # 'infer_mode' to set a field as REQUIRED is supported for only + # input_format = 'csv' because the header line gives us the + # complete list of fields to be expected in the CSV file. In + # JSON data files, certain fields will often be completely + # missing instead of being set to 'null' or "". 
If the field is + # not even present, then it becomes incredibly difficult (not + # impossible, but more effort than I want to expend right now) + # to figure out which fields are missing so that we can mark the + # appropriate schema entries with 'filled=False'. + if (infer_mode and value == 'NULLABLE' and filled + and input_format == 'csv'): new_value = 'REQUIRED' else: new_value = value - elif key == 'name' and sanitize_names: - new_value = SchemaGenerator.FIELD_NAME_MATCHER.sub( - '_', value, - )[0:127] else: new_value = value new_info[key] = new_value @@ -789,6 +869,77 @@ def flatten_schema_map( return schema +def bq_schema_to_map(schema): + """ convert BQ JSON table schema representation to SchemaGenerator + schema_map representaton """ + if isinstance(schema, dict): + schema = schema['fields'] + return OrderedDict((f['name'].lower(), bq_schema_field_to_entry(f)) + for f in schema) + + +BQ_TYPES = frozenset( + '''STRING + BYTES + INTEGER + FLOAT + BOOLEAN + TIMESTAMP + DATE + TIME + DATETIME + RECORD'''.split()) + +BQ_TYPE_ALIASES = { + 'INT64': 'INTEGER', + 'FLOAT64': 'FLOAT', + 'BOOL': 'BOOLEAN', + 'STRUCT': 'RECORD', +} + + +def bq_type_to_entry_type(type): + if type in BQ_TYPES: + return type + if type in BQ_TYPE_ALIASES: + return BQ_TYPE_ALIASES[type] + raise TypeError(f'Unknown BQ type ""{type}"') + + +def bq_schema_field_to_entry(field): + type = bq_type_to_entry_type(field['type']) + # In some cases with nested fields within a record, bigquery does not + # populate a mode field. We will assume this is NULLABLE in this case + mode = field.get('mode', 'NULLABLE') + # maintain order of info fields + if type == 'RECORD': + info = OrderedDict([ + ('fields', bq_schema_to_map(field['fields'])), + ('mode', mode), + ('name', field['name']), + ('type', type), + ]) + else: + info = OrderedDict([ + ('mode', mode), + ('name', field['name']), + ('type', type), + ]) + return OrderedDict([ + ('status', 'hard'), + ('filled', mode != 'NULLABLE'), + ('info', info), + ]) + + +def read_existing_schema_from_file(existing_schema_path): + if existing_schema_path: + with open(existing_schema_path, 'r') as f: + existing_json_schema = json.load(f) + return bq_schema_to_map(existing_json_schema) + return None + + def json_full_path(base_path, key): """Return the dot-separated JSON full path to a particular key. e.g. 'server.config.port'. Column names in CSV files are never nested, @@ -818,7 +969,8 @@ def main(): action="store_true") parser.add_argument( '--infer_mode', - help="Determine if mode can be 'NULLABLE' or 'REQUIRED'", + help="Automatically determine if mode can be 'NULLABLE' or 'REQUIRED'" + " instead of the default 'NULLABLE'", action='store_true') parser.add_argument( '--debugging_interval', @@ -837,6 +989,12 @@ def main(): '--ignore_invalid_lines', help='Ignore lines that cannot be parsed instead of stopping', action="store_true") + parser.add_argument( + '--existing_schema_path', + help='File that contains the existing BigQuery schema for a table.' + ' This can be fetched with:' + ' `bq show --schema ::', + default=None) args = parser.parse_args() # Configure logging. 
@@ -852,7 +1010,9 @@ def main(): sanitize_names=args.sanitize_names, ignore_invalid_lines=args.ignore_invalid_lines, ) - generator.run() + existing_schema_map = read_existing_schema_from_file( + args.existing_schema_path) + generator.run(schema_map=existing_schema_map) if __name__ == '__main__': diff --git a/tests/data_reader.py b/tests/data_reader.py index 8a7f0ac..1a53c8b 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -32,8 +32,11 @@ class DataReader: DATA [flags] json_records ... + EXISTING_SCHEMA + existing json schema from bq api + ... ERRORS - line: msg + line_number: msg ... SCHEMA bigquery_schema @@ -44,7 +47,7 @@ class DataReader: json_records ... ERRORS - line: msg + line_number: msg ... SCHEMA bigquery_schema @@ -56,7 +59,10 @@ class DataReader: the following components: * a DATA section containing the newline-separated JSON data records + * an optional EXISTING_SCHEMA section contains the existing base + BigQuery schema to build off of * an optional ERRORS section containing the expected error messages + messages when the schema is known to schema decoder in advance * a SCHEMA section containing the expected BigQuery schema * comment lines start with a '#' character. @@ -92,17 +98,20 @@ class DataReader: data_flags = chunk['data_flags'] keep_nulls = ('keep_nulls' in data_flags) records = chunk['records'] + existing_schema = chunk['existing_schema'] schema = chunk['schema'] ... """ # Recognized tags. # TODO: Change to a hash set to speed up the lookup if many are added. - TAG_TOKENS = ['DATA', 'ERRORS', 'SCHEMA', 'END'] + TAG_TOKENS = ['DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA', 'END'] def __init__(self, testdata_file): self.testdata_file = testdata_file self.next_line = None + self.line_number = 0 + self.chunk_count = 0 def read_chunk(self): """Returns a dict with the next test chunk from the data file, @@ -111,22 +120,28 @@ def read_chunk(self): 'data_flags': [data_flags], 'data': [data lines], 'errors': {errors}, + 'existing_schema': schema_string, 'schema': schema_string } Returns None if there are no more test chunks. """ - data_flags, records = self.read_data_section() + data_flags, records, line_number = self.read_data_section() if data_flags is None: return None + existing_schema = self.read_existing_schema_section() errors = self.read_errors_section() - error_map = self.process_errors(errors) + error_map = self.process_errors(errors or []) schema = self.read_schema_section() self.read_end_marker() + self.chunk_count += 1 return { + 'chunk_count': self.chunk_count, + 'line_number': line_number, 'data_flags': data_flags, 'records': records, - 'errors': errors, + 'existing_schema': existing_schema, + 'errors': errors or [], 'error_map': error_map, 'schema': schema } @@ -138,29 +153,58 @@ def read_data_section(self): # First tag must be 'DATA [flags]' tag_line = self.read_line() + line_number = self.line_number if tag_line is None: - return (None, None) + return (None, None, line_number) (tag, data_flags) = self.parse_tag_line(tag_line) if tag != 'DATA': raise Exception( - "Unrecoginized tag line '%s', should be DATA" % tag_line) + "Unrecoginized tag line_number '%s', should be DATA" % tag_line) # Read the DATA records until the next TAG_TOKEN. 
records = [] while True: - line = self.read_line() - if line is None: + line_number = self.read_line() + if line_number is None: raise Exception( "Unexpected EOF, should be ERRORS or SCHEMA tag") - (tag, _) = self.parse_tag_line(line) + (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag == 'DATA': raise Exception("Unexpected DATA tag") - self.push_back(line) + self.push_back(line_number) break - records.append(line) + records.append(line_number) - return (data_flags, records) + return (data_flags, records, line_number) + + def read_existing_schema_section(self): + """Returns the JSON string of the existing_schema section. + """ + + # The next tag must be 'EXISTING_SCHEMA' + tag_line = self.read_line() + if tag_line is None: + raise Exception("Unexpected EOF, should be EXISTING_SCHEMA tag") + (tag, _) = self.parse_tag_line(tag_line) + if tag == 'EXISTING_SCHEMA': + # Read the EXISTING_SCHEMA records until the next TAG_TOKEN + schema_lines = [] + while True: + line_number = self.read_line() + if line_number is None: + break + (tag, _) = self.parse_tag_line(line_number) + if tag in self.TAG_TOKENS: + if tag in ('DATA', 'EXISTING_SCHEMA'): + raise Exception(f"Unexpected {tag} tag") + self.push_back(line_number) + break + schema_lines.append(line_number) + return ''.join(schema_lines) + else: + self.push_back(tag_line) + return [] def read_errors_section(self): """Return a dictionary of errors which are expected from the parsing of @@ -206,21 +250,22 @@ def read_schema_section(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'SCHEMA': raise Exception( - "Unrecoginized tag line '%s', should be SCHEMA" % tag_line) + "Unrecoginized tag line_number '%s', should be SCHEMA" + % tag_line) # Read the SCHEMA records until the next TAG_TOKEN schema_lines = [] while True: - line = self.read_line() - if line is None: + line_number = self.read_line() + if line_number is None: break - (tag, _) = self.parse_tag_line(line) + (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: - if tag == 'SCHEMA': - raise Exception("Unexpected SCHEMA tag") - self.push_back(line) + if tag in ('DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA'): + raise Exception(f"Unexpected {tag} tag") + self.push_back(line_number) break - schema_lines.append(line) + schema_lines.append(line_number) return ''.join(schema_lines) @@ -232,21 +277,21 @@ def read_end_marker(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'END': raise Exception( - "Unrecoginized tag line '%s', should be END" % tag_line) + "Unrecoginized tag line_number '%s', should be END" % tag_line) - def parse_tag_line(self, line): - """Parses a potential tag line of the form 'TAG [flags...]' where + def parse_tag_line(self, line_number): + """Parses a potential tag line_number of the form 'TAG [flags...]' where 'flags' is a list of strings separated by spaces. Returns the tuple of (tag, [flags]). """ - tokens = line.split() + tokens = line_number.split() if tokens: return (tokens[0], tokens[1:]) else: return (None, []) def read_line(self): - """Return the next line, while supporting a one-line push_back(). + """Return the next line_number, while supporting a one-line_number push_back(). Comment lines begin with a '#' character and are skipped. Blank lines are skipped. Prepending and trailing whitespaces are stripped. 
@@ -259,6 +304,7 @@ def read_line(self): while True: line = self.testdata_file.readline() + self.line_number += 1 # EOF if line == '': return None @@ -289,20 +335,21 @@ def process_errors(self, error_records): (line_number, message) = self.parse_error_line(error) error_entry = error_map.get(line_number) if error_entry is None: - error_entry = {'line': line_number, 'msgs': []} + error_entry = {'line_number': line_number, 'msgs': []} error_map[line_number] = error_entry messages = error_entry['msgs'] messages.append(message) return error_map def parse_error_line(self, line): - """Parse the error line of the form: - line: msg + """Parse the error line_number of the form: + line_number: msg """ pos = line.find(':') if pos < 0: raise Exception( - "Error line must be of the form 'line: msg': '%s'" % line) + "Error line_number must be of the form 'line_number: msg': '%s'" + % line) line_number = int(line[0:pos]) message = line[pos + 1:].strip() return (line_number, message) @@ -322,6 +369,7 @@ def main(): break print("DATA_FLAGS: %s" % chunk['data_flags']) print("DATA: %s" % chunk['records']) + print("EXISTING_SCHEMA: %s" % chunk['existing_schema']) print("ERRORS: %s" % chunk['errors']) print("ERROR_MAP: %s" % chunk['error_map']) print("SCHEMA: %s" % chunk['schema']) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index bb68ee6..a949d27 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -19,9 +19,11 @@ import json from io import StringIO from collections import OrderedDict +from bigquery_schema_generator.generate_schema import BQ_TYPES from bigquery_schema_generator.generate_schema import SchemaGenerator -from bigquery_schema_generator.generate_schema import is_string_type +from bigquery_schema_generator.generate_schema import bq_schema_to_map from bigquery_schema_generator.generate_schema import convert_type +from bigquery_schema_generator.generate_schema import is_string_type from bigquery_schema_generator.generate_schema import json_full_path from .data_reader import DataReader @@ -425,16 +427,10 @@ def test_json_full_path(self): self.assertEqual('server.port', json_full_path('server', 'port')) -class TestFromDataFile(unittest.TestCase): - """Read the test case data from TESTDATA_FILE and verify that the expected - schema matches the one produced by SchemaGenerator.deduce_schema(). Multiple - test cases are stored in TESTDATA_FILE. The data_reader.py module knows how - to parse that file. - """ - +class TestDataChunksFromFile(unittest.TestCase): TESTDATA_FILE = 'testdata.txt' - def test(self): + def test_all_data_chunks(self): # Find the TESTDATA_FILE in the same directory as this script file. dir_path = os.path.dirname(os.path.realpath(__file__)) testdata_path = os.path.join(dir_path, self.TESTDATA_FILE) @@ -442,15 +438,20 @@ def test(self): # Read each test case (data chunk) and verify the expected schema. 
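+        # Each chunk may include an optional EXISTING_SCHEMA section; when
+        # present, verify_data_chunk() converts it with bq_schema_to_map()
+        # and passes it to deduce_schema() as the starting schema_map.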
with open(testdata_path) as testdatafile: data_reader = DataReader(testdatafile) - chunk_count = 0 while True: chunk = data_reader.read_chunk() if chunk is None: break - chunk_count += 1 - self.verify_data_chunk(chunk_count, chunk) - - def verify_data_chunk(self, chunk_count, chunk): + try: + self.verify_data_chunk(chunk) + except AssertionError as e: + print("Error when processing chunk starting on line_number:" + f" {chunk['line_number']}\n") + raise e + + def verify_data_chunk(self, chunk): + chunk_count = chunk['chunk_count'] + line_number = chunk['line_number'] data_flags = chunk['data_flags'] input_format = 'csv' if ('csv' in data_flags) else 'json' keep_nulls = ('keep_nulls' in data_flags) @@ -462,8 +463,10 @@ def verify_data_chunk(self, chunk_count, chunk): expected_errors = chunk['errors'] expected_error_map = chunk['error_map'] expected_schema = chunk['schema'] + existing_schema = chunk['existing_schema'] - print("Test chunk %s: First record: %s" % (chunk_count, records[0])) + print("Test chunk %s, line_number %s: First record: %s" % + (chunk_count, line_number, records[0])) # Generate schema. generator = SchemaGenerator( input_format=input_format, @@ -471,9 +474,12 @@ def verify_data_chunk(self, chunk_count, chunk): keep_nulls=keep_nulls, quoted_values_are_strings=quoted_values_are_strings, sanitize_names=sanitize_names, - ignore_invalid_lines=ignore_invalid_lines, - ) - schema_map, error_logs = generator.deduce_schema(records) + ignore_invalid_lines=ignore_invalid_lines) + existing_schema_map = None + if existing_schema: + existing_schema_map = bq_schema_to_map(json.loads(existing_schema)) + schema_map, error_logs = generator.deduce_schema( + records, schema_map=existing_schema_map) schema = generator.flatten_schema(schema_map) # Check the schema, preserving order @@ -481,29 +487,96 @@ def verify_data_chunk(self, chunk_count, chunk): self.assertEqual(expected, schema) # Check the error messages - self.assertEqual(len(expected_errors), len(error_logs)) + try: + self.assertEqual(len(expected_errors), len(error_logs)) + except AssertionError as e: + print(f"Number of errors mismatched, expected:" + f" {len(expected_errors)} got: {len(error_logs)}") + print(f"Errors: {error_logs}") + print(f"Expected Errors: {expected_errors}") + raise e self.assert_error_messages(expected_error_map, error_logs) def assert_error_messages(self, expected_error_map, error_logs): # Convert the list of errors into a map error_map = {} for error in error_logs: - line_number = error['line'] + line_number = error['line_number'] messages = error_map.get(line_number) if messages is None: messages = [] error_map[line_number] = messages messages.append(error['msg']) - # Check that each entry in 'error_logs' is expected. Currently checks - # only that the number of errors matches on a per line basis. - # TODO: Look deeper and verify that the error message strings match as - # well. for line_number, messages in sorted(error_map.items()): expected_entry = expected_error_map.get(line_number) self.assertIsNotNone(expected_entry) expected_messages = expected_entry['msgs'] - self.assertEqual(len(expected_messages), len(messages)) + self.assertEqual(expected_messages, messages) + + +class TestBigQuerySchemaToSchemaMap(unittest.TestCase): + def test_bq_schema_to_map_round_trip_permutations(self): + ''' This checks that each possible type of consititued schema, when + generated, then converted to a schema_map, then back to the schema, + they are equal. + + This function is really ugly but has good coverage. 
This was + migrated from pytest fixtures which were a bit cleaner but we + ideally did not want to add a new dependency / library that is used + for testing. + ''' + valid_types = BQ_TYPES + valid_modes = ['NULLABLE', 'REQUIRED', 'REPEATED'] + valid_input_formats_and_modes = [('csv', True), + ('csv', False), + ('json', False)] + valid_keep_null_params = [True, False] + valid_quoted_values_are_strings = [True, False] + for valid_type in valid_types: + for valid_mode in valid_modes: + bq_entry = self.make_bq_schema_entry(valid_mode, valid_type) + schema = [bq_entry] + schema_map = bq_schema_to_map(schema) + for input_format_and_mode in valid_input_formats_and_modes: + for keep_null_param in valid_keep_null_params: + for quotes_are_strings in\ + valid_quoted_values_are_strings: + generator = SchemaGenerator( + input_format=input_format_and_mode[0], + infer_mode=input_format_and_mode[1], + keep_nulls=keep_null_param, + quoted_values_are_strings=quotes_are_strings) + flattened = generator.flatten_schema(schema_map) + try: + self.assertEqual(schema, flattened) + except AssertionError as e: + print("test_bq_schema_to_map_permutations" + " failed for case where: " + f"bq_entry={bq_entry}\n" + "schema_generator created with values:" + f"{input_format_and_mode[0]}" + f"-{input_format_and_mode[1]}" + f"-{keep_null_param}" + f"-{quotes_are_strings}") + raise e + + def make_bq_schema_entry(self, mode, type): + ''' Creates a bigquery schema entry + ''' + if type == 'RECORD': + return OrderedDict([ + ('fields', [self.make_bq_schema_entry('NULLABLE', 'STRING')]), + ('mode', mode), + ('name', 'a'), + ('type', type), + ]) + else: + return OrderedDict([ + ('mode', mode), + ('name', 'a'), + ('type', type), + ]) if __name__ == '__main__': diff --git a/tests/testdata.txt b/tests/testdata.txt index 086128b..9c69dd2 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -36,6 +36,37 @@ SCHEMA ] END +# If 'keep_nulls' flag is given, then the input data with null values produces +# schema which seems to be the best match. This also checks capitalization +DATA keep_nulls +{ "s": null, "aBc": [], "m": {} } +SCHEMA +[ + { + "mode": "REPEATED", + "name": "aBc", + "type": "STRING" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "__unknown__", + "type": "STRING" + } + ], + "mode": "NULLABLE", + "name": "m", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "s", + "type": "STRING" + } +] +END + # Fields with primitive types. BYTES is not supported. 
DATA { "s": "string", "b": true, "d": "2017-01-01", "i": 1, "t": "17:10:00", "ts": "2017-05-22T17:10:00-07:00", "x": 3.1 } @@ -86,8 +117,8 @@ DATA ignore_invalid_lines { "x": 3 } this is not a JSON object ERRORS -1: Record should be a JSON object but was a -3: Record cannot be parsed: Exception: Expecting value: line 1 column 1 (char 0) +1: Record should be a JSON Object but was a +3: Record could not be parsed: Exception: Expecting value: line 1 column 1 (char 0) SCHEMA [ { @@ -973,3 +1004,856 @@ SCHEMA } ] END + +# Empty JSON input file, existing schema no modification done, purely testing round trip schema generation +DATA keep_nulls +{} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# JSON existing schema file valid record instead of string, no modification done +DATA keep_nulls +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": 
"INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# JSON existing schema changes in casing when generating schema should not result in a change from what existed already +DATA keep_nulls +{"eventname": "test event1", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +{"EVENTNAME": "test event2", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "eventName", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "eventName", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# JSON file field added, schema expanded +DATA keep_nulls +{"event_name": "test event", "extra_field": "testing", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": 
"NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "extra_field", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# JSON existing schema REQUIRED to NULLABLE no infer_mode set, errors with cannot change mode +DATA keep_nulls +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "REQUIRED" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +ERRORS +2: Ignoring non-RECORD field with mismatched mode. 
cannot convert to NULLABLE because infer_schema not set: old=(hard,test_float,REQUIRED,FLOAT); new=(soft,test_float,NULLABLE,STRING) +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + } +] +END + +# JSON existing schema REQUIRED to NULLABLE infer_mode set so this should relax the field mode +DATA keep_nulls infer_mode +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "REQUIRED" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# Testing standard SQL types to legacy types +DATA keep_nulls infer_mode +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} +{"event_name": "test event", 
"event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "STRUCT", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INT64" + }, + { + "name": "dest_port", + "type": "INT64" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT64", + "mode": "REQUIRED" + }, + { + "name": "test_boolean", + "type": "BOOL", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END From cdcdb907dd5fe2ba97968a359f7a9189a565f8f3 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Fri, 4 Dec 2020 11:27:03 -0800 Subject: [PATCH 2/8] Addressing final comments from bxparks --- bigquery_schema_generator/generate_schema.py | 29 +++++++++++- tests/data_reader.py | 46 ++++++++++---------- tests/test_generate_schema.py | 6 +++ tests/testdata.txt | 33 ++++++++++++++ 4 files changed, 90 insertions(+), 24 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 0d0f3f1..3b7803d 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -392,6 +392,25 @@ def merge_schema_entry( return new_schema_entry def merge_mode(self, old_schema_entry, new_schema_entry, base_path): + ''' + This method determines if the 'mode' of a schema entry can + transition from REQUIRED -> NULLABLE. A REQUIRED mode can only have + come from an existing schema (though the --existing_schema_path + flag), because REQUIRED is created only in the flatten_schema() + method. Therefore, a NULLABLE->REQUIRED transition cannot occur. + + We have the following sub cases for the REQUIRED -> NULLABLE + transition: + + 1) If the target is filled=True, then we will retain the REQUIRED + mode. + 2) If the target is filled=False, then we control the outcome by + overloading the --infer_mode flag: + a) If --infer_mode is given, then we allow the + REQUIRED -> NULLABLE transition. + b) If --infer_mode is not given, then we log an error and ignore + this field from the schema. 
+ ''' old_info = old_schema_entry['info'] new_info = new_schema_entry['info'] old_mode = old_info['mode'] @@ -402,8 +421,10 @@ def merge_mode(self, old_schema_entry, new_schema_entry, base_path): new_name = new_info['name'] new_type = new_info['type'] new_status = new_schema_entry['status'] + full_old_name = json_full_path(base_path, old_name) full_new_name = json_full_path(base_path, new_name) + # If the old field is a REQUIRED primitive (which could only have come # from an existing schema), the new field can be either a # NULLABLE(filled) or a NULLABLE(unfilled). @@ -671,7 +692,8 @@ def run(self, input_file=sys.stdin, schema_map=schema_map) for error in error_logs: - logging.info("Problem on line %s: %s", error['line'], error['msg']) + logging.info("Problem on line %s: %s", error['line_number'], + error['msg']) if self.debugging_map: json.dump(schema_map, output_file, indent=2) @@ -857,6 +879,11 @@ def flatten_schema_map(schema_map, # impossible, but more effort than I want to expend right now) # to figure out which fields are missing so that we can mark the # appropriate schema entries with 'filled=False'. + # + # The --infer_mode option is activated only for + # input_format == 'csv' in this function, which allows us to + # overload the --infer_mode flag to mean that a REQUIRED mode of + # an existing schema can transition to a NULLABLE mode. if (infer_mode and value == 'NULLABLE' and filled and input_format == 'csv'): new_value = 'REQUIRED' diff --git a/tests/data_reader.py b/tests/data_reader.py index 1a53c8b..4b43274 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -159,22 +159,22 @@ def read_data_section(self): (tag, data_flags) = self.parse_tag_line(tag_line) if tag != 'DATA': raise Exception( - "Unrecoginized tag line_number '%s', should be DATA" % tag_line) + "Unrecoginized tag line '%s', should be DATA" % tag_line) # Read the DATA records until the next TAG_TOKEN. 
records = [] while True: - line_number = self.read_line() - if line_number is None: + line = self.read_line() + if line is None: raise Exception( "Unexpected EOF, should be ERRORS or SCHEMA tag") - (tag, _) = self.parse_tag_line(line_number) + (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: if tag == 'DATA': raise Exception("Unexpected DATA tag") - self.push_back(line_number) + self.push_back(line) break - records.append(line_number) + records.append(line) return (data_flags, records, line_number) @@ -191,16 +191,16 @@ def read_existing_schema_section(self): # Read the EXISTING_SCHEMA records until the next TAG_TOKEN schema_lines = [] while True: - line_number = self.read_line() - if line_number is None: + line = self.read_line() + if line is None: break - (tag, _) = self.parse_tag_line(line_number) + (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: if tag in ('DATA', 'EXISTING_SCHEMA'): raise Exception(f"Unexpected {tag} tag") - self.push_back(line_number) + self.push_back(line) break - schema_lines.append(line_number) + schema_lines.append(line) return ''.join(schema_lines) else: self.push_back(tag_line) @@ -256,16 +256,16 @@ def read_schema_section(self): # Read the SCHEMA records until the next TAG_TOKEN schema_lines = [] while True: - line_number = self.read_line() - if line_number is None: + line = self.read_line() + if line is None: break - (tag, _) = self.parse_tag_line(line_number) + (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: if tag in ('DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA'): raise Exception(f"Unexpected {tag} tag") - self.push_back(line_number) + self.push_back(line) break - schema_lines.append(line_number) + schema_lines.append(line) return ''.join(schema_lines) @@ -277,21 +277,21 @@ def read_end_marker(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'END': raise Exception( - "Unrecoginized tag line_number '%s', should be END" % tag_line) + "Unrecoginized tag line '%s', should be END" % tag_line) - def parse_tag_line(self, line_number): - """Parses a potential tag line_number of the form 'TAG [flags...]' where + def parse_tag_line(self, line): + """Parses a potential tag line of the form 'TAG [flags...]' where 'flags' is a list of strings separated by spaces. Returns the tuple of (tag, [flags]). """ - tokens = line_number.split() + tokens = line.split() if tokens: return (tokens[0], tokens[1:]) else: return (None, []) def read_line(self): - """Return the next line_number, while supporting a one-line_number push_back(). + """Return the next line, while supporting a one-line push_back(). Comment lines begin with a '#' character and are skipped. Blank lines are skipped. Prepending and trailing whitespaces are stripped. 
@@ -342,13 +342,13 @@ def process_errors(self, error_records): return error_map def parse_error_line(self, line): - """Parse the error line_number of the form: + """Parse the error line of the form: line_number: msg """ pos = line.find(':') if pos < 0: raise Exception( - "Error line_number must be of the form 'line_number: msg': '%s'" + "Error line must be of the form 'line_number: msg': '%s'" % line) line_number = int(line[0:pos]) message = line[pos + 1:].strip() diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index a949d27..a279f24 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -428,6 +428,12 @@ def test_json_full_path(self): class TestDataChunksFromFile(unittest.TestCase): + """Read the test case data from TESTDATA_FILE and verify that the expected + schema matches the one produced by SchemaGenerator.deduce_schema(). Multiple + test cases are stored in TESTDATA_FILE. The data_reader.py module knows how + to parse that file. + """ + TESTDATA_FILE = 'testdata.txt' def test_all_data_chunks(self): diff --git a/tests/testdata.txt b/tests/testdata.txt index 9c69dd2..1347b46 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -1857,3 +1857,36 @@ SCHEMA } ] END + +# Verify that the --infer_mode flag for JSON input does NOT cause the schema mode to +# become REQUIRED, even though in the following, it will mistakenly think that every field +# defined for all records, because it cannot (easily) detect the absence of a field in a record. +DATA infer_mode +{ "i" : 1, "f": 1.0, "r": {"a": 1} } +{ "f": 1.0 } +SCHEMA +[ + { + "mode": "NULLABLE", + "name": "f", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "i", + "type": "INTEGER" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "a", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "r", + "type": "RECORD" + } +] +END \ No newline at end of file From 9fe11361f49f6446f5aeb766ad79c566401f663d Mon Sep 17 00:00:00 2001 From: Brian Park Date: Sat, 5 Dec 2020 10:01:28 -0800 Subject: [PATCH 3/8] generate_schema.py: Reformat comments and some code for consistency --- bigquery_schema_generator/generate_schema.py | 120 ++++++++++--------- tests/test_generate_schema.py | 29 ++--- 2 files changed, 80 insertions(+), 69 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 3b7803d..a3ed764 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -121,13 +121,11 @@ def __init__( def log_error(self, msg): self.error_logs.append({'line_number': self.line_number, 'msg': msg}) - # TODO: BigQuery is case-insensitive with regards to the 'name' of the - # field. Verify that the 'name' is unique regardless of the case. def deduce_schema(self, file, *, schema_map=None): - """Loop through each newlined-delimited line of 'file' and - deduce the BigQuery schema. The schema is returned as a recursive map - that contains both the database schema and some additional metadata - about each entry. It has the following form: + """Loop through each newlined-delimited line of 'file' and deduce the + BigQuery schema. The schema is returned as a recursive map that contains + both the database schema and some additional metadata about each entry. 
+ It has the following form: schema_map := { key: schema_entry @@ -242,11 +240,11 @@ def deduce_schema_for_line(self, json_object, schema_map, base_path=None): ) def sanitize_name(self, value): - ''' Sanitizes a column name within the schema. + """Sanitizes a column name within the schema. - We explicitly choose to not perform the lowercasing here as this - cause us to lose case sensitivity when generating the final schema - ''' + We explicitly choose to not perform the lowercasing here as this + cause us to lose case sensitivity when generating the final schema + """ if self.sanitize_names: new_value = re.sub('[^a-zA-Z0-9_]', '_', value[:127]) else: @@ -385,32 +383,32 @@ def merge_schema_entry( f'Ignoring field with mismatched type: ' f'old=({old_status},{full_old_name},{old_mode},{old_type});' ' ' - f'new=({new_status},{full_new_name},{new_mode},{new_type})') + f'new=({new_status},{full_new_name},{new_mode},{new_type})' + ) return None new_info['type'] = candidate_type return new_schema_entry def merge_mode(self, old_schema_entry, new_schema_entry, base_path): - ''' - This method determines if the 'mode' of a schema entry can - transition from REQUIRED -> NULLABLE. A REQUIRED mode can only have - come from an existing schema (though the --existing_schema_path - flag), because REQUIRED is created only in the flatten_schema() - method. Therefore, a NULLABLE->REQUIRED transition cannot occur. - - We have the following sub cases for the REQUIRED -> NULLABLE - transition: - - 1) If the target is filled=True, then we will retain the REQUIRED - mode. - 2) If the target is filled=False, then we control the outcome by - overloading the --infer_mode flag: - a) If --infer_mode is given, then we allow the - REQUIRED -> NULLABLE transition. - b) If --infer_mode is not given, then we log an error and ignore - this field from the schema. - ''' + """This method determines if the 'mode' of a schema entry can + transition from REQUIRED -> NULLABLE. A REQUIRED mode can only have + come from an existing schema (though the --existing_schema_path + flag), because REQUIRED is created only in the flatten_schema() + method. Therefore, a NULLABLE->REQUIRED transition cannot occur. + + We have the following sub cases for the REQUIRED -> NULLABLE + transition: + + 1) If the target is filled=True, then we will retain the REQUIRED + mode. + 2) If the target is filled=False, then we control the outcome by + overloading the --infer_mode flag: + a) If --infer_mode is given, then we allow the + REQUIRED -> NULLABLE transition. + b) If --infer_mode is not given, then we log an error and ignore + this field from the schema. 
+ """ old_info = old_schema_entry['info'] new_info = new_schema_entry['info'] old_mode = old_info['mode'] @@ -473,6 +471,7 @@ def get_schema_entry(self, key, value, base_path=None): return None sanitized_key = self.sanitize_name(key) + # yapf: disable if value_type == 'RECORD': new_base_path = json_full_path(base_path, key) # recursively figure out the RECORD @@ -491,7 +490,6 @@ def get_schema_entry(self, key, value, base_path=None): base_path=new_base_path, ) - # yapf: disable schema_entry = OrderedDict([ ('status', 'hard'), ('filled', True), @@ -677,10 +675,15 @@ def flatten_schema(self, schema_map): keep_nulls=self.keep_nulls, sorted_schema=self.sorted_schema, infer_mode=self.infer_mode, - input_format=self.input_format) + input_format=self.input_format, + ) - def run(self, input_file=sys.stdin, - output_file=sys.stdout, schema_map=None): + def run( + self, + input_file=sys.stdin, + output_file=sys.stdout, + schema_map=None, + ): """Read the data records from the input_file and print out the BigQuery schema on the output_file. The error logs are printed on the sys.stderr. Args: @@ -688,8 +691,9 @@ def run(self, input_file=sys.stdin, output_file: a file-like object (default: sys.stdout) schema_map: the existing bigquery schema_map we start with """ - schema_map, error_logs = self.deduce_schema(input_file, - schema_map=schema_map) + schema_map, error_logs = self.deduce_schema( + input_file, schema_map=schema_map + ) for error in error_logs: logging.info("Problem on line %s: %s", error['line_number'], @@ -790,19 +794,24 @@ def convert_type(atype, btype): return None +STRING_TYPES = frozenset([ + 'STRING', 'TIMESTAMP', 'DATE', 'TIME', 'QINTEGER', 'QFLOAT', 'QBOOLEAN' +]) + + def is_string_type(thetype): """Returns true if the type is one of: STRING, TIMESTAMP, DATE, or TIME.""" - return thetype in [ - 'STRING', 'TIMESTAMP', 'DATE', 'TIME', 'QINTEGER', 'QFLOAT', 'QBOOLEAN' - ] + return thetype in STRING_TYPES -def flatten_schema_map(schema_map, - keep_nulls=False, - sorted_schema=True, - infer_mode=False, - input_format='json'): +def flatten_schema_map( + schema_map, + keep_nulls=False, + sorted_schema=True, + infer_mode=False, + input_format='json', +): """Converts the 'schema_map' into a more flatten version which is compatible with BigQuery schema. @@ -905,17 +914,18 @@ def bq_schema_to_map(schema): for f in schema) -BQ_TYPES = frozenset( - '''STRING - BYTES - INTEGER - FLOAT - BOOLEAN - TIMESTAMP - DATE - TIME - DATETIME - RECORD'''.split()) +BQ_TYPES = frozenset([ + 'STRING', + 'BYTES', + 'INTEGER', + 'FLOAT', + 'BOOLEAN', + 'TIMESTAMP', + 'DATE', + 'TIME', + 'DATETIME', + 'RECORD', +]) BQ_TYPE_ALIASES = { 'INT64': 'INTEGER', diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index a279f24..9d9a51a 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -523,20 +523,21 @@ def assert_error_messages(self, expected_error_map, error_logs): class TestBigQuerySchemaToSchemaMap(unittest.TestCase): def test_bq_schema_to_map_round_trip_permutations(self): - ''' This checks that each possible type of consititued schema, when - generated, then converted to a schema_map, then back to the schema, - they are equal. - - This function is really ugly but has good coverage. This was - migrated from pytest fixtures which were a bit cleaner but we - ideally did not want to add a new dependency / library that is used - for testing. 
- ''' + """This checks that each possible type of consititued schema, when + generated, then converted to a schema_map, then back to the schema, they + are equal. + + This function is really ugly but has good coverage. This was migrated + from pytest fixtures which were a bit cleaner but we ideally did not + want to add a new dependency / library that is used for testing. + """ valid_types = BQ_TYPES valid_modes = ['NULLABLE', 'REQUIRED', 'REPEATED'] - valid_input_formats_and_modes = [('csv', True), - ('csv', False), - ('json', False)] + valid_input_formats_and_modes = [ + ('csv', True), + ('csv', False), + ('json', False), + ] valid_keep_null_params = [True, False] valid_quoted_values_are_strings = [True, False] for valid_type in valid_types: @@ -568,8 +569,8 @@ def test_bq_schema_to_map_round_trip_permutations(self): raise e def make_bq_schema_entry(self, mode, type): - ''' Creates a bigquery schema entry - ''' + """Creates a bigquery schema entry + """ if type == 'RECORD': return OrderedDict([ ('fields', [self.make_bq_schema_entry('NULLABLE', 'STRING')]), From e564f7b92ce2a6ec0f84c7547d94658a9ca87a97 Mon Sep 17 00:00:00 2001 From: Brian Park Date: Sat, 5 Dec 2020 10:23:24 -0800 Subject: [PATCH 4/8] Convert all remaining '%' string interpolations to f-strings for consistency and readability --- bigquery_schema_generator/generate_schema.py | 57 +++++++++++--------- tests/data_reader.py | 27 +++++----- tests/test_generate_schema.py | 6 ++- 3 files changed, 51 insertions(+), 39 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index a3ed764..089e272 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -175,7 +175,7 @@ def deduce_schema(self, file, *, schema_map=None): elif self.input_format == 'json' or self.input_format is None: reader = json_reader(file) else: - raise Exception("Unknown input_format '%s'" % self.input_format) + raise Exception(f"Unknown input_format '{self.input_format}'") if schema_map is None: schema_map = OrderedDict() @@ -186,7 +186,7 @@ def deduce_schema(self, file, *, schema_map=None): # Print a progress message periodically. self.line_number += 1 if self.line_number % self.debugging_interval == 0: - logging.info("Processing line %s", self.line_number) + logging.info(f'Processing line {self.line_number}') # Deduce the schema from this given data record. 
if isinstance(json_object, dict): @@ -196,18 +196,19 @@ def deduce_schema(self, file, *, schema_map=None): ) elif isinstance(json_object, Exception): self.log_error( - f'Record could not be parsed: Exception: {json_object}') + f'Record could not be parsed: Exception: {json_object}' + ) if not self.ignore_invalid_lines: raise json_object else: self.log_error( - 'Record should be a JSON Object but was a ' - f'{type(json_object)}' + 'Record should be a JSON Object but was a' + f' {type(json_object)}' ) if not self.ignore_invalid_lines: raise Exception('Record must be a JSON Object') finally: - logging.info("Processed %s lines", self.line_number) + logging.info(f'Processed {self.line_number} lines') return schema_map, self.error_logs @@ -310,8 +311,9 @@ def merge_schema_entry( # Verify that it's soft->soft or hard->hard if old_status != new_status: raise Exception( - ('Unexpected schema_entry type, this should never happen: ' - 'old (%s); new (%s)') % (old_status, new_status)) + f'Unexpected schema_entry type, this should never happen: ' + f'old ({old_status}); new ({new_status})' + ) old_info = old_schema_entry['info'] old_name = old_info['name'] @@ -329,8 +331,9 @@ def merge_schema_entry( if old_name != new_name: if old_name.lower() != new_name.lower(): raise Exception( - 'old_name (%s) != new_name(%s), should never happen' % - (full_old_name, full_new_name)) + 'Unexpected difference in name, should never happen:' + f' old_name ({full_old_name}) != new_name ({full_new_name})' + ) else: # preserve old name if case is different new_info['name'] = old_info['name'] @@ -343,14 +346,15 @@ def merge_schema_entry( if old_mode == 'NULLABLE' and new_mode == 'REPEATED': old_info['mode'] = 'REPEATED' self.log_error( - ('Converting schema for "%s" from NULLABLE RECORD ' - 'into REPEATED RECORD') % full_old_name) + f'Converting schema for "{full_old_name}" from ' + 'NULLABLE RECORD into REPEATED RECORD' + ) elif old_mode == 'REPEATED' and new_mode == 'NULLABLE': # TODO: Maybe remove this warning output. It was helpful during # development, but maybe it's just natural. self.log_error( - 'Leaving schema for "%s" as REPEATED RECORD' % - full_old_name) + f'Leaving schema for "{full_old_name}" as REPEATED RECORD' + ) # RECORD type needs a recursive merging of sub-fields. We merge into # the 'old_schema_entry' which assumes that the 'old_schema_entry' @@ -382,8 +386,7 @@ def merge_schema_entry( self.log_error( f'Ignoring field with mismatched type: ' f'old=({old_status},{full_old_name},{old_mode},{old_type});' - ' ' - f'new=({new_status},{full_new_name},{new_mode},{new_type})' + f' new=({new_status},{full_new_name},{new_mode},{new_type})' ) return None @@ -440,8 +443,8 @@ def merge_mode(self, old_schema_entry, new_schema_entry, base_path): self.log_error( f'Ignoring non-RECORD field with mismatched mode.' ' cannot convert to NULLABLE because infer_schema not' - ' set: ' - f'old=({old_status},{full_old_name},{old_mode},' + ' set:' + f' old=({old_status},{full_old_name},{old_mode},' f'{old_type});' f' new=({new_status},{full_new_name},{new_mode},' f'{new_type})' @@ -566,15 +569,16 @@ def infer_bigquery_type(self, node_value): array_type = self.infer_array_type(node_value) if not array_type: self.log_error( - "All array elements must be the same compatible type: %s" % - node_value) + 'All array elements must be the same compatible type:' + f' {node_value}' + ) return (None, None) # Disallow array of special types (with '__' not supported). 
# EXCEPTION: allow (REPEATED __empty_record) ([{}]) because it is # allowed by 'bq load'. if '__' in array_type and array_type != '__empty_record__': - self.log_error('Unsupported array element type: %s' % array_type) + self.log_error(f'Unsupported array element type: {array_type}') return (None, None) return ('REPEATED', array_type) @@ -640,7 +644,8 @@ def infer_value_type(self, value): return '__empty_array__' else: raise Exception( - 'Unsupported node type: %s (should not happen)' % type(value)) + f'Unsupported node type: {type(value)} (should not happen)' + ) def infer_array_type(self, elements): """Return the type of all the array elements, accounting for the same @@ -696,8 +701,9 @@ def run( ) for error in error_logs: - logging.info("Problem on line %s: %s", error['line_number'], - error['msg']) + logging.info( + f"Problem on line {error['line_number']}: {error['msg']}" + ) if self.debugging_map: json.dump(schema_map, output_file, indent=2) @@ -828,7 +834,8 @@ def flatten_schema_map( """ if not isinstance(schema_map, dict): raise Exception( - "Unexpected type '%s' for schema_map" % type(schema_map)) + f"Unexpected type '{type(schema_map)}' for schema_map" + ) # Build the BigQuery schema from the internal 'schema_map'. schema = [] diff --git a/tests/data_reader.py b/tests/data_reader.py index 4b43274..0ab61fc 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -159,7 +159,8 @@ def read_data_section(self): (tag, data_flags) = self.parse_tag_line(tag_line) if tag != 'DATA': raise Exception( - "Unrecoginized tag line '%s', should be DATA" % tag_line) + f"Unrecoginized tag line '{tag_line}', should be DATA" + ) # Read the DATA records until the next TAG_TOKEN. records = [] @@ -250,8 +251,8 @@ def read_schema_section(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'SCHEMA': raise Exception( - "Unrecoginized tag line_number '%s', should be SCHEMA" - % tag_line) + f"Unrecoginized tag line_number '{tag_line}', should be SCHEMA" + ) # Read the SCHEMA records until the next TAG_TOKEN schema_lines = [] @@ -277,7 +278,8 @@ def read_end_marker(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'END': raise Exception( - "Unrecoginized tag line '%s', should be END" % tag_line) + f"Unrecoginized tag line '{tag_line}', should be END" + ) def parse_tag_line(self, line): """Parses a potential tag line of the form 'TAG [flags...]' where @@ -348,8 +350,8 @@ def parse_error_line(self, line): pos = line.find(':') if pos < 0: raise Exception( - "Error line must be of the form 'line_number: msg': '%s'" - % line) + f"Error line must be of the form 'line_number: msg': '{line}'" + ) line_number = int(line[0:pos]) message = line[pos + 1:].strip() return (line_number, message) @@ -367,12 +369,13 @@ def main(): chunk = data_reader.read_chunk() if chunk is None: break - print("DATA_FLAGS: %s" % chunk['data_flags']) - print("DATA: %s" % chunk['records']) - print("EXISTING_SCHEMA: %s" % chunk['existing_schema']) - print("ERRORS: %s" % chunk['errors']) - print("ERROR_MAP: %s" % chunk['error_map']) - print("SCHEMA: %s" % chunk['schema']) + print(f"DATA_FLAGS: {chunk['data_flags']}") + print(f"DATA: {chunk['records']}") + print(f"EXISTING_SCHEMA: {chunk['existing_schema']}") + print(f"ERRORS: {chunk['errors']}") + print(f"ERROR_MAP: {chunk['error_map']}") + print(f"SCHEMA: {chunk['schema']}") + print() if __name__ == '__main__': diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 9d9a51a..e88dc0f 100644 --- a/tests/test_generate_schema.py +++ 
b/tests/test_generate_schema.py @@ -471,8 +471,10 @@ def verify_data_chunk(self, chunk): expected_schema = chunk['schema'] existing_schema = chunk['existing_schema'] - print("Test chunk %s, line_number %s: First record: %s" % - (chunk_count, line_number, records[0])) + print( + f"Test chunk: {chunk_count}; line_number: {line_number}; " + f"first record: {records[0]}" + ) # Generate schema. generator = SchemaGenerator( input_format=input_format, From 0325f6993739aade36adddd84d5748bd52e76dcf Mon Sep 17 00:00:00 2001 From: Brian Park Date: Sat, 5 Dec 2020 10:26:11 -0800 Subject: [PATCH 5/8] tests/testdata.txt: Wrap some long comment lines --- tests/testdata.txt | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/tests/testdata.txt b/tests/testdata.txt index 1347b46..f6b522a 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -763,8 +763,7 @@ SCHEMA ] END -# Incompatible types error printing full path -# given +# Incompatible types error printing full path given DATA {"source_machine":{"port":80},"dest_machine":{"port":80}} {"source_machine":{"port":80},"dest_machine":{"port":"http-port"}} @@ -1005,7 +1004,8 @@ SCHEMA ] END -# Empty JSON input file, existing schema no modification done, purely testing round trip schema generation +# Empty JSON input file, existing schema no modification done, purely testing +# round trip schema generation DATA keep_nulls {} EXISTING_SCHEMA @@ -1126,7 +1126,8 @@ SCHEMA ] END -# JSON existing schema file valid record instead of string, no modification done +# JSON existing schema file valid record instead of string, no modification +# done DATA keep_nulls {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} EXISTING_SCHEMA @@ -1247,7 +1248,8 @@ SCHEMA ] END -# JSON existing schema changes in casing when generating schema should not result in a change from what existed already +# JSON existing schema changes in casing when generating schema should not +# result in a change from what existed already DATA keep_nulls {"eventname": "test event1", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} {"EVENTNAME": "test event2", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} @@ -1495,7 +1497,8 @@ SCHEMA ] END -# JSON existing schema REQUIRED to NULLABLE no infer_mode set, errors with cannot change mode +# JSON existing schema REQUIRED to NULLABLE no infer_mode set, errors with +# cannot change mode DATA keep_nulls {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} @@ -1614,7 +1617,8 @@ 
SCHEMA ] END -# JSON existing schema REQUIRED to NULLABLE infer_mode set so this should relax the field mode +# JSON existing schema REQUIRED to NULLABLE infer_mode set so this should relax +# the field mode DATA keep_nulls infer_mode {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} @@ -1858,9 +1862,10 @@ SCHEMA ] END -# Verify that the --infer_mode flag for JSON input does NOT cause the schema mode to -# become REQUIRED, even though in the following, it will mistakenly think that every field -# defined for all records, because it cannot (easily) detect the absence of a field in a record. +# Verify that the --infer_mode flag for JSON input does NOT cause the schema +# mode to become REQUIRED, even though in the following, it will mistakenly +# think that every field is defined for all records, because it cannot (easily) +# detect the absence of a field in a record. DATA infer_mode { "i" : 1, "f": 1.0, "r": {"a": 1} } { "f": 1.0 } @@ -1889,4 +1894,4 @@ SCHEMA "type": "RECORD" } ] -END \ No newline at end of file +END From 3d8989b5f3a977de6f502d67d5967be30dfdee92 Mon Sep 17 00:00:00 2001 From: Brian Park Date: Sat, 5 Dec 2020 10:28:26 -0800 Subject: [PATCH 6/8] generate_schema.py: Rename deduce_schema_for_line() to the more accurate deduce_schema_for_record() --- bigquery_schema_generator/generate_schema.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 089e272..7cacbab 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -190,7 +190,7 @@ def deduce_schema(self, file, *, schema_map=None): # Deduce the schema from this given data record. if isinstance(json_object, dict): - self.deduce_schema_for_line( + self.deduce_schema_for_record( json_object=json_object, schema_map=schema_map, ) @@ -212,7 +212,7 @@ def deduce_schema(self, file, *, schema_map=None): return schema_map, self.error_logs - def deduce_schema_for_line(self, json_object, schema_map, base_path=None): + def deduce_schema_for_record(self, json_object, schema_map, base_path=None): """Figures out the BigQuery schema for the given 'json_object' and updates 'schema_map' with the latest info. A 'schema_map' entry of type 'soft' is a provisional entry that can be overwritten by a subsequent @@ -463,7 +463,7 @@ def merge_mode(self, old_schema_entry, new_schema_entry, base_path): def get_schema_entry(self, key, value, base_path=None): """Determines the 'schema_entry' of the (key, value) pair. Calls - deduce_schema_for_line() recursively if the value is another object + deduce_schema_for_record() recursively if the value is another object instead of a primitive (this will happen only for JSON input file). 
'base_path' is the string representing the current path within the @@ -480,14 +480,14 @@ def get_schema_entry(self, key, value, base_path=None): # recursively figure out the RECORD fields = OrderedDict() if value_mode == 'NULLABLE': - self.deduce_schema_for_line( + self.deduce_schema_for_record( json_object=value, schema_map=fields, base_path=new_base_path, ) else: for val in value: - self.deduce_schema_for_line( + self.deduce_schema_for_record( json_object=val, schema_map=fields, base_path=new_base_path, From db9df285b05b56894e6b34747d4dfe132fdb801f Mon Sep 17 00:00:00 2001 From: Brian Park Date: Sat, 5 Dec 2020 10:43:21 -0800 Subject: [PATCH 7/8] README.md: Add information about REQUIRED versus NULLABLE in documentation about --existing_schema_path flag --- README.md | 46 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 3ee4d3a..c159e2c 100644 --- a/README.md +++ b/README.md @@ -371,10 +371,10 @@ fields are often completely missing from a given JSON record (instead of explicitly being defined to be `null`). In addition to the above, this option, when used in conjunction with ---existing_schema_map, will allow fields to be relaxed from REQUIRED to NULLABLE -if they were REQUIRED in the existing schema and NULL rows are found in the new -data we are inferring a schema from. In this case it can be used with either -input_format, CSV or JSON. +`--existing_schema_map`, will allow fields to be relaxed from REQUIRED to +NULLABLE if they were REQUIRED in the existing schema and NULL rows are found in +the new data we are inferring a schema from. In this case it can be used with +either input_format, CSV or JSON. See [Issue #28](https://github.com/bxparks/bigquery-schema-generator/issues/28) for implementation details. @@ -422,9 +422,9 @@ generate the schema file. The transformations are: My recollection is that the `bq load` command does *not* normalize the JSON key names. Instead it prints an error message. So the `--sanitize_names` flag is useful mostly for CSV files. For JSON files, you'll have to do a second pass -through the data files to cleanup the column names anyway. See [Issue -#14](https://github.com/bxparks/bigquery-schema-generator/issues/14) and [Issue -#33](https://github.com/bxparks/bigquery-schema-generator/issues/33). +through the data files to cleanup the column names anyway. See +[Issue #14](https://github.com/bxparks/bigquery-schema-generator/issues/14) and +[Issue #33](https://github.com/bxparks/bigquery-schema-generator/issues/33). #### Ignore Invalid Lines (`--ignore_invalid_lines`) @@ -443,14 +443,16 @@ does throw an exception on a given line, we would not be able to catch it and continue processing. Fortunately, CSV files are fairly robust, and the schema deduction logic will handle any missing or extra columns gracefully. -Fixes [Issue -#49](https://github.com/bxparks/bigquery-schema-generator/issues/49). +Fixes +[Issue #49](https://github.com/bxparks/bigquery-schema-generator/issues/49). #### Existing Schema Path (`--existing_schema_path`) -There are cases where we would like to start from an existing BigQuery table schema -rather than starting from scratch with a new batch of data we would like to load. -In this case we can specify the path to a local file on disk that is our existing -bigquery table schema. 
This can be generated via the following bq cli command: + +There are cases where we would like to start from an existing BigQuery table +schema rather than starting from scratch with a new batch of data we would like +to load. In this case we can specify the path to a local file on disk that is +our existing bigquery table schema. This can be generated via the following `bq +show --schema` command: ```bash bq show --schema :. > existing_table_schema.json ``` @@ -460,11 +462,27 @@ We can then run generate-schema with the additional option --existing_schema_path existing_table_schema.json ``` +There is some subtle interaction between the `--existing_schema_path` and fields +which are marked with a `mode` of `REQUIRED` in the existing schema. If the new +data contains a `null` value (either in a CSV or JSON data file), it is not +clear if the schema should be changed to `mode=NULLABLE` or whether the new data +should be ignored and the schema should remain `mode=REQUIRED`. The choice is +determined by overloading the `--infer_mode` flag: + +* If `--infer_mode` is given, the new schema will be allowed to revert back to + `NULLABLE`. +* If `--infer_mode` is not given, the offending new record will be ignored + and the new schema will remain `REQUIRED`. + +See discussion in +[PR #57](https://github.com/bxparks/bigquery-schema-generator/pull/57) for +more details. + ## Schema Types ### Supported Types -The **bq show --schema** command produces a JSON schema file that uses the +The `bq show --schema` command produces a JSON schema file that uses the older [Legacy SQL date types](https://cloud.google.com/bigquery/data-types). For compatibility, **generate-schema** script will also generate a schema file using the legacy data types. From e5a50afe247b2404fe55af2096ae7b92b2bb5edc Mon Sep 17 00:00:00 2001 From: Brian Park Date: Sat, 5 Dec 2020 10:50:11 -0800 Subject: [PATCH 8/8] Bump version to 1.3 --- CHANGELOG.md | 5 +++++ README.md | 4 +++- bigquery_schema_generator/version.py | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 113f258..a82f520 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Changelog * Unreleased +* 1.3 (2020-12-05) + * Allow an existing schema file to be specified using + `--existing_schema_path` flag, so that new data can be merged into it. + See #40, #57, and #61. + (Thanks to abroglesc@ and bozzzzo@). * 1.2 (2020-10-27) * Print full path of nested JSON elements in error messages (See #52; thanks abroglesc@). diff --git a/README.md b/README.md index c159e2c..5989187 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ $ generate-schema < file.data.json > file.schema.json $ generate-schema --input_format csv < file.data.csv > file.schema.json ``` -Version: 1.2 (2020-10-27) +Version: 1.3 (2020-12-05) Changelog: [CHANGELOG.md](CHANGELOG.md) @@ -791,6 +791,8 @@ and 3.8. * Bug fix in `--sanitize_names` by Riccardo M. Cefala (riccardomc@). * Print full path of nested JSON elements in error messages, by Austin Brogle (abroglesc@). +* Allow an existing schema file to be specified using `--existing_schema_path`, + by Austin Brogle (abroglesc@) and Bozo Dragojevic (bozzzzo@). ## License diff --git a/bigquery_schema_generator/version.py b/bigquery_schema_generator/version.py index 64477cf..6f4fa58 100644 --- a/bigquery_schema_generator/version.py +++ b/bigquery_schema_generator/version.py @@ -1 +1 @@ -__version__ = '1.2' +__version__ = '1.3'
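
(Editor's note, not part of the patch series: for readers who want to drive the new `--existing_schema_path` workflow from Python rather than the CLI, a minimal sketch follows. It uses `bq_schema_to_map()` and the `schema_map=` parameter of `SchemaGenerator.run()` that appear in the patches above; the constructor keyword arguments and file names are assumptions mirroring the CLI flags, not verbatim project usage.)

```python
# Hedged sketch of the programmatic equivalent of:
#   generate-schema --existing_schema_path existing_table_schema.json --infer_mode
# run(schema_map=...) and bq_schema_to_map() are shown in the patches above;
# the SchemaGenerator keyword arguments and file names are assumptions.
import json

from bigquery_schema_generator.generate_schema import (
    SchemaGenerator,
    bq_schema_to_map,
)

# Previously saved with: bq show --schema <project>:<dataset>.<table>
with open('existing_table_schema.json') as f:
    existing_schema = json.load(f)

# Convert the BigQuery schema (a list of field definitions) into the
# internal schema_map that new records are merged into.
schema_map = bq_schema_to_map(existing_schema)

generator = SchemaGenerator(
    input_format='json',
    infer_mode=True,  # allow REQUIRED -> NULLABLE relaxation on null values
)

with open('new_batch.data.json') as data_file, \
        open('merged.schema.json', 'w') as schema_file:
    generator.run(
        input_file=data_file,
        output_file=schema_file,
        schema_map=schema_map,
    )
```
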