Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions VERSION.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
11.1.0
- Add API to make snapshot public
11.2.0
- Update logic on how floats and integers are handled when determining a TDR schema from Terra input metadata
94 changes: 65 additions & 29 deletions ops_utils/tdr_utils/tdr_schema_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import time
import numpy as np
import pandas as pd
import math
from datetime import date, datetime
from typing import Any, Optional
from typing import Any, Optional, Union


class InferTDRSchema:
Expand Down Expand Up @@ -150,46 +151,81 @@ def _check_type_consistency(self, key_value_type_mappings: dict) -> list[dict]:

return disparate_header_info

def _python_type_to_tdr_type_conversion(self, value_for_header: Any) -> str:
@staticmethod
def _interpret_number(x: Union[float, int]) -> Union[int, float]:
if isinstance(x, float) and x.is_integer():
return int(x)
return x

def _determine_if_float_or_int(self, interpreted_numbers: list[Union[int, float]]) -> str:
# Remove NaNs before type checks
non_nan_numbers = [x for x in interpreted_numbers if not (isinstance(x, float) and math.isnan(x))]

# If all values are int, return int type
if all(isinstance(row_value, int) for row_value in non_nan_numbers):
return self.PYTHON_TDR_DATA_TYPE_MAPPING[int]
# If all values are float, return float type
elif all(isinstance(row_value, float) for row_value in non_nan_numbers):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this evaluation necessary if the default for non-all-integer is float either way?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good point, it's not necessary

return self.PYTHON_TDR_DATA_TYPE_MAPPING[float]
# If ANY are float, return float type
else:
return self.PYTHON_TDR_DATA_TYPE_MAPPING[float]

def _python_type_to_tdr_type_conversion(self, values_for_header: list[Any]) -> str:
"""
Convert Python data types to TDR data types.

Args:
value_for_header (Any): The value to determine the TDR type for.
values_for_header (Any): All values for a column header.

Returns:
str: The TDR data type.
"""
gcp_fileref_regex = "^gs://.*"

# Find potential file references
if isinstance(value_for_header, str):
gcp_match = re.search(
pattern=gcp_fileref_regex, string=value_for_header)
if gcp_match:
# Collect all the non-None values for the column
non_none_values = [v for v in values_for_header if v is not None]

# HANDLE SPECIAL CASES

# FILE REFS AND LISTS OF FILE REFS
# If ANY of the values for a header are of type "fileref", we assume that the column is a fileref
for row_value in non_none_values:
if isinstance(row_value, str) and re.search(pattern=gcp_fileref_regex, string=row_value):
return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]

# Tried to use this to parse datetimes, but it was turning too many
# regular ints into datetimes. Commenting out for now
# try:
# date_or_time = parser.parse(value_for_header)
# return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(date_or_time)]
# pass
# except (TypeError, ParserError):
# pass

if isinstance(value_for_header, list):
# check for potential list of filerefs
for v in value_for_header:
if isinstance(v, str):
gcp_match = re.search(pattern=gcp_fileref_regex, string=v)
if gcp_match:
if isinstance(row_value, list):
# Check for a potential array of filerefs - if ANY of the items in a list are
# of type "fileref" we assume that the whole column is a fileref
for item in row_value:
if isinstance(item, str) and re.search(pattern=gcp_fileref_regex, string=item):
return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]
non_none_entry_in_list = [a for a in value_for_header if a is not None][0]
return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(non_none_entry_in_list)]

# if none of the above special cases apply, just pass the type of the value to determine the TDR type
return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(value_for_header)]
# INTEGERS/FLOATS AND LISTS OF INTEGERS AND FLOATS
# Case 1: All values are plain numbers (int or float) - specifically excluding bools
if all(isinstance(x, (int, float)) and not isinstance(x, bool) for x in non_none_values):
interpreted_numbers = [self._interpret_number(row_value) for row_value in non_none_values]
return self._determine_if_float_or_int(interpreted_numbers)

# Case 2: Values are lists of numbers (e.g., [[1, 2], [3.1], [4]])
if all(isinstance(row_value, list) for row_value in non_none_values):
if all(
all(isinstance(item, (int, float)) and not isinstance(item, bool) for item in row_value)
for row_value in non_none_values
):
# Flatten the list of lists and interpret all non-None elements
interpreted_numbers = [self._interpret_number(item)
for row_value in non_none_values for item in row_value if item is not None]

return self._determine_if_float_or_int(interpreted_numbers)

# If none of the above special cases apply, use the first of the non-null values to determine the
# TDR data type

first_value = non_none_values[0]
if isinstance(first_value, list):
return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(first_value[0])]
return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(first_value)]

def _format_column_metadata(self, key_value_type_mappings: dict, disparate_header_info: list[dict]) -> list[dict]:
"""
Expand All @@ -214,8 +250,8 @@ def _format_column_metadata(self, key_value_type_mappings: dict, disparate_heade
logging.info(f"Header '{header}' was forced to string to to mismatched datatypes in column")
data_type = self.PYTHON_TDR_DATA_TYPE_MAPPING[str]
else:
# find either the first item that's non-None, or the first non-empty list
data_type = self._python_type_to_tdr_type_conversion([a for a in values_for_header if a is not None][0])
# Use all existing values for the header to determine the data type
data_type = self._python_type_to_tdr_type_conversion(values_for_header)

column_metadata = {
"name": header,
Expand Down
32 changes: 30 additions & 2 deletions ops_utils/tests/test_tdr_schema_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,37 @@ def test_check_type_consistency_disparate(self):
self.assertEqual(actual_disparate_header_info, expected_disparate_header_info)

def test_python_type_to_tdr_type_conversion_file_ref(self):
    """A column whose value is a gs:// path is inferred as a fileref."""
    # The method takes every value for the column via `values_for_header`; the old
    # single-value `value_for_header` keyword is no longer part of its signature
    res = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=["gs://bucket/some/file.txt"]
    )
    self.assertEqual(res, "fileref")

def test_python_type_to_tdr_type_conversion_boolean(self):
    """A column holding a bool is inferred as boolean rather than as a number."""
    # The method takes every value for the column via `values_for_header`; the old
    # single-value `value_for_header` keyword is no longer part of its signature
    res = self.infer_tdr_schema._python_type_to_tdr_type_conversion(values_for_header=[True])
    self.assertEqual(res, "boolean")

def test_python_type_to_tdr_type_conversion_list_of_booleans(self):
    """Lists of bools are inferred as boolean."""
    column_values = [[True], [True, False], [False]]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "boolean")

def test_python_type_to_tdr_type_conversion_ints(self):
    """Whole-valued floats mixed with ints are inferred as int64."""
    column_values = [1.0, 2, 3.0]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "int64")

def test_python_type_to_tdr_type_conversion_floats(self):
    """Values with fractional parts are inferred as float64."""
    column_values = [1.1, 2.2, 3.3]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "float64")

def test_python_type_to_tdr_type_conversion_float_and_ints(self):
    """A single fractional value makes a mixed numeric column float64."""
    column_values = [1.1, 2, 3.0]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "float64")

def test_python_type_to_tdr_type_conversion_list_of_ints(self):
    """Lists holding only ints and whole-valued floats are inferred as int64."""
    column_values = [[1.0, 2], [3, 4.0], [5]]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "int64")

def test_python_type_to_tdr_type_conversion_list_of_floats(self):
    """Lists containing fractional values are inferred as float64."""
    column_values = [[1.0, 2.2], [3.4, 4.1], [5.9]]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "float64")

def test_python_type_to_tdr_type_conversion_list_of_floats_and_ints(self):
    """Lists mixing ints with fractional floats are inferred as float64."""
    column_values = [[1.0, 2], [3.4, 4.1], [5.0]]
    inferred_type = self.infer_tdr_schema._python_type_to_tdr_type_conversion(
        values_for_header=column_values
    )
    self.assertEqual(inferred_type, "float64")