# Entity Matcher in production - contextualize time series to assets using Cognite Functions

## Table Of Contents

1. [Introduction](#introduction)
2. [Authenticating with CDF](#authentication)
3. [Getting a copy of test-data](#test-data)
4. [Defining, testing, and deploying our Cognite Function](#deploy-cog-fun)
5. [Calling our function](#calling-func)
6. [Scheduling our function](#schedule-cog-fun)

## Introduction <a name="introduction"></a>

After ingesting assets and time series into CDF, a common use case is to contextualize the data by mapping time series to assets. Quite often, no field on the time series is identical to, for example, the asset name, but one of them is similar. The typical example we use in Cognite is the asset `21PT1019` and the time series `IAA_21PT1019.PV`.

In our contextualization toolbox (an [API](https://docs.cognite.com/api/playground/#operation/entityMatchingFit) with a corresponding [SDK](https://cognite-sdk-experimental.readthedocs-hosted.com/en/latest/cognite.html#contextualization)), we have a tool called entity matching that solves this problem. Entity matching means joining two datasets on a common key with some fuzziness. Our implementation is a machine learning model that can be trained in a supervised or unsupervised fashion.
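To make the idea of a fuzzy join concrete, here is a minimal sketch that uses only the Python standard library. It is not Cognite's entity matching model: the helper names `tokenize`, `similarity`, and `best_match` are hypothetical, and the token-based scoring is an assumption chosen purely for illustration.

```python
import re
from difflib import SequenceMatcher


def tokenize(name):
    """Split a name into upper-cased alphanumeric tokens."""
    return [token.upper() for token in re.findall(r"[A-Za-z0-9]+", name)]


def similarity(ts_name, asset_name):
    """Best token-to-token similarity between a time series name and an asset name."""
    ts_tokens = tokenize(ts_name)
    asset_tokens = tokenize(asset_name)
    if not ts_tokens or not asset_tokens:
        return 0.0
    return max(
        SequenceMatcher(None, asset_token, ts_token).ratio()
        for asset_token in asset_tokens
        for ts_token in ts_tokens
    )


def best_match(ts_name, asset_names):
    """Return the asset name that scores highest against the time series name."""
    return max(asset_names, key=lambda asset_name: similarity(ts_name, asset_name))


# Hypothetical candidate assets; only `21PT1019` comes from the example above.
candidates = ["21PT1019", "21PT1020", "45FT0011"]
print(best_match("IAA_21PT1019.PV", candidates))
```

The time series name is split on non-alphanumeric characters, so the token `21PT1019` is compared directly with each asset name. A trained model can learn which tokens matter from examples instead of relying on a fixed rule like this one.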

In this tutorial, we'll take the time series and assets from the `publicdata` tenant, and deploy a Cognite Function that performs entity matching to map the time series to assets, and schedule it to run periodically. This can be used out of the box for many customers as an initial contextualization step.

We will provide you with the code needed to get a copy of the test data in your own tenant.

### Requirements

We will be using the [experimental Cognite SDK](https://cognite-sdk-experimental.readthedocs-hosted.com/en/latest/cognite.html#functions). 
To run this example, you first need to install this package:

```
pip install cognite-sdk-experimental
```

## Authenticating with CDF <a name="authentication"></a>

To connect to CDF, we authenticate using Azure AD. There are several ways to do this, all outlined in the [Authenticate with Azure AD](https://docs.cognite.com/dev/guides/sdk/python/python_auth_oidc/) documentation. In this example we use [**client credentials**](https://docs.cognite.com/dev/guides/sdk/python/python_auth_oidc/#authenticate-with-client-secret). Each authentication method has pros and cons; we choose client credentials here because it takes little additional work to get up and running.

This means that we need the following:

1. A `token_client_id` and a `token_client_secret`;
2. a `token_url` (can be inferred from the Azure tenant-ID); and finally,
3. a list of `token_scopes` (can usually be inferred from the CDF cluster). 

In [None]:
from getpass import getpass
from cognite.experimental import CogniteClient

project      = "<CDF project>" # Fill in your project here
cdf_cluster  = "<CDF cluster>" # Fill in the cluster your project is running in (for instance 'api'/'westeurope-1')

tenant_id           = "<Tenant ID>"       # Fill in your Azure AD tenant ID here
token_client_id     = "<Token Client ID>" # Fill in your Client ID here
token_client_secret = getpass("Paste your `token_client_secret` in here: ")

token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
token_scopes = [f"https://{cdf_cluster}.cognitedata.com/.default"]

client = CogniteClient(
    project = project,
    server  = cdf_cluster,
    client_name = "My entity matcher",
    token_url = token_url,
    token_scopes = token_scopes,
    token_client_id = token_client_id,
    token_client_secret = token_client_secret,
)

## Getting a copy of test-data <a name="test-data"></a>

In this contextualization example we need some test data to run the entity matching on. We have therefore provided a `publicdata.json`-file containing an asset hierarchy and a set of time series that will function as our test dataset. In this section, we ingest this data into your tenant to get ready for the next few sections. Feel free to inspect the `publicdata.json`-file to get a feel for what the assets and time series look like.
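If you want to inspect the hierarchy before ingesting it, a small check like the following can help. This is a sketch under assumptions: it expects each asset to be a dictionary with `id`, `name`, and an optional `parent_id` (the fields read by the ingestion code in this section), and both the helper `summarize_hierarchy` and the sample records are hypothetical.

```python
def summarize_hierarchy(assets):
    """Count assets and list root assets and assets whose parent is missing."""
    known_ids = {asset["id"] for asset in assets}
    roots = [asset["name"] for asset in assets if asset.get("parent_id") is None]
    orphans = [
        asset["name"]
        for asset in assets
        if asset.get("parent_id") is not None and asset["parent_id"] not in known_ids
    ]
    return {"total": len(assets), "roots": roots, "orphans": orphans}


# Hypothetical records that mimic the assumed shape of the entries in the file.
sample_assets = [
    {"id": "1", "name": "PLANT", "parent_id": None},
    {"id": "2", "name": "21PT1019", "parent_id": "1"},
    {"id": "3", "name": "21PT1020", "parent_id": "99"},  # parent is not in the list
]

summary = summarize_hierarchy(sample_assets)
print(summary)
```

An asset that refers to a parent missing from the file would be worth investigating before ingesting the hierarchy.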

In [None]:
import json
from cognite.client.data_classes import Asset, TimeSeries

# We load the json-file into the `assets` and `time_series` lists.
with open("publicdata.json", "r") as f:
    data = json.load(f)
    assets = data["assets"]
    time_series = data["time_series"]
    
    print(f"Found {len(assets)} assets and {len(time_series)} time series")
    
# We then create Asset and TimeSeries objects, which is what the SDK expects.
# We use the IDs from the existing source as external_id and parent_external_id to preserve the asset hierarchy.
assets = [
    Asset(name = asset["name"], description = asset.get("description"), external_id = asset["id"], parent_external_id = asset.get("parent_id"), source="publicdata") 
    for asset in assets
]
time_series = [
    TimeSeries(name = ts["name"], description = ts.get("description"), external_id = ts.get("external_id"), metadata={"source": "publicdata"}) 
    for ts in time_series
]

# Finally, we use the `create_hierarchy` method from the SDK to ingest the whole asset hierarchy at once.
# Similarly, we ingest the time series.
try:
    client.assets.create_hierarchy(assets)
    print(f"Created {len(assets)} assets")
except Exception as exc:  # avoid a bare `except`, which would also swallow KeyboardInterrupt
    print(f"Failed to create assets ({exc}). Probably already ingested these in a previous run")

try:
    client.time_series.create(time_series)
    print(f"Created {len(time_series)} time series")
except Exception as exc:
    print(f"Failed to create time series ({exc}). Probably already ingested these in a previous run")


## Defining, testing, and deploying our Cognite Function <a name="deploy-cog-fun"></a>

We are now ready to create the function that will run entity-matching on our newly ingested data. We define this in the file `handler.py`. Our function relies on the `cognite-sdk-experimental` package, so we need to specify this in a `requirements.txt`-file. We will deploy the function as showcased in `examples/02-creating-a-function-from-a-folder`, by deploying the entire folder.

In brief, our function will perform the following:

1. Since we need an experimental client to get the functionality we want, we must instantiate a new client on the inside of our function (the one we get through the argument `client` in `handle()` is a non-experimental `CogniteClient`). To do this, we use the `token` that is passed through the `secrets` argument.

2. We fetch all the assets and time series we ingested earlier.

3. Our goal is to map time-series to assets. We therefore create an entity matching model with `time_series` as sources, and `assets` as targets. 

4. We train our entity matching model (normally, this is a one-time operation, and the model can be reused. However, for simplicity's sake, we train every time we call the function).

5. We use the trained model to predict the time-series to asset mapping. We filter out items based on a threshold specified in the function `data` argument.

6. Finally, we update the time series according to the predictions, to store the time-series to asset relationship.
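
Steps 5 and 6 can be sketched in plain Python as follows. This is not the content of `handler.py`: the shape of the prediction result (a list of items with a `source` and a list of scored `matches`) is an assumption, and the helpers `select_good_matches` and `apply_mapping` as well as the `update_time_series` callback are hypothetical names used for illustration.

```python
def select_good_matches(predictions, good_match_threshold):
    """Map each source ID to its best-scoring target ID, if the score passes the threshold."""
    mapping = {}
    for prediction in predictions:
        matches = prediction.get("matches", [])
        if not matches:
            continue  # nothing was suggested for this source
        best = max(matches, key=lambda match: match["score"])
        if best["score"] >= good_match_threshold:
            mapping[prediction["source"]["id"]] = best["target"]["id"]
    return mapping


def apply_mapping(mapping, update_time_series, dry_run=True):
    """Apply the mapping through the given callback; do nothing when dry_run is set."""
    if dry_run:
        return 0
    for ts_id, asset_id in mapping.items():
        update_time_series(ts_id, asset_id)
    return len(mapping)


# Hypothetical predictions in the assumed shape.
predictions = [
    {
        "source": {"id": 101, "name": "IAA_21PT1019.PV"},
        "matches": [
            {"score": 0.92, "target": {"id": 1, "name": "21PT1019"}},
            {"score": 0.40, "target": {"id": 2, "name": "21PT1020"}},
        ],
    },
    {
        "source": {"id": 102, "name": "IAA_UNKNOWN.PV"},
        "matches": [{"score": 0.31, "target": {"id": 2, "name": "21PT1020"}}],
    },
    {"source": {"id": 103, "name": "NO_CANDIDATES"}, "matches": []},
]

mapping = select_good_matches(predictions, good_match_threshold=0.75)
print(mapping)
```

Keeping the filtering separate from the update makes it easy to inspect the proposed mapping in a dry run before anything is written back.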


It is good practice to test that the function works locally before deploying it to CDF. We do that here by passing in our client, a data dictionary with a threshold, and a token in the secrets field (this token is populated automatically when the function is deployed).
Our function accepts a `dry_run` argument, which we set to `True` when testing locally so that we do not inadvertently update the time series.

In [None]:
from handler import handle
handle(client, {"good_match_threshold" : 0.75, "dry_run": True}, {"token" : client.config.token})

### Deploying our function

Having verified that the function does what we expect it to do, we are ready to deploy it. We do that by calling `client.functions.create` and passing in the folder that contains `handler.py` and `requirements.txt`.

In [None]:
function = client.functions.create(
    name = "CogFun: Entity Matcher",
    external_id = "entity_matcher_function",
    folder = ".",
    description = "Performs entity matching, mapping time_series to assets"
)

In [None]:
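import time

# Poll the deployment status until the function is ready or has failed.
while function.status != "Ready":
    function.update()

    if function.status == "Failed":
        print("Failed to deploy function")
        break
    time.sleep(5)  # wait between polls instead of calling the API in a tight loop
else:
    print("Function is successfully deployed")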


## Calling our function <a name="calling-func"></a>

We can now call the function and get its response and logs. Note that we do not need to supply a client or a token when calling it in CDF; these are supplied automatically.

In [None]:
data = {"good_match_threshold": 0.75, "dry_run": False}
call = function.call(data)

In [None]:
print(
f"""
Function returned with:

Logs: {call.get_logs()}
Response: {call.get_response()}
"""
)

## Scheduling our function <a name="schedule-cog-fun"></a>

Suppose we want to run this entity matching job on a regular schedule to automatically match newly ingested time series to assets. We can do this by creating a Cognite Function Schedule.
To do so, we need to pass in a set of client credentials (`client_id` and `client_secret`). In this example, for simplicity, we reuse the client credentials we used to instantiate our client (`token_client_id` and `token_client_secret` from above). For production use cases, however, you should have a dedicated set of client credentials for your schedule.

We create the function schedule by using the `client.functions.schedules.create`-method in the SDK. We use the crontab format to specify when the schedule should trigger a function call. If you have no prior experience with the crontab format, see https://crontab.guru/ for an introduction.

In this example, we want our schedule to run every minute. This corresponds to the cron expression `"* * * * *"`.
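
As a quick reminder of how a standard five-field cron expression is laid out, the following sketch labels each field. The helper `describe_cron` is hypothetical and only splits the expression; it does not validate the values and is not part of the SDK.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"Expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


every_minute = describe_cron("* * * * *")
print(every_minute)
```

A `*` in every field means "every minute of every hour of every day". An expression such as `"0 2 * * *"` would instead trigger once a day at 02:00, which is usually a more realistic cadence than every minute for a job that retrains a model.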

In [None]:
client_credentials = {'client_id': token_client_id, 'client_secret' : token_client_secret}
schedule = client.functions.schedules.create(
    name = "run entity matching every minute",
    cron_expression = "* * * * *",             # this cron expression triggers every minute
    function_id = function.id,                 # we specify the ID of the function we want to schedule
    client_credentials = client_credentials,   # this is a dictionary with 'client_secret' and 'client_id'
    data = {                                   # this is the data we wish to call the function with
        "good_match_threshold":0.75,
        "dry_run": False
    },
    description = "Perform entity matching mapping time_series to assets every minute"
)

In the Fusion UI you can now see that a schedule has been created for the function in question. After a few minutes you should also see that the function is being called automatically.

## Cleanup

In order to clean up after this example notebook, we can delete the function and the associated schedule that we created.

In [None]:
client.functions.delete(id=function.id) # This will also delete the function schedule tied to this function.