# 6. DSMS Apps and Pipelines

In this tutorial, we see how to create apps, apply them to KItems and run them, both automatically upon upload and manually.

### 6.0 Setting up


```python
from dsms import DSMS, KItem, AppConfig
import time
```

Now we load the environment variables from a `.env` file and start the DSMS session.

```python
dsms = DSMS(env="../../.env")
```

### 6.1 Investigating Available Apps

We can investigate which apps are available:

```python
dsms.app_configs
```

```
[AppConfig(name=ckan-fetch, specification={'metadata': {'generateName': 'ckan-resource-request-'}}),
 AppConfig(name=csv_tensile_test, specification={'metadata': {'generateName': 'data2rdf-'}}),
 AppConfig(name=csv_tensile_test_f2, specification={'metadata': {'generateName': 'data2rdf-'}}),
 AppConfig(name=excel_notched_tensile_test, specification={'metadata': {'generateName': 'data2rdf-'}}),
 AppConfig(name=excel_shear_test, specification={'metadata': {'generateName': 'data2rdf-'}}),
 AppConfig(name=excel_tensile_test, specification={'metadata': {'generateName': 'data2rdf-'}}),
 AppConfig(name=ternary-plot, specification={'metadata': {'generateName': 'ckan-tenary-app-'}})]
```
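To narrow this list down, for example to the apps that start a Data2RDF workflow, the configs can be filtered by the `generateName` prefix in their specification. The sketch below assumes only what the output above shows: each config exposes a `name` and a dict-like `specification`. The helper `apps_with_prefix` is hypothetical, and `SimpleNamespace` objects stand in for real `AppConfig` instances so that the snippet runs without a DSMS connection.

```python
from types import SimpleNamespace


def apps_with_prefix(app_configs, prefix):
    """Return the names of all app configs whose workflow name starts with `prefix`.

    Hypothetical helper: assumes each config has a `name` attribute and a dict
    `specification` shaped like the Argo Workflow specs shown above.
    """
    names = []
    for config in app_configs:
        metadata = config.specification.get("metadata", {})
        if metadata.get("generateName", "").startswith(prefix):
            names.append(config.name)
    return names


# Stand-ins for two of the configs listed above (no DSMS connection needed)
configs = [
    SimpleNamespace(
        name="ckan-fetch",
        specification={"metadata": {"generateName": "ckan-resource-request-"}},
    ),
    SimpleNamespace(
        name="csv_tensile_test",
        specification={"metadata": {"generateName": "data2rdf-"}},
    ),
]

print(apps_with_prefix(configs, "data2rdf-"))  # ['csv_tensile_test']
```

In a live session, `dsms.app_configs` would be passed instead of the stand-ins, provided its entries behave as their representation above suggests.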

### 6.2 Create a new app config and apply it to a KItem

### 6.2.1 Arbitrary Python code

To be defined.

### 6.2.2 Data2RDF

#### 6.2.2.1 Prepare app and its config

In the following example, we want to upload a CSV file with some arbitrary data and describe it with RDF. This gives us the opportunity to harmonize the entities of the data file with ontological concepts and allows us to convert the values of the dataframe columns into any compatible unit we like.

First of all, let us define the data:

```python
data = """A,B,C
1.2,1.3,1.5
1.7,1.8,1.9
2.0,2.1,2.3
2.5,2.6,2.8
3.0,3.2,3.4
3.6,3.7,3.9
4.1,4.3,4.4
4.7,4.8,5.0
5.2,5.3,5.5
5.8,6.0,6.1
"""
```

We also give the app config a defined name:

```python
configname = "testapp2"
```

As a next step, we create a new app specification. The specification follows the definition of an [**Argo Workflow**](https://argo-workflows.readthedocs.io/en/latest/). The workflow triggers a pipeline from a Docker image containing the [**Data2RDF package**](https://data2rdf.readthedocs.io/en/latest/).

The image has already been deployed on the Kubernetes (k8s) cluster of the DSMS, and a workflow template named `dsms-data2rdf` has been set up for it. Hence, we only need to configure the pipeline for the data shown above, which we want to upload and describe with RDF.

For more details about the Data2RDF package, please refer to its documentation linked above.

The parameters of the app config define the inputs for our Data2RDF pipeline. These are:

* the parser kind (`csv` here)
* the time series header length (`1` here)
* the metadata length (`0` here)
* the time series separator (`,` here)
* the log level (`DEBUG` here)
* the mapping:
    * `A` is the test time, given in seconds (`s`)
    * `B` is the standard force, given in kilonewtons (`kN`)
    * `C` is the absolute crosshead travel, given in millimeters (`mm`)
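To make the roles of the first four parameters concrete, the following sketch splits a CSV string the way these settings describe it: `metadata_length` lines of metadata come first, then `time_series_header_length` header lines, then the data rows, with fields separated by `time_series_sep`. This is a simplified stand-in written with Python's `csv` module to illustrate the layout, not Data2RDF's actual parser, and `split_csv` is a hypothetical name.

```python
import csv
import io


def split_csv(text, metadata_length=0, header_length=1, sep=","):
    """Split CSV text into metadata rows, header rows and data rows.

    Simplified illustration of the parser settings; not Data2RDF's own parser.
    """
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    metadata = rows[:metadata_length]
    header = rows[metadata_length:metadata_length + header_length]
    data = rows[metadata_length + header_length:]
    return metadata, header, data


# First lines of the tutorial data: no metadata, one header line, comma-separated
sample = "A,B,C\n1.2,1.3,1.5\n1.7,1.8,1.9\n"
metadata, header, data = split_csv(sample, metadata_length=0, header_length=1, sep=",")
print(header)  # [['A', 'B', 'C']]
print(data)    # [['1.2', '1.3', '1.5'], ['1.7', '1.8', '1.9']]
```

With `metadata_length=0`, the first line is the header, which is why the mapping below refers to the columns as `A`, `B` and `C`.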

```python
parameters = [
    {"name": "parser", "value": "csv"},
    {"name": "time_series_header_length", "value": 1},
    {"name": "metadata_length", "value": 0},
    {"name": "time_series_sep", "value": ","},
    {"name": "log_level", "value": "DEBUG"},
    {
        "name": "mapping",
        "value": """
            [
                {
                    "key": "A",
                    "iri": "https://w3id.org/steel/ProcessOntology/TestTime",
                    "unit": "s"
                },
                {
                    "key": "B",
                    "iri": "https://w3id.org/steel/ProcessOntology/StandardForce",
                    "unit": "kN"
                },
                {
                    "key": "C",
                    "iri": "https://w3id.org/steel/ProcessOntology/AbsoluteCrossheadTravel",
                    "unit": "mm"
                }
            ]
            """,
    },
]
```
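Since the `mapping` value is a JSON document embedded in a string, a typo in it only surfaces when the pipeline runs. One option is to build the mapping as a Python list and serialize it with `json.dumps`, which guarantees valid JSON and allows a quick check that every `key` matches a column of the CSV header. This is a sketch under the assumption that the pipeline accepts any equivalent JSON string, which the hand-written string above suggests but the tutorial does not state explicitly.

```python
import json

PROCESS_ONTOLOGY = "https://w3id.org/steel/ProcessOntology/"

# Same entries as the hand-written mapping above, built as Python objects
mapping = [
    {"key": "A", "iri": PROCESS_ONTOLOGY + "TestTime", "unit": "s"},
    {"key": "B", "iri": PROCESS_ONTOLOGY + "StandardForce", "unit": "kN"},
    {"key": "C", "iri": PROCESS_ONTOLOGY + "AbsoluteCrossheadTravel", "unit": "mm"},
]

# Check that every mapped key is a column in the CSV header line
header = "A,B,C".split(",")
missing = [entry["key"] for entry in mapping if entry["key"] not in header]
assert not missing, f"mapping keys not found in CSV header: {missing}"

# Serialize to the string form expected in the parameter list
mapping_parameter = {"name": "mapping", "value": json.dumps(mapping, indent=4)}
print(mapping_parameter["value"])
```

The resulting dict could then take the place of the last entry in `parameters`.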

Now we add the parameters to our app specification. We assign the prefix `data2rdf-` to `generateName`, from which a unique workflow name is generated by appending some random characters as a suffix. The workflow template containing the Docker image we want to run is called `dsms-data2rdf`, and its `entrypoint` is `execute_pipeline`.

```python
# Define app specification
specification = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "Workflow",
    "metadata": {"generateName": "data2rdf-"},
    "spec": {
        "entrypoint": "execute_pipeline",
        "workflowTemplateRef": {"name": "dsms-data2rdf"},
        "arguments": {"parameters": parameters},
    },
}
```
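If several app configs are needed, for instance for different parsers or mappings, a dict like the one above can be produced by a small factory function. The sketch below only rearranges the fields already used in this tutorial; `make_workflow_spec` is a hypothetical name and not part of the DSMS SDK.

```python
def make_workflow_spec(template, entrypoint, name_prefix, parameters):
    """Build an Argo Workflow spec that runs `entrypoint` of a workflow template.

    Hypothetical helper mirroring the specification dict of this tutorial.
    """
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": name_prefix},
        "spec": {
            "entrypoint": entrypoint,
            "workflowTemplateRef": {"name": template},
            "arguments": {"parameters": list(parameters)},
        },
    }


spec = make_workflow_spec(
    template="dsms-data2rdf",
    entrypoint="execute_pipeline",
    name_prefix="data2rdf-",
    parameters=[{"name": "parser", "value": "csv"}],
)
print(spec["spec"]["workflowTemplateRef"])  # {'name': 'dsms-data2rdf'}
```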

Now we instantiate the new app config:

```python
appspec = AppConfig(
    name=configname,
    specification=specification,  # this can also be a file path to a YAML file instead of a dict
)
```


We commit the new app config:

```python
dsms.commit()
```

Now we want to apply the app config to a KItem. To do so, `triggerUponUpload` must be set to `True` so that the app is triggered automatically whenever we upload an attachment.

Additionally, we must specify the file extensions for which an upload triggers the app. Here it is `.csv`.

We also want to generate a QR code as the avatar of the KItem with `avatar={"include_qr": True}`.

```python
item = KItem(
    name="my tensile test experiment",
    ktype_id=dsms.ktypes.Dataset,
    kitem_apps=[
        {
            "executable": appspec.name,
            "title": "data2rdf",
            "additional_properties": {
                "triggerUponUpload": True,
                "triggerUponUploadFileExtensions": [".csv"],
            },
        }
    ],
    avatar={"include_qr": True},
)
```
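The two `additional_properties` act together as a filter on uploads: the app should fire only if `triggerUponUpload` is set and the extension of the uploaded file is listed in `triggerUponUploadFileExtensions`. The function below is a hypothetical client-side illustration of that rule, not the DSMS server's implementation; in particular, exact and case-sensitive matching of the extension is an assumption.

```python
import os


def should_trigger(additional_properties, filename):
    """Illustrate whether uploading `filename` would trigger the app.

    Hypothetical sketch: assumes an exact, case-sensitive match of the file
    extension; the DSMS backend may apply different rules.
    """
    if not additional_properties.get("triggerUponUpload", False):
        return False
    extension = os.path.splitext(filename)[1]  # e.g. ".csv"
    return extension in additional_properties.get("triggerUponUploadFileExtensions", [])


# Same additional_properties as in the KItem above
props = {"triggerUponUpload": True, "triggerUponUploadFileExtensions": [".csv"]}
print(should_trigger(props, "dummy_data.csv"))   # True
print(should_trigger(props, "dummy_data.xlsx"))  # False
```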

We commit the KItem:

```python
dsms.commit()
```

Now we add our data as an attachment:

```python
item.attachments = [{"name": "dummy_data.csv", "content": data}]
```

And we commit again:

```python
dsms.commit()
```

#### 6.2.2.2 Get results

Now we can verify that the data extraction was successful:

```python
print(item)
```

```
KItem(

	name = my tensile test experiment, 

	id = 58683a2d-7823-4638-bc8e-91b461afa593, 

	ktype_id = dataset, 

	in_backend = True, 

	slug = mytensiletestexperiment-58683a2d, 

	annotations = [], 

	attachments = [
		{
			name: dummy_data.csv
		}
	], 

	linked_kitems = [], 

	affiliations = [], 

	authors = [
		{
			user_id: 7f0e5a37-353b-4bbc-b1f1-b6ad575f562d
		}
	], 

	avatar_exists = True, 

	contacts = [], 

	created_at = 2024-08-20 08:04:25.756982, 

	updated_at = 2024-08-20 08:04:25.756982, 

	external_links = [], 

	kitem_apps = [
		{
			kitem_app_id: 22,
			executable: testapp2,
			title: data2rdf,
			description: None,
			tags: None,
			additional_properties: {triggerUponUpload: True, triggerUponUploadFileExtensions: ['.csv']}
		}
	], 

	summary = None, 

	user_groups = [], 

	custom_properties = None, 

	dataframe = [
		{
			column_id: 0,
			name: TestTime
		}, 
		{
			column_id: 1,
			name: StandardForce
		}, 
		{
			column_id: 2,
			name: AbsoluteCrossheadTravel
		}
	]
```

And also that the RDF generation was successful:

```python
print(item.subgraph.serialize())
```

```
@prefix csvw: <http://www.w3.org/ns/csvw#> .
@prefix dcat: <http://www.w3.org/ns/dcat#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix ns1: <http://qudt.org/schema/qudt/> .
@prefix ns2: <http://xmlns.com/foaf/spec/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

<https://bue.materials-data.space/58683a2d-7823-4638-bc8e-91b461afa593/dataset> a dcat:Dataset ;
    dcterms:hasPart <https://bue.materials-data.space/58683a2d-7823-4638-bc8e-91b461afa593/tableGroup> ;
    dcat:distribution [ a dcat:Distribution ;
            dcat:accessURL "https://bue.materials-data.space/api/knowledge/data_api/58683a2d-7823-4638-bc8e-91b461afa593"^^xsd:anyURI ;
            dcat:mediaType "http://www.iana.org/assignments/media-types/text/csv"^^xsd:anyURI ] .

<https://bue.materials-data.space/58683a2d-7823-4638-bc8e-91b461afa593/AbsoluteCrossheadTravel> a <https://w3id.org/steel/ProcessOntology/AbsoluteCrossheadTravel> ;
    ns1:hasUnit 
```

Now we are able to convert our data into any compatible unit we want. The `StandardForce` was given in `kN`, but we now want it in `N`:
```python
item.dataframe.StandardForce.convert_to("N")
```

```
[1300.0,
 1800.0,
 2100.0,
 2600.0,
 3200.0,
 3700.0,
 4300.0,
 4800.0,
 5300.0,
 6000.0]
```
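The conversion itself is a plain scaling by the ratio of the two units: 1 kN = 1000 N, so every value of column `B` is multiplied by 1000. The sketch below reproduces the numbers above with a small, hypothetical factor table; it only illustrates the arithmetic and makes no claim about how `convert_to` works internally.

```python
# Hypothetical factors relative to the base unit newton; only the units needed here
TO_NEWTON = {"N": 1.0, "kN": 1e3, "MN": 1e6}


def convert_force(values, from_unit, to_unit):
    """Scale force values from `from_unit` to `to_unit` via the base unit N."""
    factor = TO_NEWTON[from_unit] / TO_NEWTON[to_unit]
    return [value * factor for value in values]


# Column B of the uploaded CSV, given in kN
standard_force_kn = [1.3, 1.8, 2.1, 2.6, 3.2, 3.7, 4.3, 4.8, 5.3, 6.0]
standard_force_n = convert_force(standard_force_kn, "kN", "N")
print(standard_force_n[:3])  # [1300.0, 1800.0, 2100.0]
```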

#### 6.2.2.3 Manipulate dataframe

We are able to retrieve the dataframe as a `pandas.DataFrame`:

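```python
item.dataframe.to_df()
```

|   | TestTime | StandardForce | AbsoluteCrossheadTravel |
|---|---|---|---|
| 0 | 1.2 | 1.3 | 1.5 |
| 1 | 1.7 | 1.8 | 1.9 |
| 2 | 2.0 | 2.1 | 2.3 |
| 3 | 2.5 | 2.6 | 2.8 |
| 4 | 3.0 | 3.2 | 3.4 |
| 5 | 3.6 | 3.7 | 3.9 |
| 6 | 4.1 | 4.3 | 4.4 |
| 7 | 4.7 | 4.8 | 5.0 |
| 8 | 5.2 | 5.3 | 5.5 |
| 9 | 5.8 | 6.0 | 6.1 |
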
We are able to overwrite the dataframe with new data:

```python
item.dataframe = {
    "TestTime": list(range(100)),
    "StandardForce": list(range(1, 101)),
    "AbsoluteCrossheadTravel": list(range(2, 102)),
}
dsms.commit()
```
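When the dataframe is overwritten with a dict, each key becomes a column name and each list a column. Assuming the DSMS stores a rectangular table (as the `to_df()` output suggests), all lists should have the same length. The hypothetical helper below checks this locally, so that a mismatch surfaces before anything is committed; the DSMS may also perform its own validation.

```python
def check_columns(columns):
    """Raise ValueError unless all columns in the dict have the same length.

    Hypothetical pre-commit check; returns the common column length.
    """
    lengths = {name: len(values) for name, values in columns.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"columns have different lengths: {lengths}")
    return next(iter(lengths.values()), 0)


new_data = {
    "TestTime": list(range(100)),
    "StandardForce": list(range(1, 101)),
    "AbsoluteCrossheadTravel": list(range(2, 102)),
}
print(check_columns(new_data))  # 100
```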

We are able to retrieve the data column-wise:

```python
for column in item.dataframe:
    print("column:", column.name, ",\n", "data:", column.get())
```

```
column: TestTime ,
 data: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
column: StandardForce ,
 data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
column: AbsoluteCrossheadTravel ,
 data: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 3
```

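... and we can also modify the dataframe directly as needed, for example by dropping a column:

```python
# Drop the TestTime column from the pandas DataFrame and write the result back
new_df = item.dataframe.to_df().drop(["TestTime"], axis=1)
item.dataframe = new_df
```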

#### 6.2.2.4 Run app on demand

We can also run the app on demand, instead of only having it triggered automatically whenever an attachment is uploaded. For this purpose, we just need to refer to the title of the app that we assigned during the KItem creation (here simply `data2rdf`).

Additionally, we need to pass the `attachment_name` and hand over the access token and host URL to the app by explicitly setting `set_token` and `set_host_url` to `True`.

By default, the app runs synchronously, so `run()` returns the `job` only after the pipeline run has finished.

```python
job = item.kitem_apps.by_title["data2rdf"].run(
    attachment_name=item.attachments[0].name,
    set_token=True,
    set_host_url=True,
)
```

We are able to retrieve the job status:

```python
job.status
```

```
JobStatus(phase='Succeeded', estimated_duration=None, finished_at='08/20/2024, 08:05:35', started_at='08/20/2024, 08:05:15', message=None, progress='1/1')
```
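The timestamps in the status are plain strings in the `MM/DD/YYYY, HH:MM:SS` format shown above, so the run time can be computed with `datetime.strptime`. The sketch assumes that `started_at` and `finished_at` always use this format and that `finished_at` is `None` while the job is still running, as in the background run further below; the helper name `job_duration` is hypothetical.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%m/%d/%Y, %H:%M:%S"  # e.g. '08/20/2024, 08:05:15'


def job_duration(started_at, finished_at):
    """Return the run time as a timedelta, or None if the job has not finished."""
    if started_at is None or finished_at is None:
        return None
    start = datetime.strptime(started_at, TIMESTAMP_FORMAT)
    end = datetime.strptime(finished_at, TIMESTAMP_FORMAT)
    return end - start


# Values taken from the JobStatus output above
duration = job_duration("08/20/2024, 08:05:15", "08/20/2024, 08:05:35")
print(duration.total_seconds())  # 20.0
```

With a live job, the call would be `job_duration(job.status.started_at, job.status.finished_at)`.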

... and the job logs:

```python
print(job.logs)
```

```
"[2024-08-20 08:05:23,481 - dsms_data2rdf.main - INFO]: Fetch KItem: \n KItem(\n\n\tname = my tensile test experiment, \n\n\tid = 58683a2d-7823-4638-bc8e-91b461afa593, \n\n\tktype_id = dataset, \n\n\tin_backend = True, \n\n\tslug = mytensiletestexperiment-58683a2d, \n\n\tannotations = [], \n\n\tattachments = [\n\t\t{\n\t\t\tname: dummy_data.csv\n\t\t}\n\t], \n\n\tlinked_kitems = [], \n\n\taffiliations = [], \n\n\tauthors = [\n\t\t{\n\t\t\tuser_id: 7f0e5a37-353b-4bbc-b1f1-b6ad575f562d\n\t\t}\n\t], \n\n\tavatar_exists = True, \n\n\tcontacts = [], \n\n\tcreated_at = 2024-08-20 08:04:25.756982, \n\n\tupdated_at = 2024-08-20 08:04:25.756982, \n\n\texternal_links = [], \n\n\tkitem_apps = [\n\t\t{\n\t\t\tkitem_app_id: 22,\n\t\t\texecutable: testapp2,\n\t\t\ttitle: data2rdf,\n\t\t\tdescription: None,\n\t\t\ttags: None,\n\t\t\tadditional_properties: {triggerUponUpload: True, triggerUponUploadFileExtensions: ['.csv']}\n\t\t}\n\t], \n\n\tsummary = None, \n\n\tuser_groups = [], \n\n\tcustom_proper
```

In case we would like to run the job in the background, we simply add `wait=False`:

```python
job = item.kitem_apps.by_title["data2rdf"].run(
    attachment_name=item.attachments[0].name,
    set_token=True,
    set_host_url=True,
    wait=False,
)
```

We are able to monitor the job status and logs while the job is running in the background:

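```python
while True:
    time.sleep(1)
    status = job.status  # query the status once per iteration
    print("\n Current status:")
    print(status)
    print("\n Current logs:")
    print(job.logs)
    # Stop only once the job has left the in-progress phases ("Pending" or "Running")
    if status.phase not in ("Pending", "Running"):
        break
```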


```
 Current status:
phase='Running' estimated_duration=None finished_at=None started_at='08/20/2024, 08:05:39' message=None progress='0/1'

 Current logs:
""

 Current status:
phase='Running' estimated_duration=None finished_at=None started_at='08/20/2024, 08:05:39' message=None progress='0/1'

 Current logs:
""

 Current status:
phase='Running' estimated_duration=None finished_at=None started_at='08/20/2024, 08:05:39' message=None progress='0/1'

 Current logs:
""

 Current status:
phase='Running' estimated_duration=None finished_at=None started_at='08/20/2024, 08:05:39' message=None progress='0/1'

 Current logs:
""

 Current status:
phase='Running' estimated_duration=None finished_at=None started_at='08/20/2024, 08:05:39' message=None progress='0/1'

 Current logs:
""

 Current status:
phase='Running' estimated_duration=None finished_at=None started_at='08/20/2024, 08:05:39' message=None progress='0/1'

 Current logs:
""

 Current status:
phase='Running' estimated_duration=None finish
```
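A polling loop without an upper bound never returns if the job hangs. A variant with a timeout is sketched below. It assumes, as the output suggests, that `job.status.phase` reports Argo workflow phases, where `Pending` and `Running` mean the job is still in progress (Argo's terminal phases are `Succeeded`, `Failed` and `Error`). `wait_for_job` is a hypothetical helper, and a small fake job stands in for the real one so that the sketch runs offline.

```python
import time
from types import SimpleNamespace

IN_PROGRESS = {"Pending", "Running"}


def wait_for_job(job, timeout=600.0, interval=1.0):
    """Poll `job.status` until it leaves the in-progress phases.

    Returns the last status seen; raises TimeoutError if the job is still
    in progress after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = job.status
        if status.phase not in IN_PROGRESS:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status.phase} after {timeout} s")
        time.sleep(interval)


class FakeJob:
    """Offline stand-in whose phase advances on every status query."""

    def __init__(self, phases):
        self._phases = iter(phases)

    @property
    def status(self):
        return SimpleNamespace(phase=next(self._phases))


final = wait_for_job(FakeJob(["Pending", "Running", "Succeeded"]), timeout=5, interval=0.01)
print(final.phase)  # Succeeded
```

With a real background job, `wait_for_job(job)` would take the place of the `while True` loop, still followed by `item.refresh()`.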

**IMPORTANT**: When the job has run asynchronously (in the background), we need to refresh the KItem manually afterwards:

```python
item.refresh()
```

Finally, we clean up the DSMS by deleting the KItem and the app config created in this tutorial:

```python
del dsms[item]
del dsms[appspec]
dsms.commit()
```