In [1]:
# ! pip install boto3
import boto3

In [None]:
class EMR_ADMIN:
    """Small administrative wrapper around the boto3 EMR client.

    Clusters are addressed by *name*: each public method lists the clusters
    in the relevant states and keeps those whose ``Name`` equals
    ``cluster_name``.
    """

    # States in which a cluster counts as "alive" (launch guard, isRunning,
    # terminate and DNS lookup all use this set).
    _ACTIVE_STATES = ('STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING')
    # States in which a cluster counts as "ready" for EMR_isReady.
    _READY_STATES = ('WAITING', 'RUNNING')

    def __init__(self, region_name="ap-south-1",
                 aws_access_key_id=None, aws_secret_access_key=None):
        """Create the underlying boto3 EMR client.

        Args:
            region_name: AWS region the client talks to.
            aws_access_key_id: Optional explicit access key. When left as
                ``None`` boto3 resolves credentials through its own default
                chain, so no secret has to live in this file.
            aws_secret_access_key: Optional explicit secret key (same
                fallback as above).
        """
        self.client = boto3.client(
            'emr',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
        )

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _find_clusters(self, cluster_name, states):
        """Return the summaries of all clusters named ``cluster_name``.

        ``list_clusters`` is paginated; the ``Marker`` token of each response
        is followed so clusters beyond the first page are not missed.
        """
        request = {'ClusterStates': list(states)}
        matches = []
        while True:
            page = self.client.list_clusters(**request)
            matches.extend(
                clus for clus in page.get('Clusters', [])
                if clus.get('Name') == cluster_name
            )
            marker = page.get('Marker')
            if not marker:
                return matches
            request['Marker'] = marker

    @staticmethod
    def _count_code(matches):
        """Collapse a match list to 0 (none), 1 (exactly one) or 2 (several)."""
        return min(len(matches), 2)

    @staticmethod
    def _instance_group(name, role, market, instance_type, count=1):
        """Build one instance-group spec with the shared 28 GB gp2 EBS layout."""
        return {
            'Name': name,
            'Market': market,
            'InstanceRole': role,
            'InstanceType': instance_type,
            'InstanceCount': count,
            'EbsConfiguration': {
                'EbsBlockDeviceConfigs': [
                    {
                        'VolumeSpecification': {
                            'VolumeType': 'gp2',
                            'SizeInGB': 28,
                        },
                        'VolumesPerInstance': 1,
                    },
                ],
                'EbsOptimized': True,
            },
        }

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def EMR_Launch(self, cluster_name="EMR", include_task_nodes=False):
        """Start a JupyterHub EMR cluster unless one with that name is alive.

        Args:
            cluster_name: Name given to the new cluster and used to detect an
                already active one.
            include_task_nodes: When true, a SPOT ``m4.large`` TASK group is
                added to the MASTER and CORE groups. Off by default, which
                matches the groups that were actually submitted before.

        Returns:
            The list of matching cluster ids when an active cluster with this
            name already exists (nothing is launched in that case), otherwise
            the ``run_job_flow`` response.
        """
        # Launch guard: never start a second cluster under the same name.
        existing_ids = [
            clus['Id']
            for clus in self._find_clusters(cluster_name, self._ACTIVE_STATES)
        ]
        if existing_ids:
            return existing_ids

        # No BidPrice is set on the SPOT groups, so EMR's default spot
        # pricing applies -- TODO confirm that is intended.
        instance_groups = [
            self._instance_group('master_node', 'MASTER', 'ON_DEMAND', 'm5.xlarge'),
            self._instance_group('core_nodes', 'CORE', 'SPOT', 'm5.2xlarge'),
        ]
        if include_task_nodes:
            instance_groups.append(
                self._instance_group('task_nodes', 'TASK', 'SPOT', 'm4.large')
            )

        # The two sparkmagic kernels share one credentials JSON string that
        # points at the Livy endpoint on the local host.
        livy_credentials = (
            '{"username": "", "password": "", '
            '"url": "http://localhost:8998", "auth": "None" }'
        )
        configurations = [
            {
                'Classification': 'livy-conf',
                'Properties': {
                    'livy.server.session.timeout': '720h',
                    'livy.impersonation.enabled': 'true',
                    'livy.spark.master': 'yarn-cluster',
                },
            },
            {
                'Classification': 'core-site',
                'Properties': {
                    'hadoop.proxyuser.livy.groups': '*',
                    'hadoop.proxyuser.livy.hosts': '*',
                },
            },
            {
                'Classification': 'jupyter-sparkmagic-conf',
                'Properties': {
                    'kernel_python_credentials': livy_credentials,
                    'kernel_scala_credentials': livy_credentials,
                },
            },
        ]
        application_list = [
            {'Name': 'JupyterHub'},
        ]
        bootstrap_actions = [
            {
                'Name': 'userdata_emr',
                'ScriptBootstrapAction': {
                    'Path': 's3://.../userdata_emr.sh',
                },
            },
        ]

        return self.client.run_job_flow(
            Name=cluster_name,
            LogUri='s3://..../EMR_logs/',
            ReleaseLabel='emr-5.24.1',
            Instances={
                'InstanceGroups': instance_groups,
                'Ec2KeyName': 'emr',
                'Placement': {
                    'AvailabilityZone': 'ap-south-1a',
                },
                # Keep the cluster up after its steps finish.
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': False,
                'HadoopVersion': '2.8.5',
            },
            BootstrapActions=bootstrap_actions,
            Applications=application_list,
            Configurations=configurations,
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole',
            Tags=[
                {
                    'Key': 'EMR',
                    'Value': 'EMR',
                },
            ],
            AutoScalingRole='EMR_AutoScaling_DefaultRole',
            ScaleDownBehavior='TERMINATE_AT_TASK_COMPLETION',
            EbsRootVolumeSize=32,
        )

    def EMR_isRunning(self, cluster_name="EMR"):
        """Report how many clusters named ``cluster_name`` are alive.

        The matching cluster summaries are printed as a side effect.

        Returns:
            0 when none is alive, 1 when exactly one is, 2 when several are.
        """
        running_clust = self._find_clusters(cluster_name, self._ACTIVE_STATES)
        print(running_clust)
        return self._count_code(running_clust)

    def EMR_isReady(self, cluster_name="EMR"):
        """Report how many clusters named ``cluster_name`` are WAITING/RUNNING.

        The matching cluster summaries are printed as a side effect.

        Returns:
            0 when none is ready, 1 when exactly one is, 2 when several are.
        """
        ready_clust = self._find_clusters(cluster_name, self._READY_STATES)
        print(ready_clust)
        return self._count_code(ready_clust)

    def EMR_terminate(self, cluster_name="EMR"):
        """Terminate every alive cluster named ``cluster_name``.

        Returns:
            Always 1, whether or not any cluster had to be terminated.
        """
        ids = [
            clus['Id']
            for clus in self._find_clusters(cluster_name, self._ACTIVE_STATES)
        ]
        if ids:
            self.client.terminate_job_flows(JobFlowIds=ids)
        return 1

    def EMR_DNS(self, cluster_name="EMR"):
        """Return the DNS name of the master node of the first matching cluster.

        If the cluster is on a private subnet, this is the private DNS name.
        On a public subnet, this is the public DNS name.

        Returns:
            The ``MasterPublicDnsName`` string; ``""`` when the cluster
            description does not carry that field; the integer ``1`` when no
            alive cluster has this name (sentinel kept for existing callers).
        """
        matches = self._find_clusters(cluster_name, self._ACTIVE_STATES)
        if not matches:
            return 1
        resp = self.client.describe_cluster(ClusterId=matches[0]['Id'])
        # A missing field yields "" instead of a KeyError.
        return resp['Cluster'].get('MasterPublicDnsName', "")
    

In [None]:
# Invoke EMR_terminate() with its default cluster_name ("EMR").
# NOTE(review): `EMR` is not assigned in any visible cell -- presumably an
# EMR_ADMIN instance created elsewhere (e.g. `EMR = EMR_ADMIN()`); confirm
# before running, otherwise this line raises NameError.
EMR.EMR_terminate()