In [1]:
# housekeeping, imports, etc.
%load_ext autoreload
%autoreload 2

%env CONSOLE_WIDTH=100

import os
from typing import Union, Dict, Any

from kiara import KiaraAPI, ValueMap, KiaraModuleConfig, ValueMapSchema
from kiara.utils.jupyter import graph_widget
from kiara.context.config import KIARA_SETTINGS
from pydantic import Field

# make code output easier to read in the jupyter context
KIARA_SETTINGS.syntax_highlight_background = "black"

# Name of the kiara context the API instance is bound to. Presumably this selects
# which data store the notebook reads from / saves to (TODO confirm against kiara docs).
KIARA_CONTEXT_NAME = "lumy"

# create the kiara api instance
api = KiaraAPI.instance(KIARA_CONTEXT_NAME)

env: CONSOLE_WIDTH=100


# The problem

- "like Jupyter notebooks, but for people who can't code"
- "virtual research environment"

## Data "orchestration" vs. "exploration"

### Orchestration
- static 'pipelines'
- known input & output types
- products: Apache Airflow, Kafka, Dagster, Prefect

### Exploration
- dynamic
- (mostly) known inputs & input types (but weakly specified)
- (mostly) unknown outputs and output types
- products: Jupyter, RStudio, ...

# Solution

## if 'orchestration'
- focus on processing modules and pipelines
- pre-assembled pipelines, built by workflow developers

## if 'exploration'
- data-centric
- dynamically build pipelines, built by end-users

-> mutually exclusive?

# Modules / Operations / Pipelines / Workflows

## Module
- Python class (subclass of the `KiaraModule` base class)
- designed to be re-usable
- configurable
- implement 3 methods: `create_inputs_schema`, `create_outputs_schema`, `process`
- (optionally) add documentation & other metadata (author, tags, etc.)
- core modules bundled with *kiara* and 'official' plugin packages

```python
class CutColumnModuleConfig(KiaraModuleConfig):
    # Optional, hardcoded column name. When it is set (non-empty), the module
    # does not ask for a 'column_name' input and always cuts this column.
    column_name: Union[str, None] = Field(
        default=None,
        description="A hardcoded column name to cut.",
    )
```

```python
class CutColumnModule(KiaraModule):
    """Cut off one column from a table, returning an array."""

    # Module type name used to refer to this module, e.g. in
    # `api.retrieve_module_type_info("table.cut_column")` or as `module_type:` in a pipeline.
    _module_type_name = "table.cut_column"
    # Config model; its optional `column_name` field pre-sets the column to cut.
    _config_cls = CutColumnModuleConfig
```

```python
    def create_inputs_schema(self) -> ValueMapSchema:
        """Describe the module inputs.

        A 'table' input is always required. A 'column_name' string input is added
        only when the module config does not already hardcode a (non-empty) column name.
        """
        inputs: Dict[str, Any] = {"table": {"type": "table", "doc": "A table."}}

        if not self.get_config_value("column_name"):
            inputs["column_name"] = {
                "type": "string",
                "doc": "The name of the column to extract.",
            }

        return inputs
```

```python
   def create_outputs_schema(self) -> ValueMapSchema:

        outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column"}}
        return outputs
```

```python
   def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import pyarrow as pa
        column_name: Union[str, None] = self.get_config_value("column_name")
        if not column_name:
            column_name = inputs.get_value_data("column_name")

        if not column_name:
            raise KiaraProcessingException("Could not cut column from table: column_name not provided or empty string.")

        table_value: Value = inputs.get_value_obj("table")
        table_metadata: KiaraTableMetadata = table_value.get_property_data(
            "metadata.table"
        )
        available = table_metadata.table.column_names

        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
            )

        table: pa.Table = table_value.data.arrow_table
        column = table.column(column_name)

        outputs.set_value("array", column)
```

In [2]:
# look up the info kiara has for the 'table.cut_column' module type (the class shown above)
api.retrieve_module_type_info("table.cut_column")

## Operation
- a conceptual logical unit of work
- a (configured) module, in most cases, but can also be a pipeline
- well defined (set of) inputs and (set of) outputs
- does a single thing: transform (set of) inputs to (set of) outputs
- (in most cases) idempotent:
  - same module type, module configuration & (set of) inputs -> same set of outputs


In [3]:
# CLI: list operations, filtered by the string 'table.' (see "Filtered operations" in the output)
! kiara operation list table.


╭─ Filtered operations ────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  [1m [0m[1mId                                [0m[1m [0m [1m [0m[1mType(s)    [0m[1m [0m [1m [0m[1mDescription                              [0m[1m [0m  │
│  ──────────────────────────────────────────────────────────────────────────────────────────────  │
│  [3m [0m[3mcreate.table.from.csv_file        [0m[3m [0m [32m [0m[32mcreate_from[0m[32m [0m  Create a table from a csv_file value.       │
│  [3m [0m[3mcreate.table.from.text_file_bundle[0m[3m [0m [32m [0m[32mcreate_from[0m[32m [0m  Create a table value from a text            │
│  [3m                                    [0m [32m             [0m  file_bundle.                                │
│  [3m [0m[3mexport.table.as.csv_file          [0m[3m [0m [32m [0m[32mexport_as  [0m[32m 

In [4]:
# 'get_operation_info' provides augmented metadata in addition to what 'get_operation' would give us
# here the module type 'table.cut_column' is looked up directly as an operation
api.get_operation_info("table.cut_column")

In [5]:
# An operation built from 'table.cut_column' with the column hardcoded in the module
# config, so it does not need a 'column_name' input (see `create_inputs_schema`).
other_op = api.create_operation(
    "table.cut_column",
    module_config={"column_name": "file_name"},
)
other_op

## Pipeline

In [6]:
# Pipeline description (YAML) as shown on the slide.
# NOTE: the next (hidden) cell reassigns `pipeline_desc` with the same steps plus
# input/output aliases; that version is the one passed to `api.create_operation`.
# Keep both definitions in sync when editing the steps.
pipeline_desc = """

  pipeline_name: corpus_onboarding
  
  doc: Onboard a text corpus from a local folder of timestamped files.
  
  steps:
    - module_type: import.file_bundle
      step_id: import_text_corpus
      
    - module_type: create.table.from.text_file_bundle
      step_id: create_corpus_table
      input_links:
        text_file_bundle: import_text_corpus.file_bundle
        
    - module_type: table.cut_column
      step_id: extract_filename_column
      module_config:
        column_name: "file_name"
      input_links:
        table: create_corpus_table.table
        
    - module_type: parse.date_array
      step_id: create_date_array
      input_links:
        array: extract_filename_column.array
        
    - module_type: table.merge
      step_id: merge_table
      module_config:
        inputs_schema:
          source_table:
            type: table
            doc: The original table.
          date_array:
            type: array
            doc: The array containing the parsed date items.
        column_map:
          date: date_array
          content: source_table.content
          file_name: source_table.file_name
      input_links:
        source_table: create_corpus_table.table
        date_array: create_date_array.date_array
"""

In [7]:
# hidden cell: same pipeline steps as the slide above, plus 'input_aliases' / 'output_aliases'
# (left off the slide because it would be too large). This overwrites the previous
# `pipeline_desc` and is the definition used by `api.create_operation` below.
pipeline_desc = """
pipeline_name: corpus_onboarding
doc: Onboard a text corpus.
steps:
  - module_type: import.file_bundle
    step_id: import_text_corpus
  - module_type: create.table.from.text_file_bundle
    step_id: create_corpus_table
    input_links:
      text_file_bundle: import_text_corpus.file_bundle
  - module_type: table.cut_column
    step_id: extract_filename_column
    module_config:
      column_name: "file_name"
    input_links:
      table: create_corpus_table.table
  - module_type: parse.date_array
    step_id: create_date_array
    input_links:
      array: extract_filename_column.array
  - module_type: table.merge
    step_id: merge_table
    module_config:
      inputs_schema:
        source_table:
          type: table
          doc: The original table.
        date_array:
          type: array
          doc: The array containing the parsed date items.
      column_map:
        date: date_array
        content: source_table.content
        file_name: source_table.file_name
    input_links:
      source_table: create_corpus_table.table
      date_array: create_date_array.date_array

input_aliases:
  import_text_corpus.path: text_corpus_folder_path
  create_date_array.min_index: date_parse_min
  create_date_array.max_index: date_parse_max
  create_date_array.force_non_null: force_parsed_date
  create_date_array.remove_tokens: remove_tokens

output_aliases:
  merge_table.table: merged_table
"""

In [8]:
# Turn the YAML pipeline description into a 'pipeline' operation, then show its config.
op = api.create_operation(
    module_type="pipeline",
    module_config=pipeline_desc,
)
op.pipeline_config

In [9]:
# render the pipeline structure as an interactive graph (no graph_type given, so the widget's default)
graph_widget(op.pipeline_config)

DagreD3Widget()

In [10]:
# same pipeline, rendered with the 'data-flow-simple' graph type
graph_widget(op.pipeline_config, graph_type="data-flow-simple")

DagreD3Widget()

In [11]:
! kiara run --save corpus_import /home/markus/projects/kiara/dev/kiara.examples/examples/pipelines/topic_modeling/corpus_onboarding.yaml text_corpus_folder_path=/home/markus/projects/kiara/dev/kiara.examples/examples/data/text_corpus/data/



╭─ [1mResult[0m ─────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  [1m [0m[1mfield       [0m[1m [0m [1m [0m[1mdata_type[0m[1m [0m [1m [0m[1mvalue                                                            [0m[1m [0m  │
│  ──────────────────────────────────────────────────────────────────────────────────────────────  │
│  [1m [0m[1mmerged_table[0m[1m [0m  table      [3m [0m[3m                                                                 [0m[3m [0m  │
│  [1m              [0m             [3m [0m[3m [0m[1;3m [0m[1;3mdate[0m[1;3m              [0m[1;3m [0m[3m [0m[1;3m [0m[1;3mcontent[0m[1;3m            [0m[1;3m [0m[3m [0m[1;3m [0m[1;3mfile_name[0m[1;3m         [0m[1;3m [0m[3m [0m[3m [0m  │
│  [1m              [0m             [3m [0m[3m ─────────────────────────────────────

In [12]:
# CLI: list all stored values that have an alias (e.g. the ones saved with `--save` above)
! kiara data list



╭─ Available aliases ──────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  [1m [0m[1malias                      [0m[1m [0m [1m [0m[1mtype        [0m[1m [0m [1m [0m[1m     size[0m[1m [0m                                        │
│  ────────────────────────────────────────────────────────                                        │
│   network.network_data          network_data    61.44 KB                                         │
│   tm.text_corpus_file_bundle    file_bundle    300.31 KB                                         │
│   tm.text_corpus_table          table          303.45 KB                                         │
│   tm.content_array              array          298.99 KB                                         │
│   tm.tokenized_corpus           array          709.15 KB                                         │
│   tm.preprocesse

## Workflow

- basically a user 'research session', dealing with one (input) data set or research question
- consists of:
  - internal pipeline structure, and its history (if applicable)
  - user inputs (and their history)
- workflow history:
  - a list of pipeline/input combinations (workflow states) over time
  - can be queried via timestamps, or as a graph (state transitions)
- the main API endpoint which frontends will interact with (I imagine)

### Workflow parts

### Data onboarding

- 'freeze' & 'formalize' user data
- transition from 'file' metaphor to 'relevant information'
- partially relevant: data cleaning/fixing
- goal: clean, specified, 'tagged' & recorded information
- always relevant, even if importing clean data from a respected source


### Data processing

- answering the actual research question
- firm requirements of which data types/shapes are expected as inputs
- use pre-defined pipeline with expected and documented outcomes
- ... or iteratively transform input(s) to explore data dynamically


# Data types

- central to ... everything
- focus on information content and structure, not file types/byte-array layout
- (arguably) more important than modules/transformation code
- well defined information content and serialization/deserialization functions
- metadata: lineage, type-dependent properties
- core types bundled with *kiara* and *kiara_plugin.core_types*:
  - scalars: `bytes`, `string`, `integer`, `float`, `boolean`
  - generic container types: `list`, `dict`
  - core types: `file`, `file_bundle` (basically raw byte arrays, used when 'onboarding', to be converted to table, network_data, etc.)
- domain-specific types are bundled with *kiara* plugin packages:
  - `kiara_plugin.tabular`: `table`, `database`
  - `kiara_plugin.network_analysis`: `network_data`
- utility-types: e.g. `html`, `terminal_renderable`, ... (mostly for use in frontends)

In [13]:
# CLI: show details of the saved table value, including its properties
! kiara data explain --properties corpus_import.merged_table


╭─ Value details for: [1;3mcorpus_import.merged_table[0m ──────────────────────────────────────────────────╮
│                                                                                                  │
│  [3m [0m[3mvalue_id         [0m[3m [0m  64865d23-f456-4fa4-b455-e96108e3cb98                                       │
│  [3m [0m[3mkiara_id         [0m[3m [0m  aad63fe4-b488-421e-8bbc-d97e8571870b                                       │
│  [3m [0m[3m                 [0m[3m [0m                                                                             │
│  [3m [0m[3m                 [0m[3m [0m  [92m────────────────────────────────────────────────────────────────────────[0m   │
│  [3m [0m[3mdata_type_info   [0m[3m [0m                                                                             │
│  [3m                   [0m   [3m [0m[3mdata_type_name  [0m[3m [0m  table                                                 │
│  [3m             

In [14]:
# CLI: show details of the saved table value, including its lineage
! kiara data explain --lineage corpus_import.merged_table



╭─ Value details for: [1;3mcorpus_import.merged_table[0m ──────────────────────────────────────────────────╮
│                                                                                                  │
│  [3m [0m[3mvalue_id         [0m[3m [0m  64865d23-f456-4fa4-b455-e96108e3cb98                                       │
│  [3m [0m[3mkiara_id         [0m[3m [0m  aad63fe4-b488-421e-8bbc-d97e8571870b                                       │
│  [3m [0m[3m                 [0m[3m [0m                                                                             │
│  [3m [0m[3m                 [0m[3m [0m  [92m────────────────────────────────────────────────────────────────────────[0m   │
│  [3m [0m[3mdata_type_info   [0m[3m [0m                                                                             │
│  [3m                   [0m   [3m [0m[3mdata_type_name  [0m[3m [0m  table                                                 │
│  [3m             

In [15]:
# CLI: load the saved table and print its content
# NOTE(review): meaning of the '-s' flag is not visible here — confirm via `kiara data load --help`
! kiara data load -s corpus_import.merged_table


                                                                                                    
 [1m [0m[1mdate[0m[1m               [0m[1m [0m [1m [0m[1mcontent[0m[1m                             [0m[1m [0m [1m [0m[1mfile_name[0m[1m                          [0m[1m [0m 
 ────────────────────────────────────────────────────────────────────────────────────────────────── 
  1917-04-25 00:00:00   LA RAGIONE                             sn84037024_1917-04-25_ed-1_seq-1_oc  
  1917-04-25 00:00:00   LA RAG ONE                             sn84037024_1917-04-25_ed-2_seq-1_oc  
  1917-04-25 00:00:00   LA RAGIONE                             sn84037024_1917-04-25_ed-3_seq-1_oc  
  1917-04-25 00:00:00   contro i vili, i camorristi, i sicar   sn84037024_1917-04-25_ed-4_seq-1_oc  
  1917-05-05 00:00:00   contro i vili, i camorristi, i sicar   sn84037024_1917-05-05_ed-1_seq-1_oc  
  1917-05-05 00:00:00   LA RAGIONA                             sn84037024_1917-05-05_ed-2_seq-1_

# Random frontend thoughts/questions

- (more or less) functional command-line interface (mostly for development)
- modularity:
  - 'indirect' design: strategies/rules which components and how to place them
  - writing extensions (custom data type preview, etc.) should be easy & accessible for devs outside core team
- progressive dev strategy?
  - implement 'fixed', pre-assembled workflows
  - implement dynamic data exploration option
- major components:
  - data navigator/browser
  - data type specific preview/filter/query components
  

# Random frontend thoughts/questions, pt. II

- trade-offs:
  - usefulness > usability
  - flexibility > clunkiness
  - (enforced?) best data practices -- intuitiveness
- educational aspect:
  - make inherent data management concepts intuitive
  - make users understand difference between raw data (files) and (meaningful) structured pieces of data (context-dependent in some cases)
- tools to take inspiration from:
  - no/low-code tools (tooljet, appsmith, airtable, n8n, node-red, patterns, windmill,...)
  - business-intelligence tools (superset, metabase, tableau, ...)
  - domain specific apps (gephi, ...)
  - notebook and similar products (jupyter, streamlit, plotly dash, observable, ...)
