diff --git a/.gitignore b/.gitignore
index 503d2065..acc63b15 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,4 +15,7 @@ plugins
 .user.yml
 
 # Desktop Services Store
-.DS_Store
\ No newline at end of file
+.DS_Store
+
+#GCP
+gcp.json
\ No newline at end of file
diff --git a/README.md b/README.md
index f110426f..025651f8 100644
--- a/README.md
+++ b/README.md
@@ -43,7 +43,7 @@ We would love to see you contribute to SageRx. Join our [Slack](https://join.sla
 - `UMLS_API=` - if you want to use RxNorm, you need an API key from [UMLS](https://uts.nlm.nih.gov/uts/signup-login).
 4. Make sure Docker is installed
 5. Run `docker-compose up airflow-init`.
-6. Run `docker-compose up`.
+6. Run `docker-compose up`.
 
 > NOTE: if you have an [M1 Mac](https://stackoverflow.com/questions/62807717/how-can-i-solve-postgresql-scram-authentication-problem) `export DOCKER_DEFAULT_PLATFORM=linux/amd64`, and re-build your images
 
@@ -71,18 +71,30 @@ The export_marts DAG is implemented to allow users to push .csv versions of the
 - DEST_BUCKET = 'the-name-of-your-aws-bucket'
 
 The access and secret-access keys can be found in 2 ways:
+
 1. If the user has the AWS CLI tools installed, simply type from Mac/Linux command prompt OR Windows Powershell:
-   - cat ~/.aws/credentials
+
+- cat ~/.aws/credentials
+
 2. If the user does not have AWS CLI tool installed or is unfamiliar with such, it is possible you would have saved your credentials in a location on your local machine - the file will be named after the IAM User in your AWS account, something like 'username_accessKeys.csv'. If you can't find that file, simply create a new set by following this procedure:
-   - Log in to the AWS console
-   - Search 'IAM' in the search bar and select the IAM Service --> the IAM Dashboard is displayed
-   - In the 'IAM resources' box, click on the number beneath 'Users' --> all IAM users are displayed
-   - choose the user with permissions in accordance with your needs (this will typically be the user with administrator access, but the only IAM User permission that is required is Read/Write permissions on your S3 bucket)
-   - Within the user page, click on 'Security Credentials', and scroll down to the box titled 'Access Keys'
-     - NOTE: A single user can only have 2 valid sets of login credentials at any one time. If you already have 2, you will need to either delete one or create a new user for this application
-     - NOTE: Once you have created a set of credentials, you only have 1 opportunity to view/save those credentials from the AWS UI. you should therefore be sure to save the .csv file in safe place
-   - Click 'Create access key', select the 'Command Line Interface' option, click Next, fill in a name for the keys, click 'Create access keys', then download the .csv file and save it in safe place
-   - Open the .csv file, then paste the access key and secret access key into the .env file as described above
+
+- Log in to the AWS console
+- Search 'IAM' in the search bar and select the IAM Service --> the IAM Dashboard is displayed
+- In the 'IAM resources' box, click on the number beneath 'Users' --> all IAM users are displayed
+- Choose the user with permissions in accordance with your needs (this will typically be the user with administrator access, but the only IAM User permission that is required is Read/Write permissions on your S3 bucket)
+- Within the user page, click on 'Security Credentials', and scroll down to the box titled 'Access Keys'
+  - NOTE: A single user can only have 2 valid sets of login credentials at any one time. If you already have 2, you will need to either delete one or create a new user for this application
+  - NOTE: Once you have created a set of credentials, you only have 1 opportunity to view/save those credentials from the AWS UI. You should therefore be sure to save the .csv file in a safe place
+- Click 'Create access key', select the 'Command Line Interface' option, click Next, fill in a name for the keys, click 'Create access keys', then download the .csv file and save it in a safe place
+- Open the .csv file, then paste the access key and secret access key into the .env file as described above
+
+### Integration with Google Cloud Platform (GCP)
+
+.env variables:
+
+- GCS_BUCKET
+- GCP_PROJECT
+- GCP_DATASET
 
 ### Troubleshooting
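For reference, the new GCP entries in `.env` might look like the sketch below. The names are placeholders, not values from this change; of the three, only `GCS_BUCKET` is read directly by the new upload helper in `common_dag_tasks.py`.

```
# .env - illustrative values only (bucket, project, and dataset names are placeholders)
# GCS_BUCKET is read by upload_csv_to_gcs; the project/dataset pair is where
# BigQuery tables are expected to land.
GCS_BUCKET=my-sagerx-bucket
GCP_PROJECT=my-gcp-project-id
GCP_DATASET=sagerx_lake
```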
diff --git a/airflow/dags/common_dag_tasks.py b/airflow/dags/common_dag_tasks.py
index 781b1700..e05fdcc8 100644
--- a/airflow/dags/common_dag_tasks.py
+++ b/airflow/dags/common_dag_tasks.py
@@ -42,6 +42,57 @@ def get_most_recent_dag_run(dag_id):
     dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
     return dag_runs[0] if dag_runs else None
 
+def return_files_in_folder(dir_path) -> list:
+    files = []
+    for file_path in dir_path.iterdir():
+        if file_path.is_file():
+            print(file_path.name)
+            files.append(file_path)
+        elif file_path.is_dir():
+            files.extend(return_files_in_folder(file_path))  # recurse, keeping the returned list flat
+    return files
+
+def get_files_in_data_folder(dag_id) -> list:
+    final_list = []
+    ds_path = get_data_folder(dag_id)
+    file_paths = [file for file in ds_path.iterdir() if not file.name.startswith('.DS_Store')]  # skip macOS metadata
+
+    for file_path in file_paths:
+        final_list.extend(return_files_in_folder(file_path))
+
+    return final_list
+
+def txt2csv(txt_path):
+    import pandas as pd
+
+    output_file = txt_path.with_suffix('.csv')
+    csv_table = pd.read_table(txt_path, sep='\t', encoding='cp1252')
+    csv_table.to_csv(output_file, index=False)
+
+    print(f"Conversion complete. The CSV file is saved as {output_file}")
+    return output_file
+
+def upload_csv_to_gcs(dag_id):
+    from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+    from os import environ
+
+    gcp_tasks = []
+    files = get_files_in_data_folder(dag_id)
+
+    for file_path in files:
+        if file_path.suffix == '.txt':
+            csv_file_path = txt2csv(file_path)
+
+            gcp_task = LocalFilesystemToGCSOperator(
+                task_id=f'upload_to_gcs_{csv_file_path.name}',
+                src=str(csv_file_path),
+                dst=f"{dag_id}/{csv_file_path.name}",
+                bucket=environ.get("GCS_BUCKET"),
+                gcp_conn_id='google_cloud_default'
+            )
+            gcp_tasks.append(gcp_task)
+    return gcp_tasks
+
 @task
 def extract(dag_id,url) -> str:
     # Task to download data from web location
@@ -58,5 +109,5 @@ def transform(dag_id, models_subdir='staging',task_id="") -> None:
     from airflow.hooks.subprocess import SubprocessHook
 
     subprocess = SubprocessHook()
-    result = subprocess.run_command(['dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx')
+    result = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx')
     print("Result from dbt:", result)
\ No newline at end of file
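A sketch of how these helpers could be wired into a dataset DAG. The DAG id, URL, and schedule below are illustrative and not part of this diff; note that `upload_csv_to_gcs` builds its operator list at parse time, so the data folder must already contain the extracted files.

```python
# Hypothetical DAG file - the dag id, URL, and schedule are placeholders.
import pendulum

from airflow_operator import create_dag
from common_dag_tasks import extract, upload_csv_to_gcs

dag_id = "example_dataset"

dag = create_dag(
    dag_id=dag_id,
    schedule="0 4 * * *",
    start_date=pendulum.yesterday(),
    catchup=False,
    concurrency=2,
)

with dag:
    extract_task = extract(dag_id, "https://example.com/data.zip")
    for upload_task in upload_csv_to_gcs(dag_id):  # one LocalFilesystemToGCSOperator per converted CSV
        extract_task >> upload_task
```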
diff --git a/airflow/dags/dbt_gcp/dag.py b/airflow/dags/dbt_gcp/dag.py
new file mode 100644
index 00000000..c97c14ae
--- /dev/null
+++ b/airflow/dags/dbt_gcp/dag.py
@@ -0,0 +1,119 @@
+import pendulum
+
+from airflow_operator import create_dag
+
+from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
+from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
+
+from airflow.operators.python_operator import PythonOperator
+
+dag_id = "dbt_gcp"
+
+dag = create_dag(
+    dag_id=dag_id,
+    schedule="0 4 * * *",
+    start_date=pendulum.yesterday(),
+    catchup=False,
+    concurrency=2,
+)
+
+with dag:
+    def run_dbt():
+        from airflow.hooks.subprocess import SubprocessHook
+        from airflow.exceptions import AirflowException
+
+        subprocess = SubprocessHook()
+
+        run_results = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'run'], cwd='/dbt/sagerx')
+        if run_results.exit_code != 0:  # any non-zero exit code means the dbt run failed
+            raise AirflowException(f"Command failed with return code {run_results.exit_code}: {run_results.output}")
+        print(f"Command succeeded with output: {run_results.output}")
+
+        docs_results = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'docs', 'generate'], cwd='/dbt/sagerx')
+        if docs_results.exit_code != 0:
+            raise AirflowException(f"Command failed with return code {docs_results.exit_code}: {docs_results.output}")
+        print(f"Command succeeded with output: {docs_results.output}")
+
+        check_results = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'run-operation', 'check_data_availability'], cwd='/dbt/sagerx')
+        if check_results.exit_code != 0:
+            raise AirflowException(f"Command failed with return code {check_results.exit_code}: {check_results.output}")
+        print(f"Command succeeded with output: {check_results.output}")
+
+
+    def get_dbt_models():
+        from sagerx import run_query_to_df
+
+        df = run_query_to_df("""
+            SELECT *
+            FROM sagerx.data_availability
+            WHERE has_data = True
+            AND materialized != 'view'
+        """)
+
+        print(f"Final list: {df}")
+        return df.to_dict('index')
+
+    def load_to_gcs(run_dbt_task):
+
+        dbt_models = get_dbt_models()
+        gcp_tasks = []
+        for _, row_values in dbt_models.items():
+            schema_name = row_values.get('schema_name')
+            table_name = row_values.get('table_name')
+            columns_info = row_values.get('columns_info')
+
+            print(f"{schema_name}.{table_name}")
+            print(columns_info)
+
+            # Transfer data from Postgres DB to Google Cloud Storage
+            p2g_task = PostgresToGCSOperator(
+                task_id=f'postgres_to_gcs_{schema_name}.{table_name}',
+                postgres_conn_id='postgres_default',
+                sql=f"SELECT * FROM {schema_name}.{table_name}",
+                bucket="sagerx_bucket",
+                filename=f'{table_name}',
+                export_format='csv',
+                gzip=False,
+                dag=dag,
+            )
+
+            # Populate BigQuery tables with data from the Cloud Storage bucket
+            cs2bq_task = GCSToBigQueryOperator(
+                task_id=f"bq_load_{schema_name}.{table_name}",
+                bucket="sagerx_bucket",
+                source_objects=[f"{table_name}"],
+                destination_project_dataset_table=f"sagerx-420700.sagerx_lake.{table_name}",
+                autodetect=True,
+                external_table=False,
+                create_disposition="CREATE_IF_NEEDED",
+                write_disposition="WRITE_TRUNCATE",
+                gcp_conn_id='google_cloud_default',
+                location='us-east5',
+                dag=dag,
+            )
+
+
+            run_dbt_task.set_downstream(p2g_task)
+            p2g_task.set_downstream(cs2bq_task)
+
+            gcp_tasks.append(p2g_task)
+            gcp_tasks.append(cs2bq_task)
+
+        return gcp_tasks
+
+
+    run_dbt_task = PythonOperator(
+        task_id='run_dbt',
+        python_callable=run_dbt,
+        dag=dag,
+    )
+
+    gcp_tasks = load_to_gcs(run_dbt_task)  # builds the per-table transfer/load tasks at parse time
+
+
+"""
+Notes:
+- Scaling issue: each table adds multiple tasks (two per table here), so the task count can grow quickly
+- A single dbt run is the preferred method since the complexity of dependencies is handled once
+- If we could do a full dbt run after each DAG run and then only specify which tables to upload, that would be more targeted and less costly
+"""
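For orientation, `get_dbt_models()` returns `df.to_dict('index')`, so `load_to_gcs` iterates over a dict keyed by row number shaped roughly like the illustrative value below; each row then yields a `run_dbt >> postgres_to_gcs_<schema>.<table> >> bq_load_<schema>.<table>` chain.

```python
# Illustrative shape only - the schema, table, and column details are invented.
{
    0: {
        "schema_name": "sagerx",
        "table_name": "example_mart",
        "has_data": True,
        "materialized": "table",
        "columns_info": [{"column_name": "id", "data_type": "integer", "is_nullable": "NO"}],
    },
}
```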
print(f"{schema_name}.{table_name}") + print(columns_info) + + # Transfer data from Postgres DB to Google Cloud Storage DB + p2g_task = PostgresToGCSOperator( + task_id=f'postgres_to_gcs_{schema_name}.{table_name}', + postgres_conn_id='postgres_default', + sql=f"SELECT * FROM {schema_name}.{table_name}", + bucket="sagerx_bucket", + filename=f'{table_name}', + export_format='csv', + gzip=False, + dag=dag, + ) + + # Populate BigQuery tables with data from Cloud Storage Bucket + cs2bq_task = GCSToBigQueryOperator( + task_id=f"bq_load_{schema_name}.{table_name}", + bucket="sagerx_bucket", + source_objects=[f"{table_name}"], + destination_project_dataset_table=f"sagerx-420700.sagerx_lake.{table_name}", + autodetect = True, + external_table=False, + create_disposition= "CREATE_IF_NEEDED", + write_disposition= "WRITE_TRUNCATE", + gcp_conn_id = 'google_cloud_default', + location='us-east5', + dag = dag, + ) + + + run_dbt_task.set_downstream(p2g_task) + p2g_task.set_downstream(cs2bq_task) + + gcp_tasks.append(p2g_task) + gcp_tasks.append(cs2bq_task) + + return gcp_tasks + + + run_dbt_task = PythonOperator( + task_id='run_dbt', + python_callable=run_dbt, + dag=dag, + ) + + gcp_tasks = load_to_gcs(run_dbt_task) + + +""" +Notes: +- Scaling issue if each table contains multiple tasks, can quickly grow since it grows exponentially +- Single dbt run is the preferred method since the complexity of dependencies is handled once +- If we can have dbt full run after each dag run and then only specify which tables to upload that would make it more targeted and less costly +""" diff --git a/airflow/dags/sagerx.py b/airflow/dags/sagerx.py index 3dfe5de2..ed59306f 100644 --- a/airflow/dags/sagerx.py +++ b/airflow/dags/sagerx.py @@ -4,6 +4,7 @@ from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import requests from time import sleep +import pandas as pd # Filesystem functions def create_path(*args): @@ -178,6 +179,15 @@ def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str index=index ) +def run_query_to_df(query:str) -> pd.DataFrame: + from airflow.hooks.postgres_hook import PostgresHook + + pg_hook = PostgresHook(postgres_conn_id="postgres_default") + engine = pg_hook.get_sqlalchemy_engine() + df = pd.read_sql(query, con=engine) + + return df + @retry( stop=stop_after_attempt(20), wait=wait_exponential(multiplier=1,max=10), diff --git a/airflow/requirements.txt b/airflow/requirements.txt index 7bd6bd8b..308b594e 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -1,4 +1,6 @@ # Any change made here should accompany an increment # to the image version on line 5 of docker-compose.yml -dbt-postgres==1.4.1 +dbt-core +dbt-postgres +apache-airflow[google] \ No newline at end of file diff --git a/dbt/Dockerfile b/dbt/Dockerfile index 301577dd..23e3acd9 100644 --- a/dbt/Dockerfile +++ b/dbt/Dockerfile @@ -5,7 +5,7 @@ ENV DBT_PROFILES_DIR=/dbt RUN apt-get -y update RUN apt-get -y install git -RUN pip install dbt-postgres==1.5.1 +RUN pip install dbt-core dbt-postgres WORKDIR /dbt diff --git a/dbt/sagerx/dbt_project.yml b/dbt/sagerx/dbt_project.yml index 45a50479..1a37013c 100644 --- a/dbt/sagerx/dbt_project.yml +++ b/dbt/sagerx/dbt_project.yml @@ -39,3 +39,6 @@ models: marts: +schema: sagerx +materialized: table + +persist_docs: + relation: true + columns: true diff --git a/dbt/sagerx/macros/check_data_availability.sql b/dbt/sagerx/macros/check_data_availability.sql new file mode 100644 index 00000000..fe036e64 --- /dev/null +++ 
diff --git a/dbt/sagerx/macros/check_data_availability.sql b/dbt/sagerx/macros/check_data_availability.sql
new file mode 100644
index 00000000..fe036e64
--- /dev/null
+++ b/dbt/sagerx/macros/check_data_availability.sql
@@ -0,0 +1,77 @@
+{% macro check_data_availability() %}
+{% set sources_and_models = [] -%}
+
+{% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
+    {%- do sources_and_models.append([node.schema, node.name, node.config.materialized]) -%}
+{%- endfor %}
+
+{% for node in graph.sources.values() -%}
+    {%- do sources_and_models.append([node.schema, node.name, node.resource_type]) -%}
+{%- endfor %}
+
+--{{ log("sources and models: " ~ sources_and_models, info=True) }}
+
+{% set results_table_query %}
+drop table if exists sagerx.data_availability CASCADE;
+create table sagerx.data_availability(
+    schema_name text,
+    table_name text,
+    has_data boolean,
+    materialized text,
+    columns_info jsonb
+);
+{% endset %}
+
+{{ run_query(results_table_query) }}
+
+
+{% for schema_name, table_name, mat_config in sources_and_models %}
+    {% set check_table_exists_query %}
+        {{ log("Check table exists query for " ~ schema_name ~ " : " ~ table_name, info=True) }}
+        select exists (
+            select 1
+            from information_schema.tables
+            where table_schema = '{{ schema_name }}'
+            and table_name = '{{ table_name }}'
+        ) as table_exists
+    {% endset %}
+
+    {% set table_exists_result = run_query(check_table_exists_query) %}
+    {% if table_exists_result[0]['table_exists'] %}
+        {{ log("Table: " ~ table_name ~ " does exist.", info=True) }}
+
+        {% set columns_query %}
+            select
+                column_name,
+                data_type,
+                is_nullable
+            from
+                information_schema.columns
+            where
+                table_schema = '{{ schema_name }}'
+                and table_name = '{{ table_name }}'
+        {% endset %}
+
+        {% set columns_result = run_query(columns_query) %}
+        {% set columns_info = [] %}
+        {% for column in columns_result %}
+            {%- do columns_info.append({"column_name": column['column_name'], "data_type": column['data_type'], "is_nullable": column['is_nullable']}) -%}
+        {% endfor %}
+
+        {% set row_count_query %}
+            select count(*) as row_count from {{schema_name}}.{{table_name}}
+        {% endset %}
+        {% set row_count_result = run_query(row_count_query) %}
+
+        {% set insert_query %}
+            insert into sagerx.data_availability
+            (schema_name, table_name, has_data, materialized, columns_info)
+            values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}');
+        {% endset %}
+        {{ run_query(insert_query) }}
+    {% else %}
+        {{ log("Table: " ~ table_name ~ " does not exist.", info=True) }}
+    {% endif %}
+
+{% endfor %}
+{% endmacro %}
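Once the macro has run (the dbt_gcp DAG invokes it via `docker exec dbt dbt run-operation check_data_availability`), the resulting snapshot can be inspected with a query along these lines:

```sql
-- Sketch: inspect the availability snapshot produced by the macro.
select schema_name, table_name, has_data, materialized
from sagerx.data_availability
order by schema_name, table_name;
```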
diff --git a/docker-compose.yml b/docker-compose.yml
index de12a3e4..7b00e237 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,7 +2,11 @@ version: "3.8"
 x-airflow-common: &airflow-common
   build:
     context: ./airflow
-  image: sagerx_airflow:v0.0.1 # versioning allows a rebuild of docker image where necessary
+  image: sagerx_airflow:v0.0.5 # versioning allows a rebuild of docker image where necessary
+  networks:
+    - airflow-dbt-network
+  env_file:
+    - .env
   environment: &airflow-common-env
     AIRFLOW__CORE__EXECUTOR: LocalExecutor
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
@@ -15,6 +19,7 @@ x-airflow-common: &airflow-common
     AIRFLOW_CONN_SLACK: http://:@https%3A%2F%2Fhooks.slack.com%2Fservices%2F${SLACK_API:-}
     _PIP_ADDITIONAL_REQUIREMENTS: ""
     DBT_PROFILES_DIR: /dbt
+    AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: google-cloud-platform://?key_path=/opt/gcp.json
   user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
   volumes:
     - ./airflow/dags:/opt/airflow/dags
@@ -23,10 +28,13 @@ x-airflow-common: &airflow-common
     - ./airflow/data:/opt/airflow/data
     - ./airflow/config/airflow.cfg:/opt/airflow/airflow.cfg
     - ./dbt:/dbt
-
+    - ./gcp.json:/opt/gcp.json
+    - /var/run/docker.sock:/var/run/docker.sock # not the most secure way of connecting
 services:
   postgres:
     image: postgres:14-alpine
+    networks:
+      - airflow-dbt-network
     container_name: postgres
     environment:
       POSTGRES_USER: sagerx
@@ -45,6 +53,8 @@ services:
 
   pgadmin:
     image: dpage/pgadmin4:6.15
+    networks:
+      - airflow-dbt-network
     container_name: pgadmin
     environment:
       PGADMIN_DEFAULT_EMAIL: pgadmin@pgadmin.org
@@ -59,13 +69,20 @@ services:
   dbt:
     build:
       context: ./dbt
-    image: dbt
+    env_file:
+      - .env
+    environment:
+      AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: google-cloud-platform://?key_path=/opt/gcp.json
+    image: dbt:v0.0.5
+    networks:
+      - airflow-dbt-network
     container_name: dbt
     user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
    ports:
       - 8081:8081
     volumes:
       - ./dbt:/dbt
+      - ./gcp.json:/opt/gcp.json
     command: tail -f /dev/null
 
   airflow-init:
@@ -112,3 +129,7 @@ services:
       AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
       AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
       AWS_DEST_BUCKET: ${DEST_BUCKET}
+
+networks:
+  airflow-dbt-network:
+    driver: bridge
\ No newline at end of file
diff --git a/postgres/2_sagerx_setup.sql b/postgres/2_sagerx_setup.sql
index a5e826f3..4c3a8734 100755
--- a/postgres/2_sagerx_setup.sql
+++ b/postgres/2_sagerx_setup.sql
@@ -4,4 +4,11 @@ CREATE SCHEMA sagerx_lake;
 CREATE SCHEMA sagerx;
 
 --Add pg_stat_statements extension for query monitoring
-CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
\ No newline at end of file
+CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
+
+CREATE TABLE sagerx.data_availability (
+    schema_name text,
+    table_name text,
+    has_data boolean,
+    materialized text
+);
\ No newline at end of file
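Finally, a hypothetical smoke test for the GCP wiring above, run from inside the Airflow container. It assumes `gcp.json` is mounted at `/opt/gcp.json` (as in the compose file) and that `GCS_BUCKET` is set in `.env`; it is a sketch, not part of this change.

```python
# Hypothetical check - confirms the google_cloud_default connection can list the bucket.
from os import environ

from airflow.providers.google.cloud.hooks.gcs import GCSHook

hook = GCSHook(gcp_conn_id="google_cloud_default")
print(hook.list(bucket_name=environ["GCS_BUCKET"], max_results=5))
```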