# Step 3: build OCS data views, map metadata to streams 

#### This notebook synchronizes OCS with the graph: tags and metadata are added to the target OCS streams, and the associated Data Views are created.

#### All Data Views share the same structure. The information needed to create one is:

* Database name (asset_db)
* List of asset IDs
* OCS tag 

Data View sample below with: 

* `asset_db:brewery`
* `asset_id:FV31`
* `hub__all_columns` as tag 

In [None]:
import asyncio
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
import httpx
import json
import time
import urllib3
from ocs_academic_hub import HubClient
import yaml

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
# Name of the YAML configuration file loaded by the next cell.
# The commented-out line keeps alternative configuration files at hand.
# config_file = "config-windfarm.yaml"  # "config-acad-prod-desc-v2.yaml"
config_file = "config-prod-ucd-v2.yaml"

In [None]:
# Parse the YAML configuration; later cells read its "ocs" and "graphql" sections.
with open(config_file) as f:
    config = yaml.safe_load(f)
# config

In [None]:
# IPython magic: export OCS_HUB_CONFIG before the hub client is created
# (presumably HubClient reads it to locate its .ini file -- TODO confirm).
%env OCS_HUB_CONFIG=config-dv.ini
hub = HubClient()

In [None]:
# Target OCS namespace and asset database, both taken from the YAML configuration.
namespace_id = config["ocs"]["configuration"]["namespace"]  # "academic_hub_01"
asset_db = config["ocs"]["configuration"]["asset_db"]  # "deschutes"

# Prefix prepended to every tag this notebook writes to a stream.
tag_prefix = "hub__"
# Timeout value handed to every httpx call made by request().
timeout = 45.0

# Fetch the namespace's streams (count=20000 requested); the name -> Id lookup
# built in the next cell is derived from this list.
streams = hub.Streams.getStreams(namespace_id, query="name:*", count=20000)
# Cell output: number of streams, then Id and Name of the first one.
len(streams), streams[0].Id, streams[0].Name

In [None]:
# Map stream Name -> stream Id; update_tags() and update_metadata() use it to
# resolve the stream names coming from the graph.
name2id = {s.Name: s.Id for s in streams}
# Cell output: number of distinct stream names.
len(name2id)

In [None]:
async def request(method, url, params=None, data=None, headers=None, **kwargs):
    """Send one HTTP request and return the httpx response.

    When ``headers`` is missing or empty, the headers produced by the hub
    client's ``sdsHeaders()`` are used instead. Each call opens its own
    ``httpx.AsyncClient`` and applies the notebook-level ``timeout``; any
    extra keyword arguments are forwarded to ``AsyncClient.request``.
    """
    effective_headers = headers or hub._OCSClient__baseClient.sdsHeaders()

    async with httpx.AsyncClient() as session:
        return await session.request(
            method,
            url,
            params=params,
            data=data,
            headers=effective_headers,
            timeout=timeout,
            **kwargs
        )


async def update_tags(namespace_id, stream_name, new_tags, hub_clean=False):
    """Add hub tags to an OCS stream, or strip them when ``hub_clean`` is set.

    Args:
        namespace_id: OCS namespace holding the stream; must not be None.
        stream_name: Stream name, resolved to a stream Id through ``name2id``.
        new_tags: Tag names without prefix; ``tag_prefix`` is prepended to
            each one before it is merged with the stream's current tags.
        hub_clean: When true, ``new_tags`` is ignored and every current tag
            starting with ``tag_prefix`` is removed instead.

    Returns:
        The tag list written to the stream, or None when the stream name is
        not in ``name2id`` or when either HTTP call answers with an error
        status (>= 400).

    Raises:
        TypeError: if ``namespace_id`` is None.
    """
    if namespace_id is None:
        raise TypeError("namespace_id must not be None")

    try:
        stream_id = name2id[stream_name]
    except KeyError:
        return None

    # Same endpoint is used to read and to write the tags.
    tags_url = (
        hub._OCSClient__Streams._Streams__streamsPath.format(
            tenant_id=hub.tenant, namespace_id=namespace_id, stream_id=stream_id
        )
        + "/Tags"
    )

    response = await request("get", tags_url)
    if response.status_code >= 400:
        # An error body is not a tag list; never feed it back into the PUT.
        return None

    current_tags = json.loads(response.text)
    if hub_clean:
        # Only tags written by this notebook carry the prefix at the start.
        tags = [tag for tag in current_tags if not tag.startswith(tag_prefix)]
    else:
        # Union removes duplicates; sorting keeps the written order stable.
        tags = sorted(set(current_tags) | {tag_prefix + tag for tag in new_tags})

    response = await request("put", tags_url, data=json.dumps(tags))
    if response.status_code >= 400:
        # Report the failed write instead of pretending the tags were saved.
        return None
    return tags

In [None]:
async def update_metadata(namespace_id, stream_name, new_meta):
    """Merge ``new_meta`` into the metadata of an OCS stream.

    The stream's current metadata is read, updated with the entries of
    ``new_meta`` (which win on key collisions) and written back.

    Args:
        namespace_id: OCS namespace holding the stream; must not be None.
        stream_name: Stream name, resolved to a stream Id through ``name2id``.
        new_meta: Mapping of metadata keys to values to add or overwrite.

    Returns:
        The merged metadata dict that was written, or None when the stream
        name is not in ``name2id`` or when either HTTP call answers with an
        error status (>= 400).

    Raises:
        TypeError: if ``namespace_id`` is None.
    """
    if namespace_id is None:
        raise TypeError("namespace_id must not be None")

    try:
        stream_id = name2id[stream_name]
    except KeyError:
        return None

    # Same endpoint is used to read and to write the metadata.
    metadata_url = (
        hub._OCSClient__Streams._Streams__streamsPath.format(
            tenant_id=hub.tenant, namespace_id=namespace_id, stream_id=stream_id
        )
        + "/Metadata"
    )

    response = await request("get", metadata_url)
    if response.status_code >= 400:
        # Without this check a JSON error body would be merged below and
        # then stored on the stream as if it were metadata.
        return None

    metadata = json.loads(response.text)
    metadata.update(new_meta)

    response = await request("put", metadata_url, data=json.dumps(metadata))
    if response.status_code >= 400:
        # Report the failed write instead of pretending the metadata was saved.
        return None
    return metadata

In [None]:
async def update_stream(stream_info):
    """Write metadata and tags for one stream described by a graph record.

    Args:
        stream_info: Dict with the keys "stream_name", "asset_db",
            "asset_id", "name" and "dataviews"; each entry of "dataviews"
            must provide an "ocs_tag".

    Returns:
        True when both the metadata update and the tag update returned a
        result, False otherwise.
    """
    stream_name = stream_info["stream_name"]
    metadata = await update_metadata(
        namespace_id,
        stream_name,
        {
            "asset_db": stream_info["asset_db"],
            "asset_id": stream_info["asset_id"],
            "column_name": stream_info["name"],
        },
    )
    new_tags = [view["ocs_tag"] for view in stream_info["dataviews"]]
    tags = await update_tags(namespace_id, stream_name, new_tags)
    # Both helpers signal failure with None. An empty tag list is a valid
    # outcome (stream without data views or prior tags), so compare against
    # None rather than relying on truthiness.
    return metadata is not None and tags is not None

In [None]:
async def update_streams(streams_info):
    """Update the given streams one after another.

    Prints a "+" progress mark before each stream and an
    "@error(<stream_name>)" line for every stream whose update_stream()
    result is falsy.
    """
    for info in streams_info:
        print("+", end="")
        succeeded = await update_stream(info)
        if succeeded:
            continue
        print(f"@error({info['stream_name']})")

## Input Parameters

In [None]:
# GraphQL client bound to the endpoint given in the configuration.
# The transport is created with verify=False and retries=3; the schema is
# fetched from the endpoint (fetch_schema_from_transport=True).
sample_transport = RequestsHTTPTransport(
    url=config["graphql"]["endpoint"], verify=False, retries=3
)
client = Client(transport=sample_transport, fetch_schema_from_transport=True)

In [None]:
# Query every PIPoint of the configured asset database together with the
# Data Views (name, ocs_tag, asset_id) each one is attached to.
streams_query = gql(
    """
query PIPoint_tags($asset_db: String) {
  PIPoint(asset_db: $asset_db) {
    asset_db
    asset_id
    name
    stream_name
    dataviews {
      name
      ocs_tag
      asset_id
    }
  }
}
"""
)
# NOTE: this rebinds `streams` -- until here the list returned by
# hub.Streams.getStreams() -- to the GraphQL result dict. name2id was built
# from the earlier value, so re-running that cell after this one would fail.
streams = client.execute(streams_query, variable_values={"asset_db": config["ocs"]["configuration"]["asset_db"]})
# Show the first PIPoint record as a sanity check.
print(streams["PIPoint"][0])
# print(json.dumps(dataviews, indent=4))

In [None]:
# Trial run on the first PIPoint before the bulk update below
# (top-level await: requires the notebook's async-aware cell execution).
r = await update_stream(streams["PIPoint"][0])

In [None]:
def chunks(lst, n):
    """Lazily produce consecutive slices of ``lst`` holding ``n`` items each.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    yield from (lst[start : start + n] for start in range(0, len(lst), n))


async def gather_func(f, items, ndiv):
    """Run coroutine function ``f`` concurrently over slices of ``items``.

    ``items`` is cut into chunks of ``len(items) // ndiv + 1`` elements and
    ``f`` is called once per chunk; the resulting coroutines are awaited
    together. Prints "-OK-" (or "@oops" when nothing was scheduled) followed
    by the elapsed wall-clock time.

    Args:
        f: Coroutine function taking one chunk (a slice of ``items``).
        items: Sequence supporting ``len()`` and slicing.
        ndiv: Upper bound on the number of concurrent chunks; must be a
            positive integer.
    """
    div = (len(items) // ndiv) + 1
    # One coroutine per chunk actually produced. There can be fewer than
    # ndiv chunks (4 items with ndiv=4 gives 2 chunks of 2; an empty list
    # gives none), so indexing the chunk list with range(ndiv) raised
    # IndexError for such inputs.
    coroutines = [f(chunk) for chunk in chunks(items, div)]
    start_time = time.perf_counter()
    results = await asyncio.gather(*coroutines)
    # gather() returns one result per coroutine, so the list is only empty
    # (falsy) when there was nothing to run.
    if results:
        print("-OK-")
    else:
        print("@oops")
    print(f"> runtime {time.perf_counter() - start_time:.2f} secs")

In [None]:
# Bulk update of all PIPoint streams, run through gather_func with ndiv=4.
# Skipped only when the configuration sets ocs.configuration.update_streams
# to a falsy value (a missing key counts as True).
if config["ocs"]["configuration"].get("update_streams", True):
    await gather_func(update_streams, streams["PIPoint"], 4)

In [None]:
# Fetch the Data Views of the configured asset database whose ocs_sync flag
# is false, ordered by id, together with the streams attached to each.
dataview_query = gql(
    """
query DataViews($asset_db: String) {
  DataView(ocs_sync: false, asset_db: $asset_db, orderBy: id_asc) {
    name
    id
    asset_db
    asset_id
    ocs_tag
    description
    has_stream {
      name
      stream_name
    }
  }
}
"""
)
dataviews = client.execute(
    dataview_query,
    variable_values={"asset_db": config["ocs"]["configuration"]["asset_db"]},
)
# print(dataviews["DataView"][0])
# Dump the full result for inspection.
print(json.dumps(dataviews, indent=4))

In [None]:
def dv_header(asset_id, dv_name, dv_id, description):
    """Return the identifying part of a Data View definition.

    ``dv_id`` fills both "Id" and "Name"; ``asset_id`` and ``dv_name`` are
    accepted but not used. The index field is a fixed "Timestamp" entry.
    """
    index_field = {"Source": "NotApplicable", "Keys": [], "Label": "Timestamp"}
    header = dict(Id=dv_id, Name=dv_id, Description=description)
    header["IndexField"] = index_field
    return header


def dv_query(asset_db, asset_id, tag, asset_type, value_addition=""):
    """Return one stream query entry of a Data View definition.

    The query value selects streams by ``asset_db``, by any of the ids in
    the ``asset_id`` sequence and by ``tag``; ``value_addition`` is appended
    verbatim. A single id is used bare, any other count is OR-ed together
    inside parentheses. The entry Id is "Asset_<asset_type>".
    """
    id_terms = [f'asset_id:"{asset}"' for asset in asset_id]
    if len(asset_id) == 1:
        asset_clause = id_terms[0]
    else:
        asset_clause = "(" + " OR ".join(id_terms) + ")"

    query_value = f'asset_db:"{asset_db}" AND {asset_clause} AND {tag}{value_addition}'
    return {"Id": f"Asset_{asset_type}", "Kind": "Stream", "Value": query_value}


def dv_datafield(asset_type, key, label_suffix=""):
    """Return one data field set of a Data View definition.

    The set is tied to the query "Asset_<asset_type>" and exposes the stream
    property ``key``. Its column label is the literal "{IdentifyingValue}"
    placeholder followed by ``label_suffix``; the identifying field is the
    "column_name" metadata entry.
    """
    data_field = {
        "Source": "PropertyId",
        "Keys": [key],
        "Label": "{{IdentifyingValue}}{}".format(label_suffix),
    }
    identifying_field = {
        "Source": "Metadata",
        "Keys": ["column_name"],
        "Label": "{IdentifyingValue} {FirstKey}",
    }
    return {
        "QueryId": "Asset_{}".format(asset_type),
        "DataFields": [data_field],
        "IdentifyingField": identifying_field,
    }


def dv_footer():
    """Return the trailing, asset-independent part of a Data View definition.

    Holds the grouping field plus the default index range, interval, index
    type and shape. A new dict is built on every call.
    """
    # NOTE(review): the grouping key is spelled "Asset_Id" while
    # update_stream() writes the metadata key "asset_id" -- confirm that the
    # key lookup is case-insensitive on the OCS side.
    grouping_field = {
        "Source": "Metadata",
        "Keys": ["Asset_Id"],
        "Label": "{IdentifyingValue} {FirstKey}",
    }
    footer = {"GroupingFields": [grouping_field]}
    footer.update(
        DefaultStartIndex="2017-02-07T00:00",
        DefaultEndIndex="2017-02-27T00:00",
        DefaultInterval="00:05:00",
        IndexTypeCode="DateTime",
        Shape="Standard",
    )
    return footer

In [None]:
def build_dv(asset_id, tag, dv_id, dv_name, description):
    """Assemble a complete Data View definition as a plain dict.

    The definition is made of the header, two stream queries ("value" for
    all tagged streams, "digital" restricted to TypeId PI-Digital), the two
    matching data field sets and the footer. The queries target the
    notebook-level ``asset_db``.

    Args:
        asset_id: Sequence of asset ids the Data View covers.
        tag: OCS tag without prefix; ``tag_prefix`` is prepended so the
            query matches the tags written to the streams by update_tags().
        dv_id: Data View identifier.
        dv_name: Data View name, forwarded to dv_header().
        description: Free-text description.

    Returns:
        Dict with the header keys, "Queries", "DataFieldSets" and the footer
        keys, in that order.
    """
    # Use the shared prefix instead of repeating the "hub__" literal, so the
    # query cannot drift away from the tags actually put on the streams.
    prefixed_tag = tag_prefix + tag

    return {
        **dv_header(asset_id, dv_name, dv_id, description),
        "Queries": [
            dv_query(asset_db, asset_id, prefixed_tag, "value"),
            dv_query(
                asset_db, asset_id, prefixed_tag, "digital", " AND TypeId:PI-Digital"
            ),
        ],
        "DataFieldSets": [
            dv_datafield("value", "Value"),
            dv_datafield("digital", "DigitalStateName", "__ds"),
        ],
        **dv_footer(),
    }

In [None]:
# Build a sample definition for asset FV31 and print it for inspection;
# this cell does not send anything to OCS.
dv = build_dv(["FV31"], "all_columns", "test-dv", "Default", "Test Description")
print(json.dumps(dv, indent=2))

In [None]:
from ocs_sample_library_preview import DataView

In [None]:
# Mutation that sets ocs_sync to true on the DataView with the given id;
# sync_dataview() runs it after the Data View has been written to OCS.
dataview_mutation = gql(
    """
mutation SyncDV($id: ID!) {
  MergeDataView(id: $id, ocs_sync: true) {
    id
    ocs_sync
  }
}
"""
)


def sync_dataview(dv_def):
    """Write one Data View to OCS and mark it as synchronized in the graph.

    ``dv_def`` is a graph record providing "asset_id", "ocs_tag", "id",
    "name" and "description". The definition built from it is put into the
    current namespace, then the sync mutation is executed for its id and the
    mutation result is printed between square brackets.
    """
    definition = build_dv(
        asset_id=dv_def["asset_id"],
        tag=dv_def["ocs_tag"],
        dv_id=dv_def["id"],
        dv_name=dv_def["name"],
        description=dv_def["description"],
    )
    hub.DataViews.putDataView(namespace_id, DataView.fromDictionary(definition))

    mutation_result = client.execute(
        dataview_mutation, variable_values={"id": dv_def["id"]}
    )
    print(f"[{mutation_result}]")

In [None]:
# Push every Data View returned by the query above to OCS, one at a time.
for dv_def in dataviews["DataView"]:
    sync_dataview(dv_def)