In [73]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3
import pandas as pd

In [74]:
host = 'search-project1-fcmtfzdorczceupsri4f5e376y.us-east-1.es.amazonaws.com' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-east-1' # e.g. us-west-1

credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region)
index_name = 'restaurant'

In [75]:
client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [5]:
response = client.indices.create(index_name)
print('\nCreating index:')
print(response)


Creating index:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'restaurant'}


In [9]:
df = pd.read_csv('yelp.csv')

In [14]:
i = 0
for idx, row in df.iterrows():
    document = {
        'restaurant_id': row['id'],
        'cuisine': row['query']
    }

    response = client.index(
        index = index_name,
        body = document,
        id = i,
        refresh = True
    )
    
    i = i + 1

In [70]:
q = 'chinese'
query = {
  'size': 3,
  'query': {
    'multi_match': {
      'query': q,
      'fields': ['cuisine']
    }
  }
}


In [76]:
response = client.search(
    body = query,
    index = index_name
)

In [80]:
[hits['_source']['restaurant_id'] for hits in response['hits']['hits']]

['s25mujxKWphsyFm0Ji1xYA', 'LT0yyLNM84EiJj9DL0rfHA', 'WRRs3smfm6rHP5k4Slzr3w']

In [32]:
if response['hits']:
    print(response['hits']['hits'][0]['_source']['restaurant_id'])

s25mujxKWphsyFm0Ji1xYA


In [2]:
import boto3

dynamodb = boto3.client("dynamodb")



In [3]:
response = dynamodb.get_item(
    TableName='cloud1-db',
    Key={
        'id': {'S': 's25mujxKWphsyFm0Ji1xYA'},
    }
)
print(response['Item'])

{'location': {'S': 'manhattan'}, 'query': {'S': 'chinese food'}, 'rating': {'S': '3.0'}, 'zip_code': {'S': '10022'}, 'address': {'S': '324 E 57th St, New York, NY 10022'}, 'id': {'S': 's25mujxKWphsyFm0Ji1xYA'}, 'name': {'S': 'Mr Chow'}, 'review_count': {'S': '418'}, 'coordinates': {'S': '(40.7588502019644, -73.9643376320601)'}}


In [4]:
response['Item']['location']['S']

'manhattan'

In [5]:
location = response['Item']['location']['S']
query = response['Item']['query']['S']
rating = response['Item']['rating']['S']
zip_code = response['Item']['zip_code']['S']
name = response['Item']['name']['S']
review_count = response['Item']['review_count']['S']
coordinates = response['Item']['coordinates']['S']

In [7]:
review_count

'418'

In [50]:
sns = boto3.client("sns", 
                   region_name="us-east-1")

In [17]:
response = sns.list_topics()
topic_arn = response["Topics"][0]['TopicArn']


In [40]:
response = sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint="aditya.sdrt@gmail.com")
subscription_arn = response["SubscriptionArn"]

In [41]:
subscription_arn

'pending confirmation'

In [47]:
response = sns.list_subscriptions()
subscriptions = response["Subscriptions"]

In [49]:
sns.publish(TopicArn='arn:aws:sns:us-east-1:139100004146:cloud1-sns', 
            Message="message text", 
            Subject="subject used in emails only")['MessageId']

'd7b8b9cb-1f80-506d-8389-e272932bf6c7'

In [43]:
subscriptions

[{'SubscriptionArn': 'arn:aws:sns:us-east-1:139100004146:cloud1-sns:4fdf5b42-5e4b-4f4b-bf07-5802ed3dbfbd',
  'Owner': '139100004146',
  'Protocol': 'email',
  'Endpoint': 'aditya.sdrt@gmail.com',
  'TopicArn': 'arn:aws:sns:us-east-1:139100004146:cloud1-sns'},
 {'SubscriptionArn': 'PendingConfirmation',
  'Owner': '139100004146',
  'Protocol': 'email',
  'Endpoint': 'user@server.com',
  'TopicArn': 'arn:aws:sns:us-east-1:139100004146:cloud1-sns'}]

In [44]:
subscriptions

[{'SubscriptionArn': 'arn:aws:sns:us-east-1:139100004146:cloud1-sns:4fdf5b42-5e4b-4f4b-bf07-5802ed3dbfbd',
  'Owner': '139100004146',
  'Protocol': 'email',
  'Endpoint': 'aditya.sdrt@gmail.com',
  'TopicArn': 'arn:aws:sns:us-east-1:139100004146:cloud1-sns'},
 {'SubscriptionArn': 'PendingConfirmation',
  'Owner': '139100004146',
  'Protocol': 'email',
  'Endpoint': 'user@server.com',
  'TopicArn': 'arn:aws:sns:us-east-1:139100004146:cloud1-sns'}]

In [103]:
import boto3

sqs = boto3.resource('sqs')

In [104]:
queue = sqs.get_queue_by_name(QueueName='Proj1SQS')

In [105]:
for message in queue.receive_messages():
    print(message.message_attributes)

None


In [4]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3
import pandas as pd
import time

opensearch_host = 'search-project1-fcmtfzdorczceupsri4f5e376y.us-east-1.es.amazonaws.com'
region = 'us-east-1'
port = 443
index_name = 'restaurant'
queue_name = 'Proj1SQS'
size = 3
max_timeout = 60
sleep_time = 10

In [59]:
def lambda_handler(event, context):
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    messages = queue.receive_messages(MessageAttributeNames=['All'])
    if len(messages):
        for message in messages:
            print("Processing message {}".format(message.message_id))

            cuisine = message.message_attributes.get('Cuisine').get('StringValue')
            date = message.message_attributes.get('Date').get('StringValue')
            email = message.message_attributes.get('Email').get('StringValue')
            people_number = message.message_attributes.get('PeopleNumber').get('StringValue')
            booking_time = message.message_attributes.get('Time').get('StringValue')

            print("Receiving message from SQS - Cuisine : {}, Date : {}, Email : {}, People Number : {}, Booking Time : {}".format(
                cuisine,
                date,
                email,
                people_number,
                booking_time
            ))

            credentials = boto3.Session().get_credentials()
            auth = AWSV4SignerAuth(credentials, region)
            opensearch = OpenSearch(
                hosts = [{'host': opensearch_host, 'port': port}],
                http_auth = auth,
                use_ssl = True,
                verify_certs = True,
                connection_class = RequestsHttpConnection
            )

            query = {
            'size': size,
            'query': {
                'multi_match': {
                'query': cuisine,
                'fields': ['cuisine']
                }
            }
            }

            response = opensearch.search(
                body = query,
                index = index_name
            )

            if response['hits']:
                restaurant_ids = [hits['_source']['restaurant_id'] for hits in response['hits']['hits']]
                print("Receiving restaurant_ids from OpenSearch : {}".format(restaurant_ids))
            else:
                raise ValueError("Open Search Failed")

            dynamodb = boto3.client("dynamodb")

            locations = []
            names = []

            assert len(restaurant_ids)

            for restaurant_id in restaurant_ids:

                response = dynamodb.get_item(
                    TableName='cloud1-db',
                    Key={
                        'id': {'S': restaurant_id},
                    }
                )

                try:
                    location = response['Item']['location']['S']
                    name = response['Item']['name']['S']
                    locations.append(location)
                    names.append(name)
                    print("restaurant_id : {}".format(restaurant_id))
                    print("name : {}".format(name))
                    print("location : {}".format(location))
                except:
                    raise ValueError("DynamoDB Failed")

            sns = boto3.client("sns")
            response = sns.list_topics()
            topic_arn = response["Topics"][0]['TopicArn']
            print("Subscribing email : {}".format(email))
            response = sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint=email)
            subscription_arn = response["SubscriptionArn"]

            timeout = 0
            while subscription_arn == 'pending confirmation':
                if timeout == max_timeout:
                    raise ValueError("Email is not confirmed : {}. subscription_arn : {}".format(email, subscription_arn))
                else:
                    print("Email {} has not been confirmed. subscription_arn : {}. Sleeping...".format(email, subscription_arn))
                    time.sleep(sleep_time)
                    timeout = timeout + 1
                    subscription_arn = [x['SubscriptionArn'] for x in sns.list_subscriptions()['Subscriptions'] if x['Endpoint'] == email][0]

            print("email {} subscription_arn has been confirmed : {}".format(email, subscription_arn))

            subject = "Message from DiningConcierge"
            message_body = """Hello! Here are my {cuisine} restaurant suggestions for {people_number} people, for {date} at {booking_time}: 1. {name0}, located at {location0}, 2. {name1}, located at {location1}, 3. {name2}, located at {location2}. Enjoy your meal!""".format(
                cuisine= cuisine,
                people_number= people_number,
                date= date,
                booking_time= booking_time,
                name0= names[0],
                location0= locations[0],
                name1= names[1],
                location1= locations[1],
                name2= names[2],
                location2= locations[2]
            )

            message_id = sns.publish(TopicArn=topic_arn, 
                    Message=message_body, 
                    Subject=subject)['MessageId']
            print("Message has been sent to {}".format(email))
            message.delete()
            print("Message has been deleted from SQS")

            return {
                'cuisine' : cuisine,
                'date' : date,
                'email' : email,
                'people_number' : people_number,
                'booking_time' : booking_time,
                'restaurant_ids': restaurant_ids,
                'names': names,
                'locations': locations,
                'subject': subject,
                'message': message,
                'message_id': message_id
            }
    else:
        print("No messages found!")

In [60]:
lambda_handler("", "")

In [90]:
%debug

> [0;32m<ipython-input-88-aba58247ff65>[0m(5)[0;36mlambda_handler[0;34m()[0m
[0;32m      3 [0;31m    [0mqueue[0m [0;34m=[0m [0msqs[0m[0;34m.[0m[0mget_queue_by_name[0m[0;34m([0m[0mQueueName[0m[0;34m=[0m[0mqueue_name[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m      4 [0;31m    [0;32mfor[0m [0mmessage[0m [0;32min[0m [0mqueue[0m[0;34m.[0m[0mreceive_messages[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m----> 5 [0;31m        [0mcuisine[0m [0;34m=[0m [0mmessage[0m[0;34m.[0m[0mmessage_attributes[0m[0;34m.[0m[0mget[0m[0;34m([0m[0;34m'Cuisine'[0m[0;34m)[0m[0;34m.[0m[0mget[0m[0;34m([0m[0;34m'StringValue'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m      6 [0;31m        [0mdate[0m [0;34m=[0m [0mmessage[0m[0;34m.[0m[0mmessage_attributes[0m[0;34m.[0m[0mget[0m[0;34m([0m[0;34m'Date'[0m[0;34m)[0m[0;34m.[0m[0mget[0m[0;34m([0m[0;34m'StringValue'[0m[0;34m)[0m[0

In [92]:
message.message_attributes