Skip to content

Commit

Permalink
Merge pull request #59 from ChalkLab/49-jcamp-writer
Browse files Browse the repository at this point in the history
Adds jcamp writer to scidatalib.io.jcamp
  • Loading branch information
stuchalk committed May 3, 2021
2 parents 011ec66 + 263f039 commit 7bf9f50
Show file tree
Hide file tree
Showing 2 changed files with 419 additions and 13 deletions.
307 changes: 307 additions & 0 deletions scidatalib/io/jcamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ def read_jcamp(filename: str) -> SciData:
return scidata_obj


def write_jcamp(filename: str, scidata: SciData):
"""
Writer for SciData object to JCAMP-DX files.
JCAMP-DX is Joint Committee on Atomic and Molecular Physical Data eXchange
JCAMP-DX URL: http://jcamp-dx.org/
:param filename: Filename for JCAMP-DX file
:param scidata: SciData object to write out
"""
_write_jcamp_header_section(filename, scidata, mode='w')
_write_jcamp_data_section(filename, scidata, mode='a')
with open(filename, 'a') as fileobj:
fileobj.write('##END=\n')


def _reader_is_float(strings: List[str]) -> bool:
'''
Test if a string, or list of strings, contains a numeric value(s).
Expand Down Expand Up @@ -1006,3 +1021,295 @@ def _read_translate_jcamp_to_scidata(jcamp_dict: dict) -> SciData:
# Issue: https://github.com/ChalkLab/SciDataLib/issues/43

return scidata


def _write_extract_description_section(description: str, key: str) -> str:
"""
Given a description string of the form "KEY1: VALUE1, KEY2: VALUE2, ..."
extract the KEY that matches input key and extract the VALUE
:param desc: The description of form "KEY1: VALUE1, KEY2: VALUE2, ..."
:param key: The key used to extract value from the description
:return: String of the VALUE extracted from description for the key
provided. None is returned if key is not in the description.
"""
desc_list = description.split(_DESCRIPTION_KEY_SPLIT_CHAR)
results = [x for x in desc_list if x.strip().startswith(key)]
if not results:
return None
element = results[0]
value = element.split(':')[1].strip()
return value


def _write_add_header_lines_general(scidata: SciData) -> List[str]:
"""
Get the general graph header lines from the SciData object
used to write the JCAMP-DX header lines
:param scidata: SciData object to write as JCAMP-DX file
:return: List of header lines to write to the JCAMP-DX file
"""
graph = scidata.output.get("@graph")
description = graph.get("description", "")
jcamp_dx = _write_extract_description_section(description, "JCAMP-DX")
lines = []
lines.append(f'##JCAMP-DX={jcamp_dx}')
if "property" in graph["scidata"]:
lines.append(f'##DATA TYPE={graph["scidata"]["property"][0]}')
if "publisher" in graph:
lines.append(f'##ORIGIN={graph["publisher"]}')
if "authors" in graph:
lines.append(f'##OWNER={graph["authors"][0]["name"]}')

date_and_time = None
if "starttime" in graph:
date_and_time = graph.get("starttime").split(" ")
elif "generatedAt" in scidata.output:
date_and_time = scidata.output.get("generatedAt").split(" ")

if date_and_time:
date = date_and_time[0].strip()
lines.append(f'##DATE={date}')

if len(date_and_time) > 1:
time = date_and_time[1].strip()
lines.append(f'##TIME={time}')

the_class = _write_extract_description_section(description, "CLASS")
if the_class:
lines.append(f'##CLASS={the_class}')

sources = graph.get("sources")
if sources:
citation = graph.get("sources")[0]["citation"]
lines.append(f'##SOURCE REFERENCE={citation}')

for source in sources:
nist_description = source.get("citation", "")
if nist_description.startswith("NIST"):
nist_source = _write_extract_description_section(
nist_description,
"NIST SOURCE")
lines.append(f'##$NIST SOURCE={nist_source}')

nist_image = _write_extract_description_section(
nist_description,
"NIST IMAGE")
lines.append(f'##$NIST IMAGE={nist_image}')

return '\n'.join(lines)


def _write_add_header_lines_methodology(scidata: SciData) -> List[str]:
"""
Get the methodology header lines from the SciData object
used to write the JCAMP-DX header lines
:param scidata: SciData object to write as JCAMP-DX file
:return: List of header lines to write to the JCAMP-DX file
"""
lines = []
graph = scidata.output.get("@graph")
methodology = graph.get("scidata").get("methodology")

# Aspects
aspects = methodology.get("aspects")
measurement = ""
for aspect in aspects:
if aspect.get("@id").startswith("procedure"):
lines.append(f'##SAMPLING PROCEDURE={aspect["description"]}')

if aspect.get("@id").startswith("resource"):
lines.append(f'##DATA PROCESSING={aspect["description"]}')

if aspect.get("@id").startswith("measurement"):
measurement = aspect
instrument = measurement.get("instrument")
lines.append(f'##SPECTROMETER/DATA SYSTEM={instrument}')

# Settings
settings = measurement.get("settings", "")
for setting in settings:
if setting.get("property").startswith("instrument parameters"):
parameters = setting["value"]["number"]
lines.append(f'##INSTRUMENT PARAMETERS={parameters}')
if setting.get("property").startswith("path length"):
reverse_length_map = {v: k for k, v in _LENGTH_UNIT_MAP.items()}
scidata_path_unit = settings[1]["value"]["unitref"]
jcamp_path_unit = reverse_length_map[scidata_path_unit]
path_length = f'{settings[1]["value"]["number"]} '
path_length += f'{jcamp_path_unit.upper()}'
lines.append(f'##PATH LENGTH={path_length}')
if setting.get("property").startswith("resolution"):
resolution = setting["value"]["number"]
lines.append(f'##RESOLUTION={resolution}')

return '\n'.join(lines)


def _write_add_header_lines_system(scidata: SciData) -> List[str]:
"""
Get the system header lines from the SciData object
used to write the JCAMP-DX header lines
:param scidata: SciData object to write as JCAMP-DX file
:return: List of header lines to write to the JCAMP-DX file
"""
lines = []
graph = scidata.output.get("@graph")
system = graph.get("scidata").get("system")

# Facets
facets = system.get("facets")
if facets:
for facet in facets:
if facet.get("@id").startswith("compound"):
if "casrn" in facet:
lines.append(f'##CAS REGISTRY NO={facet["casrn"]}')

if "formula" in facet:
lines.append(f'##MOLFORM={facet["formula"]}')

if facet.get("@id").startswith("substance"):
if "phase" in facet:
lines.append(f'##STATE={facet["phase"]}')

if facet.get("@id").startswith("condition"):
items = _PRESSURE_UNIT_MAP.items()
reverse_pressure_map = {v: k for k, v in items}
scidata_punit = facet["value"]["unitref"]
jcamp_punit = reverse_pressure_map[scidata_punit]
partial_pressure = f'{facet["value"]["number"]} '
partial_pressure += f'{jcamp_punit}'
lines.append(f'##PARTIAL_PRESSURE={partial_pressure}')

return '\n'.join(lines)


def _write_add_header_lines_dataset(scidata: SciData) -> List[str]:
"""
Get the dataset header lines from the SciData object
used to write the JCAMP-DX header lines
:param scidata: SciData object to write as JCAMP-DX file
:return: List of header lines to write to the JCAMP-DX file
"""
lines = []

graph = scidata.output.get("@graph")
dataset = graph.get("scidata").get("dataset")

attributes = dataset["attribute"]

reverse_xunit_map = {v: k for k, v in _XUNIT_MAP.items()}
scidata_xunits = attributes[1]["value"]["unitref"]
xunits = reverse_xunit_map[scidata_xunits]

yunits = attributes[5]["value"]["unitref"]
xfactor = attributes[9]["value"]["number"]
yfactor = attributes[10]["value"]["number"]
first_x = attributes[1]["value"]["number"]
last_x = attributes[2]["value"]["number"]
first_y = attributes[5]["value"]["number"]
max_x = attributes[4]["value"]["number"]
min_x = attributes[3]["value"]["number"]
max_y = attributes[8]["value"]["number"]
min_y = attributes[7]["value"]["number"]
npoints = attributes[0]["value"]["number"]
delta_x = (float(last_x) - float(first_x)) / (float(npoints) - 1)

lines.append(f'##XUNITS={xunits}')
lines.append(f'##YUNITS={yunits}')
lines.append(f'##XFACTOR={xfactor}')
lines.append(f'##YFACTOR={yfactor}')
lines.append(f'##DELTAX={delta_x:.6f}')
lines.append(f'##FIRSTX={first_x}')
lines.append(f'##LASTX={last_x}')
lines.append(f'##FIRSTY={first_y}')
lines.append(f'##MAXX={max_x}')
lines.append(f'##MINX={min_x}')
lines.append(f'##MAXY={max_y}')
lines.append(f'##MINY={min_y}')
lines.append(f'##NPOINTS={npoints}')

description = graph.get("description")
xydata = _write_extract_description_section(description, "XYDATA")
lines.append(f'##XYDATA={xydata}')

return '\n'.join(lines)


def _write_jcamp_header_section(
filename: str, scidata: SciData, mode: str = 'w'
):
"""
Writes header of the JCAMP-DX file for given SciData object
to the filename provided. Default mode is to overwrite the file.
:param filename: String name of file to write JCAMP header
:param scidata: SciData object to extract header info
:param mode: File mode to use (i.e. 'w' for overwrite, 'a' for append, ...)
"""
lines = []

graph = scidata.output.get("@graph")
lines.append(f'##TITLE={graph.get("title")}')

headers = [
_write_add_header_lines_general(scidata),
_write_add_header_lines_methodology(scidata),
_write_add_header_lines_system(scidata),
_write_add_header_lines_dataset(scidata),
]
for header in headers:
if header:
lines.append(header)
lines = '\n'.join(lines) + '\n'

with open(filename, mode) as fileobj:
for line in lines:
fileobj.write(line)


def _write_jcamp_data_section(
filename: str,
scidata: SciData,
mode: str = 'w',
precision: int = 3,
trim: int = None,
):
"""
Writes dataset section of the JCAMP-DX file for given SciData object
to the filename provided. Default mode is to overwrite the file.
:param filename: String name of file to write JCAMP header
:param scidata: SciData object to extract dataset info
:param mode: File mode to use (i.e. 'w' for overwrite, 'a' for append, ...)
:param precision: Floating point number for formatting the output data
"""
pass
# TODO: add the dataseries
# Issue: https://github.com/ChalkLab/SciDataLib/issues/43
'''
graph = scidata.output.get("@graph")
dataset = graph.get("scidata").get("dataset")
dataseries = dataset.get("dataseries")
with open(filename, mode) as fileobj:
xdata = []
ydata = []
for data in dataseries:
if data.get("axis") == "x-axis":
xdata = data["parameter"]["valuearray"]["numberarray"]
if data.get("axis") == "y-axis":
ydata = data["parameter"]["valuearray"]["numberarray"]
for x, y in zip(xdata, ydata):
line = f' {x:.{precision}f}, {y:.{precision}f}'
if trim:
xline = f'{x:.{precision}f}'[0:trim]
yline = f'{y:.{precision}f}'[0:trim]
line = f' {xline}, {yline}'
fileobj.write(f'{line}\n')
'''

0 comments on commit 7bf9f50

Please sign in to comment.