# Push data from **Cognite Data Fusion** to a **Power BI** `push` or `streaming` dataset
In this notebook, we will push data from **Cognite Data Fusion** to a **Power BI** `push` or `streaming` dataset. For this example, a new `push` dataset was created in **Power BI** with the following schema (shown as a sample row):
```json
[
    {
        "timestamp" :"2023-11-07T18:46:22.462Z",
        "value" :98.6,
        "unit" :"AAAAA555555",
        "sensor" :"AAAAA555555",
        "tag" :"AAAAA555555"
    }
]
```
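
The sample `timestamp` above is an ISO 8601 string with millisecond precision and a `Z` suffix, while CDF data point timestamps are milliseconds since the epoch. As a minimal sketch using only the standard library (the helper name `ms_to_iso` is hypothetical and not part of any SDK), the conversion could look like this:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_iso(ms_since_epoch: int) -> str:
    """Hypothetical helper: format epoch milliseconds as ISO 8601 UTC with a 'Z' suffix."""
    # Integer timedelta arithmetic avoids float rounding of the millisecond part
    dt = EPOCH + timedelta(milliseconds=ms_since_epoch)
    return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")


print(ms_to_iso(1699382782462))  # 2023-11-07T18:46:22.462Z
```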

To create a new `push` or `streaming` dataset in **Power BI** follow the steps below:
1. Go to [Power BI](https://app.powerbi.com/) and log in with your credentials
2. Go to your workspace and click on `+ New` and then `Streaming dataset`
3. Select `API` and click on `Next`
4. Give your dataset a name and configure the desired schema

Copy the URL of the dataset, as we will use it later in this notebook.

## Dependencies

To run this notebook you will need to install the following dependencies:
- [Cognite Python SDK](https://pypi.org/project/cognite-sdk/)
- [dotenv](https://pypi.org/project/python-dotenv/)

In [6]:
import os
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from dotenv import load_dotenv
from cognite.client.data_classes import DataPointSubscriptionCreate
from cognite.client.data_classes import filters as f
from cognite.client.data_classes import ClientCredentials
from cognite.client.data_classes.datapoints_subscriptions import DatapointSubscriptionFilterProperties as p

## Credentials
To authenticate to Cognite Data Fusion you will need credentials. This notebook expects the credentials to be stored in an `.env` file in the same directory as the notebook. The `.env` file should look like this:
```bash
CLIENT_ID=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TENANT_ID=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
CDF_CLUSTER=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
CDF_PROJECT=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
CLIENT_SECRET=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```
Replace the `x` placeholders with your own values. In this case, I'm using the client credentials flow, but you can use other flows as well.
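
A missing entry in the `.env` file otherwise surfaces as a bare `KeyError` in the next cell. As a small sketch (the helper `missing_env_vars` is hypothetical, and the variable names are the ones listed above), the required variables could be checked up front:

```python
import os

REQUIRED_VARS = ["CLIENT_ID", "CLIENT_SECRET", "TENANT_ID", "CDF_CLUSTER", "CDF_PROJECT"]


def missing_env_vars(env=None, required=REQUIRED_VARS):
    """Hypothetical helper: return the required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Example with an in-memory mapping instead of the real environment
example_env = {"CLIENT_ID": "abc", "TENANT_ID": "def", "CDF_CLUSTER": "", "CDF_PROJECT": "demo"}
print(missing_env_vars(example_env))  # ['CLIENT_SECRET', 'CDF_CLUSTER']
```

In the notebook, calling `missing_env_vars()` right after `load_dotenv()` would check the real environment.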

In [None]:
load_dotenv()

client_id = os.environ["CLIENT_ID"]
client_secret = os.environ["CLIENT_SECRET"]
tenant_id = os.environ["TENANT_ID"]
cluster = os.environ["CDF_CLUSTER"]
project = os.environ["CDF_PROJECT"]

scopes = [f"https://{cluster}.cognitedata.com/.default"]
base_url = f"https://{cluster}.cognitedata.com"
authority_uri = f"https://login.microsoftonline.com/{tenant_id}"

creds = OAuthClientCredentials(
    token_url=f"{authority_uri}/oauth2/v2.0/token",
    scopes=scopes,
    client_id=client_id,
    client_secret=client_secret,
)

cnf = ClientConfig(
    client_name="powerbi-tutorial",
    base_url=base_url,
    project=project,
    credentials=creds,
)

client = CogniteClient(config=cnf)

## Create Subscription
We'll start by creating a subscription to time series. This is a new feature of Cognite Data Fusion that allows you to subscribe to changes in the time series. You can read more about it [here](https://api-docs.cognite.com/20230101-beta/tag/Data-point-subscriptions).

In this example, I'm subscribing to all time series whose external ID starts with "EVE".

In [None]:
my_filter = f.Prefix(
    property=p.external_id,
    value="EVE"
)
sub = DataPointSubscriptionCreate(
    external_id="powerbi-subscription-test",
    name="PowerBI Subscription Test",
    description="Subscription for PowerBI",
    partition_count=1,
    filter=my_filter
)
client.time_series.subscriptions.create(sub)

## Create Function `handle`
Here we'll create a function called `handle` that implements the logic for fetching updates from CDF and pushing them to Power BI. To manage the state of the subscription cursor, we'll use a string time series.

The logic of the function is as follows:
1. Fetch the latest cursor from the time series (if the time series doesn't exist, create it)
2. Fetch the time series included in the subscription
3. Fetch the metadata of the time series
4. Fetch the updates from the subscription
5. Check if there are any updates (if there are no updates, we can skip the rest of the logic)
6. Create the payload to be pushed to Power BI (conforming to the schema of the dataset)
7. Push the payload to Power BI (if the push succeeds, insert the new cursor in the time series)
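
The core of these steps is a checkpoint pattern: read the stored cursor, fetch everything after it, and advance the cursor only once the push has succeeded. The sketch below illustrates that pattern with in-memory stand-ins; `InMemoryStateStore`, `run_once`, and `fake_fetch` are hypothetical names and do not call CDF or Power BI.

```python
class InMemoryStateStore:
    """Stand-in for the string time series that holds the cursor."""

    def __init__(self):
        self.cursor = None


def run_once(fetch_updates, push, state):
    """Fetch updates after the stored cursor, push them, then checkpoint.

    fetch_updates(cursor) -> (updates, new_cursor); push(updates) -> bool.
    """
    updates, new_cursor = fetch_updates(state.cursor)
    if not updates:
        return 0  # nothing new, keep the old cursor
    if push(updates):
        state.cursor = new_cursor  # advance only after a successful push
    return len(updates)


# Fake source: a list of values where the cursor is simply the next index to read
values = [10, 11, 12]


def fake_fetch(cursor):
    start = 0 if cursor is None else int(cursor)
    return values[start:], str(len(values))


state = InMemoryStateStore()
pushed = []
run_once(fake_fetch, lambda rows: False, state)  # push fails, cursor stays None
run_once(fake_fetch, lambda rows: pushed.extend(rows) or True, state)  # push succeeds
print(state.cursor, pushed)  # 3 [10, 11, 12]
```

Because the cursor only moves after a successful push, a failed push is retried on the next run instead of being skipped.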

In [None]:
def handle(client, data, secrets):
    import requests
    import json
    from datetime import datetime, timezone
    from cognite.client.data_classes import TimeSeries
    from cognite.client.utils import ms_to_datetime, datetime_to_ms

    SUBSCRIPTION_EXTERNAL_ID = "powerbi-subscription-test"
    STATE_EXTERNAL_ID = "powerbi-subscription-test-state"

    # 1) Get latest cursor from state store
    state_ts = client.time_series.retrieve(external_id=STATE_EXTERNAL_ID)
    if state_ts is None:
        # TimeSeries does not exist, so we create it
        client.time_series.create(
            TimeSeries(
                name="PowerBI Subscription Test State",
                description="State store for PowerBI subscription",
                external_id=STATE_EXTERNAL_ID,
                is_string=True,
            )
        )
        print("Time Series created to store state")
        # cursor is None, so we fetch all data points
        cursor = None
    else:
        # Fetch the latest cursor from the state store
        cursor_dp = client.time_series.data.retrieve_latest(
            external_id=STATE_EXTERNAL_ID
        )
        if len(cursor_dp.value) == 0:
            # cursor is None, so we fetch all data points
            print("No cursor found in state store")
            cursor = None
        else:
            cursor = cursor_dp.value[0]

    # 2) Fetch the time series included in the subscription
    ts_items = client.time_series.subscriptions.list_member_time_series(
        external_id=SUBSCRIPTION_EXTERNAL_ID, limit=None
    )

    # 3) Fetch the time series metadata
    ts_list = client.time_series.retrieve_multiple(ids=[ts.id for ts in ts_items])

    # 4) Fetch the updates from the subscription
    def fetch_data_points_updates(external_id, cursor=None):
        update_data = []
        for batch in client.time_series.subscriptions.iterate_data(
            external_id=external_id, cursor=cursor
        ):
            update_data.extend(batch.updates)
            cursor = batch.cursor
            if not batch.has_next:
                break
        return update_data, cursor

    update_data, cursor = fetch_data_points_updates(
        external_id=SUBSCRIPTION_EXTERNAL_ID, cursor=cursor
    )

    # 5) We can stop here if there are no updates
    # We don't need to update the cursor in the state store
    if len(update_data) == 0:
        print("No updates found")
        return None

    # 6) Create the payload for PowerBI
    def create_powerbi_payload(update_data, ts_list):
        # Convert ms since epoch to ISO 8601 format (used by Power BI)
        def convert_ms_to_iso(ms_since_epoch):
            dt_object = ms_to_datetime(ms_since_epoch)
            return dt_object.isoformat(timespec="milliseconds").replace("+00:00", "Z")

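        # Index the metadata by id so each update is a dictionary lookup
        ts_by_id = {ts.id: ts for ts in ts_list}

        data = []
        for item in update_data:
            # we are only interested in upserts
            upsert = item.upserts
            # find the time series metadata; skip updates we have no metadata for
            ts = ts_by_id.get(upsert.id)
            if ts is None:
                continue
            # an update can carry several data points, so emit one row for each
            for timestamp, value in zip(upsert.timestamp, upsert.value):
                data.append(
                    {
                        "timestamp": convert_ms_to_iso(timestamp),
                        "value": value,
                        "unit": ts.unit,
                        "sensor": ts.name,
                        "tag": ts.external_id,
                    }
                )
        return data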

    data = create_powerbi_payload(update_data, ts_list)

    # 7) Push the data to PowerBI URL from secrets
    url = secrets.get("url")

    headers = {"Content-Type": "application/json"}
    response = requests.request(
        method="POST", url=url, headers=headers, data=json.dumps(data)
    )
    if response.status_code == 200:
        print("Data pushed successfully to Power BI")
        # Update the cursor in the state store
        now = datetime_to_ms(datetime.now(timezone.utc))
        client.time_series.data.insert(
            external_id=STATE_EXTERNAL_ID,
            datapoints=[{"timestamp": now, "value": cursor}],
        )
    else:
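        # Include the status code and response body to make failures easier to diagnose
        print(f"Error pushing data to Power BI: {response.status_code} {response.text}")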

## Test Function `handle`
Before deploying the function, we can test it locally. Replace the placeholder `url` value in the secrets dictionary with the push URL of your dataset.
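
The push URL typically embeds an access key, so it may be preferable to keep it out of the notebook. As a sketch, it could be read from the environment alongside the other credentials; the variable name `POWERBI_PUSH_URL` and the helper `get_push_url` are assumptions and not part of the original setup.

```python
import os


def get_push_url(env=None, var_name="POWERBI_PUSH_URL"):
    """Hypothetical helper: read the dataset push URL from the environment."""
    env = os.environ if env is None else env
    url = env.get(var_name, "").strip()
    if not url.startswith("https://"):
        raise ValueError(f"{var_name} is missing or is not an https URL")
    return url


# Example with an in-memory mapping; the URL is the truncated placeholder used in this notebook
print(get_push_url({"POWERBI_PUSH_URL": "https://api.powerbi.com/beta/..."}))
```

The returned value could then be passed as `{"url": get_push_url()}` in place of the hard-coded string.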

In [None]:
handle(client, {}, {"url": "https://api.powerbi.com/beta/..."})

## Create Cognite Function
Now that we have tested the function locally, we can deploy it to Cognite Functions. Replace the placeholder `url` value in `secrets` with the push URL of your dataset.

In [None]:
function = client.functions.create(
    external_id="push-to-powerbi",
    name="Push to Power BI",
    function_handle=handle,
    secrets={"url": "https://api.powerbi.com/beta/..."},
)
function

## Create Function Schedule
Deployment takes a little while, so we need to wait until the function is successfully deployed. Once it is, we can create a schedule that runs the function every 5 minutes.
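
Rather than waiting by hand, the deployment status can be polled. The sketch below is a generic polling loop; `wait_until_ready` is a hypothetical helper, and it assumes the status is reported as strings such as `"Ready"` and `"Failed"` (for example via `client.functions.retrieve(external_id="push-to-powerbi").status`, which should be checked against your SDK version).

```python
import time


def wait_until_ready(get_status, timeout_s=600, poll_interval_s=10, sleep=time.sleep):
    """Hypothetical helper: poll get_status() until it returns "Ready".

    Raises RuntimeError on "Failed" and TimeoutError when the timeout elapses.
    """
    waited = 0
    while True:
        status = get_status()
        if status == "Ready":
            return status
        if status == "Failed":
            raise RuntimeError("Function deployment failed")
        if waited >= timeout_s:
            raise TimeoutError(f"Function still '{status}' after {timeout_s}s")
        sleep(poll_interval_s)
        waited += poll_interval_s


# Dry run with a fake status sequence and no real sleeping
statuses = iter(["Queued", "Deploying", "Ready"])
print(wait_until_ready(lambda: next(statuses), sleep=lambda s: None))  # Ready
```

In the notebook, `get_status` would be a small lambda that retrieves the function and returns its status.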

In [None]:
schedule = client.functions.schedules.create(
    name="Push data to Power BI",
    function_external_id="push-to-powerbi",
    cron_expression="*/5 * * * *",
    client_credentials=ClientCredentials(
        client_id=client_id,
        client_secret=client_secret
    ),
    description="Push data to Power BI every 5 minutes",
    data={},
)
schedule