Add docker entrypoint to ensure db migration on startup #270
Conversation
This all looks great, but I'm seeing some test failures locally that I'm curious about.
Test Failures
============================= test session starts ==============================
platform linux -- Python 3.9.7, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /usr/local/airflow
plugins: mock-3.6.1, socket-0.4.0, anyio-3.2.1
collected 692 items
tests/dags/test_common_api_workflows.py ......... [ 1%]
tests/dags/test_dag_parsing.py ............................. [ 5%]
tests/dags/common/test_requester.py ...... [ 6%]
tests/dags/common/test_urls.py ................ [ 8%]
tests/dags/common/licenses/test_constants.py .... [ 9%]
tests/dags/common/licenses/test_licenses.py ................ [ 11%]
tests/dags/provider_api_scripts/test_brooklyn_museum.py ................ [ 13%]
.... [ 14%]
tests/dags/provider_api_scripts/test_cleveland_museum_of_art.py ........ [ 15%]
... [ 16%]
tests/dags/provider_api_scripts/test_europeana.py ...................... [ 19%]
[ 19%]
tests/dags/provider_api_scripts/test_finnish_museums.py ............ [ 20%]
tests/dags/provider_api_scripts/test_flickr.py ......................... [ 24%]
................. [ 27%]
tests/dags/provider_api_scripts/test_jamendo.py ................. [ 29%]
tests/dags/provider_api_scripts/test_metropolitan_museum_of_art.py ..... [ 30%]
.. [ 30%]
tests/dags/provider_api_scripts/test_museum_victoria.py ................ [ 32%]
[ 32%]
tests/dags/provider_api_scripts/test_nypl.py ................ [ 35%]
tests/dags/provider_api_scripts/test_phylopic.py .............. [ 37%]
tests/dags/provider_api_scripts/test_raw_pixel.py .......... [ 38%]
tests/dags/provider_api_scripts/test_science_museum.py ................. [ 41%]
........... [ 42%]
tests/dags/provider_api_scripts/test_smithsonian.py .................... [ 45%]
........................................................................ [ 55%]
........................ [ 59%]
tests/dags/provider_api_scripts/test_staten_museum.py .................. [ 61%]
[ 61%]
tests/dags/provider_api_scripts/test_stocksnap.py .......... [ 63%]
tests/dags/provider_api_scripts/test_walters_art_museum.py ............. [ 65%]
... [ 65%]
tests/dags/provider_api_scripts/test_wikimedia_commons.py .............. [ 67%]
.................... [ 70%]
tests/dags/provider_api_scripts/test_wordpress.py ............... [ 72%]
tests/dags/provider_api_scripts/modules/test_etlMods.py ................ [ 75%]
[ 75%]
tests/dags/storage/test_audio.py ................ [ 77%]
tests/dags/storage/test_columns.py .................................... [ 82%]
tests/dags/storage/test_image.py ........... [ 84%]
tests/dags/storage/test_media.py ....................... [ 87%]
tests/dags/storage/test_util.py .... [ 88%]
tests/dags/util/test_dag_factory.py .. [ 88%]
tests/dags/util/test_helpers.py .. [ 88%]
tests/dags/util/test_log_cleanup.py ... [ 89%]
tests/dags/util/test_operator_util.py .FF [ 89%]
tests/dags/util/test_tsv_cleaner.py . [ 89%]
tests/dags/util/etl/test_operators.py ... [ 90%]
tests/dags/util/loader/test_ingestion_column.py ... [ 90%]
tests/dags/util/loader/test_paths.py FFFFF........... [ 92%]
tests/dags/util/loader/test_s3.py ...... [ 93%]
tests/dags/util/loader/test_smithsonian_unit_codes.py .. [ 94%]
tests/dags/util/loader/test_sql.py FF...........................F.... [ 98%]
tests/dags/util/popularity/test_sql.py ...... [ 99%]
tests/templates/test_create_api_script.py . [100%]
=================================== FAILURES ===================================
________________ test_get_dated_main_runner_handles_zero_shift _________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8d73aa30>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...stance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s \n LIMIT %(param_1)s FOR UPDATE'
parameters = {'dag_id_1': 'test_dag', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1, 'task_id_1': 'pull_image_data'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8d73a430>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8d73a370>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d4405b0>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d5ed310; closed: -1>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...stance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s \n LIMIT %(param_1)s FOR UPDATE'
parameters = {'dag_id_1': 'test_dag', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1, 'task_id_1': 'pull_image_data'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d4405b0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "task_instance" does not exist
E LINE 2: FROM task_instance
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
capsys = <_pytest.capture.CaptureFixture object at 0xffff8d73afd0>
def test_get_dated_main_runner_handles_zero_shift(capsys):
dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
tzinfo=timezone.utc
)
main_func = dated
with dag:
runner = op_util.get_dated_main_runner_operator(main_func, timedelta(minutes=1))
ti = TaskInstance(runner, execution_date)
> ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
tests/dags/util/test_operator_util.py:27:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1415: in run
res = self.check_and_change_state_before_execution(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1023: in check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:623: in refresh_from_db
ti = qry.with_for_update().first()
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3429: in first
ret = list(self[0:1])
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3203: in __getitem__
return list(res)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3535: in __iter__
return self._execute_and_instances(context)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3560: in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d5ed310; closed: -1>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...stance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s \n LIMIT %(param_1)s FOR UPDATE'
parameters = {'dag_id_1': 'test_dag', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1, 'task_id_1': 'pull_image_data'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d4405b0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "task_instance" does not exist
E LINE 2: FROM task_instance
E ^
E
E [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
E FROM task_instance
E WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s
E LIMIT %(param_1)s FOR UPDATE]
E [parameters: {'dag_id_1': 'test_dag', 'task_id_1': 'pull_image_data', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
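For context: both of these failures are `UndefinedTable` errors on Airflow's own metadata tables, which suggests the test environment's metadata DB was never migrated. That is the situation the PR's entrypoint is meant to prevent. A rough Python stand-in for the idea (hypothetical sketch, not the PR's actual shell entrypoint; `airflow db upgrade` is the Airflow 2.x migration command):

```python
# Hypothetical sketch of "migrate the metadata DB before starting";
# the real entrypoint in this PR is a shell script, not this module.
import shutil
import subprocess


def ensure_metadata_db(cli: str = "airflow") -> bool:
    """Run Airflow metadata migrations so tables like task_instance
    and xcom exist before anything queries them.

    Returns False when the CLI isn't installed (e.g. outside the
    container), True after migrations run successfully.
    """
    if shutil.which(cli) is None:
        return False
    # `airflow db upgrade` is idempotent: it applies only the
    # migrations the database is missing.
    subprocess.run([cli, "db", "upgrade"], check=True)
    return True


# Demo of the False path only, so this runs anywhere:
print(ensure_metadata_db(cli="no-such-cli"))  # False
```

Since `db upgrade` is idempotent, running it unconditionally on every container start should be safe.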
_________________ test_get_dated_main_runner_handles_day_shift _________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8d419790>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...stance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s \n LIMIT %(param_1)s FOR UPDATE'
parameters = {'dag_id_1': 'test_dag', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1, 'task_id_1': 'pull_image_data'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8d419730>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8d4197f0>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d419850>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d5ed9a0; closed: -1>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...stance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s \n LIMIT %(param_1)s FOR UPDATE'
parameters = {'dag_id_1': 'test_dag', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1, 'task_id_1': 'pull_image_data'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d419850>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "task_instance" does not exist
E LINE 2: FROM task_instance
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
capsys = <_pytest.capture.CaptureFixture object at 0xffff8cfc0a90>
def test_get_dated_main_runner_handles_day_shift(capsys):
dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
tzinfo=timezone.utc
)
main_func = dated
with dag:
runner = op_util.get_dated_main_runner_operator(
main_func, timedelta(minutes=1), day_shift=1
)
ti = TaskInstance(runner, execution_date)
> ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
tests/dags/util/test_operator_util.py:46:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1415: in run
res = self.check_and_change_state_before_execution(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1023: in check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:623: in refresh_from_db
ti = qry.with_for_update().first()
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3429: in first
ret = list(self[0:1])
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3203: in __getitem__
return list(res)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3535: in __iter__
return self._execute_and_instances(context)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3560: in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d5ed9a0; closed: -1>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...stance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s \n LIMIT %(param_1)s FOR UPDATE'
parameters = {'dag_id_1': 'test_dag', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1, 'task_id_1': 'pull_image_data'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d419850>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "task_instance" does not exist
E LINE 2: FROM task_instance
E ^
E
E [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
E FROM task_instance
E WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s
E LIMIT %(param_1)s FOR UPDATE]
E [parameters: {'dag_id_1': 'test_dag', 'task_id_1': 'pull_image_data', 'execution_date_1': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
__________________ test_stage_oldest_tsv_file_finds_tsv_file ___________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8d442b50>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8d442b80>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8d442b20>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d442be0>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62b80; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d442be0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
tmpdir = local('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0')
def test_stage_oldest_tsv_file_finds_tsv_file(tmpdir):
tmp_directory = str(tmpdir)
identifier = TEST_ID
test_tsv = "test_v002_.tsv"
path = tmpdir.join(test_tsv)
path.write("")
> tsv_found = paths.stage_oldest_tsv_file(
tmp_directory,
identifier,
0,
ti,
)
tests/dags/util/loader/test_paths.py:24:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/paths.py:52: in stage_oldest_tsv_file
ti.xcom_push(key="media_type", value=media_type)
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1937: in xcom_push
XCom.set(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/xcom.py:82: in set
session.query(cls).filter(
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3926: in delete
delete_op.exec_()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1697: in exec_
self._do_exec()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1930: in _do_exec
self._execute_stmt(delete_stmt)
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1702: in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3568: in _execute_crud
return conn.execute(stmt, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62b80; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d442be0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
E
E [SQL: DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s]
E [parameters: {'key_1': 'media_type', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'task_id_1': 'op_no_dag', 'dag_id_1': 'adhoc_airflow'}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
----------------------------- Captured stdout call -----------------------------
[2021-10-25 21:38:39,562] {paths.py:109} INFO - getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0
[2021-10-25 21:38:39,562] {paths.py:111} INFO - found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/test_v002_.tsv']
[2021-10-25 21:38:39,562] {paths.py:113} INFO - last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/test_v002_.tsv', 1635197919.561753)]
[2021-10-25 21:38:39,563] {paths.py:132} INFO - Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/test_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/db_loader_staging/testing/test_v002_.tsv
------------------------------ Captured log call -------------------------------
INFO util.loader.paths:paths.py:109 getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0
INFO util.loader.paths:paths.py:111 found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/test_v002_.tsv']
INFO util.loader.paths:paths.py:113 last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/test_v002_.tsv', 1635197919.561753)]
INFO util.loader.paths:paths.py:132 Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/test_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_fin0/db_loader_staging/testing/test_v002_.tsv
__________________ test_stage_oldest_tsv_file_stages_tsv_file __________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8ce9ac10>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8ce9aca0>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8ce9ad60>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8ce9ab20>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62c70; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8ce9ab20>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
tmpdir = local('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0')
def test_stage_oldest_tsv_file_stages_tsv_file(tmpdir):
tmp_directory = str(tmpdir)
staging_subdirectory = paths.STAGING_SUBDIRECTORY
identifier = TEST_ID
test_tsv = "test_v002_.tsv"
path = tmpdir.join(test_tsv)
path.write("")
> paths.stage_oldest_tsv_file(tmp_directory, identifier, 0, ti)
tests/dags/util/loader/test_paths.py:41:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/paths.py:52: in stage_oldest_tsv_file
ti.xcom_push(key="media_type", value=media_type)
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1937: in xcom_push
XCom.set(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/xcom.py:82: in set
session.query(cls).filter(
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3926: in delete
delete_op.exec_()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1697: in exec_
self._do_exec()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1930: in _do_exec
self._execute_stmt(delete_stmt)
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1702: in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3568: in _execute_crud
return conn.execute(stmt, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62c70; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8ce9ab20>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
E
E [SQL: DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s]
E [parameters: {'key_1': 'media_type', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'task_id_1': 'op_no_dag', 'dag_id_1': 'adhoc_airflow'}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
----------------------------- Captured stdout call -----------------------------
[2021-10-25 21:38:39,734] {paths.py:109} INFO - getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0
[2021-10-25 21:38:39,734] {paths.py:111} INFO - found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/test_v002_.tsv']
[2021-10-25 21:38:39,734] {paths.py:113} INFO - last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/test_v002_.tsv', 1635197919.732753)]
[2021-10-25 21:38:39,734] {paths.py:132} INFO - Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/test_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/db_loader_staging/testing/test_v002_.tsv
------------------------------ Captured log call -------------------------------
INFO util.loader.paths:paths.py:109 getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0
INFO util.loader.paths:paths.py:111 found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/test_v002_.tsv']
INFO util.loader.paths:paths.py:113 last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/test_v002_.tsv', 1635197919.732753)]
INFO util.loader.paths:paths.py:132 Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/test_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta0/db_loader_staging/testing/test_v002_.tsv
________ test_stage_oldest_tsv_file_removes_staged_file_from_output_dir ________
self = <sqlalchemy.engine.base.Connection object at 0xffff8d61ea90>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8d61e9d0>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8d61e670>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d61e3d0>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62b80; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d61e3d0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
tmpdir = local('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0')
def test_stage_oldest_tsv_file_removes_staged_file_from_output_dir(tmpdir):
tmp_directory = str(tmpdir)
identifier = TEST_ID
test_tsv = "test_v002_.tsv"
path = tmpdir.join(test_tsv)
path.write("")
> paths.stage_oldest_tsv_file(tmp_directory, identifier, 0, ti)
tests/dags/util/loader/test_paths.py:53:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/paths.py:52: in stage_oldest_tsv_file
ti.xcom_push(key="media_type", value=media_type)
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1937: in xcom_push
XCom.set(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/xcom.py:82: in set
session.query(cls).filter(
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3926: in delete
delete_op.exec_()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1697: in exec_
self._do_exec()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1930: in _do_exec
self._execute_stmt(delete_stmt)
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1702: in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3568: in _execute_crud
return conn.execute(stmt, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62b80; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d61e3d0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
E
E [SQL: DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s]
E [parameters: {'key_1': 'media_type', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'task_id_1': 'op_no_dag', 'dag_id_1': 'adhoc_airflow'}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
----------------------------- Captured stdout call -----------------------------
[2021-10-25 21:38:39,888] {paths.py:109} INFO - getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0
[2021-10-25 21:38:39,888] {paths.py:111} INFO - found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/test_v002_.tsv']
[2021-10-25 21:38:39,888] {paths.py:113} INFO - last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/test_v002_.tsv', 1635197919.887753)]
[2021-10-25 21:38:39,888] {paths.py:132} INFO - Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/test_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/db_loader_staging/testing/test_v002_.tsv
------------------------------ Captured log call -------------------------------
INFO util.loader.paths:paths.py:109 getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0
INFO util.loader.paths:paths.py:111 found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/test_v002_.tsv']
INFO util.loader.paths:paths.py:113 last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/test_v002_.tsv', 1635197919.887753)]
INFO util.loader.paths:paths.py:132 Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/test_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_rem0/db_loader_staging/testing/test_v002_.tsv
_________________ test_stage_oldest_tsv_file_stages_older_file _________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8cacc040>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8cacc0d0>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8cacc190>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cacc940>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62e50; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cacc940>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
tmpdir = local('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1')
def test_stage_oldest_tsv_file_stages_older_file(tmpdir):
tmp_directory = str(tmpdir)
staging_subdirectory = paths.STAGING_SUBDIRECTORY
identifier = TEST_ID
test_one_tsv = "test1_v001_.tsv"
test_two_tsv = "test2_v001_.tsv"
path_one = tmpdir.join(test_one_tsv)
path_one.write("")
time.sleep(0.01)
path_two = tmpdir.join(test_two_tsv)
path_two.write("")
> paths.stage_oldest_tsv_file(tmp_directory, identifier, 0, ti)
tests/dags/util/loader/test_paths.py:69:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/paths.py:52: in stage_oldest_tsv_file
ti.xcom_push(key="media_type", value=media_type)
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1937: in xcom_push
XCom.set(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/xcom.py:82: in set
session.query(cls).filter(
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3926: in delete
delete_op.exec_()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1697: in exec_
self._do_exec()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1930: in _do_exec
self._execute_stmt(delete_stmt)
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1702: in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3568: in _execute_crud
return conn.execute(stmt, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8db62e50; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cacc940>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
E
E [SQL: DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s]
E [parameters: {'key_1': 'media_type', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'task_id_1': 'op_no_dag', 'dag_id_1': 'adhoc_airflow'}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
----------------------------- Captured stdout call -----------------------------
[2021-10-25 21:38:40,054] {paths.py:109} INFO - getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1
[2021-10-25 21:38:40,054] {paths.py:111} INFO - found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test2_v001_.tsv', '/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test1_v001_.tsv']
[2021-10-25 21:38:40,054] {paths.py:113} INFO - last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test2_v001_.tsv', 1635197920.053753), ('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test1_v001_.tsv', 1635197920.041753)]
[2021-10-25 21:38:40,054] {paths.py:132} INFO - Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test1_v001_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/db_loader_staging/testing/test1_v001_.tsv
------------------------------ Captured log call -------------------------------
INFO util.loader.paths:paths.py:109 getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1
INFO util.loader.paths:paths.py:111 found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test2_v001_.tsv', '/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test1_v001_.tsv']
INFO util.loader.paths:paths.py:113 last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test2_v001_.tsv', 1635197920.053753), ('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test1_v001_.tsv', 1635197920.041753)]
INFO util.loader.paths:paths.py:132 Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/test1_v001_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_sta1/db_loader_staging/testing/test1_v001_.tsv
________________ test_stage_oldest_tsv_file_ignores_newer_file _________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8cd445e0>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8cd446a0>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8cd44670>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cd445b0>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d55aa90; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cd445b0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
tmpdir = local('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0')
def test_stage_oldest_tsv_file_ignores_newer_file(tmpdir):
tmp_directory = str(tmpdir)
staging_subdirectory = paths.STAGING_SUBDIRECTORY
identifier = TEST_ID
test_one_tsv = "test1_v002_.tsv"
test_two_tsv = "test2_v002_.tsv"
path_one = tmpdir.join(test_one_tsv)
path_one.write("")
time.sleep(0.01)
path_two = tmpdir.join(test_two_tsv)
path_two.write("")
> paths.stage_oldest_tsv_file(tmp_directory, identifier, 0, ti)
tests/dags/util/loader/test_paths.py:85:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/paths.py:52: in stage_oldest_tsv_file
ti.xcom_push(key="media_type", value=media_type)
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1937: in xcom_push
XCom.set(
.local/lib/python3.9/site-packages/airflow/utils/session.py:67: in wrapper
return func(*args, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/xcom.py:82: in set
session.query(cls).filter(
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3926: in delete
delete_op.exec_()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1697: in exec_
self._do_exec()
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1930: in _do_exec
self._execute_stmt(delete_stmt)
.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py:1702: in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3568: in _execute_crud
return conn.execute(stmt, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d55aa90; closed: -1>
statement = 'DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'task_id_1': 'op_no_dag'}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cd445b0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 1: DELETE FROM xcom WHERE xcom.key = 'media_type' AND xcom.exec...
E ^
E
E [SQL: DELETE FROM xcom WHERE xcom.key = %(key_1)s AND xcom.execution_date = %(execution_date_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s]
E [parameters: {'key_1': 'media_type', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'task_id_1': 'op_no_dag', 'dag_id_1': 'adhoc_airflow'}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
----------------------------- Captured stdout call -----------------------------
[2021-10-25 21:38:40,217] {paths.py:109} INFO - getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0
[2021-10-25 21:38:40,217] {paths.py:111} INFO - found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test1_v002_.tsv', '/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test2_v002_.tsv']
[2021-10-25 21:38:40,217] {paths.py:113} INFO - last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test1_v002_.tsv', 1635197920.206753), ('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test2_v002_.tsv', 1635197920.216753)]
[2021-10-25 21:38:40,218] {paths.py:132} INFO - Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test1_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/db_loader_staging/testing/test1_v002_.tsv
------------------------------ Captured log call -------------------------------
INFO util.loader.paths:paths.py:109 getting files from /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0
INFO util.loader.paths:paths.py:111 found files:
['/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test1_v002_.tsv', '/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test2_v002_.tsv']
INFO util.loader.paths:paths.py:113 last_modified_list:
[('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test1_v002_.tsv', 1635197920.206753), ('/tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test2_v002_.tsv', 1635197920.216753)]
INFO util.loader.paths:paths.py:132 Moving /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/test1_v002_.tsv to /tmp/pytest-of-airflow/pytest-0/test_stage_oldest_tsv_file_ign0/db_loader_staging/testing/test1_v002_.tsv
___________________ test_create_loading_table_creates_table ____________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8ce820d0>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8ce82700>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8ce82dc0>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8ce82a90>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8c63f040; closed: -1>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8ce82a90>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 2: FROM xcom
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
postgres = Postgres(cursor=<cursor object at 0xffff8c6f4e50; closed: 0>, connection=<connection object at 0xffff8c6dc180; dsn: 'user=deploy password=xxx dbname=openledger host=postgres port=5432', closed: 0>)
def test_create_loading_table_creates_table(postgres):
postgres_conn_id = POSTGRES_CONN_ID
identifier = TEST_ID
load_table = TEST_LOAD_TABLE
> sql.create_loading_table(postgres_conn_id, identifier, ti)
tests/dags/util/loader/test_sql.py:226:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/sql.py:69: in create_loading_table
media_type = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="media_type")
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:2010: in xcom_pull
xcom = query.with_entities(XCom.value).first()
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3429: in first
ret = list(self[0:1])
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3203: in __getitem__
return list(res)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3535: in __iter__
return self._execute_and_instances(context)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3560: in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8c63f040; closed: -1>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8ce82a90>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 2: FROM xcom
E ^
E
E [SQL: SELECT xcom.value AS xcom_value
E FROM xcom
E WHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s AND xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC
E LIMIT %(param_1)s]
E [parameters: {'key_1': 'media_type', 'task_id_1': 'stage_oldest_tsv_file', 'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
__________ test_create_loading_table_errors_if_run_twice_with_same_id __________
self = <sqlalchemy.engine.base.Connection object at 0xffff8cbc2b80>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8cbc2a30>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8cbc2d30>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cbc29d0>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8c63fc70; closed: -1>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cbc29d0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 2: FROM xcom
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
postgres = Postgres(cursor=<cursor object at 0xffff8c63fa90; closed: 0>, connection=<connection object at 0xffff8c6dca40; dsn: 'user=deploy password=xxx dbname=openledger host=postgres port=5432', closed: 0>)
def test_create_loading_table_errors_if_run_twice_with_same_id(postgres):
postgres_conn_id = POSTGRES_CONN_ID
identifier = TEST_ID
> sql.create_loading_table(postgres_conn_id, identifier, ti)
tests/dags/util/loader/test_sql.py:239:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/sql.py:69: in create_loading_table
media_type = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="media_type")
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:2010: in xcom_pull
xcom = query.with_entities(XCom.value).first()
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3429: in first
ret = list(self[0:1])
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3203: in __getitem__
return list(res)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3535: in __iter__
return self._execute_and_instances(context)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3560: in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8c63fc70; closed: -1>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8cbc29d0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 2: FROM xcom
E ^
E
E [SQL: SELECT xcom.value AS xcom_value
E FROM xcom
E WHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s AND xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC
E LIMIT %(param_1)s]
E [parameters: {'key_1': 'media_type', 'task_id_1': 'stage_oldest_tsv_file', 'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
_______________________ test_drop_load_table_drops_table _______________________
self = <sqlalchemy.engine.base.Connection object at 0xffff8d457160>
dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0xffff8d4571f0>, [immutabledict({})])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0xffff8d457310>
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d457640>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`.
"""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d49ee50; closed: -1>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d457640>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E psycopg2.errors.UndefinedTable: relation "xcom" does not exist
E LINE 2: FROM xcom
E ^
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: UndefinedTable
The above exception was the direct cause of the following exception:
postgres_with_load_table = Postgres(cursor=<cursor object at 0xffff8d55aa90; closed: 0>, connection=<connection object at 0xffff8c6dc180; dsn: 'user=deploy password=xxx dbname=openledger host=postgres port=5432', closed: 0>)
def test_drop_load_table_drops_table(postgres_with_load_table):
postgres_conn_id = POSTGRES_CONN_ID
identifier = TEST_ID
load_table = TEST_LOAD_TABLE
> sql.drop_load_table(postgres_conn_id, identifier, ti)
tests/dags/util/loader/test_sql.py:1252:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
openverse_catalog/dags/util/loader/sql.py:302: in drop_load_table
media_type = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="media_type")
.local/lib/python3.9/site-packages/airflow/utils/session.py:70: in wrapper
return func(*args, session=session, **kwargs)
.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:2010: in xcom_pull
xcom = query.with_entities(XCom.value).first()
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3429: in first
ret = list(self[0:1])
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3203: in __getitem__
return list(res)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3535: in __iter__
return self._execute_and_instances(context)
.local/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3560: in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1011: in execute
return meth(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1124: in _execute_clauseelement
ret = self._execute_context(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1316: in _execute_context
self._handle_dbapi_exception(
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1510: in _handle_dbapi_exception
util.raise_(
.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py:182: in raise_
raise exception
.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1276: in _execute_context
self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0xffff903a49a0>
cursor = <cursor object at 0xffff8d49ee50; closed: -1>
statement = 'SELECT xcom.value AS xcom_value \nFROM xcom \nWHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.da...xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC \n LIMIT %(param_1)s'
parameters = {'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'key_1': 'media_type', 'param_1': 1, ...}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0xffff8d457640>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "xcom" does not exist
E LINE 2: FROM xcom
E ^
E
E [SQL: SELECT xcom.value AS xcom_value
E FROM xcom
E WHERE xcom.key = %(key_1)s AND xcom.task_id = %(task_id_1)s AND xcom.dag_id = %(dag_id_1)s AND xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC
E LIMIT %(param_1)s]
E [parameters: {'key_1': 'media_type', 'task_id_1': 'stage_oldest_tsv_file', 'dag_id_1': 'adhoc_airflow', 'execution_date_1': datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}]
E (Background on this error at: http://sqlalche.me/e/13/f405)
.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py:608: ProgrammingError
=========================== short test summary info ============================
FAILED tests/dags/util/test_operator_util.py::test_get_dated_main_runner_handles_zero_shift
FAILED tests/dags/util/test_operator_util.py::test_get_dated_main_runner_handles_day_shift
FAILED tests/dags/util/loader/test_paths.py::test_stage_oldest_tsv_file_finds_tsv_file
FAILED tests/dags/util/loader/test_paths.py::test_stage_oldest_tsv_file_stages_tsv_file
FAILED tests/dags/util/loader/test_paths.py::test_stage_oldest_tsv_file_removes_staged_file_from_output_dir
FAILED tests/dags/util/loader/test_paths.py::test_stage_oldest_tsv_file_stages_older_file
FAILED tests/dags/util/loader/test_paths.py::test_stage_oldest_tsv_file_ignores_newer_file
FAILED tests/dags/util/loader/test_sql.py::test_create_loading_table_creates_table
FAILED tests/dags/util/loader/test_sql.py::test_create_loading_table_errors_if_run_twice_with_same_id
FAILED tests/dags/util/loader/test_sql.py::test_drop_load_table_drops_table
================== 10 failed, 682 passed in 78.26s (0:01:18) ===================
CMD python wait_for_db.py && \
    airflow db init && \
    airflow users create -r Admin -u airflow -f Air -l Flow -p airflow --email airflow@example.org && \
    (airflow scheduler & airflow webserver)
❤️
Ah shoot, I didn't specify that the containers need to be rebuilt. Let me add that to the instructions (you'll need to run
@AetherUnbound yep, thanks for the reminder! Tests pass now. |
@@ -53,7 +53,7 @@ lint:
 	pre-commit run --all-files

 # Mount the tests directory and run a particular command
-@_mount-tests command: up
+@_mount-tests command: (up "postgres s3")
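For anyone else surprised by this: `just` recipes can take parameters, and a dependency can be invoked with arguments using the parenthesized form shown in the diff. A minimal illustration of the pattern (recipe and service names below are made up for the example, not the project's actual justfile):

```just
# Bring up only the named compose services (or the whole stack when
# no argument is given)
up services="":
    docker-compose up -d {{services}}

# A dependency invoked with an argument, as in the change above:
# the test recipe only needs postgres and s3, not the full stack.
test: (up "postgres s3")
    docker-compose run --rm webserver pytest
```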
I didn't know it was possible to pass args here!
`just` can do so much! 🤯 I just wish it was easier to find stuff in their documentation... lol
Fixes
Fixes #227 by @AetherUnbound
Description
This PR adds a docker entrypoint for the Airflow container. This ensures that `wait_for_db.py` and `airflow db upgrade` are always run, regardless of the command supplied. This means that tests (or any other command) run on a fresh install will have a functional Airflow metastore prior to running.

I added an `init` entrypoint command, which will create an admin user in addition to the above steps. This is now the default command for development. `just test` will now only start up the necessary services, rather than the entire stack, since the webserver doesn't need to be running in order to run the tests.

I also added a `help` command, which will print the help text in the entrypoint and exit. That functionally serves as internal documentation as well, since most users aren't likely to run the container with `dc run webserver help`.
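An entrypoint like the one described might look roughly like the sketch below. The file name, subcommand names, and user-creation flags are inferred from the PR description and the `CMD` it replaces, so treat them as assumptions rather than the actual script:

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the entrypoint -- subcommands and credentials
# are assumptions based on the PR description, not the real file.
set -e

migrate() {
  # Runs before every command: wait for the DB, then apply any
  # pending metastore migrations.
  python wait_for_db.py
  airflow db upgrade
}

print_help() {
  cat <<'EOF'
Usage: entrypoint [COMMAND]
  init  migrate the DB, create an admin user, start scheduler + webserver
  help  print this message and exit
  *     run any other command after the DB is migrated
EOF
}

main() {
  case "${1:-init}" in
    help)
      print_help
      ;;
    init)
      migrate
      airflow users create -r Admin -u airflow -f Air -l Flow \
        -p airflow --email airflow@example.org
      (airflow scheduler & exec airflow webserver)
      ;;
    *)
      migrate
      exec "$@"
      ;;
  esac
}

# The installed script would end with: main "$@"
```

Because every branch except `help` goes through `migrate` first, any command handed to the container gets a working metastore before it runs.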
Example output

`just test` from fresh instance

Sample output on initialized instance
Testing Instructions

1. `just down -v`
2. `just build`
3. `just test` and verify that only the `postgres` and `s3` services are started and that the DB is migrated successfully beforehand.
4. `dc run --rm webserver help`
Checklist

- My pull request has a descriptive title (not a vague title like `Update index.md`).
- My pull request targets the default branch of the repository (`main`) or a parent feature branch.

Developer Certificate of Origin