## DEPENDENCY MANAGEMENT

In [None]:
!pip install cognite-sdk-core install pandas
from cognite.client import CogniteClient
import requests
import json
import pandas as pd

## PRESET PARAMETERS

### Default parameters

In [None]:
# Hasura deployment and the GraphQL endpoint that the later cells post to.
HASURA_BASE_API = "https://datapop.greenfield.cognite.ai"
HASURA_GRAPHQL_ENDPOINT = HASURA_BASE_API + "/v1/graphql"

# CDF / OAuth defaults that are forwarded in every transformation config.
CDF_URL = "https://api.cognitedata.com"
TOKEN_SCOPES = [CDF_URL + "/.default"]
TOKEN_URL = (
    "https://login.microsoftonline.com/"
    "806128be-974c-452a-a25c-f98d78eb24ea"
    "/oauth2/v2.0/token"
)

### User injected parameters

#### Generic parameters

In [None]:
# Credentials and target environment, collected interactively.
import getpass

CLIENT_ID = input("Type in your client ID...").strip()
# Read the secret without echoing it, so it does not end up in the saved cell output.
CLIENT_SECRET = getpass.getpass("Type in your client secret...")
# The cluster is interpolated into URLs further down, so stray whitespace must go.
CDF_CLUSTER = input('Type in your CDF Cluster (e.g. api, westeurope-1, etc)...').strip()  # e.g. "api"
COGNITE_PROJECT = input('Type in your Cognite project name...').strip()  # e.g. "itg-testing"

#### Flow run specific parameters

In [None]:
# Prefect flow parameters
# Tenant name
tenant = input('Type in your tenant name (usually same as Cognite project name)')

# ITG project ID
projectId = input('Type in your project ID...')

# ITG schema type
schemaName = input('Type in your target/destination schema name...')

# JSONata transformation code
jsonata = input('Type in your (JSONata) transformation code...')

# CDF raw database name
rawDatabase = input('Type in your CDF raw database name...')

# CDF raw table name
rawTable = input('Type in your CDF raw database table name...')

# Raw batch size (default in Prefect is 10000)
_rawBatchSize = input('Type in your raw batch size (must be an integer and defaults to a 100 if invalid value provided)...')
rawBatchSize = 100
try:
    rawBatchSize = int(_rawBatchSize.strip())
except ValueError:
    # int() on a string only raises ValueError; anything else should surface.
    print(f'Invalid raw batch size value provided, defaulting to {rawBatchSize}...')

# ITG batch size (default in Prefect is 2500)
_itgBatchSize = input('Type in your ITG batch size (must be an integer and defaults to a 10 if invalid value provided)...')
itgBatchSize = 10
try:
    itgBatchSize = int(_itgBatchSize.strip())
except ValueError:
    # Same fallback as above: keep the default when the input is not an integer.
    print(f'Invalid ITG batch size value provided, defaulting to {itgBatchSize}...')

## AUTHENTICATION

### Get CDF authentication token

In [None]:
# OAuth scope for the CDF cluster chosen above.
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]

# Client-credentials client for the chosen project and cluster.
cdf_client = CogniteClient(
    client_name="client_secret_test_script",
    project=COGNITE_PROJECT,
    base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
    token_url=TOKEN_URL,
    token_scopes=SCOPES,
    token_client_id=CLIENT_ID,
    token_client_secret=CLIENT_SECRET,
    debug=True,
)

# Token that is exchanged for a Hasura token in the next step.
cognite_token = cdf_client.config.token()

### Get Hasura token

In [None]:
# Exchange the CDF token for a Hasura token. The project and cluster headers
# use the values entered above, so they match the environment the CDF token
# was issued for.
_hasura_http_response = requests.get(
    'https://datapop-auth.greenfield.cognite.ai/token',
    headers={
        "Authorization": f"Bearer {cognite_token}",
        "x-project": COGNITE_PROJECT,
        "x-cluster": CDF_CLUSTER,
    },
)
# Surface HTTP failures here instead of as a confusing JSON/KeyError below.
_hasura_http_response.raise_for_status()
hasura_token_response = _hasura_http_response.json()
if 'token' not in hasura_token_response:
    raise Exception(f"Hasura token endpoint returned no token: {hasura_token_response}")
hasura_token = hasura_token_response['token']

### Configure Hasura authentication header

In [None]:
# Headers sent with every GraphQL request below.
# NOTE(review): the admin secret is hard-coded here; confirm it is only a
# placeholder and not a real credential.
headers = {
    "Authorization": f"Bearer {hasura_token}",
    "Content-Type": "application/json",
    'X-Hasura-Admin-Secret': 'randompassword',
}
# Show only the first 30 characters of the header value as a sanity check.
print("Authorization:", headers["Authorization"][:30], "...")

## Get Flow ID

### Flow ID retrieval query

In [None]:
# GraphQL query selecting the id of a non-archived flow named "raw-to-schema".
# `limit: 1` means at most one flow is returned.
queryGetFlowId = """query GetRawToSchemaFlowId{
 flow(
   where: { name: { _eq: "raw-to-schema" }, archived: { _eq: false } }
   limit: 1
 ) {
   id
 }
}"""

### Fetch flow ID

In [None]:
# Look up the flow id; fail with a clear message on HTTP errors, GraphQL
# errors (which can arrive with HTTP 200) or when no matching flow exists.
r = requests.post(HASURA_GRAPHQL_ENDPOINT, json={"query": queryGetFlowId}, headers=headers)
if r.status_code != 200:
    raise Exception(f"Query failed to run with a {r.status_code}.")

_flow_lookup = r.json()
if _flow_lookup.get('errors'):
    raise Exception(f"Flow ID query returned GraphQL errors: {_flow_lookup['errors']}")

_flows = (_flow_lookup.get('data') or {}).get('flow') or []
if not _flows:
    raise Exception('No non-archived "raw-to-schema" flow was found.')
flowId = _flows[0]['id']

## TRANSFORMATIONS

### Parameters

In [None]:
# Run multiple transformations on one flow.
#
# Each entry is one transformation config, and both entries currently carry
# the same values.
# NOTE(review): "cdfCluster" is fixed to "api" even though a cluster is
# asked for earlier in the notebook; confirm this is intended.
transformations = [
    dict(
        clientId=CLIENT_ID,
        clientSecret=CLIENT_SECRET,
        cdfBaseUrl=CDF_URL,
        tokenScopes=TOKEN_SCOPES,
        tokenUrl=TOKEN_URL,
        cdfCluster="api",
        cdfProject=tenant,
        targetProjectId=projectId,
        targetSchemaType=schemaName,
        jsonataTransformation=jsonata,
        rawDb=rawDatabase,
        rawTable=rawTable,
        rawBatchSize=rawBatchSize,
        itgBatchSize=itgBatchSize,
        forceReload=True,
    ),
    dict(
        clientId=CLIENT_ID,
        clientSecret=CLIENT_SECRET,
        cdfBaseUrl=CDF_URL,
        tokenScopes=TOKEN_SCOPES,
        tokenUrl=TOKEN_URL,
        cdfCluster="api",
        cdfProject=tenant,
        targetProjectId=projectId,
        targetSchemaType=schemaName,
        jsonataTransformation=jsonata,
        rawDb=rawDatabase,
        rawTable=rawTable,
        rawBatchSize=rawBatchSize,
        itgBatchSize=itgBatchSize,
        forceReload=True,
    ),
]

# Flow parameters: the list of configs under a single key.
parameters = {'transformationConfigs': transformations}

### Run flow variables

In [None]:
# GraphQL variables for the run-flow mutation: which flow, and with what parameters.
parametersRunFlow = {"flow_id": flowId, "parameters": parameters}

### Run flow query

In [None]:
# GraphQL mutation that creates a flow run for $flow_id with $parameters and
# returns the id of the created run.
mutationRunFlow = """mutation RunFlow($flow_id: UUID!, $parameters: JSON) {
    create_flow_run(input: {
        flow_id: $flow_id,
        parameters: $parameters
    }) {
        id
    }
}"""


In [None]:
# Start the flow run. A GraphQL error can arrive with HTTP 200, so check the
# payload before reporting the run as started.
r = requests.post(HASURA_GRAPHQL_ENDPOINT, json={"query": mutationRunFlow, "variables": parametersRunFlow}, headers=headers)
if r.status_code != 200:
    raise Exception(f"Query failed to run with a {r.status_code}.")

_run_flow_result = r.json()
if _run_flow_result.get('errors'):
    raise Exception(f"Flow run was not created, GraphQL errors: {_run_flow_result['errors']}")
print(f"Running flow = {json.dumps(_run_flow_result, indent=2)}")

## WATCH TRANSFORMATION

In [None]:
# Query for up to five runs of the given flow, ordered by start_time descending.
queryGetLastTasks = """query GetLastTasks($flow_id: uuid) {
  flow_run(
    where: { flow_id: { _eq: $flow_id }}
    limit: 5,
    order_by: {start_time: desc}
  ) {
    id
    name
    created
    agent_id
    flow_id
  }
}"""

# Variables for the query above.
parametersGetLastTasks = {"flow_id": flowId}


In [None]:
# Fetch the most recent runs of the flow and remember the first one returned.
# The response body is parsed once and checked for GraphQL errors and for an
# empty result before it is indexed.
r = requests.post(HASURA_GRAPHQL_ENDPOINT, json={"query": queryGetLastTasks, "variables": parametersGetLastTasks}, headers=headers)
if r.status_code != 200:
    raise Exception(f"Query failed to run with a {r.status_code}.")

result = r.json()
if result.get('errors'):
    raise Exception(f"Flow run query returned GraphQL errors: {result['errors']}")

_recent_runs = (result.get('data') or {}).get('flow_run') or []
if not _recent_runs:
    raise Exception(f"No flow runs found for flow {flowId}.")
lastFlowRunId = _recent_runs[0]['id']

In [None]:
# Lift the row limit so the tables below are displayed in full.
pd.options.display.max_rows = None

In [None]:
# Tabulate the flow runs returned by the previous query.
pd.json_normalize(result["data"]["flow_run"])

In [None]:
import time

# Wait 30 seconds per configured transformation before asking for the status
# of the last run.
time.sleep(30 * len(transformations))

In [None]:
# Query for the end time, heartbeat and log entries of a single flow run.
getStatusQuery = """query GetStatusQuery($flowRunId: uuid){
  flow_run(
    where: {
      id: {
        _eq: $flowRunId
      }
    })
    {
      end_time
      heartbeat
      logs {
        id
        created
        message
        level
      }
  }
}"""

# Variables for the query above: the run picked up from the recent-runs query.
parametersGetStatusQuery = {"flowRunId": lastFlowRunId}

In [None]:
# Fetch the status and logs of the watched flow run. `r` is kept for the next
# cell, which reads the logs from it; here the payload is only validated so
# that failures are reported clearly.
r = requests.post(HASURA_GRAPHQL_ENDPOINT, json={"query": getStatusQuery, "variables": parametersGetStatusQuery}, headers=headers)
if r.status_code != 200:
    raise Exception(f"Query failed to run with a {r.status_code}.")

_status_result = r.json()
if _status_result.get('errors'):
    raise Exception(f"Status query returned GraphQL errors: {_status_result['errors']}")
if not (_status_result.get('data') or {}).get('flow_run'):
    raise Exception(f"Flow run {lastFlowRunId} was not found.")

In [None]:
# Tabulate the log entries of the watched flow run.
pd.json_normalize(r.json()["data"]["flow_run"][0]["logs"])
