In [1]:
import traceback
import elasticsearch
import time
from elasticsearch import Elasticsearch, helpers

In [3]:
# Create the Elasticsearch client; no arguments are given, so the client's own defaults are used.
es = Elasticsearch()
# Ask the cluster for its name/version details; as the last expression it becomes the cell output.
es.info()

{'name': 'ULTIMECIA',
 'cluster_name': 'elasticsearch',
 'cluster_uuid': 'MHYEAbcOS_i6ybp0d4NE2A',
 'version': {'number': '7.9.1',
  'build_flavor': 'default',
  'build_type': 'zip',
  'build_hash': '083627f112ba94dffc1232e8b42b73492789ef91',
  'build_date': '2020-09-01T21:22:21.964974Z',
  'build_snapshot': False,
  'lucene_version': '8.6.2',
  'minimum_wire_compatibility_version': '6.8.0',
  'minimum_index_compatibility_version': '6.0.0-beta1'},
 'tagline': 'You Know, for Search'}

In [4]:
# Name of the index that every cell in this notebook reads from / writes to.
INDEX_NAME = 'fasttest'

# Document fields stored per entity.
FIELDS = ['abstract', 'subject', 'instance']

# Every field shares the same mapping: analyzed English text with term vectors enabled.
# A fresh dict is built per field so the entries never alias each other.
INDEX_SETTINGS = {
    'mappings': {
        'properties': {
            field_name: {
                'type': 'text',
                'term_vector': 'yes',
                'analyzer': 'english',
            }
            for field_name in FIELDS
        }
    }
}

In [5]:
def createTheIndex():
    """
    (Re)create the index INDEX_NAME using INDEX_SETTINGS.

    If an index with that name already exists it is deleted first, so any
    documents stored in it are lost.
    """
    index_already_present = es.indices.exists(index=INDEX_NAME)
    if index_already_present:
        es.indices.delete(index=INDEX_NAME)
    es.indices.create(body=INDEX_SETTINGS, index=INDEX_NAME)

In [33]:
#createTheIndex()

In [6]:
def peek(filename, size, enc='utf-8'):
    """
    Print a preview of a text file.

    The top line of the file is skipped; the lines after it are printed
    (whitespace-stripped) up to, but not including, line index `size`.
    In other words at most `size - 1` lines are shown.

    filename: path of the file to preview.
    size: number of leading lines to consider (including the skipped top line);
          must be greater than zero.
    enc: text encoding used to open the file.
    """
    if not size > 0:
        print("Size must be greater than zero!")
        return

    with open(filename, encoding=enc) as handle:
        next(handle, None)  # Discard the top line.
        for _ in range(size - 1):
            row = next(handle, None)
            if row is None:  # File is shorter than the requested preview.
                break
            print(row.strip())

In [7]:
# Debug switch: if true, only test (index) on a small subset!
DEBUGGING = False
# Entity ids recorded while parsing abstracts in debug mode.
# Stays None until indexData() replaces it with a set.
ENTITIES_PROCESSED = None

In [8]:
def parseAbstract(data, line):
    """
    Parse a line from long_abstracts and append an index action to `data`.

    data: list that receives one bulk action dict per parsed line.
    line: raw line of the form `<subject-uri> <predicate-uri> "text"@en .`;
          None, empty lines and lines starting with '#' are ignored.

    The entity name (last path segment of the subject URI, underscores turned
    into spaces) becomes the document id. The document starts out with an empty
    'subject' and the instance 'Thing'.
    """
    # `not line` covers both None and the empty string (indexing '' would raise).
    if not line or line.startswith('#'):
        return
    # Quotes, apostrophes and the language tag are removed before tokenizing on spaces.
    tokens = (line.strip()
                  .replace('"', '')
                  .replace('\'', '')
                  .replace('@en .', '')
                  .replace('/>', '>')
                  .split(' '))
    if len(tokens) < 3:
        return # Invalid line.
    entity = tokens[0][1:-1].split('/')[-1].replace('_', ' ')
    # TODO, long abstracts might need some more preprocessing, like removing symbols except for ',.- etc.. ??
    value = ' '.join(tokens[2:]).replace('\\', '')
    data.append({
        "_id": entity,
        "_source": {'abstract': value, 'subject': '', 'instance': 'Thing'}
    })
    if DEBUGGING:
        ENTITIES_PROCESSED.add(entity) # Testing

def parseSubject(data, line):
    """
    Parse a line from categories and merge it into `data`.

    data: dict mapping entity name -> bulk update action. A new action is created
          for an unseen entity; otherwise the category is appended to the existing
          'subject' string, separated by ', '.
    line: raw line of the form `<subject-uri> <predicate-uri> <category-uri> .`;
          None, empty lines and lines starting with '#' are ignored.

    Returns (first letter of the entity in upper case, entity name), or
    (None, None) when the line was skipped.
    """
    # `not line` covers both None and the empty string (indexing '' would raise).
    if not line or line.startswith('#'):
        return None, None
    tokens = line.strip().replace('/>', '>').split(' ')
    if len(tokens) < 3:
        return None, None # Invalid line.
    entity = tokens[0][1:-1].split('/')[-1].replace('_', ' ')
    if not entity:
        # No usable entity name: skip before touching `data` (entity[0] below would raise).
        return None, None
    # Take everything after the 'Category:' marker instead of the last '/'-segment,
    # so that category names which themselves contain '/' are kept intact.
    resource = tokens[2][1:-1]
    _, marker, category = resource.partition('Category:')
    if not marker:
        category = resource.split('/')[-1] # No marker present: fall back to the last path segment.
    value = category.replace('_', ' ')
    if entity not in data:
        data[entity] = {
            "_id": entity,
            "_source": {"doc": {'subject': value}},
            "_op_type": "update"
        }
    else:
        data[entity]['_source']['doc']['subject'] += ', ' + value
    return entity[0].upper(), entity

def parseType(data, line):
    """
    Parse a line from instances and append an update action to `data`.

    data: list that receives one bulk update action dict per parsed line.
    line: raw line of the form `<subject-uri> <predicate-uri> <type-uri> .`;
          None, empty lines and lines starting with '#' are ignored.

    The action sets the 'instance' field of the entity to the last path segment
    of the type URI, with an 'owl#' prefix removed and underscores turned into spaces.
    """
    # `not line` covers both None and the empty string (indexing '' would raise).
    if not line or line.startswith('#'):
        return
    tokens = line.strip().replace('/>', '>').split(' ')
    if len(tokens) < 3:
        return # Invalid line.
    entity = tokens[0][1:-1].split('/')[-1].replace('_', ' ')
    value = tokens[2][1:-1].split('/')[-1].replace('owl#', '').replace('_', ' ')
    data.append({
        "_id": entity,
        "_source": {"doc": {'instance': value}},
        "_op_type": "update"
    })
    
def getBulkData(data):
    """
    Pick the actions that should be sent to the bulk helper.

    Outside of debugging the input is handed back untouched. While debugging,
    only the actions whose '_id' was recorded in ENTITIES_PROCESSED (i.e. was
    indexed @ abstract) are kept, and they are returned as a new list.
    """
    if not DEBUGGING:
        return data

    kept = []
    for action in data:
        if action['_id'] in ENTITIES_PROCESSED:
            kept.append(action)
    return kept

In [9]:
def indexData(size=5000):
    """
    Index the data, size = how many entities to parse at a time.
    size should not be much bigger than 20000, due to bulk index size limitations @ elasticsearch!

    The work happens in two phases:
      1. Every abstract is bulk-indexed as a new document.
      2. The categories and instances files are read side by side; each line
         becomes a partial update ('subject' / 'instance') of a document.

    Errors are printed (with traceback) rather than raised. All opened files are
    closed and the working buffers cleared on the way out, whatever happened.
    """
    global ENTITIES_PROCESSED
    ENTITIES_PROCESSED = set()
    sources = [
        ('datasets/DBpedia/long_abstracts_en.ttl', 'utf-8'),
        ('datasets/DBpedia/article_categories_en.ttl', 'utf-8'),
        ('datasets/DBpedia/instance_types_en.ttl', 'utf-8')
    ]
    # Bound before the try block so the cleanup in 'finally' can always use them,
    # even when one of the datasets cannot be opened.
    files = []
    listAbstract, listSubject, listType = [], {}, []
    try:
        # Open one at a time: handles opened before a failure are still tracked and closed.
        for path, enc in sources:
            files.append(open(path, 'r', encoding=enc)) # Datasets to index.
        abstractFile, categoriesFile, instancesFile = files
        start_time = time.time()

        # Process abstracts first! (bulk)
        for i, line in enumerate(abstractFile):
            if i == 0: # Skip top line.
                continue
            parseAbstract(listAbstract, line)
            if (len(listAbstract) > size):
                helpers.bulk(es, listAbstract, index=INDEX_NAME, raise_on_error=False, raise_on_exception=False)
                listAbstract.clear()
                if DEBUGGING: # Only consider a small subset during test.
                    break

        if len(listAbstract): # Still have some remaining items? Bulk them now.
            helpers.bulk(es, listAbstract, index=INDEX_NAME, raise_on_error=False, raise_on_exception=False)
            listAbstract.clear()

        print("Indexed abstracts.")
        print("Time Elapsed: {:.4f} sec.".format((time.time()-start_time)))

        lineB, lineC = next(categoriesFile, None), next(instancesFile, None) # Skip top lines!
        test1, test2 = False, False

        # Each pass advances both files by one line; a file that is exhausted
        # yields None, which the parsers ignore.
        while (lineB or lineC):
            if DEBUGGING and test1 and test2: # Limit to a small subset during testing.
                break

            if lineB:
                lineB = next(categoriesFile, None)

            if lineC:
                lineC = next(instancesFile, None)

            parseType(listType, lineC)
            currSubjectChar, ent = parseSubject(listSubject, lineB)

            # When we have at least 'size' subjects (entities)
            # Continue to the first next char which differs from the previous entry
            # Which triggered the underneath condition.
            # Add further entries until the first letter in the ent. changes.
            # Bulk the entries until that entry!
            if (len(listSubject) > size):
                lastSubjectChar = currSubjectChar # Find the next first letter of ent. which differs from this. Then bulk.
                newValue = None
                while True:
                    if lineB:
                        lineB = next(categoriesFile, None)
                    if lineB is None:
                        break
                    currSubjectChar, ent = parseSubject(listSubject, lineB)
                    if currSubjectChar and (currSubjectChar != lastSubjectChar):
                        newValue = listSubject[ent] # This value belongs to the next 'group', save it for that group.
                        del listSubject[ent]
                        break
                helpers.bulk(es, getBulkData(listSubject.values()), index=INDEX_NAME, raise_on_error=False, raise_on_exception=False)
                listSubject.clear()
                if newValue: # Add the newest value back again.
                    listSubject[ent] = newValue
                test2 = True

            if (len(listType) > size):
                helpers.bulk(es, getBulkData(listType), index=INDEX_NAME, raise_on_error=False, raise_on_exception=False)
                listType.clear()
                test1 = True

        # If there are remaining elements left, be sure to bulk index them!
        if len(listSubject):
            helpers.bulk(es, getBulkData(listSubject.values()), index=INDEX_NAME, raise_on_error=False, raise_on_exception=False)

        if len(listType):
            helpers.bulk(es, getBulkData(listType), index=INDEX_NAME, raise_on_error=False, raise_on_exception=False)

        print("Finished indexing successfully!")
        print("Time Elapsed: {:.4f} sec.".format((time.time()-start_time)))
    except Exception as e:
        print('Error:', e)
        print(traceback.format_exc())
    finally:
        for f in files:
            f.close()
        listAbstract.clear()
        listSubject.clear()
        listType.clear()
        ENTITIES_PROCESSED.clear()

In [10]:
# Takes between 2 to 3 hours!
# indexData(10000)

In [11]:
# Preview the abstracts dump: with size=3 the two lines following the top line are printed.
peek(filename='datasets/DBpedia/long_abstracts_en.ttl', size=3)

<http://dbpedia.org/resource/Animalia_(book)> <http://dbpedia.org/ontology/abstract> "Animalia is an illustrated children's book by Graeme Base. It was originally published in 1986, followed by a tenth anniversary edition in 1996, and a 25th anniversary edition in 2012. Over three million copies have been sold. A special numbered and signed anniversary edition was also published in 1996, with an embossed gold jacket."@en .
<http://dbpedia.org/resource/Actrius> <http://dbpedia.org/ontology/abstract> "Actresses (Catalan: Actrius) is a 1997 Catalan language Spanish drama film produced and directed by Ventura Pons and based on the award-winning stage play E.R. by Josep Maria Benet i Jornet. The film has no male actors, with all roles played by females. The film was produced in 1996."@en .


In [12]:
# Preview the categories dump: with size=3 the two lines following the top line are printed.
peek(filename='datasets/DBpedia/article_categories_en.ttl', size=3)

<http://dbpedia.org/resource/A> <http://purl.org/dc/terms/subject> <http://dbpedia.org/resource/Category:ISO_basic_Latin_letters> .
<http://dbpedia.org/resource/A> <http://purl.org/dc/terms/subject> <http://dbpedia.org/resource/Category:Vowel_letters> .


In [13]:
# Preview the instance types dump: with size=3 the two lines following the top line are printed.
peek(filename='datasets/DBpedia/instance_types_en.ttl', size=3)

<http://dbpedia.org/resource/Anarchism> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/Achilles> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .


In [14]:
# Example full-text query on the 'abstract' field; asks for the 3 best hits including their source.
es.search(
    index=INDEX_NAME,
    body={
        'query': {
            'match': {'abstract': 'who killed kennedy?'}
        }
    },
    _source=True,
    size=3,
)

{'took': 324,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': 18.70188,
  'hits': [{'_index': 'fasttest',
    '_type': '_doc',
    '_id': 'Killing Kennedy',
    '_score': 18.70188,
    '_source': {'abstract': 'Killing Kennedy: The End of Camelot is a 2012 non-fiction book by Bill OReilly and Martin Dugard about the 1963 assassination of U.S. President John Fitzgerald Kennedy. It is a follow-up to OReillys 2011 book Killing Lincoln. Killing Kennedy was released on October 2, 2012 through Henry Holt and Company.',
     'subject': '',
     'instance': 'Book'}},
   {'_index': 'fasttest',
    '_type': '_doc',
    '_id': 'The Killing of a President',
    '_score': 16.92986,
    '_source': {'abstract': 'The Killing of a President : The Complete Photographic Record of the JFK Assassination, the Conspiracy, and the Cover-up is a book by Robert J. Groden which discusses the assassina

### Remove entities/resources/docs with instance == "Thing"

In [34]:
start_time = time.time()
print("Number of documents in index before deletion: {}".format(es.count(index=INDEX_NAME)["count"]))

BATCH_SIZE = 10000  # Number of delete actions sent per bulk request.

# NOTE: 'instance' is mapped as analyzed text in INDEX_SETTINGS, so this is a
# full-text match on "Thing" rather than an exact-value comparison.
thing_query = {"query": {"match": {"instance": "Thing"}}}

# Only the index and id of a hit are needed to delete it, so the document bodies
# are not fetched (_source=False) and a minimal delete action is built per hit
# instead of echoing the whole hit back to the bulk helper.
bulk_deletes = (
    {"_op_type": "delete", "_index": hit["_index"], "_id": hit["_id"]}
    for hit in helpers.scan(es, index=INDEX_NAME, query=thing_query, _source=False)
)
# The bulk helper consumes the generator lazily and sends it in chunks of BATCH_SIZE.
helpers.bulk(es, bulk_deletes, chunk_size=BATCH_SIZE)

# Refresh so the deletions are visible to the count below.
es.indices.refresh(index=INDEX_NAME)

print("Number of documents in index after deletion: {}".format(es.count(index=INDEX_NAME)["count"]))
print("Total execution time: {}".format(time.time()-start_time))


Number of documents in index before deletion: 4663102
Number of documents in index after deletion: 4663102
Total execution time: 0.017001628875732422


In [35]:
print("Number of documents in index: {}".format(es.count(index=INDEX_NAME)["count"]))

# Use the count API rather than search()['hits']['total']['value']: the search total
# is capped by default (reported as 10000 with relation 'gte'), while count is exact.
for instance_value in ("Thing", ""):
    matching = es.count(index=INDEX_NAME, body={"query": {"match": {"instance": instance_value}}})["count"]
    print("Amount of documents with instance = '{}': {}".format(instance_value, matching))



Number of documents in index: 4663102
Amount of documents with instance = 'Thing': 0
Amount of documents with instance = '': 0
