In [1]:
import os
from datetime import datetime
import time
import tarfile
import boto3
import botocore
import logging

In [2]:
def setup_logging(default_level=logging.WARNING):
    """
    Configure logging via ``logging.basicConfig`` and return the script's logger.

    :param default_level: level handed to ``logging.basicConfig`` (WARNING by default)
    :return: the ``logging.Logger`` named 'TriggerSparkSubmit'
    """
    logger_name = 'TriggerSparkSubmit'
    logging.basicConfig(level=default_level)
    named_logger = logging.getLogger(logger_name)
    return named_logger

In [3]:
def terminate(error_message=None, exit_code=1):
    """
    Log an optional error message, then stop the script.

    :param error_message: message logged at ERROR level before stopping; skipped when falsy
    :param exit_code: status carried by the raised SystemExit; defaults to 1 so that
        callers/schedulers see the termination as a failure
    :raises SystemExit: always
    """
    # Look the logger up by name: no module-level ``logger`` is defined in the
    # visible cells (run() only binds it as a local), so a bare ``logger`` would
    # raise NameError here.
    log = logging.getLogger('TriggerSparkSubmit')
    if error_message:
        log.error(error_message)
    log.critical('The script is now terminating')
    # ``exit()`` is an interactive-shell helper: it exits with status 0 and, in a
    # Jupyter kernel, shuts the kernel down. Raising SystemExit stops the script
    # (or the current cell) with a non-zero status everywhere.
    raise SystemExit(exit_code)


In [4]:
def intialize():
    """
    Build the job configuration used by the helper functions in this notebook.

    The values are returned as attributes of a ``types.SimpleNamespace`` so the
    result can be passed wherever a helper expects ``self``
    (e.g. ``cfg = intialize(); generate_job_name(cfg)``).

    :return: types.SimpleNamespace with app_name, ec2_key_name, job_flow_id,
        job_name, path_script, s3_bucket_logs, s3_bucket_temp_files, s3_region, user
    """
    import types

    # Previously these were plain local assignments, discarded on return.
    return types.SimpleNamespace(
        app_name="unique_visitors_spark",            # Application name
        ec2_key_name="dynobjx",                      # Key name to use for cluster
        job_flow_id="j-2A3GSR7FH1UE0",               # Returned by AWS in start_spark_cluster()
        job_name=None,                               # Filled by generate_job_name()
        # NOTE(review): absolute local path; consider making it configurable.
        path_script="/home/ab/pyspark/spark_demo/",  # Path of Spark script to be deployed on AWS Cluster
        s3_bucket_logs="s3://aws-logs-507340184449-ap-southeast-1/elasticmapreduce/",  # S3 Bucket to store AWS EMR logs
        s3_bucket_temp_files="daimlerdemotemp",      # S3 Bucket to store temporary files
        # Hostname fixed: the previous value used '-' before 'amazonaws.com'.
        s3_region='s3-ap-southeast-1.amazonaws.com', # S3 endpoint for the s3Endpoint option of s3-dist-cp
        user='Raghu',                                # Define user name
    )

In [5]:
def generate_job_name(self):
    """
    Store a unique job name on ``self.job_name``.

    The name is ``<app_name>.<user>.<timestamp>``, where the timestamp is the
    current local time formatted as ``%Y%m%d.%H%M%S.%f``.
    """
    timestamp = datetime.now().strftime("%Y%m%d.%H%M%S.%f")
    self.job_name = "{app}.{who}.{when}".format(app=self.app_name,
                                                who=self.user,
                                                when=timestamp)
    

In [6]:
def temp_bucket_exists(self, s3):
    """
    Check that the bucket for temporary files (``self.s3_bucket_temp_files``) is reachable.

    Calls ``head_bucket`` on it and terminates the script via ``terminate``
    if botocore raises a ClientError.

    :param s3: boto3 S3 resource (its ``meta.client`` is used)
    :return: None
    """
    log = logging.getLogger('TriggerSparkSubmit')
    try:
        s3.meta.client.head_bucket(Bucket=self.s3_bucket_temp_files)
    except botocore.exceptions.ClientError as e:
        # A 404 means the bucket does not exist. Compare as a string: ClientError
        # codes may be non-numeric (e.g. 'AccessDenied'), and int() on those would
        # raise ValueError and hide the real failure.
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        if error_code == '404':
            terminate("Bucket for temporary files does not exist")
        terminate("Error while connecting to Bucket")
    log.info("S3 bucket for temporary files exists")

In [7]:
  def tar_python_script(self, output_path="spark_demo/script.tar.gz"):
      """
      Pack every entry of ``self.path_script`` into a gzip-compressed tarball.

      Each entry is stored under its bare name (``arcname``), so the archive has
      no leading directory. Directories are added recursively (tarfile default).

      :param output_path: where to write the archive; the default is the path
          this cell always wrote to
      :return: list of member names written to the archive
      """
      log = logging.getLogger('TriggerSparkSubmit')
      # ``with`` closes and finalizes the archive even if adding an entry fails.
      with tarfile.open(output_path, 'w:gz') as t_file:
          # sorted(): os.listdir order is arbitrary; keep archive order reproducible.
          for entry in sorted(os.listdir(self.path_script)):
              # os.path.join works whether or not path_script ends with a separator.
              t_file.add(os.path.join(self.path_script, entry), arcname=entry)
          names = t_file.getnames()
      for name in names:
          log.info("Added %s to tar-file", name)
      return names
    

In [8]:
      
    #$ command-runner.jar spark-submit --deploy-mode cluster --conf PYSPARK_PYTHON=/usr/bin/python3 
    #s3://mybucket/mypath/myscript.py
    
def intialize_s3_toSpark(self,c):
        """
        Build keyword arguments for an EMR ``run_job_flow`` call.

        The arguments install Hadoop and Spark and run ``setup.sh`` from the
        job's temporary S3 prefix as a bootstrap action. Nothing is sent to AWS
        here. Other required run_job_flow arguments such as ``Name`` and
        ``Instances`` are not produced; the caller adds them, e.g.
        ``c.run_job_flow(Name=..., Instances=..., **intialize_s3_toSpark(cfg, c))``.

        :param self: object providing ``s3_bucket_temp_files`` and ``job_name``
        :param c: boto3 EMR client; currently unused, kept for call compatibility
        :return: dict of run_job_flow keyword arguments
        """
        # upload_temp_files stores files under <bucket>/<job_name>/, so both the
        # bootstrap script path and its argument must include the job name.
        prefix = '{}/{}'.format(self.s3_bucket_temp_files, self.job_name)
        return {
            'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
            'JobFlowRole': 'EMR_EC2_DefaultRole',
            'ServiceRole': 'EMR_DefaultRole',
            'VisibleToAllUsers': True,
            'BootstrapActions': [{
                'Name': 'Setup',
                'ScriptBootstrapAction': {
                    # boto3 expects 'Path' (capitalised); 'path' fails parameter validation.
                    'Path': 's3n://{}/setup.sh'.format(prefix),
                    'Args': ['s3://{}/'.format(prefix)],
                },
            }],
        }
    

In [9]:
 def step_spark_submit(self, c, arguments, wait_seconds=1):
     """
     Add a 'spark-submit' step to an existing EMR cluster.

     The step runs command-runner.jar with
     ``spark-submit /home/hadoop/unique_counts.py [arguments...]`` and
     ActionOnFailure='CONTINUE'.

     :param self: object that may provide ``job_flow_id``; when that is absent
         or falsy, the cluster id "j-2A3GSR7FH1UE0" is used
     :param c: boto3 EMR client (``add_job_flow_steps`` is called on it)
     :param arguments: argument(s) for the Spark script. A list/tuple is passed
         element-wise (each item converted with ``str``), any other truthy value
         is passed as one argument, and an empty string/None adds nothing
     :param wait_seconds: seconds to sleep after submitting the step (default 1)
     :return: the response returned by ``add_job_flow_steps``
     """
     log = logging.getLogger('TriggerSparkSubmit')
     # Prefer the configured cluster id over a hardcoded literal; the literal is
     # kept as a fallback so existing callers behave as before.
     job_flow_id = getattr(self, 'job_flow_id', None) or "j-2A3GSR7FH1UE0"
     if isinstance(arguments, (list, tuple)):
         script_args = [str(arg) for arg in arguments]
     elif arguments:
         script_args = [arguments]
     else:
         # An empty string would otherwise reach the script as an empty argv entry.
         script_args = []
     response = c.add_job_flow_steps(
         JobFlowId=job_flow_id,
         Steps=[{
             'Name': 'Spark Application',
             'ActionOnFailure': 'CONTINUE',
             'HadoopJarStep': {
                 'Jar': 'command-runner.jar',
                 'Args': ["spark-submit", "/home/hadoop/unique_counts.py"] + script_args,
             }
         }]
     )
     log.info("Added step 'spark-submit' with argument '{}'".format(arguments))
     time.sleep(wait_seconds)
     return response
        
        
        
        
        

In [10]:
def upload_temp_files(self, s3, local_dir='spark_demo'):
    """
    Upload setup.sh and script.tar.gz to the bucket used for temporary files.

    Both files are read from ``local_dir``. They are stored in bucket
    ``self.s3_bucket_temp_files`` under the key prefix ``<self.job_name>/``.

    :param s3: boto3 S3 resource
    :param local_dir: local directory holding the two files (default 'spark_demo')
    :return: True once both uploads have been issued
    """
    log = logging.getLogger('TriggerSparkSubmit')
    uploads = (
        ('setup.sh', 'text/x-sh'),
        ('script.tar.gz', 'application/x-tar'),
    )
    for filename, content_type in uploads:
        # ``with`` closes the local file handle once put() returns, instead of
        # leaking it until garbage collection.
        with open(os.path.join(local_dir, filename), 'rb') as body:
            s3.Object(self.s3_bucket_temp_files, self.job_name + '/' + filename)\
                .put(Body=body, ContentType=content_type)
    log.info("Uploaded files to key '{}' in bucket '{}'".format(self.job_name, self.s3_bucket_temp_files))
    return True


In [11]:
def run(self):
    """
    Package the Spark script, stage it in S3 and submit it to the EMR cluster.

    :param self: configuration object passed through to every helper as their
        ``self``. It must expose the attributes they read (e.g. app_name, user,
        job_name, s3_bucket_temp_files, path_script).
    """
    # Other cells reference a module-level ``logger``; declare it global so this
    # assignment is visible to them instead of staying local to run().
    global logger
    logger = setup_logging()
    session = boto3.Session()                     # default AWS credential/profile chain
    s3 = session.resource('s3')                   # Open S3 connection
    # Each helper takes the configuration as its first (``self``) argument.
    generate_job_name(self)                       # Generate job name
    temp_bucket_exists(self, s3)
    tar_python_script(self)
    upload_temp_files(self, s3)                   # uploads local files to temp s3
    c = session.client('emr')                     # Open EMR connection
    intialize_s3_toSpark(self, c)
    step_spark_submit(self, c, "")                # Add step 'spark-submit'