# Lab 3 - Integrate Neptune with OpenSearch
In this lab we will configure the existing OpenSearch index to integrate with the Neptune cluster created in Lab 2. This lab expects that you have already completed Workshop 0, or that the OpenSearch index was enabled by default on deployment.

To integrate Neptune with OpenSearch, you can either use an existing OpenSearch Service domain that has been populated according to the Neptune data model for OpenSearch data, or create an OpenSearch Service domain linked with Neptune using an AWS CloudFormation stack. In this lab, we will use an existing domain.

## Neptune data model for OpenSearch data
Each document in OpenSearch corresponds to an entity and stores the relevant information for that entity. In Gremlin, the entities are vertices and edges, so the OpenSearch documents need to hold the information about our vertices and edges in the form of labels and properties.

When we set up Neptune in Lab 2, we included labels and properties so our data would fit with this format.
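
To make the document shape concrete, here is a minimal sketch of how one product vertex might look as an OpenSearch document. The field names (`entity_id`, `entity_type`, `document_type`, `predicates`) follow the Neptune data model as we understand it. The helper `vertex_to_document` and the sample values are hypothetical, and real documents may carry additional fields such as a datatype.

```python
import json


def vertex_to_document(vertex_id, label, properties):
    """Build a document for one vertex (hypothetical helper, for illustration only)."""
    return {
        'entity_id': vertex_id,        # the vertex id in Neptune
        'entity_type': [label],        # the vertex label(s)
        'document_type': 'vertex',     # 'vertex' or 'edge'
        # Each property maps to a list of values, one {'value': ...} entry per value
        'predicates': {
            name: [{'value': value}]
            for name, value in properties.items()
        },
    }


# Sample values are made up for this sketch
document = vertex_to_document(
    'example-product-id',
    'product',
    {'product_name': 'Screwdriver', 'style': 'backpack'},
)
print(json.dumps(document, indent=2))
```

This nesting is why the full-text queries later in this lab address fields with paths such as `predicates.style.value`.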

### Import dependencies
The following libraries are needed for this lab.

In [None]:
import boto3
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.aiohttp.transport import AiohttpTransport
from gremlin_python.process.traversal import *

### Create Clients

In [None]:
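neptune = boto3.client('neptune')
ec2 = boto3.client('ec2')
batch = boto3.client('batch')
ssm = boto3.client('ssm')
# Used later in this lab to look up the OpenSearch domain endpoint
opensearch_service = boto3.client('opensearch')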

### Load variables saved in Lab 2
At the end of Lab 2 we saved some variables that we'll need in this lab. The following cell will load those variables into this lab environment.

In [None]:
%store -r

### Enable Neptune streams
In order to add the data in Neptune to our OpenSearch index, Neptune streams need to be enabled.

Cluster parameters such as `neptune_streams` are part of a parameter group. We cannot change the default parameter group, so we first need to create a new one, enable streams in it, and then update our cluster to use it instead of the default.

In [None]:
# Create the new parameter group. All parameters start with their default values
parameter_group_name = 'retail-demo-store-neptune-opensearch-parameter-group'
parameter_group_response = neptune.create_db_cluster_parameter_group(
    DBClusterParameterGroupName=parameter_group_name,
    DBParameterGroupFamily='neptune1.2',
    Description='Parameter group for Neptune OpenSearch integration'
)

# Enable streams
neptune.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName=parameter_group_name,
    Parameters=[
        {
            'ParameterName': 'neptune_streams',
            'ParameterValue': '1',
            'ApplyMethod': 'pending-reboot'
        },
    ]
)

# Use the newly created parameter group with our existing cluster
neptune.modify_db_cluster(
    DBClusterIdentifier=db_cluster_identifier,
    ApplyImmediately=True,
    DBClusterParameterGroupName=parameter_group_response['DBClusterParameterGroup']['DBClusterParameterGroupName'],
)
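
Because `neptune_streams` is applied with `pending-reboot`, the setting should only take effect once the cluster's instances have been restarted. The sketch below shows one way to find the instances that still need a reboot. It assumes the response shape of `neptune.describe_db_clusters`. The helper name and the sample response are hypothetical.

```python
def members_pending_reboot(describe_response):
    """Return ids of instances whose cluster parameter group status is not 'in-sync'."""
    pending = []
    for cluster in describe_response.get('DBClusters', []):
        for member in cluster.get('DBClusterMembers', []):
            if member.get('DBClusterParameterGroupStatus') != 'in-sync':
                pending.append(member['DBInstanceIdentifier'])
    return pending


# Hand-written sample in the shape of a describe_db_clusters response
sample_response = {
    'DBClusters': [{
        'DBClusterIdentifier': 'example-cluster',
        'DBClusterMembers': [
            {'DBInstanceIdentifier': 'example-instance-1',
             'DBClusterParameterGroupStatus': 'pending-reboot'},
            {'DBInstanceIdentifier': 'example-instance-2',
             'DBClusterParameterGroupStatus': 'in-sync'},
        ],
    }]
}
print(members_pending_reboot(sample_response))
```

In the notebook you could pass `neptune.describe_db_clusters(DBClusterIdentifier=db_cluster_identifier)` to this helper and call `neptune.reboot_db_instance(DBInstanceIdentifier=...)` for each returned id.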

## Amazon Neptune-to-OpenSearch replication setup
Amazon Neptune supports full-text search in Gremlin and SPARQL queries using Amazon OpenSearch Service (OpenSearch Service). You can use an AWS CloudFormation stack to link an OpenSearch Service domain to Neptune.

We will follow the instructions in [this repository](https://github.com/awslabs/amazon-neptune-tools/tree/master/export-neptune-to-elasticsearch) to index the existing data in our Amazon Neptune database into OpenSearch, which enables Neptune's full-text search integration. The tool and its stack parameters use the older Elasticsearch naming.

### Get Parameters
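The existing OpenSearch endpoint is required as a stack parameter. We use the OpenSearch Service boto3 client to find the domain tagged `retaildemostore` and read its endpoint, then use the `ssm` boto3 client to retrieve the domain's security group.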

In [None]:
opensearch_domain_endpoint = None

domains_response = opensearch_service.list_domain_names()

for domain_name in domains_response['DomainNames']:
    describe_response = opensearch_service.describe_domain(
        DomainName=domain_name['DomainName']
    )

    tags_response = opensearch_service.list_tags(ARN=describe_response['DomainStatus']['ARN'])

    domain_match = False
    for tag in tags_response['TagList']:
        if tag['Key'] == 'Name' and tag['Value'] == 'retaildemostore':
            domain_match = True
            break

    if domain_match:
        opensearch_domain_endpoint = describe_response['DomainStatus']['Endpoints']['vpc']
        break

print('OpenSearch domain endpoint: ' + str(opensearch_domain_endpoint))

assert opensearch_domain_endpoint, 'OpenSearch domain endpoint could not be determined. Ensure Amazon OpenSearch domain has been successfully created and has "retaildemostore" tag before continuing.'

opensearch_sg = ssm.get_parameter(
    Name='retaildemostore-stack-opensearch-domain-sg'
)
opensearch_sg = opensearch_sg['Parameter']['Value']
print(opensearch_sg)

### Create keypair for stack creation
A keypair is required as a parameter for the stack.

In [None]:
keypair = ec2.create_key_pair(KeyName='retail-demo-store-neptune-opensearch')
keypair_name = keypair['KeyName']

### Launch Stack
#### Stack parameters
Run the code cell below, then enter the printed values for the corresponding stack parameters when you launch the stack.

In [None]:
print(f"""
KeyPairName: {keypair_name}
VPC: {vpc_id}
Subnet1: {subnet}
NeptuneEndpoint: {neptune_endpoint}
NeptuneEngine: gremlin
NeptuneClientSecurityGroup: {security_group_id}
ElasticSearchEndpoint: {opensearch_domain_endpoint}
NumberOfShards: 1
NumberOfReplica: 0
ElasticSearchClientSecurityGroup: {opensearch_sg}
""")

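If you would rather script the launch than use the console, the same values can be expressed as the `Parameters` list that the boto3 CloudFormation `create_stack` call accepts. This is only a sketch: `to_cfn_parameters` is a hypothetical helper, and the example values are placeholders for the variables printed above.

```python
def to_cfn_parameters(values):
    """Convert a {name: value} mapping into the list format CloudFormation expects."""
    return [
        {'ParameterKey': key, 'ParameterValue': str(value)}
        for key, value in values.items()
    ]


# Placeholder subset of the stack parameters listed above
example_values = {
    'NeptuneEngine': 'gremlin',
    'NumberOfShards': 1,
    'NumberOfReplica': 0,
}
stack_parameters = to_cfn_parameters(example_values)
print(stack_parameters)
```

CloudFormation parameter values are passed as strings, which is why the helper converts the numeric values.
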

#### The stack
Launch the stack in the same region you deployed the Retail Demo Store in. Creation will take approximately 3 minutes.

| Region | Stack |
| ---- | ---- |
|US East (N. Virginia) |  [<img src="https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png">](https://us-east-1.console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/create/review?templateURL=https://s3.amazonaws.com/aws-neptune-customer-samples/neptune-sagemaker/cloudformation-templates/export-neptune-to-elasticsearch/export-neptune-to-elasticsearch.json&stackName=neptune-index) |
|US West (Oregon) |  [<img src="https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png">](https://us-west-2.console.aws.amazon.com/cloudformation/home?region=us-west-2#/stacks/create/review?templateURL=https://s3.amazonaws.com/aws-neptune-customer-samples/neptune-sagemaker/cloudformation-templates/export-neptune-to-elasticsearch/export-neptune-to-elasticsearch.json&stackName=neptune-index) |
|Europe (Ireland) |  [<img src="https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png">](https://eu-west-1.console.aws.amazon.com/cloudformation/home?region=eu-west-1#/stacks/create/review?templateURL=https://s3.amazonaws.com/aws-neptune-customer-samples/neptune-sagemaker/cloudformation-templates/export-neptune-to-elasticsearch/export-neptune-to-elasticsearch.json&stackName=neptune-index) |
|Asia Pacific (Tokyo) | [<img src="https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png">](https://ap-northeast-1.console.aws.amazon.com/cloudformation/home?region=ap-northeast-1#/stacks/create/review?templateURL=https://s3.amazonaws.com/aws-neptune-customer-samples/neptune-sagemaker/cloudformation-templates/export-neptune-to-elasticsearch/export-neptune-to-elasticsearch.json&stackName=neptune-index) |
|Asia Pacific (Sydney) | [<img src="https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png">](https://ap-southeast-2.console.aws.amazon.com/cloudformation/home?region=ap-southeast-2#/stacks/create/review?templateURL=https://s3.amazonaws.com/aws-neptune-customer-samples/neptune-sagemaker/cloudformation-templates/export-neptune-to-elasticsearch/export-neptune-to-elasticsearch.json&stackName=neptune-index) |


### Create batch job to update index
Once the stack has been created successfully, we can invoke the Lambda function that starts the batch job. You can find and copy the command in the stack outputs, under the `StartExportCommand` key.

The command will look like `aws lambda invoke --function-name arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:export-neptune-to-kinesis-12345678 --region {REGION} /dev/stdout`
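
Instead of copying the command from the console, you could also read it from the stack outputs in code. The sketch below assumes the response shape of the boto3 CloudFormation `describe_stacks` call. The helper name `get_stack_output` and the sample response are hypothetical.

```python
def get_stack_output(describe_stacks_response, output_key):
    """Return the value of one stack output, or None if the key is not present."""
    for stack in describe_stacks_response.get('Stacks', []):
        for output in stack.get('Outputs', []):
            if output['OutputKey'] == output_key:
                return output['OutputValue']
    return None


# Hand-written sample in the shape of a describe_stacks response
sample_response = {
    'Stacks': [{
        'StackName': 'neptune-index',
        'Outputs': [
            {'OutputKey': 'StartExportCommand',
             'OutputValue': 'aws lambda invoke --function-name example /dev/stdout'},
        ],
    }]
}
print(get_stack_output(sample_response, 'StartExportCommand'))
```

In the notebook, the response would come from `boto3.client('cloudformation').describe_stacks(StackName='neptune-index')`, assuming you kept the default stack name from the launch links above.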


In [None]:
# Insert command below
!

#### Wait for batch job to complete
Insert the job id from the command output into the cell below to monitor the job status. When the status is `SUCCEEDED`, all the data stored in Neptune has been indexed in our OpenSearch domain.

In [None]:
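import time

# Insert the job id from the Lambda output here as a string
job_id = ''
assert job_id, 'Set job_id to the job id returned by the Lambda function before running this cell.'

status = None
max_time = time.time() + 15*60  # Poll for at most 15 minutes
while time.time() < max_time:
    response = batch.describe_jobs(jobs=[job_id])
    status = response['jobs'][0]['status']

    print("Status: {}".format(status))

    # SUCCEEDED and FAILED are the terminal states of an AWS Batch job
    if status in ('SUCCEEDED', 'FAILED'):
        break

    time.sleep(30)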

## Evaluate Search
Now that the Neptune data has been indexed, you can test the search experience with some queries. To see how the results differ from the existing index, compare the results here with those in the Retail Demo Store Web UI. If you haven't already opened a browser window/tab to the Retail Demo Store Web UI, navigate to the CloudFormation console in this AWS account and check the Outputs section of the stack used to launch the Retail Demo Store. Make sure you're checking the base/root stack and not the nested stacks that were created. In the Outputs section look for the output named `WebURL`.

### Test endpoint
Insert the string generated by the code cell below into the curl command to validate that the search endpoint is live.

In [None]:
'https://'+opensearch_domain_endpoint+'/amazon_neptune'

In [None]:
! curl

### Gremlin Queries
We will run some Gremlin queries with the new search endpoint.

#### Setup connection

In [None]:
port = 8182
endpoint = f'wss://{neptune_endpoint}:{port}/gremlin'

graph = Graph()

connection = DriverRemoteConnection(endpoint, 'g',
                 transport_factory=lambda: AiohttpTransport(call_from_event_loop=True))
g = graph.traversal().withRemote(connection)
endpoint

#### Queries
In the code cell below, we count the number of products, as we did when we inserted them. The difference is that this time we are using the full-text search endpoint.

In [None]:
g.withSideEffect("Neptune#fts.endpoint", f"https://{opensearch_domain_endpoint}")\
    .V().hasLabel('product').count().next()

The query below returns products whose style is backpack.

In [None]:
g.withSideEffect("Neptune#fts.endpoint", f"https://{opensearch_domain_endpoint}")\
    .withSideEffect("Neptune#fts.queryType", "query_string")\
    .V().has("*", "Neptune#fts predicates.style.value:backpack").toList()

The query below returns products named Screwdriver.

In [None]:
g.withSideEffect("Neptune#fts.endpoint", f"https://{opensearch_domain_endpoint}")\
    .withSideEffect("Neptune#fts.queryType", "query_string")\
    .V().has("*", "Neptune#fts predicates.product_name.value:Screwdriver").toList()

Compare the output of the queries above with the search results generated by the products index on the Web UI. Also try changing some of the queries to see how results change. Refer to [this documentation](https://docs.aws.amazon.com/neptune/latest/userguide/full-text-search-gremlin.html) for more examples.
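
When experimenting with your own queries, it can help to build the predicate string in one place. The sketch below mirrors the `Neptune#fts predicates.<field>.value:<term>` pattern used in the queries above. The helper `fts_query` and the `Tool Bag` product name are hypothetical.

```python
def fts_query(field, term, exact_phrase=False):
    """Build a full-text search predicate for the query_string query type."""
    if exact_phrase:
        # Wrap multi-word terms in double quotes so they match as a phrase
        term = '"' + term.replace('"', '\\"') + '"'
    return f'Neptune#fts predicates.{field}.value:{term}'


print(fts_query('style', 'backpack'))
print(fts_query('product_name', 'Tool Bag', exact_phrase=True))
```

The result can be passed straight to a traversal, for example `.V().has('*', fts_query('style', 'backpack')).toList()`.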


## Workshop Complete
Congratulations! You have now completed the Retail Demo Store Neptune Workshop.

### Cleanup
If you launched the Retail Demo Store in your personal AWS account **AND** you're done with all workshops, you can follow the [Neptune workshop cleanup](./Lab-4-Cleanup-Neptune-Resources.ipynb) to delete all of the resources created by this workshop.

**IMPORTANT: since the Neptune resources were created by this notebook and not CloudFormation, deleting the CloudFormation stack for the Retail Demo Store will not remove the Neptune resources. You MUST run the [Neptune workshop cleanup](./Lab-4-Cleanup-Neptune-Resources.ipynb) notebook or manually clean up these resources.**

If you are participating in an AWS managed event such as a workshop and using an AWS provided temporary account, you can skip the cleanup workshop unless otherwise instructed.

#### Store variables for cleanup

In [None]:
%store parameter_group_name
%store keypair_name
%store opensearch_domain_endpoint