This notebook handles:
1. Creating Elastis Search Index
2. Uploading data to the index


- Google Maps uses lat,lon
- spatial context uses lon,lat

Importing Libraries

In [16]:
import os
import json
from bs4 import BeautifulSoup
from elasticsearch import Elasticsearch
import spacy
from geopy.geocoders import Nominatim
from datetime import datetime

## These are used to re-connect to geocoder when max limit is reached
from geopy.exc import GeocoderTimedOut
# from geopy.exc import GeocoderUnavailable
# from requests.exceptions import ConnectionError
# import time

Creating an Index

In [3]:
# Elasticsearch configuration
es = Elasticsearch([{'host':'localhost','port':9200,'scheme':'http'}])
index_name = 'reut2'

In [20]:
# Define the mapping and settings
mapping = {
    "mappings": {
        "properties": {
            "date": {"type": "date"},
            "author": {"type": "text"},
            "title": {
                 "type" : "text",
                 "analyzer":"autocomplete"
            },
            
            "body": {
                "type": "text",
                "analyzer": "custom_analyzer"
                },
            "temporal_expressions": {
                "type": "nested",
                "properties": {
                    "text": {"type": "text"},
                    "label": {"type": "text"},
                }
            },
            "georeferences": {
                "type": "nested",
                "properties": {
                    "location": {"type": "keyword"},
                    "coordinates": {"type": "geo_point"},
                }
            },
            "geopoint_author": {
                "type": "nested",
                "properties": {
                    "location": {"type": "text"},
                    "coordinates": {"type": "geo_point"},
                }
            },
            #  "avg_geopoint": {
            #     "type": "nested",
            #     "properties": {
            #         "location": {"type": "text"},
            #         "coordinates": {"type": "geo_point"},
            #     }
            # },
        }
    },
    "settings": {
        "analysis": {
            "analyzer": {
                "autocomplete": {
                    "type": "custom",
                    "tokenizer": "autocomplete_tokenizer",
                    "filter": [
                        "lowercase",
                        "autocomplete_filter",
                        "shingle_filter"
                    ]
                },
                "custom_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "stop", "stemmer"],
                },
            },
            "tokenizer": {
                "autocomplete_tokenizer": {
                    "type": "edge_ngram",
                    "min_gram": 1,
                    "max_gram": 20,
                    "token_chars": ["letter", "digit"]
                },
            },
            "filter": {
                "autocomplete_filter": {
                    "type": "edge_ngram",
                    "min_gram": 1,
                    "max_gram": 20
                },
                "shingle_filter": {
                    "type": "shingle",
                    "min_shingle_size": 2,
                    "max_shingle_size": 4
                },
            },
        },
    },
}
 
# Create the index with the mapping and settings
es.indices.create(index=index_name, body=mapping, ignore=400)


  es.indices.create(index=index_name, body=mapping, ignore=400)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'reut2'})

Tracking the last file and line parsed

In [4]:
def get_last_parsed_info():
    """
    Function to read and return the last parsed file and line.

    Returns:
        Tuple[str, int]: A tuple containing the last parsed file and line number.
                        If no information is available, returns (None, 0).
    """
    file_path = "D:/4th year 1st semester/IR/Project/cache/last_parsed_info.txt"
    try:
        with open(file_path, "r") as file:
            content = file.readline().strip()
            if content:
                last_parsed_info = content.split(",")
                if len(last_parsed_info) == 2:
                    last_parsed_file, last_parsed_line = last_parsed_info
                    last_parsed_line = int(last_parsed_line)
                else:
                    last_parsed_file = None
                    last_parsed_line = 0
            else:
                last_parsed_file = None
                last_parsed_line = 0
    except FileNotFoundError:
        # If the file is not found, create a new file
        with open(file_path, "w") as file:
            last_parsed_file = None
            last_parsed_line = 0
    return last_parsed_file, last_parsed_line



def write_last_parsed_info(file_name, line_number):
    """
    Function to write the last parsed file and line to the file.

    Args:
        file_name (str): The name of the last parsed file.
        line_number (int): The line number in the last parsed file.
    """
    file_path = "D:/4th year 1st semester/IR/Project/cache/last_parsed_info.txt"
    with open(file_path, "w") as file:
        file.write(f"{file_name},{line_number}")

Functions to extract geopoints, georeferences, and temporal expressions.

**Cashing concept is used here**

In [17]:
# Initialize the geopy geocoder
geolocator = Nominatim(user_agent="tri-3")

# Initialize spacy for extracting georeferences
nlp = spacy.load("en_core_web_sm")

# File paths for saving data
cache_directory = 'D:/4th year 1st semester/IR/Project/cache/'
reuters_info_json_path = os.path.join(cache_directory, 'reuters_info.json')
geopoints_file_path = os.path.join(cache_directory, 'geopoints.csv')

# Create the cache directory if it does not exist
os.makedirs(cache_directory, exist_ok=True)



def extract_geopoints(location,attempt=1, max_attempts=5):
    """
    Function to extract geopoints (latitude, longitude) for a given location.

    Args:
        location (str): The location to extract geopoints for.

    Returns:
        Union[Dict[str, Union[str, Dict[str, float]]], int]: 
            If geopoints are found, returns a dictionary containing the location and coordinates.
            If not found, returns 0.
    """

    processed_locations = []
    location=location.replace(",","")


    # Check if the location is already processed
    if os.path.exists(geopoints_file_path):
        with open(geopoints_file_path, 'r') as geopoints_file:
            for line in geopoints_file:
                parts = line.strip().split(',')
                if len(parts) >= 3:
                    saved_location, lat, lon = parts[0].strip().lower(), parts[1], parts[2]
                    if lat and lon:
                        processed_locations.append({"location": saved_location, "coordinates": {"lat": float(lat), "lon": float(lon)}})
                    

    # Normalize the location to lowercase
    normalized_location = location.strip().lower()

    # Check if the location is already processed
    for geopoint in processed_locations:
        if normalized_location == geopoint["location"]:
            return {"location": normalized_location, "coordinates": geopoint['coordinates']}

    # If not already processed, use geopy to get lon, lat
    try:
        geocoded_location = geolocator.geocode(normalized_location, timeout=None)
        if geocoded_location and geocoded_location.longitude and geocoded_location.latitude:
            lon, lat = geocoded_location.longitude, geocoded_location.latitude
            if lat and lon:
                with open(geopoints_file_path, 'a') as geopoints_file:
                    geopoints_file.write(f'{normalized_location},{float(lat)},{float(lon)}\n')

                return {"location": normalized_location, "coordinates": {"lat": float(lat), "lon": float(lon)}}
            else:
                return 0
    except GeocoderTimedOut:
        if attempt <= max_attempts:
            return extract_geopoints(normalized_location, attempt=attempt+1)





def extract_georeferences(body):
    """
    Function to extract georeferences from the given text body.

    Args:
        body (str): The text body to extract georeferences from.

    Returns:
        List[Dict[str, Union[str, Dict[str, Union[str, float]]]]]: A list of dictionaries containing
        the extracted georeferences and their coordinates.
    """
    doc = nlp(body)
    georefs = []

    for ent in doc.ents:
        if ent.label_ in ['GPE', 'LOC']:
            results = extract_geopoints(ent.text)
            if results != 0:
                if results and results not in georefs:
                    georefs.append(results)

    return georefs





def extract_temporal_expressions(text):
    """
    Function to extract temporal expressions from the given text.

    Args:
        text (str): The text to extract temporal expressions from.

    Returns:
        List[Dict[str, str]]: A list of dictionaries containing the extracted temporal expressions and their labels.
    """
    doc = nlp(text)
    temporal_expressions = []

    for ent in doc.ents:
        if ent.label_ in ["DATE", "TIME"]:
            temporal_expressions.append({
                "text": ent.text,
                "label": ent.label_
            })

    return temporal_expressions

This function was used to get the reversed location for an average georeferences point, but after testing, the idea is not valid to use.

 (NOT USED IN THE LAST VERSION) 

In [3]:
# from geopy.exc import GeocoderTimedOut

# def reverse_geocode(lon, lat):
#     """
#     Function to perform reverse geocoding, converting coordinates (longitude, latitude) to a human-readable address.

#     Args:
#         lon (float): The longitude of the location.
#         lat (float): The latitude of the location.

#     Returns:
#         str or None: The human-readable address if successfully reverse geocoded, or None if a timeout occurs.
#     """
#     try:
#         location = geolocator.reverse((lat, lon), language='en')
#         print("location: ", location)
#         return location.address
#     except GeocoderTimedOut:
#         return None


This is the main function, it handles:
1. Reading and parsing the files.
2. Extract "date, authors, title, body" from the files directly.
3. Calling the functions to extract geopoint (fromm date line and places), georeferences (using body), and temporal expressions (from body).
4. Data are saved as JSON objects in a file.

In [18]:
def extract_reuters_info(folder_path):
    """
    Function to extract information from Reuters files in a given folder and store it in a JSON file.

    Args:
        folder_path (str): The path to the folder containing Reuters files.
    """
  
    for file_name in os.listdir(folder_path):
        
        if file_name.endswith('.sgm'):
            file_path = os.path.join(folder_path, file_name)
            with open(file_path, 'r', encoding='latin-1') as file:
                content = file.readlines()

            soup = BeautifulSoup("".join(content), 'html.parser')

            total=len(soup.find_all('reuters'))

            for i, reuters_tag in enumerate(soup.find_all('reuters')):

                # Get the last parsed file and line
                last_parsed_file, last_parsed_line = get_last_parsed_info()
                
                if last_parsed_file is not None:
                    current_file_number = int(file_name[6:9])
                    last_parsed_file_number = int(last_parsed_file[6:9])

                    if current_file_number < last_parsed_file_number:
                        break  # Skip files that have already been processed

                    if current_file_number == last_parsed_file_number and i <= last_parsed_line:
                        continue  # Skip lines that have already been processed
                    
                date_string = reuters_tag.find('date').text.strip() if reuters_tag.find('date') else ''
                input_date = datetime.strptime(date_string, '%d-%b-%Y %H:%M:%S.%f')
                formatted_date = input_date.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
                reuters_info = {
                    'date': formatted_date,
                    'authors': reuters_tag.find('author').text.strip() if reuters_tag.find('author') else 'ANONYMOUS',
                    'title': reuters_tag.find('title').text.strip() if reuters_tag.find('title') else '',
                    'body': reuters_tag.find('body').text.strip() if reuters_tag.find('body') else 0
                }

                #if "body" is empty, ignore the file
                if reuters_info['body'] == 0:
                    continue

                
                # Extract additional information
                reuters_info['geopoint_author'] = extract_geopoints(reuters_tag.find('dateline').text.strip().split()[0].replace(',', '') if reuters_tag.find('dateline') else '')
                reuters_info['temporal_expressions'] = extract_temporal_expressions(reuters_info['body'])
                reuters_info['georeferences'] = extract_georeferences(reuters_info['body'])
                places = [place.text.strip() for place in reuters_tag.find('places').find_all('d')] if reuters_tag.find('places') else []
                geopoints = [extract_geopoints(place) for place in places if extract_geopoints(place) != 0]
                reuters_info['geopoint_author'] = geopoints
                

                # This was used to compute an average geo-point for georeferences, but the results are not valid.
                 
                # if(reuters_info['georeferences']):
                #     if (len(reuters_info['georeferences'])>1):
                #         # Calculate the average latitude and longitude
                #         geopoints = reuters_info['georeferences']
                #         print(geopoints)
                #         if geopoints:
                #             sum_lon=0
                #             sum_lat=0
                #             dom=len(geopoints)
                            
                #             for point in geopoints:  
                #                 print(type(float(point['coordinates']['lon'])))
                #                 sum_lon += float(point['coordinates']['lon'])
                #                 sum_lat += float(point['coordinates']['lat'])

                #             avg_lon=sum_lon/dom
                #             avg_lat=sum_lat/dom
                #             avg_geopoint = {"coordinates": {"lat": avg_lat, "lon": avg_lon}}
                #             print("avgp: ",avg_geopoint)

                #             # Reverse geocode to get an estimated location name
                #             estimated_location_name = reverse_geocode(avg_lon, avg_lat)
                #             if estimated_location_name:
                #                 avg_geopoint['location'] = estimated_location_name      
                #                 reuters_info['avg_geopoint'] = avg_geopoint
                #             else: 
                #                 reuters_info['avg_geopoint'] = reuters_info['georeferences']


                # Save reuters info to JSON file
                with open(reuters_info_json_path, 'a') as json_file:
                    json.dump(reuters_info, json_file, indent=2)
                    json_file.write(',\n')

                # Update the tracking of file and line
                write_last_parsed_info(file_name, i)

    # Check if the end of the file is reached
    if i == total:
        # Reset the line number to 0 if the end of the file is reached
        write_last_parsed_info(file_name, 0)
        print(f"Updated last parsed info for {file_name}:0")

# Folder path
folder_path = "D:/4th year 1st semester/IR/Project/data"

# Extracting
extract_reuters_info(folder_path)


ValueError: unconverted data remains: RM

Inserting JSON objects to the elastic search index

In [21]:
# Specify the index name
index_name = 'reut2'

# Read the JSON file
with open('D:/4th year 1st semester/IR/Project/cache/reuters_info.json', 'r') as file:
    json_data = file.read()

# Parse the JSON data into a list of dictionaries
try:
    documents = json.loads(json_data)
except json.JSONDecodeError as e:
    print(f"Error decoding JSON: {e}")
    documents = []

# Insert each document into the Elasticsearch index
for idx, document in enumerate(documents, start=1):
    try:
        # Insert the document into the Elasticsearch index
        es.index(index=index_name, id=idx, body=document)
        print(f"Inserted document {idx} into the '{index_name}' index.")
    except Exception as e:
        print(f"Error inserting document {idx}: {e}")

# Refresh the index to make the documents searchable immediately 
es.indices.refresh(index=index_name)


Inserted document 1 into the 'reut2' index.
Inserted document 2 into the 'reut2' index.
Inserted document 3 into the 'reut2' index.
Inserted document 4 into the 'reut2' index.
Inserted document 5 into the 'reut2' index.
Inserted document 6 into the 'reut2' index.
Inserted document 7 into the 'reut2' index.
Inserted document 8 into the 'reut2' index.
Inserted document 9 into the 'reut2' index.
Inserted document 10 into the 'reut2' index.
Inserted document 11 into the 'reut2' index.
Inserted document 12 into the 'reut2' index.
Inserted document 13 into the 'reut2' index.
Inserted document 14 into the 'reut2' index.
Inserted document 15 into the 'reut2' index.
Inserted document 16 into the 'reut2' index.
Inserted document 17 into the 'reut2' index.
Inserted document 18 into the 'reut2' index.
Inserted document 19 into the 'reut2' index.
Inserted document 20 into the 'reut2' index.
Inserted document 21 into the 'reut2' index.
Inserted document 22 into the 'reut2' index.
Inserted document 2

ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})