# Build an event-driven knowledge graph with machine learning

This notebook demonstrates how to build an event-driven knowledge graph enriched with machine learning, using Amazon SageMaker
and Amazon Neptune.

## Contents
1. [Setup](#Setup)
1. [Registering Models](#Registering-Models)
1. [Deploying models from the Registry](#Deploying-models-from-the-Registry)
1. [Using the event-driven knowledge graph](#Using-the-event-driven-knowledge-graph)
1. [Cleanup](#Cleanup)


## Setup

### Setup Dependencies

In [None]:
%pip install --upgrade sagemaker
%pip install python-magic
%pip install Pillow

In [None]:
import os
import json
import boto3
import sagemaker
from utils import register_jumpstart_model
import magic
from PIL import Image
from io import BytesIO
import base64
import hashlib
import uuid
import math

### Configuration Setup


In [None]:
# Create the SageMaker Session
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = sagemaker_session.sagemaker_client
default_bucket = sagemaker_session.default_bucket()

In [None]:
STREAM_NAME = "bus-ekg"

## Registering Models

We'll now register models from SageMaker JumpStart with the SageMaker Model Registry.

Because the SageMaker Model Registry does not currently support the "model data source" convention used by SageMaker JumpStart, we'll need to package our models before we can register them.

Let's look at the method `register_jumpstart_model` from `utils.py` that we'll use to do it.

In [None]:
register_jumpstart_model??
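`register_jumpstart_model` does the packaging for us. As a rough illustration of what packaging involves, here is a minimal sketch that assumes the model artifacts have already been downloaded to a local directory and only need to be bundled into a single `model.tar.gz`, the archive SageMaker extracts into `/opt/ml/model` at hosting time. `package_model_dir` is a hypothetical helper; the real logic in `utils.py` (printed by the cell above) may differ, for example in how it fetches artifacts or uploads the archive.

```python
import os
import tarfile


# Hypothetical helper: illustrates packaging, not the actual utils.py implementation.
def package_model_dir(model_dir, output_path):
    """Bundle every file under `model_dir` into a gzipped tarball at `output_path`.

    Files are stored relative to `model_dir`, so the archive root mirrors the
    directory layout that the model server expects to find after extraction.
    """
    with tarfile.open(output_path, "w:gz") as tar:
        for root, _dirs, files in os.walk(model_dir):
            for name in sorted(files):
                full_path = os.path.join(root, name)
                tar.add(full_path, arcname=os.path.relpath(full_path, model_dir))
    return output_path
```

The resulting archive could then be uploaded to S3 (for example with `sagemaker_session.upload_data`) and referenced when creating the model package.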

In [None]:
# NER Model
model_id = "huggingface-ner-distilbert-base-cased-finetuned-conll03-english"
model_version = "2.0.0"
model_package_group_name = "text-ner"

# create the model package group
response = sagemaker_client.create_model_package_group(
    ModelPackageGroupName=model_package_group_name
)

# register the model
model_package_arn = register_jumpstart_model(model_package_group_name, model_id, model_version)

ner = model_package_arn
print(model_package_arn)

In [None]:
# Object Detection

model_id = "pytorch-od1-fasterrcnn-resnet50-fpn"
model_version = "2.0.0"
model_package_group_name = "image-od"

# create the model package group
response = sagemaker_client.create_model_package_group(
    ModelPackageGroupName=model_package_group_name
)

# register the model
model_package_arn = register_jumpstart_model(model_package_group_name, model_id, model_version)

od = model_package_arn
print(model_package_arn)

In [None]:
# Image Vector Embeddings
model_id = "tensorflow-icembedding-tf2-preview-inception-v3-featurevector-4"
model_version = "3.0.0"
model_package_group_name = "image-vector"


# create the model package group
response = sagemaker_client.create_model_package_group(
    ModelPackageGroupName=model_package_group_name
)

# register the model
model_package_arn = register_jumpstart_model(model_package_group_name, model_id, model_version)

icembedding = model_package_arn
print(model_package_arn)
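The three cells above call `create_model_package_group` unconditionally, so re-running the notebook fails once a group already exists. A minimal sketch of an idempotent alternative follows: it checks for an exact name match with `list_model_package_groups` (following `NextToken` pages) before creating the group. `ensure_model_package_group` is a hypothetical helper, not part of `utils.py`.

```python
def ensure_model_package_group(client, group_name):
    """Create the model package group only if no group with this exact name exists.

    `client` is assumed to be a boto3 SageMaker client. Returns True if a group
    was created and False if it already existed.
    """
    kwargs = {"NameContains": group_name}
    while True:
        page = client.list_model_package_groups(**kwargs)
        for summary in page["ModelPackageGroupSummaryList"]:
            # NameContains is a substring filter, so confirm the exact name.
            if summary["ModelPackageGroupName"] == group_name:
                return False
        if "NextToken" not in page:
            break
        kwargs["NextToken"] = page["NextToken"]
    client.create_model_package_group(ModelPackageGroupName=group_name)
    return True
```

For example, `ensure_model_package_group(sagemaker_client, "text-ner")` could stand in for the `create_model_package_group` call in the NER cell. Note that `register_jumpstart_model` would still add a new model package version on each run.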

## Deploying models from the Registry

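Next, we deploy each registered model package to its own real-time endpoint. Each cell passes `wait=False`, so it returns immediately and the three endpoints are created in the background.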

In [None]:
# Deploy NER model
model_package_arn = ner

model = sagemaker.ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=sagemaker_session,
)

# get the default (first supported) real-time instance type for this model package
response = sagemaker_client.describe_model_package(
    ModelPackageName=model_package_arn
)
instances = response["InferenceSpecification"]["SupportedRealtimeInferenceInstanceTypes"]
instance = instances[0]

# deploy without blocking; replace `instance` with a specific type
# (e.g. "ml.g4dn.xlarge") if you want to pin the hardware
ner_endpoint = model.deploy(
    endpoint_name="text-ner",
    initial_instance_count=1,
    instance_type=instance,
    wait=False,
)

In [None]:
# Deploy Object Detection model
model_package_arn = od

model = sagemaker.ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=sagemaker_session,
)

# get the default (first supported) real-time instance type
response = sagemaker_client.describe_model_package(
    ModelPackageName=model_package_arn
)
instances = response["InferenceSpecification"]["SupportedRealtimeInferenceInstanceTypes"]
instance = instances[0]

# deploy without blocking (wait=False)
od_endpoint = model.deploy(
    endpoint_name="image-od",
    initial_instance_count=1,
    instance_type=instance,
    wait=False,
)

In [None]:
# Deploy Image embedding model
model_package_arn = icembedding

model = sagemaker.ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=sagemaker_session,
)

# get the default (first supported) real-time instance type
response = sagemaker_client.describe_model_package(
    ModelPackageName=model_package_arn
)
instances = response["InferenceSpecification"]["SupportedRealtimeInferenceInstanceTypes"]
instance = instances[0]

# deploy without blocking (wait=False)
icembedding_endpoint = model.deploy(
    endpoint_name="image-vector",
    initial_instance_count=1,
    instance_type=instance,
    wait=False,
)
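Because the deploy cells pass `wait=False`, the endpoints may still be in the `Creating` state when you start sending events, and we assume the media stack cannot enrich events until its endpoints are `InService`. A minimal sketch that blocks until every endpoint is ready, using boto3's built-in `endpoint_in_service` waiter for SageMaker; `wait_for_endpoints` and its `delay`/`max_attempts` defaults are illustrative choices, not values from the original notebook.

```python
def wait_for_endpoints(client, endpoint_names, delay=30, max_attempts=60):
    """Block until every endpoint reaches InService.

    `client` is assumed to be a boto3 SageMaker client. The waiter raises
    botocore.exceptions.WaiterError if an endpoint fails or attempts run out.
    Returns the final EndpointStatus of each endpoint.
    """
    waiter = client.get_waiter("endpoint_in_service")
    statuses = {}
    for name in endpoint_names:
        waiter.wait(
            EndpointName=name,
            WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
        )
        statuses[name] = client.describe_endpoint(EndpointName=name)["EndpointStatus"]
    return statuses
```

Usage: `wait_for_endpoints(sagemaker_client, ["text-ner", "image-od", "image-vector"])`.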

## Using the event-driven knowledge graph


Our basic schema for an event on the bus is:

```json
{
  "detail": {
    "data": {
      "...": "..."
    },
    "metadata": {
      "...": "..."
    }
  },
  "detail-type": "eventCreated",
  "source": "some.event.source"
}
```

Where:
* `source`: the source of the event.
* `detail-type`: the event type, named in the past tense, e.g. "ChannelCreated".
* `detail`: further information about the event.
  * `detail.metadata`: event metadata fields for subscribers to use. Best practice is to include a field here to support idempotent processing, e.g. `detail.metadata.id` (this notebook uses `detail.metadata.~id`).
  * `detail.data`: fields containing the data of the event (e.g. a payload).
  
See [Building next-generation applications with event-driven architecture](https://www.youtube.com/watch?v=KXR17uwLEC8) for best practices when designing events.
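To make the schema concrete, here is a minimal sketch of a helper that assembles an event with the `detail`, `detail-type` and `source` fields described above. `make_event` is hypothetical: the cells below build their events by hand and hash the raw bytes with `get_md5`, whereas this sketch falls back to hashing the JSON-serialised `data` when no `~id` is supplied.

```python
import hashlib
import json


def make_event(source, detail_type, data, metadata=None):
    """Build an event dict following the bus schema (hypothetical helper).

    `data` becomes detail.data. If metadata has no "~id", one is derived from an
    MD5 hash of the JSON-serialised data, so identical payloads get identical ids.
    """
    metadata = dict(metadata or {})
    if "~id" not in metadata:
        canonical = json.dumps(data, sort_keys=True).encode("utf-8")
        metadata["~id"] = hashlib.md5(canonical).hexdigest()
    return {
        "detail": {"data": data, "metadata": metadata},
        "detail-type": detail_type,
        "source": source,
    }
```

Deriving the id from the content (rather than, say, a random UUID) is what makes retries idempotent: sending the same payload twice yields the same `~id`.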

In [None]:
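kinesis = boto_session.client("kinesis")


def encode_record(record):
    """Serialise an event to UTF-8 JSON bytes as a Kinesis PutRecords entry."""
    data = json.dumps(record).encode("utf-8")
    # A random partition key spreads records evenly across shards.
    return dict(Data=data, PartitionKey=str(uuid.uuid4()))


def put_batch(events):
    """Write events to the stream in one PutRecords call (at most 500 records per call)."""
    if not events:
        return None
    response = kinesis.put_records(
        StreamName=STREAM_NAME,
        Records=[encode_record(record) for record in events],
    )
    # PutRecords can partially fail; report any records that were rejected.
    if response["FailedRecordCount"] > 0:
        print(f"{response['FailedRecordCount']} record(s) were not written")
    return response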
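`put_records` accepts at most 500 records per request and can fail individual records while the request as a whole succeeds. A minimal sketch of two helpers for larger ingests: one splits events into batches of 500, the other picks out the events to retry from a `put_records` response, relying on `response["Records"]` having one result per request record, in order, with failed entries carrying an `ErrorCode`. Both helper names are hypothetical.

```python
def chunk(events, size=500):
    """Split events into lists of at most `size` items (500 is the PutRecords maximum)."""
    return [events[i:i + size] for i in range(0, len(events), size)]


def failed_events(events, response):
    """Return the events whose PutRecords result carries an ErrorCode.

    Results in response["Records"] are in the same order as the request
    records, so they can be matched back to events by position.
    """
    return [
        event
        for event, result in zip(events, response["Records"])
        if "ErrorCode" in result
    ]
```

A caller could loop over `chunk(events)`, send each batch with `kinesis.put_records`, and re-send `failed_events(batch, response)` after a short back-off.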

### Using the media stack

The media stack has consumers listening for text and images. We set the event `source` field to the MIME content type of the payload, prefixed with `content.` and with `/` replaced by `.` (e.g. `image/jpeg` becomes `content.image.jpeg`). Each model consumes events whose `source` matches:
* Object Detection model: `content.image.jpeg`
* Image Vector model: `content.image.*`
* Named Entity Recognition model: `content.text.*`
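The routing rules above can be illustrated with shell-style wildcard matching. This is a sketch only: the `CONSUMERS` table and `consumers_for` function are hypothetical, and the media stack's real subscription mechanism may use different matching rules (for example, prefix matching rather than globs).

```python
from fnmatch import fnmatchcase

# Hypothetical routing table mirroring the list above.
CONSUMERS = {
    "object-detection": "content.image.jpeg",
    "image-vector": "content.image.*",
    "named-entity-recognition": "content.text.*",
}


def consumers_for(source):
    """Return the consumers whose `source` pattern matches, in table order."""
    return [name for name, pattern in CONSUMERS.items() if fnmatchcase(source, pattern)]
```

A JPEG therefore reaches both image consumers, a PNG reaches only the vector model, and plain text reaches only NER.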


In [None]:
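def get_md5(data):
    """Return the hex MD5 digest of raw bytes, used as a content-derived event id."""
    return hashlib.md5(data).hexdigest()


def get_content_label(data):
    """Map raw bytes to an event source label such as content.image.jpeg."""
    # libmagic can misidentify very short buffers; the python-magic docs
    # recommend passing at least the first 2048 bytes.
    mime = magic.from_buffer(data[:2048], mime=True)
    mime = mime.replace("/", ".")
    return f"content.{mime}"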
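Kinesis consumers can receive the same record more than once (for example after a producer or consumer retry), which is why the events carry a content-derived `~id`. A minimal sketch of how a consumer could use it to skip duplicates; `IdempotentConsumer` is hypothetical and keeps its seen-ids in memory, whereas a real consumer would need durable storage.

```python
import hashlib


class IdempotentConsumer:
    """Hypothetical consumer that skips events whose detail.metadata.~id it has already processed."""

    def __init__(self):
        self.seen_ids = set()
        self.processed = []

    def handle(self, event):
        event_id = event["detail"]["metadata"]["~id"]
        if event_id in self.seen_ids:
            return False  # duplicate delivery; ignore it
        self.seen_ids.add(event_id)
        self.processed.append(event)
        return True


def md5_id(data):
    # Same hashing as get_md5: the id depends only on the content bytes.
    return hashlib.md5(data).hexdigest()
```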


#### Ingesting Text

In [None]:
%%gremlin

g.V().count()

In [None]:
data = b"My name is Wolfgang and I live in Berlin"
event = {
  "detail": {
    "data": {
      "payload": data.decode("utf-8")
    },
    "metadata": {
      "~id": get_md5(data)
    }
  },
  "detail-type": "textExtracted",
  "source": get_content_label(data)
}

event

In [None]:
put_batch([event])

The media stack processes the event asynchronously, so it may take a few moments for results to appear. We can then query the graph to view them:

In [None]:
%%gremlin -p v,outE,inV

g.V().outE().inV().path().by(valueMap(true))

#### Ingesting Images

Next, we'll ingest an image from the SageMaker JumpStart assets bucket.

In [None]:
bucket_for_jumpstart_assets = f"jumpstart-cache-prod-{boto_session.region_name}"

data = sagemaker_session.s3_client.get_object(
    Bucket=bucket_for_jumpstart_assets,
    Key="inference-notebook-assets/Naxos_Taverna.jpg",
)["Body"].read()

Let's have a look at the image:

In [None]:
Image.open(BytesIO(data))

In [None]:
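def resize(data, size):
    """Resize image bytes and return them re-encoded as JPEG.

    `size` is either a float scale factor (e.g. 0.7) or a (width, height) tuple;
    any other value leaves the dimensions unchanged.
    """
    im = Image.open(BytesIO(data))
    if isinstance(size, float):
        w1, h1 = math.floor(im.width * size), math.floor(im.height * size)
        im = im.resize((w1, h1))
    elif isinstance(size, tuple):
        im = im.resize(size)
    fp = BytesIO()
    # JPEG has no alpha channel, so convert to RGB first (a no-op for RGB images).
    im.convert("RGB").save(fp, format="JPEG")
    return fp.getvalue()


def encode_data(data):
    """Base64-encode raw bytes into a str that can be embedded in a JSON event."""
    return base64.b64encode(data).decode("utf-8")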

In [None]:
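# Resize the image so the event stays under the Step Functions payload limit (256 KiB).
# Base64 encoding adds roughly 33%, so it is the encoded payload that must fit.
data_resized = resize(data, 0.7)
# Base64-encode the image so the binary data can be embedded in the JSON event
payload = encode_data(data_resized)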


event = {
  "detail": {
    "data": {
      "payload": payload
    },
    "metadata": {
      "~id": get_md5(data),
      "uri": f"s3://{bucket_for_jumpstart_assets}/inference-notebook-assets/Naxos_Taverna.jpg",
    }
  },
  "detail-type": "imageCreated",
  "source": get_content_label(data)
}

put_batch([event])
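Base64 turns every 3 input bytes into 4 output characters, so an image of `n` bytes becomes `4 * ceil(n / 3)` characters. A minimal sketch for checking an event against the 256 KiB (262,144 byte) Step Functions limit before sending it, and for computing the largest raw image that still fits next to a given amount of other JSON. The helper names are hypothetical, and we assume the limit applies to the UTF-8 JSON of the event; the execution input Step Functions actually receives may wrap it with extra fields, so leave some margin.

```python
import base64
import json
import math

STEP_FUNCTIONS_LIMIT_BYTES = 256 * 1024  # 262,144 bytes


def base64_length(n_bytes):
    """Length of the padded base64 encoding of n_bytes of input."""
    return 4 * math.ceil(n_bytes / 3)


def event_size(event):
    """Size in bytes of the event serialised to UTF-8 JSON."""
    return len(json.dumps(event).encode("utf-8"))


def max_raw_image_bytes(overhead_bytes, limit=STEP_FUNCTIONS_LIMIT_BYTES):
    """Largest raw image size whose base64 payload fits alongside `overhead_bytes` of other JSON."""
    budget = limit - overhead_bytes
    # Each full group of 4 base64 characters encodes 3 raw bytes.
    return max(0, (budget // 4) * 3)
```

For instance, checking `event_size(event) <= STEP_FUNCTIONS_LIMIT_BYTES` before `put_batch([event])` tells you whether the 0.7 scale factor was enough for a given image.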

Now we can query the graph:

In [None]:
%%gremlin -p v,outE,inV

g.V().outE().inV().path().by(valueMap(true))

## Cleanup

Drop all vertices (and, with them, their incident edges) from the graph:

In [None]:
%%gremlin

g.V().drop()

In [None]:
# delete the endpoints and their endpoint configurations
for name in ["text-ner", "image-od", "image-vector"]:
    # delete the endpoint
    sagemaker_client.delete_endpoint(EndpointName=name)
    # delete the endpoint configuration (deploy() gave it the same name as the endpoint)
    sagemaker_client.delete_endpoint_config(EndpointConfigName=name)
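The cell above leaves the registered model packages and their groups in the Model Registry. A minimal sketch for removing them as well: it lists every package version in each group, deletes them, and then deletes the group, on the assumption that a group must be empty before it can be deleted. `delete_model_package_groups` is a hypothetical helper; the SageMaker models created by `deploy()` are not covered and can be removed from the SageMaker console.

```python
def delete_model_package_groups(client, group_names):
    """Delete every model package version in each group, then the group itself.

    `client` is assumed to be a boto3 SageMaker client.
    """
    for group_name in group_names:
        # Collect all ARNs first so deletions don't disturb NextToken pagination.
        arns = []
        kwargs = {"ModelPackageGroupName": group_name}
        while True:
            page = client.list_model_packages(**kwargs)
            arns.extend(s["ModelPackageArn"] for s in page["ModelPackageSummaryList"])
            if "NextToken" not in page:
                break
            kwargs["NextToken"] = page["NextToken"]
        for arn in arns:
            client.delete_model_package(ModelPackageName=arn)
        client.delete_model_package_group(ModelPackageGroupName=group_name)
```

Usage: `delete_model_package_groups(sagemaker_client, ["text-ner", "image-od", "image-vector"])`.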