In [1]:
from kiara import KiaraAPI, Kiara
import networkx
from networkx.readwrite import json_graph
import json
import os

In [2]:
# Notebook-wide settings: every later cell reads these globals.

# Working directory of the kernel (normally the folder this notebook lives in).
current_path = os.getcwd()

# Both pipeline descriptions are expected in the 'pipelines' folder below current_path.
pipelines_dir = os.path.join(current_path, 'pipelines')
# the corpus onboarding pipeline, defined in 'corpus_onboarding.yaml'
onboarding_pipeline = os.path.join(pipelines_dir, 'corpus_onboarding.yaml')
# the topic modeling pipeline, defined in 'topic_modeling.yaml'
topic_modeling_pipeline = os.path.join(pipelines_dir, 'topic_modeling.yaml')

# Local path to the corpus folder. The default is the 'example_data/mini_corpus'
# folder below current_path; point this at another folder to use a different
# corpus, e.g.:
# corpus_path = '/path/to/your/corpus_folder'
corpus_path = os.path.join(current_path, 'example_data', 'mini_corpus')

# Tag that becomes part of every alias stored by this notebook, so the data
# created in this run is easy to spot in the data registry.
gen_alias = 'test5oct22'

In [3]:
# Obtain the KiaraAPI object that all later cells use through the name `api`.
# Alternative kept for reference: it passes the name "topic_modeling" to
# instance() (presumably selecting a dedicated kiara context -- confirm
# against the kiara documentation before enabling it).
#api = KiaraAPI.instance("topic_modeling")
api = KiaraAPI.instance()


In [4]:
# version of Kiara used in this notebook
!pip show kiara

Name: kiara
Version: 0.4.21
Summary: Data-centric workflow orchestration.
Home-page: https://github.com/DHARPA-Project/kiara
Author: Markus Binsteiner
Author-email: markus@frkl.io
License: MPL-2.0
Location: /opt/miniconda3/envs/lineage2/lib/python3.10/site-packages
Requires: airium, alembic, appdirs, bidict, black, click, dag-cbor, deepdiff, Deprecated, distro, dpath, filetype, humanfriendly, jinja2, jupytext, mistune, mkdocstrings, mmh3, multiformats, networkx, orjson, pp-ez, pydantic, python-dateutil, python-slugify, pyzmq, regex, rich, rich-click, ruamel.yaml, sortedcontainers, sqlalchemy, sqlalchemy-utc, sqlalchemy-utils, stevedore, structlog, textual, tzlocal
Required-by: kiara-plugin.core-types, kiara-plugin.tabular


#### 1. Overview of the operations we will be experimenting on to create lineage data

- corpus onboarding
example corpus onboarding pipeline from https://github.com/DHARPA-Project/DHARPA-Project-viz-observable/blob/main/dag-lineage/pipelines/corpus_onboarding.yaml 

In [5]:
# Load the onboarding pipeline file as a kiara operation; ending the cell on
# the bare name displays the operation.
onboarding_op = api.get_operation(
    onboarding_pipeline,
    allow_external=True,
)
onboarding_op

- text processing
example topic modeling pipeline from https://github.com/DHARPA-Project/DHARPA-Project-viz-observable/blob/main/dag-lineage/pipelines/topic_modeling.yaml 

In [6]:
# Load the topic modeling pipeline file as a kiara operation; ending the cell
# on the bare name displays the operation.
topic_modeling_op = api.get_operation(
    topic_modeling_pipeline,
    allow_external=True,
)
topic_modeling_op

#### 2. Lineage data for the onboarding step

- Data onboarding

In [7]:
# Run the onboarding operation on the corpus folder configured in corpus_path.
onboarding_result = api.run_job(
    operation=onboarding_op,
    inputs={'text_corpus_folder_path': corpus_path},
)

In [8]:
# Pick the 'corpus_table' output of the onboarding job ...
table = onboarding_result["corpus_table"]
# ... and store it under an alias derived from gen_alias. The call is the last
# expression of the cell, so its result is displayed below.
api.store_value(table, f'tm_{gen_alias}_onboard')

StoreValueResult(value=Value(id=88037481-de5d-4df2-b7d1-0e405aa648f1, type=table, status=set, initialized=True optional=False), aliases=['tm_test5oct22_onboard'], persisted_data=PersistedData(model_id=zdpuAxPin6TsfMCtxHYfmPXujaY8AqcLgKG87vHrmfYKbvRbv, category=instance.persisted_data, fields=[data_type, data_type_config, serialization_profile, metadata, hash_codec, archive_id, chunk_id_map]), error=None)

In [9]:
# checking how that would appear in CLI
# (the leading `!` hands the rest of the line to the system shell, i.e. this
# runs the `kiara` command-line tool rather than Python code)
! kiara data list


╭─ Available aliases ──────────────────────────────────────────────────────────╮
│                                                                              │
│  [1m [0m[1malias                [0m[1m [0m [1m [0m[1mtype [0m[1m [0m [1m [0m[1m     size[0m[1m [0m                                 │
│  ───────────────────────────────────────────                                 │
│   tm_test5oct22_onboard   table   300.77 KB                                  │
│                                                                              │
╰──────────────────────────────────────────────────────────────────────────────╯


- lineage data preparation

In [10]:
# Look the onboarded table up again through its alias. The alias is built from
# gen_alias with the same f-string used when storing the value, so changing
# gen_alias in the settings cell keeps this lookup in sync.
corpus_table = api.get_value(value=f'tm_{gen_alias}_onboard')

In [11]:
# Graph object describing how corpus_table was produced (its lineage); later
# cells query it with graph.nodes.data() and graph.predecessors().
graph = corpus_table.lineage.module_graph
# Convert the graph into a plain node-link dict
# (keys such as 'directed', 'multigraph', 'graph' and 'nodes').
result = json_graph.node_link_data(graph)

In [12]:
# display the node-link dict built in the previous cell
result

{'directed': True,
 'multigraph': False,
 'graph': {},
 'nodes': [{'data_type': 'table',
   'label': '[this value]',
   'node_type': 'value',
   'data_type_config': {},
   'level': 1,
   'id': 'value:88037481-de5d-4df2-b7d1-0e405aa648f1'},
  {'module_type': 'table.merge',
   'module_config': {'constants': {},
    'defaults': {},
    'inputs_schema': {'source_table': {'type': 'table',
      'type_config': {},
      'default': '__not_set__',
      'optional': False,
      'is_constant': False,
      'doc': {'description': 'The original table.', 'doc': None}},
     'date_array': {'type': 'array',
      'type_config': {},
      'default': '__not_set__',
      'optional': False,
      'is_constant': False,
      'doc': {'description': 'The array containing the parsed date items.',
       'doc': None}}},
    'column_map': {'date': 'date_array',
     'content': 'source_table.content',
     'file_name': 'source_table.file_name'}},
   'label': 'table.merge',
   'node_type': 'operation',
   'lev

In [13]:
# (node_id, attributes) pairs for every node of the lineage graph
nodes = graph.nodes.data()
# export records; filled by the loop in the next cell
augmented_nodes = {}

In [14]:
def get_info(node):
    """Gather the extra details exported for one lineage-graph node.

    Args:
        node: A ``(node_id, attributes)`` pair, as yielded by
            ``graph.nodes.data()``. ``attributes["node_type"]`` selects the
            lookup; operation nodes must also provide ``"module_type"``, and
            value node ids are expected to look like ``"value:<id>"``.

    Returns:
        dict: For an ``"operation"`` node, the result of
        ``api.retrieve_module_type_info(...)`` converted with ``.dict()``.
        For a ``"value"`` node, ``{"preview": <value rendered as string>}``.

    Raises:
        ValueError: If ``node_type`` is neither ``"operation"`` nor
            ``"value"`` (this used to surface as an UnboundLocalError).
    """
    node_id, attrs = node[0], node[1]
    node_type = attrs["node_type"]

    # NOTE: every node triggers at least one call into the kiara API, which is
    # inefficient for large graphs.
    if node_type == "operation":
        return api.retrieve_module_type_info(attrs["module_type"]).dict()

    if node_type == "value":
        # Node ids carry a 'value:' prefix; the lookup needs the bare value id.
        prefix = "value:"
        value_id = node_id[len(prefix):] if node_id.startswith(prefix) else node_id
        value = api.get_value(value_id)
        rendered = api.render_value(value=value, target_format="string").rendered
        return {"preview": rendered}

    raise ValueError(f"Unsupported node_type {node_type!r} for node {node_id!r}")

# Build one export record per graph node, keyed by the node's position.
for idx, node in enumerate(nodes):
    node_dict = dict(
        id=node[0],
        desc=node[1],
        parentIds=list(graph.predecessors(node[0])),
        info=get_info(node),
    )
    augmented_nodes[idx] = node_dict
# display the last record that was built
node_dict

{'id': 'value:c6b4f1c4-a9c4-4e32-96cc-301e223fc1f9',
 'desc': {'label': 'remove_tokens (list)',
  'node_type': 'value',
  'data_type': 'list',
  'data_type_config': {},
  'level': 6},
 'parentIds': [],
 'info': {'preview': "list_data=[] item_schema={'title': 'list', 'type': 'object'} python_class=PythonClass(model_id=list, category=instance.wrapped_python_class, fields=[python_class_name, python_module_name, full_name])"}}

In [15]:
# display every record collected above (the value previews make this output long)
augmented_nodes

{0: {'id': 'value:88037481-de5d-4df2-b7d1-0e405aa648f1',
  'desc': {'data_type': 'table',
   'label': '[this value]',
   'node_type': 'value',
   'data_type_config': {},
   'level': 1},
  'parentIds': ['module:zdpuAzybdPqAwszJEtqjGgiafHHk2y6bVLerT1TfUFkhk94xV'],
  'info': {'preview': "date\tcontent\tfile_name\t\n1917-04-25 00:00:00\tLA RAGIONE\tsn84037024_1917-04-25_ed-1_seq-1_ocr.txt\t\n1917-04-25 00:00:00\tLA RAG ONE\tsn84037024_1917-04-25_ed-2_seq-1_ocr.txt\t\n1917-04-25 00:00:00\tLA RAGIONE\tsn84037024_1917-04-25_ed-3_seq-1_ocr.txt\t\n1917-04-25 00:00:00\tcontro i vili, i camorristi, i sicari, i falsari e gli austriacanti, nemici della patria di e di quella d adozione.\tsn84037024_1917-04-25_ed-4_seq-1_ocr.txt\t\n1917-05-05 00:00:00\tcontro i vili, i camorristi, i sicari, i falsari e gli austriacanti, nemici della patria di origine e di quella d' adozione\tsn84037024_1917-05-05_ed-1_seq-1_ocr.txt\t\n1917-05-05 00:00:00\tLA RAGIONA\tsn84037024_1917-05-05_ed-2_seq-1_ocr.txt\t\n1917-0

In [16]:
# uncomment to export the dataset for the viz

# res = json.dumps(augmented_nodes)
# with open("lineage_data_1.json", "w") as outfile:
#     outfile.write(res)

#### 3. Lineage data for the NLP step

- running the example TM pipeline with previously onboarded data

In [17]:
# Run the topic modeling operation with the onboarded corpus table as its 'corpus' input.
nlp_step = api.run_job(
    operation=topic_modeling_op,
    inputs={'corpus': corpus_table},
)

In [20]:
# NOTE: despite its name, 'preprocessed_corpus' holds the 'coherence_map'
# output of the topic modeling job (the name is kept because later cells use it).
preprocessed_corpus = nlp_step["coherence_map"]
# Store it under an alias derived from gen_alias; the call is the last
# expression of the cell, so its result is displayed below.
api.store_value(preprocessed_corpus, f'tm_{gen_alias}_coherence_map')

StoreValueResult(value=Value(id=fe802251-d595-4f09-869a-71e67eca18f1, type=dict, status=set, initialized=True optional=False), aliases=['tm_test5oct22_coherence_map'], persisted_data=PersistedData(model_id=zdpuApg4H742soUdpA3iLmsPmb5koxkcinR2VPkjG3vfg5J5j, category=instance.persisted_data, fields=[data_type, data_type_config, serialization_profile, metadata, hash_codec, archive_id, chunk_id_map]), error=None)

In [21]:
# list the stored aliases again, this time through the kiara command-line tool
!kiara data list


╭─ Available aliases ──────────────────────────────────────────────────────────╮
│                                                                              │
│  [1m [0m[1malias                            [0m[1m [0m [1m [0m[1mtype [0m[1m [0m [1m [0m[1m     size[0m[1m [0m                     │
│  ───────────────────────────────────────────────────────                     │
│   tm_test5oct22_coherence_map         dict    228 bytes                      │
│   tm_test5oct22_onboard               table   300.77 KB                      │
│   tm_test5oct22_preprocessed_corpus   array   489.27 KB                      │
│                                                                              │
╰──────────────────────────────────────────────────────────────────────────────╯


- lineage data preparation

In [22]:
# This is not really necessary, since the 'preprocessed_corpus' variable still
# holds that value. The alias is built from gen_alias with the same f-string
# used when storing, so changing gen_alias keeps this lookup in sync.
preprocessed_corpus = api.get_value(value=f'tm_{gen_alias}_coherence_map')

In [23]:
# Graph object describing how the stored coherence map was produced (its
# lineage); this rebinds the 'graph' name used for the onboarding step.
graph = preprocessed_corpus.lineage.module_graph
# Convert the graph into a plain node-link dict
# (keys such as 'directed', 'multigraph', 'graph' and 'nodes').
result = json_graph.node_link_data(graph)

In [24]:
# display the node-link dict built in the previous cell
result

{'directed': True,
 'multigraph': False,
 'graph': {},
 'nodes': [{'data_type': 'dict',
   'label': '[this value]',
   'node_type': 'value',
   'data_type_config': {},
   'level': 1,
   'id': 'value:fe802251-d595-4f09-869a-71e67eca18f1'},
  {'module_type': 'generate.LDA.for.tokens_array',
   'module_config': {'constants': {}, 'defaults': {}},
   'label': 'generate.LDA.for.tokens_array',
   'node_type': 'operation',
   'level': 3,
   'id': 'module:zdpuAoRTS61Hf3P8nNJFAy1ryxbHQmwjC7K2qpr21X5fRM19t'},
  {'label': 'tokenize_by_word (boolean)',
   'node_type': 'value',
   'data_type': 'boolean',
   'data_type_config': {},
   'level': 8,
   'id': 'value:195805cd-bdda-45d5-abb3-968fc5d3bfd5'},
  {'label': 'num_topics_max (integer)',
   'node_type': 'value',
   'data_type': 'integer',
   'data_type_config': {},
   'level': 4,
   'id': 'value:d68fb5ef-1c43-4aef-8957-3f7f1f7e6de0'},
  {'label': 'num_topics_min (integer)',
   'node_type': 'value',
   'data_type': 'integer',
   'data_type_config':

In [25]:
# (node_id, attributes) pairs for every node of the current lineage graph
nodes = graph.nodes.data()
# start from an empty record collection; filled by the loop in the next cell
augmented_nodes = {}

In [26]:

# Build one export record per graph node, keyed by the node's position.
for idx, node in enumerate(nodes):
    node_dict = dict(
        id=node[0],
        desc=node[1],
        parentIds=list(graph.predecessors(node[0])),
        info=get_info(node),
    )
    augmented_nodes[idx] = node_dict

In [27]:
# display every record collected above
augmented_nodes

{0: {'id': 'value:fe802251-d595-4f09-869a-71e67eca18f1',
  'desc': {'data_type': 'dict',
   'label': '[this value]',
   'node_type': 'value',
   'data_type_config': {},
   'level': 1},
  'parentIds': ['module:zdpuAoRTS61Hf3P8nNJFAy1ryxbHQmwjC7K2qpr21X5fRM19t'],
  'info': {'preview': "dict_data={'7': 0.2352272347011528, '8': 0.22535913106326597, '9': 0.22648812236315757} data_schema={'title': 'dict', 'type': 'object'} python_class=PythonClass(model_id=dict, category=instance.wrapped_python_class, fields=[python_class_name, python_module_name, full_name])"}},
 1: {'id': 'module:zdpuAoRTS61Hf3P8nNJFAy1ryxbHQmwjC7K2qpr21X5fRM19t',
  'desc': {'module_type': 'generate.LDA.for.tokens_array',
   'module_config': {'constants': {}, 'defaults': {}},
   'label': 'generate.LDA.for.tokens_array',
   'node_type': 'operation',
   'level': 3},
  'parentIds': ['value:195805cd-bdda-45d5-abb3-968fc5d3bfd5',
   'value:d68fb5ef-1c43-4aef-8957-3f7f1f7e6de0',
   'value:a6454c4e-2605-444d-bff9-b7e8d184091e',
 

In [28]:
# uncomment to export the dataset for the viz
# res = json.dumps(augmented_nodes)
# with open("lineage_data_2.json", "w") as outfile:
#     outfile.write(res)