[MAINTENANCE] SQL backend integration tests #9822

Merged: 23 commits, Apr 29, 2024

Commits (23)

1b85da3  [MAINTENANCE] Bring back sql_datasource integration tests for backend… (tyler-hoffman, Apr 25, 2024)
3b57fda  Merge branch 'develop' into m/v1-291/sql-backend-integration-tests (tyler-hoffman, Apr 25, 2024)
1b8c53b  Fix typo (tyler-hoffman, Apr 25, 2024)
8123279  Remove unused return (tyler-hoffman, Apr 25, 2024)
bbd47ca  Update _get_exception_details (tyler-hoffman, Apr 25, 2024)
3bb485b  Simplify the use of test fixture names (tyler-hoffman, Apr 25, 2024)
7968208  Revert "Update _get_exception_details" (tyler-hoffman, Apr 25, 2024)
fae6ac0  Revert "Revert "Update _get_exception_details"" (tyler-hoffman, Apr 25, 2024)
10aafcb  Revert "Simplify the use of test fixture names" (tyler-hoffman, Apr 25, 2024)
d295efa  Update name to be more specific (tyler-hoffman, Apr 25, 2024)
5f97f7a  Merge remote-tracking branch 'origin' into m/v1-291/sql-backend-integ… (tyler-hoffman, Apr 25, 2024)
1826844  PoC: refactor sql integration tests around batch definitions (tyler-hoffman, Apr 26, 2024)
097778d  Update method for adding datasource and remove now unneeded test cases (tyler-hoffman, Apr 29, 2024)
66432da  Fix outdated imports (tyler-hoffman, Apr 29, 2024)
03b69dd  Merge branch 'develop' into m/v1-291/sql-backend-integration-tests (tyler-hoffman, Apr 29, 2024)
c29a550  xfail sqlite partitioners (tyler-hoffman, Apr 29, 2024)
51935ab  sources -> data_sources (tyler-hoffman, Apr 29, 2024)
2860983  Add test util to create datasources (tyler-hoffman, Apr 29, 2024)
d39f8d8  Fix test setup (tyler-hoffman, Apr 29, 2024)
cbdf2dd  Break up error message onto multiple lines (tyler-hoffman, Apr 29, 2024)
ca7ddbd  Spell better (tyler-hoffman, Apr 29, 2024)
324e2fe  Collapse two conditions (tyler-hoffman, Apr 29, 2024)
2868c9b  Merge branch 'develop' into m/v1-291/sql-backend-integration-tests (tyler-hoffman, Apr 29, 2024)

Files changed

@@ -31,13 +31,7 @@
 from tests.integration.fixtures.partition_and_sample_data.partitioner_test_cases_and_fixtures import (  # noqa: E501
     TaxiPartitioningTestCase,
     TaxiPartitioningTestCasesBase,
-    TaxiPartitioningTestCasesColumnValue,
-    TaxiPartitioningTestCasesConvertedDateTime,
     TaxiPartitioningTestCasesDateTime,
-    TaxiPartitioningTestCasesDividedInteger,
-    TaxiPartitioningTestCasesHashedColumn,
-    TaxiPartitioningTestCasesModInteger,
-    TaxiPartitioningTestCasesMultiColumnValues,
     TaxiPartitioningTestCasesWholeTable,
     TaxiTestData,
 )
@@ -435,49 +429,6 @@ def in_memory_sqlite_taxi_ten_trips_per_month_execution_engine(sa):
                 column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
             )
         ),
-        TaxiPartitioningTestCasesColumnValue(
-            taxi_test_data=TaxiTestData(
-                test_df=ten_trips_per_month_df(),
-                test_column_name="passenger_count",
-                test_column_names=None,
-                column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
-            )
-        ),
-        TaxiPartitioningTestCasesDividedInteger(
-            taxi_test_data=TaxiTestData(
-                test_df=ten_trips_per_month_df(),
-                test_column_name="pickup_location_id",
-                test_column_names=None,
-                column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
-            )
-        ),
-        TaxiPartitioningTestCasesModInteger(
-            taxi_test_data=TaxiTestData(
-                test_df=ten_trips_per_month_df(),
-                test_column_name="pickup_location_id",
-                test_column_names=None,
-                column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
-            )
-        ),
-        TaxiPartitioningTestCasesHashedColumn(
-            taxi_test_data=TaxiTestData(
-                test_df=ten_trips_per_month_df(),
-                test_column_name="pickup_location_id",
-                test_column_names=None,
-                column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
-            )
-        ),
-        TaxiPartitioningTestCasesMultiColumnValues(
-            taxi_test_data=TaxiTestData(
-                test_df=ten_trips_per_month_df(),
-                test_column_name=None,
-                test_column_names=[
-                    "rate_code_id",
-                    "payment_type",
-                ],
-                column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
-            )
-        ),
         TaxiPartitioningTestCasesDateTime(
             taxi_test_data=TaxiTestData(
                 test_df=ten_trips_per_month_df(),
@@ -486,16 +437,9 @@ def in_memory_sqlite_taxi_ten_trips_per_month_execution_engine(sa):
                 column_names_to_convert=["pickup_datetime", "dropoff_datetime"],
             )
         ),
-        TaxiPartitioningTestCasesConvertedDateTime(
-            taxi_test_data=TaxiTestData(
-                test_df=ten_trips_per_month_df(),
-                test_column_name="pickup_datetime",
-                test_column_names=None,
-                column_names_to_convert=None,
-            )
-        ),
     ],
 )
+@pytest.mark.xfail(reason="To be implemented in V1-305", strict=True)
 @pytest.mark.sqlite
 def test_sqlite_partition(
     taxi_test_cases: TaxiPartitioningTestCasesBase,
@@ -514,8 +458,8 @@ def test_sqlite_partition(
         batch_spec = SqlAlchemyDatasourceBatchSpec(
             table_name="test",
             schema_name="main",
-            partitioner_method=test_case.partitioner_method_name,
-            partitioner_kwargs=test_case.partitioner_kwargs,
+            partitioner_method=test_case.add_batch_definition_method_name,
+            partitioner_kwargs=test_case.add_batch_definition_kwargs,
             batch_identifiers={},
         )
     else:  # noqa: PLR5501
@@ -524,8 +468,8 @@ def test_sqlite_partition(
             batch_spec = SqlAlchemyDatasourceBatchSpec(
                 table_name="test",
                 schema_name="main",
-                partitioner_method=test_case.partitioner_method_name,
-                partitioner_kwargs=test_case.partitioner_kwargs,
+                partitioner_method=test_case.add_batch_definition_method_name,
+                partitioner_kwargs=test_case.add_batch_definition_kwargs,
                 batch_identifiers={
                     taxi_test_cases.test_column_name: test_case.expected_column_values[0]
                 },
@@ -536,8 +480,8 @@ def test_sqlite_partition(
             batch_spec = SqlAlchemyDatasourceBatchSpec(
                 table_name="test",
                 schema_name="main",
-                partitioner_method=test_case.partitioner_method_name,
-                partitioner_kwargs=test_case.partitioner_kwargs,
+                partitioner_method=test_case.add_batch_definition_method_name,
+                partitioner_kwargs=test_case.add_batch_definition_kwargs,
                 batch_identifiers={
                     column_name: test_case.expected_column_values[0][column_name]
                     for column_name in taxi_test_cases.test_column_names
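
For context on the marker added above: with strict=True, pytest treats an unexpected pass (XPASS) as a hard failure, so once the V1-305 work lands and the sqlite partitioner cases start passing, the suite itself will force the stale marker's removal. A minimal, self-contained illustration of that behavior (the test name and reason string are made up for the example):

import pytest

@pytest.mark.xfail(reason="to be implemented later", strict=True)
def test_not_ready():
    # Fails today, so pytest records an XFAIL and the suite stays green.
    # If this test ever starts passing, strict=True turns the XPASS into
    # a failure, forcing the marker to be cleaned up.
    raise NotImplementedError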

tests/integration/db/taxi_data_utils.py: 42 additions & 93 deletions

@@ -1,14 +1,10 @@
 from contextlib import contextmanager
-from typing import TYPE_CHECKING, Any, List
+from typing import TYPE_CHECKING, List
 
 import sqlalchemy as sa
 
 import great_expectations as gx
-from great_expectations.core import IDDict
-from great_expectations.core.batch import BatchRequest, LegacyBatchDefinition
-from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec
-from great_expectations.datasource import BaseDatasource
-from great_expectations.datasource.data_connector import ConfiguredAssetSqlDataConnector
+from great_expectations.core.batch_definition import BatchDefinition
 from great_expectations.execution_engine.sqlalchemy_batch_data import (
     SqlAlchemyBatchData,
 )
@@ -18,6 +14,7 @@
 )
 from tests.test_utils import (
     LoadedTable,
+    add_datasource,
     clean_up_tables_with_prefix,
     load_and_concatenate_csvs,
     load_data_into_test_database,
@@ -100,114 +97,66 @@ def _execute_taxi_partitioning_test_cases(
 
     test_case: TaxiPartitioningTestCase
     for test_case in test_cases:
-        print("Testing partitioner method:", test_case.partitioner_method_name)
+        print("Testing add_batch_definition_* method", test_case.add_batch_definition_method_name)
 
         # 1. Setup
 
-        context = gx.get_context()
+        context = gx.get_context(mode="ephemeral")
 
         datasource_name: str = "test_datasource"
-        data_connector_name: str = "test_data_connector"
+        batch_definition_name: str = "test_batch_definition"
         data_asset_name: str = table_name  # Read from generated table name
 
         column_name: str = taxi_partitioning_test_cases.test_column_name
         column_names: List[str] = taxi_partitioning_test_cases.test_column_names
 
-        # 2. Set partitioner in DataConnector config
-        data_connector_config: dict = {
-            "class_name": "ConfiguredAssetSqlDataConnector",
-            "assets": {
-                data_asset_name: {
-                    "partitioner_method": test_case.partitioner_method_name,
-                    "partitioner_kwargs": test_case.partitioner_kwargs,
-                }
-            },
-        }
-
-        # noinspection PyTypeChecker
-        context.add_datasource(
-            name=datasource_name,
-            class_name="Datasource",
-            execution_engine={
-                "class_name": "SqlAlchemyExecutionEngine",
-                "connection_string": connection_string,
-            },
-            data_connectors={data_connector_name: data_connector_config},
+        datasource = add_datasource(
+            context, name=datasource_name, connection_string=connection_string
         )
+        asset = datasource.add_table_asset(data_asset_name, table_name=table_name)
+        add_batch_definition_method = getattr(
+            asset, test_case.add_batch_definition_method_name or "MAKE THIS REQUIRED"
+        )
+        batch_definition: BatchDefinition = add_batch_definition_method(
+            name=batch_definition_name, **test_case.add_batch_definition_kwargs
+        )
 
-        datasource: BaseDatasource = context.get_datasource(datasource_name=datasource_name)
-
-        data_connector: ConfiguredAssetSqlDataConnector = datasource.data_connectors[
-            data_connector_name
-        ]
-
         # 3. Check if resulting batches are as expected
-        # using data_connector.get_batch_definition_list_from_batch_request()
-        batch_request: BatchRequest = BatchRequest(
-            datasource_name=datasource_name,
-            data_connector_name=data_connector_name,
-            data_asset_name=data_asset_name,
-        )
-        batch_definition_list: List[LegacyBatchDefinition] = (
-            data_connector.get_batch_definition_list_from_batch_request(batch_request=batch_request)
+        batch_request = batch_definition.build_batch_request()
+        batch_list = asset.get_batch_list_from_batch_request(batch_request)
+        assert len(batch_list) == test_case.num_expected_batch_definitions, (
+            f"Found {len(batch_list)} batch definitions "
+            f"but expected {test_case.num_expected_batch_definitions}"
         )
-        print(len(batch_definition_list), "batch definitions found")
-        print(test_case.num_expected_batch_definitions, "expected batch definitions")
-        assert len(batch_definition_list) == test_case.num_expected_batch_definitions
 
-        expected_batch_definition_list: List[LegacyBatchDefinition]
+        expected_batch_metadata: List[dict]
 
         if test_case.table_domain_test_case:
-            expected_batch_definition_list = [
-                LegacyBatchDefinition(
-                    datasource_name=datasource_name,
-                    data_connector_name=data_connector_name,
-                    data_asset_name=data_asset_name,
-                    batch_identifiers=IDDict({}),
-                )
-            ]
+            expected_batch_metadata = [{}]
+        elif column_name or column_names:
+            # This condition is a smell. Consider refactoring.
+            expected_batch_metadata = [data for data in test_case.expected_column_values]
         else:
-            column_value: Any
-            if column_name:
-                expected_batch_definition_list = [
-                    LegacyBatchDefinition(
-                        datasource_name=datasource_name,
-                        data_connector_name=data_connector_name,
-                        data_asset_name=data_asset_name,
-                        batch_identifiers=IDDict({column_name: column_value}),
-                    )
-                    for column_value in test_case.expected_column_values
-                ]
-            elif column_names:
-                dictionary_element: dict
-                expected_batch_definition_list = [
-                    LegacyBatchDefinition(
-                        datasource_name=datasource_name,
-                        data_connector_name=data_connector_name,
-                        data_asset_name=data_asset_name,
-                        batch_identifiers=IDDict(dictionary_element),
-                    )
-                    for dictionary_element in test_case.expected_column_values
-                ]
-            else:
-                raise ValueError("Missing test_column_names or test_column_names attribute.")
-
-        assert (
-            set(batch_definition_list) == set(expected_batch_definition_list)
-        ), f"BatchDefinition lists don't match\n\nbatch_definition_list:\n{batch_definition_list}\n\nexpected_batch_definition_list:\n{expected_batch_definition_list}"  # noqa: E501
-
-        # 4. Check that loaded data is as expected
-
-        # Use expected_batch_definition_list since it is sorted, and we already
-        # asserted that it contains the same items as batch_definition_list
-        batch_spec: SqlAlchemyDatasourceBatchSpec = data_connector.build_batch_spec(
-            expected_batch_definition_list[0]
+            raise ValueError("Missing test_column_names or test_column_names attribute.")
+
+        actual_batch_metadata = [batch.metadata for batch in batch_list]
+        assert actual_batch_metadata == expected_batch_metadata, (
+            f"Batch metadata lists don't match.\n\n"
+            f"batch_list:\n{batch_list}\n\n"
+            f"expected_batch metadata:\n{expected_batch_metadata}"
        )
 
-        batch_data: SqlAlchemyBatchData = context.datasources[
-            datasource_name
-        ].execution_engine.get_batch_data(batch_spec=batch_spec)
+        # 4. Check that loaded data is as expected, using correctness
+        # of the first batch as a proxy for correctness of the whole list
+
+        first_batch = batch_list[0]
+        execution_engine = datasource.get_execution_engine()
+        batch_data: SqlAlchemyBatchData = execution_engine.get_batch_data(
+            batch_spec=first_batch.batch_spec
+        )
 
-        num_rows: int = batch_data.execution_engine.execute_query(
+        num_rows: int = execution_engine.execute_query(
             sa.select(sa.func.count()).select_from(batch_data.selectable)
         ).scalar()
         assert num_rows == test_case.num_expected_rows_in_first_batch_definition
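
One detail worth calling out in the hunk above: getattr(asset, test_case.add_batch_definition_method_name or "MAKE THIS REQUIRED") substitutes a sentinel attribute name whenever a test case leaves the method name unset, so the lookup raises AttributeError immediately instead of silently skipping the case. A small sketch of the same fail-fast trick, using an illustrative stand-in for the asset class rather than the real one:

import pytest

class FakeAsset:
    def add_batch_definition_whole_table(self, name: str) -> str:
        return f"batch definition {name!r}"

asset = FakeAsset()

# A test case that supplies the method name resolves normally...
method = getattr(asset, "add_batch_definition_whole_table")
assert method("test_batch_definition") == "batch definition 'test_batch_definition'"

# ...while a missing (falsy) name fails fast instead of passing vacuously.
with pytest.raises(AttributeError):
    getattr(asset, None or "MAKE THIS REQUIRED")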
tests/integration/db/test_sql_data_partitioned_on_column_value.py: 0 additions & 51 deletions

This file was deleted.