Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove create table hook on upgradedb #40

Merged
merged 2 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ wheels/
# Testing
/.venv/
testdb.sqlite
env
env/
58 changes: 35 additions & 23 deletions astronomer/airflow/version_check/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from airflow.configuration import conf
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.db import create_session
from sqlalchemy import inspect

from .update_checks import UpdateAvailableBlueprint

Expand Down Expand Up @@ -39,10 +40,11 @@ def on_load(cls, *args, **kwargs):
log.debug("Skipping running update_check_plugin as [astronomer] update_check_interval = 0")
return

import airflow.utils.db
if not cls.all_table_created():
cls.create_db_tables()

import airflow.jobs.scheduler_job
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved

cls.add_before_call(airflow.utils.db, 'upgradedb', cls.create_db_tables)
try:
cls.add_before_call(
airflow.jobs.scheduler_job.SchedulerJob, '_execute_helper', cls.start_update_thread
Expand All @@ -57,14 +59,12 @@ def start_update_thread(cls):
from .models import AstronomerVersionCheck
from .update_checks import CheckThread

with create_session() as session:
engine = session.get_bind(mapper=None, clause=None)
if not engine.has_table(AstronomerVersionCheck.__tablename__):
log.warning(
"AstronomerVersionCheck tables are missing (plugin not installed at upgradedb "
"time?). No update checks will be performed"
)
return
if not cls.all_table_created():
log.warning(
"AstronomerVersionCheck tables are missing (plugin not installed at upgradedb "
"time?). No update checks will be performed"
)
return

AstronomerVersionCheck.ensure_singleton()
CheckThread().start()
Expand All @@ -76,19 +76,31 @@ def create_db_tables(cls):
with create_session() as session:
try:
engine = session.get_bind(mapper=None, clause=None)
if not engine.has_table(AstronomerVersionCheck.__tablename__) or not engine.has_table(
AstronomerAvailableVersion.__tablename__
):
log.info("Creating DB tables for %s", __name__)
metadata = AstronomerVersionCheck.metadata
metadata.create_all(
bind=engine,
tables=[
metadata.tables[c.__tablename__]
for c in [AstronomerVersionCheck, AstronomerAvailableVersion]
],
)
log.info("Created")
log.info("Creating DB tables for %s", __name__)
metadata = AstronomerVersionCheck.metadata
metadata.create_all(
bind=engine,
tables=[
metadata.tables[c.__tablename__]
for c in [AstronomerVersionCheck, AstronomerAvailableVersion]
],
)
log.info("Created")
except Exception:
log.exception("Error creating tables")
exit(1)

@classmethod
def all_table_created(cls):
"""Check if there are missing tables"""
from .models import AstronomerAvailableVersion, AstronomerVersionCheck

tables = [AstronomerAvailableVersion, AstronomerVersionCheck]
with create_session() as session:
engine = session.get_bind(mapper=None, clause=None)
inspector = inspect(engine)
for table in tables:
if not inspector.has_table(table.__tablename__):
# return early
return False
return True
4 changes: 3 additions & 1 deletion astronomer/airflow/version_check/update_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pendulum
import requests
import sqlalchemy.exc
from sqlalchemy import inspect
from flask import Blueprint, current_app
from flask_appbuilder.api import BaseApi, expose
from flask_appbuilder.security.decorators import protect
Expand Down Expand Up @@ -314,7 +315,8 @@ def register(self, app, *args, **kwargs):

with create_session() as session:
engine = session.get_bind(mapper=None, clause=None)
if not engine.has_table(AstronomerVersionCheck.__tablename__):
inspector = inspect(engine)
if not inspector.has_table(AstronomerVersionCheck.__tablename__):
self.log.warning(
"AstronomerVersionCheck tables are missing (plugin not installed at upgradedb "
"time?). No update checks will be performed"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools>=60.5.0", "wheel", "pytest-runner~=5.3"]
requires = ["setuptools==63.4.3", "wheel", "pytest-runner~=5.3"]

[tool.black]
line-length = 110
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ def pytest_configure(config):
config.addinivalue_line("markers", "login_as")


@pytest.yield_fixture
@pytest.fixture
def client(app, user, request):
"""The test client, optionally logged in as a user of the given role

Expand Down
3 changes: 1 addition & 2 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ def test_plugin_registered():
assert plugins_manager.plugins[0].flask_blueprints != []


@pytest.mark.xfail(condition=True, reason="Needs deeper investigation")
@pytest.mark.login_as('Admin')
def test_logged_in(client):
response = client.get(url_for('Airflow.index'))
response = client.get(url_for('Airflow.index'), follow_redirects=True)
assert response.status_code == 200
assert b"update-notice.css" in response.data, "Ensure our template customizations are shown"

Expand Down
20 changes: 20 additions & 0 deletions tests/test_update_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,23 @@ def test_update_check_dont_show_update_if_no_new_version_available(mock_ac_versi
result = blueprint.available_update()
# Nothing would be displayed if there is no new version available
assert result is None


def test_plugin_table_created(app, session):
from airflow.cli.commands.standalone_command import standalone
from sqlalchemy import inspect
import threading

engine = session.get_bind(mapper=None, clause=None)
inspector = inspect(engine)
with app.app_context():
thread = threading.Thread(target=standalone, args=('webserver',))
thread.daemon = True
thread.start()
while thread.isAlive():
if inspector.has_table('task_instance'):
break
for _ in range(10):
x = inspector.has_table('astro_version_check')
assert x
thread.join(timeout=1)