In [None]:
import os
import boto3
import shutil
import awswrangler as wrangler

# Constants
from utils.consts import *

### Environment Setup

**NOTE**: `AWS_ACCESS_KEY` and `AWS_SECRET_KEY` are expected to be set as environment variables, so make sure they exist before running the cells below.

In [None]:
# Shared boto3 session; the region and credentials are not defined in this
# notebook (presumably they come from the star import of utils.consts).
session = boto3.Session(
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
# EMR Serverless client built from that session, pinned to the same region.
client = session.client("emr-serverless", region_name=AWS_REGION)

If you are updating the environment for EMR, rebuild the environment archive from the Dockerfile:
```
docker build --output . .
```

In [None]:
# When True, zip ../src and upload it (together with the job script) to S3
UPDATE_MODULES = True
# When True, upload the already-built environment archive to S3
# (the archive itself must be produced beforehand with Docker)
UPDATE_ENVIRONMENT = False

In [None]:
# Refresh the code artifacts on S3: the job script plus a zip of ../src.
if UPDATE_MODULES:
    # Remove any stale archive so an old build can never be uploaded by mistake.
    if os.path.isfile(MODULE_FILE):
        os.remove(MODULE_FILE)

    if SCRIPT_FILE is not None and SCRIPT_PATH is not None:
        # Name the archive after MODULE_FILE (minus its extension) instead of a
        # hard-coded 'src', so the file that is built is the file that is uploaded.
        # splitext also copes with dots elsewhere in the path (e.g. '../src.zip').
        archive_base = os.path.splitext(MODULE_FILE)[0]
        # make_archive appends '.zip' itself and returns the path it wrote.
        archive_path = shutil.make_archive(archive_base, 'zip', '../', 'src')
        wrangler.s3.upload(SCRIPT_FILE, FULL_SCRIPT_PATH, boto3_session=session)
        wrangler.s3.upload(archive_path, FULL_MODULE_PATH, boto3_session=session)

# Upload the pre-built environment archive; it is never built from this notebook.
if UPDATE_ENVIRONMENT:
    environment_archive = f'../{ENVIRONMENT_FILE}'
    if not os.path.isfile(environment_archive):
        # FileNotFoundError is still an Exception, so existing handlers keep working.
        raise FileNotFoundError('Build your environment first using Docker: '
                                'DOCKER_BUILDKIT=1 docker build --output . .')

    wrangler.s3.upload(environment_archive, FULL_ENVIRONMENT_PATH, boto3_session=session)

### Manually Run EMR

In [None]:
from processing.emr import EMRServerless

In [None]:
# Name given to the application if a new one has to be created
APPLICATION_NAME = 'BatchOnDemand'
# ID of an existing application to reuse; set to None to create a new one
APPLICATION_ID = "00fe30cc9gb81u2p"

In [None]:
# Wrap the EMR Serverless client. If no application id was configured,
# create a new Spark application first; then start the application either way.
emr_serverless = EMRServerless(application_id=APPLICATION_ID, emr_client=client)
if APPLICATION_ID is None:
    print("Creating and starting EMR Serverless Spark App")
    emr_serverless.create_application(APPLICATION_NAME, "emr-6.14.0")
emr_serverless.start_application()

In [None]:
# Id of the most recently submitted job run (none submitted yet)
job_run_id = None
# Submitted job run ids, each mapped to a boolean flag
job_dict = dict()

In [None]:
# Submit a Spark job without waiting for it to finish (wait=False).
# If submission fails, cancel every job tracked in job_dict and re-raise.
try:
    job_run_id = emr_serverless.run_spark_job(
        name=APPLICATION_NAME,
        script_location=f"{FULL_SCRIPT_PATH}",
        venv_name="environment",
        venv_location=f'{FULL_ENVIRONMENT_PATH}',
        modules_location=f'{FULL_MODULE_PATH}',
        job_role_arn=EMR_JOB_ROLE_ARN,
        arguments=["PROD"],
        s3_bucket_name=S3_LOGS_BUCKET,
        wait=False
    )
    print(f"Submitting new Spark job with id {job_run_id}")

except Exception as e:
    print(f'Error while submitting job: \n{e}')

    # Use a dedicated loop variable so the notebook-level job_run_id is not
    # overwritten with the id of a job that was just cancelled.
    for tracked_job_id in list(job_dict):
        emr_serverless.cancel_spark_job(job_id=tracked_job_id)
        print(f'Job {tracked_job_id} cancelled')

    # Bare raise re-raises the active exception without adding a traceback frame.
    raise

else:
    # Only reached on a successful submission: start tracking the new job.
    # The meaning of the False flag is not defined in this cell; presumably
    # "not finished yet" (TODO confirm against the code that reads job_dict).
    job_dict[job_run_id] = False