Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Facade collection update #2184

Merged
merged 15 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 20 additions & 17 deletions augur/application/cli/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,7 @@ def start(disable_collection, development, port):
celery_beat_process.terminate()

try:
clear_redis_caches()
connection_string = ""
with DatabaseSession(logger) as session:
config = AugurConfig(logger, session)
connection_string = config.get_section("RabbitMQ")['connection_string']

clear_rabbitmq_messages(connection_string)

cleanup_after_collection_halt(logger)
except RedisConnectionError:
pass

Expand All @@ -191,13 +184,7 @@ def stop():
logger = logging.getLogger("augur.cli")
_broadcast_signal_to_processes(given_logger=logger)

clear_redis_caches()
connection_string = ""
with DatabaseSession(logger) as session:
config = AugurConfig(logger, session)
connection_string = config.get_section("RabbitMQ")['connection_string']

clear_rabbitmq_messages(connection_string)
cleanup_after_collection_halt(logger)

@cli.command('kill')
def kill():
Expand All @@ -207,15 +194,18 @@ def kill():
logger = logging.getLogger("augur.cli")
_broadcast_signal_to_processes(broadcast_signal=signal.SIGKILL, given_logger=logger)

clear_redis_caches()
cleanup_after_collection_halt(logger)

def cleanup_after_collection_halt(logger):
clear_redis_caches()
connection_string = ""
with DatabaseSession(logger) as session:
config = AugurConfig(logger, session)
connection_string = config.get_section("RabbitMQ")['connection_string']

clear_rabbitmq_messages(connection_string)
clean_collection_status(session)

clear_rabbitmq_messages(connection_string)

def clear_redis_caches():
"""Clears the redis databases that celery and redis use."""
Expand All @@ -232,6 +222,19 @@ def clear_rabbitmq_messages(connection_string):
rabbitmq_purge_command = f"sudo rabbitmqctl purge_queue celery -p {virtual_host_string}"
subprocess.call(rabbitmq_purge_command.split(" "))

#Make sure that database reflects collection status when processes are killed/stopped.
def clean_collection_status(session):
session.execute_sql(s.sql.text("""
UPDATE augur_operations.collection_status
SET core_status='Pending'
WHERE core_status='Collecting';
UPDATE augur_operations.collection_status
SET secondary_status='Pending'
WHERE secondary_status='Collecting';
UPDATE augur_operations.collection_status
SET facade_status='Pending'
WHERE facade_status='Collecting';
"""))

@cli.command('export-env')
def export_env(config):
Expand Down
7 changes: 3 additions & 4 deletions augur/application/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,17 @@ def get_development_flag():
},
"Facade": {
"check_updates": 1,
"clone_repos": 1,
"create_xlsx_summary_files": 1,
"delete_marked_repos": 0,
"fix_affiliations": 1,
"force_analysis": 1,
"force_invalidate_caches": 1,
"force_updates": 1,
"limited_run": 0,
"multithreaded": 1,
"nuke_stored_affiliations": 0,
"pull_repos": 1,
"rebuild_caches": 1,
"run_analysis": 1
"run_analysis": 1,
"run_facade_contributors": 1
},
"Server": {
"cache_expire": "3600",
Expand Down Expand Up @@ -102,6 +100,7 @@ def get_development_flag():
"prelim_phase": 1,
"primary_repo_collect_phase": 1,
"secondary_repo_collect_phase": 1,
"facade_phase": 1,
"machine_learning_phase": 0
}
}
Expand Down
9 changes: 6 additions & 3 deletions augur/application/db/models/augur_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,14 +814,17 @@ class Repo(Base):
ForeignKey("augur_data.repo_groups.repo_group_id"), nullable=False
)
repo_git = Column(String, nullable=False)

#TODO: repo_path and repo_name should be generated columns in postgresql
repo_path = Column(String)
repo_name = Column(String)
repo_added = Column(
TIMESTAMP(precision=0), nullable=False, server_default=text("CURRENT_TIMESTAMP")
)
repo_status = Column(
String, nullable=False, server_default=text("'New'::character varying")
)

#repo_status = Column(
# String, nullable=False, server_default=text("'New'::character varying")
#)
repo_type = Column(
String,
server_default=text("''::character varying"),
Expand Down
9 changes: 9 additions & 0 deletions augur/application/db/models/augur_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,15 @@ class CollectionStatus(Base):
secondary_task_id = Column(String)
event_last_collected = Column(TIMESTAMP)

facade_status = Column(String,nullable=False, server_default=text("'Pending'"))
facade_data_last_collected = Column(TIMESTAMP)
facade_task_id = Column(String)


repo_status = Column(
String, nullable=False, server_default=text("'New'::character varying")
)

repo = relationship("Repo", back_populates="collection_status")

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""change CollectionStatus table to keep track of facade independently

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@IsaacMilarky This should only include, the changes that are needed for this pr. I can change this if you would like

Revision ID: 6
Revises: 5
Create Date: 2023-02-16 12:45:57.486871

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.sql import text

# revision identifiers, used by Alembic.
revision = '6'
down_revision = '5'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('releases', 'release_id',
existing_type=sa.CHAR(length=256),
type_=sa.CHAR(length=128),
existing_nullable=False,
existing_server_default=sa.text('nextval(\'"augur_data".releases_release_id_seq\'::regclass)'),
schema='augur_data')


op.drop_column('repo', 'repo_status', schema='augur_data')
op.add_column('collection_status', sa.Column('facade_status', sa.String(), server_default=sa.text("'Pending'"), nullable=False), schema='augur_operations')
op.add_column('collection_status', sa.Column('facade_data_last_collected', postgresql.TIMESTAMP(), nullable=True), schema='augur_operations')
op.add_column('collection_status', sa.Column('facade_task_id', sa.String(), nullable=True), schema='augur_operations')
op.add_column('collection_status', sa.Column('repo_status', sa.String(), server_default=sa.text("'New'::character varying"), nullable=False), schema='augur_operations')

#Recreate the foreign key
op.drop_constraint('collection_status_repo_id_fk', 'collection_status', schema='augur_operations', type_='foreignkey')
op.create_foreign_key('collection_status_repo_id_fk', 'collection_status', 'repo', ['repo_id'], ['repo_id'], source_schema='augur_operations', referent_schema='augur_data')

op.alter_column('user_groups', 'user_id',
existing_type=sa.INTEGER(),
nullable=True,
schema='augur_operations')
op.drop_constraint('user_groups_user_id_name_key', 'user_groups', schema='augur_operations', type_='unique')
op.create_unique_constraint('user_group_unique', 'user_groups', ['user_id', 'name'], schema='augur_operations')
op.drop_constraint('user_repos_repo_id_fkey', 'user_repos', schema='augur_operations', type_='foreignkey')
op.create_foreign_key('user_repo_user_id_fkey', 'user_repos', 'repo', ['repo_id'], ['repo_id'], source_schema='augur_operations', referent_schema='augur_data')
op.alter_column('user_session_tokens', 'user_id',
existing_type=sa.INTEGER(),
nullable=True,
schema='augur_operations')
op.alter_column('user_session_tokens', 'application_id',
existing_type=sa.VARCHAR(),
nullable=False,
schema='augur_operations')


#Add toggle for facade collection.
conn = op.get_bind()
result = conn.execute(text("""SELECT * FROM augur_operations.config WHERE section_name='Task_Routine';""")).fetchall()
if result:

conn.execute(text(f"""
INSERT INTO "augur_operations"."config" ("section_name", "setting_name", "value", "type") VALUES ('Task_Routine', 'facade_phase', '{1}', 'int');
INSERT INTO "augur_operations"."config" ("section_name", "setting_name", "value", "type") VALUES ('Facade', 'run_facade_contributors', '{1}', 'int');
"""))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('user_session_tokens', 'application_id',
existing_type=sa.VARCHAR(),
nullable=True,
schema='augur_operations')
op.alter_column('user_session_tokens', 'user_id',
existing_type=sa.INTEGER(),
nullable=False,
schema='augur_operations')
op.drop_constraint('user_repo_user_id_fkey', 'user_repos', schema='augur_operations', type_='foreignkey')
op.create_foreign_key('user_repos_repo_id_fkey', 'user_repos', 'repo', ['repo_id'], ['repo_id'], source_schema='augur_operations')
op.drop_constraint('user_group_unique', 'user_groups', schema='augur_operations', type_='unique')
op.create_unique_constraint('user_groups_user_id_name_key', 'user_groups', ['user_id', 'name'], schema='augur_operations')
op.alter_column('user_groups', 'user_id',
existing_type=sa.INTEGER(),
nullable=False,
schema='augur_operations')
op.drop_constraint('collection_status_repo_id_fk', 'collection_status', schema='augur_operations', type_='foreignkey')
op.create_foreign_key('collection_status_repo_id_fk', 'collection_status', 'repo', ['repo_id'], ['repo_id'], source_schema='augur_operations')
op.drop_column('collection_status', 'repo_status', schema='augur_operations')
op.drop_column('collection_status', 'facade_task_id', schema='augur_operations')
op.drop_column('collection_status', 'facade_data_last_collected', schema='augur_operations')
op.drop_column('collection_status', 'facade_status', schema='augur_operations')
op.add_column('repo', sa.Column('repo_status', sa.VARCHAR(), server_default=sa.text("'New'::character varying"), autoincrement=False, nullable=False), schema='augur_data')
op.alter_column('releases', 'release_id',
existing_type=sa.CHAR(length=128),
type_=sa.CHAR(length=256),
existing_nullable=False,
existing_server_default=sa.text('nextval(\'"augur_data".releases_release_id_seq\'::regclass)'),
schema='augur_data')
# ### end Alembic commands ###
4 changes: 3 additions & 1 deletion augur/tasks/git/dependency_tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ def process_dependency_metrics(repo_git):
logger = logging.getLogger(process_dependency_metrics.__name__)

with DatabaseSession(logger, engine) as session:
logger.info(f"repo_git: {repo_git}")
query = session.query(Repo).filter(Repo.repo_git == repo_git)
repo = execute_session_query(query,'one')


try:
repo = execute_session_query(query,'one')
deps_model(session, repo.repo_id)
except Exception as e:
session.logger.error(f"Could not complete deps_model!\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}")