# Translation of an xdf file to snirf format 

We have the same nirs data in two formats: xdf and snirf. 

The xdf format is a general format for storing time series data (https://github.com/sccn/xdf). 

The snirf format is a format for storing nirs data (https://github.com/fNIRS/snirf).



In [None]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib qt

# Flag to run the tests and visualizations that follow each function definition
doRunTests = False

if doRunTests: 
    # Define the xdf file to be tested.
    # NOTE: the second assignment overrides the first one, so only the last path is used.
    xdf_fullFile = "/Users/denismottet/Documents/GitHub/NeuArm-DataAnalysis/data/AgePie/015_AgePie_20211112_1_r(1).xdf"
    xdf_fullFile = "/Users/denismottet/Documents/GitHub/NeuArm-DataAnalysis/data/ReArm.lnk/twoTestPatientsForOXY4/C1P07_20210802_1_r.xdf"


## Load the xdf file and return only the NIRS and Event streams

In [None]:
import pyxdf


def get_NIRS_and_Event_streams(x_file):
    """
    Load the xdf file and return only the NIRS and Event streams

    x_file: path of the xdf file to load

    Returns the tuple (nirsStream, eventStream), i.e., the first loaded stream
    of type "NIRS" and the first loaded stream of type "Event".

    Raises ValueError when one of the two stream types is not in the file.
    """
    # load only the NIRS and Event streams (the file header is not used)
    data, _ = pyxdf.load_xdf(
        filename=x_file,
        select_streams=[{"type": "NIRS"}, {"type": "Event"}],
        synchronize_clocks=True,
        # NOTE: dejitter is necessary to get closer to the oxy4 data
        dejitter_timestamps=True,
        verbose=False,
    )

    def first_stream_of_type(stream_type):
        # return the first stream with the requested type, or fail explicitly
        for stream in data:
            if stream["info"]["type"][0] == stream_type:
                return stream
        raise ValueError(
            "No stream of type {} found in {}".format(stream_type, x_file)
        )

    nirsStream = first_stream_of_type("NIRS")
    eventStream = first_stream_of_type("Event")

    return nirsStream, eventStream


if doRunTests:
    # read the xdf file, keeping only its NIRS and Event streams
    nirsStream, eventStream = get_NIRS_and_Event_streams(xdf_fullFile)
    # report the name and type of each returned stream
    print(f"File: {xdf_fullFile}")
    print(f"Nirs : {nirsStream['info']['name'][0]}, {nirsStream['info']['type'][0]}")
    print(f"Event: {eventStream['info']['name'][0]}, {eventStream['info']['type'][0]}")
    # report the shape (samples, channels) of the NIRS time series
    print(f"Nirs : {nirsStream['time_series'].shape}")

# Reorganize the xdf channels as it is in the corresponding snirf file

In the XDF file, we have 34 channels, but only 16 are of interest, i.e., only channels 0 to 7 and 24 to 31 are effectively used for the nirs data that is also present in the snirf file. It seems that the channels are organized in the following way:
 - channels 0 to 7 are the channels on the left hemisphere
 - channels 24 to 31 are the channels on the right hemisphere

However, in the snirf file, the channels are organized in the following way:
- channels with the lowest wavelength first (i.e., 757 nm)
- channels with the highest wavelength last (i.e., 852 nm)

Moreover, the channel values are stored as the logarithm of the inverse of the intensity.
# modify the data according to the ARTINIS matlab code

In the ARTINIS matlab code, the data is transformed as follows:
```matlab
    data.dataTimeSeries = 1./exp(log(10).* [rawvals(:, 2:2:end) rawvals(:, 1:2:end)]); %change dataTimeSeries to correct values
```
 In python, we can do the same thing with the following code:
```python
    data.dataTimeSeries = 1./np.exp(np.log(10)*np.concatenate((rawvals[:, 1::2], rawvals[:, 0::2]), axis=1))
```

The key question is why do we need to transform the values to the log of the inverse of the intensity?

```python
    data.x = 1./np.exp(np.log(10)* x) 
    # which is equivalent to
    data.x = (1.0 / 10.0) ** x
```

By definition, optical density is $ OD = \log_{10}\left(\frac{I_0}{I}\right) $, where $I_0$ is the incident light intensity and $I$ is the transmitted light intensity.

Comparing this definition with the transformation above (and taking $I_0 = 1$), it follows that the xdf data is the base 10 logarithm of the inverse of the value stored in the snirf file:  
$$ y = \left(\frac{1}{10}\right)^x \Leftrightarrow \frac{1}{y} = 10^x \Leftrightarrow \log_{10}\left(\frac{1}{y}\right) = x $$
where $y$ is the value stored in the snirf file (the transmitted intensity $I$) and $x$ is the xdf data (the optical density $OD$).



In [None]:
def print_xdf_stream_labels(stream):
    """
    Print one line per channel of the stream: index, label, type and unit

    The first 8 characters of each label are dropped before printing.
    """
    # collect (label, unit, type) for every channel described in the stream
    described = [
        (entry["label"], entry["unit"], entry["type"])
        for entry in stream["info"]["desc"][0]["channels"][0]["channel"]
    ]
    print("Found {} channels: ".format(len(described)))
    for index, (label, unit, kind) in enumerate(described):
        # label[0][8:] removes the first 8 characters of the label
        print("  {:02d}: {} ({} {})".format(index, label[0][8:], kind[0], unit[0]))


def print_xdf_stream_labels_and_first_last_data(stream):
    """
    Print one line per channel: index, label, type, unit, first and last sample

    The first 8 characters of each label are dropped before printing.
    The samples are read from the first and last rows of stream["time_series"].
    """
    # collect (label, unit, type) for every channel described in the stream
    described = [
        (entry["label"], entry["unit"], entry["type"])
        for entry in stream["info"]["desc"][0]["channels"][0]["channel"]
    ]
    print("Found {} channels: ".format(len(described)))
    row_format = "  {:02d}: {} ({} {}) [{:5.3f}...{:5.3f}]"
    for index, (label, unit, kind) in enumerate(described):
        first_value = stream["time_series"][0, index]
        last_value = stream["time_series"][-1, index]
        # label[0][8:] removes the first 8 characters of the label
        print(
            row_format.format(
                index, label[0][8:], kind[0], unit[0], first_value, last_value
            )
        )


def xdf_reorganize_channels_as_in_snirf(nirsStream):
    """
    Reorganize the xdf stream channels and info/desc0/channels0/ as it is in the snirf file

    The stream is modified in place:
    - only 16 channels are kept (xdf channels 0 to 7 and 24 to 31), with the
      odd-numbered channels first and the even-numbered channels last
    - the kept samples x are converted to 1 / exp(log(10) * x), i.e., 10 ** (-x)

    If the stream already describes 16 channels, it is left untouched.

    Returns the (modified) stream in both cases, for convenience.
    """
    channel_info = nirsStream["info"]["desc"][0]["channels"][0]

    # if the stream already has 16 channels, do nothing
    if len(channel_info["channel"]) == 16:
        print("Stream already has 16 channels")
        return nirsStream

    # Same reordering as in the ARTINIS matlab code:
    # data.dataTimeSeries = 1./exp(log(10).* [rawvals(:, 2:2:end) rawvals(:, 1:2:end)]);
    # In the XDF file, only channels 0 to 7 and 24 to 31 are effectively used,
    # and the order should be changed to match the snirf file (small wavelength first)
    new_order = [1, 3, 5, 7, 25, 27, 29, 31, 0, 2, 4, 6, 24, 26, 28, 30]

    # keep only the 16 channels used and in the snirf order
    # NOTE: we do this for the time series AND the channel labels in info/desc0/channels0/
    # Both are computed before the stream is touched, so that a failure
    # (e.g., too few channels) leaves the stream unchanged.
    channels = [channel_info["channel"][i] for i in new_order]
    time_series = np.asarray(nirsStream["time_series"], dtype=float)[:, new_order]

    # modify the stream itself
    channel_info["channel"] = channels
    # convert the reordered data to the correct values for snirf
    # NOTE: this is equivalent to 1.0 / 10.0 ** time_series
    nirsStream["time_series"] = 1.0 / np.exp(np.log(10) * time_series)

    return nirsStream


if doRunTests:
    # reorder and convert the NIRS stream loaded above (the stream is modified in place)
    xdf_reorganize_channels_as_in_snirf(nirsStream)

# Explore the events in the marker stream

In [None]:
if doRunTests:

    def get_events_100_111(event_data):
        """
        Return the indices of the events containing 111 and of those containing 100
        """
        i111 = [k for k, event in enumerate(event_data) if "111" in event[0]]
        i100 = [k for k, event in enumerate(event_data) if "100" in event[0]]
        print("Found {} events 111".format(len(i111)))
        print("Found {} events 100".format(len(i100)))
        return i111, i100

    event_data = eventStream["time_series"]
    event_time = eventStream["time_stamps"]

    i111, i100 = get_events_100_111(event_data)

    # one row [onset, 5.0, 1.0] per event 111 (the label is printed without its first 3 characters)
    data_111 = []
    for i in i111:
        print(f"Event {i}: {event_data[i][0][3:]} at {event_time[i]}")
        data_111.append([event_time[i], 5.0, 1.0])

    # same for the events 100
    data_100 = []
    for i in i100:
        print(f"Event {i}: {event_data[i][0][3:]} at {event_time[i]}")
        data_100.append([event_time[i], 5.0, 1.0])

    # same rows as data_111, kept as a plain list
    stim_data = []
    for i in i111:
        stim_data.append([event_time[i], 5.0, 1.0])

    # convert the row lists to numpy arrays
    data_111 = np.array(data_111)
    data_100 = np.array(data_100)

    print("data_111: ", data_111.shape)
    print("data_100: ", data_100.shape)

# Prepare the xdf data for the snirf file

In [None]:
def make_xdf_time_relative_to_first_data(nirsStream, eventStream):
    """
    Shift the time stamps of both streams so that the first NIRS sample is at time 0
    (as this is in the snirf file)

    Both streams are modified in place: their "time_stamps" entry is replaced
    by the shifted time stamps.
    """
    # the first NIRS time stamp is the origin of the new time axis
    origin = nirsStream["time_stamps"][0]

    # compute both shifted axes before storing any of them
    shifted_nirs, shifted_events = (
        nirsStream["time_stamps"] - origin,
        eventStream["time_stamps"] - origin,
    )

    nirsStream["time_stamps"] = shifted_nirs
    eventStream["time_stamps"] = shifted_events


if doRunTests:
    # shift the time stamps of both streams (in place), then report the array shapes
    make_xdf_time_relative_to_first_data(nirsStream, eventStream)
    print("nirs_time: ", nirsStream["time_stamps"].shape)
    print("nirs_data: ", nirsStream["time_series"].shape)

# Copy the template snirf file to a new file

In [None]:
from snirf import Snirf
import os


def copy_snirf_file(file_name, new_file_name):
    """
    Copy the snirf file to a new file using the snirf library

    file_name: path of the existing snirf file (opened read-only)
    new_file_name: path of the copy to write
    """
    # the context manager closes the file even if save() fails
    with Snirf(file_name, "r") as snirf:
        snirf.save(new_file_name)


def create_snirf_in_results(xdf_fullFile):
    """
    Create a snirf (template) file named as the xdf file in the results directory

    xdf_fullFile: path of the xdf file; only its base name (without extension)
        is used to name the new snirf file

    Returns the path of the new snirf file, relative to the working directory.

    Raises RuntimeError when the working directory is neither "notebooks"
    nor "NeuArm-DataAnalysis", since the results directory cannot be located.
    """

    # get the file name without the path and extension from the xdf file
    file_name = os.path.basename(xdf_fullFile)
    new_fname = os.path.splitext(file_name)[0] + ".snirf"

    # NOTE: the template is looked up relative to the working directory
    template_file = "rearm_template.snirf"

    # if the pwd is "notebooks", then go up one level to the root directory
    cwd_name = os.path.basename(os.getcwd())
    if cwd_name == "notebooks":
        new_fpath = os.path.join("..", "results")
    elif cwd_name == "NeuArm-DataAnalysis":
        new_fpath = "results"
    else:
        # fail explicitly instead of hitting an unbound variable below
        raise RuntimeError(
            "Cannot locate the results directory from {}: run from the "
            "'notebooks' or 'NeuArm-DataAnalysis' directory".format(os.getcwd())
        )

    new_file_name = os.path.join(new_fpath, new_fname)

    copy_snirf_file(template_file, new_file_name)

    return new_file_name


if doRunTests:
    # copy the template to the results directory; new_fname is the path of the copy
    new_fname = create_snirf_in_results(xdf_fullFile)

# Modify the new snirf file to include the xdf data

In [None]:
def get_data_for_label(
    labels, i_label_in_events, event_time, label, duration=5.0, amplitude=1.0
):
    """
    Get the data for a given label
    The data is a numpy array with 3 columns: onset, duration, amplitude
    https://github.com/BUNPC/pysnirf2/blob/main/docs/pysnirf2.md#class-stimelement

    labels: array of the unique labels
    i_label_in_events: for each event, the index of its label in labels
        (e.g., the inverse returned by np.unique); any shape, it is flattened
    event_time: array with the time of each event
    label: the label to select
    duration: value written in the duration column (default 5.0)
    amplitude: value written in the amplitude column (default 1.0)

    Returns an array with one row per event having this label; the array has
    shape (0, 3) when the label is unknown or has no event.
    """
    labels = np.asarray(labels)
    event_time = np.asarray(event_time)
    # flatten so that a column-shaped inverse (n_events, 1) is accepted as well
    i_label_in_events = np.asarray(i_label_in_events).ravel()

    # position(s) of the label among the unique labels (empty if unknown)
    i_label_in_labels = np.flatnonzero(labels == label)
    # events whose label index is the one of the requested label
    is_label = np.isin(i_label_in_events, i_label_in_labels)
    onsets = event_time[is_label]

    data = np.zeros((len(onsets), 3))
    data[:, 0] = onsets
    data[:, 1] = duration
    data[:, 2] = amplitude
    return data


def print_stim(stim):
    """
    Print the stim container itself, then each of its elements on its own line
    """
    print("stim: ", stim)
    # elements are fetched by index, so only len() and [] are required of stim
    elements = (stim[position] for position in range(len(stim)))
    for element in elements:
        print(element)


def print_n_events(stim):
    """
    Print the total number of rows in the data of all stim elements, then a blank-like line
    """
    # elements are fetched by index, so only len() and [] are required of stim
    total = sum(len(stim[position].data) for position in range(len(stim)))
    print("n_events: ", total)
    print(" ")


def set_stim(stim, event_data, event_time):
    """
    Replace the content of stim by one group per unique event label

    Each group is named after its label without the first 3 characters and
    receives the (onset, duration, amplitude) rows of the events with this label.
    Returns the same stim object that was passed in.
    """
    # empty the container: delete the first element once per existing element
    for _ in range(len(stim)):
        del stim[0]

    # unique labels and, for each event, the index of its label
    # https://numpy.org/doc/stable/reference/generated/numpy.unique.html
    unique_labels, label_index_of_event = np.unique(event_data, return_inverse=True)

    for current_label in unique_labels:
        stim.appendGroup()
        group = stim[-1]
        group.name = current_label[3:]  # remove the 'L: ' prefix
        group.data = get_data_for_label(
            unique_labels, label_index_of_event, event_time, current_label
        )

    # stim was modified in place; it is returned for clarity at the call site
    return stim


def modify_snirf_file(new_fname, event_data, event_time, nirs_data, nirs_time):
    """
    Modify the snirf file with the new data and event from the xdf file

    new_fname: path of the snirf file to modify (opened in "r+" mode and saved)
    event_data, event_time: labels and times of the events, used to rebuild the stim
    nirs_data, nirs_time: values written as dataTimeSeries and time of the
        first data element of the first nirs element
    """
    # the context manager closes the file even if an assignment or save() fails
    with Snirf(new_fname, "r+") as snirf:
        nirs = snirf.nirs[0]
        nirs.stim = set_stim(nirs.stim, event_data, event_time)
        nirs.data[0].dataTimeSeries = nirs_data
        nirs.data[0].time = nirs_time
        snirf.save(new_fname)


if doRunTests:
    # gather the time stamps and values of both streams
    nirs_time = nirsStream["time_stamps"]
    event_time = eventStream["time_stamps"]
    event_data = eventStream["time_series"]
    nirs_data = nirsStream["time_series"]
    # write them into the snirf file created from the template
    modify_snirf_file(new_fname, event_data, event_time, nirs_data, nirs_time)



    print("Done")

# Check the new snirf file

In [None]:
if doRunTests:
    # open the new snirf file, run the validation of the snirf library and show its result
    with Snirf(new_fname, "r+") as snirf:
        validation_results = snirf.validate()
        validation_results.display()

# Load and plot the new snirf file using the mne library

In [None]:
import mne

if doRunTests:
    # read the new snirf file with mne (data preloaded, only critical messages) and plot it
    new = mne.io.read_raw_snirf(new_fname, preload=True, verbose="CRITICAL")
    new.plot()

# Correction of the snirf with mne 

Tests against the reference marker files showed that one of the two snirf files is not correctly loaded as RAW data by the mne library: it has twice as many LSL 111 markers as expected.

Here we use the mne library to remove and re-write the markers in the snirf file.

In [None]:
from mne_nirs.io import write_raw_snirf

def modify_annotations_with_mne(new_fname, event_data, event_time):
    """
    Rewrite the annotations of the snirf file from the xdf events, using mne

    The file is read with mne, all its annotations are deleted, one annotation
    of duration 5.0 is appended per event (its description is the event label
    without the first 3 characters), and the result is written back to new_fname.
    Returns the mne object with the new annotations.
    """
    raw = mne.io.read_raw_snirf(new_fname, preload=True, verbose="CRITICAL")

    # remove every annotation found in the file
    annotations = raw.annotations
    annotations.delete(np.arange(len(annotations)))

    # add one annotation per xdf event
    for index in range(len(event_data)):
        description = event_data[index][0][3:]
        onset = event_time[index]
        annotations.append(onset=onset, duration=5.0, description=description)

    write_raw_snirf(raw, new_fname, add_montage=False)
    return raw

if doRunTests:
    # get the name without the path and extension from the snirf file
    file_name = os.path.basename(new_fname)
    name = os.path.splitext(file_name)[0]

    # load the xdf reference file, once as numbers (time) and once as text (label)
    # NOTE(review): the reference markers are not used further in this cell - confirm they are needed here
    reference_xMarkers_time = np.genfromtxt(
        name + ".reference.xdf_markers.csv", delimiter=","
    )
    reference_xMarkers_label = np.genfromtxt(
        name + ".reference.xdf_markers.csv", delimiter=",", dtype="str"
    )
    # keep only the second column (the label), i.e., remove the first column (time)
    reference_xMarkers_label = reference_xMarkers_label[:, 1]
    # remove the first 3 characters (L: )
    reference_xMarkers_label = np.array([x[3:] for x in reference_xMarkers_label])

    # get the annotations from the snirf file
    # NOTE(review): `new` is not defined in this cell; presumably it comes from a previous cell - confirm
    labels = new.annotations.description
    onsets = new.annotations.onset

    # gather the time stamps and values of both streams
    event_data = eventStream["time_series"]
    event_time = eventStream["time_stamps"]
    nirs_data = nirsStream["time_series"]
    nirs_time = nirsStream["time_stamps"]

    # rewrite the annotations of the snirf file from the xdf events
    new = modify_annotations_with_mne(new_fname, event_data, event_time)

    # print the description, onset and duration of each new annotation
    for i in range(len(new.annotations)):
        print(new.annotations.description[i], new.annotations.onset[i], new.annotations.duration[i])

    new.plot()

# Conversion of the xdf to snirf 

In [None]:
def xdf2snirf(xdf_fullFile):
    """
    Convert the xdf file to a snirf file and return the path of the snirf file

    Steps: load the NIRS and Event streams, reorganize the NIRS channels,
    make the time relative to the first NIRS sample, copy the snirf template,
    write the data and events in the copy, then rewrite its annotations with mne.
    """
    # load and prepare the streams (both preparation steps work in place)
    nirsStream, eventStream = get_NIRS_and_Event_streams(xdf_fullFile)
    xdf_reorganize_channels_as_in_snirf(nirsStream)
    make_xdf_time_relative_to_first_data(nirsStream, eventStream)

    # create the snirf file from the template
    new_fname = create_snirf_in_results(xdf_fullFile)

    # write the data and the events, then rewrite the annotations
    modify_snirf_file(
        new_fname,
        event_data=eventStream["time_series"],
        event_time=eventStream["time_stamps"],
        nirs_data=nirsStream["time_series"],
        nirs_time=nirsStream["time_stamps"],
    )
    modify_annotations_with_mne(
        new_fname,
        event_data=eventStream["time_series"],
        event_time=eventStream["time_stamps"],
    )
    return new_fname

if doRunTests:
    # run the complete conversion; new_fname is the path of the resulting snirf file
    new_fname = xdf2snirf(xdf_fullFile)

# Test the accuracy of the new snirf file against the reference data (oxy4 file converted to snirf)

In [None]:
# load the reference files (snirf and xdf)
import os

def assert_snirf_file(name, new):
    """
    Check the data, then the markers, of `new` against the reference files of `name`

    Prints a confirmation line when both checks pass.
    """
    for check in (assert_data, assert_markers):
        check(name, new)
    print("assert_snirf_file: ", name, " OK")

def assert_data(name, new):
    """
    Assert that the data of `new` matches the file name + ".reference.snirf.csv"

    The transposed result of new.get_data() is compared with the reference
    values; the largest absolute difference must be below 1e-7.
    """
    # reference values first, then the data to be checked
    expected = np.loadtxt(name + ".reference.snirf.csv", delimiter=",")
    actual = new.get_data().T

    tolerance = 1e-7
    err = np.max(np.abs(expected - actual))
    assert (
        err < tolerance
    ), f"max abs error = {err} is higher than tolerance {tolerance}"


def assert_markers(name, new):
    """
    Assert that the annotations of `new` match the file name + ".reference.xdf_markers.csv"

    The reference file has the time in its first column and the label in its
    second column (the first 3 characters of the label, "L: ", are ignored).
    The checks are: same number of markers, each onset within 1 of the
    reference time, and each description equal to the reference label.

    name: base name of the reference file
    new: object whose annotations.description and annotations.onset are checked
    """
    reference_file = name + ".reference.xdf_markers.csv"

    # load the xdf reference file, once as numbers (time) and once as text (label)
    # NOTE: atleast_2d keeps the column indexing valid for a single-row file
    reference_xMarkers_time = np.atleast_2d(
        np.genfromtxt(reference_file, delimiter=",")
    )[:, 0]
    reference_xMarkers_label = np.atleast_2d(
        np.genfromtxt(reference_file, delimiter=",", dtype="str")
    )[:, 1]
    # remove the first 3 characters (L: )
    reference_xMarkers_label = np.array([x[3:] for x in reference_xMarkers_label])

    # get the annotations from the snirf file
    labels = new.annotations.description
    onsets = new.annotations.onset

    assert (
        reference_xMarkers_time.shape[0] == labels.shape[0]
    ), "Error in xdf file, reference_xMarkers_time.shape[0] = {} != labels.shape[0] = {}".format(
        reference_xMarkers_time.shape[0], labels.shape[0]
    )

    timeTolerance = 1
    for i in range(len(reference_xMarkers_time)):
        maxTimeError = np.abs(reference_xMarkers_time[i] - onsets[i])
        assert (
            maxTimeError < timeTolerance
        ), "Error in xdf file at index {}, maxTimeError = {} ".format(i, maxTimeError)

        # report the complete reference label (not only its first character)
        assert (
            reference_xMarkers_label[i] == labels[i]
        ), "Error in xdf file at index {}, label = {} ".format(
            i, reference_xMarkers_label[i]
        )


if doRunTests:

    # get the name without the path and extension from the snirf file
    file_name = os.path.basename(new_fname)
    name = os.path.splitext(file_name)[0]

    # read the new snirf file with mne (data preloaded, only critical messages)
    new = mne.io.read_raw_snirf(new_fname, preload=True, verbose="CRITICAL")

    # compare its data and markers with the reference files
    print("new_fname: ", new_fname)
    assert_snirf_file(name, new)


    