## Read the Cognite Learn content before running code examples.

## 1. Environment set up

### Install the Cognite SDK package

In [None]:
%pip install "cognite-sdk[pandas]"

#### Install the MSAL library




In [None]:
%pip install msal

### Connect to Cognite Data Fusion
Import all the needed libraries and instantiate the Cognite Client.

This client object is how all queries will be sent to the Cognite API to retrieve data.

To successfully retrieve the Cognite Client instance, you need to authenticate with MSAL in your browser. Also, make sure to enter the client secret you generated in the course lesson. Run the cell below, copy the code from the output, enter it on the [site](https://microsoft.com/devicelogin), and authorize the Learn-participants application. If everything completes successfully, you will get the output with the token description.

In [None]:
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import Token
from cognite.client.data_classes import OidcCredentials

from msal import PublicClientApplication

# Client secret of the app registration. In this notebook it is only used
# for the transformation OIDC credentials; the interactive MSAL login below
# is built from CLIENT_ID and AUTHORITY_URI alone.
CLIENT_SECRET = "..." # Enter secret here

# Contact Project Administrator to get these
TENANT_ID = "48d5043c-cf70-4c49-881c-c638f5796997"
CLIENT_ID = "c9874b3f-a32b-4f04-8193-52eacd5fc37d"

CDF_CLUSTER = "api" # api, westeurope-1 etc
COGNITE_PROJECT = "de-transformations"
# Scope list requested from the identity provider: the ".default" scope of
# the selected CDF cluster.
SCOPES = [f'https://{CDF_CLUSTER}.cognitedata.com/.default']
AUTHORITY_HOST_URI = 'https://login.microsoftonline.com'
# Tenant-specific authority URL handed to MSAL and reused for the token URI.
AUTHORITY_URI = AUTHORITY_HOST_URI + '/' + TENANT_ID
# NOTE(review): PORT is not referenced by any other cell of this notebook —
# confirm whether it is still needed.
PORT = 53000
# MSAL public client application shared by the authentication helpers below.
app = PublicClientApplication(client_id=CLIENT_ID, authority=AUTHORITY_URI)

def authenticate_device_code(app):
  """Obtain an MSAL token result for SCOPES, preferring the token cache.

  Args:
    app: MSAL application object providing ``get_accounts``,
      ``acquire_token_silent``, ``initiate_device_flow`` and
      ``acquire_token_by_device_flow``.

  Returns:
    The token result produced by MSAL: the cached one when a silent
    acquisition succeeds, otherwise the result of the device-code flow.
  """
  creds = None
  # Firstly, check the cache to see if this end user has signed in before
  accounts = app.get_accounts()
  if accounts:
    creds = app.acquire_token_silent(SCOPES, account=accounts[0])
  # acquire_token_silent yields None when the cache knows the account but
  # holds no usable token; in that case fall back to the interactive flow
  # instead of returning None to the caller.
  if creds is None:
    device_flow = app.initiate_device_flow(scopes=SCOPES)
    print(device_flow['message']) # print device code to screen
    creds = app.acquire_token_by_device_flow(flow=device_flow)
  return creds

def get_token():
  """Authenticate through the shared MSAL app and wrap the access token.

  Returns:
    A ``Token`` credential built from the ``access_token`` entry of the
    authentication result.
  """
  auth_result = authenticate_device_code(app)
  access_token = auth_result['access_token']
  return Token(access_token)

# Client configuration: which project and cluster to talk to, and the token
# obtained from the interactive login.
cnf = ClientConfig(
  client_name='cognite-python-dev',
  project=COGNITE_PROJECT,
  base_url=f'https://{CDF_CLUSTER}.cognitedata.com',
  credentials=get_token(),
)
client = CogniteClient(cnf)

# Show the projects reported for the current token as a connectivity check.
token_info = client.iam.token.inspect()
print(token_info.projects)

Set the prefix for resource names.

In [None]:
# Personal prefix that keeps every resource created by this notebook unique.
PREFIX = "..." # enter your name and birth year (or something random)
# Names of the data set, RAW database and root asset, all derived from PREFIX.
DATASET_NAME, DATABASE_NAME, ASSET_NAME = (
  f"{PREFIX}-{suffix}" for suffix in ("ds", "db", "root")
)

### Create credentials for use in Transformations

To run a transformation, we should set up OIDC credentials for the source and destination projects. It could be different CDF projects, but we only have one in our case.

In [None]:
from cognite.client.data_classes import OidcCredentials

# OIDC client credentials handed to every transformation below, both as
# source and as destination credentials.
creds = OidcCredentials(
    cdf_project_name=COGNITE_PROJECT,
    token_uri=f"{AUTHORITY_URI}/oauth2/v2.0/token",
    scopes=" ".join(SCOPES),
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
)

## 2. Prepare RAW resources

### Data description

We have three tables with the weather data from [The Norwegian Meteorological Institute](https://www.met.no/). You can get the API key on the site and play more with different kinds of meteorological data from the site. 

sources.csv - the list of [sources](https://frost.met.no/api.html#/sources) (Sensor Systems) with geospatial metadata,

elements.csv - the metadata about the weather and climate [elements](https://frost.met.no/api.html#!/elements/getElements) that are defined for use in the API,

observations.csv - the list of [observations](https://frost.met.no/api.html#!/observations/observations) for the particular sources and limited date range.

All the data from [FrostAPI](https://frost.met.no/index.html) were saved as CSV tables and are provided in the repository, in the data folder.

### Prepare database

Create database for RAW tables

In [None]:
# Create the RAW database whose name is held in DATABASE_NAME.
client.raw.databases.create(DATABASE_NAME)

List databases and check out our database

In [None]:
# Fetch the databases (limit=-1) and print only the one created above.
databases = client.raw.databases.list(limit=-1)
for db in filter(lambda candidate: candidate.name == DATABASE_NAME, databases):
  print(db)

### Prepare tables

Create tables

In [None]:
# Create one RAW table per CSV file, in the same order as before.
for table_name in ("sources", "elements", "observations"):
  client.raw.tables.create(DATABASE_NAME, table_name)

List tables; we should see our 3 tables in the database.

In [None]:
# List the tables that exist in the DATABASE_NAME database.
client.raw.tables.list(db_name=DATABASE_NAME)

### Import rows

Load the CSV files into pandas DataFrames. Before running the cell, upload the files to the Colab environment or access them from the data folder in the repository. 

In [None]:
import pandas as pd

# Read the three CSV files with their first column as the index and replace
# missing values with empty strings.
sources_df, elements_df, observations_df = (
  pd.read_csv(csv_path, index_col=0).fillna('')
  for csv_path in ('data/sources.csv', 'data/elements.csv', 'data/observations.csv')
)

Check out DataFrames. Run the cells below to display the first five rows in each data frame and check which data you have in each table.

In [None]:
# Show the first rows of the elements DataFrame.
elements_df.head()

In [None]:
# Show the first rows of the observations DataFrame.
observations_df.head()

Insert dataframes into RAW tables

In [None]:
# Push each DataFrame into the RAW table of the same name.
for table_name, frame in (
  ("sources", sources_df),
  ("elements", elements_df),
  ("observations", observations_df),
):
  client.raw.rows.insert_dataframe(DATABASE_NAME, table_name, frame)

### Create dataset

In [None]:
from cognite.client.data_classes import DataSet
# Create a data set that uses DATASET_NAME as both external id and name.
client.data_sets.create(DataSet(external_id=DATASET_NAME, name=DATASET_NAME))

In [None]:
# Look the data set up by external id and keep its id; the root asset and
# the transformation queries below reference it.
DATASET_ID = client.data_sets.retrieve_multiple(external_ids=[DATASET_NAME])[0].id

### Make a root asset

To organise our assets, we make a root asset, which will be the parent of the other created assets.

In [None]:
from cognite.client.data_classes import Asset
# Create the root asset inside the data set; ASSET_NAME is used as both
# external id and name.
client.assets.create(Asset(external_id=ASSET_NAME, name=ASSET_NAME, data_set_id=DATASET_ID))

## 3. Transformations

### Create and debug queries

#### Create an asset query


Let's take a look at the sources table and make a query to create assets. To create a transformation, you should set the destination type. Each destination type has required and optional columns. You can check the schema for a particular destination type by running the following code.

In [None]:
from cognite.client.data_classes import TransformationDestination
# Retrieve the column schema of the asset-hierarchy destination.
client.transformations.schema.retrieve(destination=TransformationDestination.asset_hierarchy())

Considering the required and optional columns, we compose our query.

In [None]:
# SQL for the asset-hierarchy transformation. It builds four levels from the
# RAW `sources` table (countries, counties, municipalities, sensors) and
# unions them into one result set.
#
# Every level resolves its parent to the nearest non-empty level above it and
# falls back to the root asset when even the country is empty. The `countries`
# CTE skips rows with an empty country, so without that fallback such rows
# would point at the parent '{PREFIX}-', which no row creates.
asset_query = f'''with
countries as
(select
  distinct country as name,
  concat('{PREFIX}-', country) as externalId,
  '{ASSET_NAME}' as parentExternalId,
  {DATASET_ID} as dataSetId
from `{DATABASE_NAME}`.sources
where country <> ''),

counties as
(select
  county as name,
  concat('{PREFIX}-', 'county-', county) as externalId,
  (
    case
      when first(country) <> '' then concat('{PREFIX}-', first(country))
      else '{ASSET_NAME}'
    end
  ) as parentExternalId,
  {DATASET_ID} as dataSetId
from `{DATABASE_NAME}`.sources
where county <> ''
group by county),

municipalities as
(select
  municipality as name,
  concat('{PREFIX}-', 'municipality-', municipality) as externalId,
  (
    case
      when first(county) <> '' then concat('{PREFIX}-', 'county-', first(county))
      when first(country) <> '' then concat('{PREFIX}-', first(country))
      else '{ASSET_NAME}'
    end
  ) as parentExternalId,
  {DATASET_ID} as dataSetId
from `{DATABASE_NAME}`.sources
where municipality <> ''
group by municipality),

sensors as
(select
  name as name,
  concat('{PREFIX}-', id) as externalId,
  (
    case
      when municipality <> '' then concat('{PREFIX}-', 'municipality-', municipality)
      when county <> '' then concat('{PREFIX}-', 'county-', county)
      when country <> '' then concat('{PREFIX}-', country)
      else '{ASSET_NAME}'
    end
  ) as parentExternalId,
  {DATASET_ID} as dataSetId,
  to_metadata(*) as metadata
from `{DATABASE_NAME}`.sources
)

select * from sensors
union all select *, null as metadata from countries
union all select *, null as metadata from counties
union all select *, null as metadata from municipalities'''

Query explanation: 

As you can see in the source table, we have different source attributes, such as country, county, and municipality. Of course, we can model this as a flat structure of Sensor Systems, but it is way more convenient to use hierarchical structures. 

We use the _with_ clause to define different levels of our assets. For example, we select distinct values from the _country_ column to get all the countries. Name and external id are the same in this case and we use the root asset as a parent. 

```
countries as
(select 
  distinct country as name,
  country as externalId,
  '{ASSET_NAME}' as parentExternalId
from `{DATABASE_NAME}`.sources
where country <> '')
```
To get the list of counties, we use the _group by_ statement. We don't use the rows with empty county fields and use the country as a parent asset.

```
counties as
(select
  county as name,
  concat('county-', county) as externalId,
  first(country) as parentExternalId
from `{DATABASE_NAME}`.sources
where county <> ''
group by county)
```

To get the municipalities we use the _group by_ statement and skip the lines without municipalities. To resolve the ParentExternalId field we use the _case_ statement. If there is a county for that municipality then we use that as a parent otherwise we use the country.

```
municipalities as
(select
  municipality as name,
  concat('municipality-', municipality) as externalId,
  (
    case
    	when first(county) = '' then first(country)
    	else concat('county-', first(county))
	end
  ) as parentExternalId
from `{DATABASE_NAME}`.sources
where municipality <> ''
group by municipality),
```
Each row in the table represents the sensor. So we use the _case_ statement to get the parentExternalId. If the row has a municipality, it is a parent; it could also be a county or country. Also, we use the _to\_metadata_ function to create a column with mappings from all the original columns.
```
sensors as
(select
  name as name,
  id as externalId,
  (
    case
    	when municipality <> '' then concat('municipality-', municipality)
  		when county <> '' then concat('county-', county)
    	else country
	end
  ) as parentExternalId,
  to_metadata(*) as metadata
from `{DATABASE_NAME}`.sources
)
```
In the end, we combine all the assets using the _union all_ statement. And since we only have metadata for sensors, we add the null columns for other levels. 

```
select * from sensors
union all select *, null as metadata from countries
union all select *, null as metadata from counties
union all select *, null as metadata from municipalities
```

To debug the transformation via SDK, you can use the _preview_ method as shown below.

In [None]:
# Run the asset query through the preview endpoint and load the returned
# rows into a DataFrame for inspection.
transformation_preview = client.transformations.preview(asset_query)
preview_df = pd.DataFrame(transformation_preview.results)
preview_df.head()

### Create and run asset transformation job

Create the transformation using OIDC credentials defined above and _asset\_query_. The fields _name_ and _external\_id_ are mandatory when you create a transformation.

In [None]:
from cognite.client.data_classes import Transformation, TransformationDestination, OidcCredentials

# Describe the asset transformation: the query defined above, written to the
# asset-hierarchy destination with upsert semantics, using the same OIDC
# credentials for reading and writing.
transformation = Transformation(
    external_id=f"{PREFIX}-assets",
    name=f"{PREFIX}-assets",
    query=asset_query,
    destination=TransformationDestination.asset_hierarchy(),
    conflict_mode="upsert",
    source_oidc_credentials=creds,
    destination_oidc_credentials=creds,
)
# Register the transformation in the project.
client.transformations.create(transformation)

### Retrieve the asset transformation

To retrieve the transformation you can use several methods. The easiest one, if you know the id or external_id, is the retrieve method.

It has the following signature:
```
TransformationsAPI.retrieve(id: Optional[int] = None, external_id: Optional[str] = None) → Optional[cognite.client.data_classes.transformations.Transformation]
Retrieve a single transformation by id.

Parameters:	id (int, optional) – ID
Returns:	Requested transformation or None if it does not exist.
Return type:	Optional[Transformation]
```


In [None]:
# Fetch the asset transformation by its external id and display it.
asset_transformation = client.transformations.retrieve(external_id=f'{PREFIX}-assets')
asset_transformation

If we need to get several transformations then it's better to use retrieve_multiple. 

```
TransformationsAPI.retrieve_multiple(ids: Sequence[int] = None, external_ids: Sequence[str] = None, ignore_unknown_ids: bool = False) → cognite.client.data_classes.transformations.TransformationList
Retrieve multiple transformations.

Parameters:	
ids (List[int]) – List of ids to retrieve.
external_ids (List[str]) – List of external ids to retrieve.
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
Returns:	
Requested transformation or None if it does not exist.

Return type:	
TransformationList
```

In [None]:
# Same lookup through retrieve_multiple, which takes a list of external ids;
# the first element of the returned list is kept.
asset_transformation = client.transformations.retrieve_multiple(external_ids=[f'{PREFIX}-assets'])[0]
asset_transformation

Before creating the timeseries and datapoints transformations, we should run the asset transformation. You can do it by id or external_id. It's also possible to set a timeout and a flag that controls whether to wait until the transformation ends. The run method signature:
```
TransformationsAPI.run(transformation_id: int = None, transformation_external_id: str = None, wait: bool = True, timeout: Optional[float] = None) → cognite.client.data_classes.transformations.jobs.TransformationJob
Run a transformation.

Parameters:	
transformation_id (int) – Transformation internal id
transformation_external_id (str) – Transformation external id
wait (bool) – Wait until the transformation run is finished. Defaults to True.
timeout (Optional[float]) – maximum time (s) to wait, default is None (infinite time). Once the timeout is reached, it returns with the current status. Won’t have any effect if wait is False.
Returns:	
Created transformation job
```

In [None]:
# Start the asset transformation by id. wait=False is passed, so the call is
# not asked to wait for the job to finish.
# NOTE(review): the time series query joins on `_cdf.assets`; make sure this
# job has completed before previewing or running it.
client.transformations.run(asset_transformation.id, wait=False)

After running the transformation, we can list the jobs to check the status, or use the object returned from the run method.

In [None]:
# List the jobs that belong to the asset transformation.
client.transformations.jobs.list(transformation_id=asset_transformation.id)

A transformation object has a last_finished_job attribute which is convenient to check from API to be sure that your transformation was run and completed on time.

In [None]:
# To get info on the latest runs, we fetch it again:
# the object retrieved earlier predates the run started above.
asset_transformation = client.transformations.retrieve(external_id=f'{PREFIX}-assets')
asset_transformation.last_finished_job

### Create a time series query

To begin with, let's check which fields we need to create a time series. Retrieve the schema of TransformationDestination.timeseries().

In [None]:
# Retrieve the column schema of the time series destination.
client.transformations.schema.retrieve(destination=TransformationDestination.timeseries())

Let's create a query taking into account the table data and schema fields.

In [None]:
# SQL for the time series transformation. `measurements` extracts sensor id,
# value, element id and unit from the RAW observations table; `ts_data`
# reduces that to one row per (sensor, measure_type, unit); `assets` selects
# the assets of this notebook's data set. The final select joins the two so
# that only series whose sensor asset exists are produced.
ts_query = f'''with measurements as
(select
  split(sourceId, ":")[0] as sensor,  
  get_json_object(observations, "$.value") as temp,
  get_json_object(observations, "$.elementId") as measure_type,
  get_json_object(observations, "$.unit") as unit
from `{DATABASE_NAME}`.observations),

ts_data as
(select 
  unit, 
  sensor, 
  measure_type
from measurements
group by sensor, measure_type, unit),

assets as
(
select 
  * 
from 
  _cdf.assets
where
  dataSetId = {DATASET_ID}
)

select
  concat(sensor, '-', measure_type, '-', unit) as name,
  concat('{PREFIX}-', sensor, '-', measure_type, '-', unit) as externalId,
  unit as unit,
  assets.id as assetId,
  {DATASET_ID} as dataSetId
from ts_data
join assets on concat('{PREFIX}-', ts_data.sensor) = assets.externalId'''

Query explanation:
In the _observations_ table we only have 3 columns: sourceId, referenceTime and observations. The _observations_ column contains the JSON with data. The _sourceId_ values follow the template _\<sensor\_id\>:\<number of the sensor\>_.
We use the _with_ clause to create needed subqueries. 
The first one is _measurements_, we split the _sourceId_ values in two parts and only use _sensor\_id_ because we have the same values as external ids in assets. To extract the values from the _observations_ column we use the _get\_json\_object_ function. 
```
measurements as
(select
  split(sourceId, ":")[0] as sensor,  
  get_json_object(observations, "$.value") as temp,
  get_json_object(observations, "$.elementId") as measure_type,
  get_json_object(observations, "$.unit") as unit
from `{DATABASE_NAME}`.observations)
```
To create time series we should aggregate measurements data, we group it by sensor, measure\_type and unit. In our case, it's redundant since we only observe the air temperature in Celsius degrees in the subset of data from MET Norway, but it's good to have this feature for future measurements.
```
ts_data as
(select 
  unit, 
  sensor, 
  measure_type
from measurements
group by sensor, measure_type, unit)
```
We need to have a subquery of assets so we don't create a time series not connected to any asset. We query only assets from our data set.
```
assets as
(
select 
  * 
from 
  _cdf.assets
where
  dataSetId = {DATASET_ID}
)
```
Finally, we format the external id and set the name, unit, assetId and dataSetId to create the time series.
```
select
  concat(sensor, '-', measure_type, '-', unit) as name,
  concat(sensor, '-', measure_type, '-', unit) as externalId,
  unit as unit,
  assets.id as assetId,
  {DATASET_ID} as dataSetId
from ts_data
join assets on ts_data.sensor = assets.externalId
```

In [None]:
# Preview the time series query and inspect the first rows of the result.
# NOTE(review): source_limit=500 presumably caps the rows read from each
# source during the preview — confirm against the SDK documentation.
tr = client.transformations.preview(ts_query, source_limit=500)
df = pd.DataFrame(tr.results)
df.head()

### Create and run timeseries transformation job

We create and run the timeseries transformation in the same way we did before with the asset transformation.

In [None]:
from cognite.client.data_classes import Transformation, TransformationDestination, OidcCredentials

# Describe the time series transformation: the query defined above, written
# to the time series destination with upsert semantics, using the same OIDC
# credentials for reading and writing.
transformation = Transformation(
    external_id=f"{PREFIX}-ts",
    name=f"{PREFIX}-ts",
    query=ts_query,
    destination=TransformationDestination.timeseries(),
    conflict_mode="upsert",
    source_oidc_credentials=creds,
    destination_oidc_credentials=creds,
)

# Register the transformation in the project.
client.transformations.create(transformation)

In [None]:
# Fetch the time series transformation by its external id.
timeseries_transformation = client.transformations.retrieve(external_id=f'{PREFIX}-ts')

In [None]:
# Run the time series transformation by id; `wait` is left at its default
# (documented as True in the run signature quoted earlier in this notebook).
client.transformations.run(timeseries_transformation.id)

Then we can check the transformation jobs list to see the status of the running timeseries transformation job.

In [None]:
# List the jobs that belong to the time series transformation.
client.transformations.jobs.list(transformation_id=timeseries_transformation.id)

### Create datapoints transformation query

Let's check out the schema of the datapoints destination. In CDF, you can use numeric and string data points. In our case, we want to have a numeric temperature value.

In [None]:
# Retrieve the column schema of the datapoints destination.
client.transformations.schema.retrieve(destination=TransformationDestination.datapoints())

The query will be quite basic this time.

In [None]:
# SQL for the datapoints transformation. `measurements` extracts sensor id,
# numeric value, element id, unit and timestamp from the RAW observations
# table; the final select rebuilds the same external id format that the time
# series query uses, so each value is addressed to its series.
query_dp = f'''with measurements as
(select
  split(sourceId, ":")[0] as sensor,  
  cast(get_json_object(observations, "$.value") as double) as temperature,
  get_json_object(observations, "$.elementId") as measure_type,
  get_json_object(observations, "$.unit") as unit,
  to_timestamp(referenceTime) as reference_time
from `{DATABASE_NAME}`.observations)

select
  concat('{PREFIX}-', sensor, '-', measure_type, '-', unit) as externalId,
  reference_time as timestamp,
  temperature as value
from measurements'''

Query explanation: as we already have a measurement table we can use it directly to get the datapoints. But let's use the temporary table for clarity. We extract all the needed data from the JSON column 'observations'.

```
with measurements as
(select
  split(sourceId, ":")[0] as sensor,  
  cast(get_json_object(observations, "$.value") as double) as temperature,
  get_json_object(observations, "$.elementId") as measure_type,
  get_json_object(observations, "$.unit") as unit,
  to_timestamp(referenceTime) as reference_time
from `{DATABASE_NAME}`.observations)
```
And then we format our data to fit the schema requirements.
```
select
  concat('{PREFIX}-', sensor, '-', measure_type, '-', unit) as externalId,
  reference_time as timestamp,
  temperature as value
from measurements
```

You can run the query before the creation of the transformation in the same way we did for assets and time series to check if the result looks good.

In [None]:
# Preview the datapoints query and inspect the first rows of the result.
tr = client.transformations.preview(query_dp)
df = pd.DataFrame(tr.results)
df.head()

### Create and run datapoints transformation

To create and run the datapoints transformation we use all the same methods as before.

In [None]:
from cognite.client.data_classes import Transformation, TransformationDestination, OidcCredentials

# Describe the datapoints transformation: the query defined above, written
# to the datapoints destination with upsert semantics, using the same OIDC
# credentials for reading and writing.
transformation = Transformation(
    external_id=f"{PREFIX}-dp",
    name=f"{PREFIX}-dp",
    query=query_dp,
    destination=TransformationDestination.datapoints(),
    conflict_mode="upsert",
    source_oidc_credentials=creds,
    destination_oidc_credentials=creds,
)
# Register the transformation in the project.
client.transformations.create(transformation)

In [None]:
# Fetch the datapoints transformation by external id, then run it by id.
datapoints_transformation = client.transformations.retrieve(external_id=f'{PREFIX}-dp')
client.transformations.run(datapoints_transformation.id)

Also we can use external id to get the list of the transformation jobs.

In [None]:
# List the jobs of the datapoints transformation, addressed by external id.
client.transformations.jobs.list(transformation_external_id=f'{PREFIX}-dp')

### Cancel a transformation job

If your job runs for too long, you can cancel it through the job object returned by the run method. Let's create a transformation job and cancel it, then check the jobs list.

In [None]:
# Start a datapoints job without waiting, cancel it right away through the
# returned job object, then list the transformation's jobs.
datapoints_job = client.transformations.run(datapoints_transformation.id, wait=False)
datapoints_job.cancel()
dp_jobs = client.transformations.jobs.list(transformation_external_id=f'{PREFIX}-dp')
dp_jobs

You can also check the error message because the canceled job has the same status as the failed one.

In [None]:
# Error message of the first job in the list.
# NOTE(review): assumes the list is ordered newest first, so index 0 is the
# job cancelled above — confirm.
dp_jobs[0].error

It is also possible to cancel a job by transformation id or external id, because only one job can be running at a time for each transformation.

In [None]:
# Start another datapoints job without waiting and cancel it through the
# transformation's external id instead of the job object, then list the jobs.
datapoints_job = client.transformations.run(datapoints_transformation.id, wait=False)
client.transformations.cancel(transformation_external_id=f'{PREFIX}-dp')
dp_jobs = client.transformations.jobs.list(transformation_external_id=f'{PREFIX}-dp')
dp_jobs

## 4. Clean up resources

In [None]:
# Delete RAW tables
client.raw.tables.delete(db_name=DATABASE_NAME, name=['sources', 'elements', 'observations'])
# Delete RAW database
client.raw.databases.delete(name=DATABASE_NAME)
# Delete time series
tss = client.time_series.list(data_set_ids=[DATASET_ID], limit=-1)
client.time_series.delete([ts.id for ts in tss])
# Delete assets recursively
client.assets.delete(external_id=f"{PREFIX}-root", recursive=True)
# Delete transformations
client.transformations.delete(external_id=[f"{PREFIX}-assets", f"{PREFIX}-ts", f"{PREFIX}-dp"])