# Prepare

- IAM
    * Attach the `AmazonElasticMapReduceFullAccess` policy to the IAM role that executes SageMaker

## Create S3 bucket and upload sample data

In [None]:
import sagemaker
import os
import boto3
import json
import time
import shutil

# Bucket that holds the demo data; leave it as None to use the SageMaker
# default bucket (created automatically by SageMaker when it does not exist).
sagemaker_session_bucket = None
if sagemaker_session_bucket is None:
    sess = sagemaker.Session()
    sagemaker_session_bucket = sess.default_bucket()

print(f"sagemaker bucket: {sagemaker_session_bucket}")

# Key layout inside the bucket: everything for this demo lives under s3_prefix.
s3_prefix = 'emr-demo/'
demo_data_file = 'demo_data.csv'
demo_data_key = s3_prefix + demo_data_file
s3_output = s3_prefix + 'output/'

# Resource-style handle on the bucket, reused by later cells.
s3 = boto3.resource('s3')
sage_bucket = s3.Bucket(sagemaker_session_bucket)

# Upload the local demo data file under the demo prefix.
s3_client = boto3.client('s3')
s3_client.upload_file(demo_data_file, sagemaker_session_bucket, demo_data_key)

# Persist the S3 locations as a small Python file of plain assignments,
# so the same values can be sent to the cluster as code.
with open("emr_get_s3_path.py", "w+") as f:
    f.writelines([
        f"sagemaker_session_bucket = '{sagemaker_session_bucket}'\n",
        f"demo_data_key = '{demo_data_key}'\n",
        f"s3_output = '{s3_output}'",
    ])


## Create a security group for the EMR Livy server

In [None]:
instance_ip = !curl -s 'http://ifconfig.co'
instance_ip = instance_ip[0]
print(f"instance ip: {instance_ip}")


In [None]:
# Create a security group that lets only this notebook instance reach the
# Livy port (8998) of the EMR master node.
ec2 = boto3.client('ec2')
response = ec2.describe_vpcs()

# Use the first VPC returned. An empty list would otherwise surface as a bare
# IndexError, so fail with an explicit message instead.
vpcs = response.get('Vpcs') or []
if not vpcs:
    raise RuntimeError("No VPC found in this region; cannot create the EMR_LIVY security group.")
vpc_id = vpcs[0].get('VpcId', '')

response = ec2.create_security_group(GroupName='EMR_LIVY',
                                     Description='EMR_LIVY',
                                     VpcId=vpc_id)
security_group_id = response['GroupId']
print(f'Security Group Created {security_group_id} in vpc {vpc_id}.')

# Single ingress rule: TCP 8998 from this instance's address only (/32).
data = ec2.authorize_security_group_ingress(
    GroupId=security_group_id,
    IpPermissions=[
        {'IpProtocol': 'tcp',
         'FromPort': 8998,
         'ToPort': 8998,
         'IpRanges': [{
             'CidrIp': f'{instance_ip}/32',
             'Description': 'Livy Server'
         }]}
    ])
print(f'Ingress Successfully Set {data}')


## Create EMR cluster, and get cluster_id
1. If you need more than one instance, make sure your EC2 service quota is sufficient.  
https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html
2. You can also create the cluster from the EMR console; just remember to copy its ID and set `cluster_id = ...` manually.

In [None]:
# emr 6.7.0
# master: r5.xlarge x1
# if you need more than one instance, make sure the ec2 limit is enough.
# https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html
cluster_name = 'emr-demo'
s3_emr_log = f"s3n://{s3_prefix}log/"
region = boto3.Session().region_name

# You could replace this with EMR cluster / AWS Cli export, but remember remove field `--os-release-label XX.XXX.XXX`
response = !./create_cluster.sh {cluster_name} {s3_emr_log} {security_group_id} {region}

cluster_id = json.loads("".join(response))["ClusterId"]
print(f"cluster_id = {cluster_id}")

## Wait for the cluster to be ready, then get master_ip
Wait 7 minutes first, then check every 10 seconds.

In [None]:
master_ip = ""
time.sleep(420)
while True:
    response = !aws emr describe-cluster --cluster-id {cluster_id}
    json_str = "".join(response)
    try:
        cluster_data = json.loads(json_str)
    except:
        print(json_str)
        print("[ERROR] Create cluster fail.")
        break
    cluster_state = cluster_data["Cluster"]["Status"]["State"]
    print(f"EMR cluster state: {cluster_state}")
    if cluster_state == "STARTING":
        time.sleep(10)
    else:
        master_ip = ".".join(cluster_data["Cluster"]["MasterPublicDnsName"].split(".")[0].split("-")[1:])
        print(f"master ip = {master_ip}")
        break


## Run livy code, start session and get session_id

In [None]:
def create_session():
    r = !./livy_create_session.sh {master_ip}
    # print(r)
    try:
        session_id = json.loads(r[-1])['id']
        return session_id
    except:
        print("[ERROR] create livy session fail")
        if len(r) > 0:
            print(r[-1])
        return None

session_id = create_session()
print(f"livy session is starting, id= {session_id}")

## Confirm the session state; it should be idle

In [None]:
def get_session_state():
    r = !./livy_get_session_state.sh {master_ip} {session_id}
    try:
        state = json.loads(r[-1])['state']
        return state
    except:
        print("[ERROR] get livy session state fail")
        if len(r) > 0:
            print(r[-1])
        return None
# Poll every 5 seconds until the Livy session leaves its startup states.
while True:
    session_state = get_session_state()
    print (f"session state = {session_state}")
    if session_state in ("not_started", "starting"):
        # "not_started" is reported before Livy begins launching the session;
        # treat it like "starting" rather than as a failure.
        time.sleep(5)
    elif session_state == "idle":
        print(f"session start complete, state: {session_state}")
        break
    else:
        # Any other state (or None when the query itself failed) ends the wait.
        print(f"[ERROR] livy session start fail, state: {session_state}")
        break
        




## Make sure emr s3 output folder is empty

In [None]:

# Remove every object already stored under the output prefix so the EMR job
# starts with an empty output folder.
stale_outputs = sage_bucket.objects.filter(Prefix=s3_output)
stale_outputs.delete()


## (function) Send pyfile to a livy statement

In [None]:
def add_file_to_statement(pyfile):
    try:
        with open(pyfile, 'r') as emr_code:
            # \\\n
            json_str = json.dumps({"code":emr_code.read().replace("\\\n", "").replace("\\", "\\\\")})
            json_str_len = len(json_str)
            print(f"Send char num: {json_str_len}")
    except:
        print(f"[ERROR] File {pyfile} not exists nor accessible")
        return None
            
    with open('emr_json_code', 'w') as emr_json_code:
        emr_json_code.write(json_str)

    if session_id != None:
        r = !./livy_add_statement.sh {master_ip} {session_id}
        try:
            statement_id = json.loads(r[-1])["id"]
            print(f"Send {pyfile} to EMR success, statement id = {statement_id}")
            return statement_id
        except:
            print("[ERROR] {} send fail, chat number: {}".format(pyfile, json_str_len))
            print("".join(r))
            return None




## (function) Show livy statement output

In [None]:

def get_result(statement_id, delay=5, max_wait=18000):
    if statement_id == None:
        print("[ERROR] statement_id not availible.")
        return False
    wait = 0
    while (wait < max_wait):
        print("wait....")
        r = !./livy_show_result.sh {master_ip} {session_id} {statement_id}
        try:
            output = json.loads(r[-1])
            statement_state = output["state"]
        except:
            print(r)
            return False
        if statement_state == "available":
            try:
                if output["output"]['status'] == 'error':
                    print("[ERROR] {}".format(output["output"]["ename"]))
                    print("[ERROR] {}".format(output["output"]["evalue"]))
                    return False
                print(f"statement id {statement_id} excute success and take {wait} seconds")
                if output["output"]["data"]['text/plain'] != "":
                    print(output["output"]["data"]['text/plain'])
                return True
            except:
                print("[ERROR] "+"".join(r))
                return False
        elif statement_state == "running":
            time.sleep(delay)
            wait += delay
        elif statement_state == "waiting":
            time.sleep(delay)
        else:
            print (f"[ERROR] statement state = {statement_state}")
            return False
    print(f"[ERROR] Already wait for {max_wait} sec, still not get result, and this statement may still running.")
    return False





## Send code to livy statement, and wait for result

In [None]:
# Send emr_get_s3_path.py (the generated S3 path assignments) to the Livy
# session as a statement, then poll until that statement finishes.
statement_id = add_file_to_statement('emr_get_s3_path.py')
get_result(statement_id)

In [None]:
# Send emr_code.py to the Livy session as a statement, then poll until that
# statement finishes and print its output.
statement_id = add_file_to_statement('emr_code.py')
get_result(statement_id)

## Terminate EMR Cluster

In [None]:
response = !aws emr terminate-clusters --cluster-ids {cluster_id}
if response == []:
    print(f"Terminate emr cluster success: {cluster_id}")
else:
    print(f"Terminate emr cluster fail: {cluster_id}")
    print(response)

## Download s3 output data

In [None]:


# Mirror everything under the S3 output prefix into a local folder with the
# same relative path, starting from a clean directory.
shutil.rmtree(s3_output, ignore_errors=True)  # no-op when the folder is absent
os.makedirs(s3_output, exist_ok=True)

for obj in sage_bucket.objects.filter(Prefix=s3_output):
    if obj.key.endswith('/'):
        # Keys ending in "/" are folder placeholders, not files to download.
        continue
    # Keys may be nested below the prefix; the local parent folder must exist
    # before the download can write the file.
    local_dir = os.path.dirname(obj.key)
    if local_dir:
        os.makedirs(local_dir, exist_ok=True)
    sage_bucket.download_file(obj.key, obj.key)




In [None]:
from pyspark.sql import SparkSession

# Local-mode Spark session used only to inspect the downloaded output.
spark = SparkSession.builder.master("local[*]").getOrCreate()

# NOTE(review): no format is given to load(), so Spark's default data source
# is used, while "header" looks like a CSV option -- confirm the output format
# written by emr_code.py.
(
    spark.read
    .option("header", "true")
    .load(s3_output)
    .show()
)

## Delete the security group for the Livy server (wait until all dependent resources, i.e. the EMR cluster, are terminated)

In [None]:
# Delete the security group created for the Livy server. The call raises if
# the request is rejected, so the message below is only printed on success.
# NOTE(review): presumably fails while the EMR cluster still references the
# group -- run this only after the cluster has fully terminated.
response = ec2.delete_security_group(GroupId=security_group_id)
print('Security Group Deleted')
