Skip to content

Conversation

@danielzohar
Copy link
Contributor

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:
https://issues.apache.org/jira/browse/AIRFLOW-139

Re-posting from the JIRA comment:

This indeed causes problems with (for example) Redshift.
I agree special cases should be discouraged though the current implementation definitely feels like a special case handling.
There was no issue attached to the original commit (28da05d) but I'm supposing it came due to https://www.postgresql.org/docs/7.4/static/release-7-4.html.
The documentation states:
"The server-side autocommit setting was removed and reimplemented in client applications and languages. Server-side autocommit was causing too many problems with languages and applications that wanted to control their own autocommit behavior, so autocommit was removed from the server and added to individual client APIs as appropriate."
As far as I can see, psycopg2 supports setting autocommit and I'd be very surprised if it didn't handle it well.
I tested it locally and I can confirm it works well with 9.5.4 with the following code:

>>> import psycopg2
>>> conn_string = "host='127.0.0.1' dbname='db' user='user' password='pwd'"
>>> conn = psycopg2.connect(conn_string)
>>> conn.autocommit = True
>>> cursor = conn.cursor()
>>> cursor.execute('SELECT VERSION();')
>>> cursor.fetchall()
[('PostgreSQL 9.5.4 on x86_64-pc-linux-gnu, compiled by gcc (Debian 4.9.2-10) 4.9.2, 64-bit',)]

In all honesty, I'm not sure what was the purpose of the original fix and I think those lines should be removed.

With regards to tests, I can't find where tests for hooks exist at the moment and what kind of things we might want to test. Also, are there any best practices on how we test against databases if at all? I think the added code should have had unit-tests, not really sure what we might want to test by removing the code, that Postgres.Hook.autocommit = True?

@danielzohar
Copy link
Contributor Author

I can't make anything regarding the cause of the error on Travis can someone help with that?

@danielzohar danielzohar closed this Oct 6, 2016
@danielzohar danielzohar reopened this Oct 6, 2016
@joeschmid
Copy link
Contributor

+1

@danielzohar
Copy link
Contributor Author

Can someone please advise on why the tests are failing?

@r39132
Copy link
Contributor

r39132 commented Nov 15, 2016

@danielzohar

Refer to the last lines of output before the table of coverage results.

INFO  [root] Using connection to: localhost
ERROR [root] SET AUTOCOMMIT TO OFF is no longer supported
Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/airflow/models.py", line 1288, in run
    result = task_copy.execute(context=context)
  File "/home/travis/build/apache/incubator-airflow/airflow/operators/generic_transfer.py", line 78, in execute
    destination_hook.insert_rows(table=self.destination_table, rows=results)
  File "/home/travis/build/apache/incubator-airflow/airflow/hooks/dbapi_hook.py", line 190, in insert_rows
    cur.execute('SET autocommit = 0')
NotSupportedError: SET AUTOCOMMIT TO OFF is no longer supported
INFO  [root] Marking task as FAILED.
ERROR [root] SET AUTOCOMMIT TO OFF is no longer supported

@xebab
Copy link

xebab commented Nov 15, 2016

The fix for AIRFLOW-533 should have fixed the test case failure for this one, too.

@r39132
Copy link
Contributor

r39132 commented Nov 15, 2016

@danielzohar rebase and retry. I can then review for merge.

@codecov-io
Copy link

codecov-io commented Nov 16, 2016

Current coverage is 65.93% (diff: 100%)

Merging #1821 into master will decrease coverage by 0.13%

@@             master      #1821   diff @@
==========================================
  Files           129        129          
  Lines          9907       9957    +50   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits           6545       6565    +20   
- Misses         3362       3392    +30   
  Partials          0          0          

Powered by Codecov. Last update 365af16...0619a73

@danielzohar
Copy link
Contributor Author

@r39132 all done

@danielzohar
Copy link
Contributor Author

@r39132 just a reminder

@r39132
Copy link
Contributor

r39132 commented Nov 22, 2016

Thx :-) Looking now!

@r39132
Copy link
Contributor

r39132 commented Nov 22, 2016

@danielzohar Pls update the PostgresTest's tests under tests/operators/operators.py. Just add a test around vacuum analyze and anything else you deem interesting. Refer to my example DAG below and my subsequent comment which shows how it runs.

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

now = datetime.now()
now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0)
START_DATE = now_to_the_hour 
DAG_NAME = 'test_dag_v2'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': START_DATE,
}
dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)
sql = """
        CREATE TABLE IF NOT EXISTS test_airflow (
            dummy VARCHAR(50)
        );
        """
t1 = PostgresOperator(task_id='postgres_operator_test', sql=sql, autocommit=True, dag=dag)

sql_2 = [
            "TRUNCATE TABLE test_airflow",
            "INSERT INTO test_airflow VALUES ('X')",
        ]

t2 = PostgresOperator(task_id='postgres_operator_test_t2', sql=sql_2, autocommit=True, dag=dag)
t2.set_upstream(t1)

sql_3 = [
            "VACUUM ANALYZE",
        ]

t3 = PostgresOperator(task_id='postgres_operator_test_t3', sql=sql_3, autocommit=True, dag=dag)
t3.set_upstream(t2)

@r39132
Copy link
Contributor

r39132 commented Nov 22, 2016

Very simple test dag that:

  • creates a table
  • truncates the table
  • inserts records
  • calls vacuum analyze

screenshot 2016-11-21 19 34 45

screenshot 2016-11-21 19 33 57

screenshot 2016-11-21 19 34 59

@r39132
Copy link
Contributor

r39132 commented Nov 22, 2016

@danielzohar Any updates?

@r39132
Copy link
Contributor

r39132 commented Nov 23, 2016

@danielzohar if not further activity on this in the next few days, I will close for inactivity.

@danielzohar
Copy link
Contributor Author

@r39132 All done. Also got rid of a few unused imports along the way

@r39132
Copy link
Contributor

r39132 commented Nov 23, 2016

thx.. looking now.

@r39132
Copy link
Contributor

r39132 commented Nov 23, 2016

I'm seeing this in my example DAG.

@mistercrunch @bolkedebruin @jlowin Any ideas on what the poison pill means and why I am seeing it?

[2016-11-23 12:29:55,312] {models.py:1273} INFO - Executing <Task(PythonOperator): task5> on 2016-09-01 02:00:00
[2016-11-23 12:29:55,341] {base_hook.py:67} INFO - Using connection to: localhost
[2016-11-23 12:30:05,350] {python_operator.py:81} INFO - Done. Returned value was: None
[2016-11-23 12:30:06,832] {jobs.py:2010} WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.

@r39132
Copy link
Contributor

r39132 commented Nov 23, 2016

@danielzohar interesting.. tests are failing. I wonder if this is related to the poison pill messages I am seeing as well. Please fix.

root: INFO: Executing <Task(PostgresOperator): postgres_operator_test_vacuum> on 2015-01-01 00:00:00
root: INFO: Executing: VACUUM ANALYZE;
root: INFO: Using connection to: localhost
root: INFO: VACUUM ANALYZE;
root: ERROR: VACUUM cannot run inside a transaction block
Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/airflow/models.py", line 1299, in run
    result = task_copy.execute(context=context)
  File "/home/travis/build/apache/incubator-airflow/airflow/operators/postgres_operator.py", line 53, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/travis/build/apache/incubator-airflow/airflow/hooks/dbapi_hook.py", line 171, in run
    cur.execute(s)
InternalError: VACUUM cannot run inside a transaction block

root: INFO: Marking task as FAILED.
root: ERROR: VACUUM cannot run inside a transaction block

@danielzohar
Copy link
Contributor Author

oh, silly me. That's the reason I created the PR in the first place! I set autocommit=True and it should be passing now! @r39132

@danielzohar
Copy link
Contributor Author

@danielzohar
Copy link
Contributor Author

Green!

@r39132
Copy link
Contributor

r39132 commented Nov 24, 2016

Nice.. retesting locally!

@r39132
Copy link
Contributor

r39132 commented Nov 24, 2016

It looks like there is a side-effect of this change in that the poison pill line is emitted at the end of dag runs! I need to understand what this means a little better before merging.

[2016-11-23 16:50:20,377] {models.py:1273} INFO - Executing <Task(PythonOperator): task5> on 2016-09-01 00:00:00
[2016-11-23 16:50:20,404] {base_hook.py:67} INFO - Using connection to: localhost
[2016-11-23 16:50:30,412] {python_operator.py:81} INFO - Done. Returned value was: None
[2016-11-23 16:50:35,185] {jobs.py:2010} WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.

@r39132
Copy link
Contributor

r39132 commented Nov 24, 2016

@mistercrunch @bolkedebruin is this poison pill line innocuous?

@bolkedebruin
Copy link
Contributor

I don't like to poison pill thing. It means the state of the Taks has changed while it is at the executor. Will need to dive in to understand.

@r39132
Copy link
Contributor

r39132 commented Dec 5, 2016

@bolkedebruin Any further thoughts here?

@danielzohar
Copy link
Contributor Author

Worth stating that this is a very small change, a new test was added and the tests are green.

@danielzohar
Copy link
Contributor Author

Any thoughts on this?

@bolkedebruin
Copy link
Contributor

@danielzohar Sorry for taking so long. Can you provide operational details of this? I don't like what @r39132 is seeing and I don't have time to debug it myself.

@bolkedebruin
Copy link
Contributor

@danielzohar Ok I think this is ready for merging; The issue seen by @r39132 was unrelated and was fixed in @2016

@asfgit asfgit closed this in ac9167f Jan 24, 2017
alekstorm pushed a commit to alekstorm/incubator-airflow that referenced this pull request Jun 1, 2017
The server-side autocommit setting was removed and reimplemented
in client applications and languages. Server-side autocommit was
causing too many problems with languages and applications that
wanted to control their own autocommit behavior,
so autocommit was removed from the server and added to individual client APIs as appropriate

Closes apache#1821 from danielzohar/AIRFLOW-
139_vacuum_operator
@HansBambel
Copy link
Contributor

If anybody (like me) still faced this issue: you need to add autocommit=True to the PostgresOperator:

vacuum = PostgresOperator(
        task_id="vacuum-database",
        postgres_conn_id=db_conn_id,
        sql="VACUUM FULL;",
        autocommit=True,
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants