Remove get_*_operator functions, simplify commoncrawl logic (#301)
* Remove `get_*_operator` functions, simplify commoncrawl logic

* Only ignore the `common` folder, not all folders starting with `common`

* Eliminate other uses of the `get_*_operator` functions
AetherUnbound committed Dec 10, 2021
1 parent 6f5f598 commit 12f8000
Showing 11 changed files with 149 additions and 258 deletions.
2 changes: 1 addition & 1 deletion openverse_catalog/dags/.airflowignore
@@ -1,5 +1,5 @@
 # Ignore all non-DAG files
-common
+common/
 commoncrawl/commoncrawl_scripts
 providers/provider_api_scripts
 retired
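
Note (not part of the commit): Airflow treats each `.airflowignore` line as a regular expression matched anywhere in a file's path, so the bare pattern `common` also excluded `commoncrawl/`; the trailing slash narrows the match to the `common/` directory. A minimal sketch of the difference (paths are illustrative):

    import re

    paths = ["common/dag_factory.py", "commoncrawl/commoncrawl_etl.py"]
    for pattern in ("common", "common/"):
        # Roughly what the ignore logic does: a regex search against each path.
        matched = [p for p in paths if re.search(pattern, p)]
        print(pattern, "->", matched)
    # "common"  -> both paths are ignored
    # "common/" -> only common/dag_factory.py is ignored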
34 changes: 12 additions & 22 deletions openverse_catalog/dags/common/dag_factory.py
@@ -21,22 +21,7 @@
     "retry_delay": timedelta(minutes=15),
     "on_failure_callback": slack.on_failure_callback,
 }
-
-
-def get_dated_main_runner_operator(
-    main_function,
-    execution_timeout,
-    day_shift=0,
-    task_id="pull_image_data",
-):
-    args_str = f"{{{{ macros.ds_add(ds, -{day_shift}) }}}}"
-    return PythonOperator(
-        task_id=task_id,
-        python_callable=main_function,
-        op_args=[args_str],
-        execution_timeout=execution_timeout,
-        depends_on_past=False,
-    )
+DATE_RANGE_ARG_TEMPLATE = "{{ macros.ds_add(ds, -%s) }}"


 def create_provider_api_workflow(
@@ -103,8 +88,12 @@ def create_provider_api_workflow(

     with dag:
         if dated:
-            get_dated_main_runner_operator(
-                main_function, dagrun_timeout, day_shift=day_shift
+            PythonOperator(
+                task_id="pull_image_data",
+                python_callable=main_function,
+                op_args=[DATE_RANGE_ARG_TEMPLATE % day_shift],
+                execution_timeout=dagrun_timeout,
+                depends_on_past=False,
             )
         else:
             PythonOperator(
@@ -233,11 +222,12 @@ def _build_ingest_operator_list_list(
     reingestion_day_list_list = [[0]] + reingestion_day_list_list
     return [
         [
-            get_dated_main_runner_operator(
-                main_function,
-                ingestion_task_timeout,
-                day_shift=d,
+            PythonOperator(
                 task_id=f"ingest_{d}",
+                python_callable=main_function,
+                op_args=[DATE_RANGE_ARG_TEMPLATE % d],
+                execution_timeout=ingestion_task_timeout,
+                depends_on_past=False,
             )
             for d in L
         ]
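Note (not part of the commit): `DATE_RANGE_ARG_TEMPLATE % day_shift` bakes the day shift into a Jinja expression that Airflow renders per task run via the `ds_add` macro, which is what the removed `get_dated_main_runner_operator` helper built internally. A rough sketch of what the `op_args` entry contains (dates are illustrative):

    DATE_RANGE_ARG_TEMPLATE = "{{ macros.ds_add(ds, -%s) }}"

    # At DAG definition time the op_args entry is still a template string:
    print(DATE_RANGE_ARG_TEMPLATE % 2)  # {{ macros.ds_add(ds, -2) }}

    # At run time Airflow renders the template against the execution date,
    # e.g. with ds = "2021-12-10" the callable receives "2021-12-08".
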
105 changes: 0 additions & 105 deletions openverse_catalog/dags/common/etl/operators.py

This file was deleted.

25 changes: 0 additions & 25 deletions openverse_catalog/dags/common/loader/operators.py

This file was deleted.

91 changes: 66 additions & 25 deletions openverse_catalog/dags/commoncrawl/commoncrawl_etl.py
@@ -2,7 +2,18 @@
 from datetime import datetime, timedelta

 from airflow import DAG
-from common.etl import operators
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.operators.emr_create_job_flow import (
+    EmrCreateJobFlowOperator,
+)
+from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import (
+    EmrTerminateJobFlowOperator,
+)
+from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
+from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
+from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
+from airflow.utils.trigger_rule import TriggerRule
+from commoncrawl.commoncrawl_utils import get_load_s3_task_id, load_file_to_s3


 FILE_DIR = os.path.abspath(os.path.dirname(__file__))
@@ -30,6 +41,7 @@
     "retry_delay": timedelta(minutes=60),
 }

+CC_INDEX_TEMPLATE = "CC-MAIN-{{ execution_date.strftime('%Y-%V') }}"
 JOB_FLOW_OVERRIDES = {
     "Applications": [{"Name": "hive"}, {"Name": "spark"}, {"Name": "pig"}],
     "BootstrapActions": [
@@ -129,7 +141,9 @@
                     "--master",
                     "yarn",
                     EXTRACT_SCRIPT_S3,
-                    "--default",
+                    # This was "--default" previously but a task within the DAG
+                    # modified it on DAG parse time to be this value.
+                    CC_INDEX_TEMPLATE,
                 ],
                 "Jar": "command-runner.jar",
             },
@@ -159,41 +173,68 @@
     catchup=False,
     tags=["commoncrawl"],
 ) as dag:
-    check_for_cc_index = operators.get_check_cc_index_in_s3_sensor(
-        AWS_CONN_ID,
+    check_for_cc_index = S3PrefixSensor(
+        task_id="check_for_cc_index",
+        retries=0,
+        aws_conn_id=AWS_CONN_ID,
+        bucket_name="commoncrawl",
+        prefix=f"crawl-data/{CC_INDEX_TEMPLATE}",
+        poke_interval=60,
+        timeout=60 * 60 * 24 * 3,
+        soft_fail=True,
+        mode="reschedule",
     )

-    check_for_wat_file = operators.get_check_wat_file_in_s3_sensor(
-        AWS_CONN_ID,
+    check_for_wat_file = S3KeySensor(
+        task_id="check_for_wat_file",
+        retries=0,
+        aws_conn_id=AWS_CONN_ID,
+        bucket_name="commoncrawl",
+        bucket_key=f"crawl-data/{CC_INDEX_TEMPLATE}/wat.paths.gz",
+        poke_interval=60,
+        timeout=60 * 60 * 24 * 3,
+        soft_fail=True,
+        mode="reschedule",
     )

-    cluster_bootstrap_loader = operators.get_load_to_s3_operator(
-        CONFIG_SH_LOCAL,
-        CONFIG_SH_KEY,
-        BUCKET_V2,
-        AWS_CONN_ID,
+    cluster_bootstrap_loader = PythonOperator(
+        task_id=get_load_s3_task_id(CONFIG_SH_KEY),
+        python_callable=load_file_to_s3,
+        op_args=[CONFIG_SH_LOCAL, CONFIG_SH_KEY, BUCKET_V2, AWS_CONN_ID],
     )

-    extract_script_loader = operators.get_load_to_s3_operator(
-        EXTRACT_SCRIPT_LOCAL,
-        EXTRACT_SCRIPT_S3_KEY,
-        BUCKET_V2,
-        AWS_CONN_ID,
+    extract_script_loader = PythonOperator(
+        task_id=get_load_s3_task_id(EXTRACT_SCRIPT_S3_KEY),
+        python_callable=load_file_to_s3,
+        op_args=[
+            EXTRACT_SCRIPT_LOCAL,
+            EXTRACT_SCRIPT_S3_KEY,
+            BUCKET_V2,
+            AWS_CONN_ID,
+        ],
     )

-    job_flow_creator = operators.get_create_job_flow_operator(
-        RAW_PROCESS_JOB_FLOW_NAME, JOB_FLOW_OVERRIDES, AWS_CONN_ID, EMR_CONN_ID
+    job_flow_creator = EmrCreateJobFlowOperator(
+        task_id=f"create_{RAW_PROCESS_JOB_FLOW_NAME}",
+        job_flow_overrides=JOB_FLOW_OVERRIDES,
+        aws_conn_id=AWS_CONN_ID,
+        emr_conn_id=EMR_CONN_ID,
     )

-    job_sensor = operators.get_job_sensor(
-        60 * 60 * 7,
-        RAW_PROCESS_JOB_FLOW_NAME,
-        AWS_CONN_ID,
+    job_sensor = EmrJobFlowSensor(
+        task_id=f"check_{RAW_PROCESS_JOB_FLOW_NAME}",
+        timeout=60 * 60 * 7,
+        mode="reschedule",
+        retries=0,
+        job_flow_id=job_flow_creator.task_id,
+        aws_conn_id=AWS_CONN_ID,
     )

-    job_flow_terminator = operators.get_job_terminator(
-        RAW_PROCESS_JOB_FLOW_NAME,
-        AWS_CONN_ID,
+    job_flow_terminator = EmrTerminateJobFlowOperator(
+        task_id=f"terminate_{RAW_PROCESS_JOB_FLOW_NAME}",
+        job_flow_id=job_flow_creator.task_id,
+        aws_conn_id=AWS_CONN_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
     )

 (
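Note (not part of the commit): `CC_INDEX_TEMPLATE` is likewise rendered per DAG run rather than being patched in by a separate task at parse time. A rough sketch of the rendered value (the date is illustrative):

    from datetime import datetime

    # What Airflow substitutes for {{ execution_date.strftime('%Y-%V') }}
    execution_date = datetime(2021, 12, 10)
    cc_index = f"CC-MAIN-{execution_date.strftime('%Y-%V')}"  # %V = ISO week number
    print(cc_index)  # CC-MAIN-2021-49

    # The sensors above then watch for keys such as
    # crawl-data/CC-MAIN-2021-49/wat.paths.gz in the commoncrawl bucket.
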
10 changes: 10 additions & 0 deletions openverse_catalog/dags/commoncrawl/commoncrawl_utils.py
@@ -0,0 +1,10 @@
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+def load_file_to_s3(local_file, remote_key, bucket, aws_conn_id):
+    s3 = S3Hook(aws_conn_id=aws_conn_id)
+    s3.load_file(local_file, remote_key, replace=True, bucket_name=bucket)
+
+
+def get_load_s3_task_id(s3_key: str) -> str:
+    return f"load_{s3_key.replace('/', '_').replace('.', '_')}_to_s3"
