From c419240ee20fc413f1e56c2e361548ed33264b46 Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Tue, 25 Jun 2024 12:17:57 -0400 Subject: [PATCH 1/4] gcp workflow --- .gitignore | 5 +- airflow/dags/dbt_gcp/dag.py | 120 ++++++++++++++++++ airflow/dags/sagerx.py | 10 ++ airflow/requirements.txt | 4 +- dbt/Dockerfile | 2 +- dbt/sagerx/dbt_project.yml | 3 + dbt/sagerx/macros/check_data_availability.sql | 77 +++++++++++ docker-compose.yml | 27 +++- gcp.json | 13 ++ postgres/2_sagerx_setup.sql | 9 +- 10 files changed, 263 insertions(+), 7 deletions(-) create mode 100644 airflow/dags/dbt_gcp/dag.py create mode 100644 dbt/sagerx/macros/check_data_availability.sql create mode 100644 gcp.json diff --git a/.gitignore b/.gitignore index 503d2065..acc63b15 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,7 @@ plugins .user.yml # Desktop Services Store -.DS_Store \ No newline at end of file +.DS_Store + +#GCP +gcp.json \ No newline at end of file diff --git a/airflow/dags/dbt_gcp/dag.py b/airflow/dags/dbt_gcp/dag.py new file mode 100644 index 00000000..7f994f3c --- /dev/null +++ b/airflow/dags/dbt_gcp/dag.py @@ -0,0 +1,120 @@ +import pendulum + +from airflow_operator import create_dag + +from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator +from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator + +from airflow.operators.python_operator import PythonOperator + +dag_id = "dbt_gcp" + +dag = create_dag( + dag_id=dag_id, + schedule="0 4 * * *", + start_date=pendulum.yesterday(), + catchup=False, + concurrency=2, +) + +with dag: + def run_dbt(): + from airflow.hooks.subprocess import SubprocessHook + from airflow.exceptions import AirflowException + + subprocess = SubprocessHook() + + run_results = subprocess.run_command(['docker','exec','dbt','dbt', 'run'], cwd='/dbt/sagerx') + if run_results.exit_code != 1: + raise AirflowException(f"Command failed with return code {run_results.exit_code}: {run_results.output}") + print(f"Command succeeded with output: {run_results.output}") + + docs_results = subprocess.run_command(['docker','exec','dbt','dbt', "docs", "generate"], cwd='/dbt/sagerx') + if docs_results.exit_code != 0: + raise AirflowException(f"Command failed with return code {docs_results.exit_code}: {docs_results.output}") + print(f"Command succeeded with output: {docs_results.output}") + + check_results = subprocess.run_command(['docker','exec','dbt','dbt', 'run-operation', 'check_data_availability'], cwd='/dbt/sagerx') + if check_results.exit_code != 0: + raise AirflowException(f"Command failed with return code {check_results.exit_code}: {check_results.output}") + print(f"Command succeeded with output: {check_results.output}") + + + def get_dbt_models(): + from sagerx import run_query_to_df + + df = run_query_to_df(""" + SELECT * + FROM sagerx.data_availability + WHERE has_data = True + AND materialized != 'view' + """) + + print(f"Final list: {df}") + return df.to_dict('index') + + def load_to_gcs(run_dbt_task): + + dbt_models = get_dbt_models() + gcp_tasks = [] + for _,row_values in dbt_models.items(): + schema_name = row_values.get('schema_name') + table_name = row_values.get('table_name') + columns_info = row_values.get('columns_info') + + print(f"{schema_name}.{table_name}") + print(columns_info) + + # Transfer data from Postgres DB to Google Cloud Storage DB + p2g_task = PostgresToGCSOperator( + task_id=f'postgres_to_gcs_{schema_name}.{table_name}', + postgres_conn_id='postgres_default', + sql=f"SELECT * FROM 
{schema_name}.{table_name}", + bucket="sagerx_bucket", + filename=f'{table_name}', + export_format='csv', + gzip=False, + dag=dag, + ) + + # Populate BigQuery tables with data from Cloud Storage Bucket + cs2bq_task = GCSToBigQueryOperator( + task_id=f"bq_load_{schema_name}.{table_name}", + bucket="sagerx_bucket", + source_objects=[f"{table_name}"], + destination_project_dataset_table=f"sagerx-420700.sagerx_lake.{table_name}", + autodetect = True, + external_table=False, + create_disposition= "CREATE_IF_NEEDED", + write_disposition= "WRITE_TRUNCATE", + gcp_conn_id = 'google_cloud_default', + location='us-east5', + dag = dag, + ) + + + run_dbt_task.set_downstream(p2g_task) + p2g_task.set_downstream(cs2bq_task) + + gcp_tasks.append(p2g_task) + gcp_tasks.append(cs2bq_task) + + return gcp_tasks + + + run_dbt_task = PythonOperator( + task_id='run_dbt', + python_callable=run_dbt, + dag=dag, + ) + + gcp_tasks = load_to_gcs(run_dbt_task) + + +""" + +Notes: +- Scaling issue if each table contains multiple tasks, can quickly grow since it grows exponentially +- Single dbt run is the preferred method since the complexity of dependencies is handled once +- If we can have dbt full run after each dag run and then only specify which tables to upload that would make it more targeted and less costly +""" \ No newline at end of file diff --git a/airflow/dags/sagerx.py b/airflow/dags/sagerx.py index 3dfe5de2..ed59306f 100644 --- a/airflow/dags/sagerx.py +++ b/airflow/dags/sagerx.py @@ -4,6 +4,7 @@ from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import requests from time import sleep +import pandas as pd # Filesystem functions def create_path(*args): @@ -178,6 +179,15 @@ def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str index=index ) +def run_query_to_df(query:str) -> pd.DataFrame: + from airflow.hooks.postgres_hook import PostgresHook + + pg_hook = PostgresHook(postgres_conn_id="postgres_default") + engine = pg_hook.get_sqlalchemy_engine() + df = pd.read_sql(query, con=engine) + + return df + @retry( stop=stop_after_attempt(20), wait=wait_exponential(multiplier=1,max=10), diff --git a/airflow/requirements.txt b/airflow/requirements.txt index 7bd6bd8b..308b594e 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -1,4 +1,6 @@ # Any change made here should accompany an increment # to the image version on line 5 of docker-compose.yml -dbt-postgres==1.4.1 +dbt-core +dbt-postgres +apache-airflow[google] \ No newline at end of file diff --git a/dbt/Dockerfile b/dbt/Dockerfile index 301577dd..23e3acd9 100644 --- a/dbt/Dockerfile +++ b/dbt/Dockerfile @@ -5,7 +5,7 @@ ENV DBT_PROFILES_DIR=/dbt RUN apt-get -y update RUN apt-get -y install git -RUN pip install dbt-postgres==1.5.1 +RUN pip install dbt-core dbt-postgres WORKDIR /dbt diff --git a/dbt/sagerx/dbt_project.yml b/dbt/sagerx/dbt_project.yml index 45a50479..1a37013c 100644 --- a/dbt/sagerx/dbt_project.yml +++ b/dbt/sagerx/dbt_project.yml @@ -39,3 +39,6 @@ models: marts: +schema: sagerx +materialized: table + +persist_docs: + relation: true + columns: true diff --git a/dbt/sagerx/macros/check_data_availability.sql b/dbt/sagerx/macros/check_data_availability.sql new file mode 100644 index 00000000..fe036e64 --- /dev/null +++ b/dbt/sagerx/macros/check_data_availability.sql @@ -0,0 +1,77 @@ +{% macro check_data_availability() %} +{% set sources_and_models = [] -%} + +{% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} + {%- do 
sources_and_models.append([node.schema, node.name, node.config.materialized]) -%} +{%- endfor %} + +{% for node in graph.sources.values() -%} + {%- do sources_and_models.append([node.schema, node.name, node.resource_type]) -%} +{%- endfor %} + +--{{ log("sources and models: " ~ sources_and_models, info=True) }} + +{% set results_table_query %} +drop table if exists sagerx.data_availability CASCADE; +create table sagerx.data_availability( + schema_name text, + table_name text, + has_data boolean, + materialized text, + columns_info jsonb +); +{% endset %} + +{{ run_query(results_table_query) }} + + +{% for schema_name, table_name, mat_config in sources_and_models %} + {% set check_table_exists_query %} + {{ log("Check table exists query for " ~ schema_name ~ " : " ~ table_name, info=True) }} + select exists ( + select 1 + from information_schema.tables + where table_schema = '{{ schema_name }}' + and table_name = '{{ table_name }}' + ) as table_exists + {% endset %} + + {% set table_exists_result = run_query(check_table_exists_query) %} + {% if table_exists_result[0]['table_exists'] %} + {{ log("Table: " ~ table_name ~ " does exist.", info=True) }} + + {% set columns_query %} + select + column_name, + data_type, + is_nullable + from + information_schema.columns + where + table_schema = '{{ schema_name }}' + and table_name = '{{ table_name }}' + {% endset %} + + {% set columns_result = run_query(columns_query) %} + {% set columns_info = [] %} + {% for column in columns_result %} + {%- do columns_info.append({"column_name": column['column_name'], "data_type": column['data_type'], "is_nullable": column['is_nullable']}) -%} + {% endfor %} + + {% set row_count_query %} + select count(*) as row_count from {{schema_name}}.{{table_name}} + {% endset %} + {% set row_count_result = run_query(row_count_query) %} + + {% set insert_query %} + insert into sagerx.data_availability + (schema_name, table_name, has_data, materialized, columns_info) + values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}'); + {% endset %} + {{ run_query(insert_query) }} + {% else %} + {{ log("No Table: " ~ table_name ~ " does not exist.", info=True) }} + {% endif %} + +{% endfor %} +{% endmacro %} diff --git a/docker-compose.yml b/docker-compose.yml index de12a3e4..7b00e237 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,11 @@ version: "3.8" x-airflow-common: &airflow-common build: context: ./airflow - image: sagerx_airflow:v0.0.1 # versioning allows a rebuild of docker image where necessary + image: sagerx_airflow:v0.0.5 # versioning allows a rebuild of docker image where necessary + networks: + - airflow-dbt-network + env_file: + - .env environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true" @@ -15,6 +19,7 @@ x-airflow-common: &airflow-common AIRFLOW_CONN_SLACK: http://:@https%3A%2F%2Fhooks.slack.com%2Fservices%2F${SLACK_API:-} _PIP_ADDITIONAL_REQUIREMENTS: "" DBT_PROFILES_DIR: /dbt + AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: google-cloud-platform://?key_path=/opt/gcp.json user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" volumes: - ./airflow/dags:/opt/airflow/dags @@ -23,10 +28,13 @@ x-airflow-common: &airflow-common - ./airflow/data:/opt/airflow/data - ./airflow/config/airflow.cfg:/opt/airflow/airflow.cfg - ./dbt:/dbt - + - ./gcp.json:/opt/gcp.json + - /var/run/docker.sock:/var/run/docker.sock #not the most secure way of connecting services: postgres: image: 
postgres:14-alpine + networks: + - airflow-dbt-network container_name: postgres environment: POSTGRES_USER: sagerx @@ -45,6 +53,8 @@ services: pgadmin: image: dpage/pgadmin4:6.15 + networks: + - airflow-dbt-network container_name: pgadmin environment: PGADMIN_DEFAULT_EMAIL: pgadmin@pgadmin.org @@ -59,13 +69,20 @@ services: dbt: build: context: ./dbt - image: dbt + env_file: + - .env + environment: + AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: google-cloud-platform://?key_path=/opt/gcp.json + image: dbt:v0.0.5 + networks: + - airflow-dbt-network container_name: dbt user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" ports: - 8081:8081 volumes: - ./dbt:/dbt + - ./gcp.json:/opt/gcp.json command: tail -f /dev/null airflow-init: @@ -112,3 +129,7 @@ services: AWS_ACCESS_KEY_ID: ${ACCESS_KEY} AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY} AWS_DEST_BUCKET: ${DEST_BUCKET} + +networks: + airflow-dbt-network: + driver: bridge \ No newline at end of file diff --git a/gcp.json b/gcp.json new file mode 100644 index 00000000..7da1fc45 --- /dev/null +++ b/gcp.json @@ -0,0 +1,13 @@ +{ + "type": "service_account", + "project_id": "sagerx-420700", + "private_key_id": "6bf9848ab8176d0d1ba164047563fc833c82e13e", + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCtqq0wO3XXsHSa\nKG0SEinf11jgL1N0QNpgXN74rg0ZdQ1+zTlMrKMpZ1TthAQpBGZgaSZWChh8B4k4\nKvbnMsuDW196bwujhCFptJiDcdhEHenbVV1RpGgxO8z7Eo0q0WikjlLtx5UM2BoK\nYOz02SjSw88rj34vOB5aQl7uFsVZ0cCg8ZwBKBC5BOSWxgJN6d3zr8AzVBy6tLFV\n1xQfiaGbuvUCuDzda79PgGBZKCkQDdiSzG955tdugEM51URFescmV9Y6afEaCB1k\nM36pPqSpiO4MWytDRwqvp599Mpv/u8ybwp4lqdw+eNVVBJtZWp9m8ye6nNUJlgF5\nXvee7ONJAgMBAAECggEADDmvHm2ZIpln4/BDmVmQ/BJ3TbTXLaBhHxZ6lcv+3RNp\n40rEJVsj00bUajH4bwDByjisu7LTPWv0Y3FW1ziyrekRRBesHJMxHPVbJSmu/UCT\n1V5hht1JfmnEyMnqCSEbujXQra92lSRjVOa015mv74JcQ6dCzUUokdcJQr50nxlw\nwU6SzK1gvpN9H/rWgNDFp9rzlumFkp1dOvrEbdGn2OsVmzHP6y+5JARlFV+gCRYc\njNzVHsSHfN30e50f+1z/8li77bTA7UumQXVdJ9xWuCkKXpuWg8KOY7i7vczsS52T\nb66qPL7K2VOz1XfbelfnRl+Sc08VV37m7EP82BPR7wKBgQDYSuPh+4QWq7r1E8Zg\n7Hn2UZdxPRq3Lt6qhCgT8sowgLfXV1A0uPSXxGyvYkt4iFsI7nPTpaE514zmqRT4\na4ILoLRRd3A0nc8eappkFs8D2GwAhwhi0V9YIeMMqDwiS3QRtQJCTsjWHPTs+5bu\nLPMpR2oWZEJd7tWdCbzHqVwVhwKBgQDNjH+Hg0lIp7PhnQZtxFv8WfPwMZBZagvq\nVX+lR0t1vBn55CYTksU1tRutF8iNT3OvNbwTA8MzlULp8st42pxac7SmR86O5eXX\np6I5t4DTT+tfyvb/MY+u4EYPWRn7tqxihTIPkwOPkkRHudu2++KkIt+Ov/KbzmmV\nc5siVT90rwKBgQCqq0Vv7uhGf1Gxyt3RYyfFrpIiX4XyH8DBqjB5tS3H4fmuqQ33\n3C3ch7j/Fz/YJzg2LvokemBi5OwgojCS6TofdLp0Qhu+2Psy1Alpivnk3eQy8loy\np2VlhK/FMAbrRMCcrEjRC5u8H/NAADITVFK3MsvKSWh4+FmEZceZgNDRhwKBgHm5\n2ZBL7GtNfVZ/4l3A6mSgdLjq8TwydAn7RhOADC0WDyAQv4fZ5FYxAcnZCti8k5rQ\nLKfqE4CJxU28jkjs1akvWm0amzW+6gVzbbvWc0Ew1Agvr2RjWl7KzFKshpmtjZru\nnD3i5znv+eWsKHTN6GMhj9j+zVL9w0NRai7D40dbAoGAdUxA6or4sFQcqajPIwuG\n2Mi/h1h7vvfpIU7+EXYKldKM251H//cREAbVCifLYgOKrFsVHmw/e09WjELo/OzF\ny1o5iv13kOntp42pE6t8zYtITyfV/yWlEqLUpima7cC+l/hA6aosfv6HjbWDEg6H\njxA6HVX3j/dLNU3ktyj/KS8=\n-----END PRIVATE KEY-----\n", + "client_email": "lukasz-mac-book@sagerx-420700.iam.gserviceaccount.com", + "client_id": "117356098211646277767", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/lukasz-mac-book%40sagerx-420700.iam.gserviceaccount.com", + "universe_domain": "googleapis.com" +} diff --git a/postgres/2_sagerx_setup.sql b/postgres/2_sagerx_setup.sql index a5e826f3..4c3a8734 100755 --- 
a/postgres/2_sagerx_setup.sql +++ b/postgres/2_sagerx_setup.sql @@ -4,4 +4,11 @@ CREATE SCHEMA sagerx_lake; CREATE SCHEMA sagerx; --Add pg_stat_statements extension for query monitoring -CREATE EXTENSION IF NOT EXISTS pg_stat_statements; \ No newline at end of file +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; + +CREATE TABLE sagerx.data_availability ( + schema_name text, + table_name text, + has_data boolean, + materialized text +); \ No newline at end of file From 7d51707ba66375987e9cd40f8ceb2d82b6cc57ac Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Sat, 15 Jun 2024 18:02:10 -0400 Subject: [PATCH 2/4] enit init2 finalize logic to push and create datasets finalize logic to push and create datasets rm gcp spacing fix --- airflow/dags/dbt_gcp/dag.py | 3 +-- dbt/sagerx/macros/check_data_availability.sql | 21 +++++++++++++++++++ gcp.json | 13 ------------ 3 files changed, 22 insertions(+), 15 deletions(-) delete mode 100644 gcp.json diff --git a/airflow/dags/dbt_gcp/dag.py b/airflow/dags/dbt_gcp/dag.py index 7f994f3c..c97c14ae 100644 --- a/airflow/dags/dbt_gcp/dag.py +++ b/airflow/dags/dbt_gcp/dag.py @@ -112,9 +112,8 @@ def load_to_gcs(run_dbt_task): """ - Notes: - Scaling issue if each table contains multiple tasks, can quickly grow since it grows exponentially - Single dbt run is the preferred method since the complexity of dependencies is handled once - If we can have dbt full run after each dag run and then only specify which tables to upload that would make it more targeted and less costly -""" \ No newline at end of file +""" diff --git a/dbt/sagerx/macros/check_data_availability.sql b/dbt/sagerx/macros/check_data_availability.sql index fe036e64..6b2fd5f0 100644 --- a/dbt/sagerx/macros/check_data_availability.sql +++ b/dbt/sagerx/macros/check_data_availability.sql @@ -17,8 +17,17 @@ create table sagerx.data_availability( schema_name text, table_name text, has_data boolean, +<<<<<<< HEAD +<<<<<<< HEAD materialized text, columns_info jsonb +======= + materialized text +>>>>>>> fdd900a (init2) +======= + materialized text, + columns_info jsonb +>>>>>>> 7b9283b (finalize logic to push and create datasets) ); {% endset %} @@ -64,9 +73,21 @@ create table sagerx.data_availability( {% set row_count_result = run_query(row_count_query) %} {% set insert_query %} +<<<<<<< HEAD +<<<<<<< HEAD + insert into sagerx.data_availability + (schema_name, table_name, has_data, materialized, columns_info) + values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}'); +======= + insert into {{ target.schema }}.data_availability + (schema_name, table_name, has_data, materialized) + values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}'); +>>>>>>> fdd900a (init2) +======= insert into sagerx.data_availability (schema_name, table_name, has_data, materialized, columns_info) values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}'); +>>>>>>> 7b9283b (finalize logic to push and create datasets) {% endset %} {{ run_query(insert_query) }} {% else %} diff --git a/gcp.json b/gcp.json deleted file mode 100644 index 7da1fc45..00000000 --- a/gcp.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "type": "service_account", - "project_id": "sagerx-420700", - "private_key_id": "6bf9848ab8176d0d1ba164047563fc833c82e13e", - "private_key": "-----BEGIN PRIVATE 
KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCtqq0wO3XXsHSa\nKG0SEinf11jgL1N0QNpgXN74rg0ZdQ1+zTlMrKMpZ1TthAQpBGZgaSZWChh8B4k4\nKvbnMsuDW196bwujhCFptJiDcdhEHenbVV1RpGgxO8z7Eo0q0WikjlLtx5UM2BoK\nYOz02SjSw88rj34vOB5aQl7uFsVZ0cCg8ZwBKBC5BOSWxgJN6d3zr8AzVBy6tLFV\n1xQfiaGbuvUCuDzda79PgGBZKCkQDdiSzG955tdugEM51URFescmV9Y6afEaCB1k\nM36pPqSpiO4MWytDRwqvp599Mpv/u8ybwp4lqdw+eNVVBJtZWp9m8ye6nNUJlgF5\nXvee7ONJAgMBAAECggEADDmvHm2ZIpln4/BDmVmQ/BJ3TbTXLaBhHxZ6lcv+3RNp\n40rEJVsj00bUajH4bwDByjisu7LTPWv0Y3FW1ziyrekRRBesHJMxHPVbJSmu/UCT\n1V5hht1JfmnEyMnqCSEbujXQra92lSRjVOa015mv74JcQ6dCzUUokdcJQr50nxlw\nwU6SzK1gvpN9H/rWgNDFp9rzlumFkp1dOvrEbdGn2OsVmzHP6y+5JARlFV+gCRYc\njNzVHsSHfN30e50f+1z/8li77bTA7UumQXVdJ9xWuCkKXpuWg8KOY7i7vczsS52T\nb66qPL7K2VOz1XfbelfnRl+Sc08VV37m7EP82BPR7wKBgQDYSuPh+4QWq7r1E8Zg\n7Hn2UZdxPRq3Lt6qhCgT8sowgLfXV1A0uPSXxGyvYkt4iFsI7nPTpaE514zmqRT4\na4ILoLRRd3A0nc8eappkFs8D2GwAhwhi0V9YIeMMqDwiS3QRtQJCTsjWHPTs+5bu\nLPMpR2oWZEJd7tWdCbzHqVwVhwKBgQDNjH+Hg0lIp7PhnQZtxFv8WfPwMZBZagvq\nVX+lR0t1vBn55CYTksU1tRutF8iNT3OvNbwTA8MzlULp8st42pxac7SmR86O5eXX\np6I5t4DTT+tfyvb/MY+u4EYPWRn7tqxihTIPkwOPkkRHudu2++KkIt+Ov/KbzmmV\nc5siVT90rwKBgQCqq0Vv7uhGf1Gxyt3RYyfFrpIiX4XyH8DBqjB5tS3H4fmuqQ33\n3C3ch7j/Fz/YJzg2LvokemBi5OwgojCS6TofdLp0Qhu+2Psy1Alpivnk3eQy8loy\np2VlhK/FMAbrRMCcrEjRC5u8H/NAADITVFK3MsvKSWh4+FmEZceZgNDRhwKBgHm5\n2ZBL7GtNfVZ/4l3A6mSgdLjq8TwydAn7RhOADC0WDyAQv4fZ5FYxAcnZCti8k5rQ\nLKfqE4CJxU28jkjs1akvWm0amzW+6gVzbbvWc0Ew1Agvr2RjWl7KzFKshpmtjZru\nnD3i5znv+eWsKHTN6GMhj9j+zVL9w0NRai7D40dbAoGAdUxA6or4sFQcqajPIwuG\n2Mi/h1h7vvfpIU7+EXYKldKM251H//cREAbVCifLYgOKrFsVHmw/e09WjELo/OzF\ny1o5iv13kOntp42pE6t8zYtITyfV/yWlEqLUpima7cC+l/hA6aosfv6HjbWDEg6H\njxA6HVX3j/dLNU3ktyj/KS8=\n-----END PRIVATE KEY-----\n", - "client_email": "lukasz-mac-book@sagerx-420700.iam.gserviceaccount.com", - "client_id": "117356098211646277767", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/lukasz-mac-book%40sagerx-420700.iam.gserviceaccount.com", - "universe_domain": "googleapis.com" -} From 19679f5219b372de440cf24727653669135fc957 Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Tue, 25 Jun 2024 12:45:05 -0400 Subject: [PATCH 3/4] fix avail --- dbt/sagerx/macros/check_data_availability.sql | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/dbt/sagerx/macros/check_data_availability.sql b/dbt/sagerx/macros/check_data_availability.sql index 6b2fd5f0..fe036e64 100644 --- a/dbt/sagerx/macros/check_data_availability.sql +++ b/dbt/sagerx/macros/check_data_availability.sql @@ -17,17 +17,8 @@ create table sagerx.data_availability( schema_name text, table_name text, has_data boolean, -<<<<<<< HEAD -<<<<<<< HEAD materialized text, columns_info jsonb -======= - materialized text ->>>>>>> fdd900a (init2) -======= - materialized text, - columns_info jsonb ->>>>>>> 7b9283b (finalize logic to push and create datasets) ); {% endset %} @@ -73,21 +64,9 @@ create table sagerx.data_availability( {% set row_count_result = run_query(row_count_query) %} {% set insert_query %} -<<<<<<< HEAD -<<<<<<< HEAD - insert into sagerx.data_availability - (schema_name, table_name, has_data, materialized, columns_info) - values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}'); -======= - insert into {{ target.schema }}.data_availability - 
(schema_name, table_name, has_data, materialized) - values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}'); ->>>>>>> fdd900a (init2) -======= insert into sagerx.data_availability (schema_name, table_name, has_data, materialized, columns_info) values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}'); ->>>>>>> 7b9283b (finalize logic to push and create datasets) {% endset %} {{ run_query(insert_query) }} {% else %} From f83d8c0deb0265601acef69ffd6c47db1fc57570 Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Mon, 1 Jul 2024 10:31:08 -0400 Subject: [PATCH 4/4] gcs common functions --- README.md | 34 +++++++++++++------- airflow/dags/common_dag_tasks.py | 53 +++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f110426f..025651f8 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ We would love to see you contribute to SageRx. Join our [Slack](https://join.sla - `UMLS_API=` - if you want to use RxNorm, you need an API key from [UMLS](https://uts.nlm.nih.gov/uts/signup-login). 4. Make sure Docker is installed 5. Run `docker-compose up airflow-init`. -6. Run `docker-compose up`. +6. Run `docker-compose up`. > NOTE: if you have an [M1 Mac](https://stackoverflow.com/questions/62807717/how-can-i-solve-postgresql-scram-authentication-problem) `export DOCKER_DEFAULT_PLATFORM=linux/amd64`, and re-build your images @@ -71,18 +71,30 @@ The export_marts DAG is implemented to allow users to push .csv versions of the - DEST_BUCKET = 'the-name-of-your-aws-bucket' The access and secret-access keys can be found in 2 ways: + 1. If the user has the AWS CLI tools installed, simply type from Mac/Linux command prompt OR Windows Powershell: - - cat ~/.aws/credentials + +- cat ~/.aws/credentials + 2. If the user does not have AWS CLI tool installed or is unfamiliar with such, it is possible you would have saved your credentials in a location on your local machine - the file will be named after the IAM User in your AWS account, something like 'username_accessKeys.csv'. If you can't find that file, simply create a new set by following this procedure: - - Log in to the AWS console - - Search 'IAM' in the search bar and select the IAM Service --> the IAM Dashboard is displayed - - In the 'IAM resources' box, click on the number beneath 'Users' --> all IAM users are displayed - - choose the user with permissions in accordance with your needs (this will typically be the user with administrator access, but the only IAM User permission that is required is Read/Write permissions on your S3 bucket) - - Within the user page, click on 'Security Credentials', and scroll down to the box titled 'Access Keys' - - NOTE: A single user can only have 2 valid sets of login credentials at any one time. If you already have 2, you will need to either delete one or create a new user for this application - - NOTE: Once you have created a set of credentials, you only have 1 opportunity to view/save those credentials from the AWS UI. you should therefore be sure to save the .csv file in safe place. 
- - Click 'Create access key', select the 'Command Line Interface' option, click Next, fill in a name for the keys, click 'Create access keys', then download the .csv file and save it in safe place - - Open the .csv file, then paste the access key and secret access key into the .env file as described above + +- Log in to the AWS console +- Search 'IAM' in the search bar and select the IAM Service --> the IAM Dashboard is displayed +- In the 'IAM resources' box, click on the number beneath 'Users' --> all IAM users are displayed +- choose the user with permissions in accordance with your needs (this will typically be the user with administrator access, but the only IAM User permission that is required is Read/Write permissions on your S3 bucket) +- Within the user page, click on 'Security Credentials', and scroll down to the box titled 'Access Keys' + - NOTE: A single user can only have 2 valid sets of login credentials at any one time. If you already have 2, you will need to either delete one or create a new user for this application + - NOTE: Once you have created a set of credentials, you only have 1 opportunity to view/save those credentials from the AWS UI. you should therefore be sure to save the .csv file in safe place. +- Click 'Create access key', select the 'Command Line Interface' option, click Next, fill in a name for the keys, click 'Create access keys', then download the .csv file and save it in safe place +- Open the .csv file, then paste the access key and secret access key into the .env file as described above + +### Integration with Google Cloud Platform (GCP) + +.env variables + +- GCS_BUCKET +- GCP_PROJECT +- GCP_DATASET ### Troubleshooting diff --git a/airflow/dags/common_dag_tasks.py b/airflow/dags/common_dag_tasks.py index 781b1700..e05fdcc8 100644 --- a/airflow/dags/common_dag_tasks.py +++ b/airflow/dags/common_dag_tasks.py @@ -42,6 +42,57 @@ def get_most_recent_dag_run(dag_id): dag_runs.sort(key=lambda x: x.execution_date, reverse=True) return dag_runs[0] if dag_runs else None +def return_files_in_folder(dir_path) -> list: + files = [] + for file_path in dir_path.iterdir(): + if file_path.is_file(): + print(file_path.name) + files.append(file_path) + elif file_path.is_dir(): + files.append(return_files_in_folder(file_path)) + return files + +def get_files_in_data_folder(dag_id) -> list: + final_list = [] + ds_path = get_data_folder(dag_id) + file_paths = [file for file in ds_path.iterdir() if not file.name.startswith('.DS_Store')] + + for file_path in file_paths: + final_list.extend(return_files_in_folder(file_path)) + + return final_list + +def txt2csv(txt_path): + import pandas as pd + + output_file = txt_path.with_suffix('.csv') + csv_table = pd.read_table(txt_path, sep='\t', encoding='cp1252') + csv_table.to_csv(output_file, index=False) + + print(f"Conversion complete. 
The CSV file is saved as {output_file}") + return output_file + +def upload_csv_to_gcs(dag_id): + from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator + from os import environ + + gcp_tasks = [] + files = get_files_in_data_folder(dag_id) + + for file_path in files: + if file_path.suffix == '.txt': + csv_file_path = txt2csv(file_path) + + gcp_task = LocalFilesystemToGCSOperator( + task_id=f'upload_to_gcs_{csv_file_path.name}', + src=str(csv_file_path), + dst=f"{dag_id}/{csv_file_path.name}", + bucket=environ.get("GCS_BUCKET"), + gcp_conn_id='google_cloud_default' + ) + gcp_tasks.append(gcp_task) + return gcp_tasks + @task def extract(dag_id,url) -> str: # Task to download data from web location @@ -58,5 +109,5 @@ def transform(dag_id, models_subdir='staging',task_id="") -> None: from airflow.hooks.subprocess import SubprocessHook subprocess = SubprocessHook() - result = subprocess.run_command(['dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx') + result = subprocess.run_command(['docker', 'exec', 'dbt','dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx') print("Result from dbt:", result) \ No newline at end of file
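
A minimal usage sketch of the helpers introduced in the last patch, assuming a hypothetical source DAG: the dag_id, schedule, and source URL below are placeholders (not part of the patch series), and GCS_BUCKET is assumed to be set in .env as described in the README change above.

# Hedged sketch only: shows how a source DAG might chain the shared extract/transform
# tasks with the new upload_csv_to_gcs helper from common_dag_tasks.py.
import pendulum

from airflow_operator import create_dag
from common_dag_tasks import extract, transform, upload_csv_to_gcs

dag_id = "example_source"  # hypothetical DAG name

dag = create_dag(
    dag_id=dag_id,
    schedule="0 5 * * *",          # placeholder schedule
    start_date=pendulum.yesterday(),
    catchup=False,
    concurrency=2,
)

with dag:
    extract_task = extract(dag_id, "https://example.com/data.zip")  # placeholder source URL
    transform_task = transform(dag_id)      # runs dbt for this DAG's models via docker exec
    gcs_tasks = upload_csv_to_gcs(dag_id)   # one LocalFilesystemToGCSOperator per converted CSV

    # Download first, then build the dbt models, then push the generated CSVs to GCS.
    extract_task >> transform_task >> gcs_tasks

Note that upload_csv_to_gcs builds its task list when the DAG file is parsed, from whatever .txt files are already present in the DAG's data folder, so the upload tasks reflect the previous run's extracted files rather than the current run's.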