Push Data to GCS and BigQuery #299

Merged · 4 commits · Jul 2, 2024
5 changes: 4 additions & 1 deletion .gitignore
@@ -15,4 +15,7 @@ plugins
.user.yml

# Desktop Services Store
.DS_Store

#GCP
gcp.json
34 changes: 23 additions & 11 deletions README.md
@@ -43,7 +43,7 @@ We would love to see you contribute to SageRx. Join our [Slack](https://join.sla
- `UMLS_API=<umls_api_key>` - if you want to use RxNorm, you need an API key from [UMLS](https://uts.nlm.nih.gov/uts/signup-login).
4. Make sure Docker is installed
5. Run `docker-compose up airflow-init`.
6. Run `docker-compose up`.

> NOTE: if you have an [M1 Mac](https://stackoverflow.com/questions/62807717/how-can-i-solve-postgresql-scram-authentication-problem), run `export DOCKER_DEFAULT_PLATFORM=linux/amd64` and re-build your images.

@@ -71,18 +71,30 @@ The export_marts DAG is implemented to allow users to push .csv versions of the
- DEST_BUCKET = 'the-name-of-your-aws-bucket'

The access key and secret access key can be found in two ways:

1. If you have the AWS CLI tools installed, simply run the following from a Mac/Linux terminal or Windows PowerShell (an example of the file's layout follows this list):

   - `cat ~/.aws/credentials`

2. If you do not have the AWS CLI tools installed, or are unfamiliar with them, you may have saved your credentials somewhere on your local machine - the file is named after the IAM user in your AWS account, something like 'username_accessKeys.csv'. If you can't find that file, simply create a new set of keys by following this procedure:

   - Log in to the AWS console
   - Search 'IAM' in the search bar and select the IAM service --> the IAM Dashboard is displayed
   - In the 'IAM resources' box, click the number beneath 'Users' --> all IAM users are displayed
   - Choose the user whose permissions match your needs (this will typically be the user with administrator access, but the only IAM permission actually required is read/write access to your S3 bucket)
   - On the user page, click 'Security Credentials' and scroll down to the box titled 'Access Keys'
   - NOTE: A single user can have at most 2 valid sets of access keys at any one time. If you already have 2, you will need to either delete one or create a new user for this application
   - NOTE: Once you have created a set of credentials, you have only 1 opportunity to view/save them from the AWS UI, so be sure to save the .csv file in a safe place
   - Click 'Create access key', select the 'Command Line Interface' option, click Next, enter a name for the keys, click 'Create access key', then download the .csv file and save it in a safe place
   - Open the .csv file, then paste the access key and secret access key into the .env file as described above
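
For reference, the shared credentials file printed by `cat ~/.aws/credentials` follows the standard AWS layout; the key values here are placeholders:

```
[default]
aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```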

### Integration with Google Cloud Platform (GCP)

Add the following variables to your `.env` file (example values are shown after the list):

- GCS_BUCKET
- GCP_PROJECT
- GCP_DATASET

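A minimal sketch of the corresponding `.env` entries; the bucket, project, and dataset names below are placeholders, so substitute your own:

```
GCS_BUCKET=my-sagerx-bucket
GCP_PROJECT=my-gcp-project
GCP_DATASET=sagerx_lake
```

The DAG code reads the bucket name via `environ.get("GCS_BUCKET")`, so these variables must be visible to the Airflow containers.
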
### Troubleshooting

53 changes: 52 additions & 1 deletion airflow/dags/common_dag_tasks.py
@@ -42,6 +42,57 @@ def get_most_recent_dag_run(dag_id):
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0] if dag_runs else None

def return_files_in_folder(dir_path) -> list:
    # Recursively collect every file beneath dir_path.
    files = []
    for file_path in dir_path.iterdir():
        if file_path.is_file():
            print(file_path.name)
            files.append(file_path)
        elif file_path.is_dir():
            files.extend(return_files_in_folder(file_path))
    return files

def get_files_in_data_folder(dag_id) -> list:
    final_list = []
    ds_path = get_data_folder(dag_id)
    file_paths = [file for file in ds_path.iterdir() if not file.name.startswith('.DS_Store')]

    for file_path in file_paths:
        final_list.extend(return_files_in_folder(file_path))

    return final_list

def txt2csv(txt_path):
    import pandas as pd

    output_file = txt_path.with_suffix('.csv')
    csv_table = pd.read_table(txt_path, sep='\t', encoding='cp1252')
    csv_table.to_csv(output_file, index=False)

    print(f"Conversion complete. The CSV file is saved as {output_file}")
    return output_file

def upload_csv_to_gcs(dag_id):
    from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
    from os import environ

    gcp_tasks = []
    files = get_files_in_data_folder(dag_id)

    for file_path in files:
        # Only .txt files are converted and uploaded; other files are skipped.
        if file_path.suffix == '.txt':
            csv_file_path = txt2csv(file_path)

            gcp_task = LocalFilesystemToGCSOperator(
                task_id=f'upload_to_gcs_{csv_file_path.name}',
                src=str(csv_file_path),
                dst=f"{dag_id}/{csv_file_path.name}",
                bucket=environ.get("GCS_BUCKET"),
                gcp_conn_id='google_cloud_default'
            )
            gcp_tasks.append(gcp_task)
    return gcp_tasks

@task
def extract(dag_id, url) -> str:
    # Task to download data from web location
@@ -58,5 +109,5 @@ def transform(dag_id, models_subdir='staging',task_id="") -> None:
    from airflow.hooks.subprocess import SubprocessHook

    subprocess = SubprocessHook()
    result = subprocess.run_command(['dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx')
    result = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx')
    print("Result from dbt:", result)
119 changes: 119 additions & 0 deletions airflow/dags/dbt_gcp/dag.py
@@ -0,0 +1,119 @@
import pendulum

from airflow_operator import create_dag

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

from airflow.operators.python_operator import PythonOperator

dag_id = "dbt_gcp"

dag = create_dag(
    dag_id=dag_id,
    schedule="0 4 * * *",
    start_date=pendulum.yesterday(),
    catchup=False,
    concurrency=2,
)

with dag:
    def run_dbt():
        from airflow.hooks.subprocess import SubprocessHook
        from airflow.exceptions import AirflowException

        subprocess = SubprocessHook()

        run_results = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'run'], cwd='/dbt/sagerx')
        if run_results.exit_code != 0:
            raise AirflowException(f"Command failed with return code {run_results.exit_code}: {run_results.output}")
        print(f"Command succeeded with output: {run_results.output}")

        docs_results = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'docs', 'generate'], cwd='/dbt/sagerx')
        if docs_results.exit_code != 0:
            raise AirflowException(f"Command failed with return code {docs_results.exit_code}: {docs_results.output}")
        print(f"Command succeeded with output: {docs_results.output}")

        check_results = subprocess.run_command(['docker', 'exec', 'dbt', 'dbt', 'run-operation', 'check_data_availability'], cwd='/dbt/sagerx')
        if check_results.exit_code != 0:
            raise AirflowException(f"Command failed with return code {check_results.exit_code}: {check_results.output}")
        print(f"Command succeeded with output: {check_results.output}")


    def get_dbt_models():
        from sagerx import run_query_to_df

        df = run_query_to_df("""
            SELECT *
            FROM sagerx.data_availability
            WHERE has_data = True
            AND materialized != 'view'
        """)

        print(f"Final list: {df}")
        return df.to_dict('index')

    def load_to_gcs(run_dbt_task):
        dbt_models = get_dbt_models()
        gcp_tasks = []
        for _, row_values in dbt_models.items():
            schema_name = row_values.get('schema_name')
            table_name = row_values.get('table_name')
            columns_info = row_values.get('columns_info')

            print(f"{schema_name}.{table_name}")
            print(columns_info)

            # Transfer data from Postgres DB to Google Cloud Storage DB
            p2g_task = PostgresToGCSOperator(
                task_id=f'postgres_to_gcs_{schema_name}.{table_name}',
                postgres_conn_id='postgres_default',
                sql=f"SELECT * FROM {schema_name}.{table_name}",
                bucket="sagerx_bucket",
                filename=f'{table_name}',
                export_format='csv',
                gzip=False,
                dag=dag,
            )

            # Populate BigQuery tables with data from Cloud Storage Bucket
            cs2bq_task = GCSToBigQueryOperator(
                task_id=f"bq_load_{schema_name}.{table_name}",
                bucket="sagerx_bucket",
                source_objects=[f"{table_name}"],
                destination_project_dataset_table=f"sagerx-420700.sagerx_lake.{table_name}",
                autodetect=True,
                external_table=False,
                create_disposition="CREATE_IF_NEEDED",
                write_disposition="WRITE_TRUNCATE",
                gcp_conn_id='google_cloud_default',
                location='us-east5',
                dag=dag,
            )

            run_dbt_task.set_downstream(p2g_task)
            p2g_task.set_downstream(cs2bq_task)

            gcp_tasks.append(p2g_task)
            gcp_tasks.append(cs2bq_task)

        return gcp_tasks


    run_dbt_task = PythonOperator(
        task_id='run_dbt',
        python_callable=run_dbt,
        dag=dag,
    )

    gcp_tasks = load_to_gcs(run_dbt_task)


"""
Notes:
- Scaling issue if each table contains multiple tasks, can quickly grow since it grows exponentially
- Single dbt run is the preferred method since the complexity of dependencies is handled once
- If we can have dbt full run after each dag run and then only specify which tables to upload that would make it more targeted and less costly
"""
10 changes: 10 additions & 0 deletions airflow/dags/sagerx.py
@@ -4,6 +4,7 @@
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
from time import sleep
import pandas as pd

# Filesystem functions
def create_path(*args):
@@ -178,6 +179,15 @@ def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str
        index=index
    )

def run_query_to_df(query: str) -> pd.DataFrame:
    from airflow.hooks.postgres_hook import PostgresHook

    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    engine = pg_hook.get_sqlalchemy_engine()
    df = pd.read_sql(query, con=engine)

    return df

@retry(
    stop=stop_after_attempt(20),
    wait=wait_exponential(multiplier=1, max=10),
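
A minimal usage sketch for `run_query_to_df`. It assumes the Airflow `postgres_default` connection is configured and that the `check_data_availability` macro has already populated `sagerx.data_availability`:

```python
from sagerx import run_query_to_df

# Pull the availability table built by the check_data_availability macro.
df = run_query_to_df("""
    SELECT schema_name, table_name, has_data
    FROM sagerx.data_availability
    WHERE has_data = True
""")
print(df.head())
```
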
4 changes: 3 additions & 1 deletion airflow/requirements.txt
@@ -1,4 +1,6 @@
# Any change made here should accompany an increment
# to the image version on line 5 of docker-compose.yml

dbt-postgres==1.4.1
dbt-core
dbt-postgres
apache-airflow[google]
2 changes: 1 addition & 1 deletion dbt/Dockerfile
@@ -5,7 +5,7 @@ ENV DBT_PROFILES_DIR=/dbt
RUN apt-get -y update
RUN apt-get -y install git

RUN pip install dbt-postgres==1.5.1
RUN pip install dbt-core dbt-postgres

WORKDIR /dbt

3 changes: 3 additions & 0 deletions dbt/sagerx/dbt_project.yml
@@ -39,3 +39,6 @@ models:
    marts:
      +schema: sagerx
      +materialized: table
      +persist_docs:
        relation: true
        columns: true
77 changes: 77 additions & 0 deletions dbt/sagerx/macros/check_data_availability.sql
@@ -0,0 +1,77 @@
{% macro check_data_availability() %}
{% set sources_and_models = [] -%}

{% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
{%- do sources_and_models.append([node.schema, node.name, node.config.materialized]) -%}
{%- endfor %}

{% for node in graph.sources.values() -%}
{%- do sources_and_models.append([node.schema, node.name, node.resource_type]) -%}
{%- endfor %}

--{{ log("sources and models: " ~ sources_and_models, info=True) }}

{% set results_table_query %}
drop table if exists sagerx.data_availability CASCADE;
create table sagerx.data_availability(
schema_name text,
table_name text,
has_data boolean,
materialized text,
columns_info jsonb
);
{% endset %}

{{ run_query(results_table_query) }}


{% for schema_name, table_name, mat_config in sources_and_models %}
{% set check_table_exists_query %}
{{ log("Check table exists query for " ~ schema_name ~ " : " ~ table_name, info=True) }}
select exists (
select 1
from information_schema.tables
where table_schema = '{{ schema_name }}'
and table_name = '{{ table_name }}'
) as table_exists
{% endset %}

{% set table_exists_result = run_query(check_table_exists_query) %}
{% if table_exists_result[0]['table_exists'] %}
{{ log("Table: " ~ table_name ~ " does exist.", info=True) }}

{% set columns_query %}
select
column_name,
data_type,
is_nullable
from
information_schema.columns
where
table_schema = '{{ schema_name }}'
and table_name = '{{ table_name }}'
{% endset %}

{% set columns_result = run_query(columns_query) %}
{% set columns_info = [] %}
{% for column in columns_result %}
{%- do columns_info.append({"column_name": column['column_name'], "data_type": column['data_type'], "is_nullable": column['is_nullable']}) -%}
{% endfor %}

{% set row_count_query %}
select count(*) as row_count from {{schema_name}}.{{table_name}}
{% endset %}
{% set row_count_result = run_query(row_count_query) %}

{% set insert_query %}
insert into sagerx.data_availability
(schema_name, table_name, has_data, materialized, columns_info)
values ('{{ schema_name }}','{{ table_name }}', {{ row_count_result[0]['row_count'] > 0 }}, '{{ mat_config }}', '{{ columns_info | tojson }}');
{% endset %}
{{ run_query(insert_query) }}
{% else %}
{{ log("No Table: " ~ table_name ~ " does not exist.", info=True) }}
{% endif %}

{% endfor %}
{% endmacro %}