-
Notifications
You must be signed in to change notification settings - Fork 2
/
robot_utils.py
145 lines (115 loc) · 4.76 KB
/
robot_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sh # type: ignore
from sh import chmod # type: ignore
from post_setup.post_setup import robot_setup
# Note that sh module can take environment variables, see
# https://amoffat.github.io/sh/sections/special_arguments.html#env
def initialize_robot(robot_path: str) -> list:
"""
This initializes ROBOT with necessary configuration.
During install, ROBOT is downloaded to the same directory as kg-obo,
and the path variable used here is only necessary if it varies from
the kg-obo location.
:param path: Path to ROBOT files.
:return: A list consisting an instance of Command and dict of all environment variables.
"""
# We may have made it this far without installing ROBOT, so do that now if needed
if not os.path.exists(robot_path):
robot_setup()
# Make sure it's executable
chmod("+x","robot")
# Declare environment variables
env = os.environ.copy()
# (JDK compatibility issue: https://stackoverflow.com/questions/49962437/unrecognized-vm-option-useparnewgc-error-could-not-create-the-java-virtual)
# env['ROBOT_JAVA_ARGS'] = '-Xmx8g -XX:+UseConcMarkSweepGC' # for JDK 9 and older
env['ROBOT_JAVA_ARGS'] = '-Xmx12g -XX:+UseG1GC' # For JDK 10 and over
try:
robot_command = sh.Command(robot_path)
except sh.CommandNotFound: # If for whatever reason ROBOT isn't available
robot_command = None
return [robot_command, env]
def relax_owl(robot_path: str, input_owl: str, output_owl: str, robot_env: dict) -> bool:
"""
This method runs the ROBOT relax command on a single OBO.
Has a three-hour timeout limit - process is killed if it takes this long.
:param robot_path: Path to ROBOT files
:param input_owl: Ontology file to be relaxed
:param output_owl: Ontology file to be created (needs valid ROBOT suffix)
:param robot_env: dict of environment variables, including ROBOT_JAVA_ARGS
:return: True if completed without errors, False if errors
"""
success = False
print(f"Relaxing {input_owl} to {output_owl}...")
robot_command = sh.Command(robot_path)
try:
robot_command('relax',
'--input', input_owl,
'--output', output_owl,
'--vvv',
_env=robot_env,
_timeout=10800
)
print("Complete.")
success = True
except sh.ErrorReturnCode_1 as e: # If ROBOT runs but returns an error
print(f"ROBOT encountered an error: {e}")
success = False
return success
def merge_and_convert_owl(robot_path: str, input_owl: str, output_owl: str, robot_env: dict) -> bool:
"""
This method runs a merge and convert ROBOT command on a single OBO.
Has a three-hour timeout limit - process is killed if it takes this long.
:param robot_path: Path to ROBOT files
:param input_owl: Ontology file to be relaxed
:param output_owl: Ontology file to be created (needs valid ROBOT suffix)
:param robot_env: dict of environment variables, including ROBOT_JAVA_ARGS
:return: True if completed without errors, False if errors
"""
success = False
print(f"Merging and converting {input_owl} to {output_owl}...")
robot_command = sh.Command(robot_path)
try:
robot_command('merge',
'--input', input_owl,
'convert',
'--output', output_owl,
'--vvv',
_env=robot_env,
_timeout=10800
)
print("Complete.")
success = True
except sh.ErrorReturnCode_1 as e: # If ROBOT runs but returns an error
print(f"ROBOT encountered an error: {e}")
success = False
return success
def measure_owl(robot_path: str, input_owl: str, output_log: str, robot_env: dict) -> bool:
"""
This method runs the ROBOT measure command on a single OBO in OWL.
Yields all metrics as string and as a log file.
:param robot_path: Path to ROBOT files
:param input_owl: Ontology file to be validated
:param output_owl: Location of log file to be created
:param robot_env: dict of environment variables, including ROBOT_JAVA_ARGS
:return: True if completed without errors, False if errors
"""
success = False
print(f"Obtaining metrics for {input_owl}...")
robot_command = sh.Command(robot_path)
profile = 'Full'
try:
robot_command('measure',
'--input', input_owl,
'--format', 'tsv',
'--metrics', 'all',
'--output', output_log,
_env=robot_env,
)
print(f"Complete. See log in {output_log}")
success = True
except sh.ErrorReturnCode_1 as e: # If ROBOT runs but returns an error
print(f"ROBOT encountered an error: {e}")
success = False
return success