In [1]:
## project configurations
from elasticsearch import Elasticsearch
from typing import List, Dict
import json
import os

files = os.listdir('archive/sgm')
es = Elasticsearch()
index_name = "esdocs"
files_limit = len(files) # len(files) or 1 for testing
bulk_size = 20
index_limit = files_limit*1000 # of bulk size, less than files_limit*1000
max_geo_limit = 2 # max number of geo to collect from API (Time consuming)
date_format = "%d-%b-%Y %H:%M:%S" # date format in sgm files
python_date_format = "%Y-%m-%d %H:%M:%S" # date format in python

In [2]:
## ElasticSearch Setup

# This test is done during development only. 
if es.indices.exists(index_name):
  es.indices.delete(index=index_name)

#? creating index
with open('elastic.config.json', 'r') as file:
  es_config = json.load(file)
  
#? getting status of index creation
es_init = es.indices.create(index=index_name, ignore=400, body=es_config)
es_test = es.indices.get(index=index_name)
print(f'init status: {es_init}\ntest status: {es_test}')

init status: {'acknowledged': True, 'shards_acknowledged': True, 'index': 'esdocs'}
test status: {'esdocs': {'aliases': {}, 'mappings': {'properties': {'author': {'type': 'nested', 'properties': {'first_name': {'type': 'text'}, 'last_name': {'type': 'text'}}}, 'content': {'type': 'text'}, 'date': {'type': 'date', 'format': 'dd-MMM-yyyy HH:mm:ss'}, 'geo_references': {'type': 'nested', 'properties': {'geo_name': {'type': 'text'}, 'geopoint': {'type': 'geo_point'}, 'name': {'type': 'keyword'}}}, 'geopoint': {'type': 'geo_point'}, 'id': {'type': 'text'}, 'people': {'type': 'keyword'}, 'temporal_expressions': {'type': 'nested', 'properties': {'text': {'type': 'text'}}}, 'title': {'type': 'text', 'analyzer': 'autocomplete', 'search_analyzer': 'standard'}}}, 'settings': {'index': {'routing': {'allocation': {'include': {'_tier_preference': 'data_content'}}}, 'number_of_shards': '1', 'provided_name': 'esdocs', 'creation_date': '1704207151546', 'analysis': {'analyzer': {'autocomplete': {'filter'

In [3]:
## data processing functions
def author_handler(text) -> List[str]:
  author_fname, author_lname = None, None
  if text:
    authors = text.split(' ')
    if len(authors) >= 7:
      author_fname = authors[5]
      author_lname = authors[6].split(',')[0]
  return [author_fname, author_lname]

In [4]:
## Data Preprocessing
from bs4 import BeautifulSoup
import codecs

reuters_dict = {}
fails = 0
for file in files[:files_limit]:
  with codecs.open(f'archive/sgm/{file}', 'r', 'latin-1') as f:
    soup = BeautifulSoup(f.read(), 'lxml')
    reuters = soup.find_all('reuters')
    for i in reuters:
      try:
        id = i.get('newid')
        date = i.find('date').get_text()
        places = [tag.get_text() for tag in i.find('places').find_all('d')]
        people = i.find('people').text
        authors = i.find('author')
        author_fname, author_lname = None, None
        if authors:
          authors = authors.text.split(' ') 
          author_fname = authors[5]
          author_lname = authors[6].split(',')[0]
        else:
          authors = None
        title = i.find('text').find('title').get_text()
        dateline = i.find('dateline').get_text()
        body = i.find('content').get_text()
      except:
        fails += 1
        pass
      reuters_dict[id] = {
        'places': places,
        'people': people,
        'author': [author_fname, author_lname],
        'title': title,
        'dateline': dateline,
        'date': date,
        'body': body
        }

#* Finding Total Indexed Documents (some documents don't all fields)
all_docs = len(reuters_dict)
print(f'Full-featured documents: {round((all_docs-fails)/all_docs*100, 2)}%')

Full-featured documents: 88.25%


In [5]:
## data extraction definitions
from datetime import datetime
import dateparser
from collections import Counter

import spacy
from geopy.geocoders import Nominatim
nlp = spacy.load('en_core_web_sm')
geo_locator = Nominatim(user_agent="es_indexer app")

loc_types = ['GPE', 'LOC', 'FAC']
ner_types_datetime = ['DATE', 'TIME']

#* taking a sample
reuters_dict['1']



{'places': ['el-salvador', 'usa', 'uruguay'],
 'people': '',
 'author': [None, None],
 'title': 'BAHIA COCOA REVIEW',
 'dateline': '    SALVADOR, Feb 26 - ',
 'date': '26-FEB-1987 15:01:01.79',
 'body': 'Showers continued throughout the week in\nthe Bahia cocoa zone, alleviating the drought since early\nJanuary and improving prospects for the coming temporao,\nalthough normal humidity levels have not been restored,\nComissaria Smith said in its weekly review.\n    The dry period means the temporao will be late this year.\n    Arrivals for the week ended February 22 were 155,221 bags\nof 60 kilos making a cumulative total for the season of 5.93\nmln against 5.81 at the same stage last year. Again it seems\nthat cocoa delivered earlier on consignment was included in the\narrivals figures.\n    Comissaria Smith said there is still some doubt as to how\nmuch old crop cocoa is still available as harvesting has\npractically come to an end. With total Bahia crop estimates\naround 6.4 mln bags

In [6]:
## data extraction functions
def date_extractor(date) -> str:
  """
  #### Args
    date (str): The date holding format 'dd-MMM-yyyy HH:mm:ss.sss'.
  #### Returns
    str: The extracted date in the format 'dd-MMM-yyyy HH:mm:ss'.
  """
  datesplit = date.split('.')[0]
  dt = datetime.strptime(datesplit, date_format)
  new_dt = dt.strftime(date_format)
  return new_dt

def ner_extractor(doc, doc_date) -> List[int]:
  """
  Extracts named entities related to datetime from the given document.

  #### Args:
    doc (Dict): The document to extract named entities from.
    doc_date (str): The date of the document in the format specified by `date_format`.

  #### Returns:
    List[int]: A list of integers representing the number of days between the named entity and the document date.
  """
  body_text = nlp(doc['body'])
  expressions = []
  for ent in body_text.ents:
    if ent.label_ in ner_types_datetime:
      #* getting the datetime object, then parsing the relative date 
      doc_datetime = datetime.strptime(doc_date, date_format)
      newdate = dateparser.parse(ent.text, settings={'RELATIVE_BASE': doc_datetime})
      if newdate:
        newdate_datetime = datetime.strptime(str(newdate)[0:19], python_date_format)
        days = (newdate_datetime - doc_datetime).days
        expressions.append(days)
  expressions = list(set(expressions))
  return expressions

def get_locations(doc) -> List[Dict]:
  """
  - Extracts and geocodes the top geo locations from a document.
  #### Args
    doc (dict): The document containing `title`, `body`, `dateline`, and `places` fields.
  #### Returns
    list: A list of dictionaries representing the top geo locations. Each dictionary contains the following keys:
      - `name`: The name of the location.
      - `geo_name`: The geocoded name of the location.
      - `geopoint`: A dictionary with 'lon' and 'lat' keys representing the longitude and latitude of the location.
  """
  title_text = nlp(doc['title'].upper())
  title_ents = [i.text for i in title_text.ents if i.label_ in loc_types]
  
  body_text = nlp(doc['body'].replace('\n', ' ').upper())
  body_ents = [i.text for i in body_text.ents if i.label_ in loc_types]
  
  dateline_text = nlp(doc['dateline'].upper())
  dateline_ents = [i.text for i in dateline_text.ents if i.label_ in loc_types]

  places_ents = ' '.join(doc['places']).upper().split(' ')
  all_entities = list(body_ents + dateline_ents + places_ents + title_ents)
  
  #* counting and sorting the entities by frequency
  location_counts = Counter(all_entities)
  location_counts = dict(sorted(location_counts.items(), key=lambda item: item[1], reverse=True))
  
  all_locs = []
  idx = 1
  for key, value in location_counts.items():
    if idx > max_geo_limit and key:
      all_locs.append({
        'name': key,
      })
    else:
      try:
        location = geo_locator.geocode(key)
      except:
        pass
      if location:
        all_locs.append({
          'name': key,
          'geo_name': location.address,
          'geopoint': {
            'lon': location.longitude,
            'lat': location.latitude
          }
        })
        idx += 1
  return all_locs

def get_item(key, object) -> dict:
  """
  - Get item details from the given object.
  #### Args
    key (str): The key of the item.
    object (dict): The object containing item details.
  #### Returns
    dict: A dictionary containing the item details.
  """
  id = key
  date = date_extractor(object['date'])
  title = object['title']
  content = object['body']
  people = object['people']
  #* These are too slow to process
  all_locations = get_locations(object)
  try:
    geopoint = {
    'lon': all_locations[0]['geopoint']['lon'],
    'lat': all_locations[0]['geopoint']['lat'],
  }
  except:
    geopoint = {}
  geo_ref = all_locations
  exprs = ner_extractor(object, date)
  item = {
    'id': id,
    'title': title,
    'date': date,
    'geopoint': geopoint,
    'geo_references': geo_ref,
    'expressions': exprs,
    'content': content,
    'people': people,
    'author': {
      'first_name': object['author'][0],
      'last_name': object['author'][1]
    }
  }
  return item

get_item('1',reuters_dict['283']) #? testing geopy service for a sample

{'id': '1',
 'title': 'Japan February external reserves record 51.73 billion dlrs (January 51.46 billion)\n',
 'date': '02-Mar-1987 02:48:11',
 'geopoint': {'lon': 139.2394179, 'lat': 36.5748441},
 'geo_references': [{'name': 'JAPAN',
   'geo_name': '日本',
   'geopoint': {'lon': 139.2394179, 'lat': 36.5748441}}],
 'expressions': [0, 2921, -366, 2010, -1],
 'content': 'Thai Airways International plans to\nexpand its fleet to 58 from 30 aircraft by 1995, company\nofficials said.\n    Thamnoon Wanglee, vice-president for finance, told a\nweekend marketing conference Thai would finance the expansion\nby borrowing, but he did not give details.\n    He said the airline planned to reduce its yen borrowing to\n36.4 pct of overall debt by September 1992. It is currently\n64.3 pct of overall debt.\n    He said dollar borrowing should rise to 56.2 pct of overall\ndebt in the same period, compared to 15.7 pct now.\n    Other company officials said the state-owned airline had no\nplans to go private

In [None]:
## indexing to Elasticsearch
from elasticsearch import helpers

actions = []
for idx, (key, value) in enumerate(reuters_dict.items()):
  if idx > index_limit:
    break
  try:
    actions.append({
      "_index": index_name,"_id": int(key),
      "_source": get_item(key, value)
    })
    if idx % bulk_size == 0 and idx > 0:
      print(f"Processing bulk: {idx} records")
      helpers.bulk(es, actions)
      actions = []
  except Exception as e:
    print(f"Error processing at key: {key}")
    print(e)