# Movie Search Blog
## Part 2: Create Embeddings and Populate OpenSearch
In this section, we will populate an Amazon OpenSearch Service domain with movie indexes. Before starting this step, make sure that you have completed the infrastructure prerequisites for Amazon OpenSearch Service.

We use [Neptune ML](https://docs.aws.amazon.com/neptune/latest/userguide/machine-learning.html) graph neural networks (GNNs) to generate embeddings for the movies in the IMDB dataset. Our goal is to produce a graph-aware vector embedding that represents a movie as well as its related entities, such as genres and artists.

The movie-centric data model we will be working with is shown below:

![image-2.png](attachment:image-2.png)

We will only create embeddings for movie, artist, and genre entities. *User and place entities are not included in the embeddings*.

To access the OpenSearch domain, we use methods similar to those in the [IMDB Knowledge Graph Blog](https://github.com/aws-samples/imdb-knowledge-graph-blog/blob/main/part3-out-of-catalog/cdk/ooc/lambdas/LoadDataIntoOpenSearchLambda/lambda_handler.py). If you are just getting started with Amazon OpenSearch Service and want to learn more, refer to that project.

The diagram below depicts the approach. For a more in-depth discussion, refer to the accompanying blog post.

![image-5.png](attachment:image-5.png)


## Settings
First, set the parameters for your environment in the cell below. Be sure to have completed the infrastructure prerequisites before moving on.

In [None]:
import boto3

REGION = boto3.Session().region_name

# Replace with the S3 path (no trailing slash) where you prepared the IMDB data for Neptune use
SOURCE_S3_PATH_NOSLASH="s3://<your bucket>"

# Replace with your Amazon OpenSearch Service domain endpoint, including the https:// prefix
AOS_ENDPOINT="<your AOS endpoint>"

# Replace with the ARN of your IAM role for the GNN embedding pipeline
SAGEMAKER_ROLE="<your IAM role for GNN pipeline>"
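Before moving on, you may want to sanity-check these values. The hypothetical helper below (not part of the original notebook) flags leftover `<...>` placeholders, an S3 path that does not use `s3://` or ends with a slash (later cells append paths such as `/source`), an endpoint without the `https://` prefix that the client cell relies on, and a role value that does not look like an IAM role ARN.

```python
def check_settings(source_s3_path: str, aos_endpoint: str, role_arn: str) -> list:
    """Return a list of problems found in the notebook settings (empty if none)."""
    problems = []
    if not source_s3_path.startswith("s3://"):
        problems.append("SOURCE_S3_PATH_NOSLASH should start with s3://")
    if source_s3_path.endswith("/"):
        problems.append("SOURCE_S3_PATH_NOSLASH should not end with a slash")
    if not aos_endpoint.startswith("https://"):
        problems.append("AOS_ENDPOINT is expected to include the https:// prefix")
    if not (role_arn.startswith("arn:") and ":iam::" in role_arn):
        problems.append("SAGEMAKER_ROLE should be an IAM role ARN")
    # Unedited placeholders such as "<your bucket>" still contain angle brackets
    for name, value in [("SOURCE_S3_PATH_NOSLASH", source_s3_path),
                        ("AOS_ENDPOINT", aos_endpoint),
                        ("SAGEMAKER_ROLE", role_arn)]:
        if "<" in value or ">" in value:
            problems.append(f"{name} still contains a placeholder")
    return problems
```

For example, `print(check_settings(SOURCE_S3_PATH_NOSLASH, AOS_ENDPOINT, SAGEMAKER_ROLE))` should print an empty list once all three values are filled in.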


## Connect to OpenSearch domain and create indexes


Next, we connect to the Amazon OpenSearch Service domain and create the indexes that we will search for the lexical, semantic, and graph-based elements of our search process.

![WBDArch.drawio%20%284%29.png](attachment:WBDArch.drawio%20%284%29.png)

In [None]:
!pip install -q opensearch-py

In [None]:
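import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Sign requests to the domain with the notebook's IAM credentials (SigV4)
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, REGION)

aos_client = OpenSearch(
    hosts = [{'host': AOS_ENDPOINT.split("//")[1], 'port': 443}],  # expects AOS_ENDPOINT to start with https://
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)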

In [None]:
aos_client.indices.get_alias(index="*")
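The client cell derives the host name with `AOS_ENDPOINT.split("//")[1]`, which assumes the endpoint includes a scheme such as `https://` and has no trailing slash. If you would rather accept the endpoint in either form, a small helper based on the standard library's `urllib.parse` could look like this; `endpoint_host` is a hypothetical name, not part of the original notebook.

```python
from urllib.parse import urlparse

def endpoint_host(endpoint: str) -> str:
    """Return the bare host name of an OpenSearch endpoint, with or without a scheme."""
    endpoint = endpoint.strip()
    # urlparse only fills in the network location when a scheme (//) is present
    if "//" not in endpoint:
        endpoint = "https://" + endpoint
    host = urlparse(endpoint).hostname  # drops any port, path, or trailing slash
    if not host:
        raise ValueError(f"Could not parse a host from {endpoint!r}")
    return host
```

It could then be used as `hosts=[{'host': endpoint_host(AOS_ENDPOINT), 'port': 443}]`.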

The cell below creates the first of three OpenSearch indexes, each supporting a different kind of similarity search. This first index is used for lexical search.

In [None]:
movie_index = {
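    "settings": {
        # Lexical index: index.knn is not enabled, so the vector fields below are stored
        # but no approximate k-NN graph is built for them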
        "number_of_replicas": 1,
        "number_of_shards": 5,
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },  
    "mappings": {
        "properties": {
            "sentence_embedding": {
                "type": "knn_vector",
                "dimension": 384,  
            },
            "gnn_embedding": {
                "type": "knn_vector",
                "dimension": 64,  
            }
        }
    }
}

aos_client.indices.create(index="movie",body=movie_index)

In [None]:
aos_client.indices.get(index="movie")
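Once documents are loaded, the `movie` index can be queried lexically. As an illustration (not taken from the original notebook), a `match` query on the `title` field, which is populated from the `title:string` column when the data is written later in this notebook, could be built as follows. `lexical_query` is a hypothetical helper; excluding the vector fields from `_source` just keeps the responses small.

```python
def lexical_query(text: str, size: int = 10) -> dict:
    """Build an OpenSearch match query over the movie title."""
    return {
        "size": size,
        # Leave the large embedding arrays out of the returned documents
        "_source": {"excludes": ["sentence_embedding", "gnn_embedding"]},
        "query": {"match": {"title": {"query": text}}},
    }
```

Usage: `aos_client.search(index="movie", body=lexical_query("star wars"))`.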

The next cell creates the semantic search index. Its settings enable k-NN search (`index.knn`) with cosine similarity (`cosinesimil`) over the vector fields.

In [None]:
movie_sentence_index = {
    "settings": {
        "number_of_replicas": 1,
        "number_of_shards": 5,
        "index.knn": True,
        "index.knn.space_type": "cosinesimil",
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },   
    "mappings": {
        "properties": {
            "sentence_embedding": {
                "type": "knn_vector",
                "dimension": 384,  
            },
            "gnn_embedding": {
                "type": "knn_vector",
                "dimension": 64,  
            }
        }
    }
}

aos_client.indices.create(index="movie_sentence",body=movie_sentence_index)

In [None]:
aos_client.indices.get(index="movie_sentence")
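The `movie_sentence` index is meant to be searched with k-NN queries. A minimal sketch of such a query body follows, assuming the query text is encoded with the same `all-MiniLM-L6-v2` model used later in this notebook, so that the vector has 384 dimensions. `knn_query` is a hypothetical helper; the `knn` query clause itself belongs to the OpenSearch k-NN plugin.

```python
def knn_query(field: str, vector, k: int = 10) -> dict:
    """Build an OpenSearch k-NN query body for a knn_vector field."""
    vector = [float(x) for x in vector]  # plain Python floats serialize cleanly to JSON
    return {
        "size": k,
        "_source": {"excludes": ["sentence_embedding", "gnn_embedding"]},
        "query": {"knn": {field: {"vector": vector, "k": k}}},
    }
```

After the embedding helpers are defined, a semantic search could look like `aos_client.search(index="movie_sentence", body=knn_query("sentence_embedding", get_embeddings("space adventure")))`.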

Finally, the following cell creates the index for the embeddings generated by the GNN. You can think of this as the graph-aware search component.

In [None]:
movie_gnn_index = {
    "settings": {
        "number_of_replicas": 1,
        "number_of_shards": 5,
        "index.knn": True,
        "index.knn.space_type": "cosinesimil",
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },   
    "mappings": {
        "properties": {
            "sentence_embedding": {
                "type": "knn_vector",
                "dimension": 384,  
            },
            "gnn_embedding": {
                "type": "knn_vector",
                "dimension": 64,  
            }
        }
    }
}

aos_client.indices.create(index="movie_gnn",body=movie_gnn_index)

In [None]:
aos_client.indices.get(index="movie_gnn")
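The semantic and GNN indexes set the similarity metric with the index-level setting `index.knn.space_type`. In more recent OpenSearch versions, the space type is typically declared per field in a `method` block of the `knn_vector` mapping instead. If your domain rejects the index-level setting, the hypothetical helper below rewrites an index body accordingly. The HNSW method and the `nmslib` engine are assumptions here, so check which engines your domain version supports.

```python
import copy

def with_knn_method(index_body: dict, space_type: str = "cosinesimil",
                    engine: str = "nmslib") -> dict:
    """Return a copy of an index body that declares the k-NN space type per field."""
    body = copy.deepcopy(index_body)  # leave the original dict untouched
    settings = body["settings"]
    settings.pop("index.knn.space_type", None)  # drop the index-level setting
    settings["index.knn"] = True
    for field in body["mappings"]["properties"].values():
        if field.get("type") == "knn_vector":
            field["method"] = {"name": "hnsw", "space_type": space_type, "engine": engine}
    return body
```

Usage: `aos_client.indices.create(index="movie_gnn", body=with_knn_method(movie_gnn_index))`.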

### Delete index (cleanup only)
Caution: do not run this cell unless you want to delete an OpenSearch index. Replace `<index>` with the name of the index to delete.

In [None]:
index_name="<index>"
aos_client.indices.delete(index=index_name)

## Create Sentence Embeddings
In this section, we create vector embeddings for the movie titles in our data.

![WBDArch.png](attachment:WBDArch.png)

### Get local copy of movie CSV
First, copy the movie data from S3 into the local environment as a CSV file.

In [None]:
%%bash -s "$SOURCE_S3_PATH_NOSLASH"

SOURCE_OBJ=$1/source/movies.csv
aws s3 cp $SOURCE_OBJ movies.csv
wc -l movies.csv
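If the AWS CLI is not available in your environment, the same copy could be done from Python with boto3's `download_file`. The sketch below assumes `SOURCE_S3_PATH_NOSLASH` has the form `s3://bucket` or `s3://bucket/prefix`; `split_s3_uri` and `download_movies_csv` are hypothetical helpers.

```python
def split_s3_uri(uri: str):
    """Split 's3://bucket/key/parts' into ('bucket', 'key/parts')."""
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key


def download_movies_csv(source_s3_path: str, local_path: str = "movies.csv") -> None:
    """Download <source_s3_path>/source/movies.csv to a local file."""
    import boto3  # imported here so split_s3_uri can be used without boto3

    bucket, prefix = split_s3_uri(source_s3_path)
    key = f"{prefix}/source/movies.csv" if prefix else "source/movies.csv"
    boto3.client("s3").download_file(bucket, key, local_path)
```

Usage: `download_movies_csv(SOURCE_S3_PATH_NOSLASH)`.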

### Set up the sentence transformer model

Next, we set up a sentence-transformers model to create embeddings from our natural-language text. For more details on how sentence transformers work, see this [blog](https://aws.amazon.com/blogs/machine-learning/text-embedding-and-sentence-similarity-retrieval-at-scale-with-amazon-sagemaker-jumpstart/).

In [None]:
!pip install -q sentence-transformers

In [None]:
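from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 produces 384-dimensional embeddings, matching the
# "dimension": 384 of the sentence_embedding fields in the indexes above
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

def get_embeddings(sentences):
    # A single string returns one vector; a list of strings returns one vector per string
    embeddings = model.encode(sentences)
    return embeddings

def get_str_embedding(embedding):
    # Serialize a vector as a ';'-separated string for CSV storage
    return ';'.join([str(x) for x in embedding])

def parse_str_embedding(s):
    # Inverse of get_str_embedding: parse a ';'-separated string into a list of floats
    return [float(x) for x in s.split(";")]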
    

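To sanity-check the embeddings locally before indexing them, you can compare two vectors with cosine similarity, the measure that the `cosinesimil` space type of the semantic and GNN indexes is based on. The helper below is a sketch using NumPy; `cosine_similarity` is a hypothetical name, not part of the original notebook.

```python
import numpy as np

def cosine_similarity(a, b) -> float:
    """Cosine similarity between two embedding vectors (1.0 means same direction)."""
    a = np.asarray(a, dtype=np.float64)
    b = np.asarray(b, dtype=np.float64)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        return 0.0  # treat similarity with a zero vector as 0
    return float(np.dot(a, b) / denom)
```

For example, `cosine_similarity(get_embeddings("The Godfather"), get_embeddings("The Godfather Part II"))` compares two titles, and comparing an embedding `e` with `parse_str_embedding(get_str_embedding(e))` should give a value very close to 1.0, which confirms that the CSV serialization round-trips.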

### Generate sentence embeddings for movie, save to local file

The cells below generate a sentence embedding for each movie title using the functions defined above and save the embeddings to a local CSV file. The first cell defines small helpers for writing that file.

In [None]:
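import csv
import pandas as pd

def csv_open(fname, headers):
    # Returns [file handle, csv writer] and writes the header row
    f = open(fname, "w", newline="")  # newline="" as recommended by the csv module
    ff = [f, csv.writer(f)]
    csv_write(ff, headers)
    return ff

def csv_close(ff):
    ff[0].close()

def csv_write(ff, arr):
    # Write missing values (NaN) and '-' placeholders as empty cells
    ff[1].writerow("" if pd.isna(a) or a == '-' else a for a in arr)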


The following cell can take several hours to run; it encodes each title in a separate call to the model.

In [None]:
import pandas as pd
import string

printable = set(string.printable)
def fix_title(title):
    return ''.join(filter(lambda x: x in printable, title))
    
sentence_embedding_file = csv_open("sentence_embeddings.csv", ["~id", "embedding:vector"])

df= pd.read_csv('movies.csv')
for index, row in df.iterrows():
    if index % 500 == 0:
        print(str(index))
    title=row["title:string"]
    embedding=None
    try:
        if pd.isna(title):
            embedding=""
        else:
            embedding=get_str_embedding(get_embeddings(title))
    except Exception as e:
        print(f"Error on {index} {row}")
        print("Exception: {}".format(type(e).__name__))
        print("Exception message: {}".format(e))
        title=fix_title(title)
        embedding=get_str_embedding(get_embeddings(title))
    myrow = [row["~id"], embedding]
    csv_write(sentence_embedding_file, myrow)

csv_close(sentence_embedding_file)
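The cell above encodes one title per call. `SentenceTransformer.encode` also accepts a list of sentences, so titles could instead be encoded in batches. The sketch below is a hypothetical alternative, not the notebook's original method: it batches the string titles and keeps an empty string for missing (NaN) titles, matching the CSV format written above. `encode_fn` stands in for `model.encode`, which lets the helper be tried without downloading the model. Unlike the original loop, it has no `fix_title` fallback, so a title that fails to encode raises an exception for its whole batch.

```python
def encode_titles_in_batches(titles, encode_fn, batch_size=256):
    """Return one ';'-joined embedding string per title ('' for missing titles)."""
    titles = list(titles)
    results = [""] * len(titles)
    # pandas represents missing titles as NaN (a float), so encode only strings
    todo = [i for i, t in enumerate(titles) if isinstance(t, str)]
    for start in range(0, len(todo), batch_size):
        batch_idx = todo[start:start + batch_size]
        vectors = encode_fn([titles[i] for i in batch_idx])
        for i, vec in zip(batch_idx, vectors):
            results[i] = ";".join(str(x) for x in vec)
    return results
```

In the notebook it could be called as `encode_titles_in_batches(df["title:string"].tolist(), model.encode)`, with the results written next to `df["~id"]` using the CSV helpers above.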

In [None]:
!wc -l sentence_embeddings.csv

In [None]:
!head -3 sentence_embeddings.csv

## Create GNN embeddings

![WBDArch%20%281%29.png](attachment:WBDArch%20%281%29.png)

### Create pipeline for GNN embeddings

The cell below creates a SageMaker pipeline to generate the GNN embeddings.

Because the pipeline creates several SageMaker resources when it is executed, we recommend that you pass a set of tags to `create_pipeline()`. The created resources are given those tags so that you can track them. See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/create_pipeline.html for more details.

In [None]:
sagemaker_client = boto3.client("sagemaker", region_name=REGION)
pipeline_name="neptune-ml-sagemaker-pipeline-movie-test2"

sagemaker_client.create_pipeline(
    PipelineName=pipeline_name,
    PipelineDefinitionS3Location={
        'Bucket': f"graphlytics-{REGION}",
        'ObjectKey': "v1/sagemaker-pipelines/neptune-ml-training-pipeline.json",
    },
    RoleArn=SAGEMAKER_ROLE
)

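`create_pipeline` accepts an optional `Tags` parameter, a list of `Key`/`Value` pairs. A sketch of how the call above could be extended with tags follows; `create_pipeline_kwargs` is a hypothetical helper that only builds the keyword arguments, and the tag key and value in the usage example are made up.

```python
def create_pipeline_kwargs(pipeline_name, role_arn, region, tags=None):
    """Build keyword arguments for sagemaker_client.create_pipeline, with optional tags."""
    kwargs = {
        "PipelineName": pipeline_name,
        "PipelineDefinitionS3Location": {
            "Bucket": f"graphlytics-{region}",
            "ObjectKey": "v1/sagemaker-pipelines/neptune-ml-training-pipeline.json",
        },
        "RoleArn": role_arn,
    }
    if tags:
        # Convert {"key": "value"} into the [{"Key": ..., "Value": ...}] form boto3 expects
        kwargs["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
    return kwargs
```

Usage: `sagemaker_client.create_pipeline(**create_pipeline_kwargs(pipeline_name, SAGEMAKER_ROLE, REGION, {"project": "movie-search"}))`.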
### Run and monitor the pipeline to create the GNN embeddings

The cell below starts the pipeline execution that generates the GNN embeddings. The call returns as soon as the execution has started; the pipeline itself can take a long time to finish.

Before running the cell, use the [Service Quotas console](https://console.aws.amazon.com/servicequotas/home/services/sagemaker/quotas) to confirm that you can run two instances of each of the processing and training instance types. If not, you can request a quota increase in the console.

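To look up these quotas programmatically instead of in the console, you could list the SageMaker quotas with the Service Quotas API and filter them by instance type. This is a sketch under assumptions: it assumes the relevant quota names follow the pattern `<instance type> for processing job usage` or `<instance type> for training job usage`, and `matching_quotas` and `list_sagemaker_quotas` are hypothetical helpers.

```python
def matching_quotas(quotas, instance_types):
    """Return {quota name: value} for quotas whose name starts with '<instance type> for '."""
    found = {}
    for q in quotas:
        name = q.get("QuotaName", "")
        if any(name.startswith(f"{t} for ") for t in instance_types):
            found[name] = q.get("Value")
    return found


def list_sagemaker_quotas(region):
    """Fetch all SageMaker quotas in a region from the Service Quotas API."""
    import boto3  # imported here so matching_quotas can be tried offline

    client = boto3.client("service-quotas", region_name=region)
    quotas = []
    for page in client.get_paginator("list_service_quotas").paginate(ServiceCode="sagemaker"):
        quotas.extend(page["Quotas"])
    return quotas
```

Usage: `matching_quotas(list_sagemaker_quotas(REGION), ["ml.r5.4xlarge", "ml.p3.8xlarge"])`.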
In [None]:
input_data_s3_location=f"{SOURCE_S3_PATH_NOSLASH}/source"
processed_data_s3_location = f"{SOURCE_S3_PATH_NOSLASH}/processing"
train_model_s3_location = f"{SOURCE_S3_PATH_NOSLASH}/training"
embedding_s3_location = f"{SOURCE_S3_PATH_NOSLASH}/embeddings"
processing_instance_type = "ml.r5.4xlarge"
training_instance_type="ml.p3.8xlarge"
processing_instance_size_gb="200"
training_instance_size_gb="200"

start_result = sagemaker_client.start_pipeline_execution(
    PipelineName=pipeline_name,
    PipelineParameters=[

    { "Name": "inputDataS3Location", "Value": input_data_s3_location },
    { "Name": "processedDataS3Location", "Value": processed_data_s3_location},
    { "Name": "trainModelS3Location", "Value": train_model_s3_location},
    { "Name": "embeddingS3Location", "Value": embedding_s3_location},
    { "Name": "embeddingDimension", "Value": "64"},
    { "Name": "model", "Value": "rgcn"},
    { "Name": "sagemakerIamRoleArn", "Value": SAGEMAKER_ROLE},
    { "Name": "processingInstanceType", "Value": processing_instance_type},
    { "Name": "trainingInstanceType", "Value": training_instance_type},
    { "Name": "processingInstanceVolumeSizeInGB", "Value": processing_instance_size_gb},
    { "Name": "trainingInstanceVolumeSizeInGB", "Value": training_instance_size_gb}
])


### Monitor the pipeline execution
The pipeline takes some time to complete. Run the following cell to check the current status of the execution; the most important attribute in the response is `PipelineExecutionStatus`. You can also view the processing and training jobs directly in the SageMaker console.

In [None]:
arn=start_result['PipelineExecutionArn']

sagemaker_client.describe_pipeline_execution(
    PipelineExecutionArn=arn
)
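Rather than rerunning the cell by hand, you could poll until the execution reaches a terminal status (`Succeeded`, `Failed`, or `Stopped`). The sketch below is a hypothetical helper; `describe_fn` stands in for `sagemaker_client.describe_pipeline_execution`, and `sleep_fn` is injectable so the loop can be tried without waiting.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}

def wait_for_pipeline(describe_fn, arn, poll_seconds=60, sleep_fn=time.sleep):
    """Poll a pipeline execution until it reaches a terminal status and return that status."""
    while True:
        status = describe_fn(PipelineExecutionArn=arn)["PipelineExecutionStatus"]
        print(status)
        if status in TERMINAL_STATUSES:
            return status
        sleep_fn(poll_seconds)
```

Usage: `wait_for_pipeline(sagemaker_client.describe_pipeline_execution, arn)`. To see which step is running or has failed, `sagemaker_client.list_pipeline_execution_steps(PipelineExecutionArn=arn)` lists the individual steps.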

### Download embeddings

The following cells download the GNN embeddings into the local environment, sort all three files by movie ID, and check that the IDs line up across the files.

In [None]:
%%bash -s "$SOURCE_S3_PATH_NOSLASH" 

echo Download GNN
aws s3 cp $1/embeddings/movie.csv gnn_embeddings.csv



In [None]:
%%bash

echo "Sort the files by ID so that their rows can be matched by row number"
head -1 movies.csv > xmovie.csv
head -1 sentence_embeddings.csv > xsentence_embeddings.csv
head -1 gnn_embeddings.csv > xgnn_embeddings.csv

echo movie
tail -n+2 movies.csv | sort -k1,1 -t"," -d >> xmovie.csv
echo sentence
tail -n+2 sentence_embeddings.csv | sort -k1,1 -t"," -d  >> xsentence_embeddings.csv
echo gnn
tail -n+2 gnn_embeddings.csv | sort -k1,1 -t"," -d  >> xgnn_embeddings.csv

echo "Confirm that the line counts match"
wc -l xmovie.csv xgnn_embeddings.csv xsentence_embeddings.csv


In [None]:
%%bash

echo "Check that the TT IDs are in the same order in all three files (no output from diff means they match)"

cat xmovie.csv | awk -F, '{print $1}' > xmovie_tt.txt
cat xgnn_embeddings.csv | awk -F, '{print $1}' > xgnn_tt.txt
cat xsentence_embeddings.csv | awk -F, '{print $1}' > xsentence_tt.txt

echo diff movie gnn
diff xmovie_tt.txt xgnn_tt.txt | head -10

echo diff movie sentence
diff xmovie_tt.txt xsentence_tt.txt | head -10

echo diff gnn sentence
diff xgnn_tt.txt xsentence_tt.txt | head -10
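The approach above relies on all three files being sorted identically. An alternative sketch, not used in the original notebook, is to join the files on `~id` with pandas `merge`, which does not depend on sort order and makes missing IDs explicit. It assumes each file has an `~id` column and that both embedding files use the `embedding:vector` column; note that it loads all three files into memory at once. `join_on_id` is a hypothetical helper.

```python
import pandas as pd

def join_on_id(movies: pd.DataFrame, sentence: pd.DataFrame, gnn: pd.DataFrame) -> pd.DataFrame:
    """Left-join both embedding tables onto the movie rows by ~id."""
    merged = movies.merge(
        sentence.rename(columns={"embedding:vector": "sentence_vector"}),
        on="~id", how="left", validate="one_to_one")  # fail loudly on duplicate IDs
    merged = merged.merge(
        gnn.rename(columns={"embedding:vector": "gnn_vector"}),
        on="~id", how="left", validate="one_to_one")
    return merged
```

For example, after `merged = join_on_id(pd.read_csv("movies.csv"), pd.read_csv("sentence_embeddings.csv"), pd.read_csv("gnn_embeddings.csv"))`, the expression `merged["gnn_vector"].isna().sum()` counts movies without a GNN embedding.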


## Write to indexes

Next, we write the movie attributes and embeddings into the OpenSearch indexes that we created at the start of this notebook.

In [None]:
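import json

def add_bulk_to_index(index_name, chunk):
    # Build an NDJSON _bulk body: one action line plus one document line per record
    data = []
    for rec in chunk:
        j = {
            'index': {
                '_index': index_name,
                '_id': rec['id']
            }
        }
        data.append(json.dumps(j))
        data.append(json.dumps(rec['fields']))

    data_to_load = "\n".join(data) + "\n"  # the bulk body must end with a newline
    bulk_res = aos_client.bulk(body=data_to_load)
    # The bulk API reports per-document failures in the response instead of raising
    if bulk_res.get('errors'):
        failed = [item for item in bulk_res['items'] if 'error' in item.get('index', {})]
        print(f"{len(failed)} documents failed to index into {index_name}; first failure: {failed[0] if failed else None}")
    return bulk_res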

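`add_bulk_to_index` sends an entire chunk as a single `_bulk` request. With `CHUNK_SIZE=10000` (set in the next cell) and two embedding vectors per document, such a request can become large, and Amazon OpenSearch Service limits the HTTP request payload size depending on the instance type. One way to keep requests smaller, sketched below as an alternative rather than the notebook's original method, is the `opensearchpy.helpers.bulk` helper, which splits actions into requests of at most `chunk_size` documents. `bulk_actions` and `bulk_load` are hypothetical names; the record format (`id` and `fields`) matches the one built in `process_next_chunk`.

```python
def bulk_actions(index_names, records):
    """Yield one 'index' action per (index, record) pair, in the format helpers.bulk expects."""
    for index_name in index_names:
        for rec in records:
            yield {
                "_op_type": "index",
                "_index": index_name,
                "_id": rec["id"],
                "_source": rec["fields"],
            }


def bulk_load(client, index_names, records, chunk_size=500):
    """Index records into several indexes, sending at most chunk_size documents per request."""
    # Imported here so that bulk_actions can be used without opensearch-py installed
    from opensearchpy import helpers

    # Returns (number of successful actions, list of errors); by default it raises
    # BulkIndexError if any document fails to index
    return helpers.bulk(client, bulk_actions(index_names, records), chunk_size=chunk_size)
```

Inside `process_next_chunk`, the final loop could then become `bulk_load(aos_client, ['movie', 'movie_sentence', 'movie_gnn'], aos_chunks)`.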

In [None]:
import pandas as pd

CHUNK_SIZE=10000
movie_reader = pd.read_csv('xmovie.csv', iterator=True)
sentence_reader = pd.read_csv('xsentence_embeddings.csv', iterator=True)
gnn_reader = pd.read_csv('xgnn_embeddings.csv', iterator=True)

MOVIE_MAP = {
    'title:string': 'title',
    'year:int': 'year',
    'averageRating:float': 'averageRating',
    'numVotes:int': 'numVotes',
    'runtime:int': 'runtime'
}

def process_next_chunk():
    aos_chunks = []
    movie_df = None
    sentence_df = None
    gnn_df = None
    try:
        movie_df = movie_reader.get_chunk(CHUNK_SIZE)
        sentence_df = sentence_reader.get_chunk(CHUNK_SIZE)
        gnn_df = gnn_reader.get_chunk(CHUNK_SIZE)
        if len(movie_df) == 0:
            return False
    except StopIteration as e: 
        print("Stop")
        print(e)    
        return False
    
    start_index = -1
    for index, row in movie_df.iterrows():
        if start_index == -1:
            start_index = index
            print(start_index)
        aos_rec = {"id": row["~id"], 'fields': {}}
        aos_chunks.append(aos_rec)
        try:
            for m in MOVIE_MAP:
                if pd.isna(row[m]):
                    #skip
                    pass
                else:
                    aos_rec['fields'][MOVIE_MAP[m]]=row[m]
        except Exception as e:
            print(f"Error on movie on {index} {row}")
            print("Exception: {}".format(type(e).__name__))
            print("Exception message: {}".format(e))
    for index, row in sentence_df.iterrows():
        try:
            aos_rec = aos_chunks[index-start_index]
            if row['~id']==aos_rec['id'] and 'embedding:vector' in row and len(row['embedding:vector']) > 0:
                aos_rec['fields']['sentence_embedding']=parse_str_embedding(row['embedding:vector'])
            else:
                raise Exception(f"Sentence Embedding/movie mismatch {str(index)} {str(row)} {str(aos_rec)}")
        except Exception as e:
            print(f"Error on {index} {row}")
            print("Exception: {}".format(type(e).__name__))
            print("Exception message: {}".format(e))
    for index, row in gnn_df.iterrows():
        try:
            aos_rec = aos_chunks[index-start_index]
            if row['~id']==aos_rec['id'] and 'embedding:vector' in row and len(row['embedding:vector']) > 0:
                aos_rec['fields']['gnn_embedding']=parse_str_embedding(row['embedding:vector'])
            else:
                raise Exception(f"GNN Embedding/movie mismatch {str(index)} {str(row)} {str(aos_rec)}")
        except Exception as e:
            print(f"Error on {index} {row}")
            print("Exception: {}".format(type(e).__name__))
            print("Exception message: {}".format(e))
    
    for idx in ['movie', 'movie_sentence', 'movie_gnn']:
        add_bulk_to_index(idx, aos_chunks)
        
    return True

while True:
    ret = process_next_chunk()
    if not ret:
        break
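After loading, it is worth checking that each index holds the expected number of documents. The hypothetical helper below compares counts returned by a count function (such as `aos_client.count`, whose response includes a `count` field) against an expected total. Newly indexed documents only become visible to searches and counts after a refresh, so the usage example refreshes the indexes first.

```python
def compare_counts(count_fn, index_names, expected):
    """Return {index: (actual, expected)} for indexes whose document count differs."""
    mismatches = {}
    for name in index_names:
        actual = count_fn(index=name)["count"]
        if actual != expected:
            mismatches[name] = (actual, expected)
    return mismatches
```

Usage: compute `expected = len(pd.read_csv("xmovie.csv", usecols=["~id"]))`, call `aos_client.indices.refresh(index=name)` for each index, and then `compare_counts(aos_client.count, ["movie", "movie_sentence", "movie_gnn"], expected)` should return an empty dict if every movie was indexed.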



### Optional reset if you need to start again
Uncomment the lines for the indexes you want to delete and recreate, then run the cell.

In [None]:
#aos_client.indices.delete(index="movie")
#aos_client.indices.create(index="movie",body=movie_index)

#aos_client.indices.delete(index="movie_gnn")
#aos_client.indices.create(index="movie_gnn",body=movie_gnn_index)

#aos_client.indices.delete(index="movie_sentence")
#aos_client.indices.create(index="movie_sentence",body=movie_sentence_index)
