# Introduction to Tasks & Flows

## Tasks

Let's create a couple of ConverSight tasks and work with task libraries. To start, import the required modules from the conversight library.

In [None]:
from conversight import TaskLibrary, task, FlowLibrary, Flow, Parameter, SmartAnalytics, Context

In [2]:
import pandas 

In [4]:
@task
def resolve_query(dataset_id: str, query: str, ctx: Context) -> dict:
    '''Task that converts a standard query into conversight platform query'''

    try:
        import os
        import requests

        # The data engine base URL is read from the environment.
        dataEngineURL = "{}/formatQuery".format(os.getenv("DATAENGINE_SERVICE"))
        body = {"query": query, "dataSetID": dataset_id}

        response = requests.post(url=dataEngineURL, json=body)
        # Map known failure status codes to readable error messages.
        status = {401: "Unauthorized", 500: "User does not have dataset access", 400: "Bad Request"}
        if response.status_code in status:
            return {"error": status[response.status_code]}

        res = response.json()
        if res is not None:
            return res
        else:
            return {"error": "Empty response from data engine"}
    except Exception as e:
        return {"error": str(e)}

## Run the task

To execute a task, call its run function with inputs in the required format.

In [None]:
resolve_query.run(dataset_id="655ceb56-HrXES9SSm", query="""Select @RetailSales.revenue as newcost, @RetailSales.delivery_date, @RetailSales.buyer from #RetailSales""")

The task declares three inputs, but only two are passed here. The third is the context argument, which Athena supplies internally.
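
One way to picture how a context argument can be filled in automatically is a decorator that inspects the function signature and supplies the context itself. The sketch below is not ConverSight's implementation: the Context class, its token field, SketchTask and sketch_task are all stand-ins invented for this illustration.

```python
import inspect
from dataclasses import dataclass


@dataclass
class Context:
    """Stand-in for the platform context (hypothetical field)."""
    token: str = "demo-token"


class SketchTask:
    """Wraps a function and fills in any Context-annotated parameter on run()."""

    def __init__(self, func):
        self.func = func
        # Names of parameters annotated with Context; callers never pass these.
        self.ctx_params = [
            name
            for name, param in inspect.signature(func).parameters.items()
            if param.annotation is Context
        ]

    def run(self, **kwargs):
        # Build the context here, so the caller only supplies the other inputs.
        injected = {name: Context() for name in self.ctx_params}
        return self.func(**kwargs, **injected)


def sketch_task(func):
    """Hypothetical decorator playing the role of @task."""
    return SketchTask(func)


@sketch_task
def describe_query(dataset_id: str, query: str, ctx: Context) -> dict:
    return {"dataset": dataset_id, "query": query, "has_token": bool(ctx.token)}


# Two inputs are passed; the third (ctx) is injected by the wrapper.
result = describe_query.run(dataset_id="demo-dataset", query="select 1")
print(result)
```

The caller's view matches the cell above: the function declares three parameters, and run receives two.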

## Register the task

In [5]:
description = "Task that converts a standard query into conversight platform query"
resolve_query.register(libraryName="cxapp", description=description, sourceControl="edit", apiAccess=True, deployable=True)

[2023-11-21 19:00:48,231] [INFO] resolve_query has been successfully registered. The most recent version available is 0.1 !!


Registering a task saves its first version. If we change the code and register the task again, it is saved as a new version. Let's create two more tasks so that this example can create a Smart Analytics object.
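
The versioning behaviour can be pictured with a small in-memory registry. This is a sketch under assumptions: SketchRegistry and its methods are hypothetical, every registration is treated as a new version, and the "0.n" labels simply mirror the versions shown in the log output. How the platform actually numbers versions is not covered here.

```python
class SketchRegistry:
    """Hypothetical registry keeping a history of sources per (library, task)."""

    def __init__(self):
        self._versions = {}

    def register(self, library_name: str, task_name: str, source: str) -> str:
        history = self._versions.setdefault((library_name, task_name), [])
        history.append(source)
        # Assumed labeling scheme: the n-th registration is version "0.n".
        return f"0.{len(history)}"

    def latest(self, library_name: str, task_name: str) -> str:
        """Return the most recently registered source."""
        return self._versions[(library_name, task_name)][-1]


registry = SketchRegistry()
first = registry.register("cxapp", "resolve_query", "def resolve_query(): ...")
second = registry.register("cxapp", "resolve_query", "def resolve_query(): return {}")
print(first, second)
```

Earlier versions stay in the history, so a lookup could return any of them, not only the latest.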

In [6]:
@task
def get_data(ctx: Context, queryInfo: dict) -> pandas.core.frame.DataFrame:
    '''get the data from database based on the given query''' 
    
    try:
        from conversight import TaskRunError
        import pyarrow as pa
        from heavyai import connect
        
        if isinstance(queryInfo, TaskRunError):
            ctx.log.error("Received error from previous task, skipping the process: {}".format(queryInfo))
            return queryInfo
    
        fullUrl = queryInfo["connectionDetails"]["connectionData"]["url"].split(":")
        con = connect(
            host=fullUrl[0],
            dbname=queryInfo["connectionDetails"]["connectionData"]["database"],
            user=queryInfo["connectionDetails"]["connectionData"]["username"],
            port=int(fullUrl[1]),
            password=queryInfo["connectionDetails"]["connectionData"]["password"],
        )
        
        df = con.select_ipc(queryInfo["query"])
        return df
    except Exception as e:
        ctx.log.error("Exception in get_data: {}".format(str(e)))
        return {"status": "failed", "message": str(e)}
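
get_data assumes that the connection URL has the form host:port and splits it on the colon. A small helper can make that assumption explicit and fail with a clear message when the port is missing. The name split_host_port and the example address are hypothetical; neither is part of conversight or heavyai.

```python
from typing import Tuple


def split_host_port(url: str) -> Tuple[str, int]:
    """Split 'host:port' into (host, port); raise ValueError on malformed input."""
    # rpartition splits on the last colon, so the port is always the final piece.
    host, sep, port = url.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected 'host:port', got {url!r}")
    return host, int(port)


host, port = split_host_port("db.example.internal:6274")
print(host, port)
```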


In [7]:
description = "get the data from database based on the given query"
get_data.register(libraryName="cxapp", description=description, sourceControl="edit", apiAccess=True, deployable=True)

[2023-11-21 19:01:00,708] [INFO] get_data has been successfully registered. The most recent version available is 0.1 !!


In [15]:
@task
def create_smart_analytics(ctx: Context, objectName: str, dataSetId: str, dataFrame: pandas.core.frame.DataFrame, isArrow: bool = False, isOverwrite: bool = True) -> dict:
    '''Task that creates a Smart Analytics object'''
    try:
        from conversight import TaskRunError, SmartAnalytics
        # Pass an upstream failure through unchanged instead of processing it.
        if isinstance(dataFrame, TaskRunError):
            ctx.log.error("Error from previous task for dataFrame: {}".format(dataFrame))
            return dataFrame

        sm_obj = SmartAnalytics(dataSetId, ctx.token)
        isCreated = sm_obj.create(objectName, dataFrame, isArrow, isOverwrite, True, ctx.token)

        if "status" in isCreated and isCreated["status"] == "success":
            # The trailing space in "message " is kept because the recorded flow output shows this key.
            return {"status": "success", "message ": isCreated["message"]}
        elif "status" in isCreated and isCreated["status"] != "success":
            ctx.log.critical("Smart Analytics creation returned a failed status")
            return {"status": "failed", "message": isCreated["message"]}
        else:
            # Log before returning; a statement placed after the return would never run.
            ctx.log.critical("Smart Analytics creation failed")
            return {"status": "failed", "message": isCreated}
    except Exception as e:
        print("Error From createSmartAnalytics {}".format(str(e)))
        return {"status": "failed", "message": str(e)}
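
Both get_data and create_smart_analytics begin by checking whether their input is a TaskRunError and, if so, return it unchanged. The same guard could be factored into a decorator. The sketch below uses a stand-in class named SketchTaskRunError so that it runs without the conversight package; skip_on_upstream_error and count_rows are likewise hypothetical.

```python
import functools


class SketchTaskRunError(Exception):
    """Stand-in for conversight.TaskRunError (assumption for this sketch)."""


def skip_on_upstream_error(func):
    """Return the first upstream error found in the arguments instead of calling func."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for value in list(args) + list(kwargs.values()):
            if isinstance(value, SketchTaskRunError):
                return value  # propagate the failure untouched
        return func(*args, **kwargs)

    return wrapper


@skip_on_upstream_error
def count_rows(rows):
    return {"status": "success", "message": f"{len(rows)} rows"}


ok = count_rows([1, 2, 3])
upstream = SketchTaskRunError("query failed")
skipped = count_rows(upstream)
print(ok, repr(skipped))
```

With this pattern a failure early in a chain reaches the end of the chain as the original error object, and the intermediate steps do no work.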

In [16]:
description = "This task will create a new smart analytics"
create_smart_analytics.register(libraryName="cxapp", description=description, sourceControl="edit", apiAccess=True, deployable=True)

[2023-11-21 19:04:14,483] [INFO] create_smart_analytics has been successfully registered. The most recent version available is 0.2 !!


## TaskLibrary

All three tasks have now been created and registered in the ConverSight library. Registered tasks are saved in the Task Library so that they can be reused. TaskLibrary is a catalog in ConverSight that lists all available tasks, organized by library.

In [17]:
tsk = TaskLibrary()

Tasks loaded  !!
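
The attribute-style lookup used in the flow definition further down (tsk.cxapp.resolve_query) suggests a two-level catalog: library name first, then task name. A minimal sketch of that shape, using only the standard library, follows. build_catalog and the two stub functions are hypothetical and are not how TaskLibrary is implemented.

```python
from types import SimpleNamespace


def build_catalog(entries):
    """Group (library, name, callable) triples into catalog.<library>.<name> lookups."""
    grouped = {}
    for library, name, func in entries:
        grouped.setdefault(library, {})[name] = func
    return SimpleNamespace(
        **{library: SimpleNamespace(**tasks) for library, tasks in grouped.items()}
    )


def resolve_query_stub(query):
    return {"query": query.strip()}


def get_data_stub(query_info):
    return [query_info["query"]]


catalog = build_catalog([
    ("cxapp", "resolve_query", resolve_query_stub),
    ("cxapp", "get_data", get_data_stub),
])

# Tasks are reached as catalog.<library>.<task>, and one task's output feeds the next.
rows = catalog.cxapp.get_data(catalog.cxapp.resolve_query("  select 1  "))
print(rows)
```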


## Flows

A Flow in ConverSight is the core component of the system. Similar to tasks, Flows take inputs, perform operations and generate outcomes. Flows act as managers, coordinating tasks for efficient execution, making complex tasks achievable within ConverSight. It’s crucial to note that a Flow is made up entirely of Tasks.

Flows behave much like functions: they accept inputs, perform work, and return results. A Flow is defined inside a "with Flow(...) as flow:" block.

In [18]:
with Flow(name="cxapp_smart_analytics") as flow:
    datasetIdEffectiveness = Parameter("datasetId", "655ceb56-HrXES9SSm")
    Query = Parameter("Query", """Select @RetailSales.revenue as newcost, @RetailSales.delivery_date, @RetailSales.buyer from #RetailSales""")
    objectName = Parameter("objectName", "Flow_Sales")
    isArrow = Parameter("arrowData", False)
    isOverWrite = Parameter("overwriteTable", False)
    isPublish = Parameter("publishDataset", True)
    resolvedForecastQuery = tsk.cxapp.resolve_query(datasetIdEffectiveness, Query)
    forecast = tsk.cxapp.get_data(resolvedForecastQuery)
    smartAnalytics = tsk.cxapp.create_smart_analytics(objectName, datasetIdEffectiveness, forecast, isArrow, isOverWrite)
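
To make the idea of a Flow coordinating tasks more concrete, here is a toy model of the pattern. Calls made inside the with block only record a step, and nothing executes until run is called. SketchFlow, SketchParameter, Step and the step decorator are invented for this illustration and say nothing about how ConverSight implements Flow or Parameter.

```python
class SketchParameter:
    """Named flow input with a default value."""

    def __init__(self, name, default):
        self.name = name
        self.default = default


class Step:
    """A recorded task call whose arguments may be parameters or earlier steps."""

    def __init__(self, func, args):
        self.func = func
        self.args = args


class SketchFlow:
    _active = None  # the flow currently open in a with-block

    def __init__(self, name):
        self.name = name
        self.steps = []

    def __enter__(self):
        SketchFlow._active = self
        return self

    def __exit__(self, exc_type, exc, tb):
        SketchFlow._active = None
        return False

    def run(self, **overrides):
        results = {}  # id(step) -> computed result

        def resolve(arg):
            if isinstance(arg, SketchParameter):
                return overrides.get(arg.name, arg.default)
            if isinstance(arg, Step):
                return results[id(arg)]
            return arg

        # Steps were recorded in definition order, so their inputs are ready.
        for step in self.steps:
            results[id(step)] = step.func(*(resolve(a) for a in step.args))
        return results[id(self.steps[-1])]


def step(func):
    """Make func record a Step when called inside an open SketchFlow."""

    def deferred(*args):
        node = Step(func, args)
        SketchFlow._active.steps.append(node)
        return node

    return deferred


@step
def build_query(table, column):
    return f"select {column} from {table}"


@step
def fetch_rows(query):
    return [f"row for: {query}"]


with SketchFlow(name="sketch") as flow:
    table = SketchParameter("table", "sales")
    column = SketchParameter("column", "revenue")
    query = build_query(table, column)
    rows = fetch_rows(query)

print(flow.run())                # runs with the parameter defaults
print(flow.run(column="buyer"))  # overrides one parameter by name
```

In this sketch, overrides are matched against the name given to each parameter, and the flow returns the result of its last step. The toy decorator only works inside an open with block.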
   

## Run the flow

To run the flow, use the run function. By default, the flow uses the default arguments defined when it was constructed. To supply different values, pass them to run as named parameters.

In [19]:
flow.run()

skipLog is: False
[2023-11-21 19:04:19,234] [INFO] [Main-Flow]  Received request from AI Workbench, considering as test run..
[2023-11-21 19:04:19,235] [INFO] [Main-Flow]  No input parameters detected, running with default or previous run parameters
[2023-11-21 19:04:19,237] [INFO] Context Actor [aac402da2e7a4baa9a239b15681aa0c0] deployed..
(runWrapperNotebook pid=528) [2023-11-21 19:04:31,578] [INFO] Setting connection details
[2023-11-21 19:04:35,720] [INFO] [Main-Flow]  Flow response: [{status: success, message : Smart analytics created and sme published successfully !!}]


[{'status': 'success',
  'message ': 'Smart analytics created and sme published successfully !!'}]

(runWrapperNotebook pid=528) [2023-11-21 19:04:35,718] [INFO] SME data published successfully !!


## Register the flow

In [None]:
description = "Smart analytics Flow"
flow.register(libraryName="cxapp", flowName="cxapp_smart_analytics", description=description)

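## Flow Library

FlowLibrary plays the same role for flows that TaskLibrary plays for tasks: a registered flow is reached by library name and then flow name.

In [16]:
flw = FlowLibrary()

Tasks loaded  !!


In [None]:
flw.cxapp.cxapp_smart_analytics.run()

To override the defaults, pass the values as named parameters:

In [None]:
query = """Select @RetailSales.revenue as newcost, @RetailSales.delivery_date, @RetailSales.buyer from #RetailSales"""
flw.cxapp.cxapp_smart_analytics.run(datasetIdEffectiveness="650d3806-Jn6txRWIi", objectName="Flow_Sales", Query=query, isArrow=False, isOverWrite=False, isPublish=True)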

## Promote the flow

Promote the flow to a different visibility level: “O”, “U” or “P”. “O” represents the organization level, “U” the user level and “P” the platform level.

In [33]:
flw.cxapp.cxapp_smart_analytics("O")

Version 0.5 for the Flow Sanity_Flow is already in level O, these are the levels available for you to promote => [S, P, O, U]
