In [1]:
from itertools import chain

import nltk
import sklearn
import scipy.stats
from sklearn.metrics import make_scorer
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RandomizedSearchCV
import sklearn_crfsuite
from sklearn_crfsuite import scorers
from sklearn_crfsuite import metrics
import pickle

import pandas as pd
import numpy as np
import re
import os

In [2]:
# Download the NLTK data packages needed by the nltk.word_tokenize and
# nltk.pos_tag calls made in extract_labs (presumably 'punkt' backs the
# tokenizer and 'averaged_perceptron_tagger' backs the POS tagger).
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [8]:
# Show the kernel's working directory; the relative output path used when the
# validation CSV is saved is resolved against it.
! pwd

/home/jovyan/work/NER-Test


In [3]:
# Dictionary of lab-test names; word2features reads its 'test' column.
lab_names = pd.read_csv('/home/jovyan/work/NER-Test/data/labtest_dictionary_v01.csv')

# Unit strings from the 'units' column, followed by a few extra markers that
# are also treated as units by word2features.
UNITS = [
    *pd.read_csv('/home/jovyan/work/NER-Test/data/labtest_units_v01.csv')['units'].tolist(),
    '%',
    '#',
    '(Auto)',
    '(Manual)',
]

def extract_labs(t):
    """Tag one line of text with the CRF model and collect the tokens labelled 'LAB'.

    Relies on the module-level ``crf_model`` and ``sent2features`` being
    defined before the first call.

    Parameters
    ----------
    t : object
        The text of a single line. It is converted with ``str``, stripped, and
        double quotes are removed before tokenizing.

    Returns
    -------
    list
        ``[text, labs]`` where ``text`` is the tokenized input re-joined with
        single spaces and ``labs`` is the space-joined subset of tokens the
        model labelled ``'LAB'``. ``['', '']`` is returned when the input is
        None, NaN, blank, or produces no tokens.
    """
    # None / NaN would otherwise be stringified to 'None' / 'nan' and tagged
    # as if they were real text.
    if t is None or (isinstance(t, float) and t != t):
        return ['', '']

    text = str(t).strip()
    if not text:
        return ['', '']

    text = re.sub(r'\"', '', text)

    # pos_tag already yields the [(word, POS), ...] sentence that
    # sent2features expects, so no intermediate DataFrame is needed.
    sentence = nltk.pos_tag(nltk.word_tokenize(text))
    if not sentence:
        return ['', '']

    labels = crf_model.predict([sent2features(sentence)])[0]

    tokens = [word for word, _ in sentence]
    lab_tokens = [word for (word, _), label in zip(sentence, labels) if label == 'LAB']

    # Original (tokenized) text, and extracted text
    return [' '.join(tokens), ' '.join(lab_tokens)]

class SentenceGetter(object):
    """Group a token-per-row DataFrame into sentences of (word, POS, tag) tuples.

    ``data`` must provide the columns ``"Sentence #"``, ``"Word"``, ``"POS"``
    and ``"Tag"``. After construction:

    * ``grouped`` is a Series indexed by the ``"Sentence #"`` value whose
      elements are lists of ``(word, pos, tag)`` tuples;
    * ``sentences`` is the plain list of those lists, in group order.
    """

    def __init__(self, data):
        self.n_sent = 1
        self.data = data
        self.empty = False
        agg_func = lambda s: [(w, p, t) for w, p, t in zip(s["Word"].values.tolist(),
                                                           s["POS"].values.tolist(),
                                                           s["Tag"].values.tolist())]
        # Select only the columns the aggregation reads, so apply() does not
        # operate on the grouping column (deprecated in recent pandas).
        self.grouped = self.data.groupby("Sentence #")[["Word", "POS", "Tag"]].apply(agg_func)
        self.sentences = [s for s in self.grouped]

    def get_next(self):
        """Return the sentence keyed "Sentence: <n_sent>" and advance the counter.

        Returns None (without advancing) when no group has that key.
        """
        key = "Sentence: {}".format(self.n_sent)
        try:
            s = self.grouped[key]
        except KeyError:
            # Only a missing key means "no more sentences"; other errors
            # should surface instead of being silently swallowed.
            return None
        self.n_sent += 1
        return s


def _token_features(word, postag, prefix, test_tokens):
    """Build the dictionary / shape / POS features of one token.

    ``prefix`` ('' for the current token, '-1' / '+1' for neighbours) is
    prepended to every feature key. Each "is*" feature holds the word itself
    when the test passes and '' otherwise.
    """
    word = str(word)
    postag = str(postag)

    # The word counts as a test name when it is a substring of any
    # space-separated token of any dictionary entry.
    is_test_name = any(word in token for token in test_tokens)
    is_unit = word in UNITS
    is_digit = word.isdigit()
    # NOTE(review): the '.' in this pattern is unescaped, so it matches any
    # character. It is kept as-is because the pickled model was presumably
    # trained on features produced with exactly this pattern - confirm before
    # tightening it.
    is_range = re.match(r'\(\d*.?\d+-\d*.?\d+\)', word) is not None

    return {
        prefix + 'word.isTestName()': word if is_test_name else '',
        prefix + 'word.isUnit()': word if is_unit else '',
        prefix + 'word.isDigit()': word if is_digit else '',
        prefix + 'word.isRange()': word if is_range else '',
        prefix + 'postag': postag,
        prefix + 'postag[:2]': postag[:2],
    }


def word2features(sent, i):
    """Return the CRF feature dict for token ``i`` of ``sent``.

    Parameters
    ----------
    sent : sequence
        Sentence as a sequence of items whose first two fields are the word
        and its POS tag.
    i : int
        Index of the token to describe.

    Returns
    -------
    dict
        Features of the token itself, of the previous token (keys prefixed
        '-1') or 'BOS' when ``i`` is the first index, and of the next token
        (keys prefixed '+1') or 'EOS' when ``i`` is the last index. 'BOS' and
        'EOS' hold the current word.

    Notes
    -----
    Reads the module-level ``lab_names`` DataFrame (column 'test') and the
    ``UNITS`` list. Neighbour words and tags are coerced with ``str`` in the
    same way as the current token, so non-string values no longer raise; this
    replaces the former bare ``except`` that printed the sentence and returned
    a partially filled dict.
    """
    word = str(sent[i][0])
    postag = str(sent[i][1])

    # Built once per call and shared by the current token and its neighbours.
    test_tokens = [token
                   for name in lab_names['test'].tolist()
                   for token in str(name).split(' ')]

    features = _token_features(word, postag, '', test_tokens)

    if i > 0:
        features.update(_token_features(sent[i - 1][0], sent[i - 1][1], '-1', test_tokens))
    else:
        features['BOS'] = word

    if i < len(sent) - 1:
        features.update(_token_features(sent[i + 1][0], sent[i + 1][1], '+1', test_tokens))
    else:
        features['EOS'] = word

    return features

def sent2features(sent):
    """Return one word2features dict per token of ``sent``, in token order."""
    per_token = []
    for position in range(len(sent)):
        per_token.append(word2features(sent, position))
    return per_token


def sent2labels(sent):
    """Return the label (third field) of each (token, postag, label) triple in ``sent``."""
    labels = []
    for _token, _postag, label in sent:
        labels.append(label)
    return labels


def sent2tokens(sent):
    """Return the token (first field) of each (token, postag, label) triple in ``sent``."""
    tokens = []
    for token, _postag, _label in sent:
        tokens.append(token)
    return tokens



# Simple Test

In [7]:
# NOTE: pickle.load can execute arbitrary code; only load model files that
# come from a trusted source.
# The context manager closes the file handle once the model is loaded.
with open('/home/jovyan/work/NER-Test/models/crf_model_lab_v12.pkl', 'rb') as model_file:
    crf_model = pickle.load(model_file)

# lab test: WBC 7.7 10E3/uL (4.0-11.0)
sent, pred = extract_labs('WBC 7.7 10E3/uL (4.0-11.0)')
print(f'input:{sent}, output:{pred}')

input:WBC 7.7 10E3/uL ( 4.0-11.0 ), output:WBC 7.7 10E3/uL ( 4.0-11.0 )


# Complete Test

In [4]:
# NOTE: pickle.load can execute arbitrary code; only load model files that
# come from a trusted source.
# The context manager closes the file handle once the model is loaded.
with open('/home/jovyan/work/NER-Test/models/crf_model_lab_v12.pkl', 'rb') as model_file:
    crf_model = pickle.load(model_file)

In [8]:
# Load the labelled sample lines. Per the preview below, the frame has the raw
# line text in 'lab_result_0' and its label in 'CLASS'.
data = pd.read_csv('/home/jovyan/work/NER-Test/data/train/train_test_samples.csv')
data.head()

Unnamed: 0,lab_result_0,CLASS
0,"Atchison Hospital Raven Hill ve Atchison, KS",O
1,Emergency Department Note Signed,O
2,DOB: // ...,O
3,Age/Sex: / M ...,O
4,Loc: ED,O


In [9]:
# Run the extractor over every sample line. Missing values become '' first;
# fillna returns a new frame, so `data` itself is left untouched.
X_test = data.fillna(value='')

# extract_labs returns [tokenized text, extracted lab text] for each line;
# the two parts are then split into their own columns.
X_test['lab_result'] = X_test['lab_result_0'].map(extract_labs)
X_test['LAB_TEXT'] = [pair[0] for pair in X_test['lab_result']]
X_test['LAB_TEST'] = [pair[1] for pair in X_test['lab_result']]
X_test.head()

Unnamed: 0,lab_result_0,CLASS,lab_result,LAB_TEXT,LAB_TEST
0,"Atchison Hospital Raven Hill ve Atchison, KS",O,"[Atchison Hospital Raven Hill ve Atchison , KS, ]","Atchison Hospital Raven Hill ve Atchison , KS",
1,Emergency Department Note Signed,O,"[Emergency Department Note Signed, ]",Emergency Department Note Signed,
2,DOB: // ...,O,"[DOB : // Acct : AH, ]",DOB : // Acct : AH,
3,Age/Sex: / M ...,O,"[Age/Sex : / M ADM Date : //, ]",Age/Sex : / M ADM Date : //,
4,Loc: ED,O,"[Loc : ED, ]",Loc : ED,


In [10]:
df_data = X_test.copy()


def _has_no_word_content(text):
    """Return True when ``text`` holds nothing but times, dates, punctuation and whitespace."""
    text = re.sub(r'\d{1,2}:\d{1,2}:\d{1,2}', '', text)    # hh:mm:ss
    text = re.sub(r'\d{1,2}\/\d{1,2}\/\d{1,2}', '', text)  # d/m/y
    return re.sub(r'[^\w]+|\s+', '', text) == ''


# Blank out the class of lines that carry no word content. Row values are read
# by column label: positional access such as s[0] on a label-indexed row is
# deprecated in pandas.
df_data['CLASS'] = df_data[['lab_result_0', 'CLASS']].apply(
    lambda row: '' if _has_no_word_content(row['lab_result_0']) else row['CLASS'],
    axis=1,
)

# NOTE(review): 'lab_result' holds the [text, labs] list returned by
# extract_labs, so comparing it with '' is never equal and this flag reduces
# to CLASS != ''. A comparison against 'LAB_TEST' may have been intended -
# confirm before changing it, since that alters the saved validation file.
df_data['check'] = df_data[['CLASS', 'lab_result']].apply(
    lambda row: (row['CLASS'] != '' and row['lab_result'] != '')
    or (row['CLASS'] == row['lab_result'] == ''),
    axis=1,
)

print(df_data[df_data['check']].shape, df_data[~df_data['check']].shape)
df_data.head()

(2864, 6) (7, 6)
                                        lab_result_0 CLASS  \
0     Atchison Hospital  Raven Hill ve Atchison, KS      O   
1                   Emergency Department Note Signed     O   
2  DOB: //                                       ...     O   
3  Age/Sex:  / M                                 ...     O   
4                                            Loc: ED     O   

                                          lab_result  \
0  [Atchison Hospital Raven Hill ve Atchison , KS, ]   
1               [Emergency Department Note Signed, ]   
2                             [DOB : // Acct : AH, ]   
3                    [Age/Sex : / M ADM Date : //, ]   
4                                       [Loc : ED, ]   

                                        LAB_TEXT LAB_TEST  check  
0  Atchison Hospital Raven Hill ve Atchison , KS            True  
1               Emergency Department Note Signed            True  
2                             DOB : // Acct : AH            True  
3    

In [11]:
# The path is relative to the kernel's working directory. Create the target
# folder first so to_csv does not fail when it is missing.
output_path = 'data/test/test-11-28-2022-validation.csv'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
df_data.to_csv(output_path, index=False)