# Entity Matcher in production - contextualize time series to assets using Cognite Functions

For every new customer, we ingest their data and contextualize it by mapping time series to assets. Often no field on the time series matches the asset name exactly, only approximately. The typical example we use in Cognite is the asset `21PT1019` and the time series `IAA_21PT1019.PV`. Our contextualization toolbox (an [API](https://docs.cognite.com/api/playground/#operation/entityMatchingFit) with a corresponding [SDK](https://cognite-sdk-experimental.readthedocs-hosted.com/en/latest/cognite.html#contextualization)) includes a tool for this problem called entity matching. Entity matching means joining two datasets on a common key with some fuzziness, and our implementation is a machine learning model that can be trained in a supervised or unsupervised fashion.
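
To make the idea of a fuzzy join concrete, here is a minimal sketch that uses only the Python standard library. It is not the entity matcher's model: it swaps in `difflib.SequenceMatcher` as a plain string-similarity score, and the helper `best_asset_match` and the extra asset names are hypothetical, chosen only for illustration.

```python
from difflib import SequenceMatcher


def best_asset_match(ts_name, asset_names):
    """Return (asset_name, score) for the asset name most similar to ts_name.

    The score is difflib's similarity ratio in [0, 1]; it only stands in
    for the score that a trained entity matching model would produce.
    """
    scored = [
        (asset, SequenceMatcher(None, ts_name.lower(), asset.lower()).ratio())
        for asset in asset_names
    ]
    return max(scored, key=lambda pair: pair[1])


# The first asset name comes from the text above; the other two are made up.
assets = ["21PT1019", "21PT1020", "23TT0042"]
match, score = best_asset_match("IAA_21PT1019.PV", assets)
print(f"Best match: {match} (score {score:.2f})")
```

A plain similarity ratio like this treats every character equally, which is one reason to prefer a model that can be trained on the naming conventions in the data.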

In this tutorial, we'll take the time series and assets from the `publicdata` tenant, and deploy a Cognite Function that performs entity matching to map the time series to assets, and schedule it so it runs periodically. This can be used out of the box for many customers as an initial contextualization step. 

In [3]:
from getpass import getpass
from cognite.experimental import CogniteClient

In [10]:
api_key = getpass()
client = CogniteClient(
    api_key=api_key,
    project="functions-tutorial",
    client_name="DSHub",
    base_url="https://greenfield.cognitedata.com"
)



In [46]:
def handle(client, data):
    # When deploying a function from a notebook like this, all imports must be performed inside the `handle` function.
    from cognite.experimental import CogniteClient
    from cognite.client.data_classes import TimeSeriesUpdate
    import time
    
    # The entity matcher suggests matches with a certain score. To achieve a reasonable result, this score must be adjusted. 
    # The default value of 0.75 has been chosen by inspecting the outcome of this function, and may be different on data from other customers.
    good_match_threshold = data.get("good_match_threshold", 0.75)
    
    # Create an experimental SDK client, since the contextualization APIs are in playground and thus not available in the regular SDK.
    client = CogniteClient(api_key=client.config.api_key, base_url=client.config.base_url, project=client.config.project)

    # Download all assets and time series, using 5 requests in parallel
    assets = client.assets.list(limit=-1, partitions=5)
    time_series = client.time_series.list(limit=-1, partitions=5)
    
    # Create simplified objects with only name and id
    assets_simplified = [{"id": asset.id, "name": asset.name} for asset in assets]
    time_series_simplified = [{"id": ts.id, "name": ts.name} for ts in time_series]

    # Train the ML Entity Matcher on the data. The SDK expects as input the array of objects you match FROM (time series) and a list of what you match TO (assets)
    t0 = time.time()
    model = client.entity_matching.fit_ml(match_from = time_series_simplified, match_to = assets_simplified)
    print(f"Training entity matcher model with id {model.model_id} ...")
    model.wait_for_completion()
    t1 = time.time()
    print(f"Model {model.model_id} trained on {len(assets_simplified)} assets and {len(time_series_simplified)} time series using {t1-t0} seconds")

    # Use the ML Entity Matcher model to match the data. This model can be reused, so training is not necessary each time, but we do it for simplicity in this example.
    t0 = time.time()
    job = model.predict_ml(time_series_simplified)
    result = job.result # This will wait for completion
    t1 = time.time()
    print(f"Predict finished after {t1-t0} seconds on {len(time_series_simplified)} time series.")
    
    # Filter out the best matches with the threshold specified in the input
    good_match_count = 0
    time_series_updates = []
    for item in result["items"]:
        match_from = item["matchFrom"] # Time series
        matches = item["matches"] # Suggested asset matches for the time series
        good_matches = [match for match in matches if match["score"] >= good_match_threshold]
        if len(good_matches) > 0:
            good_match_count += 1
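            # Pick the highest-scoring candidate rather than relying on the order of the response
            best_match = max(good_matches, key=lambda match: match["score"])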
            time_series_updates.append(TimeSeriesUpdate(id=match_from["id"]).asset_id.set(best_match["matchTo"]["id"]))
    
    # Write the new asset_id values to the time series. Comment this line out for a dry run that only counts matches.
    client.time_series.update(time_series_updates)
    print(f"Matched {good_match_count} time series to assets")
    return {
        "matches": good_match_count
    }
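
The threshold filtering step inside `handle` can be tried in isolation, without calling the API. The sketch below assumes the response shape used in the function above (`items`, `matchFrom`, `matches`, `matchTo`, `score`). The helper name `select_best_matches` and the sample response are hypothetical and hand-written for illustration, not real model output.

```python
def select_best_matches(result, threshold=0.75):
    """Map each time series id to the id of its best asset match.

    Only candidates with a score at or above `threshold` are considered,
    and the highest-scoring one wins.
    """
    selected = {}
    for item in result["items"]:
        candidates = [m for m in item["matches"] if m["score"] >= threshold]
        if candidates:
            best = max(candidates, key=lambda m: m["score"])
            selected[item["matchFrom"]["id"]] = best["matchTo"]["id"]
    return selected


# Hypothetical response with made-up ids and scores.
sample_result = {
    "items": [
        {
            "matchFrom": {"id": 1, "name": "IAA_21PT1019.PV"},
            "matches": [
                {"matchTo": {"id": 10, "name": "21PT1019"}, "score": 0.93},
                {"matchTo": {"id": 11, "name": "21PT1020"}, "score": 0.78},
            ],
        },
        {
            "matchFrom": {"id": 2, "name": "UNRELATED_TAG"},
            "matches": [
                {"matchTo": {"id": 12, "name": "23TT0042"}, "score": 0.31},
            ],
        },
    ]
}

mapping = select_best_matches(sample_result)
print(mapping)
```

Lowering the threshold lets more time series through at the cost of more false matches, so it is worth inspecting a sample of the results before writing anything back.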


Test the function by running it locally. We send in the `CogniteClient` and an empty dictionary.

In [36]:
result = handle(client, {})

Training entity matcher model with id 3437015128748459 ...
Model 3437015128748459 trained on 1106 assets and 363 events using 2.2561750411987305 seconds
Predict finished after 2.529940366744995 seconds on 363 time series.
Matched 270 time series to assets


Now that we have verified that it works, we can create a Cognite Function that performs the matching on demand or on a schedule.

In [38]:
name = "Anders"
external_id = f"{name} entity matcher"
function = client.functions.create(name = external_id, description = "Entity matcher example", external_id=external_id, api_key = api_key, function_handle = handle)

| | value |
|---|---|
| id | 1466241980543512 |
| name | Anders entity matcher |
| external_id | Anders entity matcher |
| description | Entity matcher example |
| owner | |
| status | Queued |
| file_id | 114596068597375 |
| created_time | 1591630621919 |
| api_key | *** |
| secrets | {} |


In [39]:
# Re-run this cell until the status shows Ready
function = client.functions.retrieve(external_id = external_id)
function

| | value |
|---|---|
| id | 1466241980543512 |
| name | Anders entity matcher |
| external_id | Anders entity matcher |
| description | Entity matcher example |
| owner | |
| status | Ready |
| file_id | 114596068597375 |
| created_time | 1591630621919 |
| api_key | *** |
| secrets | {} |
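
Instead of re-running the cell by hand, the status check can be wrapped in a small polling loop. This is a minimal sketch under stated assumptions: `wait_until_ready` is a hypothetical helper, not part of the SDK, and the `"Failed"` status is assumed (only `Queued` and `Ready` appear in the outputs above). It takes any callable that returns an object with a `status` attribute, so it is demonstrated here with a fake object rather than a real deployment.

```python
import time
from types import SimpleNamespace


def wait_until_ready(retrieve, poll_seconds=5.0, timeout_seconds=600.0, sleep=time.sleep):
    """Call retrieve() repeatedly until the returned object's status is "Ready"."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        function = retrieve()
        if function.status == "Ready":
            return function
        if function.status == "Failed":  # assumed failure status, not shown in this tutorial
            raise RuntimeError("Function deployment failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Function not ready after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Demonstration with a fake object that becomes Ready on the third check.
statuses = iter(["Queued", "Queued", "Ready"])
recorded_sleeps = []
ready = wait_until_ready(
    lambda: SimpleNamespace(status=next(statuses)),
    sleep=recorded_sleeps.append,  # record the waits instead of actually sleeping
)
print(ready.status, recorded_sleeps)

# With the real client from this notebook, the call would look like:
# function = wait_until_ready(lambda: client.functions.retrieve(external_id=external_id))
```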


In [43]:
function_call = function.call() # This will wait for the function call to complete.

In [44]:
function_call.get_logs()

| | timestamp | message |
|---|---|---|
| 0 | 1591631396957 | Training entity matcher model with id 18085672... |
| 1 | 1591631397998 | Model 1808567244913639 trained on 1106 assets ... |
| 2 | 1591631400477 | Predict finished after 2.479564666748047 secon... |
| 3 | 1591631400930 | Matched 270 time series to assets |


In [45]:
function_call.get_response()

| | value |
|---|---|
| call_id | 3647736147294311 |
| function_id | 1466241980543512 |
| response | {'matches': 270} |


In [47]:
client.functions.schedules.create(name = f"{name} entity matcher", function_external_id=external_id, cron_expression = "* * * * *")

| | value |
|---|---|
| id | 8454631847181861 |
| name | Anders entity matcher |
| function_external_id | Anders entity matcher |
| description | |
| cron_expression | * * * * * |
| created_time | 1591631571174 |
