In [268]:
import spacy
from spacy.matcher import Matcher
nlp = spacy.load("en_core_sci_lg")


In [323]:
import re
def query_to_patterns(query: str) -> list:
    """
    Expected query will be a set of keywords separated
    by OR keyword.

    Each expression separated by OR can have expressions
    combined by AND or NOT and the keywords can also contain
    wildcards.

    Spacy Requirements:
    tokenization: the paper states using custom tokenizer
    ! - negation
    Each dictionary in a list matches one token only
    A list matches all the dictionaries inside it (and condition)
    A list of list contains OR conditions
    [{"TEXT": {"REGEX": "abc*"}}] represents one token with regex match
    [{"LOWER": "dvt"}] matches case-insenstitive  DVT
    [{"LEMMA": "embolus"}] matches the lemmatized version of embolus as well in text

    Implementation:
    1. Split the query by OR
    2. Split each expression by AND
    3. Split each expression by NOT
    4. Split each expression by wildcard
    5. Convert each expression to a spacy pattern
    6. Combine the patterns
    7. Return the combined pattern
    """
    def get_regex_dict(token):
        return {"TEXT": {"REGEX": token}}

    def get_lemma_dict(token):
        return {"LEMMA": token}
    
    def get_negated_dict(token):
        return {"LOWER": token, "OP": "!"}
    
    match_pattern = r'^\s*((\(\s*[a-zA-Z0-9*?!]+(\s*AND\s*[a-zA-Z0-9*?!]+)*\s*\))|[a-zA-Z0-9*?!]+)(\s*OR\s*((\(\s*[a-zA-Z0-9*?!]+(\s*AND\s*[a-zA-Z0-9*?!]+)*\s*\))|[a-zA-Z0-9*?!]+))*\s*$'
    assert re.match(match_pattern, query), "Query must be a set of keywords separated by OR keyword"

    or_expressions = query.split(" OR ")
    res = [[] for _ in range(len(or_expressions))]
    for i, expression in enumerate(or_expressions):
        spacy_pattern = []
        expression = expression.strip().replace("(", "").replace(")", "")
        and_expressions = expression.split(" AND ")
        for tok in and_expressions:
            if "*" in tok or "?" in tok:
                spacy_pattern.append(get_regex_dict(tok))
            elif "!" in tok:
                spacy_pattern.append(get_negated_dict(tok.replace("!", "")))
            else:
                spacy_pattern.append(get_lemma_dict(tok)) 
        print(f"{expression} -> {[spacy_pattern]}")
        res[i] = [spacy_pattern]
    return  res

In [324]:
documents = [ """
Mr First is a 60 YO M with a history of metastatic colon CA, diagnosed in 1-2008, initially with stage III disease and s/p hemicolectomy followed by adjuvant chemo, with recurrence in the liver 6 months ago, evaluated today for management of pulmonary embolism. Pt reports no prior hx of VTE, CAD or CVA/TIA. Lately he has been on regular FOLFOX chemotherapy with satisfactory results. Two weeks ago he sustained sudden onset of chest pain and dyspnea, along with palpitations. He presented to his local ER; CT angio was performed and revealed PE's in the R LL and R ML. Pt was hemodynamically stable. He was started on rivaroxaban 15 mg BID and observed overnight. B LE Dopplers did not reveal and DVT. Since discharge he has done well, with resolution of chest pain and substantial improvement of dyspnea. He is very active and denies any decrease in balance or falls. He does not take ASA but he uses ibuprophen every once in a while for joint pains. He does not drink alcohol. He has had not rectorrhagia, melena or hematuria. He tends to have mild epistaxis in the winter when the air is dry, but this has not been a problem lately.
""", """
78 YO M referred for new diagnosis of CLL. Mr Down has followed-up for the same PCP for the last 15 years now, and in the last 5 y was noted to have progressive lymphocytosis, last time measure at 19,000. His PCP me and at my request sent for flow cytometry of peripheral blood, which showed a population of abnormal cells co-expressing CD5 and CD19, c/w CLL. Cell counts have otherwise been normal, with no anemia, neutropenia or thrombocytopenia. Pt feels generally well. His PCP already told him of the diagnosis. Mr Down denies fevers, NS or LAN. Wt has been stable. He denies significant fatigue and is very active. As a matter of fact, he just came back from a camping trip in the Poconos.
"""]

In [325]:
spacy_patterns

[[[{'LEMMA': 'dyspnea'}]]]

In [328]:
matcher = Matcher(nlp.vocab)
query = "dyspnea"
spacy_patterns = query_to_patterns(query)
assert len(matcher) == 0  
# pattern = [[{"LOWER": "deep"}, {"LOWER": "vein"}, {"TEXT": {"REGEX": "thromb*"}}]]
# matcher.add("expandedDVT", pattern)
# matcher.add("DVT", [[{"LOWER": "dvt"}]])
# matcher.add("emboli", [[{"LOWER": "pulmonary"}, {"LEMMA": "embolus"}]])
for i, item in enumerate(spacy_patterns):
    print(item)
    matcher.add(f"DVT_{i}", item)
doc = nlp(documents[0])
for sent in doc.sents:
    matches = matcher(sent)
    # print("********* Sentence", sent.text)
    print(len(matches))
    for match_id, start, end in matches:
        string_id = nlp.vocab.strings[match_id]  # Get string representation
        span = sent[start:end]                    # The matched span
        print(match_id, string_id, start, end, span.text)
        print("Found somehting \n\n+++++++++++++++")



dyspnea -> [[{'LEMMA': 'dyspnea'}]]
[[{'LEMMA': 'dyspnea'}]]
0
0
0
1
7005283834202530599 DVT_0 11 12 dyspnea
Found somehting 

+++++++++++++++
0
0
0
0
1
7005283834202530599 DVT_0 16 17 dyspnea
Found somehting 

+++++++++++++++
0
0
0
0


In [243]:
for match_id, start, end in matches:
    string_id = nlp.vocab.strings[match_id]  # Get string representation
    span = doc[start:end]                    # The matched span
    print(match_id, string_id, start, end, span.text)

15963895243908845058 DVT_1 2 5 has vein thrombus
7005283834202530599 DVT_0 15 16 dvt
13936575303155538993 DVT_2 19 20 clot


In [None]:
for match_id, start, end in matches:
    

In [200]:
%%file test_query_to_pattern.py
import pytest
from app import nlpprocessor
@pytest.mark.parametrize("query, expected", [
    ("(DVT) OR (!deep AND vein AND thromb*) OR (clot) ", [3, {"LEMMA": "DVT"}, {"LOWER": "deep", "OP": "!"}]),
    ("bleed", [1, {"LEMMA": "bleed"}])
    ])
def test_query_to_pattern(query, expected):
    res = query_to_patterns(query)
    assert len(res) == expected[0]
    assert res[0][0] == expected[1]


Overwriting test_query_to_pattern.py


In [201]:
!python -m pytest test_query_to_pattern.py

platform darwin -- Python 3.9.0, pytest-7.4.0, pluggy-1.2.0
rootdir: /Users/rsingh/Programming/cedars/cedars
configfile: pyproject.toml
plugins: Faker-19.3.0, anyio-3.7.1
collected 2 items                                                              [0m[1m

test_query_to_pattern.py [31mF[0m[31mF[0m[31m                                              [100%][0m

[31m[1m_ test_query_to_pattern[(DVT) OR (!deep AND vein AND thromb*) OR (clot) -expected0] _[0m

query = '(DVT) OR (!deep AND vein AND thromb*) OR (clot) '
expected = [3, {'LEMMA': 'DVT'}, {'LOWER': 'deep', 'OP': '!'}]

    [37m@pytest[39;49;00m.mark.parametrize([33m"[39;49;00m[33mquery, expected[39;49;00m[33m"[39;49;00m, [[90m[39;49;00m
        ([33m"[39;49;00m[33m(DVT) OR (!deep AND vein AND thromb*) OR (clot) [39;49;00m[33m"[39;49;00m, [[94m3[39;49;00m, {[33m"[39;49;00m[33mLEMMA[39;49;00m[33m"[39;49;00m: [33m"[39;49;00m[33mDVT[39;49;00m[33m"[39;49;00m}, {[33m"[39;49;00m[33mLOWER[39;49;

In [153]:
matcher = Matcher(nlp.vocab)
assert len(matcher) == 0  
# pattern = [[{"LOWER": "deep"}, {"LOWER": "vein"}, {"TEXT": {"REGEX": "thromb*"}}]]
# matcher.add("expandedDVT", pattern)
# matcher.add("DVT", [[{"LOWER": "dvt"}]])
# matcher.add("emboli", [[{"LOWER": "pulmonary"}, {"LEMMA": "embolus"}]])
for i, item in enumerate(query_to_patterns(query)):
    # if len(item) == 1:
    print(item)
    matcher.add(f"DVT_{i}", [item])
    # else:
        # matcher.add(f"DVT_{i}", item)
# matcher.add("DVT", query_to_patterns(query))
doc = nlp("The patient has vein thrombus in the left leg. Also had past history of DVT and pulmonary emboli clot")
matches = matcher(doc)


Processing expression:  (DVT)
Processing expression:  (!deep AND vein AND thromb*)
Processing expression:  (clot) 
[{'LEMMA': 'DVT'}]
[{'LOWER': 'deep', 'OP': '!'}, {'LEMMA': 'vein'}, {'TEXT': {'REGEX': 'thromb*'}}]
[{'LEMMA': 'clot'}]


In [154]:
for match_id, start, end in matches:
    string_id = nlp.vocab.strings[match_id]  # Get string representation
    span = doc[start:end]                    # The matched span
    print(match_id, string_id, start, end, span.text)

15963895243908845058 DVT_1 2 5 has vein thrombus
13936575303155538993 DVT_2 19 20 clot


<spacy.matcher.matcher.Matcher at 0x1776e09c0>

In [74]:
import spacy
from spacy.matcher import Matcher, PhraseMatcher
from spacy.tokens import Doc

nlp = spacy.load("en_core_web_sm")


AND_0 quick


In [77]:
def create_matcher(query):
    # Split the query by logical operators
    tokens = query.split()
    patterns = []
    pattern = []

    # Process tokens
    for token in tokens:
        if token in ["AND", "OR", "NOT"]:
            if pattern:
                patterns.append(("AND", pattern))
            patterns.append((token, []))
            pattern = []
        else:
            pattern.append(token)
    
    if pattern:
        patterns.append(("AND", pattern))

    matcher = Matcher(nlp.vocab)
    phrase_matcher = PhraseMatcher(nlp.vocab, attr="LOWER")

    for operator, pattern in patterns:
        if operator == "AND":
            for i, word in enumerate(pattern):
                if "*" in word or "?" in word:
                    word = word.replace("*", ".*").replace("?", ".")
                    matcher.add(f"AND_{i}", [[{"LOWER": {"REGEX": word}}]])
                else:
                    lemma = nlp(word)[0].lemma_
                    matcher.add(f"AND_{i}", [[{"LEMMA": lemma}]])
        elif operator == "OR":
            or_patterns = []
            for word in pattern:
                if "*" in word or "?" in word:
                    word = word.replace("*", ".*").replace("?", ".")
                    or_patterns.append([{"LOWER": {"REGEX": word}}])
                else:
                    lemma = nlp(word)[0].lemma_
                    or_patterns.append([{"LEMMA": lemma}])
            matcher.add("OR", or_patterns)
        elif operator == "NOT":
            for i, word in enumerate(pattern):
                if "*" in word or "?" in word:
                    word = word.replace("*", ".*").replace("?", ".")
                    matcher.add(f"NOT_{i}", [[{"LOWER": {"REGEX": word}}]], on_match=negate_match)
                else:
                    lemma = nlp(word)[0].lemma_
                    matcher.add(f"NOT_{i}", [[{"LEMMA": lemma}]], on_match=negate_match)

    return matcher, phrase_matcher

def negate_match(matcher, doc, i, matches):
    matches[:] = [m for m in matches if m[0] != matcher.vocab.strings[f"NOT_{i}"]]

def process_query(doc, query):
    matcher, phrase_matcher = create_matcher(query)
    matches = matcher(doc)
    return matches

doc = nlp("The quick brown fox jumps over the lazy dog.")
query = "quick AND jumps OR dog NOT fox* OR "
matches = process_query(doc, query)

for match_id, start, end in matches:
    print(nlp.vocab.strings[match_id], doc[start:end].text)

AND_0 quick
AND_0 fox
AND_0 jumps
AND_0 dog


In [76]:
patterns

[[{'LEMMA': 'acknowledge'}],
 [{'TEXT': {'REGEX': 'severe'}}, {'LEMMA': 'bleeding'}]]

In [69]:
"" in nlp.vocab

False

In [62]:
nlp.vocab.has

KeyError: "[E159] Can't find table 'lemma_rules' in lookups. Available tables: []"