                                            Scheduling an Analysis Job 

I highly recommend running at least one Comprehend job from the point-and-click interface, especially if you are new to AWS. This way you can create a data access role and then simply copy its ARN (the `data_access_role_arn` used below) from the job description.

In [69]:
import boto3
from botocore.exceptions import ClientError 
from dotenv import load_dotenv
import json 
import pandas as pd 
import tarfile
# Read settings from a local .env file before any AWS client is created
# (presumably the AWS credentials/region -- confirm against your .env).
load_dotenv()

# The names defined in this cell (s3, bucket_name, local_file_name, ...) are
# notebook-level state that later cells read or overwrite.
s3 = boto3.client("s3")

bucket_name = "comprihend-tweet-bucket"
local_file_name = "D:\\Scripts\\Comprihend\\Comprihend-2-With-Pandas\\Comprehend\\amazon_tweets.csv"
aws_file_name = "input-data/amazon_tweets_1.csv"

# Push the local CSV into the bucket under the input-data/ prefix.
output = s3.upload_file(
    Filename=local_file_name,
    Bucket=bucket_name,
    Key=aws_file_name,
)
print(f"file uploaded to {bucket_name} bucket with name {aws_file_name}")


file uploaded to comprihend-tweet-bucket bucket with name input-data/amazon_tweets_1.csv


In [70]:
# Job configuration. Input and output live under separate prefixes of the same bucket.
input_s3_url = "s3://comprihend-tweet-bucket/input-data"
# Presumably tells Comprehend to treat every line of the input file as its own
# document -- confirm against the Comprehend InputDataConfig documentation.
input_doc_format = "ONE_DOC_PER_LINE"
output_s3_url = "s3://comprihend-tweet-bucket/output-data"
data_access_role_arn = "arn:aws:iam::XXXXXXXXXXXXXXX:role/service-role/AmazonComprehendServiceRole-Com-S3" # replace with your IAM role arn 
# NOTE(review): not referenced by any code visible in this notebook section.
number_of_topics = 10

input_data_config = {
    'S3Uri': input_s3_url,
    'InputFormat': input_doc_format,
}
output_data_config = {
    'S3Uri': output_s3_url,
}
comprehend  = boto3.client('comprehend')
# Submit the entity-detection job; the response carries the id used by the
# later cells to look the job up again.
start_job_entity = comprehend.start_entities_detection_job(
    InputDataConfig=input_data_config,
    OutputDataConfig=output_data_config,
    DataAccessRoleArn=data_access_role_arn,
    LanguageCode='en'
)

# Keep the job id around: the status and output-location cells both need it.
job_id = start_job_entity['JobId']
print(f'Started Entity Detection Job: {job_id}')

Started Entity Detection Job: 2acf35038f261168fc332295f18bd0ec


In [119]:
# Fetch the job description once and report its current status.
describe_result = comprehend.describe_entities_detection_job(JobId=job_id)
job_properties = describe_result['EntitiesDetectionJobProperties']
job_status = job_properties['JobStatus']
print(f'Job Status: {job_status}')
if job_status == 'FAILED':
    # The explanatory message is only read when the job reports failure.
    failure_reason = job_properties['Message']
    print(f'Reason: {failure_reason}')

Job Status: COMPLETED


In [127]:
# job_id = "db6a3a8cf2541530bb96827b8d1a7edc" 
# Look the job up again and pull out where its results were written.
job_description = comprehend.describe_entities_detection_job(JobId=job_id)
output_config = job_description['EntitiesDetectionJobProperties']['OutputDataConfig']
# S3 URI reported for the job's output; the download cell derives the object key from it.
enitity_result = output_config['S3Uri']
print(enitity_result)

s3://comprihend-tweet-bucket/output-data/893415859041-NER-2acf35038f261168fc332295f18bd0ec/output/output.tar.gz


In [128]:
# list all files and folders in the bucket 
def list_all_files():
    """Print the key of every object in the bucket named by ``bucket_name``."""
    bucket = boto3.resource('s3').Bucket(bucket_name)
    for s3_object in bucket.objects.all():
        print(s3_object.key)
list_all_files()

input-data/
input-data/amazon_tweets_1.csv
output-data/.write_access_check_file.temp
output-data/893415859041-NER-2acf35038f261168fc332295f18bd0ec/output/output.tar.gz
output-data/893415859041-NER-654f93bda25adaa33dfe3bae93cf980c/output/output.tar.gz
results/
results/.write_access_check_file.temp


                                                        Downloading the Results from the S3 Bucket

In [129]:
import boto3
s3 = boto3.client('s3')
bucket_name = "comprihend-tweet-bucket"
s3_name = f's3://{bucket_name}/'
# Strip the "s3://<bucket>/" part of the URI so only the object key remains.
result_aws_file_name = enitity_result.replace(s3_name, '')
local_file_name = 'Comprehend\\outputs\\entities.tar.gz'

# Fetch the results archive to the local path above.
s3.download_file(
    Bucket=bucket_name,
    Key=result_aws_file_name,
    Filename=local_file_name,
)

print(f'Downloaded file: {result_aws_file_name}')

Downloaded file: output-data/893415859041-NER-2acf35038f261168fc332295f18bd0ec/output/output.tar.gz


                                                ## Extracting the Entities from the Results ## 

In [130]:
import tarfile
def extract_targz(targz_file, output_path = ''):
    """Extract a ``tar.gz`` or plain ``tar`` archive into ``output_path``.

    Args:
        targz_file: Path of the archive. The name must end in ``tar.gz``
            (opened as gzip-compressed) or ``tar`` (opened uncompressed).
        output_path: Destination passed straight to ``TarFile.extractall``.

    Raises:
        ValueError: If ``targz_file`` has neither supported suffix. Previously
            this case did nothing, so callers could report a successful
            extraction that never happened.
    """
    if targz_file.endswith("tar.gz"):
        mode = "r:gz"
    elif targz_file.endswith("tar"):
        mode = "r:"
    else:
        raise ValueError(
            f"Unsupported archive type (expected tar.gz or tar): {targz_file}"
        )

    # The context manager closes the archive even when extraction fails.
    # NOTE: extractall() trusts the member paths stored in the archive; only
    # use this on archives from a source you trust.
    with tarfile.open(targz_file, mode) as tar:
        tar.extractall(path = output_path)
# Unpack the archive at local_file_name (set by the download cell) into an
# "extracted" folder next to it.
output_path = 'Comprehend\\outputs\\extracted'
extract_targz(local_file_name, output_path)
print(f"Extracted {local_file_name} to {output_path}")

Extracted Comprehend\outputs\entities.tar.gz to Comprehend\outputs\extracted


In [131]:
import json
# Raw string: the backslashes are Windows path separators, not escape
# sequences (unrecognised escapes such as "\o" and "\e" are deprecated).
input_file_name = r'Comprehend\outputs\extracted\output'
# The extracted file is parsed as one JSON document per line. Decode it
# explicitly as UTF-8 (the standard JSON encoding) rather than the platform
# default, and close the handle as soon as parsing is done.
with open(input_file_name, 'r', encoding='utf-8') as results_file:
    entites = [json.loads(line) for line in results_file]
len(entites)

10000

In [134]:
# This function parses entities data into a dataframe
def parse_entities(data):
    """Flatten entity-detection records into one DataFrame row per entity.

    Args:
        data: Iterable of dicts. Each dict must hold an ``'Entities'`` entry
            (a list of per-entity dicts); every other key is treated as a
            record-level field and copied onto each of that record's rows.

    Returns:
        A DataFrame with one row per entity. Records whose ``'Entities'`` list
        is empty contribute no rows. The index is not reset, so it restarts
        at 0 for every record. Empty ``data`` yields an empty DataFrame.

    Raises:
        KeyError: If a record has no ``'Entities'`` key.
    """
    nested_json = 'Entities'  # nested sub-dictionary to extract data from

    # Start from an empty frame, exactly as the incremental version did; this
    # also makes empty input return an empty frame instead of raising.
    frames = [pd.DataFrame()]
    for line in data:
        dt_temp = pd.DataFrame(line[nested_json])  # one row per entity
        for field, value in line.items():
            if field != nested_json:
                dt_temp[field] = value  # add common (record-level) fields
        frames.append(dt_temp)

    # Concatenate once at the end: growing the frame inside the loop re-copies
    # all accumulated rows on every iteration (quadratic in the record count).
    return pd.concat(frames, sort=False)
# Flatten the parsed job output (the `entites` list) into a single DataFrame.
entities_df = parse_entities(entites)

In [141]:
# Load the original tweets (no header row, everything read as strings) so a
# tweet can be shown next to the entities found in it.
df = pd.read_csv('D:\\Scripts\\Comprihend\\Comprihend-2-With-Pandas\\Comprehend\\amazon_tweets.csv', names=['amazon_tweets'], header=None, dtype= 'str')
# Row to inspect. This assumes the "Line" value in the results matches the
# zero-based row position in the CSV -- confirm against the job output.
record_no = 999
# Tweet text
print(df.loc[record_no].item())
# Results
entities_df.query('Line == @record_no')


Win 5 High Value Amazon Vouchers in @gadgetstouse giveaway, you need to watch this video at https://t.co/7nE81BVVjq - make sure to leave a comment and like the video.


Unnamed: 0,BeginOffset,EndOffset,Score,Text,Type,File,Line
0,5.0,17.0,0.951132,5 High Value,QUANTITY,amazon_tweets_1.csv,999
1,18.0,24.0,0.957802,Amazon,ORGANIZATION,amazon_tweets_1.csv,999
2,38.0,50.0,0.470461,gadgetstouse,TITLE,amazon_tweets_1.csv,999
3,93.0,116.0,0.993396,https://t.co/7nE81BVVjq,OTHER,amazon_tweets_1.csv,999
