# Quality Control Workflow for Quotes

In this notebook, we demonstrate how users can create a workflow to upload quote data into LUSID. The workflow will then conduct two data quality checks on these quotes, raising any issues as exceptions for approval. This will help us to ensure the data we are upserting to LUSID is valid and suitable.

## Overview

We will be making use of the Workflow Service which enables you to connect and control movement of data throughout LUSID, allowing you to model data and business operational controls. You can read more about the Workflow Service in the [Knowledge Base](https://support.lusid.com/knowledgebase/article/KA-02181/en-us).

For the purposes of this notebook, we will consider an example where an Excel file has been uploaded to LUSID Drive. This file contains quotes for various instruments. Our workflow will then work as follows:
1. The `ImportQuotes` task will be created and started.
2. A worker will be triggered to import quote data, from the given file in Drive, into LUSID.
3. A child task `ReasonableValueDataControl` will be created and started. This will trigger a worker to flag values that are not between 1 and 1000.
4. A child task `IQROutlierDataControl` will be created and started. This will trigger a worker to flag values that lie more than 1.5 times the interquartile range below the first quartile or above the third quartile of existing quotes for that instrument.
5. A child task `HandleException` will be created for each flagged value so a user can review any failures.

An overview of our workflow is below.
![Full Workflow](./images/workflow_all.png)

## Setup

We start by importing relevant libraries, authenticating our user, and creating our API client.

In [1]:
# Import general purpose packages
import json
import os
import time

# Import LUSID specific packages
from fbnsdkutilities import ApiClientFactory
import lumipy as lm
import lusid
import lusid_drive
import lusid_workflow
import lusid_workflow.models as wf_models
from lusid_workflow.rest import ApiException
from lusidjam.refreshing_token import RefreshingToken
from lusidtools.pandas_utils.lusid_pandas import lusid_response_to_data_frame

# Authenticate our user and create our API client
secrets_path = os.getenv("FBN_SECRETS_PATH")
api_factory = lusid.utilities.ApiClientFactory(
    token=RefreshingToken(),
    api_secrets_filename=secrets_path,
    app_name="LusidJupyterNotebook"
)

# Get URL and use it to configure Workflow and Drive SDKs
api_url = api_factory.api_client.configuration._base_path.replace("api","")

wf_configuration = lusid_workflow.Configuration(host=api_url + "workflow")
drive_configuration = lusid_drive.Configuration(host=api_url + "drive")
wf_configuration.access_token = api_factory.api_client.configuration.access_token
drive_configuration.access_token = api_factory.api_client.configuration.access_token

# Setup Workflow, Drive, and Lumipy Clients
wf_client = lusid_workflow.ApiClient(wf_configuration)
drive_client = lusid_drive.ApiClient(drive_configuration)
client = lm.get_client(api_secrets_filename=secrets_path, token=RefreshingToken())
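
The cell above derives the Workflow and Drive hosts by calling `replace("api", "")` on the base path, which removes every occurrence of `api`, including any inside the host name. As a minimal sketch of an alternative, the hypothetical helper below (`service_base_url` is not part of any LUSID SDK) strips only a trailing `/api` segment. The domain used is made up for illustration.

```python
def service_base_url(api_base_path: str) -> str:
    """Return the base URL with a trailing '/api' segment removed.

    Only the final path segment is stripped, so 'api' inside the host survives.
    """
    trimmed = api_base_path.rstrip("/")
    if trimmed.endswith("/api"):
        trimmed = trimmed[: -len("/api")]
    return trimmed + "/"


# Hypothetical domain, used only for illustration
example_base_path = "https://api-demo.lusid.com/api"

suffix_based = service_base_url(example_base_path)
replace_based = example_base_path.replace("api", "")  # approach used in the cell above

print(suffix_based)
print(replace_based)
```

With a helper like this, `api_url = service_base_url(api_factory.api_client.configuration._base_path)` could stand in for the `replace` call, assuming the base path always ends in `/api`.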



In [2]:
# Define the relevant APIs we will use
files_api = lusid_drive.FilesApi(drive_client)
folders_api = lusid_drive.FoldersApi(drive_client)
task_def_api = lusid_workflow.TaskDefinitionsApi(wf_client)
task_api = lusid_workflow.TasksApi(wf_client)
worker_api = lusid_workflow.WorkersApi(wf_client)

# Notebook constants
notebook_scope = 'notebook-example-dq-workflow'
drive_filepath = 'example-quotes'
local_filepath = 'data/'
example_filename = 'exampledata.xlsx'

## Upload Excel File

We will upload an example Excel workbook into LUSID Drive. This contains two worksheets: one containing instrument information and one containing quote information.

In [3]:
# Attempt to create the folder in LUSID Drive
try:
    response = folders_api.create_folder(
        lusid_drive.models.CreateFolder(path='/', name=drive_filepath))
except lusid_drive.rest.ApiException as e:
    if json.loads(e.body)["code"] == 664:
        print(f"Folder '/{drive_filepath}' already exists in Drive.")
    else:
        raise

# Attempt to upload the Excel file in LUSID Drive
try:
    with open(local_filepath + example_filename, 'rb') as data:
        response = files_api.create_file(
            x_lusid_drive_filename=example_filename,
            x_lusid_drive_path=drive_filepath,
            content_length=os.stat(local_filepath + example_filename).st_size,
            body=data.read()
        )
except lusid_drive.rest.ApiException as e:
    if json.loads(e.body)["code"] == 671:
        print(
            f"File '/{drive_filepath}/{example_filename}' already exists in Drive.")
    else:
        raise

Folder '/example-quotes' already exists in Drive.
File '/example-quotes/exampledata.xlsx' already exists in Drive.
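
Both `except` branches above call `json.loads(e.body)["code"]` directly. If the error body were ever empty or not JSON, that call would itself raise and hide the original `ApiException`. The sketch below shows one defensive way to read the code; `drive_error_code` is a hypothetical helper, and the two constants simply name the codes already used in the cell above.

```python
import json

# Error codes taken from the cell above
FOLDER_ALREADY_EXISTS = 664
FILE_ALREADY_EXISTS = 671


def drive_error_code(body):
    """Return the 'code' entry of a JSON error body, or None if it cannot be read."""
    try:
        payload = json.loads(body)
    except (TypeError, ValueError):
        # TypeError: body is None; ValueError: body is not valid JSON
        return None
    if not isinstance(payload, dict):
        return None
    return payload.get("code")


print(drive_error_code('{"code": 664, "name": "example"}') == FOLDER_ALREADY_EXISTS)
print(drive_error_code("<html>Bad Gateway</html>"))
```

Inside the handlers, the comparison would then read `if drive_error_code(e.body) == FOLDER_ALREADY_EXISTS:`, with the `else: raise` branch left as it is.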


## 1. Prepare Luminesce Views & Workers

In our workflow, we will be making use of workers. A worker wraps a Luminesce View: it takes input parameters mapped from the fields of a task and returns results which can be mapped to a task definition. A Luminesce View packages a query over any number of other Luminesce providers behind a single provider, so a user does not need to engage with multiple providers and complex SQL directly. You can read more about Custom Luminesce Views in [this Knowledge Base article](https://support.lusid.com/knowledgebase/article/KA-01767/en-us).

In this notebook, we will be interacting with Luminesce via Lumipy. Lumipy is a Python library that makes it easy to use Luminesce as part of the Python data science ecosystem. We are able to write SQL queries as Python strings, and send these directly to Luminesce. Again this is documented in our Knowledge Base; you can read more about Lumipy in [this article](https://support.lusid.com/knowledgebase/article/KA-02095/en-us). 

### 1.1. Import Quotes from File

We can use Luminesce to create a custom view that takes a filename as a parameter, extracts the quotes from that file, and loads them into LUSID.

In [4]:
# Define our view
create_import_view_sql = f"""
    -- Create view and set parameters
    @import_quotes_view = use Sys.Admin.SetupView
        --provider=Custom.Quotes.ImportFromExcel
        --parameters
            filename,Text,/{drive_filepath}/{example_filename},false
            quotes_scope,Text,{notebook_scope},false
    ----

    @@filename = select #PARAMETERVALUE(filename);
    @@quotes_scope = select #PARAMETERVALUE(quotes_scope);


    -- Load data from Excel
    @inst_data = use Drive.Excel with @@filename
        --file={{@@filename}}
        --worksheet=instrument
    enduse;

    @quote_data = use Drive.Excel with @@filename
        --file={{@@filename}}
        --worksheet=price_time_series
    enduse;


    -- Transform quote data
    @quotes_for_upload =
        select
            'ClientInternal' as InstrumentIdType,
            instrument_id as InstrumentId,
            @@quotes_scope as QuoteScope,
            'Price' as QuoteType,
            'Lusid' as Provider,
            'Mid' as Field,
            price_date as QuoteEffectiveAt,
            price as Value,
            ccy as Unit
        from @quote_data;

    -- Transform instrument data
    @equity_instruments =
        select 
            inst_id as ClientInternal, 
            name as DisplayName, 
            ccy as DomCcy,
            @@quotes_scope as Scope
        from @inst_data;


    -- Return quotes in view
    select * from @quotes_for_upload;

    -- Create instruments if not Active
    select *
    from Lusid.Instrument.Equity.Writer
    where ToWrite = @equity_instruments;

    -- Upload quotes into LUSID
    select *
    from Lusid.Instrument.Quote.Writer
    where ToWrite = @quotes_for_upload;

    enduse;
"""

# Run query to create view
client.run(create_import_view_sql)
print("Successfully created view.")

Successfully created view.


In [5]:
# Allow time for our view to appear in the Luminesce catalogue
time.sleep(3)

# Define and run an example query to test our view
import_view_example_sql = f"""
    select * from Custom.Quotes.ImportFromExcel
    where filename='/{drive_filepath}/{example_filename}'
    and quotes_scope='{notebook_scope}';
"""

client.run(import_view_example_sql)

Unnamed: 0,InstrumentIdType,InstrumentId,QuoteScope,QuoteType,Provider,Field,QuoteEffectiveAt,Value,Unit
0,ClientInternal,EQ56JD720MDJ,quotes-workflow,Price,Lusid,Mid,01/01/2022 00:00:00,100,USD
1,ClientInternal,EQ56JD720MDJ,quotes-workflow,Price,Lusid,Mid,01/02/2022 00:00:00,92,USD
2,ClientInternal,EQ56JD720MDJ,quotes-workflow,Price,Lusid,Mid,01/03/2022 00:00:00,104,USD
3,ClientInternal,EQ56JD720MDJ,quotes-workflow,Price,Lusid,Mid,01/04/2022 00:00:00,99,USD
4,ClientInternal,EQ56JD720MDJ,quotes-workflow,Price,Lusid,Mid,01/05/2022 00:00:00,99,USD
...,...,...,...,...,...,...,...,...,...
742,ClientInternal,EQ56JD720345,quotes-workflow,Price,Lusid,Mid,09/02/2022 00:00:00,56,USD
743,ClientInternal,EQ56JD720345,quotes-workflow,Price,Lusid,Mid,09/03/2022 00:00:00,76,USD
744,ClientInternal,EQ56JD720345,quotes-workflow,Price,Lusid,Mid,09/04/2022 00:00:00,89,USD
745,ClientInternal,EQ56JD720345,quotes-workflow,Price,Lusid,Mid,09/05/2022 00:00:00,74,USD
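
The fixed `time.sleep(3)` above assumes the view is registered within three seconds. One alternative is to retry the test query until it succeeds. The sketch below is a generic, standard-library retry loop; `retry` and `flaky_query` are hypothetical names, and `flaky_query` only simulates a query that fails until the view is available.

```python
import time


def retry(action, attempts=5, delay_seconds=1.0, sleep=time.sleep):
    """Call `action` until it succeeds or `attempts` is exhausted.

    Re-raises the last exception if every attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:  # in practice, narrow this to the client's error type
            last_error = error
            if attempt < attempts:
                sleep(delay_seconds)
    raise last_error


# Stand-in for a query that fails until the view has been registered
calls = {"count": 0}


def flaky_query():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("view not found yet")
    return "rows"


result = retry(flaky_query, attempts=5, delay_seconds=0.0)
print(result, calls["count"])
```

In the notebook this could be used as `retry(lambda: client.run(import_view_example_sql))`, assuming the client raises an exception while the view is not yet in the catalogue.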


### 1.2. Data Quality Check for Reasonable Values 

Similarly, we can configure workers to conduct various data quality checks. For example, we will create a worker to ensure a quote's price is a sensible value (strictly between 1 and 1000, say, so prices of exactly 1 or 1000 are flagged as well). As before, we will create a custom view that checks all quotes in our scope.

In [6]:
# Define our view
create_rv_dq_view = f"""
    -- Create view and set parameters
    @reasonable_value_view = use Sys.Admin.SetupView
    --provider=Custom.PriceCheck.ReasonableValue
    --parameters
        quotes_scope,Text,{notebook_scope},true
    ----
    
    @@quotes_scope = select #PARAMETERVALUE(quotes_scope);

    -- Collect quotes for all instruments
    @quotes_data = select *
        from Lusid.Instrument.Quote
        where QuoteScope = @@quotes_scope
            and InstrumentIdType = 'ClientInternal'
            and QuoteType = 'Price';

    -- Collect instrument static
    @instrument_data = select
        ClientInternal,
        DisplayName
        from Lusid.Instrument.Equity
        where @@quotes_scope = Scope
            and State = 'Active';

    -- Generate time series
    @price_ts = select
        ClientInternal,
        DisplayName,
        QuoteEffectiveAt as [PriceDate],
        Unit as [Currency],
        Value as [Price]
        from @instrument_data i
        join @quotes_data q on (i.ClientInternal = q.InstrumentId);

    -- Run reasonable value check for each quote
    select
        PriceDate,
        @@quotes_scope as QuoteScope,
        ClientInternal,
        DisplayName,
        Price,
        case 
            when Price >= 1000 then 'Unreasonably Large Value'
            when Price <= 1 then 'Unreasonably Small Value'
            else 'OK'
        end as Result
        from @price_ts
        where not Result = 'OK';

    enduse;
"""

# Run query to create view
client.run(create_rv_dq_view)
print("Successfully created view.")

Successfully created view.


In [7]:
# Allow time for our view to appear in the Luminesce catalogue
time.sleep(3)

# Define and run an example query to test our view
rv_dq_view_example_sql = f"""
    select * from Custom.PriceCheck.ReasonableValue
    where quotes_scope='{notebook_scope}';
"""

client.run(rv_dq_view_example_sql)

Unnamed: 0,PriceDate,QuoteScope,ClientInternal,DisplayName,Price,Result
0,2022-07-25,quotes-workflow,EQ56JD720LSU,Tesco Plc,1005.0,Unreasonably Large Value
1,2022-04-21,quotes-workflow,EQ56JD720345,Visa Inc,1.0,Unreasonably Small Value
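
To make the boundary behaviour of the `case` expression in the view explicit, here is a plain Python mirror of it. This is only an illustrative sketch; `classify_price` is a hypothetical name, and the check itself runs in Luminesce, not in Python.

```python
def classify_price(price, lower=1, upper=1000):
    """Mirror the view's CASE expression: both bounds are themselves flagged."""
    if price >= upper:
        return "Unreasonably Large Value"
    if price <= lower:
        return "Unreasonably Small Value"
    return "OK"


# The two flagged prices from the output above, plus one ordinary price
for price in (1005.0, 1.0, 100):
    print(price, classify_price(price))
```

Because the comparisons are `>=` and `<=`, a price of exactly 1 or exactly 1000 is reported, which matches the Visa Inc row above with a price of 1.0.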


### 1.3. Data Quality Checks for Interquartile Range (IQR) Outliers

Another data quality check we may want to conduct is finding outliers for a given instrument using the interquartile range. This uses the $1.5\times \texttt{IQR}$ rule, that is, we flag any quotes that fall below $\texttt{Q}1 - 1.5\times \texttt{IQR}$ or above $\texttt{Q}3 + 1.5\times \texttt{IQR}$, where $\texttt{IQR} = \texttt{Q}3 - \texttt{Q}1$. You can read more about IQR outliers in [this Wikipedia article](https://en.wikipedia.org/wiki/Interquartile_range#Outliers). As before, we will create a custom view that checks all quotes in our scope.

In [8]:
# Define our view
create_iqr_outlier_dq_view = f"""
    -- Create view and set parameters
    @iqr_outlier_view = use Sys.Admin.SetupView
    --provider=Custom.PriceCheck.OnePointFiveIQR
    --parameters
        quotes_scope,Text,{notebook_scope},true
    ----
    
    @@quotes_scope = select #PARAMETERVALUE(quotes_scope);

    -- Collect quotes for all instruments
    @quotes_data = select *
        from Lusid.Instrument.Quote
        where QuoteScope = @@quotes_scope
            and InstrumentIdType = 'ClientInternal'
            and QuoteType = 'Price';

    -- Collect instrument static
    @instrument_data = select
        ClientInternal,
        DisplayName
        from Lusid.Instrument.Equity
        where @@quotes_scope = Scope
            and State = 'Active';

    -- Generate time series
    @price_ts = select
        ClientInternal,
        DisplayName,
        QuoteEffectiveAt as [PriceDate],
        Unit as [Currency],
        Value as [Price]
        from @instrument_data i
        join @quotes_data q on (i.ClientInternal = q.InstrumentId);

    -- Run IQR checks for each instrument
    @iqr_data = select
        ClientInternal,
        interquartile_range(price) * (1.5) as [iqr_x1_5],
        quantile(price, 0.25) as [q1],
        quantile(price, 0.75) as [q3]
        from @price_ts
        group by ClientInternal;

    -- Join the IQR data with the time series and identify outliers
    select
        p.PriceDate,
        @@quotes_scope as QuoteScope,
        p.ClientInternal,
        p.DisplayName,
        i.q1,
        i.q3,
        (i.q3 + i.iqr_x1_5) as [UpperLimit],
        (i.q1 - i.iqr_x1_5) as [LowerLimit],
        p.Price,
        case when p.Price not between (i.q1 - i.iqr_x1_5) and (i.q3 + i.iqr_x1_5)
            then 'IQR Outlier'
            else 'OK'
        end as Result
        from @price_ts p
        join @iqr_data i on p.ClientInternal = i.ClientInternal
        where not Result = 'OK';

    enduse;
"""

# Run query to create view
client.run(create_iqr_outlier_dq_view)
print("Successfully created view.")

Successfully created view.


In [9]:
# Allow time for our view to appear in the Luminesce catalogue
time.sleep(3)

# Define and run an example query to test our view
iqr_outlier_dq_view_example_sql = f"""
    select * from Custom.PriceCheck.OnePointFiveIQR
    where quotes_scope='{notebook_scope}';
"""

client.run(iqr_outlier_dq_view_example_sql)

Unnamed: 0,PriceDate,QuoteScope,ClientInternal,DisplayName,q1,q3,UpperLimit,LowerLimit,Price,Result
0,2022-01-08,quotes-workflow,EQ56JD720LSU,Tesco Plc,280.0,362.0,485.0,157.0,98.0,IQR Outlier
1,2022-02-20,quotes-workflow,EQ56JD720LSU,Tesco Plc,280.0,362.0,485.0,157.0,900.0,IQR Outlier
2,2022-02-21,quotes-workflow,EQ56JD720LSU,Tesco Plc,280.0,362.0,485.0,157.0,800.0,IQR Outlier
3,2022-07-25,quotes-workflow,EQ56JD720LSU,Tesco Plc,280.0,362.0,485.0,157.0,1005.0,IQR Outlier
4,2022-04-14,quotes-workflow,EQ56JD720345,Visa Inc,61.0,89.0,131.0,19.0,5.0,IQR Outlier
5,2022-04-21,quotes-workflow,EQ56JD720345,Visa Inc,61.0,89.0,131.0,19.0,1.0,IQR Outlier
6,2022-05-26,quotes-workflow,EQ56JD720345,Visa Inc,61.0,89.0,131.0,19.0,509.0,IQR Outlier
7,2022-01-23,quotes-workflow,EQ56JD720MDJ,Pay Pal Holdings Inc,94.0,105.0,121.5,77.5,150.0,IQR Outlier
8,2022-02-20,quotes-workflow,EQ56JD720MDJ,Pay Pal Holdings Inc,94.0,105.0,121.5,77.5,40.0,IQR Outlier
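
The limits in the output above can be reproduced by hand from `q1` and `q3`. The sketch below does this in plain Python and then applies the same rule to a small list of prices. The function names are hypothetical, the sample prices are made up for illustration, and the quartiles come from `statistics.quantiles` with `method="inclusive"`, which is an assumption: Luminesce's `quantile` function may interpolate differently.

```python
from statistics import quantiles


def fences(q1, q3, k=1.5):
    """Return (lower, upper) limits for the k * IQR rule."""
    margin = k * (q3 - q1)
    return q1 - margin, q3 + margin


def iqr_outliers(prices, k=1.5):
    """Return the prices that fall outside the k * IQR limits."""
    # Assumption: 'inclusive' interpolation; other methods give slightly different quartiles
    q1, _, q3 = quantiles(prices, n=4, method="inclusive")
    lower, upper = fences(q1, q3, k)
    return [price for price in prices if price < lower or price > upper]


# Quartiles reported for Tesco Plc in the output above
print(fences(280.0, 362.0))

# Made-up prices for illustration
sample_prices = [100, 92, 104, 99, 99, 150, 40, 101, 98]
flagged = iqr_outliers(sample_prices)
print(flagged)
```

For the Tesco Plc quartiles this gives limits of 157.0 and 485.0, matching the `LowerLimit` and `UpperLimit` columns above.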


### 1.4. Create Workers

In order to utilise our custom views within our workflow, we now need to create workers that wrap each piece of functionality. We will define a function to do this.

In [10]:
# Define a function to create a worker
def upload_worker(worker_scope, worker_code, display_name, description, lumi_view):
    request = wf_models.CreateWorkerRequest(
        id=wf_models.ResourceId(scope=worker_scope, code=worker_code),
        display_name=display_name,
        description=description,
        worker_configuration=wf_models.LuminesceView(
            type="LuminesceView", name=lumi_view
        )
    )
    
    try:
        response = worker_api.create_worker(create_worker_request=request)
    except ApiException as e:
        if e.status == 409:
            print(f"Worker with code '{worker_code}' in scope '{worker_scope}' already exists.")
        else:
            raise e
    
    return request

We can now call the above function to create each worker.

In [11]:
import_from_excel_worker_request = upload_worker(
    worker_scope=notebook_scope,
    worker_code="ImportFromExcelFile",
    display_name="Import From Excel",
    description="Imports quote data from specified Excel file in Drive.",
    lumi_view="Custom.Quotes.ImportFromExcel"
)

reasonable_value_worker_request = upload_worker(
    worker_scope=notebook_scope,
    worker_code="ReasonableValueChecker2",
    display_name="Reasonable Value Checker",
    description="Find any quotes with values not between 1 and 1000.",
    lumi_view="Custom.PriceCheck.ReasonableValue"
)

iqr_outliers_worker_request = upload_worker(
    worker_scope=notebook_scope,
    worker_code="IQROutliers2",
    display_name="IQR Outliers",
    description="Find any IQR outlier quotes.",
    lumi_view="Custom.PriceCheck.OnePointFiveIQR"
)

Worker with code 'ImportFromExcelFile' in scope 'quotes-workflow' already exists.
Worker with code 'ReasonableValueChecker2' in scope 'quotes-workflow' already exists.
Worker with code 'IQROutliers2' in scope 'quotes-workflow' already exists.


## 2. Task Definitions

Now that we have our workers set up, we will define the Task Definitions that will govern our workflow. Looking at our example, we have four different Task Definitions that make up our workflow: **Import Quotes**, **Reasonable Value DQ**, **IQR Outlier DQ**, and **Handle Exception**. We will analyse and define each in reverse order.

### 2.1. Handle Exception
![Handle Exception Workflow](./images/workflow-he.png)

Our first task definition, `HandleException`, takes details of individual quotes that have failed a DQ check. This will effectively form a queue of potentially erroneous data for review by a user.

Details of the quote exception are passed and the task is started with the `Start` trigger. When in the `InProgress` state, a user can choose to place the exception on hold, resolve the exception, or mark the exception as ignored. To resolve or ignore the exception, the user must supply an explanation in the `Details` field; this is enforced with Guards. A Guard is an operational condition which must be met for a state transition to succeed, defined using [LUSID's filtering syntax](https://support.lusid.com/knowledgebase/article/KA-01914/en-us). In our specific scenario we can utilise them to ensure a field is populated; if it is, the transition succeeds and the relevant trigger is passed to the parent.

In [12]:
handle_exception_task_definition_request = wf_models.CreateTaskDefinitionRequest(
    id=wf_models.ResourceId(scope=notebook_scope, code="HandleException"),
    display_name="Handle Exception",
    description="Handle any data outliers that are raised.",

    # Define the states of the workflow
    states=[
        wf_models.TaskStateDefinition(name="Pending"),
        wf_models.TaskStateDefinition(name="InProgress"),
        wf_models.TaskStateDefinition(name="Resolved"),
        wf_models.TaskStateDefinition(name="OnHold"),
        wf_models.TaskStateDefinition(name="Ignored")
    ],

    # Define the input parameters
    field_schema=[
        wf_models.TaskFieldDefinition(name="PriceDate", type="DateTime"),
        wf_models.TaskFieldDefinition(name="ClientInternal", type="String"),
        wf_models.TaskFieldDefinition(name="DisplayName", type="String"),
        wf_models.TaskFieldDefinition(name="Price", type="Decimal"),
        wf_models.TaskFieldDefinition(name="Result", type="String"),
        wf_models.TaskFieldDefinition(name="Details", type="String")
    ],

    # Define the default state a task should enter and the required parameters it should pass
    initial_state=wf_models.InitialState(name="Pending", required_fields=[
        "PriceDate", "ClientInternal", "DisplayName", "Price", "Result"
    ]),

    # Define the triggers which cause state transitions that respond to external stimuli
    triggers=[
        wf_models.TransitionTriggerDefinition(
            name="Start", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Resolve", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="PlaceOnHold", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Resume", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Ignore", trigger=wf_models.TriggerSchema(type="External"))
    ],

    # Define the state transitions including the states the task should move from and to, the trigger causing it to occur,
    # any guard conditions required to be met, and any actions that should be taken upon completion.
    transitions=[
        wf_models.TaskTransitionDefinition(
            from_state="Pending",
            to_state="InProgress",
            trigger="Start"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InProgress",
            to_state="Resolved",
            trigger="Resolve",
            guard="fields['Details'] neq ''",
            action="resolved-trigger-parent"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InProgress",
            to_state="OnHold",
            trigger="PlaceOnHold",
        ),
        wf_models.TaskTransitionDefinition(
            from_state="OnHold",
            to_state="InProgress",
            trigger="Resume"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InProgress",
            to_state="Ignored",
            trigger="Ignore",
            guard="fields['Details'] neq ''",
            action="resolved-trigger-parent"
        )
    ],
    actions=[
        # Define the action to trigger the parent task
        wf_models.ActionDefinition(
            name="resolved-trigger-parent",
            action_details=wf_models.TriggerParentTaskAction(
                type="TriggerParentTask",
                trigger="Resolved"
            )
        )
    ]
)
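
To see how the states, triggers, and the `Details` guard fit together, the definition above can be modelled as a toy state machine in plain Python. This is only a sketch: `ExceptionTask` and `TRANSITIONS` are hypothetical names, the Workflow Service evaluates guards itself, and treating a missing `Details` field the same as an empty string is an assumption of this model.

```python
from dataclasses import dataclass, field

# (from_state, trigger) -> (to_state, requires_details), mirroring the transitions above
TRANSITIONS = {
    ("Pending", "Start"): ("InProgress", False),
    ("InProgress", "Resolve"): ("Resolved", True),
    ("InProgress", "PlaceOnHold"): ("OnHold", False),
    ("OnHold", "Resume"): ("InProgress", False),
    ("InProgress", "Ignore"): ("Ignored", True),
}


@dataclass
class ExceptionTask:
    state: str = "Pending"
    fields: dict = field(default_factory=dict)

    def fire(self, trigger):
        """Apply a trigger; return True if the transition happened."""
        key = (self.state, trigger)
        if key not in TRANSITIONS:
            return False
        to_state, requires_details = TRANSITIONS[key]
        # Toy stand-in for the guard "fields['Details'] neq ''"
        if requires_details and not self.fields.get("Details"):
            return False
        self.state = to_state
        return True


task = ExceptionTask()
task.fire("Start")
blocked = task.fire("Resolve")  # no Details yet, so the guard blocks the transition
task.fields["Details"] = "Confirmed with the data vendor"
allowed = task.fire("Resolve")
print(blocked, allowed, task.state)
```

The first `Resolve` leaves the task in `InProgress`; only once `Details` is populated does the same trigger move it to `Resolved`.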

### 2.2. Reasonable Value Data Control
![Reasonable Value DC Workflow](./images/workflow-rvdq.png)

Our second task definition, `ReasonableValueDataControl`, is started by our main/parent workflow and checks that all quotes in the given scope have a sensible value (between 1 and 1000). Any potentially erroneous data creates a `HandleException` Task, defined above, for review by a user.

The task is started with the `Start` trigger by the parent workflow. When in the `InDQControl` state, a Worker is started which checks that all quote values within the scope are within the specified range. If there are no exceptions, the Task moves to the `Complete` state; if exceptions are found, the Task moves to the `Exceptions` state and a `HandleException` Task is created for each one, as defined above. A Guard ensures the Task moves from `Exceptions` to `Complete` only once all child tasks are resolved or ignored. In either case, a trigger is passed to the parent Task so it can progress.

In [13]:
reasonable_value_control_task_definition_request = wf_models.CreateTaskDefinitionRequest(
    id=wf_models.ResourceId(scope=notebook_scope,
                            code="ReasonableValueDataControl"),
    display_name="Reasonable Value Data Control",
    description="Conduct reasonable value data control on quote data. Raise any exceptions.",

    # Define the states of the workflow
    states=[
        wf_models.TaskStateDefinition(name="Pending"),
        wf_models.TaskStateDefinition(name="InDQControl"),
        wf_models.TaskStateDefinition(name="Exceptions"),
        wf_models.TaskStateDefinition(name="Complete")
    ],

    # Define the input parameter
    field_schema=[
        wf_models.TaskFieldDefinition(name="quotes_scope", type="String"),
    ],

    # Define the default state a task should enter and the required parameter it should pass
    initial_state=wf_models.InitialState(
        name="Pending", required_fields=["quotes_scope"]),

    # Define the triggers which cause state transitions that respond to external stimuli
    triggers=[
        wf_models.TransitionTriggerDefinition(
            name="Start", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="NoExceptions", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="ExceptionsFound", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Resolved", trigger=wf_models.TriggerSchema(type="External"))
    ],

    # Define the state transitions including the states the task should move from and to, the trigger causing it to occur,
    # any guard conditions required to be met, and any actions that should be taken upon completion.
    transitions=[
        wf_models.TaskTransitionDefinition(
            from_state="Pending",
            to_state="InDQControl",
            trigger="Start",
            action="start-reasonable-value-worker"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InDQControl",
            to_state="Complete",
            trigger="NoExceptions",
            action="complete-trigger-parent"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InDQControl",
            to_state="Exceptions",
            trigger="ExceptionsFound"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="Exceptions",
            to_state="Complete",
            trigger="Resolved",
            guard="childTasks all (state eq 'Resolved' or state eq 'Ignored')",
            action="complete-trigger-parent"
        )
    ],
    actions=[
        # Define action to start reasonable value worker
        wf_models.ActionDefinition(
            name="start-reasonable-value-worker",
            action_details=wf_models.RunWorkerAction(
                type="RunWorker",
                worker_id=wf_models.ResourceId(
                    scope=notebook_scope, code="ReasonableValueChecker2"),
                worker_status_triggers=wf_models.WorkerStatusTriggers(
                    completed_with_results="ExceptionsFound",
                    completed_no_results="NoExceptions",
                ),
                worker_parameters={
                    "quotes_scope": wf_models.FieldMapping(map_from="quotes_scope")
                },
                child_task_configurations=[wf_models.ResultantChildTaskConfiguration(
                    task_definition_id=wf_models.ResourceId(
                        scope=notebook_scope, code="HandleException"),
                    initial_trigger="Start",
                    child_task_fields={
                        "PriceDate": wf_models.FieldMapping(map_from="PriceDate"),
                        "ClientInternal": wf_models.FieldMapping(map_from="ClientInternal"),
                        "DisplayName": wf_models.FieldMapping(map_from="DisplayName"),
                        "Price": wf_models.FieldMapping(map_from="Price"),
                        "Result": wf_models.FieldMapping(map_from="Result"),
                    }
                )]
            )
        ),
        # Define action to trigger parent on completion
        wf_models.ActionDefinition(
            name="complete-trigger-parent",
            action_details=wf_models.TriggerParentTaskAction(
                type="TriggerParentTask",
                trigger="RV-DQ-Complete"
            )
        )
    ]
)
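
Every `HandleException` child sends the `Resolved` trigger to this task when it finishes, so the guard on the `Exceptions` to `Complete` transition is what stops the first finished child from completing the parent. The sketch below models that guard in plain Python. The function names are hypothetical and the real guard is evaluated by the Workflow Service.

```python
HANDLED_STATES = {"Resolved", "Ignored"}


def all_children_handled(child_states):
    """Toy version of the guard: every child is Resolved or Ignored."""
    return all(state in HANDLED_STATES for state in child_states)


def next_parent_state(parent_state, trigger, child_states):
    """Apply the guarded 'Resolved' transition from 'Exceptions' to 'Complete'."""
    if parent_state == "Exceptions" and trigger == "Resolved":
        if all_children_handled(child_states):
            return "Complete"
    return parent_state


# Two exceptions were raised; the first child to finish does not complete the parent
after_first = next_parent_state("Exceptions", "Resolved", ["Resolved", "InProgress"])
after_second = next_parent_state(after_first, "Resolved", ["Resolved", "Ignored"])
print(after_first, after_second)
```

Note that Python's `all` returns `True` for an empty list; how the service treats a task with no children is not covered by this sketch.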

### 2.3. Interquartile Range (IQR) Outlier Data Control
![IQR Outlier DC Workflow](./images/workflow-iqrdq.png)

Our next task definition, `IQROutlierDataControl`, is again started by our main/parent workflow and checks that all quotes in the given scope meet the $1.5\times \texttt{IQR}$ rule for each given instrument. This task operates in the same way as our previous task definition; any potentially erroneous data creates a `HandleException` Task. The main difference in the definition is that it runs a different previously defined Worker.

In [14]:
iqr_outlier_control_task_definition_request = wf_models.CreateTaskDefinitionRequest(
    id=wf_models.ResourceId(scope=notebook_scope,
                            code="IQROutlierDataControl"),
    display_name="IQR Outlier Data Control",
    description="Conduct IQR outlier data control on quote data. Raise any exceptions.",

    # Define the states of the workflow
    states=[
        wf_models.TaskStateDefinition(name="Pending"),
        wf_models.TaskStateDefinition(name="InDQControl"),
        wf_models.TaskStateDefinition(name="Exceptions"),
        wf_models.TaskStateDefinition(name="Complete")
    ],

    # Define the input parameter
    field_schema=[
        wf_models.TaskFieldDefinition(name="quotes_scope", type="String"),
    ],

    # Define the default state a task should enter and the required parameter it should pass
    initial_state=wf_models.InitialState(
        name="Pending", required_fields=["quotes_scope"]),

    # Define the triggers which cause state transitions that respond to external stimuli
    triggers=[
        wf_models.TransitionTriggerDefinition(
            name="Start", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="NoExceptions", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="ExceptionsFound", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Resolved", trigger=wf_models.TriggerSchema(type="External"))
    ],

    # Define the state transitions including the states the task should move from and to, the trigger causing it to occur,
    # any guard conditions required to be met, and any actions that should be taken upon completion.
    transitions=[
        wf_models.TaskTransitionDefinition(
            from_state="Pending",
            to_state="InDQControl",
            trigger="Start",
            action="start-iqr-outlier-worker"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InDQControl",
            to_state="Complete",
            trigger="NoExceptions",
            action="complete-trigger-parent"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InDQControl",
            to_state="Exceptions",
            trigger="ExceptionsFound"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="Exceptions",
            to_state="Complete",
            trigger="Resolved",
            guard="childTasks all (state eq 'Resolved' or state eq 'Ignored')",
            action="complete-trigger-parent"
        )
    ],
    actions=[
        # Define action to start IQR outlier worker
        wf_models.ActionDefinition(
            name="start-iqr-outlier-worker",
            action_details=wf_models.RunWorkerAction(
                type="RunWorker",
                worker_id=wf_models.ResourceId(
                    scope=notebook_scope, code="IQROutliers2"),
                worker_status_triggers=wf_models.WorkerStatusTriggers(
                    completed_with_results="ExceptionsFound",
                    completed_no_results="NoExceptions",
                ),
                worker_parameters={
                    "quotes_scope": wf_models.FieldMapping(map_from="quotes_scope")
                },
                child_task_configurations=[wf_models.ResultantChildTaskConfiguration(
                    task_definition_id=wf_models.ResourceId(
                        scope=notebook_scope, code="HandleException"),
                    initial_trigger="Start",
                    child_task_fields={
                        "PriceDate": wf_models.FieldMapping(map_from="PriceDate"),
                        "ClientInternal": wf_models.FieldMapping(map_from="ClientInternal"),
                        "DisplayName": wf_models.FieldMapping(map_from="DisplayName"),
                        "Price": wf_models.FieldMapping(map_from="Price"),
                        "Result": wf_models.FieldMapping(map_from="Result"),
                    }
                )]
            )
        ),
        # Define action to trigger parent on completion
        wf_models.ActionDefinition(
            name="complete-trigger-parent",
            action_details=wf_models.TriggerParentTaskAction(
                type="TriggerParentTask",
                trigger="IQR-DQ-Complete"
            )
        )
    ]
)

### 2.4. (Main/Parent) Import Quotes

![Import Quotes Workflow](./images/workflow-main.png)

Piecing everything together, we now define our parent `ImportQuotes` Task Definition. This will import our quote data and start our data quality control checks.

First, a user uploads an Excel file into LUSID Drive. They can then start the task by using the `Start` Trigger and passing two parameters: `filename`, the file path of the uploaded Excel file, and `quotes_scope`, the quote scope to use. This runs the `ImportFromExcelFile` worker, which loads the relevant instrument and quote data into LUSID. Two child Tasks are then created in sequence, `ReasonableValueDataControl` and `IQROutlierDataControl`, to check for unreasonable values and IQR outliers, respectively. As discussed above, each exception found creates a `HandleException` Task to be managed by the user.

In [15]:
import_quotes_task_definition_request = wf_models.CreateTaskDefinitionRequest(
    id=wf_models.ResourceId(scope=notebook_scope, code="ImportQuotes"),
    display_name="Import Quotes",
    description="Import and validate quote data from specified Excel file.",
    
    # Define the states of the workflow
    states=[
        wf_models.TaskStateDefinition(name="Pending"),
        wf_models.TaskStateDefinition(name="ImportingQuotes"),
        wf_models.TaskStateDefinition(name="InReasonableValueDQControl"),
        wf_models.TaskStateDefinition(name="InIQROutlierDQControl"),
        wf_models.TaskStateDefinition(name="Done"),
        wf_models.TaskStateDefinition(name="Error")
    ],
    
    # Define input parameters
    field_schema=[
        wf_models.TaskFieldDefinition(name="filename", type="String"),
        wf_models.TaskFieldDefinition(name="quotes_scope", type="String")
    ],
    
    # Define the default state a task should enter and the required parameters it should pass 
    initial_state=wf_models.InitialState(name="Pending", required_fields=[
                                         "filename", "quotes_scope"]),
    
    # Define the triggers which cause state transitions that respond to external stimuli
    triggers=[
        wf_models.TransitionTriggerDefinition(
            name="Start", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Failure", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="Imported", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="RV-DQ-Complete", trigger=wf_models.TriggerSchema(type="External")),
        wf_models.TransitionTriggerDefinition(
            name="IQR-DQ-Complete", trigger=wf_models.TriggerSchema(type="External"))
    ],
    
    # Define the state transitions including the states the task should move from and to, the trigger causing it to occur,
    # any guard conditions required to be met, and any actions that should be taken upon completion.
    transitions=[
        wf_models.TaskTransitionDefinition(
            from_state="Pending",
            to_state="ImportingQuotes",
            trigger="Start",
            action="start-import-worker"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="ImportingQuotes",
            to_state="InReasonableValueDQControl",
            trigger="Imported",
            action="create-reasonable-value-task"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InReasonableValueDQControl",
            to_state="InIQROutlierDQControl",
            trigger="RV-DQ-Complete",
            action="create-IQR-outlier-task"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="ImportingQuotes",
            to_state="Error",
            trigger="Failure"
        ),
        wf_models.TaskTransitionDefinition(
            from_state="InIQROutlierDQControl",
            to_state="Done",
            trigger="IQR-DQ-Complete",
            guard="childTasks all (state eq 'Complete')"
        )
    ],
    
    actions=[
        # Define action to start the worker importing the quotes
        wf_models.ActionDefinition(
            name="start-import-worker",
            action_details=wf_models.RunWorkerAction(
                type="RunWorker",
                worker_id=wf_models.ResourceId(
                    scope=notebook_scope, code="ImportFromExcelFile"),
                worker_parameters={
                    "filename": wf_models.FieldMapping(map_from="filename"),
                    "quotes_scope": wf_models.FieldMapping(map_from="quotes_scope")
                },
                worker_status_triggers=wf_models.WorkerStatusTriggers(
                    started="Start",
                    completed_with_results="Imported",
                    completed_no_results="Failure",
                    failed_to_complete="Failure",
                    failed_to_start="Failure",
                ),

            )
        ),
        
        # Define action to create a child task for the reasonable value check
        wf_models.ActionDefinition(
            name="create-reasonable-value-task",
            action_details=wf_models.CreateChildTasksAction(
                type="CreateChildTasks",
                child_task_configurations=[wf_models.CreateChildTaskConfiguration(
                    task_definition_id=wf_models.ResourceId(
                        scope=notebook_scope, code="ReasonableValueDataControl"),
                    initial_trigger="Start",
                    child_task_fields={
                        "quotes_scope": wf_models.FieldMapping(map_from="quotes_scope")
                    }
                )]
            )
        ),
        
        # Define action to create a child task for the IQR outlier check
        wf_models.ActionDefinition(
            name="create-IQR-outlier-task",
            action_details=wf_models.CreateChildTasksAction(
                type="CreateChildTasks",
                child_task_configurations=[wf_models.CreateChildTaskConfiguration(
                    task_definition_id=wf_models.ResourceId(
                        scope=notebook_scope, code="IQROutlierDataControl"),
                    initial_trigger="Start",
                    child_task_fields={
                        "quotes_scope": wf_models.FieldMapping(map_from="quotes_scope")
                    }
                )]
            )
        )
    ]
)

In [16]:
def upload_task_definition(request):
    """Create a task definition, printing a message if it already exists."""
    try:
        task_def_api.create_task_definition(
            create_task_definition_request=request
        )
        print(f"Task Definition with code '{request.id.code}' in scope '{request.id.scope}' created successfully.")
    except ApiException as e:
        # A 409 status means the definition already exists; re-raise anything else.
        if e.status == 409:
            print(f"Task Definition with code '{request.id.code}' in scope '{request.id.scope}' already exists.")
        else:
            raise


# Upload the definitions, child tasks first and the parent last
upload_task_definition(handle_exception_task_definition_request)
upload_task_definition(reasonable_value_control_task_definition_request)
upload_task_definition(iqr_outlier_control_task_definition_request)
upload_task_definition(import_quotes_task_definition_request)

Task Definition with code 'HandleException' in scope 'quotes-workflow' created successfully.
Task Definition with code 'ReasonableValueDataControl' in scope 'quotes-workflow' created successfully.
Task Definition with code 'IQROutlierDataControl' in scope 'quotes-workflow' created successfully.
Task Definition with code 'ImportQuotes' in scope 'quotes-workflow' created successfully.
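
To make the `ImportQuotes` transition table easier to follow, the sketch below replays it as a small state machine in plain Python. It does not call the Workflow Service. The names `TRANSITIONS`, `apply_trigger` and `run` are illustrative, and leaving the state unchanged for an unmatched trigger is an assumption of this sketch rather than documented service behaviour.

```python
# Pure-Python walk-through of the ImportQuotes transitions defined above.
# Illustrative only: nothing here talks to the Workflow Service.

# (from_state, trigger) -> (to_state, action); None means the transition has no action.
TRANSITIONS = {
    ("Pending", "Start"): ("ImportingQuotes", "start-import-worker"),
    ("ImportingQuotes", "Imported"): ("InReasonableValueDQControl", "create-reasonable-value-task"),
    ("ImportingQuotes", "Failure"): ("Error", None),
    ("InReasonableValueDQControl", "RV-DQ-Complete"): ("InIQROutlierDQControl", "create-IQR-outlier-task"),
    ("InIQROutlierDQControl", "IQR-DQ-Complete"): ("Done", None),
}

# The only guarded transition: every child task must be in state 'Complete'.
GUARDED = ("InIQROutlierDQControl", "IQR-DQ-Complete")


def apply_trigger(state, trigger, children_complete=True):
    """Return (new_state, action). Assumption: an unmatched trigger changes nothing."""
    key = (state, trigger)
    if key not in TRANSITIONS:
        return state, None
    if key == GUARDED and not children_complete:
        return state, None
    return TRANSITIONS[key]


def run(triggers):
    """Replay a list of triggers from 'Pending' and return the states visited."""
    state = "Pending"
    path = [state]
    for trigger in triggers:
        state, _ = apply_trigger(state, trigger)
        path.append(state)
    return path


happy_path = run(["Start", "Imported", "RV-DQ-Complete", "IQR-DQ-Complete"])
print(" -> ".join(happy_path))
```

Replaying `["Start", "Failure"]` instead ends in `Error`, which mirrors the `ImportingQuotes` to `Error` transition.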


## 3. Example Usage

We have now created the task definitions that make up our data quality control workflow. Next, we run a working example.

### 3.1. Create Task

Earlier, we uploaded an example quotes file into LUSID Drive. We will now trigger the parent task manually to import the quotes in that file into LUSID and then check the data's validity and suitability. Although we start the task manually here, it can also be started automatically by creating a webhook notification that is triggered by a subscription. An example of this can be seen in this [sample notebook](https://github.com/finbourne/sample-notebooks/blob/master/examples/use-cases/portfolio-construction/Call%20Api%20On%20File%20Upload.ipynb).

We pass two fields, `filename` and `quotes_scope`, as specified within our task definitions. 

In [17]:
response = task_api.create_task(
    trigger="Start",
    create_task_request=wf_models.CreateTaskRequest(
        task_definition_id=wf_models.ResourceId(scope=notebook_scope, code="ImportQuotes"),
        fields=[
            wf_models.TaskInstanceField(name="filename", value=f'/{drive_filepath}/{example_filename}'),
            wf_models.TaskInstanceField(name="quotes_scope", value="QuoteScope1")
        ]
    )
)

print(f"Started task '{response.task_definition_id.scope}/{response.task_definition_id.code}' with the following fields:")
print(f" {response.fields[0].name}:\t{response.fields[0].value}")
print(f" {response.fields[1].name}:\t{response.fields[1].value}")

Started task 'quotes-workflow/ImportQuotes' with the following fields:
 filename:	/example-quotes/exampledata.xlsx
 quotes_scope:	QuoteScope1


### 3.2. Examining the Workflow in Action

Now that the task has been triggered, we can see it in action by navigating to the **Workflow Service** tab in the [LUSID web app](https://www.lusid.com/app/home).

Inside the web app, a user can examine the current state of any task that has been created, as well as any child tasks the parent task may have. In the **Workflow Service > Tasks** dashboard, filtering by the child task `HandleException` lists all potentially erroneous data that our checks have found. A user can then manually resolve or ignore these tasks to transition the workflow.
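
Resolving or ignoring these tasks matters because the control task's `Exceptions` to `Complete` transition is guarded by `childTasks all (state eq 'Resolved' or state eq 'Ignored')`. The sketch below is a plain-Python reading of that guard. The child task records and the `AwaitingReview` state name are hypothetical placeholders, not API responses or states taken from the `HandleException` definition.

```python
# Plain-Python reading of the guard
#   childTasks all (state eq 'Resolved' or state eq 'Ignored')
# The records below are hypothetical stand-ins for HandleException child tasks.

ACCEPTED_STATES = {"Resolved", "Ignored"}


def guard_passes(child_tasks):
    """True when every child task is in an accepted state."""
    return all(task["state"] in ACCEPTED_STATES for task in child_tasks)


def outstanding(child_tasks):
    """Return the child tasks that still need a user decision."""
    return [task for task in child_tasks if task["state"] not in ACCEPTED_STATES]


children = [
    {"ClientInternal": "instr-1", "state": "Resolved"},
    {"ClientInternal": "instr-2", "state": "Ignored"},
    {"ClientInternal": "instr-3", "state": "AwaitingReview"},  # placeholder state name
]

blocked = not guard_passes(children)
todo = [task["ClientInternal"] for task in outstanding(children)]
print(f"Guard blocked: {blocked}; outstanding: {todo}")
```

Once the last outstanding child is resolved or ignored, `guard_passes` returns `True`, which corresponds to the control task being allowed to move to `Complete`.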

Further examples of the Workflow Service in action can be found in the relevant [Knowledge Base](https://support.lusid.com/knowledgebase/article/KA-02186/en-us) articles.

### 3.3. More Data Quality Control Checks

In this notebook, we have implemented two data quality control checks. Using Luminesce, a user can define further checks to meet their own requirements. Some basic data integrity checks, implemented as Luminesce views, can be found in our [`luminesce-examples` GitHub repository](https://github.com/finbourne/luminesce-examples/tree/master/examples/data-qc-checks/basic-data-integrity).
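
As a starting point for designing further checks, it can help to pin down the logic of the two rules described in the overview: values between 1 and 1000, and values within the interquartile range of existing quotes for the instrument. The sketch below does this in plain Python. It is not the notebook's worker logic, which may compute these differently. The function names, sample data, inclusive bounds and quartile method are all assumptions.

```python
from statistics import quantiles

# Bounds taken from the workflow overview: values should lie between 1 and 1000.
REASONABLE_MIN, REASONABLE_MAX = 1, 1000


def unreasonable_values(prices):
    """Return prices outside [1, 1000]; treating the bounds as inclusive is an assumption."""
    return [p for p in prices if not (REASONABLE_MIN <= p <= REASONABLE_MAX)]


def iqr_outliers(history, prices):
    """Return prices outside [Q1, Q3] of the instrument's existing quotes."""
    # method="inclusive" treats the history as the full population (an assumption).
    q1, _, q3 = quantiles(history, n=4, method="inclusive")
    return [p for p in prices if not (q1 <= p <= q3)]


history = [10, 11, 12, 13, 14]    # hypothetical existing quotes for one instrument
new_prices = [12, 10.5, 50, 0.5]  # hypothetical incoming quotes

flagged_range = unreasonable_values(new_prices)
flagged_iqr = iqr_outliers(history, new_prices)
print("Outside reasonable range:", flagged_range)
print("Outside interquartile range:", flagged_iqr)
```

A stricter or looser rule, such as the common 1.5 × IQR fence, would only change the bounds used in `iqr_outliers`.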