In [None]:
import boto3, botocore
from botocore.exceptions import ClientError
import os, time, json, io, zipfile
from datetime import date
from dotenv import load_dotenv

from misc import load_from_yaml, save_to_yaml
from emr import create_emr_cluster

load_dotenv(".env")
# boto3.setup_default_session(profile_name="AMominNJ")

In [None]:
# Deployment settings come from environment variables (the .env file is loaded in the
# first cell); a missing variable raises KeyError here rather than inside a later AWS call.
ACCOUNT_ID        = os.environ['AWS_ACCOUNT_ID_ROOT']
REGION            = os.environ['AWS_DEFAULT_REGION']
VPC_ID            = os.environ['AWS_DEFAULT_VPC']
SECURITY_GROUP_ID = os.environ['AWS_DEFAULT_SG_ID']
# AWS_DEFAULT_SUBNET_IDS is a colon-separated list of subnet IDs.
SUBNET_IDS        = os.environ["AWS_DEFAULT_SUBNET_IDS"].split(":")
SUBNET_ID         = SUBNET_IDS[0]  # the first listed subnet is the one used for the cluster
print(SUBNET_IDS)

In [None]:
# --- Handles explicitly pinned to REGION -------------------------------------
ec2_client = boto3.client('ec2', region_name=REGION)
ec2_resource = boto3.resource('ec2', region_name=REGION)
emr_client = boto3.client('emr', region_name=REGION)

# --- Identity and access (region resolved by boto3's default configuration) --
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')

# --- Storage, catalog and databases -------------------------------------------
s3_client = boto3.client('s3')
rds_client = boto3.client('rds')
glue_client = boto3.client('glue')
lakeformation_client = boto3.client('lakeformation')

# --- Orchestration, APIs and messaging ----------------------------------------
stepfunctions_client = boto3.client('stepfunctions')
apigateway_client = boto3.client('apigateway')
lsn_client = boto3.client('lambda')
events_client = boto3.client('events')
sqs_client = boto3.client('sqs')

-   [Boto3 EMR Tutorial](https://hands-on.cloud/boto3/emr/)

### [EMR: Boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html)

#### Notes

-   **SecurityConfiguration**

    A **Security Configuration** in AWS EMR defines how data is encrypted and secured at rest and in transit. It allows you to enable various security settings for the cluster.

    ```python
    import boto3

    emr_client = boto3.client('emr', region_name='us-east-1')

    # Create a security configuration
    response = emr_client.create_security_configuration(
        Name='MySecurityConfig',
        SecurityConfiguration='''{
            "EncryptionConfiguration": {
                "EnableInTransitEncryption": true,
                "EnableAtRestEncryption": true,
                "AtRestEncryptionConfiguration": {
                    "S3EncryptionConfiguration": {
                        "EncryptionMode": "SSE-S3"
                    }
                },
                "InTransitEncryptionConfiguration": {
                    "TLSCertificateConfiguration": {
                        "CertificateProviderType": "PEM"
                    }
                }
            }
        }'''
    )
    print(response)
    ```

-   **StepConcurrencyLevel**

    **Step Concurrency Level** determines how many steps (tasks) can run concurrently in an EMR cluster. By default, EMR executes steps sequentially, but you can adjust this to improve parallelism.

    ```python
    response = emr_client.run_job_flow(
        Name='MyCluster',
        ReleaseLabel='emr-6.10.0',
        Instances={
            'InstanceGroups': [...],
            'KeepJobFlowAliveWhenNoSteps': True
        },
        Steps=[
            {'Name': 'Step1', 'HadoopJarStep': {...}, 'ActionOnFailure': 'CONTINUE'},
            {'Name': 'Step2', 'HadoopJarStep': {...}, 'ActionOnFailure': 'CONTINUE'}
        ],
        StepConcurrencyLevel=2  # Allows two steps to run concurrently
    )
    ```

-   **ManagedScalingPolicy**

    A **Managed Scaling Policy** in EMR enables automatic adjustment of the cluster's compute resources (instances) based on workload demands. It uses predefined scaling rules.

    ```python
    response = emr_client.put_managed_scaling_policy(
        ClusterId='j-XXXXXXXXXXXXX',
        ManagedScalingPolicy={
            'ComputeLimits': {
                'UnitType': 'InstanceFleetUnits',
                'MinimumCapacityUnits': 2,
                'MaximumCapacityUnits': 10,
                'MaximumOnDemandCapacityUnits': 4,
                'MaximumCoreCapacityUnits': 6
            }
        }
    )
    print(response)
    ```

#### Instance Profile and Role

- Instance Profile Creation using CLI [`SUCCESS`]

In [None]:
# !aws emr create-default-roles

-   This command creates:

    -   `EMR_DefaultRole` for the EMR service.
    -   `EMR_EC2_DefaultRole` for EC2 instances.
    -   `EMR_AutoScaling_DefaultRole`

In [None]:
# list_instance_profiles is a paginated API: a single call may return only the
# first page. Walk every page so no instance profile is silently left out.
instance_profile_pages = iam_client.get_paginator("list_instance_profiles").paginate()

# Flatten the pages into one list of instance-profile descriptions.
instance_profiles = [
    profile
    for page in instance_profile_pages
    for profile in page["InstanceProfiles"]
]
print(instance_profiles)
# print(instance_profiles[0]['InstanceProfileName'])

[{'Path': '/', 'InstanceProfileName': 'EMR_EC2_DefaultRole', 'InstanceProfileId': 'AIPAVRUVV3SNT2Y5IQA4T', 'Arn': 'arn:aws:iam::381492255899:instance-profile/EMR_EC2_DefaultRole', 'CreateDate': datetime.datetime(2024, 12, 7, 2, 47, 19, tzinfo=tzutc()), 'Roles': [{'Path': '/', 'RoleName': 'EMR_EC2_DefaultRole', 'RoleId': 'AROAVRUVV3SN7L6L3U7GE', 'Arn': 'arn:aws:iam::381492255899:role/EMR_EC2_DefaultRole', 'CreateDate': datetime.datetime(2024, 12, 7, 2, 47, 17, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2008-10-17', 'Statement': [{'Sid': '', 'Effect': 'Allow', 'Principal': {'Service': 'ec2.amazonaws.com'}, 'Action': 'sts:AssumeRole'}]}}]}]


#### S3

In [None]:
# Bucket names must be unique across all of Amazon S3; today's date (YYYYMMDD) is used as suffix.
EMR_BUCKET_NAME = f"emr-bkt-{date.today():%Y%m%d}"
# Key prefixes ("folders") inside the bucket, one variable per prefix.
inputs, outputs, scripts, logs = "inputs", "outputs", "scripts", "logs"

#### Create EMR Cluster

In [None]:
# Cluster name ends with today's date formatted as YYYYMMDD.
CLUSTER_NAME = "emr-cluster-" + date.today().strftime("%Y%m%d")
# Empty placeholders for the remaining create_emr_cluster arguments.
LogUri = ReleaseLabel = Ec2SubnetId = ""

In [None]:
# Launch a cluster through the project-local `emr.create_emr_cluster` helper and keep
# whatever it returns (presumably the cluster/job-flow id — verify against the helper).
# NOTE(review): LogUri, ReleaseLabel and Ec2SubnetId are all empty strings when this cell
# runs; confirm the helper tolerates empty values or fill them in first.
cluster_id = create_emr_cluster(CLUSTER_NAME, LogUri, ReleaseLabel, Ec2SubnetId)

In [None]:
# Ask EMR only for clusters that are still alive (starting up, running, or idle).
response = emr_client.list_clusters(ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"])

# One summary line per cluster: id, name and current state.
clusters = [f""""ClusterId": {cluster["Id"]}; "Name": {cluster["Name"]}; "Status": {cluster["Status"]["State"]}""" for cluster in response["Clusters"]]

# Plain loop instead of a list comprehension run for its side effect, so the
# cell no longer echoes a list of None values as its output.
for cluster_info in clusters:
    print(cluster_info)

In [None]:
# emr_client.describe_cluster(ClusterId=cluster['ClusterId'])

In [None]:
# emr_client.list_clusters(ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"])

### [Intro to Amazon EMR - Big Data Tutorial using Spark](https://www.youtube.com/watch?v=8bOgOvz6Tcg&t=667s)

#### Script

```python
#
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

def transform_data(data_source, output_uri):
    with SparkSession.builder.appName('test_emr').getOrCreate() as spark:
        df = spark.read.option('header', 'true').csv(data_source)
        df = df.select(
            fn.col("Name").alias("name"),
            fn.col("Violation Type").alias("violation_type")
        )
        df.createOrReplaceTempView("restaurant_violations")
        GROUP_BY_QUERY = """SELECT name, count(*) as total_red_violations FROM restaurant_violations WHERE violation_type == "RED" GROUP BY name"""
        transformed_df = spark.sql(GROUP_BY_QUERY)
        print(f"Number of Rows in SQL Query {transformed_df.count()}")
        transformed_df.write.mode("overwrite").parquet(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_source")
    parser.add_argument("--output_uri")

    args = parser.parse_args()

    transform_data(args.data_source, args.output_uri)
```

#### Create S3 Objects

In [None]:
# NOTE(review): the three settings below are assigned but not passed to any call in this cell.
acl = 'private'                             # Set the ACL (e.g., 'private', 'public-read')
enable_versioning = False                   # Enable versioning
enable_encryption = False                   # Enable server-side encryption


# NOTE(review): `s3` is never imported in the visible notebook, and the last cell rebinds
# `s3` to boto3.resource('s3'). Presumably it is a project-local helper module (like `misc`
# and `emr`) — confirm and import it in the first cell so Restart & Run All works.
# Presumably creates the bucket plus the four key prefixes — verify against the helper.
s3.create_s3_bucket(EMR_BUCKET_NAME, [inputs, outputs, scripts, logs])


# Input data: read from the directory named by the DATA environment variable.
local_file1 = os.environ['DATA'] + '/restaurant_violations.csv'  # The local file you want to upload
object_name1 = f"{inputs}/restaurant_violations.csv"  # The name to save the file as in the S3 bucket

# Upload the CSV under the inputs prefix
s3.upload_file_to_s3(EMR_BUCKET_NAME, local_file1, object_name1)
# delete_file_from_s3(EMR_BUCKET_NAME, object_name1)

# PySpark script: path is relative to the notebook's working directory.
local_file2 = "../aws_emr/emr_basic/main.py"
pyscript_object = f"{scripts}/main.py"  # The name to save the file as in the S3 bucket

# Upload the script under the scripts prefix
s3.upload_file_to_s3(EMR_BUCKET_NAME, local_file2, pyscript_object)
# delete_file_from_s3(bucket_name, pyscript_object)

In [None]:
# List the keys currently stored in the bucket (a single call returns at most 1,000 keys).
bkt_list_res = s3_client.list_objects_v2(Bucket=EMR_BUCKET_NAME)
# 'Contents' is omitted from the response when the bucket holds no objects,
# so fall back to an empty list instead of raising KeyError.
for item in bkt_list_res.get('Contents', []):
    print(item['Key'])

#### Instance Profile and Role

- Instance Profile Creation using CLI [`SUCCESS`]

In [None]:
# !aws emr create-default-roles

-   This command creates:

    -   `EMR_DefaultRole` for the EMR service.
    -   `EMR_EC2_DefaultRole` for EC2 instances.
    -   `EMR_AutoScaling_DefaultRole`

In [126]:
# list_instance_profiles is a paginated API: a single call may return only the
# first page. Walk every page so no instance profile is silently left out.
instance_profile_pages = iam_client.get_paginator("list_instance_profiles").paginate()

# Flatten the pages into one list of instance-profile descriptions.
instance_profiles = [
    profile
    for page in instance_profile_pages
    for profile in page["InstanceProfiles"]
]
print(instance_profiles)
# print(instance_profiles[0]['InstanceProfileName'])

[{'Path': '/', 'InstanceProfileName': 'EMR_EC2_DefaultRole', 'InstanceProfileId': 'AIPAVRUVV3SNT2Y5IQA4T', 'Arn': 'arn:aws:iam::381492255899:instance-profile/EMR_EC2_DefaultRole', 'CreateDate': datetime.datetime(2024, 12, 7, 2, 47, 19, tzinfo=tzutc()), 'Roles': [{'Path': '/', 'RoleName': 'EMR_EC2_DefaultRole', 'RoleId': 'AROAVRUVV3SN7L6L3U7GE', 'Arn': 'arn:aws:iam::381492255899:role/EMR_EC2_DefaultRole', 'CreateDate': datetime.datetime(2024, 12, 7, 2, 47, 17, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2008-10-17', 'Statement': [{'Sid': '', 'Effect': 'Allow', 'Principal': {'Service': 'ec2.amazonaws.com'}, 'Action': 'sts:AssumeRole'}]}}]}]


#### Create EMR Cluster [`SUCCESS`]

In [None]:
# Cluster name ends with today's date formatted as YYYYMMDD.
CLUSTER_NAME = "emr-cluster-" + date.today().strftime("%Y%m%d")

In [None]:
# Node groups: one master and two core nodes, all on-demand m5.xlarge.
master_group = {
    "InstanceRole": "MASTER",
    "InstanceType": "m5.xlarge",
    "InstanceCount": 1,
    "Market": "ON_DEMAND",
}
core_group = {
    "InstanceRole": "CORE",
    "InstanceType": "m5.xlarge",
    "InstanceCount": 2,
    "Market": "ON_DEMAND",
}
# Optional task group (not used):
# {
#     "InstanceRole": "TASK",
#     "InstanceType": "m5.xlarge",
#     "InstanceCount": 1,
#     "Market": "ON_DEMAND"
# }

instances_config = {
    "InstanceGroups": [master_group, core_group],
    "Ec2KeyName": "AMominNJ",             # EC2 key pair for SSH access
    "KeepJobFlowAliveWhenNoSteps": True,  # keep the cluster up after its steps finish
    "TerminationProtected": False,
    "Ec2SubnetId": SUBNET_ID,
    # "HadoopVersion": "2.10.1",          # optional
}

# Applications installed on the cluster.
emr_applications = [
    {"Name": app_name}
    for app_name in ("Hadoop", "Spark", "Hive", "Hue", "JupyterHub")
]

cluster_tags = [
    {"Key": "Environment", "Value": "Development"},
    {"Key": "Project", "Value": "EMR_DataEngineering"},
]

run_job_flow_response = emr_client.run_job_flow(
    Name=CLUSTER_NAME,
    LogUri=f"s3://{EMR_BUCKET_NAME}/{logs}/",
    ReleaseLabel="emr-5.32.0",
    Instances=instances_config,
    Applications=emr_applications,
    VisibleToAllUsers=True,
    ServiceRole="EMR_DefaultRole",                  # IAM role for the EMR service
    JobFlowRole="EMR_EC2_DefaultRole",              # IAM role for the cluster's EC2 instances
    AutoScalingRole="EMR_AutoScaling_DefaultRole",  # IAM role for auto-scaling
    Tags=cluster_tags,
    ScaleDownBehavior="TERMINATE_AT_TASK_COMPLETION",
    StepConcurrencyLevel=1,                         # steps run one at a time
)
# The id of the new cluster is what every later cell needs.
cluster_id = run_job_flow_response['JobFlowId']

In [None]:
# Ask EMR only for clusters that are still alive (starting up, running, or idle).
response = emr_client.list_clusters(ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"])

# One summary line per cluster: id, name and current state.
clusters = [f""""ClusterId": {cluster["Id"]}; "Name": {cluster["Name"]}; "Status": {cluster["Status"]["State"]}""" for cluster in response["Clusters"]]

# Plain loop instead of a list comprehension run for its side effect, so the
# cell no longer echoes a list of None values as its output.
for cluster_info in clusters:
    print(cluster_info)

In [None]:
# emr_client.describe_cluster(ClusterId=cluster['ClusterId'])

In [None]:
# emr_client.list_clusters(ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"])

#### Add Steps

- boto3

In [None]:
# S3 locations handed to the Spark script: the uploaded CSV and the output prefix.
data_source = "s3://{}/{}".format(EMR_BUCKET_NAME, object_name1)
output_uri = "s3://{}/{}/".format(EMR_BUCKET_NAME, outputs)

In [None]:
# Step definition
pyspark_step = {
    'Name': 'TestRun2',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': [
            'spark-submit',
            f's3://{EMR_BUCKET_NAME}/{pyscript_object}',
            '--data_source', data_source,
            '--output_uri', output_uri
        ]
    }
}

# Add step to the cluster
response = emr_client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[pyspark_step]
)

In [None]:
# Print the response
step_id = response['StepIds'][0]
print(f"Step added with ID: {step_id}")

# Monitor the step
step_status = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
print("Step Status:", step_status['Step']['Status']['State'])


- Alternative Option: CLI

In [None]:
# spark-submit command line for the step, as a list of arguments.
part = [
    'spark-submit',
    f's3://{EMR_BUCKET_NAME}/{pyscript_object}',
    '--data_source', data_source,
    '--output_uri', output_uri,
]
# AWS CLI shorthand for one step: comma-separated Key=Value pairs.
# NOTE(review): Args embeds the Python repr of the list (brackets, single quotes,
# ", " separators) inside double quotes — confirm the CLI parses it as intended.
step_fields = (
    "Type=CUSTOM_JAR",
    "Name=MyStepName",
    "ActionOnFailure=CONTINUE",
    "Jar=command-runner.jar",
    'Args="' + str(part) + '"',
)
STEPS = ",".join(step_fields)
print(part)
print(STEPS)

In [None]:
# Submit the step with the AWS CLI; IPython substitutes {cluster_id}, {REGION} and {STEPS}
# into the shell command before running it.
# NOTE(review): STEPS contains spaces and quote characters and is interpolated unescaped —
# confirm the shell passes it to `--steps` as a single argument.
!aws emr add-steps --cluster-id {cluster_id} --region {REGION} --steps {STEPS}

- **Another Method of using CLI** [`NOT TESTED`]

steps.json
```json
[
  {
    "Name": "Step 1",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": ["spark-submit", "s3://emr-bkt-20241206/scripts/main.py", "--data_source", "s3://emr-bkt-20241206/inputs/restaurant_violations.csv", "--output_uri", "s3://emr-bkt-20241206/outputs"]
    }
  },
  {
    "Name": "Step 2",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": ["spark-submit", "--deploy-mode", "cluster", "--master", "yarn", "s3://your-bucket/script2.py"]
    }
  }
]
```

```sh
aws emr add-steps \
    --cluster-id j-XXXXXXXX \
    --steps file://steps.json
```

#### Run Script from the Master Node [`NOT TESTED YET`]

In [None]:
# Show the exact command to run on the master node (pieces are joined by single spaces).
print("spark-submit main.py", "--data_source", data_source, "--output_uri", output_uri)

-   `$ scp -i ~/.ssh/AMominNJ.pem ./main.py hadoop@ec2-3-235-170-221.compute-1.amazonaws.com:/home/hadoop`
-   `$ ssh -i ~/.ssh/AMominNJ.pem hadoop@ec2-3-235-170-221.compute-1.amazonaws.com`
-   `$ spark-submit main.py --data_source {data_source} --output_uri {output_uri}`

#### Delete Resources

In [None]:
# The resource API exposes the bucket's objects as a collection that can be bulk-deleted.
s3_resource = boto3.resource('s3')
bucket1 = s3_resource.Bucket(EMR_BUCKET_NAME)

# Step 1: empty the bucket.
all_objects = bucket1.objects.all()
all_objects.delete()

# (If versioning is enabled, old versions must go too:)
# bucket1.object_versions.all().delete()

# Step 2: remove the now-empty bucket itself.
bucket1.delete()

In [None]:
# Request termination of the cluster started earlier in this notebook; the call's
# response is shown as the cell output.
emr_client.terminate_job_flows(JobFlowIds=[cluster_id])

In [127]:
# response = iam_client.delete_instance_profile(InstanceProfileName='emr_instance_profile')
# print("Instance profile deleted:", response)

In [None]:
## DELETE IAM ROLE AT THE END AFTER DELETING ALL OTHER RESOURCES.
iam.delete_iam_role("EMR_DefaultRole")
iam.delete_iam_role("EMR_EC2_DefaultRole")
iam.delete_iam_role("EMR_AutoScaling_DefaultRole")

###   [Automating EMR Serverless Workload |Creating|Submitting | Destroying EMR Cluster using Step Function](https://www.youtube.com/watch?v=V7bFwXBN5xc&t=199s)

-   [code](https://github.com/soumilshah1995/Automating-EMR-Serverless-Workload-Creating-Submitting-Destroying-EMR-Cluster-using-Step-Funct)

```json
{
  "Comment": "A description of my state machine",
  "StartAt": "Create New EMR Application",
  "States": {
    "Create New EMR Application": {
      "Type": "Task",
      "ResultPath": "$.CreateEMRCluster",
      "Next": "Start EMR Serverless Application",
      "Parameters": {
        "Architecture": "X86_64",
        "ClientToken.$":"States.UUID()",
        "ReleaseLabel.$": "$.emr_cluster.ReleaseLabel",
        "Type.$": "$.emr_cluster.Type",
        "Name": "datateam",
        "NetworkConfiguration": {
          "SecurityGroupIds": [ "sg-0f82bcb99a2878231" ],
          "SubnetIds": [ "subnet-05551ec8e1006b370","subnet-03576afd62b50a982" ]
        }
      },
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:createApplication"
    },
    "Start EMR Serverless Application": {
      "Type": "Task",
      "ResultPath": "$.StartEMRApplication",
      "Next": "Start EMR Job wait for CallBack",
      "Parameters": {
        "ApplicationId.$": "$.CreateEMRCluster.ApplicationId"
      },
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:startApplication"
    },
    "Start EMR Job wait for CallBack": {
      "Type": "Task",
      "ResultPath": "$.WaitForCallBack",
      "Catch":[
        {
          "ErrorEquals":[
            "States.TaskFailed"
          ],
          "Next":"wait_2_minutes"
        },
        {
          "ErrorEquals":[
            "States.ALL"
          ],
          "Next":"wait_2_minutes"
        }
      ],
      "Parameters": {
        "ApplicationId.$": "$.CreateEMRCluster.ApplicationId",
        "ClientToken.$": "States.UUID()",
        "ExecutionRoleArn.$": "$.ExecutionArn",
        "JobDriver": {
          "SparkSubmit": {
            "EntryPoint.$": "$.ScriptPath",
            "EntryPointArguments.$":  "States.Array($$.Task.Token)",
            "SparkSubmitParameters.$": "$.SparkSubmitParameters"
          }
        },
        "Name.$": "$.JobName"
      },
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:startJobRun.waitForTaskToken",
      "Next": "wait_2_minutes"
    },
    "wait_2_minutes": {
      "Type": "Wait",
      "Seconds": 140,
      "Next": "Stop EMR Serverless Application"
    },
    "Stop EMR Serverless Application": {
      "ResultPath": "$.StopApplication",
      "Type": "Task",
      "Next": "Wait for Application to Stop",
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:stopApplication",
      "Parameters": {
        "ApplicationId.$":  "$.CreateEMRCluster.ApplicationId"
      }
    },
    "Wait for Application to Stop": {
      "Type": "Wait",
      "Seconds": 140,
      "Next": "Delete EMR Serverless Application"
    },
    "Delete EMR Serverless Application": {
      "Type": "Task",
      "ResultPath": "$.DeleteEMRJob",
      "End": true,
      "Parameters": {
        "ApplicationId.$": "$.CreateEMRCluster.ApplicationId"
      },
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:deleteApplication"
    }
  }
}
```

### [AWS EMR by AWS Tutorials](https://www.youtube.com/playlist?list=PLO95rE9ahzRt42F77Gikc0MNZbv8z7F6N)

1. **IAM Roles**: Ensure that the roles `EMR_EC2_DefaultRole` and `EMR_DefaultRole` exist and have necessary permissions.
2. **S3 Buckets**: Replace `s3://my-emr-logs-bucket/` and `s3://my-bootstrap-scripts/bootstrap.sh` with your S3 paths.
3. **Networking**: Update the `Ec2SubnetId` to match your network configuration.
4. **Customizations**: Modify or extend the `Configurations`, `Steps`, or `BootstrapActions` as needed.
5. **Security**: Avoid hardcoding sensitive information like passwords. Use AWS Secrets Manager or environment variables.

-   [lab: Absolute Beginners Tutorial for Amazon EMR](https://aws-dojo.com/ws34/labs/)
-   [lab: Using Amazon EMR with AWS Glue Catalog](https://aws-dojo.com/ws41/labs/#google_vignette)
-   [lab: Using Transient Amazon EMR Cluster](https://aws-dojo.com/excercises/excercise45/)

#### Create S3 Buckets

In [None]:
# Bucket used throughout this section. S3 bucket names are global, so this fixed
# name may need changing if it is already taken.
EMR_BKT = "httx-emr-bkt"

In [None]:
# Bucket options: canned ACL (e.g. 'private', 'public-read'), versioning and
# server-side encryption switches.
acl = 'public-read'
enable_versioning = enable_encryption = False

# Key prefixes ("folders") to create inside the bucket.
folders = "inputs outputs scripts logs".split()

In [None]:
# Outside us-east-1, S3 rejects create_bucket unless the target region is given as a
# LocationConstraint; us-east-1 in turn must NOT be passed as a constraint.
# Use the region the client itself is bound to so the two always agree.
bucket_region = s3_client.meta.region_name
create_bucket_kwargs = {"Bucket": EMR_BKT}
if bucket_region != "us-east-1":
    create_bucket_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": bucket_region}
s3_client.create_bucket(**create_bucket_kwargs)

-   **Put Object (Upload)**: Uploads an object directly to S3 (binary or text content).

In [None]:
# s3_client.put_object(Bucket='my-bucket', Key='new_file.txt', Body=b'Hello, World!')

In [None]:
# S3 has no real directories: a "folder" is a zero-byte object whose key ends with "/".
# Without the trailing slash these calls create plain empty objects named "inputs",
# "outputs", ... instead of folders.
# A plain loop is used because the calls are made for their side effect only.
for folder in folders:
    s3_client.put_object(Bucket=EMR_BKT, Key=f"{folder.rstrip('/')}/")

-   **List All Buckets**: Lists all the buckets in your S3 account.

In [None]:
# Fetch the account's buckets and dump the raw response first.
response = s3_client.list_buckets()
print(response)

# Then print just the names, one per line.
bucket_names = [entry["Name"] for entry in response['Buckets']]
for bucket_name in bucket_names:
    print(f'Bucket: {bucket_name}')

-   **Upload a File to S3**: Uploads a file to a specified S3 bucket.

In [None]:
# Upload the local ./customers.csv (relative to the working directory) to the bucket
# under the key source/customers.csv.
# NOTE(review): "source/" is not one of the prefixes in `folders` — confirm this is intended.
s3_client.upload_file('./customers.csv', EMR_BKT, 'source/customers.csv')

#### Delete Resources

In [None]:
# Bind the resource handle to `s3_resource` (the name the earlier clean-up cell uses)
# instead of `s3`, so it does not shadow the `s3` helper that the bucket-creation and
# upload cells call when the notebook is re-run.
s3_resource = boto3.resource('s3')
bucket1 = s3_resource.Bucket(EMR_BKT)

# A bucket must be empty before it can be deleted: remove all objects first
bucket1.objects.all().delete()

# Delete all object versions (if versioning is enabled)
# bucket1.object_versions.all().delete()

# Finally, delete the bucket
bucket1.delete()