In [None]:
# S3 bucket name used by the later cells for inputs and job outputs.
# NOTE(review): the value is missing in this copy of the notebook; set it before running.
bucket = 
# IAM role ARN handed to Comprehend as DataAccessRoleArn when the detection job is started.
# NOTE(review): the ARN literal is cut off in this copy; supply the complete role ARN.
job_data_access_role = 'arn:aws:iam::

In [None]:
# NOTE(review): repeats the role assignment from the previous cell; the ARN literal is cut off here as well.
job_data_access_role = 'arn:aws:iam::

## Transcribing the videos

In [None]:
# Recursively copy the course videos into this notebook's bucket under the input/ prefix.
!aws s3 cp s3://aws-tc-largeobjects/CUR-TF-200-ACMNLP-1/video/ s3://{bucket}/input/ --recursive

In [None]:
from boto3 import client

# S3 client reused by later cells.
conn = client('s3')
# .get() avoids a KeyError: the response carries no 'Contents' key when the bucket is empty.
for s3_object in conn.list_objects(Bucket=bucket).get('Contents', []):
    print(s3_object['Key'])

In [None]:
import boto3
import os, io, struct, json
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import uuid
from time import sleep
import re
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.tokenize import RegexpTokenizer
from nltk.stem.wordnet import WordNetLemmatizer



In [None]:
# Transcribe client used to start the transcription jobs.
transcribe_client = boto3.client("transcribe")

In [None]:
from urllib.parse import unquote, urlparse

# One row per video: [output filename, source object key, transcript text].
output_files = []
transcribe_output_prefix = 'transcribed'
# (job name, output_files row) for every job submitted below, so each can be awaited.
started_jobs = []

# .get() avoids a KeyError when nothing exists under the input prefix.
listing = conn.list_objects_v2(Bucket=bucket, Prefix='input')
for s3_object in listing.get('Contents', []):
    object_name = s3_object['Key']
    # Skip temporary objects and "folder" placeholder keys, which are not media files.
    if 'temp' in object_name or object_name.endswith('/'):
        continue
    media_input_uri = f's3://{bucket}/{object_name}'

    # Create the transcription job
    job_uuid = uuid.uuid1()
    transcribe_job_name = f"transcribe-job-{job_uuid}"
    output_file = object_name.split('.')[0].replace(" ", "_")
    transcribe_output_filename = f'{transcribe_output_prefix}-{output_file}.txt'
    row = [transcribe_output_filename, object_name, ""]
    output_files.append(row)
    print(f'{media_input_uri} transcribed to {transcribe_output_filename}')

    response = transcribe_client.start_transcription_job(
        TranscriptionJobName=transcribe_job_name,
        Media={'MediaFileUri': media_input_uri},
        MediaFormat='mp4',
        LanguageCode='en-US',
        OutputBucketName=bucket,
        OutputKey=transcribe_output_filename
    )
    started_jobs.append((transcribe_job_name, row))

# Wait for every job and fill in the transcript text. Without this step the third
# field of each row stays "" and the later cells normalize and analyze empty strings.
for job_name, row in started_jobs:
    while True:
        job = transcribe_client.get_transcription_job(
            TranscriptionJobName=job_name
        )['TranscriptionJob']
        if job['TranscriptionJobStatus'] in ('COMPLETED', 'FAILED'):
            break
        sleep(10)
    if job['TranscriptionJobStatus'] == 'FAILED':
        print(f"{job_name} failed: {job.get('FailureReason')}")
        continue

    # Read the transcript from the location the job reports rather than assuming
    # the object key equals the OutputKey that was requested.
    uri_path = unquote(urlparse(job['Transcript']['TranscriptFileUri']).path).lstrip('/')
    # Path-style URIs start with "<bucket>/"; drop that part to get the object key.
    if uri_path.startswith(f'{bucket}/'):
        transcript_key = uri_path[len(bucket) + 1:]
    else:
        transcript_key = uri_path
    transcript_body = conn.get_object(Bucket=bucket, Key=transcript_key)['Body'].read()
    row[2] = json.loads(transcript_body)['results']['transcripts'][0]['transcript']

In [None]:
# Show the [output filename, source object key, transcription text] rows collected by the previous cell.
print(output_files)

## Normalizing the text

In [None]:
import pandas as pd

# One row per video; the column order mirrors the order of the fields in each output_files row.
df = pd.DataFrame(
    output_files,
    columns=['OutputFile', 'Video', 'Transcription'],
)

In [None]:
# Preview the first rows of the transcript frame.
df.head()

In [None]:
def normalize_text(content):
    """Return a cleaned, lower-cased, single-spaced copy of `content`.

    Steps: drop URLs (anything starting with "http" up to the next whitespace),
    lower-case, collapse every whitespace run (newlines included) to one space,
    drop `<...>` tags, then collapse whitespace again and strip the ends.

    Args:
        content: The raw text to clean (a str).

    Returns:
        The normalized text.
    """
    text = re.sub(r"http\S+", "", content)
    text = text.lower()
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    # Collapsing first also turns newlines into spaces, so a tag split across
    # lines is still matched by the tag pattern below.
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"<.*?>", "", text)
    # Removing tags can leave doubled or leading/trailing spaces; tidy them last.
    text = re.sub(r"\s+", " ", text).strip()
    return text

In [None]:
%%time
# Add a normalized copy of every transcript as a new column (the cell magic above times the cell).
df['Transcription_normalized'] = df['Transcription'].apply(normalize_text)

In [None]:
# Widen column display to 150 characters so more of each transcript is visible in the preview.
pd.set_option('display.max_colwidth', 150)
df.head()

## Extracting key phrases and topics

In [None]:
s3_resource = boto3.Session().resource('s3')


def upload_comprehend_s3_csv(filename, folder, dataframe):
    """Upload `dataframe` as CSV (no header, no index) to the notebook bucket.

    The object key is "<prefix>/<folder>/<filename>", where `prefix` and `bucket`
    are notebook-level variables that must be set before this function is called.

    Args:
        filename: Name of the CSV object.
        folder: Key segment placed between `prefix` and `filename`.
        dataframe: Object exposing `to_csv` (a pandas DataFrame or Series).
    """
    csv_buffer = io.StringIO()
    dataframe.to_csv(csv_buffer, header=False, index=False)
    # S3 keys always use "/", so join explicitly instead of with os.path.join,
    # which inserts the local OS path separator.
    object_key = '/'.join([prefix, folder, filename])
    s3_resource.Bucket(bucket).Object(object_key).put(Body=csv_buffer.getvalue())

In [None]:
prefix = 'capstone'
comprehend_file = 'comprehend_input.csv'

# Upload the first 5000 characters of every normalized transcript as the Comprehend input.
upload_comprehend_s3_csv(
    comprehend_file,
    'comprehend',
    df['Transcription_normalized'].str.slice(0, 5000),
)

test_url = f's3://{bucket}/{prefix}/comprehend/{comprehend_file}'
print(f'Uploaded input to {test_url}')

In [None]:
# Comprehend client used to start and poll the detection jobs.
comprehend_client = boto3.client(service_name="comprehend")

In [None]:
# Parameters for the Comprehend job: a unique name plus input/output locations.
job_uuid = uuid.uuid1()
job_name = f"kpe-job-{job_uuid}"

input_data_format = 'ONE_DOC_PER_LINE'
input_data_s3_path = test_url
output_data_s3_path = f's3://{bucket}/'

In [None]:
# Submit the key phrase detection job over the uploaded input file.
kpe_response = comprehend_client.start_key_phrases_detection_job(
    JobName=job_name,
    LanguageCode='en',
    DataAccessRoleArn=job_data_access_role,
    InputDataConfig={
        'S3Uri': input_data_s3_path,
        'InputFormat': input_data_format,
    },
    OutputDataConfig={'S3Uri': output_data_s3_path},
)

In [None]:
# Get the job ID
kpe_job_id = kpe_response['JobId']

# Start the entities detection job that later cells poll through `entity_job_id`.
# Nothing else in the notebook starts it, so that name is undefined on a fresh kernel.
# It reads the same input and writes under the same output location as the key phrase job.
entity_response = comprehend_client.start_entities_detection_job(
    InputDataConfig={
        'S3Uri': input_data_s3_path,
        'InputFormat': input_data_format,
    },
    OutputDataConfig={'S3Uri': output_data_s3_path},
    DataAccessRoleArn=job_data_access_role,
    JobName=f"entity-job-{job_uuid}",
    LanguageCode='en'
)
entity_job_id = entity_response['JobId']

## Creating the dashboard

In [None]:
# CIDR range used as the aws:SourceIp condition in the domain access policy below.
# NOTE(review): this is a placeholder value; replace it with a real address range before running.
my_ip = "xxx.xx.x.xx/24"

In [None]:
!pip install --upgrade pip
!pip install --upgrade setuptools
!pip install --upgrade packaging
!pip install opensearch
!pip install opensearch-py
! pip install requests
! pip install requests-aws4auth

In [None]:
# Client for the 'es' service API, used to create and describe the search domain.
es_client = boto3.client('es')

In [None]:
# Domain access policy: allow every es:* action, but only for requests whose
# source address falls inside `my_ip`.
access_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "es:*",
            "Resource": "*",
            "Condition": {
                "IpAddress": {"aws:SourceIp": my_ip},
            },
        },
    ],
}

In [None]:
# Request creation of the 'nlp-lab' domain, governed by `access_policy`.
response = es_client.create_elasticsearch_domain(
    DomainName='nlp-lab',
    ElasticsearchVersion='7.9',
    ElasticsearchClusterConfig={
        "InstanceType": 't3.small.elasticsearch',
        "InstanceCount": 2,
        "DedicatedMasterEnabled": False,
        "ZoneAwarenessEnabled": False,
    },
    EBSOptions={
        'EBSEnabled': True,
        'VolumeType': 'gp2',
        'VolumeSize': 10,  # Size in GB (minimum is 10)
    },
    AccessPolicies=json.dumps(access_policy),
)

In [None]:
# Fetch the current description of the key phrase job; its JobStatus is polled in the next cell.
kpe_job = comprehend_client.describe_key_phrases_detection_job(JobId=kpe_job_id)

In [None]:
# Loop until the job reaches a terminal state. Waiting only for 'COMPLETED'
# would keep polling a failed or stopped job until the timeout fires.
waited = 0
timeout_minutes = 30
while kpe_job['KeyPhrasesDetectionJobProperties']['JobStatus'] not in ('COMPLETED', 'FAILED', 'STOPPED'):
    sleep(10)
    waited += 10
    assert waited//60 < timeout_minutes, "Job timed out after %d seconds." % waited
    print('.', end='')
    kpe_job = comprehend_client.describe_key_phrases_detection_job(JobId=kpe_job_id)

# Surface an unsuccessful job here instead of failing later when its output is read.
if kpe_job['KeyPhrasesDetectionJobProperties']['JobStatus'] != 'COMPLETED':
    raise RuntimeError(
        "Key phrase job ended with status %s: %s" % (
            kpe_job['KeyPhrasesDetectionJobProperties']['JobStatus'],
            kpe_job['KeyPhrasesDetectionJobProperties'].get('Message'),
        )
    )
print('Ready')

In [None]:
# Get current job status
kpe_job = comprehend_client.describe_key_phrases_detection_job(JobId=kpe_job_id)
# Loop until the job reaches a terminal state. Waiting only for 'COMPLETED'
# would keep polling a failed or stopped job until the timeout fires.
waited = 0
timeout_minutes = 30
while kpe_job['KeyPhrasesDetectionJobProperties']['JobStatus'] not in ('COMPLETED', 'FAILED', 'STOPPED'):
    sleep(10)
    waited += 10
    assert waited//60 < timeout_minutes, "Job timed out after %d seconds." % waited
    print('.', end='')
    kpe_job = comprehend_client.describe_key_phrases_detection_job(JobId=kpe_job_id)

# Surface an unsuccessful job here instead of failing later when its output is read.
if kpe_job['KeyPhrasesDetectionJobProperties']['JobStatus'] != 'COMPLETED':
    raise RuntimeError(
        "Key phrase job ended with status %s: %s" % (
            kpe_job['KeyPhrasesDetectionJobProperties']['JobStatus'],
            kpe_job['KeyPhrasesDetectionJobProperties'].get('Message'),
        )
    )
print('Ready')

In [None]:
# Get current job status
# NOTE(review): `entity_job_id` must come from an earlier cell that starts the entities detection job.
entity_job = comprehend_client.describe_entities_detection_job(JobId=entity_job_id)
# Loop until the job reaches a terminal state. Waiting only for 'COMPLETED'
# would keep polling a failed or stopped job until the timeout fires.
waited = 0
timeout_minutes = 30
while entity_job['EntitiesDetectionJobProperties']['JobStatus'] not in ('COMPLETED', 'FAILED', 'STOPPED'):
    sleep(10)
    waited += 10
    assert waited//60 < timeout_minutes, "Job timed out after %d seconds." % waited
    print('.', end='')
    entity_job = comprehend_client.describe_entities_detection_job(JobId=entity_job_id)

# Surface an unsuccessful job here instead of failing later when its output is read.
if entity_job['EntitiesDetectionJobProperties']['JobStatus'] != 'COMPLETED':
    raise RuntimeError(
        "Entities job ended with status %s: %s" % (
            entity_job['EntitiesDetectionJobProperties']['JobStatus'],
            entity_job['EntitiesDetectionJobProperties'].get('Message'),
        )
    )
print('Ready')

In [None]:
# Output location reported by the key phrase job.
kpe_comprehend_output_file = kpe_job['KeyPhrasesDetectionJobProperties']['OutputDataConfig']['S3Uri']
print(f'output filename: {kpe_comprehend_output_file}')

# Drop the scheme, then split once: the text before the first "/" is the bucket, the rest is the key.
kpe_comprehend_bucket, kpe_comprehend_key = (
    kpe_comprehend_output_file.replace("s3://", "").split("/", maxsplit=1)
)

In [None]:
# Download the key phrase job output to a local archive file.
s3r = boto3.resource('s3')
s3r.meta.client.download_file(
    Bucket=kpe_comprehend_bucket,
    Key=kpe_comprehend_key,
    Filename='output-kpe.tar.gz',
)

In [None]:
# Extract the tar file
import tarfile
# The context manager closes the archive handle, which was previously left open.
with tarfile.open('output-kpe.tar.gz') as tf:
    tf.extractall()
# Rename the extracted 'output' file so the next extraction does not overwrite it.
# os.replace keeps the step in Python instead of shelling out to `mv`.
os.replace('output', 'kpe_output')

In [None]:
# Output location reported by the entities job.
entity_comprehend_output_file = entity_job['EntitiesDetectionJobProperties']['OutputDataConfig']['S3Uri']
print(f'output filename: {entity_comprehend_output_file}')

# Drop the scheme, then split once: the text before the first "/" is the bucket, the rest is the key.
entity_comprehend_bucket, entity_comprehend_key = (
    entity_comprehend_output_file.replace("s3://", "").split("/", maxsplit=1)
)

# Download the entities job output to a local archive file.
s3r = boto3.resource('s3')
s3r.meta.client.download_file(
    Bucket=entity_comprehend_bucket,
    Key=entity_comprehend_key,
    Filename='output-entity.tar.gz',
)

In [None]:
# Extract the tar file
import tarfile
# The context manager closes the archive handle, which was previously left open.
with tarfile.open('output-entity.tar.gz') as tf:
    tf.extractall()
# Rename the extracted 'output' file so it is kept apart from the key phrase output.
# os.replace keeps the step in Python instead of shelling out to `mv`.
os.replace('output', 'entity_output')

In [None]:
import json

# Parse the key phrase output: one JSON document per line.
with open('kpe_output', "r") as kpe_lines:
    data = [json.loads(record) for record in kpe_lines]

In [None]:
# Keep only the 'KeyPhrases' and 'Line' fields of each parsed record, then preview the frame.
kpdf = pd.DataFrame(data, columns=['KeyPhrases','Line'])
kpdf.head()

In [None]:
import json

# Parse the entities output: one JSON document per line.
with open('entity_output', "r") as entity_lines:
    data = [json.loads(record) for record in entity_lines]

In [None]:
# Keep only the 'Entities' and 'Line' fields of each parsed record, then preview the frame.
entitydf = pd.DataFrame(data, columns=['Entities','Line'])
entitydf.head()

In [None]:
def extract_entities(entities, entity_type):
    """Return the entities whose 'Type' equals `entity_type`.

    Args:
        entities: List of entity dicts, each with a 'Type' key. Any value that is
            not a list or tuple (for example None or NaN in a DataFrame cell that
            had no entities) is treated as "no entities".
        entity_type: The 'Type' value to keep, e.g. 'LOCATION'.

    Returns:
        A new list with the matching entity dicts, in their original order.
    """
    if not isinstance(entities, (list, tuple)):
        return []
    return [entity for entity in entities if entity['Type'] == entity_type]

In [None]:
# Derive one column per entity type of interest from the full entity list.
entitydf['location'] = entitydf['Entities'].apply(extract_entities, args=('LOCATION',))
entitydf['organization'] = entitydf['Entities'].apply(extract_entities, args=('ORGANIZATION',))

entitydf.head()

In [None]:
# Index both frames by their 'Line' column so they can be joined on it.
# The column check makes the cell safe to re-run: once 'Line' has moved into the
# index, calling set_index('Line') again would raise a KeyError.
if 'Line' in entitydf.columns:
    entitydf = entitydf.set_index('Line')
entitydf = entitydf.sort_index()
if 'Line' in kpdf.columns:
    kpdf = kpdf.set_index('Line')
kpdf = kpdf.sort_index()
entitydf.head()

In [None]:
# Join key phrases and entities on their shared index, ordered by that index.
m1 = pd.merge(kpdf, entitydf, left_index=True, right_index=True).sort_index()
pd.set_option('display.max_colwidth', 200)
m1.head()

In [None]:
# Attach the Comprehend results to the transcript rows by matching index values.
mergedDf = pd.merge(df, m1, left_index=True, right_index=True)

In [None]:
# Preview the merged frame.
mergedDf.head()

In [None]:
# Narrow column display to 50 characters and preview the merged frame again.
pd.set_option('display.max_colwidth', 50)
mergedDf.head()

In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import requests

In [None]:
from time import sleep
# Fetch the current description of the 'nlp-lab' domain; the next cell polls its 'Processing' flag.
alive = es_client.describe_elasticsearch_domain(DomainName='nlp-lab')

In [None]:
# Poll every 10 seconds until the domain is no longer flagged as processing.
# The loop body must be indented; without indentation the cell is a syntax error.
while alive['DomainStatus']['Processing']:
    print('.', end='')
    sleep(10)
    alive = es_client.describe_elasticsearch_domain(DomainName='nlp-lab')
print('ready!')

In [None]:
# NOTE(review): fixed 60-second pause before reading the endpoint; presumably extra
# settling time after the domain stops processing. Confirm it is needed.
sleep(60)
es_domain = es_client.describe_elasticsearch_domain(DomainName='nlp-lab')
# Endpoint hostname, used below as the OpenSearch client's host.
es_endpoint = es_domain['DomainStatus']['Endpoint']

In [None]:
# Sign requests for the region the domain was created in. Taking it from the client
# that created the domain avoids a signature mismatch when the notebook runs
# outside us-east-1, which a hard-coded region would cause.
region = es_client.meta.region_name
service = 'es'  # IMPORTANT: requests to the domain endpoint are signed for the 'es' service.
credentials = boto3.Session().get_credentials()

awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# HTTPS client for the domain endpoint, with certificate verification and SigV4 auth.
es = OpenSearch(
    hosts=[{'host': es_endpoint, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

In [None]:
# Pull individual fields from the merged row at position 3, addressed by column position.
movie_name = mergedDf.iloc[3, 1]
transcription = mergedDf.iloc[3, 2]
keyphrases = mergedDf.iloc[3, 4]
location = mergedDf.iloc[3, 6]
organization = mergedDf.iloc[3, 7]

In [None]:
from opensearchpy import helpers
def gendata(start, stop):
    """Yield one bulk-index action per `mergedDf` row at positions [start, stop).

    Args:
        start: First row position to emit.
        stop: One past the last row position; clamped to the number of rows.

    Yields:
        A dict describing one document for the 'movies' index, with the row
        position as its id.
    """
    stop = min(stop, mergedDf.shape[0])

    # The loop must live inside the function: a `yield` at module level is a syntax error.
    for i in range(start, stop):
        yield {
            "_index": 'movies',
            "_type": "_doc",
            "_id": i,
            "_source": {
                "name": mergedDf.iloc[i, 1],
                "transcription": mergedDf.iloc[i, 2],
                # NOTE(review): the original line was cut off after "keyphrases".
                # The remaining fields are reconstructed from the column positions
                # the notebook reads for a sample row (4, 6, 7); confirm against
                # the intended document schema.
                "keyphrases": mergedDf.iloc[i, 4],
                "location": mergedDf.iloc[i, 6],
                "organization": mergedDf.iloc[i, 7],
            },
        }

In [None]:
%%time
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
es = OpenSearch(
    hosts = [{'host': es_endpoint, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)
helpers.bulk(es, gendata(0,mergedDf.shape[0]))

In [None]:
# Print the Kibana plugin URL on the domain endpoint.
print(f'https://{es_endpoint}/_plugin/kibana')