
Feature request: Way to pass in schema name to overwrite the schema pulled from the conn_id #74

Closed · TJaniF opened this issue Dec 1, 2022 · 2 comments · Fixed by #75
Labels: enhancement (New feature or request)

TJaniF (Contributor) commented Dec 1, 2022

Hi :)

I've been exploring the feature that builds the datasource from a provided conn_id and ran into a small issue:
Our dev (and prod) environment is set up with Snowflake tables located in different schemas, something like:

dev_db
- schema_1
-- table_1
-- table_2
- schema_2
-- table_3
-- table_4
etc

There is currently only one Snowflake connection in the dev environment, and its schema field is left empty. Of course, I (or rather the deployment admin) could create one additional Snowflake connection per schema, but this does not seem ideal or scalable.

Because of this, I've been trying to find a way to change the schema of the datasource that is created when a conn_id is passed in, but I could not find one, e.g. via a data_context_config with the intended schema.

When I leave the schema in the Airflow connection blank and pass the schema name (TAMARAFINGERLIN) together with the table name (CLUSTERS) in data_asset_name like this:

t1 = GreatExpectationsOperator(
    task_id="t1",
    data_asset_name="TAMARAFINGERLIN.CLUSTERS",
    conn_id="{{conn.galaxy_snowflake_etl}}",
    data_context_root_dir=ge_root_dir,
    expectation_suite_name="CLUSTERS",
)

the following error occurs:

[2022-12-01T19:11:34.849+0000] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 521, in execute
    result = self.checkpoint.run(batch_request=self.batch_request)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 194, in run
    self._run_validation(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 339, in _run_validation
    validator: Validator = self.data_context.get_validator(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1481, in get_validator
    self.get_batch_list(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1666, in get_batch_list
    return datasource.get_batch_list_from_batch_request(batch_request=batch_request)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/new_datasource.py", line 205, in get_batch_list_from_batch_request
    ) = data_connector.get_batch_data_and_metadata(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/data_connector/data_connector.py", line 116, in get_batch_data_and_metadata
    batch_data, batch_markers = self._execution_engine.get_batch_data_and_markers(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 1243, in get_batch_data_and_markers
    batch_data = SqlAlchemyBatchData(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 161, in __init__
    self._create_temporary_table(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 295, in _create_temporary_table
    self._engine.execute(stmt)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1274, in execute
    return self._exec_driver_sql(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1578, in _exec_driver_sql
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
[SQL: CREATE OR REPLACE TEMPORARY TABLE ge_temp_7a4f3e73 AS SELECT * 
FROM "TAMARAFINGERLIN.CLUSTERS" 
WHERE true]
(Background on this error at: https://sqlalche.me/e/14/f405)
[2022-12-01T19:11:34.888+0000] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=GE_TEST, task_id=t1, execution_date=20221201T191130, start_date=20221201T191131, end_date=20221201T191134
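
The SQL in the traceback shows what goes wrong: the dotted data_asset_name is quoted as a single identifier, "TAMARAFINGERLIN.CLUSTERS", rather than as a schema-qualified name, so Snowflake falls back to the session's (unset) current schema. A minimal illustration of the difference (the variable names here are mine, not the provider's):

asset_name = "TAMARAFINGERLIN.CLUSTERS"

# Quoted as one identifier, Snowflake looks for a table literally named
# "TAMARAFINGERLIN.CLUSTERS" in the (unset) current session schema:
single_identifier = f'"{asset_name}"'        # "TAMARAFINGERLIN.CLUSTERS"

# Splitting on the dot and quoting each part yields a qualified name
# that resolves without a current schema:
schema, table = asset_name.split(".", 1)
qualified_name = f'"{schema}"."{table}"'     # "TAMARAFINGERLIN"."CLUSTERS"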

I was wondering if there could be a way to override some parameters of the connection provided via the conn_id. This would allow me to map the operator over sets of schema names, table names, and expectation suite names (see the sketch below) without needing admin access to the deployment or having to manually create a lot of connections. :)
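
Something like this pattern, assuming the requested override existed (sketched here via a qualified data_asset_name, with a made-up asset list):

# Hypothetical list of (schema, table, expectation suite) triples:
assets = [
    ("schema_1", "table_1", "suite_table_1"),
    ("schema_1", "table_2", "suite_table_2"),
    ("schema_2", "table_3", "suite_table_3"),
]

for schema, table, suite in assets:
    GreatExpectationsOperator(
        task_id=f"validate_{schema}_{table}",
        data_asset_name=f"{schema}.{table}",  # schema part overrides the connection
        conn_id="{{conn.galaxy_snowflake_etl}}",
        data_context_root_dir=ge_root_dir,
        expectation_suite_name=suite,
    )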

cc @denimalpaca

TJaniF added the enhancement label on Dec 1, 2022
denimalpaca (Contributor) commented

I think the easiest solution here would be to parse data_asset_name when a conn_id is passed in, to see whether a database or schema is included with the table name. Then connections without a specified schema (or ones where the user wants to overwrite it) could still be used.
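
A rough sketch of that parsing, just to illustrate the idea (resolve_schema_and_table is a made-up helper, not actual provider code):

from typing import Optional, Tuple

def resolve_schema_and_table(
    data_asset_name: str, conn_schema: Optional[str]
) -> Tuple[Optional[str], str]:
    # An explicit "schema.table" asset name overrides the connection's schema.
    parts = data_asset_name.split(".")
    if len(parts) == 2:
        return parts[0], parts[1]
    # Otherwise fall back to whatever schema the connection defines.
    return conn_schema, parts[-1]

resolve_schema_and_table("TAMARAFINGERLIN.CLUSTERS", None)
# -> ("TAMARAFINGERLIN", "CLUSTERS")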

@talagluck @kaxil any thoughts on this?

talagluck (Contributor) commented

I think that feels like a great approach. I could also imagine including a separate schema_name parameter to allow specification at the operator level.
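
For example (schema_name is illustrative; the actual parameter name would be up to the implementation):

t1 = GreatExpectationsOperator(
    task_id="t1",
    data_asset_name="CLUSTERS",
    schema_name="TAMARAFINGERLIN",  # hypothetical operator-level schema override
    conn_id="{{conn.galaxy_snowflake_etl}}",
    data_context_root_dir=ge_root_dir,
    expectation_suite_name="CLUSTERS",
)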
