# Cyoda Client Demo

This notebook demonstrates a basic data preprocessing example with a workflow for data cleaning.


## Prerequisites

Ensure you have the following before running the cells:

- Cyoda API credentials (API key, secret, etc.)

- Python packages installed via `requirements.txt`


### Steps

1. **Setup**: Import necessary libraries and set up environment variables.

2. **Authentication**: Authenticate with the Cyoda API.

3. **Operations**: Perform basic and advanced operations.


### Step 1: Setup Environment and Import Libraries
Ensure you have set up your environment properly.

In [None]:

import logging

def setup_logging(level=logging.INFO):
    """Configure logging via ``basicConfig`` and return this module's logger.

    :param level: Threshold handed to ``logging.basicConfig``; defaults to ``logging.INFO``.
    :return: The logger obtained from ``logging.getLogger(__name__)``.
    """
    logging.basicConfig(level=level)
    return logging.getLogger(__name__)

# Create the shared logger; the cells below all log through this global.
logger = setup_logging()
logger.info("Logging initialized.")


In [None]:
import os

# Connection settings are read from the environment; a missing variable
# raises KeyError right here instead of surfacing later as a failed request.
API_KEY = os.environ["CYODA_API_KEY"]
API_SECRET = os.environ["CYODA_API_SECRET"]
# Drop any trailing slash so the base URL never ends up containing "//api".
API_URL = os.environ["CYODA_API_URL"].rstrip("/") + "/api"
GRPC_ADDRESS = os.environ["GRPC_ADDRESS"]
WORK_DIR = os.environ["WORK_DIR"]
# Placeholder only; the authentication cell overwrites it with the real token.
TOKEN = ""
logger.info(f"API URL: {API_URL}")
logger.info(f"GRPC Address: {GRPC_ADDRESS}")


In [None]:

# Define entity and model version constants
# Entity class passed as the `entityClass` query parameter when fetching entity state.
ENTITY_CLASS_NAME = "com.cyoda.tdb.model.treenode.TreeNodeEntity"
# Model name for the raw insurance data ingested from file.
ENTITY_NAME = "insurance_v1"
# Model name for the cleaned data saved after each processing step.
ENTITY_PROCESSED_NAME = "insurance_v1_processed"
# Version segment used in every model/entity URL (kept as a string).
MODEL_VERSION = "1000"

logger.info(f"Using entity: {ENTITY_CLASS_NAME}, model version: {MODEL_VERSION}")


In [None]:
# Let's do the auth first
import requests
import json

def authenticate(api_key, api_secret, api_url):
    """Log in to the Cyoda API and return the bearer token.

    :param api_key: Sent as the ``username`` field of the login payload.
    :param api_secret: Sent as the ``password`` field of the login payload.
    :param api_url: Base API URL; ``/auth/login`` is appended to it.
    :return: The ``token`` value from the JSON response, or ``None`` when the
        request raises, the status is not 200, or the response has no token.
    """
    login_url = f"{api_url}/auth/login"
    headers = {"Content-Type": "application/json", "X-Requested-With": "XMLHttpRequest"}
    auth_data = {"username": api_key, "password": api_secret}

    logger.info("Attempting to authenticate with Cyoda API.")

    try:
        response = requests.post(login_url, headers=headers, data=json.dumps(auth_data), timeout=10)

        if response.status_code != 200:
            logger.error(f"Authentication failed with {response}")
            return None

        token = response.json().get("token")
        if not token:
            # A 200 without a token is not a usable login; do not report success.
            logger.error("Authentication response did not contain a token.")
            return None

        logger.info("Authentication successful!")
        return token

    except Exception as e:
        # Deliberately best-effort: the caller receives None instead of an exception.
        logger.error(f"An error occurred: {e}")
        return None

# Store the bearer token globally; it is None if authentication failed.
TOKEN = authenticate(API_KEY, API_SECRET, API_URL)

### Let's define several supplementary functions

In [5]:
def send_get_request(path, timeout=30):
    """Send an authenticated GET to ``{API_URL}/{path}``.

    :param path: Path relative to ``API_URL`` (no leading slash).
    :param timeout: Seconds passed to ``requests.get``. Without a timeout a
        stalled server would block the notebook indefinitely.
    :return: The ``requests`` response object.
    """
    url = f"{API_URL}/{path}"
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
    return requests.get(url, headers=headers, timeout=timeout)

In [6]:
def send_post_request(path, data, timeout=30):
    """Send an authenticated POST to ``{API_URL}/{path}``.

    :param path: Path relative to ``API_URL`` (no leading slash).
    :param data: Request body, forwarded unchanged to ``requests.post``.
    :param timeout: Seconds passed to ``requests.post``. Without a timeout a
        stalled server would block the notebook indefinitely.
    :return: The ``requests`` response object.
    """
    url = f"{API_URL}/{path}"
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
    return requests.post(url, headers=headers, data=data, timeout=timeout)

In [7]:
def send_put_request(path, data, timeout):
    """Send an authenticated PUT to ``{API_URL}/{path}``.

    :param path: Path relative to ``API_URL`` (no leading slash).
    :param data: Request body, forwarded unchanged to ``requests.put``.
    :param timeout: Seconds passed to ``requests.put``.
    :return: The ``requests`` response object.
    """
    target = f"{API_URL}/{path}"
    auth_headers = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
    return requests.put(target, headers=auth_headers, data=data, timeout=timeout)

In [8]:
def send_delete_request(path, timeout=30):
    """Send an authenticated DELETE to ``{API_URL}/{path}``.

    :param path: Path relative to ``API_URL`` (no leading slash).
    :param timeout: Seconds passed to ``requests.delete``. Without a timeout a
        stalled server would block the notebook indefinitely.
    :return: The ``requests`` response object.
    """
    url = f"{API_URL}/{path}"
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
    return requests.delete(url, headers=headers, timeout=timeout)

In [9]:

def read_file(file_path: str) -> str:
    """Return the full text content of ``file_path``.

    The file is decoded as UTF-8 explicitly so the JSON resources read the
    same way regardless of the platform's default locale encoding.

    :param file_path: Path of the file to read.
    :return: The file content as a string.
    :raises Exception: Whatever ``open``/``read`` raised, re-raised after logging.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        logger.error(f"Failed to read file at {file_path}: {e}")
        raise

## Cleaning up the env
Let's remove the data for the existing schema and the schema itself so we can start from scratch.

In [10]:
def delete_entity_data(entity_name, version):
    """Delete all stored entities of the given model and version.

    :param entity_name: Model name used in the ``entity/TREE`` path.
    :param version: Model version used in the path.
    :return: The HTTP response, or ``{'error': <message>}`` if the call raised.
    """
    path = f"entity/TREE/{entity_name}/{version}"
    try:
        result = send_delete_request(path=path)

        if result.status_code != 200:
            logger.error(f"Failed to delete entity '{entity_name}' with version '{version}'. Response: {result}")
            return result

        logger.info(f"Successfully deleted entity '{entity_name}' with version '{version}'. Response: {result}")
        return result

    except Exception as err:
        logger.error(f"An error occurred while deleting entity '{entity_name}' with version '{version}': {err}")
        return {'error': str(err)}

In [None]:
# Remove entities left over from a previous run; a failure is logged, not raised.
response = delete_entity_data(ENTITY_NAME, MODEL_VERSION)
logger.info(f"Delete response: {response}")

In [12]:
def delete_entity_schema(entity_name, version):
    """Delete the schema (model) of the given entity and version.

    :param entity_name: Model name used in the ``treeNode/model`` path.
    :param version: Model version used in the path.
    :return: The HTTP response, or ``{'error': <message>}`` if the call raised.
    """
    try:
        path = f"treeNode/model/{entity_name}/{version}"
        response = send_delete_request(path=path)

        if response.status_code == 200:
            logger.info(f"Successfully deleted schema for entity '{entity_name}' with version '{version}'. Response: {response}")
        else:
            # Report the numeric status and the body; the response object's repr carries neither field properly.
            logger.error(f"Failed to delete schema for entity '{entity_name}' with version '{version}'. Status Code: {response.status_code}, Response: {response.text}")

        return response

    except Exception as e:
        logger.error(f"An error occurred while deleting schema for entity '{entity_name}' with version '{version}': {e}")
        return {'error': str(e)}

In [None]:
# Remove the existing schema so it can be re-imported from scratch below.
response = delete_entity_schema(ENTITY_NAME, MODEL_VERSION)
logger.info(f"Delete response: {response}")

### Saving data
To begin, we'll first create the schema. Once the schema is in place, we will proceed to lock it to ensure its integrity. After that, we can move on to ingesting the data from the file into the system.

In [14]:
def save_entity_schema(entity_name, version, data):
    """Import a schema for the given entity from sample JSON data.

    :param entity_name: Model name used in the import path.
    :param version: Model version used in the import path.
    :param data: Request body posted to the import endpoint.
    :return: The HTTP response, or ``{'error': <message>}`` if the call raised.
    """
    path = f"treeNode/model/import/JSON/SAMPLE_DATA/{entity_name}/{version}"

    try:
        result = send_post_request(path=path, data=data)

        if result.status_code != 200:
            logger.error(f"Failed to save schema for entity '{entity_name}' with version '{version}'. Response: {result}")
            return result

        logger.info(f"Successfully saved schema for entity '{entity_name}' with version '{version}'. Response: {result}")
        return result

    except Exception as err:
        logger.error(f"An error occurred while saving schema for entity '{entity_name}' with version '{version}': {err}")
        return {'error': str(err)}

In [None]:
def test_save_schema(entity_name, file_path):
    """Read a schema sample from ``file_path`` and import it for ``entity_name``.

    :param entity_name: Model name to create the schema for.
    :param file_path: Path of the JSON file whose content is posted.
    """
    schema_text = read_file(file_path)
    result = save_entity_schema(entity_name=entity_name, version=MODEL_VERSION, data=schema_text)
    logger.info(f"Response: {result}")

# Import the raw-entity schema from the sample file under WORK_DIR.
file_path_base = f"{WORK_DIR}/example/resources/insurance_schema.json"
test_save_schema(ENTITY_NAME, file_path_base)

In [16]:
def lock_entity_schema(entity_name, version, data):
    """Lock the schema of the given entity and version.

    :param entity_name: Model name used in the lock path.
    :param version: Model version used in the lock path.
    :param data: Request body sent with the PUT.
    :return: The HTTP response, or ``{'error': <message>}`` if the call raised.
    """
    path = f"treeNode/model/{entity_name}/{version}/lock"

    try:
        result = send_put_request(path=path, data=data, timeout=30)

        if result.status_code != 200:
            logger.error(f"Failed to lock schema for entity '{entity_name}' with version '{version}'. Response: {result}")
            return result

        logger.info(f"Successfully locked schema for entity '{entity_name}' with version '{version}'. Response: {result}")
        return result
    except Exception as err:
        logger.error(f"An error occurred while locking schema for entity '{entity_name}' with version '{version}': {err}")
        return {'error': str(err)}
    

In [None]:
def test_lock_schema(entity_name):
    """Lock the ``MODEL_VERSION`` schema of ``entity_name`` and log the outcome.

    Any exception is logged rather than propagated.

    :param entity_name: Model name whose schema is locked.
    """
    try:
        lock_result = lock_entity_schema(entity_name=entity_name, version=MODEL_VERSION, data={})
        logger.info(f"Response: {lock_result}")
    except Exception as err:
        logger.error(f"An error occurred while testing schema locking for entity '{entity_name}': {err}")

# Lock the freshly imported raw-entity schema before ingesting data.
test_lock_schema(ENTITY_NAME)

In [18]:
def save_new_entity(entity_name, version, data):
    """Post JSON data as new entities of the given model and version.

    :param entity_name: Model name used in the ``entity/JSON/TREE`` path.
    :param version: Model version used in the path.
    :param data: Request body containing the entity data.
    :return: The HTTP response, or ``{'error': <message>}`` if the call raised.
    """
    path = f"entity/JSON/TREE/{entity_name}/{version}"
    logger.info(f"Saving new entity to path: {path}")

    try:
        result = send_post_request(path=path, data=data)

        if result.status_code != 200:
            logger.error(f"Failed to save new entity. Response: {result}")
            return result

        logger.info(f"Successfully saved new entity. Response: {result}")
        return result

    except Exception as err:
        logger.error(f"An error occurred while saving new entity '{entity_name}' with version '{version}': {err}")
        return {'error': str(err)}

In [None]:
def test_save_new_entity(entity_name, file_path):
    """Save the content of ``file_path`` as a new ``entity_name`` entity.

    :param entity_name: Model name to save under (version ``MODEL_VERSION``).
    :param file_path: Path of the JSON file holding the entity data.
    :return: Whatever ``save_new_entity`` returns.
    :raises Exception: Any error (e.g. from reading the file) is logged and re-raised.
    """
    try:
        payload = read_file(file_path)
        return save_new_entity(entity_name=entity_name, version=MODEL_VERSION, data=payload)
    except Exception as err:
        logger.error(f"An error occurred while testing save_new_entity for '{entity_name}': {err}")
        raise

# Ingest the sample insurance data; `response` is inspected in the next cell.
file_path_base = f"{WORK_DIR}/example/resources/insurance_data.json"
response = test_save_new_entity(ENTITY_NAME, file_path_base)


In [None]:
# Pull the first saved entity's id out of the save response.
# The code indexes the JSON as a list whose first element may hold 'entityIds'.
# NOTE(review): save_new_entity returns a plain dict on error, which has no .json() — confirm the save succeeded first.
response_json = response.json()
if 'entityIds' in response_json[0]:
    entity_id = response_json[0]['entityIds'][0]
    logger.info(f"Extracted entity ID: {entity_id}")
else:
    # NOTE(review): entity_id stays unassigned on this path, so later use of it raises NameError.
    logger.error("Response JSON does not contain 'entityIds'.")

In [None]:
def get_entity_current_state(entity_id):
    """Fetch and log the ``state`` column of one entity.

    :param entity_id: Identifier placed in the ``entityId`` query parameter.
    :return: The HTTP response of the lazy entity-info fetch.
    """
    query = (
        f"platform-api/entity-info/fetch/lazy?entityClass={ENTITY_CLASS_NAME}"
        f"&entityId={entity_id}&columnPath=state"
    )
    state_response = send_get_request(path=query)
    logger.info(state_response.json())
    return state_response
# Show the state of the entity whose id was extracted from the save response.
get_entity_current_state(entity_id)

In [None]:
def get_entities(model, version):
    """Fetch and log all entities of the given model and version.

    :param model: Model name used in the ``entity/TREE`` path.
    :param version: Model version used in the path.
    :return: The HTTP response.
    """
    listing = send_get_request(path=f"entity/TREE/{model}/{version}")
    logger.info(listing.json())
    return listing
# List the raw insurance entities saved so far.
get_entities(ENTITY_NAME, MODEL_VERSION)


In [23]:
def get_single_entity(uuid):
    """Fetch and log a single entity by its id.

    :param uuid: Entity id appended to the ``entity/TREE`` path.
    :return: The HTTP response.
    """
    entity_response = send_get_request(path=f"entity/TREE/{uuid}")
    logger.info(entity_response.json())
    return entity_response

### Working with data
We are going to run an example of basic data preprocessing. We will establish a bi-directional gRPC connection, save insurance entities, and receive calculation requests that trigger data preparation and analysis.

In [None]:
!pip install pandas

In [25]:
from io import StringIO
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Handling missing values without 'inplace'
def missing_values_processing(df):
    """Fill missing values in the claim and encoding columns.

    ``claim_amount`` is filled with its median; ``claim_status`` and
    ``encoding`` are filled with their most frequent value. A column whose
    values are all missing is left untouched instead of raising.

    :param df: DataFrame with ``claim_amount``, ``claim_status`` and ``encoding`` columns; modified in place.
    :return: The same DataFrame.
    """
    df['claim_amount'] = df['claim_amount'].fillna(df['claim_amount'].median())
    for column in ('claim_status', 'encoding'):
        modes = df[column].mode()
        # mode() is empty when every value is missing, so there is nothing to fill with.
        if not modes.empty:
            df[column] = df[column].fillna(modes.iloc[0])
    return df

# Scaling/Normalization Process
def scaling_normalization_process(df):
    """Rescale the amount columns with a freshly fitted ``MinMaxScaler``.

    :param df: DataFrame with ``premium_amount`` and ``claim_amount`` columns; modified in place.
    :return: The same DataFrame.
    """
    amount_columns = ['premium_amount', 'claim_amount']
    scaled = MinMaxScaler().fit_transform(df[amount_columns])
    df[amount_columns] = scaled
    return df

# Date Parsing Process
def date_parsing_process(df):
    """Convert ``policy_issue_date`` to datetimes; unparseable values become ``NaT``.

    :param df: DataFrame with a ``policy_issue_date`` column; modified in place.
    :return: The same DataFrame.
    """
    parsed_dates = pd.to_datetime(df['policy_issue_date'], errors='coerce')
    df['policy_issue_date'] = parsed_dates
    return df

# Character Encoding Process
def character_encoding_process(df):
    """Relabel ``ascii``, ``iso-8859-1`` and ``utf-16`` entries of ``encoding`` as ``utf-8``.

    Any other value is kept as it is.

    :param df: DataFrame with an ``encoding`` column; modified in place.
    :return: The same DataFrame.
    """
    relabelled = ['ascii', 'iso-8859-1', 'utf-16']

    def _normalise(value):
        if value in relabelled:
            return 'utf-8'
        return value

    df['encoding'] = df['encoding'].apply(_normalise)
    return df

# Data Entry Fixing Process
def data_entry_fixing_process(df):
    """Lower-case every ``claim_status`` value.

    :param df: DataFrame with a string ``claim_status`` column; modified in place.
    :return: The same DataFrame.
    """
    lowered = df['claim_status'].str.lower()
    df['claim_status'] = lowered
    return df


# Maps a processing-step name to the cleaning function that implements it.
# clean_data looks steps up here, and the gRPC consumer checks incoming
# processor names against these keys.
process_dispatch = {
        "missing_values_processing": missing_values_processing,
        "scaling_normalization_process": scaling_normalization_process,
        "date_parsing_process": date_parsing_process,
        "character_encoding_process": character_encoding_process,
        "data_entry_fixing_process": data_entry_fixing_process
    }

def clean_data(processing_step, raw_data):
    """Apply one named cleaning step to the records under ``raw_data["data"]``.

    :param processing_step: Key of ``process_dispatch`` selecting the step.
    :param raw_data: Mapping whose ``"data"`` entry holds the records to clean.
    :return: ``{"data": [...]}`` with the cleaned records (dates in ISO format).
    :raises ValueError: If ``processing_step`` is not in ``process_dispatch``.
    """
    # Round-trip through JSON text so pandas parses the records with read_json.
    frame = pd.read_json(StringIO(json.dumps(raw_data.get("data"))))

    step = process_dispatch.get(processing_step)
    if step is None:
        raise ValueError(f"Unknown processing step: {processing_step}")

    cleaned_json = step(frame).to_json(orient='records', date_format='iso')
    return {"data": json.loads(cleaned_json)}

Let's prepare the environment for the processed entities. We need to add a schema for the processed entity (`insurance_v1_processed`) and lock it.

In [None]:
# Reset the processed-entity model: drop old data and schema, then re-import
# the schema (same sample file as the raw entity) and lock it.
delete_entity_data(ENTITY_PROCESSED_NAME, MODEL_VERSION)
delete_entity_schema(ENTITY_PROCESSED_NAME, MODEL_VERSION)
test_save_schema(ENTITY_PROCESSED_NAME, f"{WORK_DIR}/example/resources/insurance_schema.json")
test_lock_schema(ENTITY_PROCESSED_NAME)

### gRPC client
Let's establish a gRPC bi-directional streaming connection. The process will begin by sending a 'Join' event, and we expect to receive a 'Greet' event in response.

Following this, we will save the 'Insurance' entity. This entity will undergo a step-by-step transition from the 'None' state to the 'Done' state, which will trigger an external processor. Once triggered, the external processor will send calculation requests, which will initiate data cleaning functions.

#### Please import example/export_workflow_for_TreeNodeEntity_prizes.json workflow before you proceed :)

In [None]:
# Step 1: Install gRPC and tools
!pip install grpcio grpcio-tools

# Step 2: Compile proto files
# Output goes to the current directory; the client cell imports the generated
# cyoda_cloud_api_pb2* and cloudevents_pb2* modules from there.
!python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. cyoda-cloud-api.proto

!python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. cloudevents.proto

In [28]:

from enum import Enum

class CloudEventType(str, Enum):
    """String-valued names of the CloudEvent types used with the Cyoda stream.

    NOTE(review): not referenced elsewhere in the visible code; the gRPC
    client cell defines equivalent plain string constants instead.
    """
    BASE_EVENT = "BaseEvent"
    CALCULATION_MEMBER_JOIN_EVENT = "CalculationMemberJoinEvent"
    CALCULATION_MEMBER_GREET_EVENT = "CalculationMemberGreetEvent"
    ENTITY_PROCESSOR_CALCULATION_REQUEST = "EntityProcessorCalculationRequest"
    ENTITY_PROCESSOR_CALCULATION_RESPONSE = "EntityProcessorCalculationResponse"

In [None]:
import grpc
import uuid
import json
import asyncio
from cloudevents_pb2 import CloudEvent
import cloudevents_pb2_grpc as cloudevents_grpc
import cyoda_cloud_api_pb2 as cyoda_cloud_pb2
import cyoda_cloud_api_pb2_grpc as cyoda_cloud_grpc

# Constants for event creation and configuration
# Tags and owner are sent in the join event; owner is also sent in every notification.
TAGS = ["data_cleaning_workflow"]
OWNER = "PLAY"
# spec_version and source fields set on every CloudEvent built below.
SPEC_VERSION = "1.0"
SOURCE = "SimpleSample"
# Event type strings: join is sent first, notification answers each
# calculation request, greet is the type the consumer waits for.
JOIN_EVENT_TYPE = "CalculationMemberJoinEvent"
NOTIFICATION_EVENT_TYPE = "EntityProcessorCalculationResponse"
GREET_EVENT_TYPE = "CalculationMemberGreetEvent"
# NOTE(review): not referenced in the visible code.
EVENT_ID_FORMAT = "{uuid}"
# Sample data saved as a new entity once the greet event arrives.
FILE_PATH = f"{WORK_DIR}/example/resources/insurance_data.json"

def create_cloud_event(event_id: str, source: str, event_type: str, data: dict) -> CloudEvent:
    """
    Build a CloudEvent whose text payload is ``data`` serialized as JSON.

    :param event_id: Value for the event's ``id`` field.
    :param source: Value for the event's ``source`` field.
    :param event_type: Value for the event's ``type`` field.
    :param data: Payload, stored JSON-encoded in ``text_data``.
    :return: The populated CloudEvent (``spec_version`` is always ``SPEC_VERSION``).
    """
    serialized_payload = json.dumps(data)
    event = CloudEvent(
        id=event_id,
        source=source,
        spec_version=SPEC_VERSION,
        type=event_type,
        text_data=serialized_payload,
    )
    return event

def create_join_event() -> CloudEvent:
    """
    Build the join event that announces this client with its owner and tags.

    :return: A CloudEvent of type ``JOIN_EVENT_TYPE`` with a fresh random id.
    """
    join_payload = {"owner": OWNER, "tags": TAGS}
    return create_cloud_event(
        event_id=str(uuid.uuid4()),
        source=SOURCE,
        event_type=JOIN_EVENT_TYPE,
        data=join_payload,
    )

def create_notification_event(data: dict) -> CloudEvent:
    """
    Build the response event for a processed calculation request.

    :param data: The request data; its ``requestId``, ``entityId`` and
        ``payload`` entries are echoed back (missing keys become ``None``).
    :return: A CloudEvent of type ``NOTIFICATION_EVENT_TYPE`` marked successful.
    """
    reply = {
        "requestId": data.get('requestId'),
        "entityId": data.get('entityId'),
        "owner": OWNER,
        "payload": data.get('payload'),
        "success": True,
    }
    return create_cloud_event(
        event_id=str(uuid.uuid4()),
        source=SOURCE,
        event_type=NOTIFICATION_EVENT_TYPE,
        data=reply,
    )

def trigger_entity_saved_event():
    """
    Save a new ``ENTITY_NAME`` entity from ``FILE_PATH``.

    The stream consumer calls this when the greet event arrives. The save
    response is discarded.
    """
    test_save_new_entity(ENTITY_NAME, FILE_PATH)

async def event_generator(queue: asyncio.Queue):
    """
    Yield the outgoing events for the stream.

    The join event goes out first. After a five-second pause, events are
    taken from ``queue`` and yielded until a ``None`` sentinel is received.

    :param queue: Async queue that supplies follow-up events.
    :yield: CloudEvent instances.
    """
    yield create_join_event()
    await asyncio.sleep(5)

    outgoing = await queue.get()
    while outgoing is not None:
        yield outgoing
        queue.task_done()
        outgoing = await queue.get()

async def consume_stream():
    """
    Run the bi-directional stream: react to each server event and queue replies.

    On the greet event a new insurance entity is saved. For a request naming
    a known processor, the matching cleaning step runs, the result is saved
    as a processed entity, and a notification is queued. The
    ``finish_workflow`` processor queues a final notification and then the
    ``None`` sentinel that ends the outgoing generator.
    """
    call_credentials = grpc.access_token_call_credentials(TOKEN)
    credentials = grpc.composite_channel_credentials(grpc.ssl_channel_credentials(), call_credentials)
    queue = asyncio.Queue()

    async with grpc.aio.secure_channel(GRPC_ADDRESS, credentials) as channel:
        stub = cyoda_cloud_grpc.CloudEventsServiceStub(channel)

        # Open the streaming call; outgoing events come from the generator.
        stream = stub.startStreaming(event_generator(queue))

        async for message in stream:
            if message.type == GREET_EVENT_TYPE:
                trigger_entity_saved_event()
                continue

            data = json.loads(message.text_data)
            logger.info(f"Received response: {message}")

            processor_name = data.get('processorName')
            if processor_name in process_dispatch:
                logger.info(f"Processing notification data: {data}")
                payload_data = data['payload']['data']
                previous_version_id = payload_data.get('previous_version_id')
                if previous_version_id == "0":
                    # First transition: clean the data attached to the insurance entity itself.
                    raw_data = payload_data
                else:
                    # Later transitions: continue from the previously saved cleaned version.
                    raw_data = get_single_entity(previous_version_id).json().get("tree", {})
                cleaned_data = clean_data(processor_name, raw_data)
                # Persist this step's output as a new processed entity.
                saved = save_new_entity(ENTITY_PROCESSED_NAME, MODEL_VERSION, json.dumps(cleaned_data))
                # Record the new entity id so the next step can pick it up.
                payload_data['previous_version_id'] = saved.json()[0]["entityIds"][0]
                await queue.put(create_notification_event(data))
            elif processor_name == "finish_workflow":
                await queue.put(create_notification_event(data))
                # Signal the end of the stream
                await queue.put(None)

async def main(timeout=20):
    """Run the streaming client and stop it after ``timeout`` seconds.

    :param timeout: Seconds ``consume_stream`` may run before it is cancelled.
        Defaults to 20; raise it for workflows that take longer.
    """
    try:
        await asyncio.wait_for(consume_stream(), timeout=timeout)
    except asyncio.TimeoutError:
        # Expected way for the demo to end when the stream stays open.
        logger.info("Stream operation timed out")

# Start the client. Top-level await presumably relies on the notebook kernel's
# running event loop — confirm before running this outside a notebook.
await main()

We expect to have two insurance entities: one saved during testing and the other saved via the gRPC client.

In [None]:
# Fetch the raw insurance entities and log them again as indented JSON.
response = get_entities(ENTITY_NAME, MODEL_VERSION)
formatted_json = json.dumps(response.json(), indent=4)
logger.info(formatted_json)


We expect to have five processed insurance entities: a new version of the data for each transition.

In [None]:
# Fetch the processed insurance entities and log them again as indented JSON.
response = get_entities(ENTITY_PROCESSED_NAME, MODEL_VERSION)
formatted_json = json.dumps(response.json(), indent=4)
logger.info(formatted_json)