Read Agilent GC raw data, plot and export as csv
================================================

Using this notebook, you can read in the raw data files as saved by Agilent Chemstation, plot them and export them in `.csv` format. 
All you need to do it put in the paths to the folders containing the collection of raw data (of the type `my_experiment_name.D`), a label for the legend and colour for the plot, as well as a filename for saving the plot and exporting the data. The comments in this notebook wil guide you through the process.

There are two parts: 

_Part A_ first sets up the libraries and functions required. Then, it allows you to import, plot and export a manual selection of chromatograms based on information you enter (including the raw data filepath, legend label, plotting color and linestyle, as well as filenames for the plot and exported csv file). 

_Part B_ allows you to quickly export all the runs in all the sequences saved into a shared directory (i.e. a directory containing a directory containing the raw data directories). This automatically selects the name of the run (i.e. whatever is written before `.D`) as the label, makes a plot containing all the runs in a sequence and saves the sequence and the plot automatically using the name of the sequence. 

The sampling rate of Chemstation is very high (60 Hz). In order to reduce the file size of the exported datafiles to an acceptable level, this was downsampled to 6 Hz. There is a comment in the code highlighting where this is done (in the definition of the function `plot_raw_and_export_as_csv()`, if you would like to deactivate the downsampling. 

_Important_, as of November 2024, this notebook does not contain any functionality for integrating peaks or reading integration reports or the like.

Authors: Anna Winiwarter (anna.winiwarter@radicaldot.com)
based on previous work by Kenneth Nielsen

Part A
------

Let's start by importing the necessary external libraries.

In [None]:
import time
import struct
import numpy
import os
import pandas as pd
import matplotlib.colors as mcolors
from pathlib import Path
from struct import unpack
from matplotlib import pyplot as plt
from scipy import signal

STEP 1

Specify the folder containing the ".D" files. Note, if you would like to co-plot files from different folders, 
you can add the folder path directly in front of the file name in step 2..

In [None]:
file_folder = Path(r"Raw data\2024-10-31-my-experiment_1")

STEP 2

Specify the paths to the files, a label, color and linestyle. You can add as many files as you would like to the list. If you need to export a lot of files it might be easier to use the more automated approach in Part B.

For valid color names in Python check out this page: https://matplotlib.org/stable/gallery/color/named_colors.html

For linestyles, see here: https://matplotlib.org/stable/gallery/lines_bars_and_markers/linestyles.html 

"-" is solid, ":" is dotted

In [None]:
file_paths_Ch1 = [
    # (r'xyz.D', legend name, 'blue', "-"), 
              (r'SAMPLE_1.D', 'label 1', 'lightblue', "-"), # enter your own file, label color and linestyle here. 
              ]

STEP 3

Specify a name for saving the plot

In [None]:
plot_name = "my_test_plot_01"

STEP 4

Specify a name for the exported csv file

In [None]:
csv_name = "my_test_file_01"

STEP 5: 

Run all the code below.
_Important_: You need to run the big chunks of code below, even if you want to use Part B only, as the later part uses some of the functions defined here.

First, we need to run some code which will allow us to read the raw binary data files. 

In [None]:
# ----------- Classes and methods for reading .ch file ---------------
# This part of the code is written by Kenneth Nielsen.
# See https://github.com/CINF/PyExpLabSys/blob/master/PyExpLabSys/file_parsers/chemstation.py
# The class/methods used in this script is copied here for the convenience of the user. For the full parser, check the original file.

# Constants used for binary file parsing
ENDIAN = '>'
STRING = ENDIAN + '{}s'
UINT8 = ENDIAN + 'B'
UINT16 = ENDIAN + 'H'
INT16 = ENDIAN + 'h'
INT32 = ENDIAN + 'i'

def parse_utf16_string(file_, encoding='UTF16'):
    """Parse a pascal type UTF16 encoded string from a binary file object"""
    # First read the expected number of CHARACTERS
    string_length = unpack(UINT8, file_.read(1))[0]
    # Then read and decode
    parsed = unpack(STRING.format(2 * string_length),
                    file_.read(2 * string_length))
    return parsed[0].decode(encoding)


class CHFile(object):
    """Class that implementats the Agilent .ch file format version 179

    .. warning:: Not all aspects of the file header is understood, so there may and probably
       is information that is not parsed. See the method :meth:`._parse_header_status` for
       an overview of which parts of the header is understood.

    .. note:: Although the fundamental storage of the actual data has change, lots of
       inspiration for the parsing of the header has been drawn from the parser in the
       `ImportAgilent.m file <https://github.com/chemplexity/chromatography/blob/dev/
       Methods/Import/ImportAgilent.m>`_ in the `chemplexity/chromatography project
       <https://github.com/chemplexity/chromatography>`_ project. All credit for the parts
       of the header parsing that could be reused goes to the author of that project.

    Attributes:
        values (numpy.array): The internsity values (y-value) or the spectrum. The unit
            for the values is given in `metadata['units']`
        metadata (dict): The extracted metadata
        filepath (str): The filepath this object was loaded from

    """

    # Fields is a table of name, offset and type. Types 'x-time' and 'utf16' are specially
    # handled, the rest are format arguments for struct unpack
    fields = (
        ('sequence_line_or_injection', 252, UINT16),
        ('injection_or_sequence_line', 256, UINT16),
        ('start_time', 282, 'x-time'),
        ('end_time', 286, 'x-time'),
        ('version_string', 326, 'utf16'),
        ('description', 347, 'utf16'),
        ('sample', 858, 'utf16'),
        ('operator', 1880, 'utf16'),
        ('date', 2391, 'utf16'),
        ('inlet', 2492, 'utf16'),
        ('instrument', 2533, 'utf16'),
        ('method', 2574, 'utf16'),
        ('software version', 3601, 'utf16'),
        ('software name', 3089, 'utf16'),
        ('software revision', 3802, 'utf16'),
        ('units', 4172, 'utf16'),
        ('detector', 4213, 'utf16'),
        ('yscaling', 4732, ENDIAN + 'd')
    )
    # The start position of the data
    data_start = 6144
    # The versions of the file format supported by this implementation
    supported_versions = {179} #should be 179

    def __init__(self, filepath):
        """Instantiate object

        Args:
            filepath (str): The path of the data file
        """
        self.filepath = filepath
        self.metadata = {}
        with open(self.filepath, 'rb') as file_:
            self._parse_header(file_)
            self.values = self._parse_data(file_)

    def _parse_header(self, file_):
        """Parse the header"""
        # Parse and check version
        length = unpack(UINT8, file_.read(1))[0]
        parsed = unpack(STRING.format(length), file_.read(length))
        version = int(parsed[0])
        if version not in self.supported_versions:
            raise ValueError('Unsupported file version {}'.format(version))
        self.metadata['magic_number_version'] = version

        # Parse all metadata fields
        for name, offset, type_ in self.fields:
            file_.seek(offset)
            if type_ == 'utf16':
                self.metadata[name] = parse_utf16_string(file_)
            elif type_ == 'x-time':
                self.metadata[name] = unpack(ENDIAN + 'f', file_.read(4))[0] / 60000
            else:
                self.metadata[name] = unpack(type_, file_.read(struct.calcsize(type_)))[0]

        # Convert date
        self.metadata['datetime'] = time.strptime(self.metadata['date'], '%d-%b-%y, %H:%M:%S')

    def _parse_header_status(self):
        """Print known and unknown parts of the header"""
        file_ = open(self.filepath, 'rb')
        # Map positions to fields for all the known fields
        knowns = {item[1]: item for item in self.fields}
        # A couple of places has a \x01 byte before a string, these we simply skip
        skips = {325, 3600}
        # Jump to after the magic number version
        file_.seek(4)

        # Initialize variables for unknown bytes
        unknown_start = None
        unknown_bytes = b''
        # While we have not yet reached the data
        while file_.tell() < self.data_start:
            current_position = file_.tell()
            # Just continue on skip bytes
            if current_position in skips:
                file_.read(1)
                continue

            # If we know about a data field that starts at this point
            if current_position in knowns:
                # If we have collected unknown bytes, print them out and reset
                if unknown_bytes != b'':
                    print('Unknown at', unknown_start, repr(unknown_bytes.rstrip(b'\x00')))
                    unknown_bytes = b''
                    unknown_start = None

                # Print out the position, type, name and value of the known value
                print('Known field at {: >4},'.format(current_position), end=' ')
                name, _, type_ = knowns[current_position]
                if type_ == 'x-time':
                    print('x-time, "{: <19}'.format(name + '"'),
                          unpack(ENDIAN + 'f', file_.read(4))[0] / 60000)
                elif type_ == 'utf16':
                    print(' utf16, "{: <19}'.format(name + '"'),
                          parse_utf16_string(file_))
                else:
                    size = struct.calcsize(type_)
                    print('{: >6}, "{: <19}'.format(type_, name + '"'),
                          unpack(type_, file_.read(size))[0])
            else:  # We do not know about a data field at this position If we have already
                # collected 4 zero bytes, assume that we are done with this unkonw field,
                # print and reset
                if unknown_bytes[-4:] == b'\x00\x00\x00\x00':
                    print('Unknown at', unknown_start, repr(unknown_bytes.rstrip(b'\x00')))
                    unknown_bytes = b''
                    unknown_start = None

                # Read one byte and save it
                one_byte = file_.read(1)
                if unknown_bytes == b'':
                    # Only start a new collection of unknown bytes, if this byte is not a
                    # zero byte
                    if one_byte != b'\x00':
                        unknown_bytes = one_byte
                        unknown_start = file_.tell() - 1
                else:
                    unknown_bytes += one_byte

        file_.close()

    def _parse_data(self, file_):
        """Parse the data"""
        # Go to the end of the file and calculate how many points 8 byte floats there are
        file_.seek(0, 2)
        n_points = (file_.tell() - self.data_start) // 8

        # Read the data into a numpy array
        file_.seek(self.data_start)
        return numpy.fromfile(file_, dtype='<d', count=n_points) * self.metadata['yscaling']

    def times(self):
        """The time values (x-value) for the data set in minutes"""
        return numpy.linspace(self.metadata['start_time'], self.metadata['end_time'],
                              len(self.values))

# ----------- Function for plotting and exporting as csv file ---------------
def plot_raw_and_export_as_csv(file_paths, file_folder, plot_name, csv_name, axis_y_lims=(-1, 500), downsampling_rate=10):
    # Create a figure and a grid of subplots
    fig, ax = plt.subplots(1, figsize=(9, 6))

    # Loop over the file paths for Ch1
    data_dict = {} # prepare an empty dictionary to store the data for saving
    for file_path, label, color, linestyle in file_paths:
        if not ".D" in file_path:
            continue
        full_path = file_folder/file_path/"FID1A.ch"
        try:
            # Read the file using the .ch class defined above.
            ch1 = CHFile(full_path)
        except FileNotFoundError:
            print("Could not find a file at: " + str(full_path))
            print("Moving on...")
            continue
        # Get the y values from the CHFile class and DOWNSAMPLE
        ch1_y_raw = ch1.values
        ch1_y = signal.decimate(ch1_y_raw, downsampling_rate)
        data_dict[label+"_values"] = ch1_y
    
        # Get the x values using the method "times" to generate the time value in min from the metadata (start time and end time)
        ch1_x_raw = ch1.times()
        ch1_x = signal.decimate(ch1_x_raw, downsampling_rate)
        # Add to the dictionary
        data_dict[label+"_times"] = ch1_x
    
        # Plot the raw chromatogram with the filenname as label and color and linestyle as selected.
        ax.plot(ch1_x, ch1_y, label=label, color=color, linestyle=linestyle)
   
    ax.set_ylabel('Raw signal / pA')
    ax.set_xlabel('Time / min')
    ax.legend()
    ax.set_xlim([-1, 29.5])
    ax.set_ylim(axis_y_lims) # adjust the y-axis scale depending on the height of your peaks of interest.
    
    # Adjust the layout so that there is enough space between the subplots
    plt.tight_layout()
    plt.savefig(plot_name + ".png", dpi=600)
    plt.show()
    
    # Export the data as csv
    d_sorted = {k: v for k, v in sorted(data_dict.items())}
    export_df = pd.DataFrame(
                        {key: pd.Series(value) for key, value in d_sorted.items()}
                    )
    export_df.to_csv(csv_name + ".csv", index=False)

Now we can load the raw data, plot it and export it as csv. This function will use the file path information, plot name etc as you entered above in steps 1-4. 

Seeing that the autoscale of the y-axis will often not be very meaningful, you can set different limits to the y-axis here. Also, if you would prefer to export the dataset at the original sampling rate of 60Hz, set the downsampling_rate to 1. Note, that this is not recommended, as the resulting csv files become very large. 

In [None]:
plot_raw_and_export_as_csv(file_paths_Ch1, file_folder, plot_name, csv_name, axis_y_lims=(-1, 500), downsampling_rate=10)

Part B
------

If you'd like to export the data from a number of folders, you can also rely on this semi automatic approach below.

First, define the folder path in which you expect the files.

Agilent Chemstation saves the data recorded within a sequence (i.e. one set of samples that were run in one go) in a directory, which then contains the raw data for the individual runs (or samples) in subdirectories with the ending `.D`. The `folder_path`asked for here is the root directory containing the sequence directories. I.e if the full path to the raw data file is `"My raw data/my favourite sequence/sample_01.D/FID1.ch"` then what you want to enter here is `"My raw data"`. (the `r`in front of a string just means that it will read it as raw string and tolerate spaces and other non-standard characters)

In [None]:
folder_path = Path(r"Raw data")

The cell below will then find all the relevant files in the sub-directories of the path given above and print them (to double check, this should now contain all the `.D` files you'd like to export).

In [None]:
subfolder_list = []
for root, subfolders, files in os.walk(folder_path):
    #this is bit annoying to work with, so lets better create a dictionary that has the initial subfolders as key and a list of
    #the files as values. then we can iterate through that and that should work if the root is a folder containing folders
    #that contain the data)
    subfolder_list.append(subfolders)
folders_dict = {}
for folder in subfolder_list[0]:
    for root, subfolders, files in os.walk(os.path.join(folder_path, folder)):
        folders_dict[folder] = subfolders
        break #this is to not enter into the subfolders but actually add the files in the original folder!!
print(folders_dict)

Optional: if you don't want to re-plot and re-export all the files every time you add a new measurment folder to your main folder, you can run the following cell, which will check if a csv file exists for a measurement folder, and if this is the case, it will delete the folder from the dictionary which is passed to the loop processing the plotting and exporting. If you'd like to re-export (e.g. because you've made some changes somewhere), simply skip running this cell. 

If it returns {} as the last line (i.e. and empty dictionary, this means that there is no new data in the directory and therefore, nothing will be plotted/exported.

In [None]:
folders_dict_cut = folders_dict.copy()
for series in folders_dict.keys():
    if os.path.isfile(str(folder_path) + "/" + series + ".csv"):
        print(series + " has been exported, deleting from the folders_dict.")
        del folders_dict_cut[series]
folders_dict = folders_dict_cut
print(folders_dict)  

Now, let's loop though all the measurment series in the folders dictionary, plot and export them. It's using the same function as in Part A, just feeding them automatically. Like in Part A, also here you can change the settings for the axis limits and the sampling rate (absolutely not recommended here, since the files will become really, really large).

In [None]:
for series in folders_dict.keys():
    file_paths_auto = []
    for folder, color in zip(folders_dict[series], mcolors.TABLEAU_COLORS.keys()):
        if not ".D" in folder:
            continue
        file_paths_auto.append((folder, folder, color, "-"))
    plot_raw_and_export_as_csv(file_paths=file_paths_auto, file_folder=Path(r"Raw Data/" + series), plot_name=str(series), csv_name=str(series),
                               axis_y_lims=(-1, 500), downsampling_rate=10)