Unable to use transform on SQLite table #159

Closed
tatiana opened this issue Mar 3, 2022 · 4 comments

tatiana commented Mar 3, 2022

Version: astro 0.6.0

How to reproduce the problem (originally found in the readme branch, as part of PR #157):

from datetime import datetime

from airflow import DAG

from astro import sql as aql
from astro.sql.table import Table

START_DATE = datetime(2000, 1, 1)


@aql.transform()
def top_five_animations(input_table: Table):
    return """
        SELECT Title, Rating
        FROM {{input_table}}
        WHERE Genre1=='Animation'
        ORDER BY Rating desc
        LIMIT 5;
    """


with DAG(
    "example_sqlite_load_transform",
    schedule_interval=None,
    start_date=START_DATE,
    catchup=False,
) as dag:

    imdb_movies = aql.load_file(
        path="https://raw.githubusercontent.com/astro-projects/astro/readme/tests/data/imdb.csv",
        task_id="load_csv",
        output_table=Table(
            table_name="imdb_movies", database="sqlite", conn_id="sqlite_default"
        ),
    )

    top_five_animations(
        input_table=imdb_movies,
        output_table=Table(
            table_name="top_animation", database="sqlite", conn_id="sqlite_default"
        ),
    )

Error message:

[...] succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2022-03-03 16:24:46,436] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: example_sqlite_load_transform.top_five_animations backfill__2022-03-03T00:00:00+00:00 [queued]>']
[2022-03-03 16:24:46,489] {taskinstance.py:1429} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_sqlite_load_transform
AIRFLOW_CTX_TASK_ID=top_five_animations
AIRFLOW_CTX_EXECUTION_DATE=2022-03-03T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-03-03T00:00:00+00:00
[2022-03-03 16:24:46,493] {base.py:70} INFO - Using connection to: id: sqlite_default. Host: /tmp/sqlite_default.db, Port: None, Schema: , Login: , Password: None, extra: {}
[2022-03-03 16:24:46,498] {taskinstance.py:1718} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1334, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1460, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1516, in _execute_task
    result = execute_callable(context=context)
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 125, in execute
    hook=self.hook, schema=self.schema, conn_type=self.conn_type
AttributeError: 'SqlDecoratoratedOperator' object has no attribute 'hook'
[2022-03-03 16:24:46,508] {taskinstance.py:1272} INFO - Marking task as FAILED. dag_id=example_sqlite_load_transform, task_id=top_five_scify_movies, execution_date=20220303T000000, start_date=20220303T135514, end_date=20220303T162446
[2022-03-03 16:24:46,528] {debug_executor.py:87} ERROR - Failed to execute task: 'SqlDecoratoratedOperator' object has no attribute 'hook'.
Traceback (most recent call last):
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti._run_raw_task(job_id=ti.job_id, **params)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1334, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1460, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1516, in _execute_task
    result = execute_callable(context=context)
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 125, in execute
    hook=self.hook, schema=self.schema, conn_type=self.conn_type
AttributeError: 'SqlDecoratoratedOperator' object has no attribute 'hook'
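
The AttributeError above is the usual symptom of an attribute that is only assigned on some code paths. The sketch below illustrates that pattern and one way to guard against it. It is not the astro source: `FakeHook`, `BrokenOperator`, `FixedOperator` and the list of connection types are hypothetical names made up for illustration.

```python
class FakeHook:
    """Hypothetical stand-in for a database hook (not an astro or Airflow class)."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id


class BrokenOperator:
    """Sets self.hook only for the connection types it knows about."""

    def __init__(self, conn_type: str, conn_id: str) -> None:
        self.conn_type = conn_type
        if conn_type == "postgres":
            self.hook = FakeHook(conn_id)
        elif conn_type == "snowflake":
            self.hook = FakeHook(conn_id)
        # No branch for "sqlite": self.hook is never assigned.

    def execute(self) -> str:
        # Raises AttributeError when conn_type == "sqlite".
        return self.hook.conn_id


class FixedOperator:
    """Always assigns self.hook, and fails early on unsupported types."""

    SUPPORTED = {"postgres", "snowflake", "sqlite"}  # assumed list

    def __init__(self, conn_type: str, conn_id: str) -> None:
        if conn_type not in self.SUPPORTED:
            raise ValueError(f"Unsupported conn_type: {conn_type!r}")
        self.conn_type = conn_type
        self.hook = FakeHook(conn_id)

    def execute(self) -> str:
        return self.hook.conn_id
```

Failing in the constructor with an explicit message is easier to diagnose than an AttributeError raised later inside `execute`.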
tatiana added this to the 0.6.1 milestone Mar 3, 2022

tatiana commented Mar 3, 2022

Parts of the base SQLDecorator class did not account for SQLite. I refactored those parts to fix the error above, and also the one that surfaced once it was fixed:

[2022-03-03 22:23:01,827] {debug_executor.py:87} ERROR - Failed to execute task: local variable 'results' referenced before assignment.
Traceback (most recent call last):
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti._run_raw_task(job_id=ti.job_id, **params)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1334, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1460, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1516, in _execute_task
    result = execute_callable(context=context)
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 131, in execute
    if not self.raw_sql:
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 238, in _set_schema_if_needed
    self._run_sql_string(schema_statement, {})
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 278, in _run_sql_string
    return results
UnboundLocalError: local variable 'results' referenced before assignment
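
An UnboundLocalError like the one above typically appears when a local variable is assigned only inside branches that a new backend never enters. The following is a minimal sketch of that failure mode and one possible fix. The function names and the set of backends are hypothetical and do not reproduce `_run_sql_string` from astro.

```python
def run_sql_broken(conn_type: str, sql: str) -> str:
    """Assigns `results` only for some connection types."""
    if conn_type == "postgres":
        results = f"postgres ran: {sql}"
    elif conn_type == "snowflake":
        results = f"snowflake ran: {sql}"
    # For "sqlite" neither branch runs, so `results` is unbound here.
    return results


def run_sql_fixed(conn_type: str, sql: str) -> str:
    """Handles every supported type and rejects unknown ones explicitly."""
    supported = {"postgres", "snowflake", "sqlite"}  # assumed list
    if conn_type not in supported:
        raise ValueError(f"Unsupported conn_type: {conn_type!r}")
    return f"{conn_type} ran: {sql}"
```

With the fixed version, an unsupported backend produces a ValueError that names the offending connection type instead of an error about a missing local variable.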


tatiana commented Mar 3, 2022

After the hook-related issues were solved, the next problem was the SQL statement used to create a table from a SELECT statement. The form in use at the time did not work with SQLite:

Traceback (most recent call last):
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1334, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1460, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1516, in _execute_task
    result = execute_callable(context=context)
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 129, in execute
    if not self.raw_sql:
  File "/home/tati/Code/astro/src/astro/sql/operators/sql_decorator.py", line 269, in _run_sql_alchemy_obj
    conn = engine.connect()
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2263, in connect
    return self._connection_cls(self, **kwargs)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 104, in __init__
    else engine.raw_connection()
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2369, in raw_connection
    return self._wrap_pool_connect(
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2339, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1583, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
    return fn()
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 304, in unique_connection
    return _ConnectionFairy._checkout(self)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
    rec = pool._do_get()
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 241, in _do_get
    return self._create_connection()
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
    return _ConnectionRecord(self)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
    self.__connect(first_connect_check=True)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
    connection = pool._invoke_creator(self)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/tati/.virtualenvs/astro-main/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 508, in connect
    return self.dbapi.connect(*cargs, **cparams)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file
(Background on this error at: http://sqlalche.me/e/13/e3q8)


tatiana commented Mar 3, 2022

This issue was solved by removing the parentheses around the SELECT in statements like the one below, only when running against SQLite:

CREATE TABLE top_scify AS (SELECT * FROM imdb_movies);
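
The difference can be checked with Python's built-in sqlite3 module. This is a minimal sketch, not the change made in PR #157: `create_table_as` and `try_execute` are hypothetical helpers, and the table contents are made-up sample data.

```python
import sqlite3


def create_table_as(table_name: str, select_sql: str, conn_type: str) -> str:
    """Build a CREATE TABLE ... AS statement, omitting parentheses for SQLite."""
    select_sql = select_sql.strip().rstrip(";")
    if conn_type == "sqlite":
        return f"CREATE TABLE {table_name} AS {select_sql};"
    return f"CREATE TABLE {table_name} AS ({select_sql});"


def try_execute(conn: sqlite3.Connection, statement: str) -> bool:
    """Return True if SQLite accepts the statement, False on OperationalError."""
    try:
        conn.execute(statement)
        return True
    except sqlite3.OperationalError:
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE imdb_movies (Title TEXT, Rating REAL)")
conn.execute("INSERT INTO imdb_movies VALUES ('Sample title', 7.5)")  # made-up row

select = "SELECT * FROM imdb_movies"
# Parenthesized form, as generated for other backends in this sketch.
parenthesized_ok = try_execute(conn, create_table_as("top_a", select, "postgres"))
# Unparenthesized form, generated for SQLite.
plain_ok = try_execute(conn, create_table_as("top_b", select, "sqlite"))
```

SQLite's grammar expects the SELECT to follow `AS` directly, which is why only the unparenthesized statement is accepted here.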

tatiana added the bug (Something isn't working) label Mar 4, 2022
tatiana self-assigned this Mar 4, 2022
tatiana added a commit that referenced this issue Mar 4, 2022

tatiana commented Mar 4, 2022

Ready for review: #157
