# Embedding pipeline

Related JIRA tickets: 
* https://bbpteam.epfl.ch/project/issues/browse/DKE-718
* https://bbpteam.epfl.ch/project/issues/browse/DKE-715

Prerequisites:

- Models have been built
- Embedding service can read models from a dedicated Nexus project where all models are stored
- Model id equals the Nexus resource id of the EmbeddingModel resource

Questions:

* do we really need to url-encode tags ?
* add missing types and properties to the context

# Setup

## Imports

In [None]:
import requests
import getpass
import uuid
import os
import math

import numpy as np
import nexussdk as nxs

from collections import namedtuple
from urllib.parse import quote_plus
from kgforge.core import KnowledgeGraphForge
from kgforge.specializations.mappings import DictionaryMapping
from bluegraph.downstream import EmbeddingPipeline
from bluegraph.core import GraphElementEmbedder

In [None]:
from kgforge.version import __version__
print(__version__)

## Helpers

In [None]:
def get_vector_dimensions(embeddings):
    """Return the dimension shared by all embedding vectors.

    Args:
        embeddings: Mapping whose values are the embedding vectors (any
            objects supporting ``len``); the keys are not inspected.

    Returns:
        The common length of the vectors.

    Raises:
        ValueError: If ``embeddings`` is empty or the vectors do not all
            have the same length.
    """
    dimensions = {len(embedding) for embedding in embeddings.values()}
    if not dimensions:
        raise ValueError("No embedding vectors were provided")
    if len(dimensions) > 1:
        # Fail with an explicit error: a single `dims` value is needed, so
        # mixed lengths cannot be summarised by one number.
        raise ValueError(
            "Not all embedding vectors have the same dimension: "
            f"{sorted(dimensions)}")
    return dimensions.pop()

def get_es_view_mappings(dimension):
    """Build the ElasticSearch mapping used by the embedding view.

    Args:
        dimension: Value stored as ``dims`` of the ``embedding``
            ``dense_vector`` field.

    Returns:
        A dict with a single ``properties`` key describing the ``@id``,
        ``@type``, ``derivation``, ``embedding`` and ``generation`` fields.
    """
    def keyword_field():
        # A fresh dict per call so no sub-mapping is shared between fields.
        return {"type": "keyword"}

    def nested_field(properties):
        return {"properties": properties, "type": "nested"}

    derivation = nested_field({
        "entity": nested_field({"@id": keyword_field()}),
    })
    generation = nested_field({
        "activity": nested_field({
            "used": nested_field({"@id": keyword_field()}),
        }),
    })
    fields = {
        "@id": keyword_field(),
        "@type": keyword_field(),
        "derivation": derivation,
        "embedding": {"dims": dimension, "type": "dense_vector"},
        "generation": generation,
    }
    return {"properties": fields}


# TODO move the resource part out to avoid duplication
def update_stats(forge, stats_resource, sample_size, stats, formula, param_name, tag):
    """Overwrite an existing statistics resource, then update and tag it.

    Args:
        forge: Forge session providing ``from_json``, ``update`` and ``tag``.
        stats_resource: Resource whose ``scriptScore``, ``vectorParameter``
            and ``series`` attributes are replaced.
        sample_size: Value stored for the ``N`` statistic.
        stats: Object exposing ``min``, ``max``, ``mean`` and ``std``.
        formula: Value stored as ``scriptScore``.
        param_name: Value stored as ``vectorParameter``.
        tag: Tag applied to the resource after the update.

    Returns:
        The same ``stats_resource`` object.
    """
    stats_resource.scriptScore = formula
    stats_resource.vectorParameter = param_name

    labelled_values = (
        ("min", stats.min),
        ("max", stats.max),
        ("mean", stats.mean),
        ("standard deviation", stats.std),
        ("N", sample_size),
    )
    series = [
        {"statistic": label, "unitCode": "dimensionless", "value": value}
        for label, value in labelled_values
    ]
    stats_resource.series = forge.from_json(series)

    forge.update(stats_resource)
    # TODO: confirm that statistics resources should carry the same tag as the embeddings
    forge.tag(stats_resource, tag)
    return stats_resource


def update_boosting_data(forge, boosting_resource, view_id, deviation, formula, param_name, tag):
    """Overwrite an existing boosting resource, then update and tag it.

    Args:
        forge: Forge session providing ``update`` and ``tag``.
        boosting_resource: Resource whose ``scriptScore``,
            ``vectorParameter`` and ``series`` attributes are replaced.
        view_id: Not used by this function.
        deviation: Mapping from an entity ``@id`` to its score deviation;
            the stored boosting factor is ``1 + deviation``.
        formula: Value stored as ``scriptScore``.
        param_name: Value stored as ``vectorParameter``.
        tag: Tag applied to the resource after the update.

    Returns:
        The same ``boosting_resource`` object.
    """
    boosting_factors = []
    for entity_id, score_deviation in deviation.items():
        boosting_factors.append({
            "entity": {"@id": entity_id},
            "value": 1 + score_deviation,
            "unitCode": "dimensionless",
        })

    boosting_resource.scriptScore = formula
    boosting_resource.vectorParameter = param_name
    boosting_resource.series = boosting_factors

    forge.update(boosting_resource)
    # TODO: confirm that the boosting resource should carry the same tag as the embeddings
    forge.tag(boosting_resource, tag)
    return boosting_resource

In [None]:
# This part was taken from Eugenia Oshurko's "Compute and register view statistics and boosting.ipynb" notebook

In [None]:
def set_elastic_view(forge, view):
    """Make ``forge`` send ElasticSearch queries to the ``_search`` URL of ``view``.

    The URL is built from the module-level ``views_endpoint`` and the
    URL-encoded ``view`` identifier, and is written into the store's
    ``elastic_endpoint`` configuration.
    """
    url_segments = [views_endpoint, quote_plus(view), "_search"]
    forge._store.service.elastic_endpoint["endpoint"] = "/".join(url_segments)

    
def get_all_vectors(forge, resource_limit):
    """Fetch the embedding vectors of the non-deprecated documents.

    Args:
        forge: Forge session; the query is sent through ``forge.elastic``.
        resource_limit: Value used as the ``size`` of the query.

    Returns:
        A dict mapping each hit's ``@id`` to its ``embedding``.
    """
    query = f"""{{
        "from" : 0,
        "size" : {resource_limit},
        "query": {{
            "term": {{"_deprecated": false}}
        }}
    }}
    """
    vectors = {}
    for hit in forge.elastic(query):
        document = hit._source
        vectors[document["@id"]] = document["embedding"]
    return vectors


def get_all_scores(forge, vectors, formula, param_name, resource_limit=200, boosting=None):
    """Collect the distinct similarity scores obtained for every vector.

    One ``script_score`` query is sent through ``forge.elastic`` per entry
    of ``vectors``. Each query excludes the document whose ``@id`` equals
    the entry's key, requires the ``embedding`` field to exist and asks for
    ``len(vectors)`` hits.

    Args:
        forge: Forge session used to run the queries.
        vectors: Mapping from a document ``@id`` to its embedding vector.
        formula: Script source evaluated by ElasticSearch.
        param_name: Name of the script parameter carrying the vector.
        resource_limit: Accepted for interface compatibility; not used here.
        boosting: Optional mapping from a hit ``@id`` to a deviation; when
            truthy, each score is multiplied by ``1 + boosting[@id]``.

    Returns:
        A numpy array of the distinct score values (scores are gathered in
        a set, so equal values are kept once and the order is unspecified).
    """
    distinct_scores = set()
    for point_id, query_vector in vectors.items():
        request = f"""{{
          "size": {len(vectors)},
          "query": {{
            "script_score": {{
                "query": {{
                    "bool" : {{
                      "must_not" : {{
                        "term" : {{ "@id": "{point_id}" }}
                      }},
                      "must": {{ "exists": {{ "field": "embedding" }} }}
                    }}
                }},
                "script": {{
                    "source": "{formula}",
                    "params": {{
                      "{param_name}": {query_vector}
                    }}
                }}
            }}
          }}
        }}"""

        for hit in forge.elastic(request):
            boost_factor = (1 + boosting[hit._source["@id"]]) if boosting else 1
            distinct_scores.add(hit._score * boost_factor)
    return np.array(list(distinct_scores))


def get_view_stats(forge, vectors, formula, param_name, resource_limit=200, boosting=None):
    """Compute the scores of a view and summarise them.

    All arguments are forwarded unchanged to ``get_all_scores``.

    Returns:
        A pair ``(score_values, statistics)`` where ``score_values`` is the
        array returned by ``get_all_scores`` and ``statistics`` is a
        ``Statistics`` named tuple with fields ``min``, ``max``, ``mean``
        and ``std`` computed from that array.
    """
    Statistics = namedtuple("Statistics", ["min", "max", "mean", "std"])
    scores = get_all_scores(
        forge, vectors, formula, param_name, resource_limit, boosting)
    summary = Statistics(
        min=scores.min(),
        max=scores.max(),
        mean=scores.mean(),
        std=scores.std(),
    )
    return scores, summary


def register_stats(forge, view_id, sample_size, stats, formula, param_name, tag, boosting_resource=None):
    """Create, register and tag an ``ElasticSearchViewStatistics`` resource.

    Args:
        forge: Forge session providing ``from_json``, ``register`` and ``tag``.
        view_id: Identifier stored as the derivation entity.
        sample_size: Value stored for the ``N`` statistic.
        stats: Object exposing ``min``, ``max``, ``mean`` and ``std``.
        formula: Value stored as ``scriptScore``.
        param_name: Value stored as ``vectorParameter``.
        tag: Tag applied to the resource after registration.
        boosting_resource: Optional resource; when truthy the statistics are
            flagged as boosted and its ``id`` is recorded under
            ``generation.activity.used``.

    Returns:
        The resource built by ``forge.from_json``.
    """
    labelled_values = (
        ("min", stats.min),
        ("max", stats.max),
        ("mean", stats.mean),
        ("standard deviation", stats.std),
        ("N", sample_size),
    )
    json_data = {
        "type": "ElasticSearchViewStatistics",
        "scriptScore": formula,
        "vectorParameter": param_name,
        "series": [
            {"statistic": label, "unitCode": "dimensionless", "value": value}
            for label, value in labelled_values
        ],
        "derivation": {
            "type": "Derivation",
            "entity": {"id": view_id},
        },
    }

    is_boosted = bool(boosting_resource)
    json_data["boosted"] = is_boosted
    if is_boosted:
        json_data["generation"] = {
            "type": "Generation",
            "activity": {
                # TODO: include the boosting_resource version here?
                "used": {"id": boosting_resource.id},
            },
        }

    stats_resource = forge.from_json(json_data)
    forge.register(stats_resource)
    # TODO: confirm that statistics resources should carry the same tag as the embeddings
    forge.tag(stats_resource, tag)
    return stats_resource

def get_score_deviation(forge, point_id, vector, k, formula, param_name):
    """Return the root-mean-square gap between 1 and the scores of the top hits.

    A ``script_score`` query of size ``k`` is run over the documents that
    have an ``embedding`` field. Hits whose ``@id`` equals ``point_id`` are
    discarded, the remaining scores are de-duplicated, and
    ``sqrt(mean((1 - score) ** 2))`` is returned.

    Args:
        forge: Forge session; the query is sent through ``forge.elastic``.
        point_id: ``@id`` of the document to leave out of the scores.
        vector: Vector passed to the script as ``param_name``.
        k: ``size`` of the query.
        formula: Script source evaluated by ElasticSearch.
        param_name: Name of the script parameter carrying the vector.
    """
    query = f"""{{
      "size": {k},
      "query": {{
        "script_score": {{
          "query": {{
                "exists": {{
                    "field": "embedding"
                }}
          }},
          "script": {{
            "source": "{formula}",
            "params": {{
              "{param_name}": {vector}
            }}
          }}
        }}
      }}
    }}"""

    hits = forge.elastic(query)
    neighbour_scores = {
        hit._score for hit in hits if point_id != hit._source["@id"]
    }
    squared_gaps = (1 - np.array(list(neighbour_scores))) ** 2
    return math.sqrt(squared_gaps.mean())


def register_boosting_data(forge, view_id, deviation, formula, param_name, tag):
    """Create, register and tag a ``SimilarityBoostingSeries`` resource.

    Args:
        forge: Forge session providing ``from_json``, ``register`` and ``tag``.
        view_id: Identifier stored as the derivation entity.
        deviation: Mapping from an entity ``@id`` to its score deviation;
            the stored boosting factor is ``1 + deviation``.
        formula: Value stored as ``scriptScore``.
        param_name: Value stored as ``vectorParameter``.
        tag: Tag applied to the resource after registration.

    Returns:
        The resource built by ``forge.from_json``.
    """
    boosting_factors = []
    for entity_id, score_deviation in deviation.items():
        boosting_factors.append({
            "entity": {"@id": entity_id},
            "value": 1 + score_deviation,
            "unitCode": "dimensionless",
        })

    payload = {
        "type": "SimilarityBoostingSeries",
        "scriptScore": formula,
        "vectorParameter": param_name,
        "series": boosting_factors,
        "derivation": {
            "type": "Derivation",
            "entity": {"id": view_id},
        },
    }
    resource = forge.from_json(payload)
    forge.register(resource)
    # TODO: confirm that the boosting resource should carry the same tag as the embeddings
    forge.tag(resource, tag)
    return resource

## User input

In [None]:
TOKEN = getpass.getpass()

In [None]:
ENDPOINT = "https://staging.nexus.ocp.bbp.epfl.ch/v1"

In [None]:
DOWNLOAD_DIR = "./data"

In [None]:
MODEL_ID = "https://staging.nexus.ocp.bbp.epfl.ch/v1/resources/dke/embedder_catalog/_/e2b953b9-6724-4278-a1e5-3472bd63e374" # TODO adjust

In [None]:
MODEL_REVISION = 4  # Specify a revision, if necessary

In [None]:
NEW_MODEL_REVISION = 5  # Specify a new model revision for "Update Embedding resources" use case

In [None]:
VIEW_NAME = "test1" # Fragment to be used in the view id for new ElasticSearchView resource

In [None]:
CONFIG_ID = "https://bbp.epfl.ch/neurosciencegraph/data/d4c33156-18a6-4660-b35c-9c3a86a550ba" # Id of the RecommenderConfiguration resource that is used by the UI

In [None]:
LIMIT = 200 # TODO could be a problem if not all of them taken

---

In [None]:
formula = "(cosineSimilarity(params.query_vector, doc['embedding']) + 1.0) / 2"

In [None]:
vector_parameter = "query_vector"

In [None]:
tag = f"{MODEL_ID.split('/')[-1]}?rev={MODEL_REVISION}"

In [None]:
new_tag = f"{MODEL_ID.split('/')[-1]}?rev={NEW_MODEL_REVISION}"

In [None]:
view_id = f"https://bbp.epfl.ch/neurosciencegraph/data/views/es/{VIEW_NAME}"

## Forge sessions

### Session for embedding models

In [None]:
forge_models = KnowledgeGraphForge(
    "https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml",
    endpoint=ENDPOINT,
    token=TOKEN, 
    bucket="dke/embedder_catalog")

### Session for embedding resources

In [None]:
forge = KnowledgeGraphForge(
    "https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml",
    token=TOKEN, 
    endpoint=ENDPOINT,        
    bucket="bbp_test/akk_snap_test2")

## Nexussdk session

In [None]:
# Point nexussdk at the same endpoint and token as the forge session.
nxs.config.set_environment(forge._store.endpoint)
nxs.config.set_token(TOKEN)
# The bucket is written "<organization>/<project>": keep its first and last segments.
ORGANIZATION = forge._store.bucket.partition("/")[0]
PROJECT = forge._store.bucket.rpartition("/")[-1]

## Views endpoint

In [None]:
views_endpoint = "/".join((ENDPOINT, "views", quote_plus(ORGANIZATION), quote_plus(PROJECT)))

# Create EmbeddingModel

1. Create a new embedding model using BlueGraph
2. Create EmbeddingModel resource according to DKE-744
3. Push it to Nexus (using a dedicated project, where all models live)
4. Restart the service

In [None]:
# TODO Include in "Build Model for SEU Neuron Morphologies.ipynb"

# Create Embedding resources

1. Given a model id, its revision, and a set of resources, ask the service [or some Python code] for embedding vectors
2. Create embedding resources according to [this mapping](https://bbpgitlab.epfl.ch/dke/users/eugeniashurko/dataset-embeddings/-/blob/master/mappings/seu-embedding.hjson)
3. Push them to Nexus
4. Tag them with the model `UUID` and its revision (e.g. `e2b953b9-6724-4278-a1e5-3472bd63e374?rev=1`)
5. TODO: Check that the resources are indexed properly

### Fetch embedding vectors

In [None]:
# This part was taken from Eugenia Oshurko's "Fetch embeddings without the service.ipynb" notebook

In [None]:
model_resource = forge_models.retrieve(f"{MODEL_ID}{'?rev=' + str(MODEL_REVISION) if MODEL_REVISION is not None else ''}")

In [None]:
forge_models.download(model_resource, "distribution.contentUrl", DOWNLOAD_DIR, overwrite=True)
pipeline_path = os.path.join(DOWNLOAD_DIR, model_resource.distribution.name)

In [None]:
pipeline = EmbeddingPipeline.load(
    pipeline_path,
    embedder_interface=GraphElementEmbedder,
    embedder_ext="zip")

In [None]:
embedding_table = pipeline.generate_embedding_table()

In [None]:
selected_points = embedding_table.index # selected_points = embedding_table.sample(5).index

In [None]:
embeddings = dict(zip(selected_points, pipeline.retrieve_embeddings(selected_points)))

### Map embedding vectors to resources

- TODO: add the NeuronMorphology revision once available

In [None]:
embeddings_list = list()

In [None]:
# Build one input record per embedding for the mapping applied in the next cells.
for at_id, embedding in embeddings.items():
    # Last path segment of the identifier, reused for the name and the uuid.
    short_id = at_id.rsplit("/", 1)[-1]
    record = {
        "morphology_id": at_id,
        "morphology_rev": "TODO",
        "model_id": MODEL_ID,
        "model_rev": MODEL_REVISION,
        "embedding_name": f"Embedding of morphology {short_id} at revision TODO",
        "embedding": embedding,
        "uuid": short_id,
    }
    embeddings_list.append(record)

In [None]:
mapping = DictionaryMapping.load("./mappings/seu-embedding.hjson")

In [None]:
embedding_resources = forge.map(embeddings_list, mapping)

In [None]:
for r in embedding_resources:
    r.id = forge.format("identifier", "embeddings", str(uuid.uuid4()))

In [None]:
# print(embedding_resources[0])

In [None]:
forge.register(embedding_resources)
forge.tag(embedding_resources, tag)

# Create ElasticSearchView

1. Get dimensions of the embedding vectors
2. Create a Nexus View resource with:
- `resourceTypes` being `Embedding`
- `dense_vector` with the right dimensions
- `resourceTag` field corresponds to the model UUID and its revision (e.g. `e2b953b9-6724-4278-a1e5-3472bd63e374?rev=1`)

TODO: Adapt the resource_types property to the proper `Embedding` type once it is added to the context


In [None]:
dimension = get_vector_dimensions(embeddings)

In [None]:
# Create the ElasticSearch view that indexes the tagged Embedding resources.
try:
    es_view = nxs.views.create_es(
        ORGANIZATION,
        PROJECT,
        mapping=get_es_view_mappings(dimension),
        tag=tag,
        # Reuse the identifier derived from VIEW_NAME so that this cell and
        # the later fetch/update cells always address the same view.
        view_id=view_id,
        # Derive the type IRI from ENDPOINT instead of hard-coding the staging
        # host, so it follows the deployment selected in "User input".
        resource_types=[f"{ENDPOINT}/resources/{ORGANIZATION}/{PROJECT}/_/Embedding"],
        source_as_text=False,
        include_metadata=True,
        include_deprecated=False)
except nxs.HTTPError as e:
    # NOTE(review): only the message is printed; `es_view` is not (re)assigned
    # when creation fails, so check the output before running the next cells.
    print(e)

# Create ElasticSearchViewStatistics resources (non-boosted)

TODO: There can be a lag while resources are being indexed

In [None]:
# This part was taken from Eugenia Oshurko's "Compute and register view statistics and boosting.ipynb" notebook

In [None]:
# Route forge.elastic queries to the view before reading anything from it.
set_elastic_view(forge, es_view["@id"])
vectors = get_all_vectors(forge, LIMIT)
values, stats = get_view_stats(forge, vectors, formula, vector_parameter, LIMIT)
# values.shape[0] is the number of distinct scores collected; it is stored as "N".
stats_resource = register_stats(forge, es_view["@id"], values.shape[0], stats, formula, vector_parameter, tag)

# Create SimilarityBoostingData and ElasticSearchViewStatistics (boosted)

In [None]:
# This part was taken from Eugenia Oshurko's "Compute and register view statistics and boosting.ipynb" notebook

In [None]:
# Route forge.elastic queries to the view before reading anything from it.
set_elastic_view(forge, es_view["@id"])
deviations = dict()
vectors = get_all_vectors(forge, LIMIT)
for point_id, vector in vectors.items():
    # 10 is the query size; the point itself is dropped if it is among the hits.
    deviations[point_id] = get_score_deviation(
        forge, point_id, vector, 10, formula, vector_parameter)

In [None]:
boosting_resource = register_boosting_data(
        forge, es_view["@id"], deviations, formula, vector_parameter, tag)

In [None]:
vectors = get_all_vectors(forge, LIMIT)
values, stats = get_view_stats(forge, vectors, formula, vector_parameter, LIMIT, deviations)
stats_resource = register_stats(forge, es_view["@id"], values.shape[0], stats, formula, vector_parameter, tag, boosting_resource)

# Create or update RecommenderConfiguration resource

- TODO: Check whether this needs a separate forge session (bbp/atlas)
- TODO: Check whether we need versions of e.g. model, statistics resources etc

## Create RecommenderConfiguration resource

In [None]:
config_resource = forge.from_json({
    "type": "RecommenderConfiguration",
    "configuration": [
        {
            "view": {"id": es_view["@id"]},
            "model": {"id": model_resource.id},
            "statistics": {"id": stats_resource.id}
        }
    ]
})

In [None]:
forge.register(config_resource)

## Update RecommenderConfiguration resource

TODO: Check that the config is not already in the list

In [None]:
config_resource = forge.retrieve(CONFIG_ID)

In [None]:
# Entry linking the view, the model and the statistics; built once so that
# both branches below add exactly the same payload.
new_configuration = forge.from_json({
    "view": {"id": es_view["@id"]},
    "model": {"id": model_resource.id},
    "statistics": {"id": stats_resource.id}
})
if isinstance(config_resource.configuration, list):
    config_resource.configuration.append(new_configuration)
else:
    # `configuration` is not a list (presumably a single entry): wrap the
    # existing value and the new entry into a list.
    config_resource.configuration = [
        config_resource.configuration,
        new_configuration
    ]

In [None]:
forge.update(config_resource)

# Update EmbeddingModel

A new revision of the model is created when we want to:

- change the graph representation/hyperparameters/training set/dimensionality
- use another algorithm

i.e. anything that may change the resulting embedding vectors, but does not change the purpose of the model.

1. Fetch the existing model resource
2. Update the distribution (or meta-data, if necessary)
3. Update in Nexus

In [None]:
# TODO Include in "Build Model for SEU Neuron Morphologies.ipynb"

# Update Embedding resources

We want to update Embedding resources when there is a new version of a model

1. Fetch embedding resources given the model id and the set of resources
2. Ask the service[or some python code] for the new embedding vectors
3. Update embedding resources with the new vectors and the new model revision in `generation.activity.used.id`
4. Update in Nexus
5. Tag them with the model UUID and its revision (e.g. `e2b953b9-6724-4278-a1e5-3472bd63e374?rev=2`)
6. TODO: Check that the resources are indexed properly

### Fetch new embedding vectors

In [None]:
# This part was taken from Eugenia Oshurko's "Fetch embeddings without the service.ipynb" notebook

In [None]:
model_resource = forge_models.retrieve(f"{MODEL_ID}{'?rev=' + str(NEW_MODEL_REVISION) if NEW_MODEL_REVISION is not None else ''}")

In [None]:
forge_models.download(model_resource, "distribution.contentUrl", DOWNLOAD_DIR, overwrite=True)
pipeline_path = os.path.join(DOWNLOAD_DIR, model_resource.distribution.name)

In [None]:
pipeline = EmbeddingPipeline.load(
    pipeline_path,
    embedder_interface=GraphElementEmbedder,
    embedder_ext="zip")

In [None]:
embedding_table = pipeline.generate_embedding_table()

In [None]:
selected_points = embedding_table.index

In [None]:
updated_embeddings = dict(zip(selected_points, pipeline.retrieve_embeddings(selected_points)))

### Fetch old embedding vectors from Nexus

- TODO: include update of the wasAssociatedWith property of the Embedding resources
- TODO: add the NeuronMorphology revision once available


In [None]:
# This part was taken from Eugenia Oshurko's "Fetch embeddings without the service.ipynb" notebook

In [None]:
set_elastic_view(forge, view_id)

In [None]:
embedding_resources = forge.elastic(f"""{{
        "from" : 0,
        "size" : {LIMIT},
        "query": {{
            "term": {{"_deprecated": false}}
        }}
    }}
    """)

In [None]:
embedding_resources = [forge.retrieve(e._id) for e in embedding_resources]

In [None]:
# Replace each stored vector with the one from the new model revision and
# refresh the revision selectors of the derivation and generation links.
for embedding_resource in embedding_resources:
    entity_id = embedding_resource.derivation.entity.id
    embedding_resource.embedding = updated_embeddings[entity_id]
    embedding_resource.derivation.entity.hasSelector.value = "?rev=TODO"
    embedding_resource.generation.activity.used.hasSelector.value = f"?rev={NEW_MODEL_REVISION}"

In [None]:
forge.update(embedding_resources)
forge.tag(embedding_resources, new_tag)

# Update ElasticSearchView for the updated model

1. Fetch the view
2. Get dimensions from the updated model
3. Update the view resource with:

- a mapping that specifies `dense_vector` with the right dimensions
- `resourceTag` set to the model UUID and the new revision (e.g. `e2b953b9-6724-4278-a1e5-3472bd63e374?rev=2`)

In [None]:
dimension = get_vector_dimensions(updated_embeddings)

In [None]:
# Re-apply the nexussdk configuration so this section can be run on its own
# once the forge session exists.
nxs.config.set_environment(forge._store.endpoint)
nxs.config.set_token(TOKEN)
ORGANIZATION = forge._store.bucket.split("/")[0]
PROJECT = forge._store.bucket.split("/")[-1]
try:
    # Fetch the current view, swap in the mapping sized for the new vectors
    # and the new tag, then push the modified payload back.
    es_view = nxs.views.fetch(ORGANIZATION, 
                                  PROJECT, 
                                  view_id = view_id)
    es_view["mapping"] = get_es_view_mappings(dimension)
    es_view["resourceTag"] = new_tag
    nxs.views.update_es(es_view)
except nxs.HTTPError as e:
    # NOTE(review): the error is only printed; if the fetch failed, `es_view`
    # keeps whatever value it had before this cell.
    print(e)

# Update ElasticSearchViewStatistics resources (non-boosted)

- TODO: check that there is only one resource returned

In [None]:
# Look up the non-boosted statistics resource derived from this view.
# NOTE(review): `[0]` keeps the first match only and fails if the search
# returns nothing — see the TODO above about checking for a single result.
stats_resource = forge.search({
    "type": "ElasticSearchViewStatistics",
    "derivation": {
        "entity": {
            "id": es_view["@id"]
        }
    },
    "boosted": False
})[0]

In [None]:
set_elastic_view(forge, es_view["@id"])
vectors = get_all_vectors(forge, LIMIT)
values, stats = get_view_stats(forge, vectors, formula, vector_parameter, LIMIT)
updated_stats_resource = update_stats(forge, stats_resource, values.shape[0], stats, formula, vector_parameter, new_tag)

# Update SimilarityBoostingData and ElasticSearchViewStatistics (boosted)

In [None]:
boosted_stats_resource = forge.search({
    "type": "ElasticSearchViewStatistics",
    "derivation": {
        "entity": {
            "id": es_view["@id"]
        }
    },
    "boosted": True
})[0]

In [None]:
boosting_resource = forge.search({
    "type": "SimilarityBoostingSeries",
    "derivation": {
        "entity": {
            "id": es_view["@id"]
        }
    }
})[0]

In [None]:
set_elastic_view(forge, es_view["@id"])
updated_deviations = dict()
vectors = get_all_vectors(forge, LIMIT)
for point_id, vector in vectors.items():
    updated_deviations[point_id] = get_score_deviation(
        forge, point_id, vector, 10, formula, vector_parameter)

In [None]:
updated_boosting_data_resource = update_boosting_data(
        forge, boosting_resource, es_view["@id"], updated_deviations, formula, vector_parameter, new_tag)

In [None]:
boosted_global_stats, stats = get_view_stats(
        forge, vectors, formula, vector_parameter, LIMIT, updated_deviations)
# Store the number of scores actually collected as "N", as the non-boosted
# update does with values.shape[0]; LIMIT is only the cap on fetched vectors.
updated_boosted_stats_resource = update_stats(
        forge, boosted_stats_resource, boosted_global_stats.shape[0], stats,
        formula, vector_parameter, new_tag)

# TO BE REMOVED

url encoding a string

In [None]:
import urllib.parse

In [None]:
es_id = "https://bluebrain.github.io/nexus/vocabulary/defaultElasticSearchIndex"

In [None]:
es_encoded = urllib.parse.quote(es_id, safe='')