In [118]:
import pandas as pd
import numpy as np
import time
import sys

data = '../data/test_data/2022_04_22_NEG_RSL3DAAvsCtrl_test_input.csv'
D = 1.0063

def upload(file, limit=None):
    df = pd.read_csv(file, header=2, nrows=limit)
    return df

df_raw = upload(data)
# df_raw

In [119]:
def format(df, samples=None):
    col_main = ['Compound',
                'm/z',
                'Retention time (min)',
                'CCS (angstrom^2)']
    col = df.columns.tolist() #create a list of all column names
    if samples is not None:
        stop = 16 + samples
    else:
        stop = col.index([col for col in df.columns if '.1' in col][0]) #index of duplicate columns we don't need
    intensities = col[16:stop] #intensity columns we wish to keep
    
    df_keep = df[col_main + intensities].sort_values(by=["Compound"], ascending=False).reset_index(drop=True)
    return df_keep

df_keep = format(df_raw)
# df_keep

In [120]:
def pick_pairs(df, a, b):
    '''Returns index pairs of compounds with same RT and CCS'''

    '''Define lists and tolerances of each column to compare with itself'''
    mz = np.array(df['m/z'])
    mz_tol = 1e-4
    rt = np.array(df['Retention time (min)'])
    rt_tol = 1e-3
    ccs = np.array(df['CCS (angstrom^2)'])
    ccs_tol = 1e-2
    D = 1.0063
    mass_adjust = D*(b - a)

    toolbar_width = 80

    # setup toolbar
    start = time.time()
    sys.stdout.write("[%s]" % ("#"))
    sys.stdout.flush()
    sys.stdout.write("\b" * (toolbar_width+1)) # return to start of line, after '['

    '''Initialize a list for indexes to be held for each pair of matched values'''
    idxs = []
    '''Create nested for loop to compare i with j in each column'''
    for i in range(len(df)):
        for j in range(i+1, len(df)):

            """Define checks for each column"""
            check_rt = np.isclose(rt[i], rt[j], rt_tol)
            if check_rt == True: pass
            else: continue
            
            check_mz = np.isclose(mz[i], mz[j] + mass_adjust, mz_tol)
            if check_mz == True: pass
            else: continue

            check_ccs = np.isclose(ccs[i], ccs[j], ccs_tol)
            if check_ccs == True: pass
            else: continue

            idxs.append([i,j])

            # update the bar
            sys.stdout.write("#")
            sys.stdout.flush()

    idx_pairs = np.array(idxs)
    flat_pairs = idx_pairs.flatten().tolist()
    mass_pairs = np.array(df['m/z'].iloc[flat_pairs]).reshape(len(idx_pairs), 2)

    sys.stdout.write("]\n") # this ends the progress bar
    end = time.time()
    print(mass_pairs.shape[0], "pairs found | Run time = ", end-start)
    return idx_pairs, mass_pairs

pairs, masses = pick_pairs(df_keep, 5, 11)
# masses

[#####################]
20 pairs found | Run time =  1.9568278789520264


In [127]:
data2 = '../data/test_data/2022_04_22_NEG_RSL3DAAvsCtrl_CSV_pairs_test_output.csv'
true_data = pd.read_csv(data2)
true_mz = np.array(true_data['m/z']).reshape(int(len(true_data)/2), 2)
true_mz

array([[978.6213752, 972.5837655],
       [976.6035114, 970.5633508],
       [949.6497534, 943.6137902],
       [924.6491739, 918.6112064],
       [922.6331353, 916.5968354],
       [905.6895514, 899.652542 ],
       [896.759428 , 890.7209797],
       [896.6183362, 890.5814109],
       [894.6029557, 888.5667271],
       [894.6027743, 888.5646863],
       [882.6029416, 876.5672706],
       [877.6611081, 871.6239716],
       [874.6422771, 868.6061886],
       [861.6305583, 855.593193 ],
       [839.640121 , 833.6022407],
       [808.598483 , 802.5765976],
       [805.639452 , 799.6012036],
       [803.6240684, 797.586861 ],
       [777.6085929, 771.5711204],
       [761.6128061, 755.5739202]])

In [122]:
"""Test for comparing pairs"""
import unittest

print("True Length: ", len(true_mz), "Masses Length: ", len(masses))

l1 = []
for tpair in true_mz:
    for pair in masses:
        l2 = []
        res = np.isclose(pair, tpair,rtol=1e-5)
        l2.append(res)
        if np.any(l2):
            break
        else:
            continue
    l1.append(np.any(l2))
print(l1)
print(np.count_nonzero(l1)/len(true_mz)*100, "%")
print("Pairs match: ", np.all(l1))

True Length:  20 Masses Length:  20
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
100.0 %
Pairs match:  True


In [123]:
# Define a class in which the tests will run
class UnitTests(unittest.TestCase):
    def test_pairs(self):
        l1 = []
        for pair in compounds:
            for tpair in true_pairs:
                res = np.any(Counter(pair)==Counter(tpair))
                if res == True: 
                    l1.append(res)
                    break
                elif tpair == true_pairs[-1]:
                    l1.append(res)
        self.assertNotIn(False, l1)

suite = unittest.TestLoader().loadTestsFromTestCase(UnitTests)
_ = unittest.TextTestRunner().run(suite)

E
ERROR: test_pairs (__main__.UnitTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/tmp/ipykernel_7161/3586644625.py", line 5, in test_pairs
    for pair in compounds:
NameError: name 'compounds' is not defined

----------------------------------------------------------------------
Ran 1 test in 0.004s

FAILED (errors=1)


In [124]:
# df_pairs = df_keep.iloc[pairs.flatten()]
# df_pairs.head()

### fcn to adjust masses of each pair

In [125]:
def mass_adj(pairs, df, a, b):
    '''Adjusts masses of given dataframe and list of pairs. Pairs must be together, with higher mass first. x is the lower value, y is the higher value.'''
    
    df_pairs = df.iloc[pairs.flatten()]
    masses = np.array(df_pairs["m/z"]).reshape((len(pairs), 2))
    masses[:, 0] -= b
    masses[:, 1] -= a

    df_pairs.insert(2, "m/z_adj", masses.flatten().tolist())

    return df_pairs

df_adj = mass_adj(pairs, df_keep, 5, 11)
# df_adj

In [126]:
from lipydomics.data import Dataset
from lipydomics.identification import add_feature_ids
# from tempfile import TemporaryFile


def lipid_id(input, output_name): 
    """identifies mass adjusted lipids and exports .xlsx to path specified"""
    # full = pd.read_csv(input) # this line isn't necessary if we feed in a dataframe
    trim = input.drop(columns=['Compound', 'm/z'])
    trim.to_csv('../data/trim.csv', index=False)
    data = open('../data/trim.csv')
    # need to change this to save to a temp directory
    dset = Dataset(data, esi_mode='neg')
    mz_tol = 0.03
    rt_tol = 0.3
    ccs_tol = 3.0
    tol = [mz_tol, rt_tol, ccs_tol]
    add_feature_ids(dset, tol, level='any')
    dset.export_xlsx(output_name)
    print('Identification Complete!')
    return output_name

# lipid_id(df_adj, "output")
input = df_adj
trim = input.drop(columns=['Compound', 'm/z'])
trim.to_csv('../data/trim.csv', index=False)
data = open('../data/trim.csv')
dset = Dataset(data, esi_mode='neg')
tol = [.03, .3, 3]
add_feature_ids(dset, tol, level='any')
print(dset)
dset.export_xlsx("ouptut.xlsx")

Dataset(
	csv="<_io.TextIOWrapper name='../data/trim.csv' mode='r' encoding='UTF-8'>",
	esi_mode="neg",
	samples=8,
	features=40,
	identified=40,
	normalized=False,
	rt_calibrated=False,
	ext_var=False,
	group_indices=None,
	stats={}
)


ModuleNotFoundError: No module named 'xlsxwriter'