In [1]:
%load_ext autoreload

In [2]:
%autoreload 1

In [3]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [4]:
import os
import sys

In [5]:
library_path = os.environ.get("NLP_PATENT_PATH")
if library_path not in sys.path:
    sys.path.append(library_path)

In [6]:
%aimport settings
%aimport app.io.file_readers

In [7]:
import json

from docx import Document
import re
from pathlib import Path
import spacy
from spacy.matcher import PhraseMatcher, Matcher
from spacy.tokens import Span, Doc, Token
from spacy import displacy

In [8]:
from settings import DATA_DIR
from app.io.file_readers import get_text_from_word_document

##### SpaCy re-cap

In [31]:
Doc.set_extension('title', default=None, force=True)
Token.set_extension('is_color', default=False)

In [49]:
from spacy.lang.en import English
from spacy.tokens import Span

nlp = English()

# Define the method
def to_html(span, tag):
    # Wrap the span text in a HTML tag and return it
    return "<{tag}>{text}</{tag}>".format(tag=tag, text=span.text)


# Register the Span property extension 'to_html' with the method to_html
Span.set_extension("to_html", method=to_html)

# Process the text and call the to_html method on the span with the tag name 'strong'
doc = nlp("Hello world, this is a sentence.")
span = doc[0:2]
print(span._.to_html("strong"))

<strong>Hello world</strong>


In [20]:
import spacy
from spacy.tokens import Span

nlp = spacy.load("en_core_web_sm")


def get_wikipedia_url(span):
    # Get a Wikipedia URL if the span has one of the labels
    if span.label_ in ("PERSON", "ORG", "GPE", "LOCATION"):
        entity_text = span.text.replace(" ", "_")
        return "https://en.wikipedia.org/w/index.php?search=" + entity_text


# Set the Span extension wikipedia_url using get getter get_wikipedia_url
Span.set_extension("wikipedia_url", getter=get_wikipedia_url, force=True)

doc = nlp(
    "In over fifty years from his very first recordings right through to his "
    "last album, David Bowie was at the vanguard of contemporary culture."
)
for ent in doc.ents:
    # Print the text and Wikipedia URL of the entity
    print(ent.text, ent._.wikipedia_url)

fifty years None
first None
David Bowie https://en.wikipedia.org/w/index.php?search=David_Bowie


In [33]:
from spacy.lang.en import English
from spacy.tokens import Span
from spacy.matcher import PhraseMatcher

COUNTRIES = ["Czech Republic", "Slovakia"]

CAPITALS = {"Czech Republic": "A", "Slovakia": "B"}

matcher = PhraseMatcher(nlp.vocab)
matcher.add("COUNTRY", None, *list(nlp.pipe(COUNTRIES)))


def countries_component(doc):
    # Create an entity Span with the label 'GPE' for all matches
    matches = matcher(doc)
    doc.ents = [Span(doc, start, end, label="GPE") for match_id, start, end in matches]
    return doc

# Add the component to the pipeline
nlp.add_pipe(countries_component)
print(nlp.pipe_names)

# Getter that looks up the span text in the dictionary of country capitals
get_capital = lambda span: CAPITALS.get(span.text)

# Register the Span extension attribute 'capital' with the getter get_capital
Span.set_extension("capital", getter=get_capital, force=True)

# Process the text and print the entity text, label and capital attributes
doc = nlp("Czech Republic may help Slovakia protect its airspace")
print([(ent.text, ent.label_, ent._.capital) for ent in doc.ents])

['tagger', 'parser', 'ner', 'countries_component']
[('Czech Republic', 'GPE', 'A'), ('Slovakia', 'GPE', 'B')]


In [76]:
t = "A big and cool Great Britain is here."
d = nlp(t)
pos = [X.idx for X in d]
print(pos)
c = d.ents[0]
country_name = c.text
start = pos[c.start]
end = start + len(country_name) - 1
print(start, end, t[start:end+1])

[0, 2, 6, 10, 15, 21, 29, 32, 36]
15 27 Great Britain


In [45]:
from spacy.lang.en import English

nlp = English()

# Process the text
doc = nlp(
    "In 1990, more than 60% of people in East Asia were in extreme poverty. "
    "Now less than 4% are."
)

# Iterate over the tokens in the doc
for token in doc:
    # Check if the token resembles a number
    if token.like_num:
        # Get the next token in the document
        next_idx = token.i + 1
        if next_idx < len(doc):
            next_token = doc[next_idx]
        # Check if the next token's text equals '%'
            if next_token.text == "%":
                print("Percentage found:", token.text)

Percentage found: 60
Percentage found: 4


In [85]:
import spacy
from spacy.matcher import Matcher

nlp = spacy.load("en_core_web_sm")
doc = nlp(
    "Twitch Prime, the perks program for Amazon Prime members offering free "
    "loot, games and other benefits, is ditching one of its best features: "
    "ad-free viewing. According to an email sent out to Amazon Prime members "
    "today, ad-free viewing will no longer be included as a part of Twitch "
    "Prime for new members, beginning on September 14. However, members with "
    "existing annual subscriptions will be able to continue to enjoy ad-free "
    "viewing until their subscription comes up for renewal. Those with "
    "monthly subscriptions will have access to ad-free viewing until October 15."
)

# Create the match patterns
pattern1 = [{"LOWER": "amazon"}, {"IS_TITLE": True, "POS": "PROPN"}]
pattern2 = [{"LOWER": "ad"}, {"LOWER": "-"}, {"LOWER": "free"}, {"POS": "NOUN"}]

# Initialize the Matcher and add the patterns
matcher = Matcher(nlp.vocab)
matcher.add("PATTERN1", None, pattern1)
matcher.add("PATTERN2", None, pattern2)

# Iterate over the matches
for match_id, start, end in matcher(doc):
    # Print pattern string name and text of matched span
    print(doc.vocab.strings[match_id], doc[start:end].text)

PATTERN1 Amazon Prime
PATTERN2 ad-free viewing
PATTERN1 Amazon Prime
PATTERN2 ad-free viewing
PATTERN2 ad-free viewing
PATTERN2 ad-free viewing


##### Read the data

In [9]:
path = DATA_DIR / "pipeline_sample"
json_files = list(path.glob("*json"))
data = {}
for file in json_files:
    key = file.name.split(".")[1].strip().lower()
    with file.open() as f:
        data[key] = json.load(f)

print(list(data.keys()))
texts = [claim["Preamble"] for claim in data["before_analyze"]["Claims"]]

['context', 'before_analyze', 'context_request', 'after_analyze']


In [11]:
json.dumps(data["before_analyze"])

'{"Claims": [{"Preamble": "A method of wireless communication performed by a user equipment (UE), comprising:", "ClaimNumber": 1, "Limitations": [{"LimitationText": "receiving, when operating in a network that provides synchronization signals at a plurality of locations in a channel bandwidth, a common search space (CSS) indicator for a synchronization signal, wherein the CSS indicator indicates that the synchronization signal is not associated with a CSS for a downlink control channel; and", "Limitations": []}, {"LimitationText": "determining a location of the CSS for the downlink control channel based at least in part on the CSS indicator.", "Limitations": []}], "ClaimType": 0, "ParentClaimNumbers": [], "DeviceName": ""}, {"Preamble": "The method of claim 1, wherein the CSS indicator is receiving in a physical broadcast channel (PBCH).", "ClaimNumber": 2, "Limitations": [], "ClaimType": 0, "ParentClaimNumbers": [], "DeviceName": ""}, {"Preamble": "The method of claim 2, wherein the l

##### Extract Parent claim number

In [10]:
nlp = spacy.load("en_core_web_sm", disable=["textcat"])
print(nlp.pipe_names)

['tagger', 'parser', 'ner']


In [11]:
text = "A user equipment (UE) for wireless communication, comprising:"
doc = nlp(text)
for token in doc:
    print(token.text, token.pos_, token.tag_, token.head)
displacy.render(doc, style="dep", minify=True)

A DET DT equipment
user NOUN NN equipment
equipment NOUN NN equipment
( PUNCT -LRB- equipment
UE PROPN NNP equipment
) PUNCT -RRB- equipment
for ADP IN equipment
wireless ADJ JJ communication
communication NOUN NN for
, PUNCT , equipment
comprising VERB VBG equipment
: PUNCT : equipment


In [25]:
class ParentClaimComponent(object):
    
    def __init__(self, nlp, *patterns):
        self.matcher = Matcher(nlp.vocab)
        for k, pattern in enumerate(patterns):          
            self.matcher.add(f"PARENT_CLAIM_{k}", None, pattern)
        Doc.set_extension("parent_claim_numbers", 
                          default=[], 
                          force=True)
        
    def __call__(self, doc):
        parent_claims = []
        for match_id, start, end in self.matcher(doc):
            try:
                parent_claim = int(doc[end-1].text)
                parent_claims.append(parent_claim)
            except ValueError:
                continue
        doc._.parent_claim_numbers = parent_claims
        return doc

# Load language
nlp = spacy.load("en_core_web_sm", disable=["textcat"])

# Define parent pattern
parent_claim_pattern = [{"LOWER": "claim", "POS": "NOUN"}, {"IS_DIGIT": True}]

# Create component
parent_component = ParentClaimComponent(nlp, parent_claim_pattern)
try:
    nlp.add_pipe(parent_component)
except ValueError as err:
    print(err)
    
for k, doc in enumerate(nlp.pipe(texts), 1):
    print(f"Claim: {k}, Parent: {doc._.parent_claim_numbers}")

Claim: 1, Parent: []
Claim: 2, Parent: [1]
Claim: 3, Parent: [2]
Claim: 4, Parent: [1]
Claim: 5, Parent: [4]
Claim: 6, Parent: [1]
Claim: 7, Parent: [1]
Claim: 8, Parent: [1]
Claim: 9, Parent: []
Claim: 10, Parent: [9]
Claim: 11, Parent: [10]
Claim: 12, Parent: [9]
Claim: 13, Parent: [12]
Claim: 14, Parent: [9]
Claim: 15, Parent: [9]
Claim: 16, Parent: [9]
Claim: 17, Parent: []
Claim: 18, Parent: [17]
Claim: 19, Parent: [18]
Claim: 20, Parent: [17]
Claim: 21, Parent: [17]
Claim: 22, Parent: [17]
Claim: 23, Parent: [17]
Claim: 24, Parent: []
Claim: 25, Parent: [24]
Claim: 26, Parent: [25]
Claim: 27, Parent: [24]
Claim: 28, Parent: [24]
Claim: 29, Parent: [24]
Claim: 30, Parent: [24]
Claim: 31, Parent: []
Claim: 32, Parent: [31]
Claim: 33, Parent: [32]
Claim: 34, Parent: [31]
Claim: 35, Parent: [34]
Claim: 36, Parent: [31]
Claim: 37, Parent: [31]
Claim: 38, Parent: [31]
Claim: 39, Parent: []
Claim: 40, Parent: [39]
Claim: 41, Parent: [40]
Claim: 42, Parent: [39]
Claim: 43, Parent: [42]
C

In [51]:
class ClaimTypeComponent(object):
    
    def __init__(self, claim_type_to_pattern, flags=0):
        self.claim_type_to_pattern = claim_type_to_pattern
        self.flags = flags
        Doc.set_extension("claim_type", 
                          default="Unknown", 
                          force=True)
        
    def __call__(self, doc):
        for claim_type, pattern in self.claim_type_to_pattern:
            if re.search(pattern, doc.text, self.flags):
                doc._.claim_type = claim_type
                break
        return doc
    
claim_type_to_pattern = [("Omnibus", r"^A method, device, apparatus"),
                         ("Process", r"method"),
                         ("Device", r"^(A|An).*?compris(e|es|ing):"),
                         ("CRM", r"^A non-transitory computer-readable medium storing(.*)compris(e|es|ing):"),
                         ("MeansFor", r"^An apparatus(.*?)compris(e|es|ing):")]
claim_type_component = ClaimTypeComponent(claim_type_to_pattern)
try:
    nlp.add_pipe(claim_type_component, name="claim_type")
except ValueError as err:
    nlp.remove_pipe("claim_type")
    nlp.add_pipe(claim_type_component, name="claim_type")
    
for k, doc in enumerate(nlp.pipe(texts), 1):
    print(f"Claim: {k}, Parent: {doc._.parent_claim_numbers}, Type: `{doc._.claim_type}`")

('claim_type', <__main__.ClaimTypeComponent at 0x11eec9b10>)

Claim: 1, Parent: [], Type: `Process`
Claim: 2, Parent: [1], Type: `Process`
Claim: 3, Parent: [2], Type: `Process`
Claim: 4, Parent: [1], Type: `Process`
Claim: 5, Parent: [4], Type: `Process`
Claim: 6, Parent: [1], Type: `Process`
Claim: 7, Parent: [1], Type: `Process`
Claim: 8, Parent: [1], Type: `Process`
Claim: 9, Parent: [], Type: `Device`
Claim: 10, Parent: [9], Type: `Unknown`
Claim: 11, Parent: [10], Type: `Unknown`
Claim: 12, Parent: [9], Type: `Unknown`
Claim: 13, Parent: [12], Type: `Unknown`
Claim: 14, Parent: [9], Type: `Unknown`
Claim: 15, Parent: [9], Type: `Unknown`
Claim: 16, Parent: [9], Type: `Unknown`
Claim: 17, Parent: [], Type: `Device`
Claim: 18, Parent: [17], Type: `Unknown`
Claim: 19, Parent: [18], Type: `Unknown`
Claim: 20, Parent: [17], Type: `Unknown`
Claim: 21, Parent: [17], Type: `Unknown`
Claim: 22, Parent: [17], Type: `Unknown`
Claim: 23, Parent: [17], Type: `Unknown`
Claim: 24, Parent: [], Type: `Device`
Claim: 25, Parent: [24], Type: `Unknown`
Claim: 

##### Extract DeviceName

In [27]:
text = "A method of wireless communication performed by a user equipment (UE), comprising:"
doc = nlp(text)
options = {"compact": True, 
           "bg": "white",
           "color": "black"}
displacy.render(doc, style="dep", minify=True, options=options)

In [192]:
device_name = {}
for chunk in doc.noun_chunks:
    root = chunk.root
    root_head = root.head
    if root_head.lower_ == "by" and root_head.head.lemma_ in ["perform", "do"]:
        if chunk[0].pos_ in ["DET"]:
            chunk = chunk[1:]
        if chunk.text.lower() not in ["device", "method"]:
            device_name = {"Text": chunk.text,
                           "Offset": chunk.start_char,
                           "Length": len(chunk.text),
                           "Location": "Preamble",
                           "Acronym": ""}
            break
if device_name:
    match = re.search(rf"{device_name['Text']}\s*(\((?P<Acronym>\w*?)\))?", doc.text, re.I | re.M)
    if match:
        device_name["Acronym"] = match.group("Acronym")

device_name

{'Text': 'user equipment',
 'Offset': 50,
 'Length': 14,
 'Location': 'Preamble',
 'Acronym': 'UE'}

##### Test code

In [10]:
%aimport app.nlp.components
%aimport app.nlp.pipeline
%aimport settings

In [11]:
from app.nlp.components import ParentClaimComponent, ClaimTypeComponent
from app.nlp.pipeline import Pipeline

In [42]:
pipeline = Pipeline(model_name="en_core_web_sm")
limitations =  [claim["Limitations"] for claim in data["before_analyze"]["Claims"]]
texts_with_context = list(zip(texts, limitations))
output = pipeline.run(texts_with_context, as_tuples=True)

for k, doc in enumerate(output, 1):
    print(f"Claim: {k}, Type: `{doc._.claim_type}`, Parent: `{doc._.parent_claim}`,\n"
          f"Device: `{doc._.device_name}`\n")

Claim: 1, Type: `Process`, Parent: `[]`,
Device: `{'Text': 'user equipment', 'Offset': 50, 'Length': 14, 'Location': 'Preamble', 'Acronym': 'UE'}`

Claim: 2, Type: `Process`, Parent: `[1]`,
Device: `{}`

Claim: 3, Type: `Process`, Parent: `[2]`,
Device: `{}`

Claim: 4, Type: `Process`, Parent: `[1]`,
Device: `{}`

Claim: 5, Type: `Process`, Parent: `[4]`,
Device: `{}`

Claim: 6, Type: `Process`, Parent: `[1]`,
Device: `{}`

Claim: 7, Type: `Process`, Parent: `[1]`,
Device: `{}`

Claim: 8, Type: `Process`, Parent: `[1]`,
Device: `{}`

Claim: 9, Type: `Device`, Parent: `[]`,
Device: `{}`

Claim: 10, Type: `Unknown`, Parent: `[9]`,
Device: `{}`

Claim: 11, Type: `Unknown`, Parent: `[10]`,
Device: `{}`

Claim: 12, Type: `Unknown`, Parent: `[9]`,
Device: `{}`

Claim: 13, Type: `Unknown`, Parent: `[12]`,
Device: `{}`

Claim: 14, Type: `Unknown`, Parent: `[9]`,
Device: `{}`

Claim: 15, Type: `Unknown`, Parent: `[9]`,
Device: `{}`

Claim: 16, Type: `Unknown`, Parent: `[9]`,
Device: `{}`

Claim

In [41]:
d = nlp('A user equipment (UE) for wireless communication, comprising.')
for ch in d.noun_chunks:
    print(ch.text, list(ch.subtree))
displacy.render(d, style="dep", minify=True, options=options)

A user equipment [A, user, equipment, (, UE, ), for, wireless, communication, ,, comprising, .]
UE [UE]
wireless communication [wireless, communication]


In [21]:
[el["DeviceName"] for el in data["after_analyze"]["Claims"]]

['user equipment (UE)',
 'method of claim 1',
 'method of claim 2',
 'method of claim 1',
 'method of claim 4',
 'method of claim 1',
 'method of claim 1',
 'method of claim 1',
 'user equipment (UE',
 'UE of claim 9',
 'UE of claim 10',
 'UE of claim 9',
 'UE of claim 12',
 'UE of claim 9',
 'UE of claim 9',
 'UE of claim 9',
 'non-transitory computer-readable medium storing one or more instructions for wireless communication',
 'non-transitory computer-readable medium of claim 17',
 'non-transitory computer-readable medium of claim 18',
 'non-transitory computer-readable medium of claim 17',
 'non-transitory computer-readable medium of claim 17',
 'non-transitory computer-readable medium of claim 17',
 'non-transitory computer-readable medium of claim 17',
 'apparatus for wireless communication',
 'apparatus of claim 24',
 'apparatus of claim 25',
 'apparatus of claim 24',
 'apparatus of claim 24',
 'apparatus of claim 24',
 'apparatus of claim 24',
 '111 for wireless communication',

In [129]:
def flatten_limitations(limitations):
    text = []
    for limitation in limitations:
        chunk = [limitation['LimitationText']]
        text += chunk
        subtree = flatten_limitations(limitation['Limitations'])
        text += subtree
    return text

text = " ".join(flatten_limitations(limitations[8]))
patt = re.compile(r",\sby\s(an|a|the)\s(.*?)(\sand\s(?P<AfterAndPart>.*?))?,",
                  flags=re.I)
match = patt.search(text)

In [113]:
print(json.dumps({"Claims": [data["after_analyze"]["Claims"][16]]}, indent=4))

{
    "Claims": [
        {
            "Preamble": "A non-transitory computer-readable medium storing one or more instructions for wireless communication, the one or more instructions comprising:",
            "ClaimNumber": 17,
            "Limitations": [
                {
                    "LimitationText": "one or more instructions that, when executed by one or more processors of a user equipment (UE), cause the one or more processors to:",
                    "Limitations": [
                        {
                            "LimitationText": "receive, when operating in a network that provides synchronization signals at a plurality of locations in a channel bandwidth, a common search space (CSS) indicator for a synchronization signal, wherein the CSS indicator indicates that the synchronization signal is not associated with a CSS for a downlink control channel; and",
                            "Limitations": []
                        },
                        {
         