# Sequencia para processamento em Cluster AWS

Todo o código foi baseado na biblioteca boto3. Para executar é necessário ter na máquina configurada as credencias da AWS conforme descrito no link https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html. 

In [1]:
try:
    !pip install boto3=="1.13.1" --quiet
except:
    print("Running throw py file.")

In [2]:
import boto3
import os
import json

In [3]:
dirpath = os.getcwd()

## Configurando serviços AWS
Sequencia de atividads para configuração de ambiente AWS para armazenamento e processamento do modelo PySpark.

#### Definindo Variáveis usados na configuração de ambiente AWS.

In [4]:
my_bucket = "data-sprints-fk"
app_key = "data-sprints-test-fk"
my_tag = [{'Key': app_key, 'Value': ''}]
my_resource_group = "rg-data-sprints-test-fk"
my_emr_cluster = "spark-data-sprints-test-fk"
files_to_upload = ['desafio.py','lib/ny_map.png', 'Analise.html']

### Criação de um Bucket S3 "data-sprints-fk" para armazenamento do modelo PySpark.

In [5]:
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket=my_bucket)

{'ResponseMetadata': {'RequestId': '64F4FDBA9F2A36AE',
  'HostId': '44H/2ZZ7VQpg8eqtUBXmvDz/Os0TanbE8o86a7cP0rE6dB1Fl4gbnihGZ0qJcfk9Nb1UR1LSh4o=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '44H/2ZZ7VQpg8eqtUBXmvDz/Os0TanbE8o86a7cP0rE6dB1Fl4gbnihGZ0qJcfk9Nb1UR1LSh4o=',
   'x-amz-request-id': '64F4FDBA9F2A36AE',
   'date': 'Mon, 22 Jun 2020 19:32:11 GMT',
   'location': '/data-sprints-fk',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/data-sprints-fk'}

Definição de uma TAG para o Bucket criado

In [6]:
s3_client.put_bucket_tagging(Bucket=my_bucket, Tagging= {'TagSet': my_tag} )

{'ResponseMetadata': {'RequestId': '9878C08B2EE986EF',
  'HostId': 'a14Q0w1nBGqVYvrbgjPMaJJgPq0Y8wyFDCZhSv90VVAkU/SApetlcUep5BgIFr/P5O8CnFJW2+k=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'a14Q0w1nBGqVYvrbgjPMaJJgPq0Y8wyFDCZhSv90VVAkU/SApetlcUep5BgIFr/P5O8CnFJW2+k=',
   'x-amz-request-id': '9878C08B2EE986EF',
   'date': 'Mon, 22 Jun 2020 19:32:12 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

Upload do modelo para o bucket na pasta model.

In [7]:
for file in files_to_upload: 
    file_name = dirpath + "/" + file
    try:
        if '.html' in file_name:
            response = s3_client.upload_file(file_name, my_bucket, file)
            
            
            #Modificando o ContentType
            object = s3.Object(my_bucket, file)
            object.copy_from(CopySource={'Bucket': my_bucket, 'Key': file},
                             MetadataDirective="REPLACE",
                             ContentType="text/html",
                             ACL = 'public-read')

        else:
            response = s3_client.upload_file(file_name, my_bucket, "model/" + file, ExtraArgs={'ACL':'public-read', })
            
        print("It was uploaded the file", "'" + file + "'", ".")
    except ClientError as e:
        logging.error(e)

It was uploaded the file 'desafio.py' .
It was uploaded the file 'lib/ny_map.png' .
It was uploaded the file 'Analise.html' .


Abrir em um Browser o site: https://data-sprints-fk.s3.amazonaws.com/Analise.html

### Configuração de um Resource Group

In [8]:
RG_client = boto3.client('resource-groups')

#AWS::AllSupported
#AWS::S3::Bucket
query = {
    "ResourceTypeFilters": ["AWS::AllSupported"],
    "TagFilters":  [{
        "Key": my_tag[0].get("Key"),
        "Values": [""]
    }] 
}
resource_query = {
    'Type': 'TAG_FILTERS_1_0',
    'Query': json.dumps(query)
}

try:
    resp = RG_client.create_group(Name=my_resource_group,ResourceQuery=resource_query)
    print("Resource Group was created.")
except Exception as e:
    print(e)

#print(query)
#print(my_tag)

Resource Group was created.


### Criação de um  EMR Cluster

In [10]:
emr_client = boto3.client('emr') #region_name='us-east-1'

cluster_id = emr_client.run_job_flow(Name=my_emr_cluster, 
    ReleaseLabel='emr-5.30.1',
    LogUri='s3://' + my_bucket + '/log/',
    Applications=[
        {
            'Name': 'Spark'
        },
    ],
    Instances={
        'InstanceGroups': [
            {
                'Name': "Master",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm4.large',
                'InstanceCount': 1,
            },
            {
                'Name': "Slave",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm4.large',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': True,
    },
    Steps=[
        {
            'Name': 'Spark application',   
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ["spark-submit","--deploy-mode","cluster","s3://" + my_bucket + "/model/" + files_to_upload[0]]
                    }
        }        
    ],                                    
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Tags=my_tag
)

In [11]:
from datetime import datetime, date

clusters = emr_client.list_clusters(
        CreatedAfter = datetime.today()
)
my_cluster = [i for i in clusters['Clusters'] if i['Name'] == my_emr_cluster][0]
my_cluster

{'Id': 'j-VDFGKFNMETKB',
 'Name': 'spark-data-sprints-test-fk',
 'Status': {'State': 'STARTING',
  'StateChangeReason': {},
  'Timeline': {'CreationDateTime': datetime.datetime(2020, 6, 22, 16, 33, 36, 329000, tzinfo=tzlocal())}},
 'NormalizedInstanceHours': 0,
 'ClusterArn': 'arn:aws:elasticmapreduce:us-east-1:032594213725:cluster/j-VDFGKFNMETKB'}

In [12]:
import time

response = emr_client.describe_cluster(ClusterId = my_cluster['Id'])
print('The current state is', response['Cluster']['Status']['State'], '-', datetime.today())
i = 0

while response['Cluster']['Status']['State'] != 'TERMINATED' and i < 30:
    response = emr_client.describe_cluster(ClusterId = my_cluster['Id'])
    print('The current state is', response['Cluster']['Status']['State'], '-', datetime.today(), i)
    i += 1
    time.sleep(60)

The current state is STARTING - 2020-06-22 16:34:35.776465
The current state is STARTING - 2020-06-22 16:34:35.976943 0
The current state is STARTING - 2020-06-22 16:35:36.737582 1
The current state is STARTING - 2020-06-22 16:36:37.858783 2
The current state is STARTING - 2020-06-22 16:37:38.590846 3
The current state is STARTING - 2020-06-22 16:38:39.337433 4
The current state is STARTING - 2020-06-22 16:39:40.126925 5
The current state is RUNNING - 2020-06-22 16:40:40.941824 6
The current state is WAITING - 2020-06-22 16:41:41.738353 7
The current state is WAITING - 2020-06-22 16:42:42.402716 8
The current state is WAITING - 2020-06-22 16:43:43.188375 9
The current state is WAITING - 2020-06-22 16:44:43.894691 10
The current state is WAITING - 2020-06-22 16:45:44.664334 11
The current state is WAITING - 2020-06-22 16:46:45.389088 12
The current state is WAITING - 2020-06-22 16:47:46.230103 13
The current state is WAITING - 2020-06-22 16:48:46.992569 14
The current state is WAITING -

## Desativação/Remoção das configurações da AWS

Remoção do Resource Group

In [13]:
RG_client.delete_group(GroupName=my_resource_group)

{'ResponseMetadata': {'RequestId': '6d1ed8ba-9cfc-4690-9548-088a4676e8c4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Mon, 22 Jun 2020 20:04:58 GMT',
   'content-type': 'application/json',
   'content-length': '159',
   'connection': 'keep-alive',
   'x-amzn-requestid': '6d1ed8ba-9cfc-4690-9548-088a4676e8c4',
   'x-amz-apigw-id': 'Oi9ErGyEoAMFqEw=',
   'x-amzn-trace-id': 'Root=1-5ef10eea-4a78b0b2168977cfed43f86c;Sampled=0'},
  'RetryAttempts': 0},
 'Group': {'GroupArn': 'arn:aws:resource-groups:us-east-1:032594213725:group/rg-data-sprints-test-fk',
  'Name': 'rg-data-sprints-test-fk'}}

Remoção de todos os arquivos do Bucket

In [14]:
bucket = s3.Bucket(my_bucket)
bucket.objects.all().delete()

[{'ResponseMetadata': {'RequestId': '01B882D100C6825D',
   'HostId': 'g2HyG9tsMzaWrv8NhRzR7hgQ6VVNQq2N7kT5VYO3N0yjyG5LJbhNiMU/rnCnM3+gB4A6MmVYHkI=',
   'HTTPStatusCode': 200,
   'HTTPHeaders': {'x-amz-id-2': 'g2HyG9tsMzaWrv8NhRzR7hgQ6VVNQq2N7kT5VYO3N0yjyG5LJbhNiMU/rnCnM3+gB4A6MmVYHkI=',
    'x-amz-request-id': '01B882D100C6825D',
    'date': 'Mon, 22 Jun 2020 20:05:01 GMT',
    'connection': 'close',
    'content-type': 'application/xml',
    'transfer-encoding': 'chunked',
    'server': 'AmazonS3'},
   'RetryAttempts': 0},
  'Deleted': [{'Key': 'log/j-VDFGKFNMETKB/node/i-03f0dd0fb6fc5777a/provision-node/reports/0/7fe92b5c-7ee0-44e9-bda4-ed05a1c3970c/ip-172-31-63-214.ec2.internal/202006221938.yaml.gz'},
   {'Key': 'log/j-VDFGKFNMETKB/containers/application_1592854723680_0003/container_1592854723680_0003_02_000001/stderr.gz'},
   {'Key': 'log/j-VDFGKFNMETKB/node/i-095dff7056a8b0bd0/applications/hadoop-yarn/yarn-yarn-proxyserver-ip-172-31-62-165.out.gz'},
   {'Key': 'log/j-VDFGKFNMETKB/n

Remoção do Bucket

In [15]:
s3_client.delete_bucket(Bucket=my_bucket)

{'ResponseMetadata': {'RequestId': 'F10AB1EF1D8224C1',
  'HostId': 'k8USpgHxPoNsrAShJGnJyx2LsvOYXioO1ygBiAEpngwviPJ/EUbL+xAS4Q2ghqamqgLivBr71QA=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'k8USpgHxPoNsrAShJGnJyx2LsvOYXioO1ygBiAEpngwviPJ/EUbL+xAS4Q2ghqamqgLivBr71QA=',
   'x-amz-request-id': 'F10AB1EF1D8224C1',
   'date': 'Mon, 22 Jun 2020 20:05:03 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}