-
Notifications
You must be signed in to change notification settings - Fork 16.6k
Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Getting error when trying to access airflow.settings.engine
ERROR - Task failed with exception source="task" error_detail=[{"exc_type":"AttributeError","exc_value":"module 'airflow.settings' has no attribute 'engine'","exc_notes":[],"syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":582,"name":"run"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":718,"name":"_execute_task"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":373,"name":"wrapper"},{"filename":"/opt/airflow/airflow/decorators/base.py","lineno":252,"name":"execute"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":373,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":202,"name":"execute"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":226,"name":"execute_callable"},{"filename":"/opt/airflow/airflow/utils/operator_helpers.py","lineno":262,"name":"run"},{"filename":"/files/dags/setup_teardown/setup_teardown_sample_db.py","lineno":53,"name":"create_table"},{"filename":"/files/dags/setup_teardown/setup_teardown_sample_db.py","lineno":34,"name":"get_cursor"}]}]
What you think should happen instead?
No response
How to reproduce
Run the below Dag:
from datetime import datetime
from airflow.decorators import dag, task , setup , teardown
from airflow import settings
import psycopg2
import textwrap
update_template = textwrap.dedent(
"""
BEGIN;
-- make sure the table exists
CREATE TABLE IF NOT EXISTS test_table (
value INT NOT NULL
);
-- seed it if it's newly created
INSERT INTO test_table (value)
SELECT 0
WHERE NOT EXISTS (SELECT * FROM test_table);
-- increment based on coaller
UPDATE test_table
SET {column} = {column} + 1;
COMMIT;
"""
)
def get_cursor():
"get a cursor on the airflow database"
url = settings.engine.url
host = url.host or "localhost"
port = str(url.port or "5432")
user = url.username or "postgres"
password = url.password or "postgres"
schema = url.database
conn = psycopg2.connect(
host=host, user=user, port=port, password=password, dbname=schema
)
conn.autocommit = True
cursor = conn.cursor()
return cursor
@setup
@task
def create_table():
cursor = get_cursor()
cursor.execute(update_template.format(column="value"))
cursor.execute("SELECT * FROM test_table;")
value = cursor.fetchall()[0][0]
print(f"value is {value}")
return value
@task()
def transform(value: int):
value = value + 1
return value
@task
def load(value: int):
print(f"Total value is: {value}")
@teardown
@task
def delete_table():
cursor = get_cursor()
cursor.execute("DROP TABLE test_table" )
@dag(start_date=datetime(1970, 1, 1), schedule=None, tags=["setup_teardown"])
def setup_teardown_sample_db():
s = create_table()
t = delete_table()
with s >> t:
load(transform(s))
setup_teardown_sample = setup_teardown_sample_db()Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct