# RunIndexing

This notebook indexes all the medical reports in the database (the `TEXT` column of `data/NOTEEVENTS.csv`). The results are stored in index files, which the Backend notebook subsequently uses to enable efficient querying.

In [1]:
# Shared imports for the notebook.
# NOTE(review): presumably this brings pd, spacy, json, tqdm, time, timeit and
# `mp` (multiprocessing) into scope, since later cells use them without
# importing them -- confirm against includes/imports.py.
%run includes/imports.py
# Automatically reload edited modules (e.g. includes/stringop.py) before code runs.
%load_ext autoreload
%autoreload 2
# English spaCy pipeline with the text classifier and the named-entity recognizer
# disabled. The dependency parser must stay enabled: full_query walks
# token.dep_ / token.head / token.children.
# NOTE(review): the 'en' shortcut name only resolves on spaCy 2.x; spaCy 3 needs a
# full package name such as 'en_core_web_sm' -- confirm the pinned spaCy version.
nlp = spacy.load('en', disable=['textcat', 'ner'])
from includes.stringop import StringOp

In [2]:
# Load only the free-text column of the notes table, with an explicit str dtype.
data = pd.read_csv(
    'data/NOTEEVENTS.csv',
    usecols=['TEXT'],
    dtype={'TEXT': str},
)

In [3]:
# Load the condition vocabulary: one condition per line, with synonyms separated
# by '/'. Entries are stripped and lower-cased.
# Explicit encoding: the platform default encoding is not portable.
with open('words/conditions.txt', encoding='utf-8') as f:
    wlist = [line.strip().lower() for line in f]
# Drop blank lines: they would otherwise become an empty synonym [''], i.e. an
# empty search pattern handed to StringOp.find_condition.
wlist = [w for w in wlist if w]

# Each entry becomes its list of synonyms, e.g. "a/b" -> ["a", "b"].
wordlist = [w.split('/') for w in wlist]

### Define helper functions

In [4]:
from collections import deque

# Dependency labels that allow climbing from a token to its syntactic head.
_BACKWARD_DEPS = frozenset(['pobj', 'amod', 'prep', 'conj', 'nsubj', 'neg'])
# Dependency labels that allow descending from a token to one of its children.
_FORWARD_DEPS = frozenset(['amod', 'compound', 'nsubj', 'prep', 'pobj', 'advmod', 'det', 'neg'])


def full_query(base_token):
    '''
    Performs a breadth-first traversal of the dependency parse tree starting at
    base_token, and returns the aggregated (lower-cased) words it reaches.

    base_token: the starting point (a spaCy token, or any object exposing
        `.text`, `.dep_`, `.head` and `.children`).

    Returns a list of unique lower-cased words in discovery order. The base
    token's own text is not seeded, so it only appears if the traversal reaches
    it again through another token (e.g. as a child of its head). Uniqueness is
    by lower-cased text, so distinct tokens with the same spelling appear once.
    '''
    queue = deque([base_token])  # O(1) pops from the front
    results = []
    seen = set()  # mirrors `results` for O(1) membership tests

    while queue:
        element = queue.popleft()
        # backward: climb to the head, but never straight back to the start token.
        # `!=` (not `is not`): spaCy builds a new Token object on every access,
        # so identity comparison would never match.
        if element.dep_ in _BACKWARD_DEPS:
            head = element.head
            if head != base_token:
                word = head.text.lower()
                if word not in seen:
                    seen.add(word)
                    results.append(word)
                    queue.append(head)
        # forward: descend into children with an attribute-like relation.
        for child in element.children:
            if child.dep_ in _FORWARD_DEPS:
                word = child.text.lower()
                if word not in seen:
                    seen.add(word)
                    results.append(word)
                    queue.append(child)
    return results
                
def extract(text):
    '''
    Finds all occurrences of conditions present in the given full text, and
    collects the related words of each occurrence by running full_query.

    text: the text to operate on.

    Returns a dictionary mapping condition -> {related word -> number of
    occurrences it was reached from}, plus a '-total-' entry holding the number
    of matches StringOp.find_condition reported for that condition (including
    any whose offset could not be mapped to a token).
    '''
    # NOTE(review): assumes find_condition returns {condition: [char offsets
    # into text]} -- inferred from how the result is used below.
    conditions = StringOp.find_condition(wordlist, text)
    doc = nlp(text)

    # Map every character offset to the token covering it (the token text plus
    # its trailing whitespace). Every token is mapped, including the last one,
    # so a condition at the very end of the text is still found.
    char_to_token = {}
    for token in doc:
        start = token.idx
        for j in range(start, start + len(token.text_with_ws)):
            char_to_token[j] = token

    results = {}
    for condition, char_index_matches in conditions.items():
        attributes = {}
        for char_index in char_index_matches:
            base_token = char_to_token.get(char_index)
            if base_token is None:
                print('error index not found', char_index)
                continue
            for word in full_query(base_token):
                attributes[word] = attributes.get(word, 0) + 1
        attributes['-total-'] = len(char_index_matches)
        results[condition] = attributes

    return results
    
def run_indexing(path, df, selected_records=None):
    '''
    Runs the indexing process on the selected records sequentially (single process),
    writing one JSON object (the output of extract) per line.
    path: the path in which to save the resulting index file.
    df: the original dataframe; must have a 'TEXT' column.
    selected_records: operates on the whole dataframe if None, otherwise just on the
        given positional indices (passed to df.iloc).
    '''
    # `is None` rather than `!= None`: for a NumPy array or pandas Index,
    # `!= None` is element-wise and its truth value raises ValueError.
    df_subset = df if selected_records is None else df.iloc[selected_records]
    # `with` closes the file even if extract raises part-way through.
    with open(path, 'w+') as f:
        # Iterate the column values directly instead of building a row Series per record.
        for med_text in tqdm(df_subset['TEXT'], total=df_subset.shape[0]):
            document = " ".join(StringOp.clean(med_text))
            json.dump(extract(document), f)
            f.write('\n')
    
def write_index_single(path, df):
    '''
    Worker function run by each indexing process: indexes every record of df and
    writes one JSON object (the output of extract) per line to path.
    path: the path in which to save the resulting index file.
    df: the dataframe chunk to operate on; must have a 'TEXT' column.
    '''
    # `with` guarantees the file is flushed and closed even if extract raises.
    with open(path, 'w+') as f:
        # Iterate the column values directly instead of building a row Series per record.
        for med_text in df['TEXT']:
            document = " ".join(StringOp.clean(med_text))
            json.dump(extract(document), f)
            f.write('\n')
    
def run_indexing_parallel(path, df, num_cores, selected_records=None, verbose=False):
    '''
    Runs the indexing process on the selected records in parallel, with one worker
    process (not thread) per contiguous chunk of records.
    Worker i writes its chunk to path + 'index<i>.json'. Chunks are contiguous and in
    order, so reading index0 ... index<num_cores-1> in sequence reproduces record order.
    path: prefix (typically a directory ending in '/') for the resulting index files.
    df: the original dataframe; must have a 'TEXT' column.
    num_cores: the number of parallel processes to use (must be >= 1).
    selected_records: operates on the whole dataframe if None, otherwise just on the
        given positional indices (passed to df.iloc).
    verbose: if True, print the running time.

    Raises ValueError if num_cores < 1, and RuntimeError if any worker process exits
    with a non-zero exit code (its index file may then be incomplete).
    '''
    if num_cores < 1:
        raise ValueError('num_cores must be at least 1, got %r' % (num_cores,))

    path_list = [path + 'index' + str(i) + '.json' for i in range(num_cores)]
    # `is None` rather than `!= None` (element-wise for arrays / Index objects).
    df_subset = df if selected_records is None else df.iloc[selected_records]
    # Count rows actually selected (also correct for boolean masks).
    numrecords = df_subset.shape[0]

    # Split up work for separate processes: contiguous chunks of `step` records,
    # with the last chunk also taking the remainder. If numrecords < num_cores,
    # step is 0 and the last worker receives every record.
    step = numrecords // num_cores
    range_list = [(i * step, (i + 1) * step) for i in range(num_cores - 1)]
    range_list.append(((num_cores - 1) * step, numrecords))

    # NOTE(review): `mp` is presumably multiprocessing from includes/imports.py.
    # Under the 'spawn' start method (Windows/macOS default), a target function
    # defined in a notebook may not be picklable -- confirm on the target platform.
    processes = [
        mp.Process(target=write_index_single, args=(out_path, df_subset.iloc[lo:hi]))
        for out_path, (lo, hi) in zip(path_list, range_list)
    ]

    if verbose:
        start = timeit.default_timer()
    for p in processes:
        p.start()
    print('processing started...')

    for p in processes:
        p.join()
    if verbose:
        end = timeit.default_timer()
        print("The running time is ", end - start)

    # Surface crashed workers instead of silently leaving partial index files.
    failed = [i for i, p in enumerate(processes) if p.exitcode != 0]
    if failed:
        raise RuntimeError(
            'indexing worker(s) %s exited abnormally; their index files may be incomplete'
            % failed
        )

### Perform single-threaded indexing

In [None]:
# Time the sequential indexing run. perf_counter is a monotonic, high-resolution
# clock intended for measuring durations; time.time can jump with clock changes.
t1 = time.perf_counter()
run_indexing('index/index.json', data, None)
print(time.perf_counter() - t1)

### Perform multi-process (parallel) indexing

In [None]:
# Time the parallel indexing run with 8 worker processes (index/index0.json ... index7.json).
# perf_counter is a monotonic clock intended for measuring durations.
t1 = time.perf_counter()
run_indexing_parallel('index/', data, 8, None)
print(time.perf_counter() - t1)