# Use my custom pyimzml package for converting mzml files to imzml

In [1]:
!pip install git+https://github.com/EmersonHernly/pyimzML.git

Collecting git+https://github.com/EmersonHernly/pyimzML.git
  Cloning https://github.com/EmersonHernly/pyimzML.git to c:\users\elher\appdata\local\temp\pip-req-build-t3d4698y
  Resolved https://github.com/EmersonHernly/pyimzML.git to commit 7dacbeda8f16458508427638fd712201ce70fe2a
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: pyimzML
  Building wheel for pyimzML (setup.py): started
  Building wheel for pyimzML (setup.py): finished with status 'done'
  Created wheel for pyimzML: filename=pyimzML-1.5.1+gtluu.2-py3-none-any.whl size=66501 sha256=f31ea67c623bb1b2f58ee1223048018580a8c1314e0ca942d1e3983f3a47c570
  Stored in directory: C:\Users\elher\AppData\Local\Temp\pip-ephem-wheel-cache-p9wlcoqz\wheels\96\a3\90\b38d674d7d41369509b1396ed3ab595bf56f19ad08683aa56b
Successfully built pyimzML
Installing collected packages: pyimzML
Successfully installed pyimzML-1.5.1+gtluu.2


  Running command git clone --filter=blob:none --quiet https://github.com/EmersonHernly/pyimzML.git 'C:\Users\elher\AppData\Local\Temp\pip-req-build-t3d4698y'


In [2]:
import pymzml
from pathlib import Path

from MSIGen import msigen
from MSIGen.mzml import getUserParam
import os, sys
import numpy as np
from tqdm import tqdm
from pyimzml.compression import NoCompression, ZlibCompression
from scipy.interpolate import interpn
from pyimzml.ImzMLWriter import ImzMLWriter
import xml.etree.ElementTree as ET

def find_repeating_pattern(lst):
    # Iterate over possible pattern lengths
    for pattern_length in range(1, len(lst) // 2 + 1):
        # Extract a potential pattern
        pattern = lst[:pattern_length]
        
        # Check if the rest of the list is composed of full repeats of this pattern
        is_repeating = True
        for i in range(pattern_length, len(lst), pattern_length):
            chunk = lst[i:i + pattern_length]
            # Allow incomplete chunk at the end if it matches the start of the pattern
            if chunk != pattern[:len(chunk)]:
                is_repeating = False
                break
        
        # If we found a repeating pattern, return it
        if is_repeating:
            return pattern
    
    # If no repeating pattern is found, return an empty list
    return []

def match_pattern(lst, pattern):
    # Check if the list is empty or the pattern is longer than the list
    if not lst or len(pattern) > len(lst):
        return False
    
    # Check if the pattern matches the list
    for i in range(len(lst)):
        if lst[i] != pattern[i % len(pattern)]:
            return False
    
    return True

def xml_as_string(xml):
    ET.register_namespace('', 'http://psi.hupo.org/ms/mzml')
    remove_namespace_from_element(xml)
    output_list = ET.tostring(xml,encoding='unicode', method='xml').splitlines()
    whitespaces_to_remove = len(output_list[1]) - len(output_list[1].lstrip())-2
    output_list[0] = (' '*whitespaces_to_remove)+output_list[0]
    return [i[whitespaces_to_remove:] for i in output_list if i[whitespaces_to_remove:]!=""]

def remove_namespace_from_element(element):
    # Remove namespace from the element itself
    try:
        if '}' in element.tag:
            element.tag = element.tag.split('}', 1)[1]
    
        # Recursively remove namespaces from child elements
        for child in element:
            remove_namespace_from_element(child)
    except:
        pass

def get_subset_of_element(element, subset):
    ET.register_namespace('', 'http://psi.hupo.org/ms/mzml')
    namespace = {"":"http://psi.hupo.org/ms/mzml"}
    string = ".//{0}".format(subset)
    remove_namespace_from_element(element)
    matches = element.findall(string, namespace)
    return [xml_as_string(match) for match in matches]

class HiddenPrints:
    '''Allows code to be run without displaying messages.'''
    def __enter__(self):
        self._original_stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stdout = self._original_stdout


class ImagingExperiment(object):
    """
        Create an imzML+ibd file.
        :param example_file:
            The path to one of the line scan files. Must be in mzML format.
        :param output_filename:
            is used to make the base name by removing the extension (if any).
            two files will be made by adding ".ibd" and ".imzML" to the base name
        :param intensity_dtype:
            The numpy data type to use for saving intensity values
        :param mz_dtype:
            The numpy data type to use for saving mz array values
        :param mobility_dtype:
            The numpy data ttype to use for saving mobility array values
        :param mode:

            * "continuous" mode will save the first mz array only
            * "processed" mode save every mz array separately
            * "auto" mode writes only mz arrays that have not already been written
        :param intensity_compression:
            How to compress the intensity data before saving
            must be an instance of :class:`~pyimzml.compression.NoCompression` or :class:`~pyimzml.compression.ZlibCompression`
        :param mz_compression:
            How to compress the mz array data before saving
        :param mobility_compression:
            How to compress the mobility array data before saving
        :param include_mobility:
            bool: True or False
            Whether imzML schema should include trapped ion mobility spectrometry data. Units/metadata based
            on Bruker TIMS data.
        :param polarity:
            str: "positive" or "negative"
            The polarity of the data. If not specified, the polarity will be left blank.
        :param image_x_dimension:
            int: The x dimension of the image in micrometers
        :param image_y_dimension:
            int: The y dimension of the image in micrometers
    """

    def __init__(self, example_file, 
                 output_filename=None,
                 mz_dtype=np.float64,
                 intensity_dtype=np.float32,
                 mobility_dtype=np.float64,
                 spec_type="centroid",
                 scan_direction="top_down",
                 line_scan_direction="line_left_right",
                 scan_pattern="flyback",
                 scan_type="horizontal_line",
                 mz_compression=NoCompression(),
                 intensity_compression=NoCompression(),
                 mobility_compression=NoCompression(),
                 mode="processed",
                 polarity=None,
                 include_mobility=False,
                 image_x_dimension = None,
                 image_y_dimension = None,
                 mobility_range_same_for_all_scans = True,
                 include_scan_times = False):

        self.example_file = example_file
        self.mz_dtype=mz_dtype
        self.intensity_dtype=intensity_dtype
        self.mobility_dtype=mobility_dtype
        self.spec_type=spec_type
        self.scan_direction=scan_direction
        self.line_scan_direction=line_scan_direction
        self.scan_pattern=scan_pattern
        self.scan_type=scan_type
        self.mz_compression=mz_compression
        self.intensity_compression=intensity_compression
        self.mobility_compression=mobility_compression
        self.polarity=polarity
        self.include_mobility = include_mobility
        self.image_x_dimension = image_x_dimension
        self.image_y_dimension = image_y_dimension
        self.mobility_range_same_for_all_scans = mobility_range_same_for_all_scans
        self.include_scan_times = include_scan_times
        self.spectra_per_group_per_line = []

        self.name_body, self.name_post = msigen.segment_filename(example_file)
        self.line_list = msigen.get_line_list(self.example_file, display = False)
        if not self.line_list:
            raise ValueError("No line scan files found.")
        self.output_filename = self.get_output_filename(output_filename)
        self.mode=mode

        # consider spectra as different group if:
            # if filter differs (assuming filter is present).
            # Otherwise:
                # Not the same ms_level
                # Precursor m/z is different
                # m/z range is significantly different (+/- 5% of range of mass list)
                # Mobility range is significantly different (+/- 5% of range of mobility list)
                # maybe should also consider fragmentation conditions.
        self.spectra_groups = []
        self.spectra_group_idx_per_scan = []
        self.filter_string_per_scan = []
        self.raw_coords = []
        self.ms_level_per_scan = []
        self.precursor_mz_per_scan = []
        self.scan_times = []
        self.mz_min_per_scan = []
        self.mz_max_per_scan = []
        self.ms_window_per_scan = []
        self.idx_mapping = None
        self.line_lengths = None
        self.mobility_ranges = []
        self.mobility_info_per_scan = []
        self.scan_IDs = []
        self.xml_element_strings = {}
        self.precursor_element_strings = []

        self.get_all_metadata_all_lines()

    def get_output_filename(self, output_filename):
        if not output_filename:
            output_filename = self.name_body + ".imzML"
        if not output_filename.endswith(".imzML"):
            if len(output_filename.split(".")) > 1:
                tmp_output_name = output_filename.split(".")
                tmp_output_name[-1] = "imzML"
                output_filename = ".".join(tmp_output_name)
            else:
                output_filename += ".imzML"
        return output_filename

    def get_all_metadata_all_lines(self):
        # reset these variables to ensure there isnt duplicated data
        self.spectra_groups = []
        self.spectra_group_idx_per_scan = []
        self.spectra_per_group_per_line = []
        self.filter_string_per_scan = []
        self.raw_coords = []
        self.ms_level_per_scan = []
        self.precursor_mz_per_scan = []
        self.scan_times = []
        self.mz_min_per_scan = []
        self.mz_max_per_scan = []
        self.ms_window_per_scan = []
        self.mobility_ranges = []
        self.mobility_info_per_scan = []
        self.scan_IDs = []
        self.xml_element_strings = {}
        self.precursor_element_strings = []

        for line_num, line in tqdm(enumerate(self.line_list), total=len(self.line_list)):
            self.get_all_metadata(line, line_num)
        
        # This shouldnt be necessary, but its there just in case.
        if any([self.mobility_info_per_scan[0] != i for i in self.mobility_info_per_scan]):
            raise ValueError("Mobility information is not consistent across all lines. \
                             Please ensure all lines have the same type of mobility data, \
                             since all mobility data written to the .imzML file will be \
                             considered as the same type of mobility data as the first scan.")
        self.mobility_info = self.mobility_info_per_scan[0]

    def get_all_metadata(self, line, line_num = 0):
        with pymzml.run.Reader(line, obo_version='4.1.33') as data:
            self.xml_element_strings = self.get_xml_elements_from_dict(data.info, line_num)
            for scan_num, scan in enumerate(data):
                self.get_metadata(scan, line_num, scan_num)

    def get_metadata(self, scan, line_num=0, scan_num=0):
        self.raw_coords.append((line_num, scan_num))
        self.ms_level_per_scan.append(scan.ms_level)
        self.scan_IDs.append(scan.ID)

        if scan.ms_level == 1:
            precursor = None
            self.precursor_element_strings.append(None)
        else:
            try:
                precursor = scan.selected_precursors[0]["mz"]
            except:
                precursor = None
            try:
                ele = scan.element
                remove_namespace_from_element(ele)
                self.precursor_element_strings.append(xml_as_string(ele.find(".//precursorList")))
            except:
                self.precursor_element_strings.append(None)

        self.precursor_mz_per_scan.append(precursor)

        scan_time_obj = scan.scan_time
        if scan_time_obj[1].lower() in ["second", "seconds"]:
            scan_time = scan_time_obj[0]/60
        else:
            scan_time = scan_time_obj[0]
        self.scan_times.append(scan_time)
        
        mz_min, mz_max = scan['lowest observed m/z'], scan['highest observed m/z']
        mass_window = (scan['scan window lower limit'], scan['scan window upper limit'])
        if not (mz_min and mz_max):
            mz_min, mz_max = scan.extremeValues('mz')
        if not (mass_window[0] and mass_window[1]):
            mass_window = (mz_min, mz_max)
            
        self.mz_min_per_scan.append(mz_min)
        self.mz_max_per_scan.append(mz_max)
        self.ms_window_per_scan.append(mass_window)

        if self.include_mobility:
            (mobility_range_min, mobility_range_max), mob_accession, mob_unit, mob_unit_accession =\
                self.get_mobility_info_from_mzml_spectrum(scan, self.mobility_range_same_for_all_scans)
        else: 
            mobility_range_min, mobility_range_max = 0.0, 0.0
            mob_accession, mob_unit, mob_unit_accession = None, None, None
        self.mobility_ranges.append((mobility_range_min, mobility_range_max))
        self.mobility_info_per_scan.append((mob_accession, mob_unit, mob_unit_accession))
        
        self.filter_string_per_scan.append(scan.get("filter string"))

        # assume data is from thermo since there is a filter string
        # also assume ALL scans will contain a filter string
        if self.filter_string_per_scan[-1] not in [None, ""]:
            if self.filter_string_per_scan[-1] not in self.spectra_groups:
                self.spectra_groups.append(self.filter_string_per_scan[-1])
                self.spectra_group_idx_per_scan.append(len(self.spectra_groups)-1)
                self.spectra_per_group_per_line.append([0]*len(self.line_list))
                self.spectra_per_group_per_line[-1][line_num] += 1
            else:
                grp_idx = self.spectra_groups.index(self.filter_string_per_scan[-1])
                self.spectra_group_idx_per_scan.append(grp_idx)
                self.spectra_per_group_per_line[grp_idx][line_num] += 1
        # any other vendor
        else:
            filter_info = [scan.ms_level,
                           precursor,
                           mass_window[0],
                           mass_window[1],
                           mobility_range_min,
                           mobility_range_max]
            
            # if filter_info not in self.spectra_groups:
            match = False
            for grp_idx in range(len(self.spectra_groups)):
                if self.do_filters_match(filter_info, self.spectra_groups[grp_idx]):
                    self.spectra_group_idx_per_scan.append(grp_idx)
                    self.spectra_per_group_per_line[grp_idx][line_num] += 1
                    match = True
                    break 
            if not match:
                self.spectra_groups.append(filter_info)
                self.spectra_group_idx_per_scan.append(len(self.spectra_groups)-1)
                self.spectra_per_group_per_line.append([0]*len(self.line_list))
                self.spectra_per_group_per_line[-1][line_num] += 1

    def get_xml_elements_from_dict(self, info_dict, dict_already_present = False):
        if not dict_already_present:
            self.xml_element_strings
            ele_list = ["referenceable_param_group_list_element",
                "software_list_element",
                "instrument_configuration_list_element",
                "data_processing_list_element"
                ]
            for key in ele_list:
                if info_dict.get(key) is not None:
                    ele = info_dict.get(key)
                    remove_namespace_from_element(ele)
                    if key == "referenceable_param_group_list_element":
                        self.xml_element_strings[key] = xml_as_string(ele.find('.//referenceableParamGroup[@id="CommonInstrumentParams"]'))
                    elif key == "software_list_element":
                        self.xml_element_strings["software_list_count"] = str(int(ele.get("count"))+1)
                        self.xml_element_strings["software_list_element"] = xml_as_string(ele)[1:-1]
                    elif key == "data_processing_list_element":
                        self.xml_element_strings["data_processing_list_count"] = str(int(ele.get("count"))+1)
                        self.xml_element_strings["data_processing_list_element"] = xml_as_string(ele)[1:-1]
                    else:
                        self.xml_element_strings[key] = xml_as_string(ele)
            self.xml_element_strings["source_file_list"] = []
            self.xml_element_strings["source_file_count"] = str(0)

        ele = info_dict.get("file_description_element")
        remove_namespace_from_element(ele)
        ele = ele.find(".//sourceFileList")

        self.xml_element_strings["source_file_list"].extend(xml_as_string(ele)[1:-1])
        try:
            current_count = int(self.xml_element_strings["source_file_count"])
            count_to_add = int(ele.get("count"))
            self.xml_element_strings["source_file_count"] = str(current_count + count_to_add)
        except:
            pass

        return self.xml_element_strings

    def do_filters_match(self, filter_info1, filter_info2):
        # filter info is [ms_level, precursor, mz_window_min, mz_window_max, mob_range_min, mob_range_max]
        if filter_info1[:2] != filter_info2[:2]:
            return False
        # check if the lower and upper bounds for mz and mobility are close to each other.
        # close is defined as the bound +/-5% of the total mz or mobility range.
        mz_range1_5_percent = (filter_info1[3] - filter_info1[2])*0.05
        if filter_info1[2]-mz_range1_5_percent>=filter_info2[2]>=filter_info1[2]+mz_range1_5_percent:
            return False
        if filter_info1[3]-mz_range1_5_percent>=filter_info2[3]>=filter_info1[3]+mz_range1_5_percent:
            return False
        
        if filter_info1[4] > 0.0:
            if filter_info2[4] == 0.0:
                return False
            mob_range1_5_percent = (filter_info1[5] - filter_info1[4])*0.05
            if filter_info1[4]-mob_range1_5_percent>=filter_info2[4]>=filter_info1[4]+mob_range1_5_percent:
                return False
            if filter_info1[5]-mob_range1_5_percent>=filter_info2[5]>=filter_info1[5]+mob_range1_5_percent:
                return False
        return True
            
    def interpolate_to_determine_pixel_locations(self, pixels_per_line = "max"):
        """
        :param pixels_per_line:
            str: "max", "min", or "mean"
            The number of pixels per line to interpolate to. 
            Max will use the number of pixels in the line with the most pixels.
            Min will use the number of pixels in the line with the least pixels.
            Mean will use the average number of pixels per line.
            
            Depending on which is chosen, there may be deleted or duplicated spectra.
"""
        raw_coords_arr = np.array(self.raw_coords)
        raw_coords_arr = np.array(self.raw_coords)
        scan_times_arr = np.array(self.scan_times)
        spectra_group_idx_per_scan_arr = np.array(self.spectra_group_idx_per_scan)
        spectra_per_group_per_line_arr = np.array(self.spectra_per_group_per_line)
        # Check first line for a repeating scan pattern, maintain the pattern after interpolation.
        pattern = find_repeating_pattern(spectra_group_idx_per_scan_arr[raw_coords_arr[:,0]==0].tolist())
        
        pattern_not_consistant = False
        for i in range(1, np.max(raw_coords_arr[:,0])+1):
            if pattern!=find_repeating_pattern(spectra_group_idx_per_scan_arr[raw_coords_arr[:,0]==i].tolist()):
                pattern_not_consistant = True

        self.line_lengths = np.array([len(np.where(raw_coords_arr[:,0]==i)[0]) for i in range(len(self.line_list))])
        if np.all(np.equal(self.line_lengths, self.line_lengths[0])):
            # no interpolation is necessary
            self.idx_mapping = np.arange(len(self.raw_coords)).reshape(len(self.line_list), len(np.where(raw_coords_arr[:,0]==0)[0]))

        elif pattern == [] or pattern_not_consistant == True:
            # In this case, simply resample the same locations without considering scan groupings.
            resampling_points = spectra_per_group_per_line_arr.sum(axis = 0)
            if pixels_per_line == "max":
                resampling_points = np.linspace(0,1,int(resampling_points.max()))
            elif pixels_per_line == "min":
                resampling_points = np.linspace(0,1,int(resampling_points.min()))
            elif pixels_per_line == "mean":
                resampling_points = np.linspace(0,1,np.ceil(int(resampling_points.mean())))
            
            self.idx_mapping = np.full((len(self.line_list), len(resampling_points)),-1, dtype=int)
            for line_idx in range(len(self.line_list)):
                idxs_to_use = np.where(raw_coords_arr[:,0]==line_idx)[0]
                rts = scan_times_arr[idxs_to_use]
                normalized_rts = (rts - rts.min())/(rts.max()-rts.min())
                self.idx_mapping[line_idx] = interpn((normalized_rts,), idxs_to_use, resampling_points, method="nearest", bounds_error=False)

        else:
            # ensure there are a whole number of scans after dividing by the number of repititions in pattern (usually 1)
            _, count = np.unique(pattern, return_counts=True)
            count = np.array(count)
            resampling_points_temp = np.ceil(spectra_per_group_per_line_arr/count[:,None])
            
            # set the pixels per line to the max, min, or mean based on given parameter
            if pixels_per_line == "max":
                resampling_points_temp = np.max(resampling_points_temp)
            elif pixels_per_line == "min":
                resampling_points_temp = np.mean(resampling_points_temp)
            elif pixels_per_line == "mean":
                resampling_points_temp = np.ceil(np.mean(resampling_points_temp))
            # return resampling_points_temp, count
            resampling_points = [np.linspace(0,1,int(resampling_points_temp)*int(count[i])) for i in range(len(count))]

            self.resampling_points = resampling_points

            # initialize variable using list comprehension to avoid shallow copies.
            resampled_idxs = [[None for _ in range(len(self.line_list))] for _ in range(len(self.spectra_groups))]

            # return pattern, resampled_idxs
            for grp_idx in range(len(self.spectra_groups)):
                for line_idx in range(len(self.line_list)):
                    grp_mask = spectra_group_idx_per_scan_arr==grp_idx
                    line_mask = raw_coords_arr[:,0]==line_idx

                    rts = scan_times_arr[grp_mask & line_mask]
                    if len(rts) == 0:
                        raise ValueError("No scans found for line {} and group {}\n\
                                         Ensure that all lines contain data and contain \
                                         the same filters.".format(line_idx, self.spectra_groups[grp_idx]))
                    normalized_rts = (rts - rts.min())/(rts.max()-rts.min())

                    # get x and y values to sample from
                    idxs_to_use = np.where(grp_mask & line_mask)[0]
                    
                    # interpolate
                    resampled_idxs[grp_idx][line_idx] = interpn(
                        (normalized_rts,), idxs_to_use, resampling_points[grp_idx],
                        method="nearest", bounds_error=False)
            
            # get resampled idxs as a set of 2d arrays
            resampled_idxs = [np.vstack(i) for i in resampled_idxs]
            
            self.idx_mapping = self.assign_to_output(pattern=pattern, arrays=resampled_idxs)
        return self.idx_mapping
    
    def assign_to_output(self, pattern, arrays):
        n = arrays[0].shape[0]  # Number of lines (y pixels)
        m_total = sum(arr.shape[1] for arr in arrays)  # Total number of columns (x pixels) for the final array
        output = np.full((n, m_total), -1, dtype = int)  # Initialize the output array

        total_occurances = {i: pattern.count(i) for i in set(pattern)}
        occurrences = {i: 0 for i in set(pattern)}  # Track occurrences of each integer in the pattern
        output_step = len(pattern)  # We skip every len(pattern) columns

        for i, num in enumerate(pattern):
            array = arrays[num]  # Get the array corresponding to the current number
            times_appeared = occurrences[num]  # How many times this number has appeared
            
            # Assign values to the output array, every `step` columns
            output[:, i::output_step] = array[:, times_appeared::total_occurances[num]]

            # Update occurrence of the current number
            occurrences[num] += 1

        return output

    def get_mobility_from_mzml_spectrum(self, spectrum):
        mob = None
        for i in ["mean inverse reduced ion mobility array",
                    "raw ion mobility array", 
                    "raw inverse reduced ion mobility array",
                    "mean drift time array",
                    "mean ion mobility array"]:
            try:
                with HiddenPrints():
                    mob = spectrum.get_array(i)
                    if mob is None:
                        continue
                    else:
                        break
            except:
                pass

        return mob
    
    def get_mobility_info_from_mzml_spectrum(self, spectrum, mobility_range_same_for_all_scans = True):
        # get mobility range from keyword parameters if possible
        mob_range_start, mob_range_end = getUserParam(spectrum, 'ion mobility lower limit'), getUserParam(spectrum, 'ion mobility upper limit') 
        # Get units and if needed, the array otherwise extract the mobility array and get the max & min values.
        mob = None
        mob_accession = None
        mob_unit = None
        mob_unit_accession = None

        for i in ["mean inverse reduced ion mobility array",
                    "raw ion mobility array", 
                    "raw inverse reduced ion mobility array",
                    "mean drift time array",
                    "mean ion mobility array"]:
            try:
                with HiddenPrints():
                    if ((mob_range_start is None) or (mob_range_end is None)) and (mob is None) and (not mobility_range_same_for_all_scans):
                        mob = spectrum.get_array(i)
                        if mob is None:
                            continue
                    ele = spectrum.get_element_by_name(i)
                    mob_accession = ele.get("accession")
                    mob_unit = ele.get("unitName")
                    mob_unit_accession = ele.get("unitAccession")
                    break
            except:
                pass
        if mob is None:
            mob = [0.0]
        mob_range_start, mob_range_end = np.min(mob), np.max(mob)
        return [mob_range_start, mob_range_end], mob_accession, mob_unit, mob_unit_accession

    # This is necessary for files where pymzml does not properly read offset idRef values, 
    # such as files tims files with merged mobility scans.
    def build_custom_spectrum_index(self, reader):
        # Dictionary to store the custom index: {index: offset}
        spectrum_index = {}

        # Iterate through spectra once and store their positions (or other metadata)
        for idx, spectrum in enumerate(reader):
            spectrum_index[idx] = spectrum  # Store by index

        return spectrum_index

    def write_imzml(self, output_filename=None,
                    polarity=None,
                    mode=None,
                    include_mobility=None,
                    image_x_dimension=None,
                    image_y_dimension=None,
                    spec_type=None,
                    scan_direction=None,
                    line_scan_direction=None,
                    scan_pattern=None,
                    scan_type=None,
                    mz_dtype=None,
                    intensity_dtype=None,
                    mobility_dtype=None,
                    mz_compression=None,
                    intensity_compression=None,
                    mobility_compression=None,
                    xml_element_strings=None,
                    userParams=[]):
        
        # Redefine given variables
        if output_filename is not None: self.output_filename = output_filename
        if polarity is not None: self.polarity = polarity
        if mode is not None: self.mode = mode
        if include_mobility is not None: self.include_mobility = include_mobility
        if image_x_dimension is not None: self.image_x_dimension = image_x_dimension
        if image_y_dimension is not None: self.image_y_dimension = image_y_dimension
        if spec_type is not None: self.spec_type = spec_type
        if scan_direction is not None: self.scan_direction = scan_direction
        if line_scan_direction is not None: self.line_scan_direction = line_scan_direction
        if scan_pattern is not None: self.scan_pattern = scan_pattern
        if scan_type is not None: self.scan_type = scan_type
        if mz_dtype is not None: self.mz_dtype = mz_dtype
        if intensity_dtype is not None: self.intensity_dtype = intensity_dtype
        if mobility_dtype is not None: self.mobility_dtype = mobility_dtype
        if mz_compression is not None: self.mz_compression = mz_compression
        if intensity_compression is not None: self.intensity_compression = intensity_compression
        if mobility_compression is not None: self.mobility_compression = mobility_compression

        if self.mz_dtype not in [np.float32, np.float64]:
            self.mz_dtype=np.float32
        if self.intensity_dtype not in [np.float32, np.float64]:
            self.intensity_dtype=np.float32
        if self.mobility_dtype not in [np.float32, np.float64]:
            self.mobility_dtype=np.float32

        with ImzMLWriter(self.output_filename, 
                         polarity = self.polarity,
                         include_mobility=self.include_mobility,
                         mobility_info=self.mobility_info,
                         image_x_dimension=self.image_x_dimension,
                         image_y_dimension=self.image_y_dimension,
                         spec_type=self.spec_type,
                         mode=self.mode,
                         scan_direction=self.scan_direction,
                         line_scan_direction=self.line_scan_direction,
                         scan_pattern=self.scan_pattern,
                         scan_type=self.scan_type,
                         mz_dtype=self.mz_dtype,
                         intensity_dtype=self.intensity_dtype,
                         mobility_dtype=self.mobility_dtype,
                         mz_compression=self.mz_compression,
                         intensity_compression=self.intensity_compression,
                         mobility_compression=self.mobility_compression,
                         xml_element_strings=self.xml_element_strings) as w:

            data = None
            previous_file_index = None
            previous_scan_index = -1

            ## Uses iteration of the reader since it is much faster than selecting scan indices. 
            # iterate over all pixels in output imzml
            for (y, x), scan_num in tqdm(np.ndenumerate(self.idx_mapping), total=self.idx_mapping.size):
                current_file_index, current_scan_index = self.raw_coords[scan_num]
                # Check if we need to open a new file or reinitialize due to the scan index decreasing
                if (current_file_index != previous_file_index) or (current_scan_index < previous_scan_index):
                    if data:
                        data.close()
                    data = pymzml.run.Reader(self.line_list[current_file_index], obo_version='4.1.33')
                    previous_file_index = current_file_index
                    previous_scan_index = -1
                
                # Iterate only until we reach the desired spectrum index`
                while previous_scan_index < current_scan_index:
                    scan = next(data)
                    previous_scan_index += 1
                
                if self.include_mobility:
                    mobility = self.get_mobility_from_mzml_spectrum(scan)
                else:
                    mobility = None
                ms_level = self.ms_level_per_scan[scan_num]
                precursor_mz = self.precursor_mz_per_scan[scan_num]
                if self.include_scan_times:
                    scan_start_time = self.scan_times[scan_num]
                else: 
                    scan_start_time = None

                w.addSpectrum(scan.mz, scan.i, 
                              coords=(x+1, y+1),
                              mobilities = mobility,
                              ms_level=ms_level,
                              precursor_mz=precursor_mz,
                              scan_start_time=scan_start_time,
                              filter_string=self.filter_string_per_scan[scan_num],
                              isolation_window_offset=None,
                              activation=None,
                              userParams=userParams,
                              precursor_element_string=self.precursor_element_strings[scan_num])
        data.close()

        

In [3]:
# If you are unsure about the variables to pass, pass None
img_exp = ImagingExperiment(r"C:\im test1.mzML",
                            include_mobility=True, # True or False
                            polarity="positive", # "positive" or "negative"
                            spec_type="profile", # "centroid" or "profile"
                            image_x_dimension = 6000, #um
                            image_y_dimension = 4950, #um
                            scan_direction="top_down", # "top_down" or "bottom_up", usually ignore this
                            line_scan_direction="line_left_right", # "line_left_right" or "line_right_left", usually ignore this
                            scan_pattern="flyback", # "flyback" or "back_and_forth", usually ignore this
                            scan_type="horizontal_line", # "horizontal_line" or "vertical_line", usually ignore this
                            mz_compression="none", # "none" or "zlib"
                            intensity_compression="none", # "none" or "zlib"
                            mobility_compression="none", # "none" or "zlib"
                            mode="processed", # unless you have a good reason, use "processed". "continuous", "processed", or "auto", 
                            mz_dtype=np.float32, # np.float32 or np.float64
                            intensity_dtype=np.float32, # np.float32 or np.float64
                            mobility_dtype=np.float32, # np.float32 or np.float64
                            include_scan_times=False) #True or False
output = img_exp.interpolate_to_determine_pixel_locations(pixels_per_line = "max") # "max", "min", or "mean". Max will duplicate some spectra, min will delete some spectra, mean may do some of each.
img_exp.write_imzml()

  0%|          | 0/1 [00:00<?, ?it/s]

100%|██████████| 1/1 [00:00<00:00,  1.88it/s]
100%|██████████| 14/14 [00:00<00:00, 16.51it/s]
