# Project: Using Airflow to Submit Spark Jobs to AWS EMR

Credit to @josephmachado for the tutorial.

This project will:
1. Create an AWS EMR cluster
2. Submit Apache Spark jobs to the cluster as EMR steps from Airflow
3. Wait for completion of the jobs
4. Terminate the AWS EMR cluster

We will build a simple `DAG` that uploads a local `pyspark` script and some data to an S3 bucket, starts an EMR cluster, submits a Spark job that runs the uploaded script from the S3 bucket, and terminates the EMR cluster once the job completes.

![DAG Diagram](https://www.startdataengineering.com/images/how-to-submit-spark-jobs-to-emr-cluster-from-airflow/spark_submit_design.png)
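The flow above is a chain of task dependencies. As a rough sketch, assuming the two uploads can run in parallel before the cluster is created, the ordering can be checked in plain Python with the standard library's `graphlib` (Python 3.9+). The task IDs are the ones used later in this project, but the dependency map itself is an assumption, not code taken from the DAG file.

```python
from graphlib import TopologicalSorter

# Assumed dependency map: each task_id maps to the task_ids it waits on.
DEPENDENCIES = {
    "data_to_s3": set(),
    "script_to_s3": set(),
    "create_emr_cluster": {"data_to_s3", "script_to_s3"},
    "add_steps": {"create_emr_cluster"},
    "watch_step": {"add_steps"},
    "terminate_emr_cluster": {"watch_step"},
}


def execution_order(deps):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(DEPENDENCIES)
print(order)
```

In the DAG file, the same edges would typically be declared with Airflow's `>>` operator, for example `[data_to_s3, script_to_s3] >> create_emr_cluster >> step_adder >> step_checker >> terminate_emr_cluster` (assumed wiring; the project's exact dependency line is not shown here).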

## Prerequisites:
1. [docker](https://docs.docker.com/get-docker/) (make sure to have docker-compose as well)
2. [AWS account](https://aws.amazon.com/) to set up required cloud services
3. [Install](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) and [configure](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-config) AWS CLI

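## Create an S3 bucket:
```bash
aws s3api create-bucket --bucket emr-spark-airflow
```

The bucket does not need a public ACL: Airflow and EMR access it with your AWS credentials and the EMR roles. S3 bucket names are globally unique, so you may need to pick a different name (and use it as `BUCKET_NAME` in the DAG). Outside `us-east-1`, also pass `--create-bucket-configuration LocationConstraint=<your-region>`.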

## Get the data and place it in the data directory:
```bash
mkdir -p ./dags/data
mv movie_review* ./dags/data/movie_review.csv
```

The project folder structure looks like this:
```bash
.
├── Dockerfile
├── LICENSE
├── config
│   └── airflow.cfg
├── dags
│   ├── __pycache__
│   │   └── spark_submit_airflow.cpython-37.pyc
│   ├── data
│   │   └── movie_review.csv
│   ├── scripts
│   │   └── spark
│   │       └── random_text_classification.py
│   └── spark_submit_airflow.py
├── docker-compose-LocalExecutor.yml
├── emr-spark-airflow.ipynb
├── requirements.txt
├── script
│   └── entrypoint.sh
├── setup
│   └── raw_input_data
├── temp
└── workflows
    └── ci.yml

11 directories, 12 files
```

## Code

### 1. Move data and script to AWS S3

`random_text_classification.py` is a naive `pyspark` script that reads in our data and classifies a review as positive if it contains the word "good", and as negative otherwise.
The DAG defined in `spark_submit_airflow.py` is the outline we will build on. It is a simple DAG scheduled to run at `10:00 AM UTC` every day.
We use Apache Airflow's `S3Hook` to connect to our S3 bucket and move the data and script to the required locations.

```python
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

# `dag` is the DAG object defined earlier in spark_submit_airflow.py

# Configurations
BUCKET_NAME = "<your-bucket-name>"
local_data = "./dags/data/movie_review.csv"
s3_data = "data/movie_review.csv"
local_script = "./dags/scripts/spark/random_text_classification.py"
s3_script = "scripts/random_text_classification.py"


# Helper function: upload a local file to s3://<bucket_name>/<key>, overwriting any existing object
def _local_to_s3(filename, key, bucket_name=BUCKET_NAME):
    s3 = S3Hook()  # uses the aws_default connection
    s3.load_file(filename=filename, bucket_name=bucket_name, replace=True, key=key)


data_to_s3 = PythonOperator(
    dag=dag,
    task_id="data_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_data, "key": s3_data},
)
script_to_s3 = PythonOperator(
    dag=dag,
    task_id="script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_script, "key": s3_script},
)
```

![Screen Shot 2022-09-26 at 10.17.34 AM.png](attachment:33df5ea1-9cae-4b80-8a0c-1baac829da4a.png)
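The outline DAG is described above as running at 10:00 AM UTC every day. Assuming that schedule is expressed as the cron string `"0 10 * * *"` (the DAG definition itself is not shown here), the hypothetical helper below computes the next 10:00 UTC slot after a given time, using only the standard library.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the DAG runs once a day at 10:00 UTC ("0 10 * * *" in cron syntax).
RUN_HOUR_UTC = 10


def next_daily_run(after: datetime) -> datetime:
    """Return the first 10:00 UTC strictly after the given timezone-aware datetime."""
    after = after.astimezone(timezone.utc)
    candidate = after.replace(hour=RUN_HOUR_UTC, minute=0, second=0, microsecond=0)
    if candidate <= after:
        # Today's slot has already passed, so the next one is tomorrow.
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2022, 9, 26, 9, 30, tzinfo=timezone.utc)))
```

Keep in mind that Airflow triggers a run at the end of its schedule interval, so the run whose execution date is 2022-09-26 10:00 UTC actually starts around 2022-09-27 10:00 UTC.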

### 2. Create an EMR cluster

Apache Airflow has an `EmrCreateJobFlowOperator` operator to create an EMR cluster.
We have to define the cluster configurations and the operator can use that to create the EMR cluster.

```python
from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)

JOB_FLOW_OVERRIDES = {
    "Name": "Movie review classifier",
    "ReleaseLabel": "emr-5.29.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],  # we want our EMR cluster to have HDFS and Spark
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},  # by default EMR 5.x uses Python 2; switch to Python 3
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT",  # spot instances use spare EC2 capacity at a lower price but can be reclaimed by AWS
                "InstanceRole": "CORE",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,  # keep the cluster running so steps can be added after it starts
        "TerminationProtected": False,  # this lets us programmatically terminate the cluster
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# Create an EMR cluster
create_emr_cluster = EmrCreateJobFlowOperator(
    task_id="create_emr_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id="aws_default",
    emr_conn_id="emr_default",
    dag=dag,
)
```
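`JOB_FLOW_OVERRIDES` requests a one-node master group and a two-node core group, all `m4.xlarge` spot instances. A small hypothetical helper (not part of Airflow or boto3) that tallies the instance groups can make it easier to sanity-check what a config will launch before you pay for it; the example uses a trimmed copy of the `Instances` section.

```python
from collections import Counter

# Trimmed copy of the "Instances" section of JOB_FLOW_OVERRIDES.
INSTANCES = {
    "InstanceGroups": [
        {"InstanceRole": "MASTER", "InstanceType": "m4.xlarge", "InstanceCount": 1, "Market": "SPOT"},
        {"InstanceRole": "CORE", "InstanceType": "m4.xlarge", "InstanceCount": 2, "Market": "SPOT"},
    ],
    "KeepJobFlowAliveWhenNoSteps": True,
    "TerminationProtected": False,
}


def summarize_instances(instances):
    """Hypothetical helper: count requested nodes per role and per instance type."""
    by_role = Counter()
    by_type = Counter()
    for group in instances["InstanceGroups"]:
        by_role[group["InstanceRole"]] += group["InstanceCount"]
        by_type[group["InstanceType"]] += group["InstanceCount"]
    return dict(by_role), dict(by_type)


roles, types = summarize_instances(INSTANCES)
print(roles)  # {'MASTER': 1, 'CORE': 2}
print(types)  # {'m4.xlarge': 3}
```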

The `EmrCreateJobFlowOperator` creates the cluster and stores its cluster ID (a unique identifier) in XCom, a key-value store that Airflow tasks use to share values with each other.
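To make the XCom hand-off concrete, here is a toy, in-memory model of it. It is only an illustration: real XComs live in Airflow's metadata database and are also scoped to a DAG run. What it does mirror is that an operator's return value is stored under the key `"return_value"`, which is the key the templates in this project pull. The cluster ID is a made-up placeholder.

```python
class ToyXCom:
    """Toy stand-in for XCom: values keyed by (task_id, key)."""

    def __init__(self):
        self._store = {}

    def push(self, task_id, value, key="return_value"):
        self._store[(task_id, key)] = value

    def pull(self, task_ids, key="return_value"):
        # Returns None when nothing was pushed, similar to a missing XCom.
        return self._store.get((task_ids, key))


xcom = ToyXCom()
# Conceptually what create_emr_cluster does with the new cluster's ID (placeholder value).
xcom.push("create_emr_cluster", "j-EXAMPLE12345")

# Conceptually what a later template such as
# "{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}" resolves to.
job_flow_id = xcom.pull(task_ids="create_emr_cluster", key="return_value")
print(job_flow_id)  # j-EXAMPLE12345
```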

### 3. Run jobs in the EMR cluster and wait for it to complete

We will add the individual steps that we need to run on the cluster.

```python
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

s3_clean = "clean_data/"
SPARK_STEPS = [  # the {{ params.* }} values are filled in from the operator's `params`
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/data",
                "--dest=/movie",
            ],
        },
    },
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move clean data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=/output",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_clean }}",
            ],
        },
    },
]

# Add your steps to the EMR cluster
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    params={  # these params fill in the templated values in SPARK_STEPS
        "BUCKET_NAME": BUCKET_NAME,
        "s3_data": s3_data,
        "s3_script": s3_script,
        "s3_clean": s3_clean,
    },
    dag=dag,
)

last_step = len(SPARK_STEPS) - 1  # index of the last step, which the sensor watches
# Wait for the steps to complete
step_checker = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
    + str(last_step)
    + "] }}",
    aws_conn_id="aws_default",
    dag=dag,
)
```
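Because `steps` is a templated field of `EmrAddStepsOperator`, Airflow renders the `{{ params.* }}` placeholders in `SPARK_STEPS` with Jinja before the steps are sent to EMR. The sketch below imitates only that simple `{{ params.NAME }}` substitution with a regular expression, so it runs without Airflow or Jinja; it is a stand-in, not how Airflow renders templates, and the bucket name is a hypothetical placeholder.

```python
import re

# Matches placeholders such as "{{ params.BUCKET_NAME }}".
PARAM_PATTERN = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_params(value, params):
    """Recursively replace {{ params.X }} in strings nested in dicts and lists."""
    if isinstance(value, str):
        return PARAM_PATTERN.sub(lambda m: str(params[m.group(1)]), value)
    if isinstance(value, list):
        return [render_params(v, params) for v in value]
    if isinstance(value, dict):
        return {k: render_params(v, params) for k, v in value.items()}
    return value


params = {
    "BUCKET_NAME": "my-example-bucket",  # hypothetical bucket name
    "s3_script": "scripts/random_text_classification.py",
    "s3_clean": "clean_data/",
}
step_args = [
    "spark-submit",
    "--deploy-mode",
    "client",
    "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
]
print(render_params(step_args, params)[-1])
# s3://my-example-bucket/scripts/random_text_classification.py
```

Other template expressions, such as the `task_instance.xcom_pull(...)` calls, are left untouched by this sketch; Airflow's Jinja rendering handles those too.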

We specify three steps in the `SPARK_STEPS` list:

1. Copy the data from AWS S3 into the cluster's HDFS location `/movie`.
2. Run the naive text classification Spark script `random_text_classification.py`, which reads its input from `/movie` and writes its output to `/output`.
3. Copy the data from the cluster's HDFS location `/output` to the AWS S3 `clean_data` location, given by the `s3_clean` configuration variable.
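Step 2's classification rule ("positive if the review contains the word good") can be illustrated in plain Python on a few in-memory reviews. This is a sketch, not the project's script: the real job runs in `pyspark`, and the lowercase whitespace tokenization and the review IDs here are assumptions.

```python
def classify_review(text):
    """Return True (positive) if the token "good" appears in the review."""
    tokens = text.lower().split()
    return "good" in tokens


reviews = [
    ("r1", "A good movie with a great cast"),
    ("r2", "Not worth the ticket"),
    ("r3", "Goodness, what a mess"),  # "goodness," is not the token "good"
    ("r4", "Not good at all"),  # naive: negation is ignored, so this counts as positive
]
labels = {review_id: classify_review(text) for review_id, text in reviews}
print(labels)  # {'r1': True, 'r2': False, 'r3': False, 'r4': True}
```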

We get the EMR cluster ID from XCom, as shown in `job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}"`.
We also get the ID of the last step from XCom in our `EmrStepSensor`.
The step sensor periodically checks the state of that last step: it succeeds once the step is completed and fails if the step is cancelled, fails, or is interrupted.
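The polling behaviour can be sketched in plain Python, with a fake sequence of states standing in for repeated EMR API calls. The state names are EMR step states and the success/failure grouping roughly follows how the sensor treats them, but the loop is a simplified illustration, not `EmrStepSensor`'s code. Watching only the last step is enough here because every step uses `CANCEL_AND_WAIT`: if an earlier step fails, the pending steps after it are cancelled, so the last step ends up `CANCELLED` and the sensor fails.

```python
import time

# EMR step states, grouped roughly the way a step sensor treats them.
NON_TERMINAL_STATES = {"PENDING", "RUNNING", "CANCEL_PENDING"}
FAILED_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(get_state, poke_interval=0.0, max_pokes=100):
    """Poll get_state() until the step reaches a terminal state."""
    for _ in range(max_pokes):
        state = get_state()
        if state == "COMPLETED":
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f"EMR step ended in state {state}")
        if state not in NON_TERMINAL_STATES:
            raise ValueError(f"unexpected step state {state}")
        time.sleep(poke_interval)  # a real sensor waits between pokes
    raise TimeoutError("step did not finish within max_pokes polls")


# Fake state sequence standing in for repeated DescribeStep calls.
fake_states = iter(["PENDING", "RUNNING", "RUNNING", "COMPLETED"])
print(wait_for_step(lambda: next(fake_states)))  # COMPLETED

# The sensor watches index len(SPARK_STEPS) - 1, i.e. 2 for three steps.
step_ids = ["s-STEP1", "s-STEP2", "s-STEP3"]  # hypothetical IDs returned by add_steps
print(step_ids[len(step_ids) - 1])  # s-STEP3
```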

### 4. Terminate the EMR cluster
Once the step sensor detects that the last step has completed, we can terminate the EMR cluster.

```python
from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)

# Terminate the EMR cluster
terminate_emr_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_emr_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    dag=dag,
)
```

In the snippet above, we pull the cluster ID from XCom and then terminate that cluster.

## Running the DAG:

```bash
docker-compose -f docker-compose-LocalExecutor.yml up -d
```

Go to [http://localhost:8080/admin/](http://localhost:8080/admin/) and turn on the `spark_submit_airflow` DAG.

You can check the status at [http://localhost:8080/admin/airflow/graph?dag_id=spark_submit_airflow](http://localhost:8080/admin/airflow/graph?dag_id=spark_submit_airflow).

![Screen Shot 2022-09-26 at 10.16.42 AM.png](attachment:e48df551-4abd-4789-a48a-b3ae440cbe7a.png)

![Screen Shot 2022-09-26 at 11.21.06 AM.png](attachment:b22491d0-268e-4436-9bc7-142896a2aaf1.png)

NOTE: If your job fails or you stop your Airflow instance, check the AWS EMR console and terminate any EMR cluster that is still running. You can also delete the S3 bucket you created, as shown below.
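One reason a cluster can be left running: by default an Airflow task runs only when all of its upstream tasks succeed (the `all_success` trigger rule), so if `watch_step` fails, `terminate_emr_cluster` is never run. Airflow also has an `all_done` trigger rule, which fires once every upstream task has finished, whatever the outcome. The sketch below models just those two rules in plain Python; it is a simplification, not Airflow's scheduler logic.

```python
# Finished task states considered here (a simplified subset of Airflow's states).
DONE_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule, upstream_states):
    """Decide whether a task may start, for two of Airflow's trigger rules."""
    if trigger_rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_done":
        return all(s in DONE_STATES for s in upstream_states)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


# watch_step failed, so terminate_emr_cluster's only upstream task failed.
print(should_run("all_success", ["failed"]))  # False: the cluster keeps running
print(should_run("all_done", ["failed"]))  # True: the cluster gets terminated
```

In the DAG, one option is to pass `trigger_rule="all_done"` to the `EmrTerminateJobFlowOperator`. If you do, check how the DAG run's final state is reported, since Airflow derives it from the leaf tasks and a successful terminate task may hide an upstream failure.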

## Terminate local instance

```bash
docker-compose -f docker-compose-LocalExecutor.yml down
```

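To delete the bucket together with the objects uploaded to it (`delete-bucket` alone fails on a non-empty bucket):

```bash
aws s3 rb s3://emr-spark-airflow --force
```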

## Conclusion

In this project we created a temporary AWS EMR cluster to run Spark jobs. One of the biggest drawbacks of this approach is the time it takes to create the EMR cluster. If you use an always-on EMR cluster, you can skip the tasks that create and terminate it.