<a href="https://colab.research.google.com/github/UCREL/IAA-Oracle-ULTEC/blob/main/Named_entity_extractor_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Named Entity Extraction with Plant Names


This notebook walks through the `NamedEntityExtractor` class for named entity extraction, with emphasis on the addition of plant names (`PLANT`) to the recognized entity types. The code is adapted from an existing demo at https://github.com/SpaceTimeNarratives/demo.


## Overview

The `NamedEntityExtractor` is initialized with a spaCy NLP model and sets up an entity ruler for custom pattern matching. The class has several key functions, including setting up entity patterns, merging entities, geocoding, and more. Each of these functions plays a crucial role in identifying and processing entities within the text.
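
As a minimal sketch of the entity ruler idea, the snippet below registers a few string patterns on a blank English pipeline. It assumes only that spaCy is installed; the pattern terms and sentences are made up for illustration, and a blank pipeline has no `ner` component, so the ruler is simply appended rather than placed `before='ner'` as in the class.

```python
import spacy

# A blank pipeline needs no model download; it only tokenizes.
nlp = spacy.blank("en")
ruler = nlp.add_pipe("entity_ruler")

# String patterns are matched on the exact token text, so they are case-sensitive.
ruler.add_patterns([
    {"label": "PLANT", "pattern": "heather"},
    {"label": "PLNAME", "pattern": "Borrowdale"},
    {"label": "PLNAME", "pattern": "BORROWDALE"},
])

doc = nlp("Purple heather covers the fells above Borrowdale.")
found = [(ent.text, ent.label_) for ent in doc.ents]
print(found)

# Different casing is not matched unless a pattern with that casing exists.
other = nlp("Heather grows near borrowdale.")
found_other_case = [(ent.text, ent.label_) for ent in other.ents]
print(found_other_case)
```

The case sensitivity shown here is a likely reason the class registers place names in both title case and upper case.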


# NamedEntityExtractor Class

This section explains how the `NamedEntityExtractor` class operates: how it uses spaCy for natural language processing and how it recognizes and extracts named entities, with particular attention to plant names.


In [None]:
class NamedEntityExtractor:
    def __init__(self, nlp_model):
        self.nlp = nlp_model
        self.nlp.add_pipe("sentencizer")
        self.ruler = self.nlp.add_pipe("entity_ruler", before='ner')
        self.setup_entity_patterns()
        self.combine = lambda x, y: (x[0], x[1], x[2]+' '+y[2], x[3])
        self.geolocation_tags = ['GEO', 'PLNAME', 'GPE']
        self.geocode_cache = {}

### Initialization and Entity Ruler Setup

The class is initialized with a spaCy NLP model. A sentencizer is added to split the text into sentences, so that `doc.sents` is available even when the parser is excluded from the pipeline. An entity ruler is then inserted before the statistical `ner` component for pattern matching, which is how custom entities such as plant names are recognized.

## setup_entity_patterns

**Purpose:** Initializes entity patterns for spaCy's EntityRuler. It loads lists of terms (e.g., plant names, locations) from external files and creates matcher patterns.

**Inputs:** None directly; utilizes external text files as sources for terms.

**Outputs:** Updates the EntityRuler in the spaCy pipeline with custom patterns for named entity recognition.
    

In [None]:
def setup_entity_patterns(self):
        # Get the list of placenames and geonouns
        place_names = [name.strip().title().replace("'S", "'s") for name in open('resources/LD_placenames.txt').readlines()] #read and convert to title case
        place_names += [name.upper() for name in place_names] #retain the upper case versions
        geonouns = self.get_inflections([noun.strip() for noun in open('resources/geo_feature_nouns.txt').readlines()])

        # Get the locative adverbs
        loc_advs = [l.split()[0] for l in open('resources/locative_adverbs.txt').readlines()]
        sp_prep  = [l.strip() for l in open('resources/spatial_prepositions.txt').readlines()
                                                                    if len(l.strip())>2]
        # Get distances
        distances = [l.strip() for l in open('resources/distances.txt').readlines()]

        # Get dates
        dates     = [l.strip() for l in open('resources/dates.txt').readlines()]

        # Get times
        times     = [l.strip() for l in open('resources/times.txt').readlines()]

        # Get events
        events    = [l.strip() for l in open('resources/events.txt').readlines()]

        # Get plant names (new: adds the plant names list to the entity patterns)
        pnames = [l.strip() for l in open('resources/Plant_list.txt').readlines()]

        # Get the list of positive and negative words from the sentiment lexicon
        pos_words = [w.strip() for w in open('resources/positive-words.txt','r', encoding='latin-1').readlines()[35:]]
        neg_words = [w.strip() for w in open('resources/negative-words.txt','r', encoding='latin-1').readlines()[35:]]




        # Define the patterns for the EntityRuler, labelling each term list with its own tag
        patterns = [{"label": "PLANT", "pattern": word} for word in pnames]
        patterns +=  [{"label": "PLNAME",  "pattern": plname} for plname in set(place_names)]
        patterns += [{"label": "GEONOUN", "pattern": noun} for noun in geonouns]
        patterns += [{"label": "+EMOTION", "pattern": word} for word in pos_words]
        patterns += [{"label": "-EMOTION", "pattern": word} for word in neg_words]
        patterns += [{"label": "EVENT",   "pattern": word} for word in events]
        patterns += [{"label": "DATE", "pattern": word} for word in dates]
        patterns += [{"label": "TIME", "pattern": word} for word in times]
        patterns += [{"label": "DISTANCE", "pattern": word} for word in distances]
        patterns += [{"label": "LOCADV", "pattern": word} for word in loc_advs]
        patterns += [{"label": "SP-PREP", "pattern": word} for word in sp_prep]


        self.ruler.add_patterns(patterns)
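
The file-loading and pattern-building steps can also be sketched on their own. The helper names `load_terms` and `build_patterns` are hypothetical and not part of the class, and a temporary file stands in for `resources/Plant_list.txt`. Unlike the method above, this sketch closes each file with a context manager and skips blank lines.

```python
from pathlib import Path
import tempfile


def load_terms(path, encoding="utf-8", skip_lines=0):
    """Read one term per line, dropping blank lines and surrounding whitespace."""
    with open(path, encoding=encoding) as handle:
        lines = handle.read().splitlines()[skip_lines:]
    return [line.strip() for line in lines if line.strip()]


def build_patterns(terms_by_label):
    """Turn {label: [terms]} into EntityRuler-style phrase patterns."""
    return [
        {"label": label, "pattern": term}
        for label, terms in terms_by_label.items()
        for term in sorted(set(terms))  # de-duplicate and keep a stable order
    ]


# A throwaway file with made-up terms stands in for the real resource file.
with tempfile.TemporaryDirectory() as tmp:
    plant_file = Path(tmp) / "plants.txt"
    plant_file.write_text("daffodil\n\nheather\ndaffodil\n", encoding="utf-8")
    patterns = build_patterns({"PLANT": load_terms(plant_file)})

print(patterns)
```

The resulting list has the same shape as the one passed to `self.ruler.add_patterns` above. The `skip_lines` parameter mirrors the `[35:]` slice used for the sentiment lexicon files.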



## get_inflections


**Purpose:** Expands a list of nouns with their plural forms (`NNS`) and lemmas, so that entities are matched regardless of their grammatical number.

**Inputs:** List of base nouns.

**Outputs:** Extended list including inflected forms of the input nouns.
    
        

In [None]:
# Get inflections and lemmas of geo nouns
def get_inflections(self,names_list):
        gf_names_inflected = []
        for w in names_list:
            gf_names_inflected.append(w)
            gf_names_inflected.extend(list(getInflection(w.strip(), tag='NNS', inflect_oov=False)))
            gf_names_inflected.extend(list(getLemma(w.strip(), 'NOUN', lemmatize_oov=False)))
        return list(set(gf_names_inflected))
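
The expand-and-deduplicate shape of this method can be illustrated without `lemminflect`. In the sketch below, `naive_plural` is a hypothetical stand-in that covers only a few regular English endings; it is not how `getInflection` works and is used only to keep the example dependency-free.

```python
def naive_plural(noun):
    """Hypothetical stand-in for a real inflector; handles only regular endings."""
    if noun.endswith(("s", "x", "z", "ch", "sh")):
        return noun + "es"
    if noun.endswith("y") and noun[-2:-1] not in "aeiou":
        return noun[:-1] + "ies"
    return noun + "s"


def expand_forms(nouns, pluralize=naive_plural):
    """Return each noun together with its plural, de-duplicated and sorted."""
    forms = set()
    for noun in nouns:
        noun = noun.strip()
        if noun:
            forms.update((noun, pluralize(noun)))
    return sorted(forms)


forms = expand_forms(["lake", "valley", "marsh", "lake "])
print(forms)
```

As in `get_inflections`, the base form is always kept and duplicates are removed through a set.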


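## combine_multi_tokens

**Purpose:** Combines tagged tokens that sit at consecutive token positions into a single multi-word entity.

**Inputs:** A non-empty list of `(token_index, char_offset, text, tag)` tuples in ascending token order. The list is consumed (emptied) by the call.

**Outputs:** Sorted list of tuples of the same shape, in which each run of adjacent tokens is replaced by one tuple that carries the first token's index, offset, and tag, with the texts joined by spaces.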

In [None]:
def combine_multi_tokens(self,a_list):
        new_list = [a_list.pop()]
        while a_list:
            last = a_list.pop()
            if new_list[-1][0] - last[0] == 1:
                new_list.append(self.combine(last, new_list.pop()))
            else:
                new_list.append(last)
        return sorted(new_list)
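
For comparison, here is a minimal sketch of the same merging rule written as a forward pass that leaves its input untouched and accepts an empty list. The function name `combine_adjacent` and the sample tuples are illustrative assumptions, not part of the class.

```python
def combine_adjacent(tagged_tokens):
    """Merge runs of consecutive token indices without mutating the input.

    Each item is (token_index, char_offset, text, tag).
    """
    merged = []
    previous_index = None
    for index, offset, text, tag in sorted(tagged_tokens):
        if merged and index - previous_index == 1:
            # Extend the current run: keep the first token's index, offset and tag.
            first_index, first_offset, first_text, first_tag = merged[-1]
            merged[-1] = (first_index, first_offset, first_text + " " + text, first_tag)
        else:
            merged.append((index, offset, text, tag))
        previous_index = index
    return merged


sample = [(3, 15, "Lake", "Z"), (4, 20, "District", "Z"), (9, 40, "fells", "Z")]
combined = combine_adjacent(sample)
print(combined)
```

Tokens 3 and 4 are adjacent and are joined, while token 9 stays on its own.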


## extract_sem_entities


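**Purpose:** Extracts semantic entities from processed text using the PyMUSAS tags attached to each token. A token is selected for a tag type when its first PyMUSAS tag starts with the first character of that tag type.

**Inputs:** Processed text (spaCy Doc object whose tokens carry the `pymusas_tags` extension) and a list of tag types to extract.

**Outputs:** Ordered dictionary that maps each entity's character offset to a `(text, tag)` pair.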


In [None]:
# Generates a dictionary of semantic entities combining adjacent ones
def extract_sem_entities(self,processed_text, tag_types):
        entities, tokens = {}, [token.text for token in processed_text]
        for tag_type in tag_types:
            tag_indices = [(i, token.idx, token.text, tag_type) for i, token in enumerate(processed_text)
                                if token._.pymusas_tags[0].startswith(tag_type[0])]
            if tag_indices:
                for i, idx, token, tag in self.combine_multi_tokens(tag_indices):
                    entities[idx] = token, tag
        return OrderedDict(sorted(entities.items()))


## merge_entities

**Purpose:** Merges consecutive entities that share a label and are either contiguous or separated by a single character (typically a space) into one entity.

**Inputs:** spaCy Doc object with recognized entities.

**Outputs:** List of dictionaries with `text`, `start`, `end`, and `label` keys, where `start` and `end` are character offsets.
        

In [None]:
def merge_entities(self, doc):
        merged_entities = []
        temp_entity = {"text": "", "start": None, "end": None, "label": None}

        for ent in doc.ents:
            if temp_entity["label"] == ent.label_ and (temp_entity["end"] == ent.start_char or temp_entity["end"] + 1 == ent.start_char):
                temp_entity["text"] += " " + ent.text
                temp_entity["end"] = ent.end_char
            else:
                if temp_entity["text"]:
                    merged_entities.append(temp_entity.copy())
                temp_entity = {"text": ent.text, "start": ent.start_char, "end": ent.end_char, "label": ent.label_}

        if temp_entity["text"]:
            merged_entities.append(temp_entity.copy())

        return merged_entities
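
A small worked example may make the adjacency rule concrete. The sketch below uses a hypothetical `Ent` named tuple that exposes only the four span attributes the method reads, so it runs without spaCy. The sentence and offsets are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a spaCy Span: only the attributes merge_entities reads.
Ent = namedtuple("Ent", "text start_char end_char label_")


def merge_adjacent(ents):
    """Merge same-label entities that touch or are one character apart."""
    merged = []
    for ent in ents:
        last = merged[-1] if merged else None
        touching = last is not None and ent.start_char - last["end"] in (0, 1)
        if touching and last["label"] == ent.label_:
            last["text"] += " " + ent.text
            last["end"] = ent.end_char
        else:
            merged.append({"text": ent.text, "start": ent.start_char,
                           "end": ent.end_char, "label": ent.label_})
    return merged


# Offsets refer to the sentence "Scafell Pike rises above Wasdale".
ents = [
    Ent("Scafell", 0, 7, "PLNAME"),
    Ent("Pike", 8, 12, "PLNAME"),
    Ent("Wasdale", 25, 32, "PLNAME"),
]
merged = merge_adjacent(ents)
print(merged)
```

"Scafell" and "Pike" are one character apart and share a label, so they become a single entity, while "Wasdale" is too far away to be merged.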


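## geocode

**Purpose:** Looks up a place name with the Nominatim (OpenStreetMap) search API to obtain latitude and longitude coordinates. Successful lookups are cached per instance.

**Inputs:** Name of the place to geocode.

**Outputs:** Dictionary with `latitude` and `longitude` keys, taken as strings from the first search result. Both values are `None` when the request fails or returns no results.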

In [None]:
async def geocode(self, place_name):
        if place_name in self.geocode_cache:
            return self.geocode_cache[place_name]

        base_url = "https://nominatim.openstreetmap.org/search"
        params = {'q': place_name, 'format': 'json'}
        async with aiohttp.ClientSession() as session:
            async with session.get(base_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data:
                        result = {'latitude': data[0].get('lat'), 'longitude': data[0].get('lon')}
                        self.geocode_cache[place_name] = result
                        return result
        return {'latitude': None, 'longitude': None}
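
The caching behaviour can be sketched separately from the HTTP call. In the example below, `geocode_cached` and `fake_lookup` are hypothetical names, and the coordinates are placeholder values rather than real lookup results. One deliberate difference from the method above is that misses are cached too, so a name that cannot be resolved is not requested again.

```python
import asyncio

EMPTY = {"latitude": None, "longitude": None}


async def geocode_cached(place_name, lookup, cache):
    """Return cached coordinates, calling `lookup` only on a cache miss."""
    if place_name not in cache:
        result = await lookup(place_name)
        cache[place_name] = result or dict(EMPTY)  # remember misses as well
    return cache[place_name]


calls = []


async def fake_lookup(place_name):
    """Stand-in for the network request; records each call it receives."""
    calls.append(place_name)
    table = {"Keswick": {"latitude": "1.0", "longitude": "2.0"}}  # placeholder values
    return table.get(place_name)


async def demo():
    cache = {}
    first = await geocode_cached("Keswick", fake_lookup, cache)
    second = await geocode_cached("Keswick", fake_lookup, cache)
    missing = await geocode_cached("Nowhere-at-all", fake_lookup, cache)
    return first, second, missing


# In a notebook cell, use `await demo()` instead of asyncio.run(...).
first, second, missing = asyncio.run(demo())
print(first, second, missing, calls)
```

The second request for the same name is served from the cache, so the lookup function is called only once per distinct name. When calling the public Nominatim service, its usage policy on request rates and client identification also applies.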

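## convert_to_iob_format

**Purpose:** Converts the merged entities into IOB (Inside, Outside, Beginning) format, which is useful for training sequence labeling models, and attaches coordinates to entities labelled `PLNAME`, `GEONOUN`, or `GPE`. This hard-coded label list is used instead of `self.geolocation_tags`. Because generic nouns are geocoded as well, an entity such as "river" receives the coordinates of whatever place the geocoder returns first.

**Inputs:** List of merged entities (as returned by `merge_entities`) and the spaCy Doc object they were extracted from.

**Outputs:** List of `(token_text, iob_tag, geolocation)` tuples, one per token. `geolocation` is `None` for tokens outside geocoded entities.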

In [None]:
async def convert_to_iob_format(self, merged_entities, doc):
        iob_entities = []
        for sent in doc.sents:
            sent_entities = [e for e in merged_entities if e["start"] >= sent.start_char and e["end"] <= sent.end_char]
            for token in sent:
                merged_entity = next((e for e in sent_entities if e["start"] <= token.idx < e["end"]), None)
                if merged_entity:
                    tag_prefix = 'B-' if token.idx == merged_entity["start"] else 'I-'
                    base_label = merged_entity["label"].split('-')[-1]
                    print('base_label:',base_label )
                    if base_label in ["PLNAME", "GEONOUN",  "GPE"]:
                        geolocation = await self.geocode(merged_entity["text"])

                    else:
                        geolocation = None
                    iob_entities.append((token.text, tag_prefix + merged_entity["label"], geolocation))
                else:
                    iob_entities.append((token.text, 'O', None))
        return iob_entities
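
The tagging rule itself, leaving geocoding aside, can be sketched with plain tuples. The function name `iob_tags` and the sample tokens and offsets are illustrative assumptions. Each token is given as `(text, char_offset)`, which corresponds to `token.text` and `token.idx` in spaCy.

```python
def iob_tags(tokens, entities):
    """Assign B-/I-/O tags from character spans.

    tokens: list of (text, char_offset); entities: dicts with start, end, label.
    """
    tagged = []
    for text, offset in tokens:
        # Find the entity whose character span contains the token's start offset.
        entity = next((e for e in entities if e["start"] <= offset < e["end"]), None)
        if entity is None:
            tagged.append((text, "O"))
        else:
            prefix = "B-" if offset == entity["start"] else "I-"
            tagged.append((text, prefix + entity["label"]))
    return tagged


# Offsets refer to the sentence "Scafell Pike rises above Wasdale".
tokens = [("Scafell", 0), ("Pike", 8), ("rises", 13), ("above", 19), ("Wasdale", 25)]
entities = [
    {"start": 0, "end": 12, "label": "PLNAME"},
    {"start": 25, "end": 32, "label": "PLNAME"},
]
tagged = iob_tags(tokens, entities)
print(tagged)
```

A token that starts exactly at the entity's start offset gets `B-`, any later token inside the span gets `I-`, and everything else gets `O`.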

## process_text

**Purpose:** Main entry point: runs the spaCy pipeline on the input text, merges adjacent entities, and converts them to IOB format with geolocation.

**Inputs:** Text to process for named entity extraction.

**Outputs:** List of `(token_text, iob_tag, geolocation)` tuples, ready for visualization or further analysis. The method is a coroutine, so its result must be awaited.

In [None]:
async def process_text(self, text):
        doc = self.nlp(text)
        merged_entities = self.merge_entities(doc)
        return await self.convert_to_iob_format(merged_entities, doc)


## visualize_entities

**Purpose:** Uses spaCy's displaCy visualizer to render the named entities in the text with custom colors for each entity type.

**Inputs:** Raw text; it is run through the pipeline again before rendering.

**Outputs:** Visualization of the text with highlighted entities.
        

In [None]:
def visualize_entities(self, text):
        doc = self.nlp(text)
        options = {"ents": list(BG_COLOR.keys()), "colors": BG_COLOR}
        displacy.render(doc, style="ent", options=options)


## Class Overview


Finally, here is a complete overview of the `NamedEntityExtractor` class, integrating all the functions discussed. This section aims to provide a holistic view of how the class is structured and how it functions as a whole for named entity extraction.
    

In [None]:
!pip install https://github.com/UCREL/pymusas-models/releases/download/en_dual_none_contextual-0.3.2/en_dual_none_contextual-0.3.2-py3-none-any.whl

Collecting en-dual-none-contextual==0.3.2
  Downloading https://github.com/UCREL/pymusas-models/releases/download/en_dual_none_contextual-0.3.2/en_dual_none_contextual-0.3.2-py3-none-any.whl (901 kB)
Collecting pymusas<0.4.0,>=0.3.0 (from en-dual-none-contextual==0.3.2)
  Downloading pymusas-0.3.0-py3-none-any.whl (51 kB)
Collecting click<8.1.0 (from pymusas<0.4.0,>=0.3.0->en-dual-none-contextual==0.3.2)
  Downloading click-8.0.4-py3-none-any.whl (97 kB)
Installing collected packages: click, pymusas, en-dual-none-contextual
  Attempting uninstall: click
    Found existing installation: click 8.1.7
    Uninstalling click-8.1.7:
  

In [None]:
!pip install lemminflect
!pip install folium



In [None]:
import spacy
from spacy.tokens import Span
from spacy import displacy
from collections import OrderedDict
from lemminflect import getLemma, getInflection
import requests
import re
import asyncio
import aiohttp
import folium
from IPython.display import IFrame

In [None]:
# Global spaCy model
NLP_MODEL = spacy.load('en_core_web_sm', exclude=['parser'])


## Entity Color Mapping



Defines colors for different entity types, including a specific color for plant names (`PLANT`).

In [None]:
BG_COLOR = {
    'PLANT': '#a9dfbf',  ### the added plant name
    'PLNAME':'#feca74',
	'GEONOUN': '#9cc9cc',
	'GPE':'#feca74',
	'CARDINAL':'#e4e7d2',
	'FAC':'#9cc9cc',
	'QUANTITY':'#e4e7d2',
	'PERSON':'#aa9cfc',
	'ORDINAL':'#e4e7d2',
	'ORG':'#7aecec',
	'NORP':'#d9fe74',
	'LOC':'#9ac9f5',
	'DATE':'#c7f5a9',
	'DISTANCE':'#edf5a9',
	'EVENT': '#e1a9f5',
	'TIME':'#a9f5bc',
	'WORK_OF_ART':'#e6c1d7',
	'LAW':'#e6e6c1',
    'LOCADV':'#f79188',
	'SP-PREP':'#f5b5cf',
	'PERCENT':'#c9ebf5',
	'MONEY':'#b3d6f2',
	'+EMOTION':'#94f72a',
	'-EMOTION':'#f75252',
	'TIME-SEM':'#d0e0f2',
	'MOVEMENT':'#f2d0d0',
	'no_tag':'#FFFFFF'
}
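
Because the colors are typed by hand, a quick sanity check on a mapping like this can catch malformed values, such as a stray extra `#`, before they reach displaCy. The sketch below is a hypothetical helper, not part of the notebook, and it is shown on a small sample dictionary rather than on `BG_COLOR` itself.

```python
import re

# Six hexadecimal digits after a single leading '#'.
HEX_COLOR = re.compile(r"#[0-9a-fA-F]{6}")


def invalid_colors(color_map):
    """Return the labels whose value is not a #RRGGBB hex string."""
    return sorted(
        label for label, value in color_map.items()
        if not HEX_COLOR.fullmatch(value)
    )


sample = {"PLANT": "#a9dfbf", "LOCADV": "##f79188", "no_tag": "#FFFFFF"}
bad = invalid_colors(sample)
print(bad)
```

Calling `invalid_colors(BG_COLOR)` after the dictionary is defined would apply the same check to the full mapping.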

In [None]:
class NamedEntityExtractor:
    def __init__(self, nlp_model="en_core_web_sm"):
        self.nlp = spacy.load(nlp_model, exclude=['parser'])
        self.nlp.add_pipe("sentencizer")
        self.ruler = self.nlp.add_pipe("entity_ruler", before='ner')
        self.setup_entity_patterns()
        self.combine = lambda x, y: (x[0], x[1], x[2]+' '+y[2], x[3])
        self.geolocation_tags = ['GEO', 'PLNAME', 'GPE']  # Tags for which to perform geocoding
        self.geocode_cache = {}


    def setup_entity_patterns(self):
        # Get the list of placenames and geonouns
        place_names = [name.strip().title().replace("'S", "'s") for name in open('resources/LD_placenames.txt').readlines()] #read and convert to title case
        place_names += [name.upper() for name in place_names] #retain the upper case versions
        geonouns = self.get_inflections([noun.strip() for noun in open('resources/geo_feature_nouns.txt').readlines()])

        # Get the locative adverbs
        loc_advs = [l.split()[0] for l in open('resources/locative_adverbs.txt').readlines()]
        sp_prep  = [l.strip() for l in open('resources/spatial_prepositions.txt').readlines()
                                                                    if len(l.strip())>2]
        # Get distances
        distances = [l.strip() for l in open('resources/distances.txt').readlines()]

        # Get dates
        dates     = [l.strip() for l in open('resources/dates.txt').readlines()]

        # Get times
        times     = [l.strip() for l in open('resources/times.txt').readlines()]

        # Get events
        events    = [l.strip() for l in open('resources/events.txt').readlines()]

        # Get plant names (new: adds the plant names list to the entity patterns)
        pnames = [l.strip() for l in open('resources/Plant_list.txt').readlines()]

        # Get the list of positive and negative words from the sentiment lexicon
        pos_words = [w.strip() for w in open('resources/positive-words.txt','r', encoding='latin-1').readlines()[35:]]
        neg_words = [w.strip() for w in open('resources/negative-words.txt','r', encoding='latin-1').readlines()[35:]]


        # Define the patterns for the EntityRuler, labelling each term list with its own tag
        patterns = [{"label": "PLANT", "pattern": word} for word in pnames]
        patterns +=  [{"label": "PLNAME",  "pattern": plname} for plname in set(place_names)]
        patterns += [{"label": "GEONOUN", "pattern": noun} for noun in geonouns]
        patterns += [{"label": "+EMOTION", "pattern": word} for word in pos_words]
        patterns += [{"label": "-EMOTION", "pattern": word} for word in neg_words]
        patterns += [{"label": "EVENT",   "pattern": word} for word in events]
        patterns += [{"label": "DATE", "pattern": word} for word in dates]
        patterns += [{"label": "TIME", "pattern": word} for word in times]
        patterns += [{"label": "DISTANCE", "pattern": word} for word in distances]
        patterns += [{"label": "LOCADV", "pattern": word} for word in loc_advs]
        patterns += [{"label": "SP-PREP", "pattern": word} for word in sp_prep]


        self.ruler.add_patterns(patterns)



    # Get inflections and lemmas of geo nouns
    def get_inflections(self,names_list):
        gf_names_inflected = []
        for w in names_list:
            gf_names_inflected.append(w)
            gf_names_inflected.extend(list(getInflection(w.strip(), tag='NNS', inflect_oov=False)))
            gf_names_inflected.extend(list(getLemma(w.strip(), 'NOUN', lemmatize_oov=False)))
        return list(set(gf_names_inflected))

    # Generates a dictionary of entities with the indexes as keys
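    def extract_entities(self, text, ent_list, tag='PLNAME'):
        # Try longer names first so that the longest match at a given offset is kept
        ent_list = sorted(set(ent_list), key=len, reverse=True)
        extracted_entities = {}
        for ent in ent_list:
            # re.escape matches the name literally; the raw string avoids invalid escape warnings
            for match in re.finditer(rf' {re.escape(ent)}[.,\s;:]', text):
                start = match.start() + 1
                extracted_entities.setdefault(start, (text[start:match.end()-1], tag))
        return {i:extracted_entities[i] for i in sorted(extracted_entities.keys())}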


    def combine_multi_tokens(self,a_list):
        new_list = [a_list.pop()]
        while a_list:
            last = a_list.pop()
            if new_list[-1][0] - last[0] == 1:
                new_list.append(self.combine(last, new_list.pop()))
            else:
                new_list.append(last)
        return sorted(new_list)

     # Generates a dictionary of semantic entities combining adjacent ones
    def extract_sem_entities(self,processed_text, tag_types):
        entities, tokens = {}, [token.text for token in processed_text]
        for tag_type in tag_types:
            tag_indices = [(i, token.idx, token.text, tag_type) for i, token in enumerate(processed_text)
                                if token._.pymusas_tags[0].startswith(tag_type[0])]
            if tag_indices:
                for i, idx, token, tag in self.combine_multi_tokens(tag_indices):
                    entities[idx] = token, tag
        return OrderedDict(sorted(entities.items()))



    # Generates a list of all tokens, tagged and untagged, for visualisation
    def get_tagged_list(self, text, entities):
        begin, tokens_tags = 0, []
        for start, (ent, tag) in entities.items():
            if begin <= start:
                tokens_tags.append((text[begin:start], None))
                tokens_tags.append((text[start:start+len(ent)], tag))
                begin = start+len(ent)
        tokens_tags.append((text[begin:], None)) #add the last untagged chunk
        return tokens_tags

    def merge_entities(self, doc):
        merged_entities = []
        temp_entity = {"text": "", "start": None, "end": None, "label": None}

        for ent in doc.ents:
            if temp_entity["label"] == ent.label_ and (temp_entity["end"] == ent.start_char or temp_entity["end"] + 1 == ent.start_char):
                temp_entity["text"] += " " + ent.text
                temp_entity["end"] = ent.end_char
            else:
                if temp_entity["text"]:
                    merged_entities.append(temp_entity.copy())
                temp_entity = {"text": ent.text, "start": ent.start_char, "end": ent.end_char, "label": ent.label_}

        if temp_entity["text"]:
            merged_entities.append(temp_entity.copy())

        return merged_entities



    async def geocode(self, place_name):
        if place_name in self.geocode_cache:
            return self.geocode_cache[place_name]

        base_url = "https://nominatim.openstreetmap.org/search"
        params = {'q': place_name, 'format': 'json'}
        async with aiohttp.ClientSession() as session:
            async with session.get(base_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data:
                        result = {'latitude': data[0].get('lat'), 'longitude': data[0].get('lon')}
                        self.geocode_cache[place_name] = result
                        return result
        return {'latitude': None, 'longitude': None}

    async def convert_to_iob_format(self, merged_entities, doc):
        iob_entities = []
        for sent in doc.sents:
            sent_entities = [e for e in merged_entities if e["start"] >= sent.start_char and e["end"] <= sent.end_char]
            for token in sent:
                merged_entity = next((e for e in sent_entities if e["start"] <= token.idx < e["end"]), None)
                if merged_entity:
                    tag_prefix = 'B-' if token.idx == merged_entity["start"] else 'I-'
                    base_label = merged_entity["label"].split('-')[-1]
                    print('base_label:',base_label )
                    if base_label in ["PLNAME", "GEONOUN",  "GPE"]:
                        geolocation = await self.geocode(merged_entity["text"])

                    else:
                        geolocation = None
                    iob_entities.append((token.text, tag_prefix + merged_entity["label"], geolocation))
                else:
                    iob_entities.append((token.text, 'O', None))
        return iob_entities


    async def process_text(self, text):
        doc = self.nlp(text)
        merged_entities = self.merge_entities(doc)
        return await self.convert_to_iob_format(merged_entities, doc)


    def visualize_entities(self, text):
        doc = self.nlp(text)
        options = {"ents": list(BG_COLOR.keys()), "colors": BG_COLOR}
        displacy.render(doc, style="ent", jupyter=True, options=options)

    def visualize_on_map(self, entities):
        # Create a map centered around a default location
        map_center = [30, 0]  # rough center for a whole-world view
        world_map = folium.Map(location=map_center, zoom_start=2)  # avoids shadowing the built-in map()

        # Add markers for each geolocated entity
        for _, label, geo in entities:
            if geo and geo['latitude'] and geo['longitude']:
                folium.Marker(
                    location=[float(geo['latitude']), float(geo['longitude'])],
                    popup=label,
                    icon=folium.Icon(icon="info-sign")
                ).add_to(world_map)

        return world_map






In [None]:
async def main():
    text = "The Nile is a major north-flowing river in Northeastern Africa."
    extractor = NamedEntityExtractor()
    entities = await extractor.process_text(text)
    print(entities)
    extractor.visualize_entities(text)
    # Assuming visualize_on_map returns a folium.Map object
    map_obj = extractor.visualize_on_map(entities)

    # Save the map to an HTML file
    map_obj.save("map.html")
    print("Map has been saved to map.html")
    return map_obj

# Notebook cells already run inside an event loop, so main() is awaited directly.
# In a standalone script, replace the await with: map_obj = asyncio.run(main())
if __name__ == "__main__":
    map_obj = await main()

# Display the map
map_obj


base_label: GEONOUN
base_label: GPE
base_label: GPE
[('The', 'O', None), ('Nile', 'O', None), ('is', 'O', None), ('a', 'O', None), ('major', 'O', None), ('north', 'O', None), ('-', 'O', None), ('flowing', 'O', None), ('river', 'B-GEONOUN', {'latitude': '51.1410065', 'longitude': '1.2748110678641456'}), ('in', 'O', None), ('Northeastern', 'B-GPE', {'latitude': '46.2588615', 'longitude': '-83.6403313'}), ('Africa', 'I-GPE', {'latitude': '46.2588615', 'longitude': '-83.6403313'}), ('.', 'O', None)]


Map has been saved to map.html
