# Dbpedia Indexing

## Imports

In [9]:
import elasticsearch
import os
import re
import string
import time


# stop words
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')

from tqdm import tqdm
from pprint import pprint

from elasticsearch import Elasticsearch, helpers, exceptions
from typing import Dict

# path variables, etc.
from config import *

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Yohannes\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [10]:
# debug mode
DEBUG = False

In [11]:
print("Elasticsearch version:", elasticsearch.__version__)
print("Index", INDEX_NAME)
print("Index settings:")
pprint(INDEX_SETTINGS)

print("Files to index:", SHORT_ABSTRACT_PATH, INSTANCE_TYPES_EN_PATH)

Elasticsearch version: (7, 17, 6)
Index smart_index
Index settings:
{'mappings': {'properties': {'abstract': {'analyzer': 'english',
                                          'term_vector': 'yes',
                                          'type': 'text'},
                             'instance_type': {'type': 'text'}}}}
Files to index: c:\Users\Yohannes\Documents\School\Master\DAT640-FinalProject\datasets\dbpedia\short_abstracts_en.ttl c:\Users\Yohannes\Documents\School\Master\DAT640-FinalProject\datasets\dbpedia\instance_types_en.ttl


## DbPedia Indexing class

In [12]:
class DbPediaCollection:
    def __init__(self, index_name: str, index_settings: Dict, stop_words=[], DEBUG=False, FILES=[]) -> None:
        self._index_name = index_name
        self._index_settings = index_settings
        self.es = Elasticsearch()
        self.stop_words = stop_words
        self.FILES = FILES

        # for local dev
        self.DEBUG = DEBUG
        self.DEBUG_DATA = set()
    
    def preprocess(self, line, remove_stopwords=False):
        line = line.strip().lower().replace("_", " ").translate(str.maketrans('', '', string.punctuation))
        return " ".join([
            term 
            for term in re.sub(r"\s+", " ", line).split(" ") 
            if term not in self.stop_words
        ]).strip() if remove_stopwords else line

    def parse_instance_types(self, line):
        if line == None or line[0] == "#":
            return
        
        line = line.strip().replace('/>', '>').split(' ')
        if len(line) < 3:
            return
        entity = self.preprocess(line[0][1:-1].split("/")[-1]) # remove < and >, get entity + preprocess
        instance_type = self.preprocess(line[2][1:-1].split("/")[-1][4:]) # remove < and >, get instance type + preprocess
        return {
            "_id": entity,
            # "_source": {"doc" : {"instance_type": instance_type}},
            "doc" : {"instance_type": instance_type},
            "_op_type": "update"
        }
    
    def parse_abstracts(self, line):
        if line == None or line[0] == "#":
            return
        
        line = line.strip().replace('@en .', '') \
            .replace('"', '').replace('\\', '') \
            .replace('\'', '').replace('/>', '>').split(' ')

        if len(line) < 3:
            return
        entity = self.preprocess(line[0][1:-1].split("/")[-1])
        abstract = self.preprocess(' '.join(line[2:]), True)

        if self.DEBUG and entity not in self.DEBUG_DATA:
            self.DEBUG_DATA.add(entity) # TODO fix later

        return {
            "_id": entity,
            "_source" : {"abstract": abstract, "instance_type": "_"}
        }
    
    def getBulkedData(self, entities):
        if self.DEBUG:
            return [e for e in entities if e["_id"] in self.DEBUG_DATA]
        return entities
    
    def create_index(self, recreate_index=False):
        if self.es.indices.exists(self._index_name):
            if recreate_index:
                self.es.indices.delete(index=self._index_name)
            return
        self.es.indices.create(index=self._index_name, body=self._index_settings)
    
    def query(self, body, size=10):
        try:
            start_time = time.time()
            res = self.es.search(index=self._index_name, body=body, size=size)
            print("Query time: {:4f} seconds".format(time.time() - start_time))
            return res
        except exceptions.RequestError as e:
            print(e)
            return None

    def index(self, bulk_size=1000, override_debug=False):
        try:
            start_time = time.time()
            for file in self.FILES:
                count = 0
                with open(file, "r", encoding="utf-8") as f:
                    entities = []
                    for i, line in enumerate(tqdm(f)):
                        if i == 0:
                            continue

                        if file == SHORT_ABSTRACT_PATH:
                           entities.append(self.parse_abstracts(line))
                        elif file == INSTANCE_TYPES_EN_PATH:
                            entities.append(self.parse_instance_types(line))
                        else:
                            print("Unknown file", file)
                            break
                        
                        if i % bulk_size == 0: # bulk insert
                            helpers.bulk(self.es, self.getBulkedData(entities), index=self._index_name, raise_on_error=False)
                            entities = []
                            if self.DEBUG:
                                break
                    if len(entities) > 0:
                        helpers.bulk(self.es, self.getBulkedData(entities), index=self._index_name, raise_on_error=False)
                        entities = []

        except Exception as e:
            print(e)
        finally:
            print("Indexing finished, time elapsed: {:4f} seconds".format(time.time() - start_time))
            entities, self.DEBUG_DATA = [], set() # reset data
        
    
    
    # factory method for creating dbpedia collection
    @classmethod
    def create_dbpedia_collection(cls):
        return cls(INDEX_NAME, INDEX_SETTINGS, stop_words=stopwords.words('english'), DEBUG=DEBUG, FILES=[SHORT_ABSTRACT_PATH, INSTANCE_TYPES_EN_PATH])

## Run indexing

In [13]:
dbpedia_index = DbPediaCollection.create_dbpedia_collection()
dbpedia_index.create_index(recreate_index=True)

  if self.es.indices.exists(self._index_name):


In [14]:
dbpedia_index.index(bulk_size=1000)

147502it [00:59, 2498.17it/s]


Indexing finished, time elapsed: 59.057037 seconds


KeyboardInterrupt: 

In [None]:
print("Number of documents:", dbpedia_index.es.count(index=INDEX_NAME)["count"])

Number of documents: 4925720
