In [1]:
import os
from getpass import getpass

from data_ml_utils.boto3_botocore_client.client_emr import AwsEMRServices

# AWS credentials: never hardcode keys in the notebook, because they end up in
# version control and in every shared copy. Keys already exported in the
# environment are kept as-is (the previous version silently overwrote them).
# Only missing keys are prompted for; getpass does not echo the input, and the
# value is not stored in the saved notebook.
for _aws_env_var in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    if not os.environ.get(_aws_env_var):
        os.environ[_aws_env_var] = getpass(f"{_aws_env_var}: ")

### initialise emr client

In [2]:
# One client object; every EMR helper used in the cells below is called on it.
aws_emr: AwsEMRServices = AwsEMRServices()

### create an EMR cluster with the specifications below
- an almost one-liner to create an EMR cluster

In [3]:
# Values most likely to be changed between environments. They are pulled out
# of the nested EMR configuration below so they are not buried as magic strings.
GLUE_CATALOG_ID = "251259879778"  # AWS account id owning the Glue Data Catalog
ZEPPELIN_NOTEBOOK_BUCKET = "hipages-long-lake"  # S3 bucket for Zeppelin notebooks

# EMR configuration classifications passed to create/spin-up below:
# - spark-hive-site: use the Glue Data Catalog as the Hive metastore
# - zeppelin-env:    persist Zeppelin notebooks to S3
# - spark-defaults:  executor/driver sizing and S3/parquet write tuning
configurations = [
    {
        "Classification": "spark-hive-site",
        "Properties": {
            "hive.metastore.glue.catalogid": GLUE_CATALOG_ID,
            # Implicit concatenation of two plain strings keeps the line short.
            "hive.metastore.client.factory.class": (
                "com.amazonaws.glue.catalog.metastore."
                "AWSGlueDataCatalogHiveClientFactory"
            ),
        },
        "Configurations": [],
    },
    {
        "Classification": "zeppelin-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "ZEPPELIN_NOTEBOOK_S3_BUCKET": ZEPPELIN_NOTEBOOK_BUCKET,
                    "ZEPPELIN_NOTEBOOK_S3_USER": "zeppelin",
                    "ZEPPELIN_NOTEBOOK_STORAGE": (
                        "org.apache.zeppelin.notebook.repo.S3NotebookRepo"
                    ),
                    "ZEPPELIN_NOTEBOOK_CRON_ENABLE": "true",
                },
                "Configurations": [],
            }
        ],
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.cores": "4",
            "spark.executor.memory": "9486M",
            "spark.driver.memory": "10G",
            "spark.sql.legacy.timeParserPolicy": "LEGACY",
            # -1 disables automatic broadcast joins entirely
            "spark.sql.autoBroadcastJoinThreshold": "-1",
            "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
            # flake8 requires the colon: "# noqa E501" is a blanket noqa
            "spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored": "true",  # noqa: E501
            "spark.hadoop.parquet.enable.summary-metadata": "false",
            "spark.sql.parquet.mergeSchema": "false",
            "spark.sql.parquet.filterPushdown": "true",
            "spark.sql.hive.metastorePartitionPruning": "true",
            "spark.hadoop.fs.s3a.committer.name": "directory",
        },
        "Configurations": [],
    },
]

# emr applications required, in the same order EMR receives them
applications_required = [
    {"Name": app_name}
    for app_name in ("Spark", "Zeppelin", "Hadoop", "Hue", "Hive", "Pig", "Ganglia")
]

In [4]:
# Launch a small cluster: one m5.xlarge master plus one m5.xlarge core node.
# The (task_id, identifier) pair is the same one get_cluster_id uses below to
# look this cluster up again. NOTE(review): bidprice is presumably a spot bid
# price - confirm against AwsEMRServices.
create_response = aws_emr.create_emr_cluster(
    task_id="test",
    identifier="1989-01-01",
    master_instance_type="m5.xlarge",
    core_instance_type="m5.xlarge",
    core_instance_count=1,
    bidprice="0.6",
    configurations=configurations,
    applications=applications_required,
    log_uri="s3n://au-com-hipages-data-scratchpad/shuming-development/elasticmapreduce/",
)
create_response

{'JobFlowId': 'j-MRVJ9KXIZV79',
 'ClusterArn': 'arn:aws:elasticmapreduce:ap-southeast-2:251259879778:cluster/j-MRVJ9KXIZV79',
 'ResponseMetadata': {'RequestId': '912e3c2c-1c6b-4694-915b-f893670644a1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '912e3c2c-1c6b-4694-915b-f893670644a1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '121',
   'date': 'Wed, 27 Apr 2022 04:48:23 GMT'},
  'RetryAttempts': 0}}

### check the created EMR cluster
- we check whether the cluster is in the `WAITING` state
- a one-liner to check the EMR cluster status

In [6]:
# Resolve the cluster id from the task_id/identifier pair used at creation
# instead of pasting it in. Every fresh run creates a new cluster with a new id,
# so a hardcoded id would point at a stale cluster after Restart & Run All.
# NOTE(review): assumes get_cluster_id matches only the current (non-terminated)
# cluster for this pair - confirm.
cluster_id = aws_emr.get_cluster_id(task_id="test", identifier="1989-01-01")
aws_emr.check_emr_cluster_status(cluster_id=cluster_id)

Cluster is up and running, please proceed


0

### get cluster id from task and date identifier
- one-liner to retrieve cluster id

In [7]:
# Look the cluster up by the same task_id/identifier pair it was created with.
aws_emr.get_cluster_id(task_id="test", identifier="1989-01-01")

'j-MRVJ9KXIZV79'

### get EMR cluster master dns name
- one-liner to retrieve EMR cluster master dns name

In [8]:
# Resolve the cluster id rather than hardcoding it, because each run creates a
# new cluster with a new id.
# NOTE(review): assumes get_cluster_id matches only the current cluster for
# this pair - confirm.
cluster_id = aws_emr.get_cluster_id(task_id="test", identifier="1989-01-01")
aws_emr.get_emr_master_dns_name(cluster_id=cluster_id)

'ec2-3-26-50-250.ap-southeast-2.compute.amazonaws.com'

### terminate created EMR cluster
- one-liner to terminate EMR cluster

In [9]:
# Terminate the cluster created above. The id is resolved rather than hardcoded:
# with a pasted id, a re-run would target the previous run's cluster and leave
# the new one running (and billing).
# NOTE(review): assumes get_cluster_id matches only the current cluster for
# this pair - confirm.
cluster_id = aws_emr.get_cluster_id(task_id="test", identifier="1989-01-01")
aws_emr.terminate_emr_cluster(cluster_id=cluster_id)

0

### wrapper function to spin up an EMR cluster
- this wrapper function creates an EMR cluster and checks that its status is `WAITING`
- it makes up to 3 attempts to create the EMR cluster successfully
- it fails if all 3 attempts are unsuccessful
- a one-liner to spin up an EMR cluster

In [11]:
# Create the cluster and wait until it is up; the wrapper retries creation up
# to 3 times and prints a status message once the cluster is running.
spin_up_status = aws_emr.spin_up_emr_cluster(
    master_instance_type="m5.xlarge",
    core_instance_type="m5.xlarge",
    core_instance_count=1,
    configurations=configurations,
    applications=applications_required,
    log_uri="s3n://au-com-hipages-data-scratchpad/shuming-development/elasticmapreduce/",
    task_id="test",
    identifier="1989-01-02",
    bidprice="0.6",
)

# This is a demo cluster. Tear it down so that running the notebook end to end
# does not leave it running (and billing) afterwards. Remove this call if you
# need to keep the cluster for further work.
aws_emr.terminate_emr_cluster(
    cluster_id=aws_emr.get_cluster_id(task_id="test", identifier="1989-01-02")
)
spin_up_status

Cluster is up and running, please proceed


0