# Initial setup

Let's import the required libraries and set up global variables for the rest of the script.

In [1]:
# coding: utf-8
!pip install tqdm
import csv
import os
import re
import shutil
import string
import zipfile
import sys
from collections import defaultdict
from lxml import objectify
import codecs
import nltk
import pandas as pd
import requests
import tarfile
import subprocess
import platform
import time
from tqdm import tqdm as progressbar # pandas df usage: 'for row in progressbar(df.itertuples(), total=df.shape[0])'



Helper function to create a directory under the specified path, gracefully handling errors.

In [2]:
def __mkdir(*args):
    path = os.path.join(*args)
    try: 
        os.makedirs(path)
    except OSError:
        if not os.path.isdir(path):
            raise
    return path

In [3]:
# Create the project directory holding the downloaded data, serialized dataframes and MetaMap install.
# working_dir = __mkdir(os.path.expanduser("~"), "Medframes")

# Set working directory as the current directory of the ipython notebook
working_dir = os.getcwd()

print("Working directory: %s" % working_dir)

Working directory: C:\Study\CS102\project\project2\repro\CS109Project


# Test MetaMap installation

This will start the server, run a simple query in interactive mode and stop the server.

In [4]:
def test_metamap(mm_dir):
    mm_scripts = {"Linux": 'echo "common flu" | ./bin/metamap -I',
                  "Windows": 'echo "common flu" | bin\metamap.bat -I',
                  "MacOS": 'echo "common flu" | ./bin/metamap -I'}
    mm_script = mm_scripts[platform.system()]
    os.chdir(mm_dir)
    print subprocess.check_output(mm_script, shell=True)

##### Set here the path to the MetaMap installation folder:

In [5]:
mm_dir = "C:\\Study\\CS102\\project\\public_mm_win32_main_2014\\public_mm\\"

In [6]:
test_metamap(mm_dir)


C:\Study\CS102\project\public_mm_win32_main_2014\public_mm>set path=C:\Anaconda\lib\site-packages\numpy\core;C:\ProgramData\Oracle\Java\javapath;C:\Program Files\Common Files\Microsoft Shared\Windows Live;C:\Program Files (x86)\Common Files\Microsoft Shared\Windows Live;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x64;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;c:\Program Files (x86)\Microsoft SQL Server\100\Tools\Binn\;c:\Program Files\Microsoft SQL Server\100\Tools\Binn\;c:\Program Files\Microsoft SQL Server\100\DTS\Binn\;C:\Program Files (x86)\Microsoft ASP.NET\ASP.NET We

# Process the data with MetaMap

#### If you did not install metamap, skip to section "Process MetaMap results"

Dump the ngrams to a text file in the "list of terms with IDs" format(id: "{criteria id}-{ngram index in criteria}") and tag it with MetaMap. Outputs the "fielded NMI" format.

In [7]:
def prepare_mm_input(df, dest_file, verbose=True):
    if verbose:
        print("Generating MetaMap input at %s" % dest_file)
    with codecs.open(dest_file, 'w', encoding='ascii', errors='ignore') as fh:
        for r in df[['Ngrams', 'criteria_id']].itertuples():
            ngrams = r[1]
            cri_id = r[2]
            for ngrami, ngram in enumerate(ngrams):
                line = '-'.join((str(cri_id), str(ngrami))) +'|'+' '.join(_[0] for _ in ngram)
                fh.write(line+os.linesep)
    return dest_file

In [8]:
def run_metamap(mm_dir, src_file, dest_file, verbose=True):
    try:
        num_lines = sum(1 for line in open(src_file, 'r'))
        total = 2*num_lines + 23
        if verbose:
            print("Running MetaMap on file %s, writing to %s" % (src_file, dest_file))
        mm_scripts = {"Linux": './bin/metamap --sldiID -z -i -N %s %s',
                      "Windows": 'bin\metamap.bat --sldiID -z -i -N %s %s',
                      "MacOS": './bin/metamap -I --sldiID -z -i -N %s %s'}
        mm_script = mm_scripts[platform.system()] % (src_file, dest_file)
        os.chdir(mm_dir)
        process = subprocess.Popen(mm_script, stdout=subprocess.PIPE, shell=True)
        if verbose:
            for line in progressbar(iter(process.stdout.readline, ''), total=total):
#                 sys.stdout.write(line)
                pass # no need for stdout if we use progressbar
        else:
            for line in iter(process.stdout.readline, ''):
                pass
        return dest_file
    except Exception as e:
        return None

In [9]:
def process_mm_serial(data):
    start = time.time()

    mm_in = prepare_mm_input(data, os.path.join(data_dir, 'mm_in.txt'))
    m_out = run_metamap(mm_dir, mm_in, os.path.join(data_dir, 'mm_out.txt'))

    print 'Done. Serial processing time: %.1f sec' % round(time.time() - start, 1)

In [10]:
def process_mm_parallel(data, n_jobs):
    import numpy as np
    from IPython.lib.backgroundjobs import BackgroundJobManager
    
    jobs = BackgroundJobManager()
    
    # each job will get df chunk, prepare mm input and give it to mm, returning path to mm output file
    def mm_job(i, _df):
        mm_in = prepare_mm_input(_df, os.path.join(data_dir, 'mm_in_{}.txt'.format(i)), verbose=False)
        m_out = run_metamap(mm_dir, mm_in, os.path.join(data_dir, 'mm_out_{}.txt'.format(i)), verbose=False)
        return m_out

    start = time.time()

    print 'Splitting dataframe..'
    dfs = np.array_split(data, n_jobs)
    
    print 'Done. Starting jobs..'
    for i, _df in enumerate(dfs):
        jobs.new(mm_job, i, _df, daemon=True)
    
    # Ping jobs status each 10 seconds while we dont complete all jobs or have error.
    while len(jobs.dead) == 0 and len(jobs.completed) < n_jobs:
        time.sleep(10)
        print '%d/%d jobs completed..' % (len(jobs.completed), n_jobs)

    print '%d/%d jobs completed. Jobs results:' % (len(jobs.completed), n_jobs)
    for job_n in jobs.all.keys():
        print jobs.result(job_n)

    jobs.flush()
    print 'Done. Parallel processing time: %.1f sec' % round(time.time() - start, 1)

In [11]:
def process_mm(data, conf):
    
    # split data if we have data_limit in config
    _data = data if not conf['data_limit'] else data[:conf['data_limit']]
    
    if conf['parallel']:
        print 'Processing %d records in parallel with %d jobs..' % (_data.shape[0], conf['n_jobs'])
        process_mm_parallel(_data, conf['n_jobs'])
    else:
        print 'Processing %d records in serial..' % _data.shape[0]
        process_mm_serial(_data)

# Settings
Here you can configure metamap processing options.

In [12]:
MM_CONFIG = {
    'parallel': True, # parallel execution gives X2 speed up
    'n_jobs': 2, # process all the data in ~1.5 hour in my setup
    'data_limit': None # set it to None to process all data  
}

In [13]:
data_dir = __mkdir(working_dir, "data")

criteria = pd.read_pickle(os.path.join(data_dir, "ct_filtered.pckl"))



In [14]:
criteria.head(2)

Unnamed: 0,Lemmas,NctId,Ngrams,Tags,Tokens,criteria_id
0,"[(History, history), (of, of), (uncontrolled, ...",NCT00001149,"[((time, NN),), ((prior, RB), (to, TO), (admis...","[(History, NN), (of, IN), (uncontrolled, VBN),...","[History, of, uncontrolled, seizures, at, the,...",0
1,"[(Seizure, seizure), (frequency, frequency), (...",NCT00001149,"[((frequency, NN), (by, IN)), ((video, NN),), ...","[(Seizure, NN), (frequency, NN), (by, IN), (hi...","[Seizure, frequency, by, history, must, be, su...",1


In [15]:
process_mm(criteria, MM_CONFIG)

Processing 17144 records in parallel with 2 jobs..
Splitting dataframe..
Done. Starting jobs..
Starting job # 0 in a separate thread.
Starting job # 2 in a separate thread.
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs completed..
0/2 jobs

# Process MetaMap results

Convert the MetaMap-processed data to a Pandas dataframe and use the dataframe.

In [16]:
def pass_threshold(score):
    return score >= 5.5

def metamap_to_dataframe(src_file):
    print("Converting MetaMap results in %s to a dataframe." % src_file)
    data = []
    with open(src_file, 'r') as fh:
        for line in fh:
            row = line.split("|")
            id = row[0]
            cri_id, ngrami = map(int, id.split("-"))
            score = float(row[2])
            term = row[3]
            cui = row[4]
            stype = row[5].strip("[]")
            cid = row[-1].strip().split(";")
            if pass_threshold(score):
                data.append([cri_id,ngrami,score,term,cui,stype,cid])
    df = pd.DataFrame(columns=["criteria_id", "ngram_index", "score", "term", "cui", "stype", "cid"], data=data)
    return df

def process_mm_results(conf):
    if conf['parallel']:
        # collect mm_out_... files, transform them to dataframes & concat in resulting mm:
        mm_dfs = []
        for i in range(conf['n_jobs']):
            mm_df = metamap_to_dataframe(os.path.join(data_dir, "mm_out_{}.txt".format(i)))
            mm_dfs.append(mm_df)
        mm = pd.concat(mm_dfs)
        mm.reset_index(drop=True, inplace=True)
        return mm
    else:
        mm = metamap_to_dataframe(os.path.join(data_dir, "mm_out.txt"))
        return mm
    
def extend_with_nctids(mm_df, criteria_df):
    nctids = []
    for row in mm_df[['criteria_id']].itertuples():
        cri_id = row[1]
        nct_id = criteria_df[criteria_df['criteria_id'] == cri_id].NctId.values[0]
        nctids.append(nct_id)
    mm_df.insert(0, 'nct_id', nctids)
    return mm_df

def extend_with_ngrams(mm_df, incl_df):
    ngrams = []
    for row in progressbar(mm_df[['criteria_id', 'ngram_index']].itertuples(), total=mm_df.shape[0]):
        cri_id = row[1]
        ngrami = row[2]
        ngram = incl_df[incl_df['criteria_id'] == cri_id]['Ngrams'].values[0][ngrami]
        ngrams.append(ngram)
    mm_df.insert(0, 'ngram', ngrams)
    return mm_df

In [17]:
data_dir = __mkdir(working_dir, "data")

mm = process_mm_results(MM_CONFIG)

# we can get NctId by criteria id from criterias df:
criteria = pd.read_pickle(os.path.join(data_dir, 'ct_criteria.pckl'))
mm = extend_with_nctids(mm, criteria)

# we can get ngrams from filtered pckl
incl = pd.read_pickle(os.path.join(data_dir, 'ct_filtered.pckl'))
mm = extend_with_ngrams(mm, incl)

mm.to_pickle(os.path.join(data_dir, "mm.pckl"))

Converting MetaMap results in C:\Study\CS102\project\project2\repro\CS109Project\data\mm_out_0.txt to a dataframe.
Converting MetaMap results in C:\Study\CS102\project\project2\repro\CS109Project\data\mm_out_1.txt to a dataframe.

                                                                                                                                                               






Load the serialized MetaMap results and display sample data.

In [18]:
data_dir = __mkdir(working_dir, "data")
mm = pd.read_pickle(os.path.join(data_dir, "mm.pckl"))
mm.head(100)

Unnamed: 0,ngram,nct_id,criteria_id,ngram_index,score,term,cui,stype,cid
0,"((time, NN),)",NCT00001149,0,0,8.34,Time,C0040223,tmco,[G01.910]
1,"((uncontrolled, VBN), (seizure, NNS), (at, IN))",NCT00001149,0,8,16.21,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
2,"((of, IN), (seizure, NNS), (during, IN))",NCT00001149,0,10,16.21,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
3,"((seizure, NNS), (at, IN), (the, DT))",NCT00001149,0,11,16.21,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
4,"((pattern, NN), (of, IN), (seizure, NNS))",NCT00001149,0,12,16.07,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
5,"((seizure, NNS), (during, IN), (the, DT))",NCT00001149,0,13,16.21,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
6,"((seizure, NNS), (at, IN))",NCT00001149,0,17,16.26,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
7,"((the, DT), (present, NN), (time, NN))",NCT00001149,0,18,6.74,Time,C0040223,tmco,[G01.910]
8,"((seizure, NNS),)",NCT00001149,0,21,17.80,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."
9,"((uncontrolled, VBN), (seizure, NNS))",NCT00001149,0,22,16.26,Seizures,C0036572,sosy,"[C10.228.140.490.631, C10.597.742, C23.888.592..."


In [19]:
print len(mm.index), len(set(mm.index))

245201 245201
